BufferedMemoryMixin.h
Go to the documentation of this file.
1#pragma once
2
3#include <atomic>
4#include <chrono>
5#include <condition_variable>
6#include <iomanip>
7#include <limits>
8#include <map>
9#include <mutex>
10#include <queue>
11#include <thread>
12#include <variant>
13
14#include <boost/lockfree/queue.hpp>
15
16#include <SimoxUtility/json.h>
17
19#include <ArmarXCore/interface/observers/ObserverInterface.h>
21
26
27// Include PendingConversion definition (shared with MemoryBase)
29
31{
32 /**
33 * @brief Statistics for async storage queue operations.
34 *
35 * Thread-safe statistics for tracking queue performance and throughput.
36 */
38 {
39 // Queue statistics
40 std::atomic<uint64_t> totalItemsEnqueued{0};
41 std::atomic<uint64_t> totalItemsProcessed{0};
42 std::atomic<uint64_t> totalSnapshotsStored{0};
43
44 // Timing statistics (in nanoseconds)
45 std::atomic<uint64_t> totalStorageTimeNs{0};
46 std::atomic<uint64_t> maxStorageTimeNs{0};
47 std::atomic<uint64_t> totalConversionTimeNs{0};
48
49 // Backpressure statistics
50 std::atomic<uint64_t> backpressureEvents{0};
51
52 // Dropped snapshots due to queue being full (non-blocking drop policy)
53 std::atomic<uint64_t> snapshotsDroppedBackpressure{0};
54
55 // Dropped snapshots (other reasons, e.g., errors)
56 std::atomic<uint64_t> snapshotsDropped{0};
57
58 // Pre-filter statistics (snapshots filtered before enqueue)
59 std::atomic<uint64_t> snapshotsPreFiltered{0};
60 std::atomic<uint64_t> snapshotsPassedPreFilter{0};
61 std::atomic<uint64_t> totalPreFilterTimeNs{0};
62
78
79 double getAvgStorageTimeMs() const
80 {
81 uint64_t count = totalItemsProcessed.load(std::memory_order_relaxed);
82 if (count == 0) return 0.0;
83 return totalStorageTimeNs.load(std::memory_order_relaxed) / (count * 1e6);
84 }
85
86 double getMaxStorageTimeMs() const
87 {
88 return maxStorageTimeNs.load(std::memory_order_relaxed) / 1e6;
89 }
90
92 {
93 uint64_t count = totalItemsProcessed.load(std::memory_order_relaxed);
94 if (count == 0) return 0.0;
95 return totalConversionTimeNs.load(std::memory_order_relaxed) / (count * 1e6);
96 }
97
98 double getAvgPreFilterTimeMs() const
99 {
100 uint64_t total = snapshotsPreFiltered.load(std::memory_order_relaxed) +
101 snapshotsPassedPreFilter.load(std::memory_order_relaxed);
102 if (total == 0) return 0.0;
103 return totalPreFilterTimeNs.load(std::memory_order_relaxed) / (total * 1e6);
104 }
105
107 {
108 uint64_t filtered = snapshotsPreFiltered.load(std::memory_order_relaxed);
109 uint64_t passed = snapshotsPassedPreFilter.load(std::memory_order_relaxed);
110 uint64_t total = filtered + passed;
111 if (total == 0) return 0.0;
112 return static_cast<double>(filtered) / static_cast<double>(total);
113 }
114 };
115
116
117 template <class _CoreSegmentT>
119 {
120 public:
121 /// Can hold either pre-converted Memory or pending conversion data
122 using StorageItem = std::variant<std::shared_ptr<const armem::wm::Memory>, PendingConversion>;
123
125 buffer(std::make_unique<armem::wm::Memory>(id)),
126 to_store(std::make_unique<armem::wm::Memory>(id)),
127 storageQueue(1000), // Initialize lock-free queue with capacity
128 queueSize(0),
129 stopWorkerThread(false)
130 {
131 }
132
134 {
135 stopAsyncStorageWorker();
136
137 // Clean up any remaining items in the lock-free queue
138 StorageItem* item;
139 while (storageQueue.pop(item))
140 {
141 delete item;
142 }
143 }
144
145 void
146 directlyStore(const armem::wm::Memory& memory, bool simulatedVersion = false)
147 {
148 // DEADLOCK FIX: Removed storeMutex to prevent lock order inversion
149 // Previously: storeMutex → ltm_mutex (in _directlyStore)
150 // This could deadlock with code that holds ltm_mutex → storeMutex
151 //
152 // The _directlyStore() implementations are responsible for their own
153 // thread safety (e.g., Memory::_directlyStore uses ltm_mutex)
154
155 TIMING_START(LTM_Memory_DirectlyStore);
156 _directlyStore(memory, simulatedVersion);
157 TIMING_END_STREAM(LTM_Memory_DirectlyStore, ARMARX_DEBUG);
158 }
159
160 void
161 directlyStore(const armem::server::wm::Memory& serverMemory, bool simulatedVersion = false)
162 {
164 memory.setName(serverMemory.name());
165 auto ids = memory.update(armem::toCommit(serverMemory), true);
166 ARMARX_DEBUG << "Amount of ids in update: " << ids.size();
167 this->directlyStore(memory, simulatedVersion);
168 }
169
170 void
172 {
173 //use this to count how much work is still left
174 }
175
176 /**
177 * @brief Flush the async storage queue and wait for all pending items to be stored.
178 * This blocks until the queue is empty and all storage operations are complete.
179 * @param timeoutMs Maximum time to wait in milliseconds (0 = wait indefinitely)
180 * @return true if queue was flushed successfully, false if timeout occurred
181 */
182 bool
183 flushAsyncStorage(int timeoutMs = 0)
184 {
185 ARMARX_DEBUG << "Flushing async storage queue...";
186
187 auto startTime = std::chrono::steady_clock::now();
188
189 while (true)
190 {
191 // Lock-free check: no mutex needed for atomic operations
192 if (queueSize.load(std::memory_order_acquire) == 0 &&
193 numThreadsProcessing.load(std::memory_order_acquire) == 0)
194 {
195 ARMARX_DEBUG << "Async storage queue flushed successfully";
196 return true;
197 }
198
199 // Check timeout
200 if (timeoutMs > 0)
201 {
202 auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
203 std::chrono::steady_clock::now() - startTime).count();
204 if (elapsed >= timeoutMs)
205 {
206 ARMARX_WARNING << "Flush timeout after " << elapsed << "ms with "
207 << getQueueSize() << " items still in queue";
208 return false;
209 }
210 }
211
212 std::this_thread::sleep_for(std::chrono::milliseconds(10));
213 }
214 }
215
216 /**
217 * @brief Get the current size of the async storage queue
218 */
219 size_t
221 {
222 return queueSize.load(std::memory_order_acquire);
223 }
224
225 /**
226 * @brief Get the async storage statistics
227 */
230 {
231 return asyncStats;
232 }
233
234 /**
235 * @brief Reset the async storage statistics
236 */
237 void
239 {
240 asyncStats.reset();
241 }
242
243 /**
244 * @brief Get the number of threads currently processing items
245 */
246 size_t
248 {
249 return numThreadsProcessing.load(std::memory_order_acquire);
250 }
251
252 protected:
253 void
255 {
256 ARMARX_CHECK_NOT_EMPTY(id.memoryName) << " The full id was: " << id.str();
257
258 buffer->id() = id.getMemoryID();
259 to_store->id() = id.getMemoryID();
260 }
261
262 void
264 {
265 // Start the async storage worker thread
266 startAsyncStorageWorker();
267
268 // create task if not already exists
269 if (!task)
270 {
271 int waitingTimeMs = 1000.f / storeFrequency;
273 this, &BufferedMemoryMixin::storeBuffer, waitingTimeMs);
274 task->start();
275 task->setDelayWarningTolerance(
276 waitingTimeMs); //a warning will be issued if the task takes longer than the waitingTime
277 }
278 }
279
280 void
282 {
283 if (task)
284 {
285 task->stop();
286 task = nullptr;
287 }
288
289 // Stop the async storage worker thread (but don't flush yet)
290 // The caller should call flushAsyncStorage() if they want to wait for completion
291 }
292
294 getBuffer() const
295 {
296 std::lock_guard l(bufferMutex);
297 return *buffer;
298 }
299
300 void
302 {
303 std::shared_ptr<const armem::wm::Memory> memoryToStore;
304 {
305 std::lock_guard l(bufferMutex);
306 to_store = std::move(buffer);
307 buffer = std::make_unique<armem::wm::Memory>(to_store->id());
308 // Convert unique_ptr to shared_ptr<const>
309 memoryToStore = std::shared_ptr<const armem::wm::Memory>(std::move(to_store));
310 }
311
312 if (memoryToStore->empty())
313 {
314 ARMARX_DEBUG << deactivateSpam(120) << "Cannot store an empty buffer. Ignoring.";
315 return;
316 }
317
318 // Pre-filter the memory before enqueuing to reduce queue pressure
319 auto preFilterStart = std::chrono::steady_clock::now();
320 uint64_t filteredCount = 0;
321 uint64_t passedCount = 0;
322 auto filteredMemory = _preFilterMemory(*memoryToStore, filteredCount, passedCount);
323 auto preFilterEnd = std::chrono::steady_clock::now();
324
325 // Update pre-filter statistics
326 auto preFilterTimeNs = std::chrono::duration_cast<std::chrono::nanoseconds>(
327 preFilterEnd - preFilterStart).count();
328 asyncStats.snapshotsPreFiltered.fetch_add(filteredCount, std::memory_order_relaxed);
329 asyncStats.snapshotsPassedPreFilter.fetch_add(passedCount, std::memory_order_relaxed);
330 asyncStats.totalPreFilterTimeNs.fetch_add(preFilterTimeNs, std::memory_order_relaxed);
331
332 // If all snapshots were filtered, skip enqueuing
333 if (!filteredMemory || filteredMemory->empty())
334 {
335 ARMARX_DEBUG << deactivateSpam(10) << "All " << filteredCount
336 << " snapshots were pre-filtered, skipping enqueue";
337 return;
338 }
339
340 // Push filtered memory to async storage queue
341 enqueueForAsyncStorage(std::move(filteredMemory));
342 }
343
344 /// configuration
345 void
346 configureMixin(const nlohmann::json& json)
347 {
348 if (json.find("BufferedMemory.storeFrequency") != json.end())
349 {
350 storeFrequency = json.at("BufferedMemory.storeFrequency");
351 ARMARX_INFO << "Setting store frequency from configuration json to "
353 }
354 if (json.find("BufferedMemory.maxAsyncQueueSize") != json.end())
355 {
356 maxAsyncQueueSize = json.at("BufferedMemory.maxAsyncQueueSize");
357 ARMARX_INFO << "Setting max async queue size from configuration json to "
359 }
360 if (json.find("BufferedMemory.numAsyncStorageThreads") != json.end())
361 {
362 numAsyncStorageThreads = json.at("BufferedMemory.numAsyncStorageThreads");
363 ARMARX_INFO << "Setting number of async storage threads from configuration json to "
365 }
366 if (json.find("BufferedMemory.workerShutdownTimeoutSeconds") != json.end())
367 {
368 workerShutdownTimeoutSeconds = json.at("BufferedMemory.workerShutdownTimeoutSeconds");
369 ARMARX_INFO << "Setting worker shutdown timeout from configuration json to "
370 << workerShutdownTimeoutSeconds << " seconds";
371 }
372 }
373
        /**
         * @brief Register the mixin's optional ArmarX properties.
         * @param defs   Property definition container to register into.
         * @param prefix Prefix prepended to each property name.
         */
        void
        createPropertyDefinitions(PropertyDefinitionsPtr& defs, const std::string& prefix)
        {
            defs->optional(storeFrequency, prefix + "storeFrequency");
            defs->optional(maxAsyncQueueSize, prefix + "maxAsyncQueueSize");
            defs->optional(numAsyncStorageThreads, prefix + "numAsyncStorageThreads");
            defs->optional(workerShutdownTimeoutSeconds, prefix + "workerShutdownTimeoutSeconds");
        }
382
384 bool simulatedVersion = false) = 0;
385
386 /**
387 * @brief Pre-filter a memory object before enqueuing for async storage.
388 *
389 * This method applies snapshot filters BEFORE the memory enters the async queue,
390 * reducing queue pressure and avoiding work on the async worker threads.
391 *
392 * @param memory The memory to filter
393 * @param filteredCount Output: number of snapshots that were filtered out
394 * @param passedCount Output: number of snapshots that passed the filter
395 * @return A new memory containing only the snapshots that passed the filters,
396 * or nullptr if all snapshots were filtered out
397 */
398 virtual std::shared_ptr<armem::wm::Memory> _preFilterMemory(
400 uint64_t& filteredCount,
401 uint64_t& passedCount) = 0;
402
403 void
405 {
406 std::lock_guard l(bufferMutex);
407 buffer->append(memory);
408 }
409
        /**
         * @brief Public interface to enqueue memory for async storage
         * This allows MemoryBase::store() to use the async thread pool
         * @param memory Pre-converted, immutable memory; ownership is moved in.
         */
        void
        enqueueForAsyncStoragePublic(std::shared_ptr<const armem::wm::Memory> memory)
        {
            enqueueForAsyncStorage(std::move(memory));
        }
419
420 protected:
421 /**
422 * @brief Enqueue a memory object for async storage
423 */
424 void
425 enqueueForAsyncStorage(std::shared_ptr<const armem::wm::Memory> memory)
426 {
428 }
429
430 /**
431 * @brief Enqueue snapshots for deferred conversion and async storage
432 * This defers the expensive toMemory() conversion to the async thread
433 */
434 void
436 {
437 enqueueStorageItem(StorageItem{std::move(pending)});
438 }
439
440 private:
441 /**
442 * @brief Start the async storage worker threads (thread pool)
443 */
444 void
445 startAsyncStorageWorker()
446 {
447 std::lock_guard<std::mutex> lock(workerMutex);
448 if (workerThreads.empty())
449 {
450 stopWorkerThread = false;
451 size_t numThreads = std::max(size_t(1), numAsyncStorageThreads);
452 workerThreads.reserve(numThreads);
453 for (size_t i = 0; i < numThreads; ++i)
454 {
455 workerThreads.emplace_back(&BufferedMemoryMixin::asyncStorageWorker, this, i);
456 }
457 ARMARX_INFO << "Async storage thread pool started with " << numThreads << " worker threads";
458 }
459 }
460
        /**
         * @brief Stop the async storage worker threads with configurable timeout
         *
         * Sets the stop flag, wakes all workers, then polls until the queue is
         * drained and no worker is mid-item (or the timeout expires). Threads
         * are joined on success and detached on timeout.
         */
        void
        stopAsyncStorageWorker()
        {
            {
                std::lock_guard<std::mutex> lock(workerMutex);
                if (workerThreads.empty())
                {
                    // Pool never started (or already stopped) - nothing to do.
                    return;
                }
                stopWorkerThread = true;
            }
            queueCondition.notify_all();

            // Wait for threads to finish with timeout
            auto startTime = std::chrono::steady_clock::now();
            auto timeout = std::chrono::seconds(workerShutdownTimeoutSeconds);
            bool allThreadsFinished = false;

            // Poll until all threads exit or timeout
            while (true)
            {
                std::this_thread::sleep_for(std::chrono::milliseconds(100));

                // Check if all threads have finished processing
                // NOTE(review): this checks "queue drained and nobody mid-item",
                // not that the threads have returned - the join below may still
                // block briefly while workers observe the stop flag.
                if (numThreadsProcessing.load(std::memory_order_acquire) == 0 &&
                    queueSize.load(std::memory_order_acquire) == 0)
                {
                    allThreadsFinished = true;
                    break;
                }

                // Check timeout
                auto elapsed = std::chrono::steady_clock::now() - startTime;
                if (elapsed >= timeout)
                {
                    ARMARX_WARNING << "Worker threads did not finish within timeout of "
                                   << workerShutdownTimeoutSeconds << " seconds. "
                                   << "Still processing: " << numThreadsProcessing.load()
                                   << " threads, queue size: " << queueSize.load();
                    break;
                }
            }

            // RACE CONDITION FIX: Hold lock during thread join to protect workerThreads vector
            {
                std::lock_guard<std::mutex> lock(workerMutex);

                if (allThreadsFinished)
                {
                    // Threads finished gracefully, join them
                    for (auto& thread : workerThreads)
                    {
                        if (thread.joinable())
                        {
                            thread.join();
                        }
                    }
                    ARMARX_INFO << "Async storage thread pool stopped gracefully";
                }
                else
                {
                    // Timeout occurred - detach threads to avoid blocking
                    // NOTE(review): detached threads may still touch this object's
                    // members; tolerable only because this runs at shutdown.
                    ARMARX_WARNING << "Detaching worker threads that did not terminate in time. "
                                   << "This may indicate slow disk I/O or stuck operations.";
                    for (auto& thread : workerThreads)
                    {
                        if (thread.joinable())
                        {
                            thread.detach();
                        }
                    }
                    ARMARX_WARNING << "Async storage thread pool stopped with timeout - threads detached";
                }

                workerThreads.clear();
            }
        }
541
542 protected:
543 /**
544 * @brief Internal method to enqueue any storage item (Memory or PendingConversion)
545 * PERFORMANCE: Uses lock-free queue to avoid mutex contention at 50Hz commit rate
546 * NON-BLOCKING: If the queue is full, the item is DROPPED immediately to prevent
547 * upstream queue (e.g., RobotWriterQueue) from backing up.
548 */
549 void
551 {
552 // Check queue size BEFORE allocating to avoid unnecessary allocation
553 size_t currentSize = queueSize.load(std::memory_order_acquire);
554 if (currentSize >= maxAsyncQueueSize)
555 {
556 // NON-BLOCKING DROP POLICY: Immediately drop new items when queue is full
557 // This prevents upstream queues (RobotWriterQueue) from backing up
558 asyncStats.backpressureEvents.fetch_add(1, std::memory_order_relaxed);
559
560 // Count snapshots in the item we're about to drop
561 size_t snapshotCount = countSnapshotsInItem(item);
562 asyncStats.snapshotsDroppedBackpressure.fetch_add(snapshotCount, std::memory_order_relaxed);
563
565 << "LTM async storage queue full (" << currentSize << "/" << maxAsyncQueueSize
566 << " items). DROPPING " << snapshotCount << " snapshots to prevent upstream backup. "
567 << "Consider increasing queue size, reducing commit rate, or enabling more aggressive filtering.";
568
569 return; // Non-blocking: return immediately without waiting
570 }
571
572 // Allocate on heap for lock-free queue (requires pointer type)
573 StorageItem* itemPtr = new StorageItem(std::move(item));
574
575 // Lock-free push - should succeed since we checked size above
576 // (there's a small race window but that's acceptable)
577 if (!storageQueue.push(itemPtr))
578 {
579 // Rare case: queue filled up between size check and push
580 size_t snapshotCount = countSnapshotsInItem(*itemPtr);
581 asyncStats.snapshotsDroppedBackpressure.fetch_add(snapshotCount, std::memory_order_relaxed);
582 ARMARX_WARNING << deactivateSpam(1) << "Lock-free queue race: dropping " << snapshotCount << " snapshots";
583 delete itemPtr;
584 return;
585 }
586
587 // Track enqueued item
588 asyncStats.totalItemsEnqueued.fetch_add(1, std::memory_order_relaxed);
589
590 // Atomically increment size counter
591 queueSize.fetch_add(1, std::memory_order_release);
592
593 // Signal waiting worker threads (use mutex only for condition variable)
594 {
595 std::lock_guard<std::mutex> lock(queueMutex);
596 }
597 queueCondition.notify_one();
598 }
599
600 /**
601 * @brief Count the number of snapshots in a storage item
602 */
603 size_t
605 {
606 size_t count = 0;
607 std::visit([&count](auto&& arg) {
608 using T = std::decay_t<decltype(arg)>;
609 if constexpr (std::is_same_v<T, std::shared_ptr<const armem::wm::Memory>>)
610 {
611 arg->forEachCoreSegment([&count](const auto& coreSegment) {
612 coreSegment.forEachProviderSegment([&count](const auto& providerSegment) {
613 providerSegment.forEachEntity([&count](const auto& entity) {
614 entity.forEachSnapshot([&count](const auto&) { count++; });
615 });
616 });
617 });
618 }
619 else if constexpr (std::is_same_v<T, PendingConversion>)
620 {
621 count = arg.snapshots.size();
622 }
623 }, item);
624 return count;
625 }
626
        /**
         * @brief Worker thread that processes the async storage queue
         * @param threadId ID of this worker thread (for logging)
         *
         * Loop: lock-free pop (fast path); on empty queue, fall back to waiting
         * on queueCondition. Exits when the stop flag is set and the queue is
         * empty. Each popped item is stored (or converted then stored) outside
         * any lock, then freed.
         */
        void
        asyncStorageWorker(size_t threadId)
        {
            ARMARX_INFO << "Async storage worker thread #" << threadId << " started";

            // Per-thread statistics, reported in the DEBUG log lines below.
            size_t itemsProcessed = 0;
            double totalTimeMs = 0.0;
            double minTimeMs = std::numeric_limits<double>::max();
            double maxTimeMs = 0.0;

            while (true)
            {
                StorageItem* itemPtr = nullptr;
                size_t queueSizeBeforePop = 0;
                bool hasItem = false;

                // Try lock-free pop first (fast path - no mutex)
                if (storageQueue.pop(itemPtr))
                {
                    // Got an item from the queue
                    queueSizeBeforePop = queueSize.fetch_sub(1, std::memory_order_acq_rel);
                    numThreadsProcessing.fetch_add(1, std::memory_order_release);
                    hasItem = true;
                }
                else
                {
                    // Queue is empty - wait for signal or stop
                    std::unique_lock<std::mutex> lock(queueMutex);

                    // Exit if stop requested and queue is still empty
                    if (stopWorkerThread.load(std::memory_order_acquire))
                    {
                        // Double-check queue is empty after acquiring lock
                        if (!storageQueue.pop(itemPtr))
                        {
                            break;
                        }
                        // Found an item, process it before exiting
                        queueSizeBeforePop = queueSize.fetch_sub(1, std::memory_order_acq_rel);
                        numThreadsProcessing.fetch_add(1, std::memory_order_release);
                        hasItem = true;
                    }
                    else
                    {
                        // Wait for notification
                        queueCondition.wait(lock, [this]() {
                            return queueSize.load(std::memory_order_acquire) > 0 ||
                                   stopWorkerThread.load(std::memory_order_acquire);
                        });
                        // Loop back to try popping again
                        continue;
                    }
                }

                // Store the memory (outside the lock to avoid blocking enqueue operations)
                if (hasItem)
                {
                    auto startTime = std::chrono::high_resolution_clock::now();
                    size_t snapshotCount = 0;
                    double conversionTimeMs = 0.0;

                    try
                    {
                        TIMING_START(LTM_AsyncStorage);

                        // Handle variant: either pre-converted Memory or pending conversion
                        // Note: itemPtr is a pointer to StorageItem, dereference it
                        std::visit([&](auto&& item) {
                            using T = std::decay_t<decltype(item)>;
                            if constexpr (std::is_same_v<T, std::shared_ptr<const armem::wm::Memory>>)
                            {
                                // Already converted Memory - just store it
                                item->forEachCoreSegment(
                                    [&snapshotCount](const auto& coreSegment)
                                    {
                                        coreSegment.forEachProviderSegment(
                                            [&snapshotCount](const auto& providerSegment)
                                            {
                                                providerSegment.forEachEntity(
                                                    [&snapshotCount](const auto& entity)
                                                    {
                                                        entity.forEachSnapshot(
                                                            [&snapshotCount](const auto&) { snapshotCount++; });
                                                    });
                                            });
                                    });
                                this->directlyStore(*item);
                            }
                            else if constexpr (std::is_same_v<T, PendingConversion>)
                            {
                                // Pending conversion - do toMemory() here in async thread
                                auto conversionStart = std::chrono::high_resolution_clock::now();

                                snapshotCount = item.snapshots.size();
                                auto memory = std::make_shared<armem::wm::Memory>(item.memoryName);

                                // Perform the conversion using captured metadata
                                for (const auto& snapshot : item.snapshots)
                                {
                                    const std::string& coreSegmentName = snapshot.id().coreSegmentName;
                                    const std::string& providerSegmentName = snapshot.id().providerSegmentName;

                                    // Find metadata for this core segment
                                    auto coreMeta = std::find_if(item.segmentMetadata.begin(), item.segmentMetadata.end(),
                                        [&coreSegmentName](const auto& meta) { return meta.coreSegmentName == coreSegmentName; });

                                    if (coreMeta == item.segmentMetadata.end())
                                    {
                                        // Snapshot is silently skipped (only this warning);
                                        // it is not counted in snapshotsDropped.
                                        ARMARX_WARNING << "Missing metadata for core segment: " << coreSegmentName;
                                        continue;
                                    }

                                    // Add core segment if needed
                                    if (!memory->hasCoreSegment(coreSegmentName))
                                    {
                                        memory->addCoreSegment(coreSegmentName, coreMeta->coreSegmentAronType);
                                    }
                                    auto* coreSegment = memory->findCoreSegment(coreSegmentName);

                                    // Add provider segment if needed
                                    if (!coreSegment->hasProviderSegment(providerSegmentName))
                                    {
                                        auto providerTypeIt = coreMeta->providerSegmentAronTypes.find(providerSegmentName);
                                        aron::type::ObjectPtr providerType = (providerTypeIt != coreMeta->providerSegmentAronTypes.end())
                                            ? providerTypeIt->second : nullptr;
                                        coreSegment->addProviderSegment(providerSegmentName, providerType);
                                    }
                                    auto* providerSegment = coreSegment->findProviderSegment(providerSegmentName);

                                    // Add entity and snapshot
                                    if (!providerSegment->hasEntity(snapshot.id().entityName))
                                    {
                                        providerSegment->addEntity(snapshot.id().entityName);
                                    }
                                    auto* entity = providerSegment->findEntity(snapshot.id().entityName);
                                    entity->addSnapshot(snapshot);
                                }

                                auto conversionEnd = std::chrono::high_resolution_clock::now();
                                conversionTimeMs = std::chrono::duration<double, std::milli>(conversionEnd - conversionStart).count();

                                // Now store the converted memory
                                this->directlyStore(*memory);
                            }
                        }, *itemPtr); // Dereference the pointer to get the StorageItem

                        TIMING_END_STREAM(LTM_AsyncStorage, ARMARX_DEBUG);
                    }
                    catch (const std::exception& e)
                    {
                        // NOTE(review): on error the item is logged and freed below;
                        // asyncStats.snapshotsDropped is not incremented here.
                        ARMARX_ERROR << "Error during async storage: " << e.what();
                    }
                    catch (...)
                    {
                        ARMARX_ERROR << "Unknown error during async storage";
                    }

                    // Clean up the heap-allocated item
                    delete itemPtr;

                    auto endTime = std::chrono::high_resolution_clock::now();
                    double durationMs = std::chrono::duration<double, std::milli>(endTime - startTime).count();
                    uint64_t durationNs = std::chrono::duration_cast<std::chrono::nanoseconds>(endTime - startTime).count();

                    // Update local statistics
                    itemsProcessed++;
                    totalTimeMs += durationMs;
                    minTimeMs = std::min(minTimeMs, durationMs);
                    maxTimeMs = std::max(maxTimeMs, durationMs);
                    double avgTimeMs = totalTimeMs / itemsProcessed;

                    // Update global async storage statistics
                    asyncStats.totalItemsProcessed.fetch_add(1, std::memory_order_relaxed);
                    asyncStats.totalSnapshotsStored.fetch_add(snapshotCount, std::memory_order_relaxed);
                    asyncStats.totalStorageTimeNs.fetch_add(durationNs, std::memory_order_relaxed);
                    if (conversionTimeMs > 0)
                    {
                        uint64_t conversionNs = static_cast<uint64_t>(conversionTimeMs * 1e6);
                        asyncStats.totalConversionTimeNs.fetch_add(conversionNs, std::memory_order_relaxed);
                    }

                    // Update max storage time (using compare-exchange)
                    uint64_t currentMax = asyncStats.maxStorageTimeNs.load(std::memory_order_relaxed);
                    while (durationNs > currentMax)
                    {
                        if (asyncStats.maxStorageTimeNs.compare_exchange_weak(currentMax, durationNs, std::memory_order_relaxed))
                        {
                            break;
                        }
                    }

                    // Decrement processing counter and get queue size (all lock-free)
                    numThreadsProcessing.fetch_sub(1, std::memory_order_release);
                    size_t remainingInQueue = queueSize.load(std::memory_order_acquire);

                    ARMARX_DEBUG << "Async storage completed: "
                                 << "snapshots=" << snapshotCount
                                 << ", time=" << std::fixed << std::setprecision(2) << durationMs << "ms"
                                 << (conversionTimeMs > 0 ? " (conversion=" + std::to_string(static_cast<int>(conversionTimeMs)) + "ms)" : "")
                                 << ", queue_before=" << queueSizeBeforePop
                                 << ", queue_after=" << remainingInQueue
                                 << " | Stats: items=" << itemsProcessed
                                 << ", avg=" << std::fixed << std::setprecision(2) << avgTimeMs << "ms"
                                 << ", min=" << std::fixed << std::setprecision(2) << minTimeMs << "ms"
                                 << ", max=" << std::fixed << std::setprecision(2) << maxTimeMs << "ms";
                }
            }

            ARMARX_DEBUG << "Async storage worker thread #" << threadId << " exiting. Final stats: "
                         << "total_items=" << itemsProcessed
                         << ", total_time=" << std::fixed << std::setprecision(2) << totalTimeMs << "ms"
                         << ", avg_time=" << std::fixed << std::setprecision(2) << (itemsProcessed > 0 ? totalTimeMs / itemsProcessed : 0.0) << "ms"
                         << ", min_time=" << std::fixed << std::setprecision(2) << (itemsProcessed > 0 ? minTimeMs : 0.0) << "ms"
                         << ", max_time=" << std::fixed << std::setprecision(2) << maxTimeMs << "ms";
        }
846
847
848 protected:
849 /// Internal memory for data consolidated from wm to ltm (double-buffer)
850 /// The to-put-to-ltm buffer (contains data in plain text)
851 /// This buffer may still be filtered (e.g. snapshot filters).
852 /// This means that it is not guaranteed that all data in the buffer will be stored in the ltm
853 std::unique_ptr<armem::wm::Memory> buffer;
854 std::unique_ptr<armem::wm::Memory> to_store;
855 std::atomic_flag storeFlag = ATOMIC_FLAG_INIT;
856
857 /// The frequency (Hz) to store data to the ltm
858 float storeFrequency = 10;
859
860 /// Maximum size of the async storage queue (default 100 items)
861 size_t maxAsyncQueueSize = 1000;
862
863 /// Number of worker threads for async storage (default 4)
865
866 /// Timeout in seconds for worker thread shutdown (default 30 seconds)
868
869 private:
870 /// The periodic'task to store the content of the buffer to the ltm
872
873 /// A mutex to access the buffer object
874 /// PERFORMANCE: Using std::mutex instead of recursive_mutex since no recursion occurs
875 /// All buffer access methods (getBuffer, storeBuffer, addToBuffer) are non-recursive
876 mutable std::mutex bufferMutex;
877 // NOTE: storeMutex removed - was causing lock order inversion deadlock
878 // _directlyStore() implementations handle their own synchronization
879
880 /// Async storage queue infrastructure
881 /// PERFORMANCE: Using lock-free queue to eliminate mutex contention at 50Hz commit rate
882 boost::lockfree::queue<StorageItem*> storageQueue; // Lock-free queue (holds pointers)
883 std::atomic<size_t> queueSize{0}; // Atomic counter for queue size
884 mutable std::mutex queueMutex; // Only used for condition variable signaling, not queue access
885 std::condition_variable queueCondition;
886 std::vector<std::thread> workerThreads;
887 std::mutex workerMutex;
888 std::atomic<bool> stopWorkerThread;
889 std::atomic<size_t> numThreadsProcessing{0}; // Tracks how many threads are currently processing items
890
891 /// Statistics for async storage operations
892 mutable AsyncStorageStatistics asyncStats;
893 };
894} // namespace armarx::armem::server::ltm::detail::mixin
#define ARMARX_CHECK_NOT_EMPTY(c)
SpamFilterDataPtr deactivateSpam(SpamFilterDataPtr const &spamFilter, float deactivationDurationSec, const std::string &identifier, bool deactivate)
Definition Logging.cpp:75
The periodic task executes one thread method repeatedly using the time period specified in the constr...
IceUtil::Handle< PeriodicTask< T > > pointer_type
Shared pointer type for convenience.
A memory storing data on the hard drive and in mongodb (needs 'armarx memory start' to start the mong...
Definition Memory.h:24
virtual void _directlyStore(const armem::wm::Memory &memory, bool simulatedVersion=false)=0
virtual std::shared_ptr< armem::wm::Memory > _preFilterMemory(const armem::wm::Memory &memory, uint64_t &filteredCount, uint64_t &passedCount)=0
Pre-filter a memory object before enqueuing for async storage.
void resetAsyncStorageStatistics()
Reset the async storage statistics.
void createPropertyDefinitions(PropertyDefinitionsPtr &defs, const std::string &prefix)
size_t countSnapshotsInItem(const StorageItem &item) const
Count the number of snapshots in a storage item.
size_t getQueueSize() const
Get the current size of the async storage queue.
void directlyStore(const armem::server::wm::Memory &serverMemory, bool simulatedVersion=false)
void enqueueForAsyncStorage(std::shared_ptr< const armem::wm::Memory > memory)
Enqueue a memory object for async storage.
void configureMixin(const nlohmann::json &json)
configuration
const AsyncStorageStatistics & getAsyncStorageStatistics() const
Get the async storage statistics.
std::variant< std::shared_ptr< const armem::wm::Memory >, PendingConversion > StorageItem
Can hold either pre-converted Memory or pending conversion data.
size_t getNumThreadsProcessing() const
Get the number of threads currently processing items.
void enqueuePendingConversion(PendingConversion pending)
Enqueue snapshots for deferred conversion and async storage This defers the expensive toMemory() conv...
void enqueueForAsyncStoragePublic(std::shared_ptr< const armem::wm::Memory > memory)
Public interface to enqueue memory for async storage This allows MemoryBase::store() to use the async...
bool flushAsyncStorage(int timeoutMs=0)
Flush the async storage queue and wait for all pending items to be stored.
void directlyStore(const armem::wm::Memory &memory, bool simulatedVersion=false)
void asyncStorageWorker(size_t threadId)
Worker thread that processes the async storage queue.
void enqueueStorageItem(StorageItem item)
Internal method to enqueue any storage item (Memory or PendingConversion) PERFORMANCE: Uses lock-free...
Client-side working memory.
Brief description of class memory.
Definition memory.h:39
#define ARMARX_INFO
The normal logging level.
Definition Logging.h:181
#define ARMARX_ERROR
The logging level for unexpected behaviour, that must be fixed.
Definition Logging.h:196
#define ARMARX_DEBUG
The logging level for output that is only interesting while debugging.
Definition Logging.h:184
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
Definition Logging.h:193
#define TIMING_START(name)
Helper macro to do timing tests.
Definition TimeUtil.h:289
#define TIMING_END_STREAM(name, os)
Prints duration.
Definition TimeUtil.h:310
Commit toCommit(const ContainerT &container)
Definition operations.h:23
std::shared_ptr< Object > ObjectPtr
Definition Object.h:36
IceUtil::Handle< class PropertyDefinitionContainer > PropertyDefinitionsPtr
PropertyDefinitions smart pointer type.
Holds snapshots and metadata for deferred conversion in async thread This allows us to defer the expe...