303 std::shared_ptr<const armem::wm::Memory> memoryToStore;
305 std::lock_guard l(bufferMutex);
309 memoryToStore = std::shared_ptr<const armem::wm::Memory>(std::move(
to_store));
312 if (memoryToStore->empty())
319 auto preFilterStart = std::chrono::steady_clock::now();
320 uint64_t filteredCount = 0;
321 uint64_t passedCount = 0;
322 auto filteredMemory =
_preFilterMemory(*memoryToStore, filteredCount, passedCount);
323 auto preFilterEnd = std::chrono::steady_clock::now();
326 auto preFilterTimeNs = std::chrono::duration_cast<std::chrono::nanoseconds>(
327 preFilterEnd - preFilterStart).count();
328 asyncStats.snapshotsPreFiltered.fetch_add(filteredCount, std::memory_order_relaxed);
329 asyncStats.snapshotsPassedPreFilter.fetch_add(passedCount, std::memory_order_relaxed);
330 asyncStats.totalPreFilterTimeNs.fetch_add(preFilterTimeNs, std::memory_order_relaxed);
333 if (!filteredMemory || filteredMemory->empty())
336 <<
" snapshots were pre-filtered, skipping enqueue";
634 ARMARX_INFO <<
"Async storage worker thread #" << threadId <<
" started";
636 size_t itemsProcessed = 0;
637 double totalTimeMs = 0.0;
638 double minTimeMs = std::numeric_limits<double>::max();
639 double maxTimeMs = 0.0;
644 size_t queueSizeBeforePop = 0;
645 bool hasItem =
false;
648 if (storageQueue.pop(itemPtr))
651 queueSizeBeforePop = queueSize.fetch_sub(1, std::memory_order_acq_rel);
652 numThreadsProcessing.fetch_add(1, std::memory_order_release);
658 std::unique_lock<std::mutex> lock(queueMutex);
661 if (stopWorkerThread.load(std::memory_order_acquire))
664 if (!storageQueue.pop(itemPtr))
669 queueSizeBeforePop = queueSize.fetch_sub(1, std::memory_order_acq_rel);
670 numThreadsProcessing.fetch_add(1, std::memory_order_release);
676 queueCondition.wait(lock, [
this]() {
677 return queueSize.load(std::memory_order_acquire) > 0 ||
678 stopWorkerThread.load(std::memory_order_acquire);
688 auto startTime = std::chrono::high_resolution_clock::now();
689 size_t snapshotCount = 0;
690 double conversionTimeMs = 0.0;
698 std::visit([&](
auto&& item) {
699 using T = std::decay_t<
decltype(item)>;
700 if constexpr (std::is_same_v<T, std::shared_ptr<const armem::wm::Memory>>)
703 item->forEachCoreSegment(
704 [&snapshotCount](
const auto& coreSegment)
706 coreSegment.forEachProviderSegment(
707 [&snapshotCount](
const auto& providerSegment)
709 providerSegment.forEachEntity(
710 [&snapshotCount](
const auto& entity)
712 entity.forEachSnapshot(
713 [&snapshotCount](
const auto&) { snapshotCount++; });
719 else if constexpr (std::is_same_v<T, PendingConversion>)
722 auto conversionStart = std::chrono::high_resolution_clock::now();
724 snapshotCount = item.snapshots.size();
725 auto memory = std::make_shared<armem::wm::Memory>(item.memoryName);
728 for (
const auto& snapshot : item.snapshots)
730 const std::string& coreSegmentName = snapshot.id().coreSegmentName;
731 const std::string& providerSegmentName = snapshot.id().providerSegmentName;
734 auto coreMeta = std::find_if(item.segmentMetadata.begin(), item.segmentMetadata.end(),
735 [&coreSegmentName](
const auto&
meta) { return meta.coreSegmentName == coreSegmentName; });
737 if (coreMeta == item.segmentMetadata.end())
739 ARMARX_WARNING <<
"Missing metadata for core segment: " << coreSegmentName;
744 if (!
memory->hasCoreSegment(coreSegmentName))
746 memory->addCoreSegment(coreSegmentName, coreMeta->coreSegmentAronType);
748 auto* coreSegment =
memory->findCoreSegment(coreSegmentName);
751 if (!coreSegment->hasProviderSegment(providerSegmentName))
753 auto providerTypeIt = coreMeta->providerSegmentAronTypes.find(providerSegmentName);
755 ? providerTypeIt->second :
nullptr;
756 coreSegment->addProviderSegment(providerSegmentName, providerType);
758 auto* providerSegment = coreSegment->findProviderSegment(providerSegmentName);
761 if (!providerSegment->hasEntity(snapshot.id().entityName))
763 providerSegment->addEntity(snapshot.id().entityName);
765 auto* entity = providerSegment->findEntity(snapshot.id().entityName);
766 entity->addSnapshot(snapshot);
769 auto conversionEnd = std::chrono::high_resolution_clock::now();
770 conversionTimeMs = std::chrono::duration<double, std::milli>(conversionEnd - conversionStart).count();
779 catch (
const std::exception& e)
781 ARMARX_ERROR <<
"Error during async storage: " << e.what();
791 auto endTime = std::chrono::high_resolution_clock::now();
792 double durationMs = std::chrono::duration<double, std::milli>(endTime - startTime).count();
793 uint64_t durationNs = std::chrono::duration_cast<std::chrono::nanoseconds>(endTime - startTime).count();
797 totalTimeMs += durationMs;
798 minTimeMs = std::min(minTimeMs, durationMs);
799 maxTimeMs = std::max(maxTimeMs, durationMs);
800 double avgTimeMs = totalTimeMs / itemsProcessed;
803 asyncStats.totalItemsProcessed.fetch_add(1, std::memory_order_relaxed);
804 asyncStats.totalSnapshotsStored.fetch_add(snapshotCount, std::memory_order_relaxed);
805 asyncStats.totalStorageTimeNs.fetch_add(durationNs, std::memory_order_relaxed);
806 if (conversionTimeMs > 0)
808 uint64_t conversionNs =
static_cast<uint64_t
>(conversionTimeMs * 1e6);
809 asyncStats.totalConversionTimeNs.fetch_add(conversionNs, std::memory_order_relaxed);
813 uint64_t currentMax = asyncStats.maxStorageTimeNs.load(std::memory_order_relaxed);
814 while (durationNs > currentMax)
816 if (asyncStats.maxStorageTimeNs.compare_exchange_weak(currentMax, durationNs, std::memory_order_relaxed))
823 numThreadsProcessing.fetch_sub(1, std::memory_order_release);
824 size_t remainingInQueue = queueSize.load(std::memory_order_acquire);
827 <<
"snapshots=" << snapshotCount
828 <<
", time=" << std::fixed << std::setprecision(2) << durationMs <<
"ms"
829 << (conversionTimeMs > 0 ?
" (conversion=" + std::to_string(
static_cast<int>(conversionTimeMs)) +
"ms)" :
"")
830 <<
", queue_before=" << queueSizeBeforePop
831 <<
", queue_after=" << remainingInQueue
832 <<
" | Stats: items=" << itemsProcessed
833 <<
", avg=" << std::fixed << std::setprecision(2) << avgTimeMs <<
"ms"
834 <<
", min=" << std::fixed << std::setprecision(2) << minTimeMs <<
"ms"
835 <<
", max=" << std::fixed << std::setprecision(2) << maxTimeMs <<
"ms";
839 ARMARX_DEBUG <<
"Async storage worker thread #" << threadId <<
" exiting. Final stats: "
840 <<
"total_items=" << itemsProcessed
841 <<
", total_time=" << std::fixed << std::setprecision(2) << totalTimeMs <<
"ms"
842 <<
", avg_time=" << std::fixed << std::setprecision(2) << (itemsProcessed > 0 ? totalTimeMs / itemsProcessed : 0.0) <<
"ms"
843 <<
", min_time=" << std::fixed << std::setprecision(2) << (itemsProcessed > 0 ? minTimeMs : 0.0) <<
"ms"
844 <<
", max_time=" << std::fixed << std::setprecision(2) << maxTimeMs <<
"ms";