ReadStream.cpp
Go to the documentation of this file.
1 #include "ReadStream.h"
2 
4 
6 
7 namespace armarx::armem::client
8 {
10  {
11  }
12 
14  const MemoryID& queriedId,
15  const core::time::Frequency& maxPollFrequency) :
16  reader{reader}, queriedId{queriedId}, metronome{maxPollFrequency}
17  {
18  timeStart = armarx::core::time::Clock::Now();
19  }
20 
21  std::optional<wm::EntitySnapshot>
23  {
24  if (isPolling.exchange(true))
25  {
26  throw armarx::armem::error::ReadStreamAlreadyPolling(queriedId, __PRETTY_FUNCTION__);
27  }
28  pollingStoppedExternally = false;
29 
30  auto result = _pollBlocking(callback);
31 
32  isPolling = false;
33  return result;
34  }
35 
36  void
38  {
39  if (isPolling.exchange(true))
40  {
41  throw armarx::armem::error::ReadStreamAlreadyPolling(queriedId, __PRETTY_FUNCTION__);
42  }
43  pollingStoppedExternally = false;
44 
45  this->pollingThread = std::thread([&]() { this->_pollBlocking(callback); });
46  }
47 
48  std::optional<wm::EntitySnapshot>
49  ReadStream::_pollBlocking(const SnapshotCallbackT& callback)
50  {
51  while (not pollingStoppedExternally)
52  {
53  auto snapshot = _pollOnce(callback);
54  if (snapshot.has_value())
55  {
56  return snapshot;
57  }
58  }
59  return std::nullopt;
60  }
61 
62  void
64  {
65  pollingStoppedExternally = true;
66  if (pollingThread.joinable())
67  {
68  pollingThread.join();
69  isPolling = false;
70  }
71  }
72 
73  std::optional<wm::EntitySnapshot>
75  {
76  if (isPolling.exchange(true))
77  {
78  throw armarx::armem::error::ReadStreamAlreadyPolling(queriedId, __PRETTY_FUNCTION__);
79  }
80 
81  auto snapshot = _pollOnce(callback);
82 
83  isPolling = false;
84  return snapshot;
85  }
86 
87  std::optional<wm::EntitySnapshot>
88  ReadStream::_pollOnce(const SnapshotCallbackT& callback)
89  {
90  // Make sure to not busy wait. Also wait until probably data is available in first iteration.
91  metronome.waitForNextTick();
92 
93  auto timeEnd = armarx::core::time::Clock::Now();
94  auto makeQuery = [this, &timeEnd](const MemoryID& id)
95  {
96  query::Builder qb;
98  id.hasCoreSegmentName() ? qb.coreSegments().withID(id) : qb.coreSegments().all();
99  query::ProviderSegmentSelector& prov = id.hasProviderSegmentName()
100  ? core.providerSegments().withID(id)
101  : core.providerSegments().all();
102  query::EntitySelector& entity =
103  id.hasEntityName() ? prov.entities().withID(id) : prov.entities().all();
104  entity.snapshots().timeRange(timeStart, timeEnd);
105 
106  return qb.buildQueryInput();
107  };
108 
109  auto query = makeQuery(queriedId);
110 
111  auto result = reader.query(query);
112 
113  if (result.success)
114  {
115  using EntitySnapshotReference =
116  std::reference_wrapper<armarx::armem::wm::EntitySnapshot>;
117  // Copy references of snapshots into vector to sort them.
118  std::vector<EntitySnapshotReference> snapshots;
119 
120  result.memory.forEachSnapshot([&snapshots](armarx::armem::wm::EntitySnapshot& snapshot)
121  { snapshots.push_back(snapshot); });
122 
123  // Sort correctly.
124  std::sort(snapshots.begin(),
125  snapshots.end(),
126  [](const EntitySnapshotReference& a, const EntitySnapshotReference& b)
127  { return a.get().id().timestamp < b.get().id().timestamp; });
128 
129  // Determine the next start time.
130  DateTime nextStart;
131  if (snapshots.size() > 0)
132  {
133  // Because they are sorted, back() has the highest time stamp.
134  nextStart = snapshots.back().get().id().timestamp +
136  }
137  else
138  {
139  nextStart = timeStart;
140  }
141 
142  // Call the callback on all snapshots.
143  for (const auto& snapshot : snapshots)
144  {
145  // Assert times in correct interval.
146  ARMARX_CHECK_LESS_EQUAL(timeStart, snapshot.get().id().timestamp);
147  ARMARX_CHECK_GREATER_EQUAL(timeEnd, snapshot.get().id().timestamp);
148 
149  const bool continue_ = callback(snapshot.get());
150  if (not continue_)
151  {
152  return snapshot;
153  }
154  }
155 
156  timeStart = nextStart;
157  }
158  else
159  {
161  << deactivateSpam()
162  << "Received an error in ReadStream when querying data from a "
163  "memory. The error was '"
164  << result.errorMessage
165  << "'. Continue with stream, perhaps the memory was not yet initialized.";
166  }
167 
168  return std::nullopt;
169  }
170 
171 
172 } // namespace armarx::armem::client
armarx::armem::client::query::EntitySelector::all
EntitySelector & all() override
Definition: selectors.cpp:96
armarx::armem::client::query::ProviderSegmentSelector
Definition: selectors.h:80
armarx::armem::MemoryID::timestamp
Time timestamp
Definition: MemoryID.h:54
armarx::armem::client::Reader
Reads data from a memory server.
Definition: Reader.h:24
armarx::armem::client::ReadStream::pollAsync
void pollAsync(const SnapshotCallbackT &callback)
Poll in a new thread as long as callback returns true.
Definition: ReadStream.cpp:37
armarx::armem::client::query::ProviderSegmentSelector::entities
EntitySelector & entities()
Start specifying entities.
Definition: selectors.cpp:123
armarx::armem::client::QueryResult::memory
wm::Memory memory
The slice of the memory that matched the query.
Definition: Query.h:58
armarx::armem::client::query::SnapshotSelector::timeRange
SnapshotSelector & timeRange(Time min, Time max)
Definition: selectors.cpp:42
armarx::armem::client::ReadStream::SnapshotCallbackT
std::function< bool(const wm::EntitySnapshot &)> SnapshotCallbackT
Callback called on each entity snapshot in the queried ID.
Definition: ReadStream.h:39
armarx::armem::client::query::Builder::buildQueryInput
QueryInput buildQueryInput() const
Definition: Builder.cpp:11
armarx::armem::client::query::EntitySelector::snapshots
SnapshotSelector & snapshots()
Start specifying entity snapshots.
Definition: selectors.cpp:86
armarx::armem::client::ReadStream::pollBlocking
std::optional< wm::EntitySnapshot > pollBlocking(const SnapshotCallbackT &callback)
Poll in this thread as long as callback returns true.
Definition: ReadStream.cpp:22
armarx::armem::client::ReadStream::ReadStream
ReadStream()
Initialize a ReadStream which does not represent a stream.
Definition: ReadStream.cpp:9
armarx::armem::client
This file is part of ArmarX.
Definition: forward_declarations.h:7
armarx::core::time::Frequency
Represents a frequency.
Definition: Frequency.h:17
armarx::armem::client::query::CoreSegmentSelector::all
CoreSegmentSelector & all() override
Definition: selectors.cpp:170
ReadStream.h
armarx::armem::client::query::EntitySelector::withID
EntitySelector & withID(const MemoryID &id) override
Definition: selectors.h:63
Clock.h
armarx::armem::client::query::ProviderSegmentSelector::withID
ProviderSegmentSelector & withID(const MemoryID &id) override
Definition: selectors.h:100
deactivateSpam
SpamFilterDataPtr deactivateSpam(SpamFilterDataPtr const &spamFilter, float deactivationDurationSec, const std::string &identifier, bool deactivate)
Definition: Logging.cpp:72
armarx::ctrlutil::a
double a(double t, double a0, double j)
Definition: CtrlUtil.h:45
armarx::armem::MemoryID
A memory ID.
Definition: MemoryID.h:47
armarx::armem::client::query::Builder::coreSegments
CoreSegmentSelector & coreSegments()
Start specifying core segments.
Definition: Builder.cpp:38
armarx::armem::client::query::CoreSegmentSelector::withID
CoreSegmentSelector & withID(const MemoryID &id) override
Definition: selectors.h:137
armarx::armem::base::detail::ForEachEntitySnapshotMixin::forEachSnapshot
bool forEachSnapshot(SnapshotFunctionT &&func)
Definition: iteration_mixins.h:232
armarx::armem::wm::EntitySnapshot
Client-side working memory entity snapshot.
Definition: memory_definitions.h:80
armarx::armem::base::detail::MemoryItem::id
MemoryID & id()
Definition: MemoryItem.h:27
ARMARX_CHECK_GREATER_EQUAL
#define ARMARX_CHECK_GREATER_EQUAL(lhs, rhs)
This macro evaluates whether lhs is greater than or equal to (>=) rhs, and if that turns out to be false it will throw an exception.
Definition: ExpressionException.h:123
ARMARX_CHECK_LESS_EQUAL
#define ARMARX_CHECK_LESS_EQUAL(lhs, rhs)
This macro evaluates whether lhs is less than or equal to (<=) rhs, and if that turns out to be false it will throw an exception.
Definition: ExpressionException.h:109
armarx::core::time::Metronome::waitForNextTick
Duration waitForNextTick()
Wait and block until the target period is met.
Definition: Metronome.cpp:31
armarx::core::time::DateTime
Represents a point in time.
Definition: DateTime.h:24
armarx::armem::client::query::CoreSegmentSelector
Definition: selectors.h:117
Builder.h
armarx::core::time::Clock::Now
static DateTime Now()
Current time on the virtual clock.
Definition: Clock.cpp:97
armarx::armem::client::query::Builder
The query::Builder class provides a fluent-style specification of hierarchical queries.
Definition: Builder.h:22
armarx::armem::client::query::CoreSegmentSelector::providerSegments
ProviderSegmentSelector & providerSegments()
Start specifying provider segments.
Definition: selectors.cpp:160
armarx::core::time::Frequency::Hertz
static Frequency Hertz(std::int64_t hertz)
Definition: Frequency.cpp:23
armarx::armem::client::query::EntitySelector
Definition: selectors.h:42
armarx::armem::client::ReadStream::pollOnce
std::optional< wm::EntitySnapshot > pollOnce(const SnapshotCallbackT &callback)
Perform one query and call the callbacks on each snapshot.
Definition: ReadStream.cpp:74
ARMARX_WARNING
#define ARMARX_WARNING
Definition: Logging.h:186
armarx::core::time::Duration::MicroSeconds
static Duration MicroSeconds(std::int64_t microSeconds)
Constructs a duration in microseconds.
Definition: Duration.cpp:27
armarx::armem::error::ReadStreamAlreadyPolling
Indicates that a ReadStream is already polling when a polling method was called.
Definition: ArMemError.h:226
armarx::armem::client::ReadStream::stop
void stop()
Stop a running polling loop.
Definition: ReadStream.cpp:63
armarx::armem::client::query::ProviderSegmentSelector::all
ProviderSegmentSelector & all() override
Definition: selectors.cpp:133
armarx::armem::client::Reader::query
QueryResult query(const QueryInput &input) const
Perform a query.
Definition: Reader.cpp:33