MemoryPolling.cpp
Go to the documentation of this file.
1#include <algorithm>
2#include <mutex>
3#include <string>
4#include <type_traits>
5#include <vector>
6
13
23
25#include <armarx/navigation/core/aron/Events.aron.generated.h>
28
29#include <range/v3/algorithm/for_each.hpp>
30#include <range/v3/algorithm/sort.hpp>
31
33{
34
// Constructor. Stores the caller id and the memory name system, and initializes
// lastMemoryEventReceived with the current time, so that (as the log message
// below states) only events newer than construction are handled. It then
// obtains a reader for the "Navigation" memory and starts `task`.
//
// NOTE(review): the embedded listing numbers jump 35->37, 40->42, 52->54 and
// 54->56. The second parameter (`armem::client::MemoryNameSystem& mns`) and
// presumably the statement that creates `task` are therefore not visible
// here -- confirm against the real file.
35 MemoryPolling::MemoryPolling(const std::string& callerId,
37 callerId(callerId), memoryNameSystem(mns), lastMemoryEventReceived(armem::Time::Now())
38 {
39 const std::string memoryName = "Navigation";
40
// Resolve a reader for the whole "Navigation" memory via the memory name system.
42 memoryReader = memoryNameSystem.getReader(armem::MemoryID().withMemoryName(memoryName));
43
44 ARMARX_INFO << "MemoryPolling: will handle all events newer than "
45 << lastMemoryEventReceived << ".";
46
// The subscription-based alternative below is disabled; it would register
// onEntityUpdate() for the caller's provider segment in the "Events" core segment.
47 // subscription api
48 // armem::MemoryID subscriptionID;
49 // subscriptionID.memoryName = memoryName;
50 // subscriptionID.coreSegmentName = "Events";
51 // subscriptionID.providerSegmentName = callerId;
52
54 // memoryNameSystem.subscribe(subscriptionID, this, &MemoryPolling::onEntityUpdate);
56 task->start();
57 }
58
// Looks up the given snapshot IDs through memoryReader and forwards the
// resulting memory slice to handleEvents().
//
// @param snapshotIDs IDs of the snapshots to fetch from the memory.
//
// If the lookup is not successful, a warning is logged (deactivateSpam is
// called with a deactivation duration of 0.1 s) and no events are handled.
// NOTE(review): the listing skips original line 71 right before the
// handleEvents() call -- confirm nothing relevant is missing there.
59 void
60 MemoryPolling::onEntityUpdate(const std::vector<armem::MemoryID>& snapshotIDs)
61 {
62 ARMARX_INFO << "Received " << snapshotIDs.size() << " events from memory";
63 const armem::client::QueryResult qResult = memoryReader.queryMemoryIDs(snapshotIDs);
64
// Bail out early on a failed query; qResult.memory is only used on success.
65 if (not qResult.success) /* c++20 [[unlikely]] */
66 {
67 ARMARX_WARNING << deactivateSpam(0.1F) << "Memory lookup failed.";
68 return;
69 }
70
72 handleEvents(qResult.memory);
73 }
74
75 template <typename AronEventT, typename EventT>
76 auto
77 fromAron(const armem::wm::EntityInstance& entity, EventT& bo)
78 {
79 static_assert(std::is_base_of<armarx::aron::codegenerator::cpp::AronGeneratedClass,
80 AronEventT>::value);
81
82 // see events::Writer::storeImpl
83 const auto dataDict =
84 aron::data::Dict::DynamicCastAndCheck(entity.data()->getElement("data"));
85 ARMARX_CHECK_NOT_NULL(dataDict);
86 const auto dto = AronEventT::FromAron(dataDict);
87
88 core::fromAron(dto, bo);
89 return bo;
90 }
91
// Dispatches one memory entity instance to the event callback that matches
// its entity name (signature per the cross-reference list:
// `void handleEvent(const armem::wm::EntityInstance& memoryEntity)`).
//
// Handling is serialized through eventHandlingMtx. The instance's ID must
// carry an entity name (enforced with ARMARX_CHECK). Entity names that match
// none of the core::event_names constants only produce a warning.
//
// NOTE(review): this listing has many gaps in its embedded numbering. The
// signature line, the first `if`, the `else if` before the block at 156 and
// the lines that build `evt` in each branch (presumably via fromAron) are not
// visible, and several branches also lack their callback call. Confirm
// against the real file before changing anything here.
92 void
94 {
96
97 // only one event will be handled at a time
98 std::lock_guard g{eventHandlingMtx};
99
100 ARMARX_CHECK(memoryEntity.id().hasEntityName());
101 const auto& eventName = memoryEntity.id().entityName;
102
103 ARMARX_IMPORTANT << "Handling event " << QUOTED(eventName);
104
// Name-based dispatch: one branch per known event name.
105 // factory
107 {
110
112 }
113 else if (eventName == core::event_names::LocalPlanningFailed)
114 {
117
119 }
120 else if (eventName == core::event_names::GlobalTrajectoryUpdated)
121 {
124
126 }
127 else if (eventName == core::event_names::LocalTrajectoryUpdated)
128 {
131
133 }
134 else if (eventName == core::event_names::MovementStarted)
135 {
138
139 movementStarted(evt);
140 }
141 else if (eventName == core::event_names::GoalReached)
142 {
145
146 goalReached(evt);
147 }
148 else if (eventName == core::event_names::WaypointReached)
149 {
152
153 waypointReached(evt);
154 }
156 {
159
161 }
162 else if (eventName == core::event_names::SafetyStopTriggered)
163 {
166
168 }
169 else if (eventName == core::event_names::UserAbortTriggered)
170 {
173
175 }
176 else if (eventName == core::event_names::InternalError)
177 {
180
181 internalError(evt);
182 }
183 else
184 {
// Unknown names are reported but otherwise ignored.
185 ARMARX_WARNING << "Unknown event " << QUOTED(eventName);
186 }
187 }
188
// Stops `task` and logs before and after doing so.
// NOTE(review): the signature line (original line 189) is missing from this
// listing; presumably this is the destructor -- confirm.
190 {
191 ARMARX_INFO << "Stopping event polling";
192 task->stop();
193 ARMARX_INFO << "done.";
194 }
195
// Queries the memory for event snapshots in the time range
// [lastMemoryEventReceived, now] and passes the result to handleEvents().
//
// The query targets the "Events" core segment. If callerId is empty, all
// provider segments are selected, otherwise only the one named callerId.
// All entities within the selection are considered. A failed query logs a
// warning (deactivateSpam with a deactivation duration of 0.1 s) and returns.
//
// NOTE(review): the signature line (original 197) and the declarations of
// `qb` and `qq` (original 203 and 206-207) are missing from this listing;
// `qq` is presumably bound to the result of the lambda that ends at 214.
196 void
198 {
200
// Upper bound of the queried time range, taken once up front.
201 const armem::Time now = armem::Time::Now();
202
204 auto& q = qb.coreSegments().withName("Events");
205
208 {
209 if (callerId.empty())
210 {
211 return q.providerSegments().all();
212 }
213 return q.providerSegments().withName(callerId);
214 }();
215
216
217 qq.entities().all().snapshots().timeRange(lastMemoryEventReceived, now);
218
219 // ARMARX_DEBUG << "Polling memory events in interval "
220 // << "[" << lastMemoryPoll << ", " << now << "]";
221
222 const armem::client::QueryResult qResult = memoryReader.query(qb.buildQueryInput());
223
224 // ARMARX_DEBUG << "Lookup result in reader: " << qResult;
225
226 if (not qResult.success) /* c++20 [[unlikely]] */
227 {
228 ARMARX_WARNING << deactivateSpam(0.1F) << "Memory lookup failed.";
229 return;
230 }
231
232 handleEvents(qResult.memory);
233 }
234
// Handles all event instances contained in a memory slice, oldest first
// (signature per the cross-reference list:
// `void handleEvents(const armem::wm::Memory& memory)`).
//
// Steps:
//  1. Copy every entity instance of the "Events" core segment into a vector.
//  2. Sort the vector by ascending ID timestamp; every ID must carry a
//     timestamp (enforced with ARMARX_CHECK inside the comparator).
//  3. If there is at least one event, advance lastMemoryEventReceived to the
//     newest timestamp plus 1 microsecond.
//  4. Call handleEvent() for each instance in sorted order.
//
// NOTE(review): lastMemoryEventReceived is advanced before the events are
// handled, so an exception thrown by handleEvent() would not cause the
// remaining events of this batch to be fetched again -- confirm this is intended.
// NOTE(review): ranges::sort is not a stable sort; instances with identical
// timestamps may be handled in either relative order.
// NOTE(review): the signature line (original 236) and original line 238 are
// missing from this listing.
235 void
237 {
239
240 // clang-format off
241 const armem::wm::CoreSegment& coreSegment = memory
242 .getCoreSegment("Events");
243 // clang-format on
244
245 std::vector<armem::wm::EntityInstance> events;
246
// Flatten the segment hierarchy: collect copies of all instances.
247 coreSegment.forEachInstance([&](const armem::wm::EntityInstance& instance)
248 { events.push_back(instance); });
249
250 const auto sortByTimestampAsc = [](const armem::wm::EntityInstance& a,
251 const armem::wm::EntityInstance& b) -> bool
252 {
253 ARMARX_CHECK(a.id().hasTimestamp());
254 ARMARX_CHECK(b.id().hasTimestamp());
255
256 return a.id().timestamp < b.id().timestamp;
257 };
258
259 ranges::sort(events, sortByTimestampAsc);
260
261 if (not events.empty())
262 {
// After sorting, back() is the newest event.
263 ARMARX_INFO << "Updating memory timestamp to " << events.back().id().timestamp;
264 // we add 1 µs to ensure that the events are only handled once
265 lastMemoryEventReceived =
266 events.back().id().timestamp + armarx::Duration::MicroSeconds(1);
267 }
268
269 ranges::for_each(events, [&](const auto& event) { handleEvent(event); });
270 }
271
272
273} // namespace armarx::navigation::client
SpamFilterDataPtr deactivateSpam(SpamFilterDataPtr const &spamFilter, float deactivationDurationSec, const std::string &identifier, bool deactivate)
Definition Logging.cpp:75
#define QUOTED(x)
static Duration MicroSeconds(std::int64_t microSeconds)
Constructs a duration in microseconds.
Definition Duration.cpp:24
The periodic task executes one thread method repeatedly using the time period specified in the constr...
bool hasEntityName() const
Definition MemoryID.h:121
std::string entityName
Definition MemoryID.h:53
The memory name system (MNS) client.
The query::Builder class provides a fluent-style specification of hierarchical queries.
Definition Builder.h:22
CoreSegmentSelector & coreSegments()
Start specifying core segments.
Definition Builder.cpp:42
CoreSegmentSelector & withName(const std::string &name) override
SnapshotSelector & snapshots()
Start specifying entity snapshots.
Definition selectors.cpp:92
EntitySelector & entities()
Start specifying entities.
SnapshotSelector & timeRange(Time min, Time max)
Definition selectors.cpp:46
Client-side working memory core segment.
Client-side working entity instance.
Client-side working memory.
static DateTime Now()
Definition DateTime.cpp:51
void handleEvents(const armem::wm::Memory &memory)
void handleEvent(const armem::wm::EntityInstance &memoryEntity)
MemoryPolling(const std::string &callerId, armem::client::MemoryNameSystem &mns)
void onEntityUpdate(const std::vector< armem::MemoryID > &snapshotIDs)
void globalTrajectoryUpdated(const core::GlobalTrajectoryUpdatedEvent &event) override
void safetyThrottlingTriggered(const core::SafetyThrottlingTriggeredEvent &event) override
Will be called whenever safety throttling is triggered to a certain degree (configurable).
void movementStarted(const core::MovementStartedEvent &event) override
void localTrajectoryUpdated(const core::LocalTrajectoryUpdatedEvent &event) override
void safetyStopTriggered(const core::SafetyStopTriggeredEvent &event) override
Will be called whenever a safety stop is triggered.
void localPlanningFailed(const core::LocalPlanningFailedEvent &event) override
void waypointReached(const core::WaypointReachedEvent &event) override
Will be called whenever the navigator reached a user-defined waypoint.
void globalPlanningFailed(const core::GlobalPlanningFailedEvent &event) override
void userAbortTriggered(const core::UserAbortTriggeredEvent &event) override
Will be called whenever the user aborts the current navigation.
void internalError(const core::InternalErrorEvent &event) override
Will be called whenever an internal error occurs.
void goalReached(const core::GoalReachedEvent &event) override
Will be called whenever the navigator reached the goal.
#define ARMARX_CHECK(expression)
Shortcut for ARMARX_CHECK_EXPRESSION.
#define ARMARX_CHECK_NOT_NULL(ptr)
This macro evaluates whether ptr is not null and if it turns out to be false it will throw an Express...
#define ARMARX_INFO
The normal logging level.
Definition Logging.h:181
#define ARMARX_IMPORTANT
The logging level for always important information, but expected behaviour (in contrast to ARMARX_WAR...
Definition Logging.h:190
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
Definition Logging.h:193
#define q
armarx::core::time::DateTime Time
aron::cpp::AronGeneratedClass AronGeneratedClass
This file is part of ArmarX.
auto fromAron(const armem::wm::EntityInstance &entity, EventT &bo)
const std::string SafetyStopTriggered
Definition events.h:24
const std::string SafetyThrottlingTriggered
Definition events.h:23
const std::string UserAbortTriggered
Definition events.h:25
const std::string LocalTrajectoryUpdated
Definition events.h:17
const std::string MovementStarted
Definition events.h:20
const std::string WaypointReached
Definition events.h:22
const std::string GlobalTrajectoryUpdated
Definition events.h:16
const std::string GlobalPlanningFailed
Definition events.h:18
const std::string InternalError
Definition events.h:26
const std::string LocalPlanningFailed
Definition events.h:19
void fromAron(const arondto::GlobalTrajectoryPoint &dto, GlobalTrajectoryPoint &bo)
Result of a QueryInput.
Definition Query.h:51
wm::Memory memory
The slice of the memory that matched the query.
Definition Query.h:58
Event describing that the global trajectory was updated.
Definition events.h:114
Event describing that the targeted goal was successfully reached.
Definition events.h:53
Event describing the occurrence of an internal unhandled error.
Definition events.h:105
Event describing that the local trajectory was updated.
Definition events.h:122
Event describing that for security reasons, the robot was stopped completely.
Definition events.h:88
Event describing that a significant safety throttling factor was reached.
Definition events.h:72
Event describing that the user aborted the current execution.
Definition events.h:97
Event describing that a user-defined waypoint was successfully reached.
Definition events.h:61
#define ARMARX_TRACE
Definition trace.h:77