MemoryListener.cpp
Go to the documentation of this file.
1#include "MemoryListener.h"
2
3#include <sstream>
4
9
12
14{
15
16 std::string
18 {
19 return "MemoryUpdates." + memoryID.memoryName;
20 }
21
22 MemoryListener::MemoryListener(ManagedIceObject* component) : component(component)
23 {
24 }
25
26 void
28 {
29 this->component = component;
30 }
31
32 void
33 MemoryListener::updated(const std::vector<data::MemoryID>& updatedSnapshotIDs) const
34 {
35 std::vector<MemoryID> bos;
36 armarx::fromIce(updatedSnapshotIDs, bos);
37 updated(bos);
38 }
39
40 void
41 MemoryListener::updated(const std::vector<MemoryID>& updatedSnapshotIDs) const
42 {
43 std::stringstream error;
44
45 std::scoped_lock lock{subscribeMutex};
46
47 for (const auto& [subscription, subCallbacks] : this->callbacks)
48 {
49 std::vector<MemoryID> matchingSnapshotIDs;
50
51 for (const MemoryID& updatedSnapshotID : updatedSnapshotIDs)
52 {
53 try
54 {
55 if (contains(subscription, updatedSnapshotID))
56 {
57 // ARMARX_IMPORTANT << VAROUT(subscription) << " matches " << VAROUT(updatedSnapshotID);
58 matchingSnapshotIDs.push_back(updatedSnapshotID);
59 }
60 else
61 {
62 // ARMARX_IMPORTANT << VAROUT(subscription) << " does not match " << VAROUT(updatedSnapshotID);
63 }
64 }
65 catch (const armem::error::InvalidMemoryID& e)
66 {
67 // Log to debug, but ignore otherwise
68 error << "Error when comparing subscribed ID " << subscription
69 << " with updated ID " << updatedSnapshotID << ":\n"
70 << e.what() << "\n\n";
71 }
72 }
73
74 if (not matchingSnapshotIDs.empty())
75 {
76 ARMARX_DEBUG << "Calling " << subCallbacks.size() << " callbacks"
77 << " subscribing " << subscription << " with "
78 << matchingSnapshotIDs.size() << " snapshot IDs ...";
79 for (auto& managedCallback : subCallbacks)
80 {
81 try
82 {
83 managedCallback.callback(subscription, matchingSnapshotIDs);
84 }
85 catch (const armarx::LocalException& e)
86 {
87 error << "Calling callback subscribing " << subscription << " failed."
88 << "\nCaught armarx::LocalException:"
89 << "\n"
90 << e.getReason() << "\n Stacktrace: \n"
91 << e.generateBacktrace() << "\n";
92 }
93 catch (const std::exception& e)
94 {
95 error << "Calling callback subscribing " << subscription << " failed."
96 << "\nCaught armarx::Exception:"
97 << "\n"
98 << e.what() << "\n";
99 }
100 catch (...)
101 {
102 error << "Calling callback subscribing " << subscription << " failed."
103 << "\nCaught unknown exception."
104 << "\n";
105 }
106 }
107 }
108 }
109 if (error.str().size() > 0)
110 {
111 ARMARX_WARNING << "The following issues were encountered during MemoryListener::"
112 << __FUNCTION__ << "(): \n\n"
113 << error.str();
114 }
115 }
116
118 MemoryListener::subscribe(const MemoryID& memoryID, Callback callback)
119 {
121 << "The memoryName must be specified to subscribe";
122
123 if (component and memoryRefCount[memoryID.memoryName] == 0)
124 {
125 component->usingTopic(MakeMemoryTopicName(memoryID));
126 }
127
128 std::scoped_lock lock{subscribeMutex};
129
130 auto id = nextId++;
131 callbacks[memoryID].push_back({id, callback});
132
133 memoryRefCount[memoryID.memoryName]++;
134
135 return SubscriptionHandle(this, memoryID, id);
136 }
137
140 {
141 return subscribe(
142 subscriptionID,
143 [callback](const MemoryID&, const std::vector<MemoryID>& updatedSnapshotIDs)
144 { callback(updatedSnapshotIDs); });
145 }
146
147 void
149 {
150 if (not handle.valid)
151 {
152 return;
153 }
154 handle.valid = false;
155
156 std::scoped_lock lock{subscribeMutex};
157
158 // Remove ManagedCallback with ManagedCallback.id == handle.id from callbacks[handle.memoryID]
159 auto it = std::find_if(callbacks[handle.memoryID].begin(),
160 callbacks[handle.memoryID].end(),
161 [&handle](ManagedCallback& mCb) { return mCb.id == handle.id; });
162
163 std::iter_swap(it, callbacks[handle.memoryID].end() - 1);
164 callbacks[handle.memoryID].pop_back();
165
166 memoryRefCount[handle.memoryID.memoryName]--;
167
168 if (callbacks[handle.memoryID].empty())
169 {
170 callbacks.erase(handle.memoryID);
171
172 // unsubscribe from memory topic if no remainig callback needs it
173 if (component and memoryRefCount[handle.memoryID.memoryName] == 0)
174 {
175 component->unsubscribeFromTopic(MakeMemoryTopicName(handle.memoryID));
176 }
177 }
178 }
179
180
181} // namespace armarx::armem::client::util
#define ARMARX_CHECK_NOT_EMPTY(c)
The ManagedIceObject is the base class for all ArmarX objects.
std::string memoryName
Definition MemoryID.h:50
void unsubscribe(SubscriptionHandle &subscriptionHandle)
void updated(const std::vector< MemoryID > &updatedIDs) const
Function handling updates from the MemoryListener ice topic.
std::unordered_map< MemoryID, std::vector< ManagedCallback > > callbacks
SubscriptionHandle subscribe(const MemoryID &subscriptionID, Callback Callback)
MemoryListener(ManagedIceObject *component=nullptr)
std::unordered_map< std::string, int > memoryRefCount
memoryName -> callbacks needing memory topic
std::function< void(const MemoryID &subscriptionID, const std::vector< MemoryID > &updatedSnapshotIDs)> Callback
std::function< void(const std::vector< MemoryID > &updatedSnapshotIDs)> CallbackUpdatedOnly
static std::string MakeMemoryTopicName(const MemoryID &memoryID)
void setComponent(ManagedIceObject *component)
Indicates that a memory ID is invalid, e.g.
Definition ArMemError.h:152
#define ARMARX_DEBUG
The logging level for output that is only interesting while debugging.
Definition Logging.h:184
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
Definition Logging.h:193
bool contains(const MemoryID &general, const MemoryID &specific)
Indicates whether general is "less specific" than, or equal to, specific, i.e.
Definition MemoryID.cpp:563
void fromIce(const std::map< IceKeyT, IceValueT > &iceMap, boost::container::flat_map< CppKeyT, CppValueT > &cppMap)