ReadStream.cpp
Go to the documentation of this file.
1#include "ReadStream.h"
2
4
6
8{
9 ReadStream::ReadStream() : metronome{armarx::core::time::Frequency::Hertz(10)}
10 {
11 }
12
// Constructor taking a reader, the memory ID to query and the maximum poll frequency.
// The reader and the queried ID are stored as given; the frequency initializes the metronome.
// NOTE(review): this listing skips embedded source lines 13 and 18. The opening of the
// signature (presumably `ReadStream::ReadStream(const Reader& reader,` -- confirm against
// the header) and one statement of the constructor body are therefore not shown here.
14 const MemoryID& queriedId,
15 const core::time::Frequency& maxPollFrequency) :
16 reader{reader}, queriedId{queriedId}, metronome{maxPollFrequency}
17 {
19 }
20
21 std::optional<wm::EntitySnapshot>
23 {
24 if (isPolling.exchange(true))
25 {
26 throw armarx::armem::error::ReadStreamAlreadyPolling(queriedId, __PRETTY_FUNCTION__);
27 }
28 pollingStoppedExternally = false;
29
30 auto result = _pollBlocking(callback);
31
32 isPolling = false;
33 return result;
34 }
35
36 void
38 {
39 if (isPolling.exchange(true))
40 {
41 throw armarx::armem::error::ReadStreamAlreadyPolling(queriedId, __PRETTY_FUNCTION__);
42 }
43 pollingStoppedExternally = false;
44
45 this->pollingThread = std::thread([&]() { this->_pollBlocking(callback); });
46 }
47
48 std::optional<wm::EntitySnapshot>
49 ReadStream::_pollBlocking(const SnapshotCallbackT& callback)
50 {
51 while (not pollingStoppedExternally)
52 {
53 auto snapshot = _pollOnce(callback);
54 if (snapshot.has_value())
55 {
56 return snapshot;
57 }
58 }
59 return std::nullopt;
60 }
61
62 void
64 {
65 pollingStoppedExternally = true;
66 if (pollingThread.joinable())
67 {
68 pollingThread.join();
69 isPolling = false;
70 }
71 }
72
73 std::optional<wm::EntitySnapshot>
75 {
76 if (isPolling.exchange(true))
77 {
78 throw armarx::armem::error::ReadStreamAlreadyPolling(queriedId, __PRETTY_FUNCTION__);
79 }
80
81 auto snapshot = _pollOnce(callback);
82
83 isPolling = false;
84 return snapshot;
85 }
86
// Performs a single poll: waits for the next metronome tick, queries the reader for
// snapshots of queriedId in the time range from timeStart to the current time, and calls
// the callback on each returned snapshot in ascending timestamp order.
//
// Returns the snapshot on which the callback returned false; otherwise std::nullopt
// (also when the query was not successful).
//
// Note: timeStart is only advanced when the callback accepted every snapshot. When the
// callback ends the poll early, the function returns before timeStart is updated.
//
// NOTE(review): this listing skips embedded source lines 96-97, 135 and 160, so a few
// statements are incomplete below; see the notes at the gaps.
87 std::optional<wm::EntitySnapshot>
88 ReadStream::_pollOnce(const SnapshotCallbackT& callback)
89 {
90 // Wait for the next tick so that the loop does not busy-wait. In the first iteration this also gives data time to become available.
91 metronome.waitForNextTick();
92
93 auto timeEnd = armarx::core::time::Clock::Now();
// Builds the query input for the given ID: each level (core segment, provider segment,
// entity) is restricted to the ID if the ID names that level, and matches all otherwise.
94 auto makeQuery = [this, &timeEnd](const MemoryID& id)
95 {
// NOTE(review): the declarations of `qb` and `core` (embedded lines 96-97) are missing
// here; presumably a query::Builder and a core segment selector reference -- confirm.
98 id.hasCoreSegmentName() ? qb.coreSegments().withID(id) : qb.coreSegments().all();
99 query::ProviderSegmentSelector& prov = id.hasProviderSegmentName()
100 ? core.providerSegments().withID(id)
101 : core.providerSegments().all();
102 query::EntitySelector& entity =
103 id.hasEntityName() ? prov.entities().withID(id) : prov.entities().all();
104 entity.snapshots().timeRange(timeStart, timeEnd);
105
106 return qb.buildQueryInput();
107 };
108
109 auto query = makeQuery(queriedId);
110
111 auto result = reader.query(query);
112
113 if (result.success)
114 {
115 using EntitySnapshotReference =
116 std::reference_wrapper<armarx::armem::wm::EntitySnapshot>;
117 // Collect references to the snapshots in a vector so that they can be sorted.
118 std::vector<EntitySnapshotReference> snapshots;
119
120 result.memory.forEachSnapshot([&snapshots](armarx::armem::wm::EntitySnapshot& snapshot)
121 { snapshots.push_back(snapshot); });
122
123 // Sort by ascending timestamp.
124 std::sort(snapshots.begin(),
125 snapshots.end(),
126 [](const EntitySnapshotReference& a, const EntitySnapshotReference& b)
127 { return a.get().id().timestamp < b.get().id().timestamp; });
128
129 // Determine the start time of the next poll.
130 DateTime nextStart;
131 if (snapshots.size() > 0)
132 {
133 // Because the snapshots are sorted, back() has the highest timestamp.
134 nextStart = snapshots.back().get().id().timestamp +
// NOTE(review): the right-hand operand (embedded line 135) is missing from this listing;
// presumably a small duration added so that the newest snapshot is not queried again -- confirm.
136 }
137 else
138 {
// No new snapshots: keep the same start time for the next poll.
139 nextStart = timeStart;
140 }
141
142 // Call the callback on all snapshots.
143 for (const auto& snapshot : snapshots)
144 {
145 // Assert that the timestamp lies within the queried interval.
146 ARMARX_CHECK_LESS_EQUAL(timeStart, snapshot.get().id().timestamp);
147 ARMARX_CHECK_GREATER_EQUAL(timeEnd, snapshot.get().id().timestamp);
148
149 const bool continue_ = callback(snapshot.get());
150 if (not continue_)
151 {
// The callback asked to stop: hand back the snapshot that ended the poll.
152 return snapshot;
153 }
154 }
155
156 timeStart = nextStart;
157 }
158 else
159 {
// NOTE(review): the head of this logging statement (embedded line 160) is missing from
// this listing; presumably a logging macro such as ARMARX_WARNING -- confirm.
161 << deactivateSpam()
162 << "Received an error in ReadStream when querying data from a "
163 "memory. The error was '"
164 << result.errorMessage
165 << "'. Continue with stream, perhaps the memory was not yet initialized.";
166 }
167
168 return std::nullopt;
169 }
170
171
172} // namespace armarx::armem::client
SpamFilterDataPtr deactivateSpam(SpamFilterDataPtr const &spamFilter, float deactivationDurationSec, const std::string &identifier, bool deactivate)
Definition Logging.cpp:75
ReadStream()
Initialize a ReadStream that does not represent a stream.
Definition ReadStream.cpp:9
std::optional< wm::EntitySnapshot > pollOnce(const SnapshotCallbackT &callback)
Perform one query and call the callbacks on each snapshot.
std::optional< wm::EntitySnapshot > pollBlocking(const SnapshotCallbackT &callback)
Poll in this thread as long as callback returns true.
void stop()
Stop a running polling loop.
std::function< bool(const wm::EntitySnapshot &)> SnapshotCallbackT
Callback called on each entity snapshot in the queried ID.
Definition ReadStream.h:39
void pollAsync(const SnapshotCallbackT &callback)
Poll in a new thread as long as callback returns true.
Reads data from a memory server.
Definition Reader.h:25
QueryResult query(const QueryInput &input) const
Perform a query on the WM.
Definition Reader.cpp:119
The query::Builder class provides a fluent-style specification of hierarchical queries.
Definition Builder.h:22
CoreSegmentSelector & coreSegments()
Start specifying core segments.
Definition Builder.cpp:42
CoreSegmentSelector & withID(const MemoryID &id) override
Definition selectors.h:141
EntitySelector & withID(const MemoryID &id) override
Definition selectors.h:63
SnapshotSelector & snapshots()
Start specifying entity snapshots.
Definition selectors.cpp:92
EntitySelector & entities()
Start specifying entities.
SnapshotSelector & timeRange(Time min, Time max)
Definition selectors.cpp:46
Indicates that a ReadStream is already polling when a polling method was called.
Definition ArMemError.h:227
Client-side working memory entity snapshot.
static DateTime Now()
Current time on the virtual clock.
Definition Clock.cpp:93
Represents a point in time.
Definition DateTime.h:25
static Duration MicroSeconds(std::int64_t microSeconds)
Constructs a duration in microseconds.
Definition Duration.cpp:24
Represents a frequency.
Definition Frequency.h:17
Duration waitForNextTick() const
Wait and block until the target period is met.
Definition Metronome.cpp:27
#define ARMARX_CHECK_LESS_EQUAL(lhs, rhs)
This macro evaluates whether lhs is less than or equal to (<=) rhs and, if that turns out to be false, it will throw an exception.
#define ARMARX_CHECK_GREATER_EQUAL(lhs, rhs)
This macro evaluates whether lhs is greater than or equal to (>=) rhs and, if that turns out to be false, it will throw an exception.
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
Definition Logging.h:193
This file is part of ArmarX.
This file offers overloads of toIce() and fromIce() functions for STL container types.