MessageQueue.h
Go to the documentation of this file.
1 /**
2  * This file is part of ArmarX.
3  *
4  * ArmarX is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License version 2 as
6  * published by the Free Software Foundation.
7  *
8  * ArmarX is distributed in the hope that it will be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  * GNU General Public License for more details.
12  *
13  * You should have received a copy of the GNU General Public License
14  * along with this program. If not, see <http://www.gnu.org/licenses/>.
15  *
16  * @author Fabian Reister ( fabian dot reister at kit dot edu )
17  * @date 2021 Humanoids Group, H2T, KIT
18  * @license http://www.gnu.org/licenses/gpl-2.0.txt
19  * GNU General Public License
20  */
21 
22 #pragma once
23 
24 #include <atomic>
25 #include <condition_variable>
26 #include <cstddef>
27 #include <functional>
28 #include <mutex>
29 #include <optional>
30 #include <queue>
31 #include <thread>
32 
35 
37 {
38 
39  // https://embeddedartistry.com/blog/2017/02/08/implementing-an-asynchronous-dispatch-queue/
40 
41  /**
42  * @brief The purpose of the MessageQueue is to provide a convenient way to process incoming messages sequentially.
43  *
44  * Although the main idea is to pass in ICE messages, the MessageQueue can be used for any data.
45  *
46  * Usage example:
47  *
48  * struct ExampleMessage{
49  * int x, y;
50  * }
51  *
52  * class Consumer{
53  * public:
54  *
55  * void processMessage(const ExampleMessage& message){
56  * // your implementation
57  * }
58  * }
59  *
60  * Consumer message_consumer;
61  * MessageQueue<ExampleMessage> message_queue(&Consumer::processMessage, message_consumer);
62  *
63  * message_queue.push({1, 2});
64  * message_queue.push({3, 4});
65  *
66  */
67  template <class MessageT>
69  {
70 
71  public:
72  using MessageType = MessageT;
73 
74  using Callback = std::function<void(MessageT)>;
75 
77  workerThread(new RunningTask<MessageQueue>(this, &MessageQueue::run, "worker")){};
78 
79  MessageQueue(const MessageQueue&) = delete;
80 
/**
 * @brief Construct a queue that is connected to a callback and start its worker.
 *
 * The arguments are handed to std::bind together with one placeholder, i.e.
 * the resulting callable receives the dequeued message as its last argument.
 * A typical argument list is a member function pointer followed by the object
 * it should be invoked on.
 *
 * @param args Callable (and leading bound arguments) stored as the message callback.
 */
81  template <class... Args>
82  MessageQueue(Args... args) :
// The callback is initialized before the worker task is created.
83  messageCallback(std::bind(args..., std::placeholders::_1)),
// The task executes MessageQueue::run on this instance and is named "worker".
84  workerThread(new RunningTask<MessageQueue>(this, &MessageQueue::run, "worker"))
85  {
// Unlike the default constructor, the worker is started right away.
86  workerThread->start();
87  }
88 
90  {
91  // trigger thread to stop
92  ok.store(false);
93 
94  cv.notify_all();
95 
96  if (workerThread)
97  {
98  workerThread->stop();
99  workerThread->join();
100  }
101 
102  // ARMARX_INFO << "Waiting for workerThread to return";
103  // workerThread.join();
104  }
105 
106  template <class... Args>
107  void
108  connect(Args... args)
109  {
110  messageCallback = std::bind(args..., std::placeholders::_1);
111 
112  if (not workerThread->isRunning())
113  {
114  workerThread->start();
115  }
116  }
117 
118  /**
119  * @brief push data into the queue.
120  * @param data
121  */
122  void
123  push(MessageT data)
124  {
125  ARMARX_TRACE;
126  if (!processMessages.load())
127  {
128  return;
129  }
130 
131  ARMARX_TRACE;
132  {
133  std::lock_guard g{mtx};
134  messageQueue.push(data);
135 
136  // limit queue size
137  const size_t queueSize = messageQueue.size();
138  if ((maxQueueSize > 0) and (queueSize > maxQueueSize))
139  {
140  const size_t elementsToRemove = queueSize - maxQueueSize;
141 
142  // TODO(fabian.reister): is there a better way?
143  for (size_t i = 0; i < elementsToRemove; i++)
144  {
145  messageQueue.pop();
146  }
147  }
148 
149  ARMARX_VERBOSE << deactivateSpam(1) << "Message queue size is " << queueSize;
150  }
151  cv.notify_all(); // for run() and waitUntilProcessed()
152  }
153 
154  /**
155  * @brief run the worker thread function sending out messages via callback
156  */
157  void
158  run()
159  {
160  ARMARX_TRACE;
161 
162  while (ok.load())
163  {
164  // Get new data.
165  std::optional<MessageT> message;
166  {
167  ARMARX_TRACE;
168  // Wait until we have data
169  std::unique_lock<std::mutex> lock(mtx);
170  cv.wait(lock, [this] { return not messageQueue.empty(); });
171  // after wait, we own the lock
172  if (not messageQueue.empty())
173  {
174  message = messageQueue.front();
175  messageQueue.pop();
176  }
177  }
178  if (message and processMessages.load())
179  {
180  ARMARX_TRACE;
181  messageCallback(message.value());
182  }
183  // else: drop the message if the signal has changed
184  }
185 
186  ARMARX_DEBUG << "[Message queue] Worker thread has finished.";
187  }
188 
189  [[nodiscard]] std::size_t
190  size() const
191  {
192  ARMARX_DEBUG << "Locking size()";
193 
194  std::lock_guard g{mtx};
195  return messageQueue.size();
196  }
197 
198  void
200  {
201  ARMARX_TRACE;
202  processMessages.store(true);
203  }
204 
205  void
207  {
208  ARMARX_TRACE;
209  processMessages.store(false);
210  }
211 
212  void
214  {
215  ARMARX_DEBUG << "Locking waitUntilProcessed()";
216 
217  ARMARX_TRACE;
218  processMessages.store(false);
219 
220  std::unique_lock<std::mutex> lock(mtx);
221 
222  cv.wait(lock, [this] { return messageQueue.empty(); });
223  }
224 
225  void
227  {
228  ARMARX_DEBUG << "Locking clear()";
229 
230  ARMARX_TRACE;
231  std::lock_guard g{mtx};
232 
233  messageQueue = std::queue<MessageT>();
234  }
235 
236  void
237  setQueueSize(const size_t maxQueueSize) noexcept
238  {
239  ARMARX_TRACE;
240  this->maxQueueSize = maxQueueSize;
241  }
242 
243  private:
244  mutable std::mutex mtx;
245  std::queue<MessageT> messageQueue;
246 
247  std::condition_variable cv;
248 
249  Callback messageCallback;
250 
251  // std::thread workerThread;
252 
254 
255  std::atomic<bool> processMessages{true};
256  std::atomic<bool> ok{true};
257 
258  size_t maxQueueSize{0};
259  };
260 
261 } // namespace armarx::localization_and_mapping::cartographer_adapter
ARMARX_VERBOSE
#define ARMARX_VERBOSE
Definition: Logging.h:187
armarx::localization_and_mapping::cartographer_adapter::MessageQueue::disable
void disable()
Definition: MessageQueue.h:206
armarx::localization_and_mapping::cartographer_adapter::MessageQueue::waitUntilProcessed
void waitUntilProcessed()
Definition: MessageQueue.h:213
armarx::localization_and_mapping::cartographer_adapter::LaserScannerMessage
Definition: types.h:153
armarx::localization_and_mapping::cartographer_adapter::MessageQueue< armarx::localization_and_mapping::cartographer_adapter::LaserScannerMessage >::Callback
std::function< void(armarx::localization_and_mapping::cartographer_adapter::LaserScannerMessage)> Callback
Definition: MessageQueue.h:74
armarx::localization_and_mapping::cartographer_adapter::MessageQueue
The purpose of the MessageQueue is to provide a convenient way to process incoming messages sequentia...
Definition: MessageQueue.h:68
message
message(STATUS "Boost-Library-Dir: " "${Boost_LIBRARY_DIRS}") message(STATUS "Boost-LIBRARIES
Definition: CMakeLists.txt:8
RunningTask.h
armarx::RunningTask
Definition: ArmarXMultipleObjectsScheduler.h:36
armarx::localization_and_mapping::cartographer_adapter::MessageQueue::run
void run()
run the worker thread function sending out messages via callback
Definition: MessageQueue.h:158
deactivateSpam
SpamFilterDataPtr deactivateSpam(SpamFilterDataPtr const &spamFilter, float deactivationDurationSec, const std::string &identifier, bool deactivate)
Definition: Logging.cpp:75
ARMARX_TRACE
#define ARMARX_TRACE
Definition: trace.h:77
armarx::localization_and_mapping::cartographer_adapter::MessageQueue::MessageQueue
MessageQueue()
Definition: MessageQueue.h:76
data
uint8_t data[1]
Definition: EtherCATFrame.h:68
ARMARX_DEBUG
#define ARMARX_DEBUG
Definition: Logging.h:184
armarx::localization_and_mapping::cartographer_adapter::MessageQueue::size
std::size_t size() const
Definition: MessageQueue.h:190
armarx::localization_and_mapping::cartographer_adapter::MessageQueue::enable
void enable()
Definition: MessageQueue.h:199
armarx::localization_and_mapping::cartographer_adapter::MessageQueue::setQueueSize
void setQueueSize(const size_t maxQueueSize) noexcept
Definition: MessageQueue.h:237
armarx::localization_and_mapping::cartographer_adapter
This file is part of ArmarX.
Definition: ApproximateTimeQueue.cpp:15
armarx::localization_and_mapping::cartographer_adapter::MessageQueue::~MessageQueue
~MessageQueue()
Definition: MessageQueue.h:89
armarx::localization_and_mapping::cartographer_adapter::MessageQueue::push
void push(MessageT data)
push data into the queue.
Definition: MessageQueue.h:123
std
Definition: Application.h:66
IceUtil::Handle
Definition: forward_declarations.h:30
armarx::localization_and_mapping::cartographer_adapter::MessageQueue::connect
void connect(Args... args)
Definition: MessageQueue.h:108
armarx::localization_and_mapping::cartographer_adapter::MessageQueue::clear
void clear()
Definition: MessageQueue.h:226
cv
Definition: helper.h:34
Logging.h
armarx::localization_and_mapping::cartographer_adapter::MessageQueue::MessageQueue
MessageQueue(Args... args)
Definition: MessageQueue.h:82