Go to the documentation of this file.
16 reader{reader}, queriedId{queriedId}, metronome{maxPollFrequency}
21 std::optional<wm::EntitySnapshot>
24 if (isPolling.exchange(
true))
28 pollingStoppedExternally =
false;
30 auto result = _pollBlocking(callback);
39 if (isPolling.exchange(
true))
43 pollingStoppedExternally =
false;
45 this->pollingThread = std::thread([&]() { this->_pollBlocking(callback); });
48 std::optional<wm::EntitySnapshot>
49 ReadStream::_pollBlocking(
const SnapshotCallbackT& callback)
51 while (not pollingStoppedExternally)
53 auto snapshot = _pollOnce(callback);
54 if (snapshot.has_value())
65 pollingStoppedExternally =
true;
66 if (pollingThread.joinable())
73 std::optional<wm::EntitySnapshot>
76 if (isPolling.exchange(
true))
81 auto snapshot = _pollOnce(callback);
87 std::optional<wm::EntitySnapshot>
88 ReadStream::_pollOnce(
const SnapshotCallbackT& callback)
94 auto makeQuery = [
this, &timeEnd](
const MemoryID& id)
109 auto query = makeQuery(queriedId);
111 auto result = reader.
query(query);
115 using EntitySnapshotReference =
116 std::reference_wrapper<armarx::armem::wm::EntitySnapshot>;
118 std::vector<EntitySnapshotReference> snapshots;
121 { snapshots.push_back(snapshot); });
124 std::sort(snapshots.begin(),
126 [](
const EntitySnapshotReference&
a,
const EntitySnapshotReference& b)
127 { return a.get().id().timestamp < b.get().id().timestamp; });
131 if (snapshots.size() > 0)
134 nextStart = snapshots.back().get().id().timestamp +
139 nextStart = timeStart;
143 for (
const auto& snapshot : snapshots)
149 const bool continue_ = callback(snapshot.get());
156 timeStart = nextStart;
162 <<
"Received an error in ReadStream when querying data from a "
163 "memory. The error was '"
164 << result.errorMessage
165 <<
"'. Continue with stream, perhaps the memory was not yet initialized.";
EntitySelector & all() override
Reads data from a memory server.
void pollAsync(const SnapshotCallbackT &callback)
Poll in a new thread as long as callback returns true.
EntitySelector & entities()
Start specifying entities.
wm::Memory memory
The slice of the memory that matched the query.
SnapshotSelector & timeRange(Time min, Time max)
std::function< bool(const wm::EntitySnapshot &)> SnapshotCallbackT
Callback called on each entity snapshot in the queried ID.
QueryInput buildQueryInput() const
SnapshotSelector & snapshots()
Start specifying entity snapshots.
std::optional< wm::EntitySnapshot > pollBlocking(const SnapshotCallbackT &callback)
Poll in this thread as long as callback returns true.
ReadStream()
Inizialize a ReadStream which does not represent a stream.
This file is part of ArmarX.
CoreSegmentSelector & all() override
EntitySelector & withID(const MemoryID &id) override
ProviderSegmentSelector & withID(const MemoryID &id) override
SpamFilterDataPtr deactivateSpam(SpamFilterDataPtr const &spamFilter, float deactivationDurationSec, const std::string &identifier, bool deactivate)
double a(double t, double a0, double j)
CoreSegmentSelector & coreSegments()
Start specifying core segments.
CoreSegmentSelector & withID(const MemoryID &id) override
bool forEachSnapshot(SnapshotFunctionT &&func)
Client-side working memory entity snapshot.
#define ARMARX_CHECK_GREATER_EQUAL(lhs, rhs)
This macro evaluates whether lhs is greater or equal (>=) rhs and if it turns out to be false it will...
#define ARMARX_CHECK_LESS_EQUAL(lhs, rhs)
This macro evaluates whether lhs is less or equal (<=) rhs and if it turns out to be false it will th...
Duration waitForNextTick()
Wait and block until the target period is met.
Represents a point in time.
static DateTime Now()
Current time on the virtual clock.
The query::Builder class provides a fluent-style specification of hierarchical queries.
ProviderSegmentSelector & providerSegments()
Start specifying provider segments.
static Frequency Hertz(std::int64_t hertz)
std::optional< wm::EntitySnapshot > pollOnce(const SnapshotCallbackT &callback)
Perform one query and call the callbacks on each snapshot.
static Duration MicroSeconds(std::int64_t microSeconds)
Constructs a duration in microseconds.
Indicates that a ReadStream is already polling when a polling method was called.
void stop()
Stop a running polling loop.
ProviderSegmentSelector & all() override
QueryResult query(const QueryInput &input) const
Perform a query.