MemoryListener.cpp
Go to the documentation of this file.
1 #include "MemoryListener.h"
2 
3 #include <sstream>
4 
9 
12 
14 {
15 
16  std::string
18  {
19  return "MemoryUpdates." + memoryID.memoryName;
20  }
21 
23  {
24  }
25 
26  void
28  {
29  this->component = component;
30  }
31 
32  void
33  MemoryListener::updated(const std::vector<data::MemoryID>& updatedSnapshotIDs) const
34  {
35  std::vector<MemoryID> bos;
36  armarx::fromIce(updatedSnapshotIDs, bos);
37  updated(bos);
38  }
39 
40  void
41  MemoryListener::updated(const std::vector<MemoryID>& updatedSnapshotIDs) const
42  {
43  std::stringstream error;
44 
45  for (const auto& [subscription, subCallbacks] : this->callbacks)
46  {
47  std::vector<MemoryID> matchingSnapshotIDs;
48 
49  for (const MemoryID& updatedSnapshotID : updatedSnapshotIDs)
50  {
51  try
52  {
53  if (contains(subscription, updatedSnapshotID))
54  {
55  // ARMARX_IMPORTANT << VAROUT(subscription) << " matches " << VAROUT(updatedSnapshotID);
56  matchingSnapshotIDs.push_back(updatedSnapshotID);
57  }
58  else
59  {
60  // ARMARX_IMPORTANT << VAROUT(subscription) << " does not match " << VAROUT(updatedSnapshotID);
61  }
62  }
63  catch (const armem::error::InvalidMemoryID& e)
64  {
65  // Log to debug, but ignore otherwise
66  error << "Error when comparing subscribed ID " << subscription
67  << " with updated ID " << updatedSnapshotID << ":\n"
68  << e.what() << "\n\n";
69  }
70  }
71 
72  if (not matchingSnapshotIDs.empty())
73  {
74  ARMARX_DEBUG << "Calling " << subCallbacks.size() << " callbacks"
75  << " subscribing " << subscription << " with "
76  << matchingSnapshotIDs.size() << " snapshot IDs ...";
77  for (auto& managedCallback : subCallbacks)
78  {
79  try
80  {
81  managedCallback.callback(subscription, matchingSnapshotIDs);
82  }
83  catch (const armarx::LocalException& e)
84  {
85  error << "Calling callback subscribing " << subscription << " failed."
86  << "\nCaught armarx::LocalException:"
87  << "\n"
88  << e.getReason() << "\n Stacktrace: \n"
89  << e.generateBacktrace() << "\n";
90  }
91  catch (const std::exception& e)
92  {
93  error << "Calling callback subscribing " << subscription << " failed."
94  << "\nCaught armarx::Exception:"
95  << "\n"
96  << e.what() << "\n";
97  }
98  catch (...)
99  {
100  error << "Calling callback subscribing " << subscription << " failed."
101  << "\nCaught unknown exception."
102  << "\n";
103  }
104  }
105  }
106  }
107  if (error.str().size() > 0)
108  {
109  ARMARX_WARNING << "The following issues were encountered during MemoryListener::"
110  << __FUNCTION__ << "(): \n\n"
111  << error.str();
112  }
113  }
114 
117  {
119  << "The memoryName must be specified to subscribe";
120 
121  if (component and memoryRefCount[memoryID.memoryName] == 0)
122  {
124  }
125 
126  auto id = nextId++;
127  callbacks[memoryID].push_back({id, callback});
128 
130 
131  return SubscriptionHandle(this, memoryID, id);
132  }
133 
135  MemoryListener::subscribe(const MemoryID& subscriptionID, CallbackUpdatedOnly callback)
136  {
137  return subscribe(
138  subscriptionID,
139  [callback](const MemoryID&, const std::vector<MemoryID>& updatedSnapshotIDs)
140  { callback(updatedSnapshotIDs); });
141  }
142 
143  void
145  {
146  if (not handle.valid)
147  {
148  return;
149  }
150  handle.valid = false;
151 
152  // Remove ManagedCallback with ManagedCallback.id == handle.id from callbacks[handle.memoryID]
153  auto it = std::find_if(callbacks[handle.memoryID].begin(),
154  callbacks[handle.memoryID].end(),
155  [&handle](ManagedCallback& mCb) { return mCb.id == handle.id; });
156 
157  std::iter_swap(it, callbacks[handle.memoryID].end() - 1);
158  callbacks[handle.memoryID].pop_back();
159 
160  memoryRefCount[handle.memoryID.memoryName]--;
161 
162  if (callbacks[handle.memoryID].empty())
163  {
164  callbacks.erase(handle.memoryID);
165 
166  // unsubscribe from memory topic if no remainig callback needs it
167  if (component and memoryRefCount[handle.memoryID.memoryName] == 0)
168  {
169  component->unsubscribeFromTopic(MakeMemoryTopicName(handle.memoryID));
170  }
171  }
172  }
173 
174 
175 } // namespace armarx::armem::client::util
armarx::armem::client::util::MemoryListener::updated
void updated(const std::vector< MemoryID > &updatedIDs) const
Function handling updates from the MemoryListener ice topic.
Definition: MemoryListener.cpp:41
LocalException.h
armarx::armem::client::util::MemoryListener::ManagedCallback
Definition: MemoryListener.h:95
armarx::ProxyType::component
@ component
armarx::armem::client::util::MemoryListener::callbacks
std::unordered_map< MemoryID, std::vector< ManagedCallback > > callbacks
Definition: MemoryListener.h:101
ARMARX_CHECK_NOT_EMPTY
#define ARMARX_CHECK_NOT_EMPTY(c)
Definition: ExpressionException.h:224
armarx::armem::contains
bool contains(const MemoryID &general, const MemoryID &specific)
Indicates whether general is "less specific" than, or equal to, specific, i.e.
Definition: MemoryID.cpp:558
armarx::armem::client::util::MemoryListener::MakeMemoryTopicName
static std::string MakeMemoryTopicName(const MemoryID &memoryID)
Definition: MemoryListener.cpp:17
armarx::armem::client::util::MemoryListener::Callback
std::function< void(const MemoryID &subscriptionID, const std::vector< MemoryID > &updatedSnapshotIDs)> Callback
Definition: MemoryListener.h:30
armarx::armem::MemoryID
A memory ID.
Definition: MemoryID.h:47
ManagedIceObject.h
armarx::armem::client::util::SubscriptionHandle
Definition: SubscriptionHandle.h:10
ARMARX_DEBUG
#define ARMARX_DEBUG
Definition: Logging.h:177
error.h
ice_conversions.h
armarx::armem::client::util::MemoryListener::subscribe
SubscriptionHandle subscribe(const MemoryID &subscriptionID, Callback Callback)
Definition: MemoryListener.cpp:116
armarx::ManagedIceObject::unsubscribeFromTopic
bool unsubscribeFromTopic(const std::string &name)
Unsubscribe from a topic.
Definition: ManagedIceObject.cpp:271
armarx::fromIce
void fromIce(const std::map< IceKeyT, IceValueT > &iceMap, boost::container::flat_map< CppKeyT, CppValueT > &cppMap)
Definition: ice_conversions_boost_templates.h:26
armarx::armem::MemoryID::memoryName
std::string memoryName
Definition: MemoryID.h:50
armarx::ManagedIceObject::usingTopic
void usingTopic(const std::string &name, bool orderedPublishing=false)
Registers a proxy for subscription after initialization.
Definition: ManagedIceObject.cpp:248
armarx::armem::error::InvalidMemoryID
Indicates that a memory ID is invalid, e.g.
Definition: ArMemError.h:151
armarx::ManagedIceObject
The ManagedIceObject is the base class for all ArmarX objects.
Definition: ManagedIceObject.h:163
armarx::armem::client::util::MemoryListener::MemoryListener
MemoryListener(ManagedIceObject *component=nullptr)
Definition: MemoryListener.cpp:22
armarx::armem::client::util::MemoryListener::setComponent
void setComponent(ManagedIceObject *component)
Definition: MemoryListener.cpp:27
armarx::armem::client::util::MemoryListener::nextId
long nextId
Definition: MemoryListener.h:93
armarx::armem::index::memoryID
const MemoryID memoryID
Definition: memory_ids.cpp:29
armarx::armem::client::util
Definition: MemoryListener.cpp:13
ice_conversions_templates.h
armarx::armem::client::util::MemoryListener::unsubscribe
void unsubscribe(SubscriptionHandle &subscriptionHandle)
Definition: MemoryListener.cpp:144
Logging.h
ARMARX_WARNING
#define ARMARX_WARNING
Definition: Logging.h:186
MemoryListener.h
armarx::armem::client::util::MemoryListener::CallbackUpdatedOnly
std::function< void(const std::vector< MemoryID > &updatedSnapshotIDs)> CallbackUpdatedOnly
Definition: MemoryListener.h:32
armarx::armem::client::util::MemoryListener::memoryRefCount
std::unordered_map< std::string, int > memoryRefCount
memoryName -> callbacks needing memory topic
Definition: MemoryListener.h:104