MessageQueue.h
Go to the documentation of this file.
1/**
2 * This file is part of ArmarX.
3 *
4 * ArmarX is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License version 2 as
6 * published by the Free Software Foundation.
7 *
8 * ArmarX is distributed in the hope that it will be useful, but
9 * WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License
14 * along with this program. If not, see <http://www.gnu.org/licenses/>.
15 *
16 * @author Fabian Reister ( fabian dot reister at kit dot edu )
17 * @date 2021 Humanoids Group, H2T, KIT
18 * @license http://www.gnu.org/licenses/gpl-2.0.txt
19 * GNU General Public License
20 */
21
22#pragma once
23
24#include <atomic>
25#include <condition_variable>
26#include <cstddef>
27#include <functional>
28#include <mutex>
29#include <optional>
30#include <queue>
31#include <thread>
32
35
37{
38
39 // https://embeddedartistry.com/blog/2017/02/08/implementing-an-asynchronous-dispatch-queue/
40
41 /**
42 * @brief The purpose of the MessageQueue is to provide a convenient way to process incoming messages sequentially.
43 *
44 * Although the main idea is to pass in ICE messages, the MessageQueue can be used for any data.
45 *
46 * Usage example:
47 *
48 * struct ExampleMessage{
49 * int x, y;
50 * };
51 *
52 * class Consumer{
53 * public:
54 *
55 * void processMessage(const ExampleMessage& message){
56 * // your implementation
57 * }
58 * };
59 *
60 * Consumer message_consumer;
61 * MessageQueue<ExampleMessage> message_queue(&Consumer::processMessage, message_consumer);
62 *
63 * message_queue.push({1, 2});
64 * message_queue.push({3, 4});
65 *
66 */
67 template <class MessageT>
69 {
70
71 public:
72 using MessageType = MessageT;
73
74 using Callback = std::function<void(MessageT)>;
75
77 workerThread(new RunningTask<MessageQueue>(this, &MessageQueue::run, "worker")){};
78
79 MessageQueue(const MessageQueue&) = delete;
80
81 template <class... Args>
82 MessageQueue(Args... args) :
83 messageCallback(std::bind(args..., std::placeholders::_1)),
84 workerThread(new RunningTask<MessageQueue>(this, &MessageQueue::run, "worker"))
85 {
86 workerThread->start();
87 }
88
90 {
91 // trigger thread to stop
92 ok.store(false);
93
94 cv.notify_all();
95
96 if (workerThread)
97 {
98 workerThread->stop();
99 workerThread->join();
100 }
101
102 // ARMARX_INFO << "Waiting for workerThread to return";
103 // workerThread.join();
104 }
105
106 template <class... Args>
107 void
108 connect(Args... args)
109 {
110 messageCallback = std::bind(args..., std::placeholders::_1);
111
112 if (not workerThread->isRunning())
113 {
114 workerThread->start();
115 }
116 }
117
118 /**
119 * @brief push data into the queue.
120 * @param data
121 */
122 void
123 push(MessageT data)
124 {
126 if (!processMessages.load())
127 {
128 return;
129 }
130
132 {
133 std::lock_guard g{mtx};
134 messageQueue.push(data);
135
136 // limit queue size
137 const size_t queueSize = messageQueue.size();
138 if ((maxQueueSize > 0) and (queueSize > maxQueueSize))
139 {
140 const size_t elementsToRemove = queueSize - maxQueueSize;
141
142 // TODO(fabian.reister): is there a better way?
143 for (size_t i = 0; i < elementsToRemove; i++)
144 {
145 messageQueue.pop();
146 }
147 }
148
149 ARMARX_VERBOSE << deactivateSpam(1) << "Message queue size is " << queueSize;
150 }
151 cv.notify_all(); // for run() and waitUntilProcessed()
152 }
153
154 /**
155 * @brief run the worker thread function sending out messages via callback
156 */
157 void
159 {
161
162 while (ok.load())
163 {
164 // Get new data.
165 std::optional<MessageT> message;
166 {
168 // Wait until we have data
169 std::unique_lock<std::mutex> lock(mtx);
170 cv.wait(lock, [this] { return not messageQueue.empty(); });
171 // after wait, we own the lock
172 if (not messageQueue.empty())
173 {
174 message = messageQueue.front();
175 messageQueue.pop();
176 }
177 }
178 if (message and processMessages.load())
179 {
181 messageCallback(message.value());
182 }
183 // else: drop the message if the signal has changed
184 }
185
186 ARMARX_DEBUG << "[Message queue] Worker thread has finished.";
187 }
188
189 [[nodiscard]] std::size_t
190 size() const
191 {
192 ARMARX_DEBUG << "Locking size()";
193
194 std::lock_guard g{mtx};
195 return messageQueue.size();
196 }
197
198 void
200 {
202 processMessages.store(true);
203 }
204
205 void
207 {
209 processMessages.store(false);
210 }
211
212 void
214 {
215 ARMARX_DEBUG << "Locking waitUntilProcessed()";
216
218 processMessages.store(false);
219
220 std::unique_lock<std::mutex> lock(mtx);
221
222 cv.wait(lock, [this] { return messageQueue.empty(); });
223 }
224
225 void
227 {
228 ARMARX_DEBUG << "Locking clear()";
229
231 std::lock_guard g{mtx};
232
233 messageQueue = std::queue<MessageT>();
234 }
235
236 void
237 setQueueSize(const size_t maxQueueSize) noexcept
238 {
240 this->maxQueueSize = maxQueueSize;
241 }
242
243 private:
244 mutable std::mutex mtx;
245 std::queue<MessageT> messageQueue;
246
247 std::condition_variable cv;
248
249 Callback messageCallback;
250
251 // std::thread workerThread;
252
254
255 std::atomic<bool> processMessages{true};
256 std::atomic<bool> ok{true};
257
258 size_t maxQueueSize{0};
259 };
260
261} // namespace armarx::localization_and_mapping::cartographer_adapter
SpamFilterDataPtr deactivateSpam(SpamFilterDataPtr const &spamFilter, float deactivationDurationSec, const std::string &identifier, bool deactivate)
Definition Logging.cpp:75
IceUtil::Handle< RunningTask< T > > pointer_type
Shared pointer type for convenience.
void run()
run the worker thread function sending out messages via callback
#define ARMARX_DEBUG
The logging level for output that is only interesting while debugging.
Definition Logging.h:184
#define ARMARX_VERBOSE
The logging level for verbose information.
Definition Logging.h:187
Definition helper.h:35
#define ARMARX_TRACE
Definition trace.h:77