ReadStream.h
Go to the documentation of this file.
1 #pragma once
2 
3 #include <functional>
4 #include <optional>
5 #include <thread>
6 
10 
13 
14 #include "Reader.h"
15 
16 namespace armarx::armem::client
17 {
18  /**
19  * @brief A stream reading entity snapshots from the memory.
20  *
21  * After constructing a ReadStream, polling can be started in three ways:
22  *
23  * 1. Run a polling loop in this thread, blocking execution until terminated.
24  * See pollBlocking().
25  * 2. Run a polling loop in a new, separate thread, until it is stopped via stop().
26  * See pollAsync() and stop().
27  * 3. Perform a single query and process the result, embedded in your own loop or other control
28  * flow logic.
29  * See pollOnce().
30  */
31  class ReadStream
32  {
33  public:
34  /**
35  * @brief Callback called on each entity snapshot in the queried ID.
36  *
37  * If it returns false, the stream is stopped.
38  */
39  using SnapshotCallbackT = std::function<bool(const wm::EntitySnapshot&)>;
40 
41  /**
42  * @brief Inizialize a ReadStream which does not represent a stream.
43  */
44  ReadStream();
45 
46  /**
47  * @brief Initialize a read stream.
48  *
49  * @param reader
50  * The reader to perform the queries.
51  * @param queriedId
52  * The memory ID in which all snapshots should be processed by the stream.
53  * @param maxPollFrequency
54  * The maximum frequency with which queries are performed. The
55  * real frequency might be lower.
56  */
57  ReadStream(const Reader& reader,
58  const MemoryID& queriedId,
59  const armarx::core::time::Frequency& maxPollFrequency =
61 
62 
63  /**
64  * @brief Poll in this thread as long as callback returns true.
65  *
66  * @param callback Function to call on each entity snapshot.
67  * @return The snapshot object that returns false.
68  * @throw armarx::armem::error::ReadStreamAlreadyPolling If the stream is already polling.
69  */
70  std::optional<wm::EntitySnapshot> pollBlocking(const SnapshotCallbackT& callback);
71 
72  /**
73  * @brief Poll in a new thread as long as callback returns true.
74  *
75  * Note that callback will be called in a separate thread, so take care of synchronizing
76  * access to variables in the callback appropriately.
77  *
78  * Roughly equivalent to:
79  * @code
80  * std::thread thread([]() { stream.pollBlocking(); });
81  * @endcode
82  *
83  * @param callback Function to call on each entity snapshot.
84  * @throw armarx::armem::error::ReadStreamAlreadyPolling If the stream is already polling.
85  */
86  void pollAsync(const SnapshotCallbackT& callback);
87  /**
88  * @brief Stop a running polling loop.
89  *
90  * If a polling thread has been started by pollAsync() before, joins the thread.
91  */
92  void stop();
93 
94  /**
95  * @brief Perform one query and call the callbacks on each snapshot.
96  *
97  * This allows you to define your own loop, for example:
98  *
99  * @code
100  * bool condition = true;
101  * while (condition)
102  * {
103  * auto snapshot = stream.pollOnce(callback);
104  *
105  * ...
106  *
107  * if (...)
108  * {
109  * condition = false;
110  * }
111  * }
112  * @endcode
113  *
114  * @param callback Function to call on each entity snapshot.
115  * @throw armarx::armem::error::ReadStreamAlreadyPolling If the stream is already polling.
116  */
117  std::optional<wm::EntitySnapshot> pollOnce(const SnapshotCallbackT& callback);
118 
119 
120  private:
121  std::optional<wm::EntitySnapshot> _pollBlocking(const SnapshotCallbackT& callback);
122  std::optional<wm::EntitySnapshot> _pollOnce(const SnapshotCallbackT& callback);
123 
124 
125  private:
126  Reader reader;
127  MemoryID queriedId;
128 
130  armarx::DateTime timeStart;
131 
132  std::atomic_bool isPolling = false;
133  std::atomic_bool pollingStoppedExternally = false;
134 
135  std::thread pollingThread;
136  };
137 
138 } // namespace armarx::armem::client
armarx::armem::client::Reader
Reads data from a memory server.
Definition: Reader.h:24
armarx::armem::client::ReadStream::pollAsync
void pollAsync(const SnapshotCallbackT &callback)
Poll in a new thread as long as callback returns true.
Definition: ReadStream.cpp:37
Reader.h
armarx::armem::client::ReadStream::SnapshotCallbackT
std::function< bool(const wm::EntitySnapshot &)> SnapshotCallbackT
Callback called on each entity snapshot in the queried ID.
Definition: ReadStream.h:39
MemoryID.h
DateTime.h
Frequency.h
armarx::armem::client::ReadStream::pollBlocking
std::optional< wm::EntitySnapshot > pollBlocking(const SnapshotCallbackT &callback)
Poll in this thread as long as callback returns true.
Definition: ReadStream.cpp:22
armarx::armem::client::ReadStream::ReadStream
ReadStream()
Inizialize a ReadStream which does not represent a stream.
Definition: ReadStream.cpp:9
armarx::armem::client
This file is part of ArmarX.
Definition: forward_declarations.h:7
armarx::core::time::Frequency
Represents a frequency.
Definition: Frequency.h:17
armarx::armem::MemoryID
A memory ID.
Definition: MemoryID.h:47
Metronome.h
armarx::armem::wm::EntitySnapshot
Client-side working memory entity snapshot.
Definition: memory_definitions.h:80
memory_definitions.h
armarx::core::time::DateTime
Represents a point in time.
Definition: DateTime.h:24
armarx::armem::client::ReadStream
A stream reading entity snapshots from the memory.
Definition: ReadStream.h:31
armarx::core::time::Metronome
Simple rate limiter for use in loops to maintain a certain frequency given a clock.
Definition: Metronome.h:35
armarx::core::time::Frequency::Hertz
static Frequency Hertz(std::int64_t hertz)
Definition: Frequency.cpp:23
armarx::armem::client::ReadStream::pollOnce
std::optional< wm::EntitySnapshot > pollOnce(const SnapshotCallbackT &callback)
Perform one query and call the callbacks on each snapshot.
Definition: ReadStream.cpp:74
armarx::armem::client::ReadStream::stop
void stop()
Stop a running polling loop.
Definition: ReadStream.cpp:63