16 reader{reader}, queriedId{queriedId}, metronome{maxPollFrequency}
21 std::optional<wm::EntitySnapshot>
24 if (isPolling.exchange(
true))
28 pollingStoppedExternally =
false;
30 auto result = _pollBlocking(callback);
39 if (isPolling.exchange(
true))
43 pollingStoppedExternally =
false;
45 this->pollingThread = std::thread([&]() { this->_pollBlocking(callback); });
48 std::optional<wm::EntitySnapshot>
49 ReadStream::_pollBlocking(
const SnapshotCallbackT& callback)
51 while (not pollingStoppedExternally)
53 auto snapshot = _pollOnce(callback);
54 if (snapshot.has_value())
65 pollingStoppedExternally =
true;
66 if (pollingThread.joinable())
73 std::optional<wm::EntitySnapshot>
76 if (isPolling.exchange(
true))
81 auto snapshot = _pollOnce(callback);
87 std::optional<wm::EntitySnapshot>
88 ReadStream::_pollOnce(
const SnapshotCallbackT& callback)
94 auto makeQuery = [
this, &timeEnd](
const MemoryID& id)
100 ?
core.providerSegments().withID(
id)
101 :
core.providerSegments().all();
109 auto query = makeQuery(queriedId);
115 using EntitySnapshotReference =
116 std::reference_wrapper<armarx::armem::wm::EntitySnapshot>;
118 std::vector<EntitySnapshotReference> snapshots;
121 { snapshots.push_back(snapshot); });
124 std::sort(snapshots.begin(),
126 [](
const EntitySnapshotReference& a,
const EntitySnapshotReference& b)
127 { return a.get().id().timestamp < b.get().id().timestamp; });
131 if (snapshots.size() > 0)
134 nextStart = snapshots.back().get().id().timestamp +
139 nextStart = timeStart;
143 for (
const auto& snapshot : snapshots)
149 const bool continue_ = callback(snapshot.get());
156 timeStart = nextStart;
162 <<
"Received an error in ReadStream when querying data from a "
163 "memory. The error was '"
164 << result.errorMessage
165 <<
"'. Continue with stream, perhaps the memory was not yet initialized.";
SpamFilterDataPtr deactivateSpam(SpamFilterDataPtr const &spamFilter, float deactivationDurationSec, const std::string &identifier, bool deactivate)
ReadStream()
Inizialize a ReadStream which does not represent a stream.
std::optional< wm::EntitySnapshot > pollOnce(const SnapshotCallbackT &callback)
Perform one query and call the callbacks on each snapshot.
std::optional< wm::EntitySnapshot > pollBlocking(const SnapshotCallbackT &callback)
Poll in this thread as long as callback returns true.
void stop()
Stop a running polling loop.
std::function< bool(const wm::EntitySnapshot &)> SnapshotCallbackT
Callback called on each entity snapshot in the queried ID.
void pollAsync(const SnapshotCallbackT &callback)
Poll in a new thread as long as callback returns true.
Reads data from a memory server.
QueryResult query(const QueryInput &input) const
Perform a query on the WM.
The query::Builder class provides a fluent-style specification of hierarchical queries.
QueryInput buildQueryInput() const
CoreSegmentSelector & coreSegments()
Start specifying core segments.
CoreSegmentSelector & withID(const MemoryID &id) override
CoreSegmentSelector & all() override
EntitySelector & withID(const MemoryID &id) override
EntitySelector & all() override
SnapshotSelector & snapshots()
Start specifying entity snapshots.
EntitySelector & entities()
Start specifying entities.
SnapshotSelector & timeRange(Time min, Time max)
Indicates that a ReadStream is already polling when a polling method was called.
Client-side working memory entity snapshot.
static DateTime Now()
Current time on the virtual clock.
Represents a point in time.
static Duration MicroSeconds(std::int64_t microSeconds)
Constructs a duration in microseconds.
Duration waitForNextTick() const
Wait and block until the target period is met.
#define ARMARX_CHECK_LESS_EQUAL(lhs, rhs)
This macro evaluates whether lhs is less or equal (<=) rhs and if it turns out to be false it will th...
#define ARMARX_CHECK_GREATER_EQUAL(lhs, rhs)
This macro evaluates whether lhs is greater or equal (>=) rhs and if it turns out to be false it will...
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
This file is part of ArmarX.
This file offers overloads of toIce() and fromIce() functions for STL container types.