25 #include <condition_variable>
67 template <
class MessageT>
74 using Callback = std::function<void(MessageT)>;
81 template <
class... Args>
83 messageCallback(
std::bind(args...,
std::placeholders::_1)),
86 workerThread->start();
106 template <
class... Args>
110 messageCallback = std::bind(args..., std::placeholders::_1);
112 if (not workerThread->isRunning())
114 workerThread->start();
126 if (!processMessages.load())
133 std::lock_guard g{mtx};
134 messageQueue.push(
data);
137 const size_t queueSize = messageQueue.size();
138 if ((maxQueueSize > 0) and (queueSize > maxQueueSize))
140 const size_t elementsToRemove = queueSize - maxQueueSize;
143 for (
size_t i = 0; i < elementsToRemove; i++)
165 std::optional<MessageT>
message;
169 std::unique_lock<std::mutex> lock(mtx);
170 cv.wait(lock, [
this] {
return not messageQueue.empty(); });
172 if (not messageQueue.empty())
174 message = messageQueue.front();
178 if (
message and processMessages.load())
181 messageCallback(
message.value());
186 ARMARX_DEBUG <<
"[Message queue] Worker thread has finished.";
189 [[nodiscard]] std::size_t
194 std::lock_guard g{mtx};
195 return messageQueue.size();
202 processMessages.store(
true);
209 processMessages.store(
false);
218 processMessages.store(
false);
220 std::unique_lock<std::mutex> lock(mtx);
222 cv.wait(lock, [
this] {
return messageQueue.empty(); });
231 std::lock_guard g{mtx};
233 messageQueue = std::queue<MessageT>();
240 this->maxQueueSize = maxQueueSize;
244 mutable std::mutex mtx;
245 std::queue<MessageT> messageQueue;
247 std::condition_variable
cv;
255 std::atomic<bool> processMessages{
true};
256 std::atomic<bool> ok{
true};
258 size_t maxQueueSize{0};