ReadStream.h
Go to the documentation of this file.
1#pragma once
2
3#include <functional>
4#include <optional>
5#include <thread>
6
10
13
14#include "Reader.h"
15
17{
18 /**
19 * @brief A stream reading entity snapshots from the memory.
20 *
21 * After constructing a ReadStream, polling can be started in three ways:
22 *
23 * 1. Run a polling loop in this thread, blocking execution until terminated.
24 * See pollBlocking().
25 * 2. Run a polling loop in a new, separate thread, until it is stopped via stop().
26 * See pollAsync() and stop().
27 * 3. Perform a single query and process the result, embedded in your own loop or other control
28 * flow logic.
29 * See pollOnce().
30 */
32 {
33 public:
34 /**
35 * @brief Callback called on each entity snapshot in the queried ID.
36 *
37 * If it returns false, the stream is stopped.
38 */
39 using SnapshotCallbackT = std::function<bool(const wm::EntitySnapshot&)>;
40
41 /**
42 * @brief Inizialize a ReadStream which does not represent a stream.
43 */
44 ReadStream();
45
46 /**
47 * @brief Initialize a read stream.
48 *
49 * @param reader
50 * The reader to perform the queries.
51 * @param queriedId
52 * The memory ID in which all snapshots should be processed by the stream.
53 * @param maxPollFrequency
54 * The maximum frequency with which queries are performed. The
55 * real frequency might be lower.
56 */
57 ReadStream(const Reader& reader,
58 const MemoryID& queriedId,
59 const armarx::core::time::Frequency& maxPollFrequency =
61
62
63 /**
64 * @brief Poll in this thread as long as callback returns true.
65 *
66 * @param callback Function to call on each entity snapshot.
67 * @return The snapshot object that returns false.
68 * @throw armarx::armem::error::ReadStreamAlreadyPolling If the stream is already polling.
69 */
70 std::optional<wm::EntitySnapshot> pollBlocking(const SnapshotCallbackT& callback);
71
72 /**
73 * @brief Poll in a new thread as long as callback returns true.
74 *
75 * Note that callback will be called in a separate thread, so take care of synchronizing
76 * access to variables in the callback appropriately.
77 *
78 * Roughly equivalent to:
79 * @code
80 * std::thread thread([]() { stream.pollBlocking(); });
81 * @endcode
82 *
83 * @param callback Function to call on each entity snapshot.
84 * @throw armarx::armem::error::ReadStreamAlreadyPolling If the stream is already polling.
85 */
86 void pollAsync(const SnapshotCallbackT& callback);
87 /**
88 * @brief Stop a running polling loop.
89 *
90 * If a polling thread has been started by pollAsync() before, joins the thread.
91 */
92 void stop();
93
94 /**
95 * @brief Perform one query and call the callbacks on each snapshot.
96 *
97 * This allows you to define your own loop, for example:
98 *
99 * @code
100 * bool condition = true;
101 * while (condition)
102 * {
103 * auto snapshot = stream.pollOnce(callback);
104 *
105 * ...
106 *
107 * if (...)
108 * {
109 * condition = false;
110 * }
111 * }
112 * @endcode
113 *
114 * @param callback Function to call on each entity snapshot.
115 * @throw armarx::armem::error::ReadStreamAlreadyPolling If the stream is already polling.
116 */
117 std::optional<wm::EntitySnapshot> pollOnce(const SnapshotCallbackT& callback);
118
119
120 private:
121 std::optional<wm::EntitySnapshot> _pollBlocking(const SnapshotCallbackT& callback);
122 std::optional<wm::EntitySnapshot> _pollOnce(const SnapshotCallbackT& callback);
123
124
125 private:
126 Reader reader;
127 MemoryID queriedId;
128
130 armarx::DateTime timeStart;
131
132 std::atomic_bool isPolling = false;
133 std::atomic_bool pollingStoppedExternally = false;
134
135 std::thread pollingThread;
136 };
137
138} // namespace armarx::armem::client
ReadStream()
Inizialize a ReadStream which does not represent a stream.
Definition ReadStream.cpp:9
std::optional< wm::EntitySnapshot > pollOnce(const SnapshotCallbackT &callback)
Perform one query and call the callbacks on each snapshot.
std::optional< wm::EntitySnapshot > pollBlocking(const SnapshotCallbackT &callback)
Poll in this thread as long as callback returns true.
void stop()
Stop a running polling loop.
std::function< bool(const wm::EntitySnapshot &)> SnapshotCallbackT
Callback called on each entity snapshot in the queried ID.
Definition ReadStream.h:39
void pollAsync(const SnapshotCallbackT &callback)
Poll in a new thread as long as callback returns true.
Reads data from a memory server.
Definition Reader.h:25
Represents a point in time.
Definition DateTime.h:25
Represents a frequency.
Definition Frequency.h:17
static Frequency Hertz(std::int64_t hertz)
Definition Frequency.cpp:20
Simple rate limiter for use in loops to maintain a certain frequency given a clock.
Definition Metronome.h:57
This file is part of ArmarX.
armem::wm::EntitySnapshot EntitySnapshot