MemoryToIceAdapter.cpp
Go to the documentation of this file.
2
3#include <iostream>
4#include <list>
5#include <sstream>
6#include <thread>
7
8#include <IceUtil/Time.h>
9
15
26
28#include "query_proc/wm/wm.h"
29
31{
32 namespace
33 {
34 /**
35 * @brief Extract metadata from WM structure for given snapshots
36 *
37 * This function is called OUTSIDE of any WM locks to avoid deadlocks.
38 * It extracts only the metadata needed for async conversion (aron types).
39 *
40 * @param structure The WM structure to extract from
41 * @param snapshots The snapshots to extract metadata for
42 * @return Vector of segment metadata
43 */
44 std::vector<ltm::detail::mixin::PendingConversion::SegmentMetadata>
45 extractSegmentMetadata(const wm::Memory& structure,
46 const std::vector<wm::EntitySnapshot>& snapshots)
47 {
48 std::vector<ltm::detail::mixin::PendingConversion::SegmentMetadata> metadata;
49
50 // Build unique set of core/provider segments from snapshots
51 std::map<std::string, std::set<std::string>> segmentMap; // core -> set of providers
52 for (const auto& snapshot : snapshots)
53 {
54 segmentMap[snapshot.id().coreSegmentName].insert(snapshot.id().providerSegmentName);
55 }
56
57 // Extract metadata for each segment
58 for (const auto& [coreSegmentName, providerNames] : segmentMap)
59 {
60 auto* coreStructure = structure.findCoreSegment(coreSegmentName);
61 if (!coreStructure)
62 {
63 ARMARX_WARNING << "Core segment not found in structure: " << coreSegmentName;
64 continue;
65 }
66
68 meta.coreSegmentName = coreSegmentName;
69 meta.coreSegmentAronType = coreStructure->aronType();
70
71 for (const auto& providerName : providerNames)
72 {
73 auto* providerStructure = coreStructure->findProviderSegment(providerName);
74 if (providerStructure)
75 {
76 meta.providerSegmentAronTypes[providerName] = providerStructure->aronType();
77 }
78 }
79
80 metadata.push_back(std::move(meta));
81 }
82
83 return metadata;
84 }
85 } // anonymous namespace
86
87
93
94 void
95 MemoryToIceAdapter::setMemoryListener(client::MemoryListenerInterfacePrx memoryListener)
96 {
97 this->memoryListenerTopic = memoryListener;
98 }
99
100 // WRITING
// Ensure that the core segment (and, if named, the provider segment) described by `input`
// exists in the working memory.
//
// input:           names of the core and provider segment; `clearWhenExists` is consulted in
//                  the handler that follows a failed addProviderSegment().
// addCoreSegments: when true, a missing core segment is created instead of reported as error.
// Returns a result with success == false and the MissingEntry message if the core segment is
// missing and may not be created; otherwise success == true and the ID string of the segment.
101 data::AddSegmentResult
102 MemoryToIceAdapter::addSegment(const data::AddSegmentInput& input, bool addCoreSegments)
103 {
106
107 ARMARX_DEBUG << "Adding segment using MemoryToIceAdapter";
108
109 data::AddSegmentResult output;
110
111 server::wm::CoreSegment* coreSegment = nullptr;
112 try
113 {
114 coreSegment = &workingMemory->getCoreSegment(input.coreSegmentName);
115 }
116 catch (const armem::error::MissingEntry& e)
117 {
// Core segment does not exist yet: create it on request, otherwise report the error.
118 if (addCoreSegments)
119 {
120 coreSegment = &workingMemory->addCoreSegment(input.coreSegmentName);
121 }
122 else
123 {
124 output.success = false;
125 output.errorMessage = e.what();
126 return output;
127 }
128 }
129 ARMARX_CHECK_NOT_NULL(coreSegment);
130
// An empty provider segment name means only the core segment was requested.
131 if (input.providerSegmentName.size() > 0)
132 {
// The provider segment is added inside doLockedExclusive() on the core segment.
133 coreSegment->doLockedExclusive(
134 [&coreSegment, &input]()
135 {
136 try
137 {
138 coreSegment->addProviderSegment(input.providerSegmentName);
139 }
// NOTE(review): the catch clause header is not visible here; presumably it handles the
// "provider segment already exists" error - confirm against the original file.
141 {
142 // This is ok.
143 if (input.clearWhenExists)
144 {
146 coreSegment->getProviderSegment(input.providerSegmentName);
147 provider.clear();
148 }
149 }
150 });
151 }
152
// Build the ID of the (core or provider) segment that was requested.
153 armem::MemoryID segmentID;
154 segmentID.memoryName = workingMemory->name();
155 segmentID.coreSegmentName = input.coreSegmentName;
156 segmentID.providerSegmentName = input.providerSegmentName;
157
158 output.success = true;
159 output.segmentID = segmentID.str();
160 return output;
161 }
162
// Call addSegment() for every element of `input`, in order, and collect the results.
// `addCoreSegments` is forwarded unchanged to each call.
163 data::AddSegmentsResult
164 MemoryToIceAdapter::addSegments(const data::AddSegmentsInput& input, bool addCoreSegments)
165 {
168
169 data::AddSegmentsResult output;
170 for (const auto& i : input)
171 {
172 output.push_back(addSegment(i, addCoreSegments));
173 }
174 return output;
175 }
176
// Ice entry point for a non-locking commit: converts `commitIce` with fromIce() (passing
// `timeArrived`), forwards the converted commit to commit(commit) and converts the result
// back with toIce().
177 data::CommitResult
178 MemoryToIceAdapter::commit(const data::Commit& commitIce, Time timeArrived)
179 {
// Builds a CommitResult holding a single failed entry that carries `what` as error message.
182 auto handleException = [](const std::string& what)
183 {
184 data::CommitResult result;
185 data::EntityUpdateResult& r = result.results.emplace_back();
186 r.success = false;
187 r.errorMessage = what;
188 return result;
189 };
190
192 try
193 {
194 ::armarx::armem::fromIce(commitIce, commit, timeArrived);
195 }
// NOTE(review): both handlers rethrow first, so the `return handleException(...)` statements
// below are unreachable and conversion errors propagate to the caller as exceptions.
// Decide which behaviour is intended and remove the other.
197 {
198 throw;
199 return handleException(e.what());
200 }
201 catch (const Ice::Exception& e)
202 {
203 throw;
204 return handleException(e.what());
205 }
206
207 armem::CommitResult result = this->commit(commit);
208 data::CommitResult resultIce;
209 toIce(resultIce, result);
210
211 return resultIce;
212 }
213
// Convenience overload: commits `commitIce` using armem::Time::Now() as arrival time.
214 data::CommitResult
215 MemoryToIceAdapter::commit(const data::Commit& commitIce)
216 {
218 return commit(commitIce, armem::Time::Now());
219 }
220
223 {
225 return this->_commit(commit, false);
226 }
227
// Ice entry point for a locking commit: converts `commitIce` with fromIce() (passing
// `timeArrived`), forwards the converted commit to commitLocking(commit) and converts the
// result back with toIce().
228 data::CommitResult
229 MemoryToIceAdapter::commitLocking(const data::Commit& commitIce, Time timeArrived)
230 {
// Builds a CommitResult holding a single failed entry that carries `what` as error message.
233 auto handleException = [](const std::string& what)
234 {
235 data::CommitResult result;
236 data::EntityUpdateResult& r = result.results.emplace_back();
237 r.success = false;
238 r.errorMessage = what;
239 return result;
240 };
241
243 try
244 {
245 ::armarx::armem::fromIce(commitIce, commit, timeArrived);
246 }
// NOTE(review): both handlers rethrow first, so the `return handleException(...)` statements
// below are unreachable and conversion errors propagate to the caller as exceptions.
// Decide which behaviour is intended and remove the other.
248 {
249 throw;
250 return handleException(e.what());
251 }
252 catch (const Ice::Exception& e)
253 {
254 throw;
255 return handleException(e.what());
256 }
257
258 armem::CommitResult result = this->commitLocking(commit);
259 data::CommitResult resultIce;
260 toIce(resultIce, result);
261
262 return resultIce;
263 }
264
// Convenience overload: locking commit of `commitIce` using armem::Time::Now() as arrival time.
265 data::CommitResult
266 MemoryToIceAdapter::commitLocking(const data::Commit& commitIce)
267 {
269 return commitLocking(commitIce, armem::Time::Now());
270 }
271
274 {
276 return this->_commit(commit, true);
277 }
278
// Shared implementation of commit() and commitLocking().
//
// Applies every update in `commit` to the working memory and returns one result entry per
// update. `locking` selects workingMemory->updateLocking(update) over
// workingMemory->update(update). Along the way the function
//  - bumps the counters in `statistics`,
//  - hands updated/removed snapshots to longtermMemory->storeSnapshotsAsync() when the
//    recording checks below match,
//  - publishes the IDs of successful updates through memoryListenerTopic if it is set,
//  - reports timings and counters to the debug observer if one is available.
280 MemoryToIceAdapter::_commit(const armem::Commit& commit, bool locking)
281 {
283 TIMING_START(MemoryToIceAdapter_commit);
284
285 // Start timing for this commit
286 auto commitStartTime = std::chrono::steady_clock::now();
287
288 IceUtil::Time startTime;
// The debug observer is obtained from the LTM; without an LTM no debug metrics are sent.
289 auto debugObserver = longtermMemory ? longtermMemory->getDebugObserver() : nullptr;
290 if (debugObserver)
291 {
292 startTime = IceUtil::Time::now();
293 }
294
295 // Update statistics - increment commit count and entity update count
296 statistics.totalCommitCount.fetch_add(1, std::memory_order_relaxed);
297 statistics.totalEntityUpdates.fetch_add(commit.updates.size(), std::memory_order_relaxed);
298
299 std::vector<data::MemoryID> updatedIDs;
300 const bool publishUpdates = bool(memoryListenerTopic);
301
302 CommitResult commitResult;
303 for (const EntityUpdate& update : commit.updates)
304 {
// The result entry is appended before the update runs, so failed updates keep their slot.
305 EntityUpdateResult& result = commitResult.results.emplace_back();
306 try
307 {
308 IceUtil::Time updateStartTime;
309 if (debugObserver)
310 {
311 updateStartTime = IceUtil::Time::now();
312 }
313
314 auto updateResult =
315 locking ? workingMemory->updateLocking(update) : workingMemory->update(update);
316
317 result.success = true;
318 result.snapshotID = updateResult.id;
319 result.arrivedTime = update.arrivedTime;
320
321 // Track successful update
322 statistics.successfulUpdates.fetch_add(1, std::memory_order_relaxed);
323
324 if (debugObserver)
325 {
326 IceUtil::Time updateEndTime = IceUtil::Time::now();
327 IceUtil::Time updateElapsed = updateEndTime - updateStartTime;
328 float updateElapsedMs = updateElapsed.toMilliSecondsDouble();
329
330 std::string channelName = workingMemory ? workingMemory->name() + "Memory" : "Memory";
331 debugObserver->setDebugChannel(
332 channelName,
333 {
334 {"Memory | Commit | updateResult step [ms]", new armarx::Variant(updateElapsedMs)},
335 });
336 }
337
338 for (const auto& snapshot : updateResult.removedSnapshots)
339 {
340 ARMARX_DEBUG << "The id " << snapshot.id() << " was removed from wm";
341 }
342
// NOTE(review): longtermMemory is dereferenced unconditionally from here on, although it is
// null-checked where debugObserver is obtained above - confirm it can never be null here.
// NOTE(review): the comment below names CLONE_WM while the debug label in this branch says
// CONSOLIDATE_ALL; the compared enum value is not visible here - verify which one is meant.
343 // Consolidate to ltm(s) if recording mode is CLONE_WM
344 if (longtermMemory->isRecording() &&
345 longtermMemory->getRecordingMode() ==
347 {
348 // PERFORMANCE OPTIMIZATION: Defer conversion to async thread
349 // Previously, toMemory() was called synchronously here, blocking the commit path
350 // Now we pass snapshots directly and conversion happens in async storage thread
351 //
352 // DEADLOCK FIX: Extract metadata BEFORE calling storeSnapshotsAsync to ensure
353 // we don't access WM structure while any WM locks might be held
354 IceUtil::Time storeStartTime;
355 if (debugObserver)
356 {
357 storeStartTime = IceUtil::Time::now();
358 }
359
360 // Extract metadata from WM structure (done outside any locks)
361 auto segmentMetadata = extractSegmentMetadata(*workingMemory, updateResult.updatedSnapshots);
362
363 // Store snapshots with deferred conversion (happens in async thread)
364 // This no longer accesses WM structure - all metadata is pre-extracted
365 longtermMemory->storeSnapshotsAsync(
366 longtermMemory->name(),
367 updateResult.updatedSnapshots,
368 segmentMetadata);
369
370 if (debugObserver)
371 {
372 IceUtil::Time storeEndTime = IceUtil::Time::now();
373 IceUtil::Time storeElapsed = storeEndTime - storeStartTime;
374 float storeElapsedMs = storeElapsed.toMilliSecondsDouble();
375
376 std::string channelName = workingMemory ? workingMemory->name() + "Memory" : "Memory";
377 debugObserver->setDebugChannel(
378 channelName,
379 {
380 {"Memory | Commit | LTM enqueue (CONSOLIDATE_ALL) [ms]", new armarx::Variant(storeElapsedMs)},
381 });
382 }
383 }
384
385
// Same procedure as above, but for the snapshots the update removed from the WM.
386 // Consolidate to ltm(s) if recording mode is CONSOLIDATE_REMOVED
387 if (longtermMemory->isRecording() &&
388 longtermMemory->getRecordingMode() ==
390 {
391 // PERFORMANCE OPTIMIZATION: Defer conversion to async thread
392 // Previously, toMemory() was called synchronously here, blocking the commit path
393 // Now we pass snapshots directly and conversion happens in async storage thread
394 //
395 // DEADLOCK FIX: Extract metadata BEFORE calling storeSnapshotsAsync to ensure
396 // we don't access WM structure while any WM locks might be held
397 IceUtil::Time storeStartTime;
398 if (debugObserver)
399 {
400 storeStartTime = IceUtil::Time::now();
401 }
402
403 // Extract metadata from WM structure (done outside any locks)
404 auto segmentMetadata = extractSegmentMetadata(*workingMemory, updateResult.removedSnapshots);
405
406 // Store snapshots with deferred conversion (happens in async thread)
407 // This no longer accesses WM structure - all metadata is pre-extracted
408 longtermMemory->storeSnapshotsAsync(
409 longtermMemory->name(),
410 updateResult.removedSnapshots,
411 segmentMetadata);
412
413 if (debugObserver)
414 {
415 IceUtil::Time storeEndTime = IceUtil::Time::now();
416 IceUtil::Time storeElapsed = storeEndTime - storeStartTime;
417 float storeElapsedMs = storeElapsed.toMilliSecondsDouble();
418
419 std::string channelName = workingMemory ? workingMemory->name() + "Memory" : "Memory";
420 debugObserver->setDebugChannel(
421 channelName,
422 {
423 {"Memory | Commit | LTM enqueue (CONSOLIDATE_REMOVED) [ms]", new armarx::Variant(storeElapsedMs)},
424 });
425 }
426 }
427
// Third recording mode: only a (spam-filtered) warning is emitted, nothing is stored.
428 if (longtermMemory->isRecording() &&
429 longtermMemory->getRecordingMode() ==
431 {
432 ARMARX_WARNING << deactivateSpam() << "THIS IS NOT IMPLEMENTED YET!!!";
433 }
434
// Only updates that reached this point are announced to the listener topic.
435 if (publishUpdates)
436 {
437 data::MemoryID& id = updatedIDs.emplace_back();
438 toIce(id, result.snapshotID);
439 }
440 }
441 catch (const error::ArMemError& e)
442 {
443 result.success = false;
444 result.errorMessage = e.what();
445 statistics.failedUpdates.fetch_add(1, std::memory_order_relaxed);
446 }
447 catch (const aron::error::AronException& e)
448 {
449 result.success = false;
450 result.errorMessage = e.what();
451 statistics.failedUpdates.fetch_add(1, std::memory_order_relaxed);
452 }
453 catch (const Ice::Exception& e)
454 {
455 result.success = false;
456 result.errorMessage = e.what();
457 statistics.failedUpdates.fetch_add(1, std::memory_order_relaxed);
458 }
// NOTE(review): unlike the typed handlers, this one assigns neither result.success nor
// result.errorMessage; if the exception is thrown after `result.success = true` was set,
// the entry stays marked successful while failedUpdates is incremented as well.
459 catch (...)
460 {
461 ARMARX_INFO << "Error during LTM consollidation";
462 statistics.failedUpdates.fetch_add(1, std::memory_order_relaxed);
463 }
464 }
465
466 if (publishUpdates)
467 {
468 memoryListenerTopic->memoryUpdated(updatedIDs);
469 }
470
471 // Calculate commit blocking time
472 auto commitEndTime = std::chrono::steady_clock::now();
473 double commitBlockingTimeMs = std::chrono::duration<double, std::milli>(commitEndTime - commitStartTime).count();
474
475 // Update blocking time statistics (using compare-exchange for max)
476 {
// NOTE(review): the total is updated with a separate load and store, which is not a single
// atomic step; two commits finishing concurrently can lose one addend. Only the maximum
// below is updated with a compare-exchange loop.
477 double currentTotal = statistics.totalCommitBlockingTimeMs.load(std::memory_order_relaxed);
478 statistics.totalCommitBlockingTimeMs.store(currentTotal + commitBlockingTimeMs, std::memory_order_relaxed);
479
// On failure compare_exchange_weak refreshes currentMax, so the loop re-checks the condition
// against the latest stored maximum.
480 double currentMax = statistics.maxCommitBlockingTimeMs.load(std::memory_order_relaxed);
481 while (commitBlockingTimeMs > currentMax)
482 {
483 if (statistics.maxCommitBlockingTimeMs.compare_exchange_weak(currentMax, commitBlockingTimeMs, std::memory_order_relaxed))
484 {
485 break;
486 }
487 }
488 }
489
490 if (debugObserver)
491 {
492 IceUtil::Time endTime = IceUtil::Time::now();
493 IceUtil::Time elapsed = endTime - startTime;
494 float elapsedMs = elapsed.toMilliSecondsDouble();
495
496 // Calculate current rates
497 auto [commitsPerSec, queriesPerSec] = statistics.updateRates();
498
499 // Get queue size from LTM if available
500 size_t queueSize = longtermMemory ? longtermMemory->getAsyncQueueSize() : 0;
501
502 std::string channelName = workingMemory ? workingMemory->name() + "Memory" : "Memory";
503 debugObserver->setDebugChannel(
504 channelName,
505 {
506 // Timing metrics
507 {"Memory | Commit | t blocked [ms]", new armarx::Variant(elapsedMs)},
508 {"Memory | Commit | max blocked [ms]", new armarx::Variant(static_cast<float>(statistics.maxCommitBlockingTimeMs.load()))},
509 // Rate metrics
510 {"Memory | writes/sec", new armarx::Variant(static_cast<float>(commitsPerSec))},
511 {"Memory | reads/sec", new armarx::Variant(static_cast<float>(queriesPerSec))},
512 // Count metrics
513 {"Memory | total commits", new armarx::Variant(static_cast<int>(statistics.totalCommitCount.load()))},
514 {"Memory | total entity updates", new armarx::Variant(static_cast<int>(statistics.totalEntityUpdates.load()))},
515 {"Memory | successful updates", new armarx::Variant(static_cast<int>(statistics.successfulUpdates.load()))},
516 {"Memory | failed updates", new armarx::Variant(static_cast<int>(statistics.failedUpdates.load()))},
517 // Queue metrics
518 {"Memory | LTM async queue size", new armarx::Variant(static_cast<int>(queueSize))},
519 });
520 }
521
522 TIMING_END_STREAM(MemoryToIceAdapter_commit, ARMARX_DEBUG);
523 return commitResult;
524 }
525
526 // READING
// Run `input` against the working memory and return the matching data as Ice structure.
// The result always has success == true; an empty result only produces a debug message.
// Also updates the query counters/timings in `statistics` and, if a debug observer is
// available via the LTM, reports them on the debug channel.
527 armem::query::data::Result
528 MemoryToIceAdapter::query(const armem::query::data::Input& input)
529 {
533
534 // Start timing for this query
535 auto queryStartTime = std::chrono::steady_clock::now();
536
537 // Update statistics - increment query count
538 statistics.totalQueryCount.fetch_add(1, std::memory_order_relaxed);
539
540 // Core segment processors will acquire the core segment locks.
// The processor is set up with the data mode derived from input.withData.
542 armem::query::boolToDataMode(input.withData));
543 armem::wm::Memory wmResult = wmServerProcessor.process(input, *workingMemory);
544
545 armem::query::data::Result result;
546
547
548 result.memory = armarx::toIce<data::MemoryPtr>(wmResult);
549
550 result.success = true;
551 if (result.memory->coreSegments.size() == 0)
552 {
553 ARMARX_DEBUG << "No data in memory found after query.";
554 }
555
556 // Calculate query blocking time
557 auto queryEndTime = std::chrono::steady_clock::now();
558 double queryBlockingTimeMs = std::chrono::duration<double, std::milli>(queryEndTime - queryStartTime).count();
559
560 // Update blocking time statistics (using compare-exchange for max)
561 {
// NOTE(review): the total is updated with a separate load and store, which is not a single
// atomic step; concurrent queries can lose an addend. Only the maximum below uses a
// compare-exchange loop.
562 double currentTotal = statistics.totalQueryBlockingTimeMs.load(std::memory_order_relaxed);
563 statistics.totalQueryBlockingTimeMs.store(currentTotal + queryBlockingTimeMs, std::memory_order_relaxed);
564
565 double currentMax = statistics.maxQueryBlockingTimeMs.load(std::memory_order_relaxed);
566 while (queryBlockingTimeMs > currentMax)
567 {
568 if (statistics.maxQueryBlockingTimeMs.compare_exchange_weak(currentMax, queryBlockingTimeMs, std::memory_order_relaxed))
569 {
570 break;
571 }
572 }
573 }
574
575 // Report to debug observer if available
576 auto debugObserver = longtermMemory ? longtermMemory->getDebugObserver() : nullptr;
577 if (debugObserver)
578 {
579 std::string channelName = workingMemory ? workingMemory->name() + "Memory" : "Memory";
580 debugObserver->setDebugChannel(
581 channelName,
582 {
583 {"Memory | Query | t blocked [ms]", new armarx::Variant(static_cast<float>(queryBlockingTimeMs))},
584 {"Memory | Query | max blocked [ms]", new armarx::Variant(static_cast<float>(statistics.maxQueryBlockingTimeMs.load()))},
585 {"Memory | total queries", new armarx::Variant(static_cast<int>(statistics.totalQueryCount.load()))},
586 });
587 }
588
589 return result;
590 }
591
// Run `input` against the long-term memory and return the matching data as Ice structure.
// If the LTM is recording or `storeIntoWM` is true, the query result is additionally
// committed into the working memory. The result always has success == true.
592 armem::query::data::Result
593 MemoryToIceAdapter::queryLTM(const armem::query::data::Input& input, bool storeIntoWM)
594 {
596
598 armem::wm::Memory ltmResult = ltmProcessor.process(input, *longtermMemory);
599
600 // convert memory ==> meaning resolving references
601 // upon query, the LTM only returns a structure of the data (memory without data)
602 if (input.withData)
603 {
604 longtermMemory->resolve(ltmResult);
605 }
606
// The outcome of this commit is not inspected.
607 if (longtermMemory->isRecording() || storeIntoWM)
608 {
609 this->commit(toCommit(ltmResult));
610
611 // mark removed entries of wm in viewer
612 // TODO
613 }
614
615 armem::query::data::Result result;
616
617 result.memory = armarx::toIce<data::MemoryPtr>(ltmResult);
618
619 result.success = true;
620 if (result.memory->coreSegments.size() == 0)
621 {
622 ARMARX_DEBUG << "No data in ltm found after query.";
623 }
624
625 return result;
626 }
627
630 {
632
633 return client::QueryResult::fromIce(query(input.toIce()));
634 }
635
// Build a combined view of the server's content: the result of an "all" query on the working
// memory merged with all references the LTM reports. The result always has success == true.
636 armem::structure::data::GetServerStructureResult
638 {
642
643 armem::structure::data::GetServerStructureResult ret;
644 ret.success = true;
645
// Start from an empty memory that carries the working memory's ID.
646 wm::Memory structure;
647 structure.id() = workingMemory->id();
648
649 // Get all info from the WM
651 builder.all();
652
// The WM part is only merged in when the query reports success.
653 auto query_result = this->query(builder.buildQueryInput());
654 if (query_result.success)
655 {
656 structure.append(query_result.memory);
657 }
658
659 // Get all info from the LTM
660 structure.append(longtermMemory->loadAllReferences());
661
662 ret.serverStructure = armarx::toIce<data::MemoryPtr>(structure);
663
664 return ret;
665 }
666
669 {
671
672 ARMARX_INFO << "Reloading of all core segments from LTM into WM triggered";
673
674 int maxAmountOfSnapshots = this->longtermMemory->p.maxAmountOfSnapshotsLoaded;
675 //create WM and load latest references
677 this->longtermMemory->loadLatestNReferences(maxAmountOfSnapshots, m);
678
679 //construct a commit of the loaded data and commit it to the working memory
680 auto com = armem::toCommit(m);
681 auto res = this->commit(com);
682
683 //the CommitResult contains some information which might be helpful:
684 return res;
685 }
686
// Load the latest references of the named core segments from the LTM and commit them into
// the working memory. At most longtermMemory->p.maxAmountOfSnapshotsLoaded is passed as limit
// to loadLatestNReferences(). Returns the result of that commit.
688 MemoryToIceAdapter::reloadCoreSegmentsFromLTM(std::list<std::string>& coreSegmentNames)
689 {
691
692 ARMARX_INFO << "Reloading of specific core segments from LTM into WM triggered";
693
// Join the names with ", " for the log message only.
694 std::ostringstream namesStr;
695 for (auto it = coreSegmentNames.begin(); it != coreSegmentNames.end(); ++it)
696 {
697 if (it != coreSegmentNames.begin())
698 namesStr << ", "; // Add comma before every element except the first
699 namesStr << *it;
700 }
701
702 ARMARX_INFO << "Loading core segments=" << namesStr.str();
703
704
705 int maxAmountOfSnapshots = this->longtermMemory->p.maxAmountOfSnapshotsLoaded;
706 //create WM and load latest references
708 this->longtermMemory->loadLatestNReferences(maxAmountOfSnapshots, m, coreSegmentNames);
709
710 //construct a commit of the loaded data and commit it to the working memory
711 auto com = armem::toCommit(m);
712 auto res = this->commit(com);
713
714 //the CommitResult contains some information which might be helpful:
715 return res;
716 }
717
720 {
721 ARMARX_INFO << "Reloading of coresegment defined in 'loadedCoreSegments' from LTM into WM "
722 "on startup triggered";
723
724 auto coreNames = this->longtermMemory->p.coreSegmentsToLoad;
725
726 ARMARX_INFO << "Loading core segments=" << coreNames
727 << " defined in property 'loadedCoreSegments'";
728
729 //convert string to list of names:
730 std::list<std::string> names;
731 std::stringstream ss(coreNames);
732 std::string item;
733
734 while (std::getline(ss, item, ','))
735 {
736 names.push_back(item);
737 }
738
739 return reloadCoreSegmentsFromLTM(names);
740 }
741
742 // WM LOADING FROM LTM
745 {
747
748 ARMARX_INFO << "Reloading of data from LTM into WM on startup triggered";
749
750 if (this->longtermMemory->p.importOnStartUp)
751 {
753 }
754 else
755 {
756 ARMARX_INFO << "Not loading initial data from LTM due to importOnStartup being "
757 << this->longtermMemory->p.importOnStartUp;
759 return r;
760 }
761 }
762
763 // LTM STORING AND RECORDING
// Convert the memory contained in `directlStoreInput` from its Ice form and pass it to
// longtermMemory->directlyStore(). The result always has success == true.
764 dto::DirectlyStoreResult
765 MemoryToIceAdapter::directlyStore(const dto::DirectlyStoreInput& directlStoreInput)
766 {
769
770 dto::DirectlyStoreResult output;
771 output.success = true;
772
773 armem::wm::Memory m = armarx::fromIce<armem::wm::Memory>(directlStoreInput.memory);
774 longtermMemory->directlyStore(m);
775
776 return output;
777 }
778
// Start recording on the long-term memory. `startRecordInput` is not read by the visible
// code. The result always has success == true.
779 dto::StartRecordResult
780 MemoryToIceAdapter::startRecord(const dto::StartRecordInput& startRecordInput)
781 {
784 ARMARX_IMPORTANT << "Enabling the recording of memory " << longtermMemory->id().str();
785 longtermMemory->startRecording();
786
787 dto::StartRecordResult ret;
788 ret.success = true;
789
790 return ret;
791 }
792
// Stop recording on the long-term memory.
// Steps: optionally store the current working memory into the LTM (property storeOnStop),
// call stopRecording(), then flush the LTM's async storage queue on a detached background
// thread. Returns immediately with success == true; the flush may still be running.
793 dto::StopRecordResult
795 {
799 ARMARX_IMPORTANT << "Disabling the recording of memory " << longtermMemory->id().str();
800
801 if (longtermMemory->p.storeOnStop)
802 { // if true, the data still held in the WM is stored into the LTM before recording stops
803 ARMARX_INFO << "Starting to save left-over WM data into LTM";
804 longtermMemory->directlyStore(*workingMemory, true);
805 ARMARX_INFO << "Stored leftover WM data into LTM";
806 }
807 else
808 {
809 ARMARX_INFO << "Not storing WM data into LTM on stop, because storeOnStop is "
810 << longtermMemory->p.storeOnStop;
811 }
812
813 // Stop the recording (stops the periodic buffer task)
814 longtermMemory->stopRecording();
815
816 // Use the new async flush mechanism to ensure all queued data is written
817 // This runs in a separate thread to avoid blocking the caller
// NOTE(review): the thread is detached and captures a copy of `longtermMemory`; if that is a
// raw pointer, nothing keeps the LTM alive until the flush finishes - confirm the lifetime.
818 auto ltm = longtermMemory;
819 std::thread flushThread(
820 [ltm]()
821 {
822 ARMARX_INFO << "Flushing async storage queue...";
823 bool success = ltm->flushAsyncStorage(0); // 0 = wait indefinitely
824 if (success)
825 {
826 ARMARX_INFO << "All pending data stored successfully";
827 }
828 else
829 {
830 ARMARX_WARNING << "Flush completed with timeout or errors";
831 }
832 ltm->bufferFinished();
833 });
834 flushThread.detach();
835
837 << "Stopped all LTM recordings, flushing async queue in background. "
838 << "Please wait with stopping the component until all files are written";
839
840 dto::StopRecordResult ret;
841 ret.success = true;
842
843 return ret;
844 }
845
846 dto::RecordStatusResult
848 {
849 dto::RecordStatusResult ret;
850 ret.success = true;
851
852 long savedSnapshots;
853 long totalSnapshots;
854
855 ARMARX_DEBUG << "Get record status";
856
857 longtermMemory->forEachCoreSegment(
858 [&savedSnapshots, &totalSnapshots](const auto& c)
859 {
860 c.forEachProviderSegment(
861 [&savedSnapshots, &totalSnapshots](const auto& p)
862 {
863 p.forEachEntity(
864 [&savedSnapshots, &totalSnapshots](const auto& e)
865 {
866 savedSnapshots += e.getStatistics().recordedSnapshots;
867
868 e.forEachSnapshot([&totalSnapshots](const auto&)
869 { totalSnapshots++; });
870 });
871 });
872 });
873
874 ret.status.savedSnapshots = savedSnapshots;
875 ret.status.totalSnapshots = totalSnapshots;
876
877 return ret;
878 }
879
880 // PREDICTION
// Convert `requests` from their Ice form and dispatch them through
// workingMemory->dispatchPredictions().
// NOTE(review): the statement that returns `res` is not visible here - confirm that the
// dispatch result is converted back to Ice and returned.
881 prediction::data::PredictionResultSeq
882 MemoryToIceAdapter::predict(prediction::data::PredictionRequestSeq requests)
883 {
884 auto res = workingMemory->dispatchPredictions(
885 armarx::fromIce<std::vector<PredictionRequest>>(requests));
887 }
888
// Return the prediction engines reported by the working memory, converted to Ice.
// Engines of the long-term memory are not included; the merge code for them is kept below
// in commented-out form.
889 prediction::data::EngineSupportMap
891 {
892 prediction::data::EngineSupportMap result;
893 armarx::toIce(result, workingMemory->getAllPredictionEngines());
894
895 // Uncomment once LTM also supports prediction engines.
896
897 /*prediction::data::EngineSupportMap ltmMap;
898 armarx::toIce(ltmMap, longtermMemory->getAllPredictionEngines());
899 for (const auto& [memoryID, engines] : ltmMap)
900 {
901 auto entryIter = result.find(memoryID);
902 if (entryIter == result.end())
903 {
904 result.emplace(memoryID, engines);
905 }
906 else
907 {
908 // Merge LTM-supported engines with WM-supported engines, removing duplicates
909 std::set<prediction::data::PredictionEngine> engineSet;
910 engineSet.insert(entryIter->second.begin(), entryIter->second.end());
911 engineSet.insert(engines.begin(), engines.end());
912 entryIter->second.assign(engineSet.begin(), engineSet.end());
913 }
914 }*/
915
916 return result;
917 }
918
// Publish the current counters, rates, blocking times and LTM async-storage statistics on
// the debug channel named "<working memory name>Memory" (or "Memory" without a WM).
// Does nothing when no debug observer is available, which includes the case of no LTM.
919 void
921 {
922 auto debugObserver = longtermMemory ? longtermMemory->getDebugObserver() : nullptr;
923 if (!debugObserver)
924 {
925 return;
926 }
927
928 // Calculate current rates
929 auto [commitsPerSec, queriesPerSec] = statistics.updateRates();
930
931 // Get queue size and async stats from LTM if available
932 size_t queueSize = 0;
933 size_t numThreadsProcessing = 0;
934 uint64_t asyncItemsEnqueued = 0;
935 uint64_t asyncItemsProcessed = 0;
936 uint64_t asyncSnapshotsStored = 0;
937 uint64_t asyncSnapshotsDropped = 0;
938 uint64_t asyncBackpressureEvents = 0;
939 double asyncAvgStorageTimeMs = 0.0;
940 double asyncMaxStorageTimeMs = 0.0;
941
// NOTE(review): longtermMemory is necessarily non-null here, because a null LTM yields a
// null debugObserver and returns above; the check is redundant but harmless.
942 if (longtermMemory)
943 {
944 queueSize = longtermMemory->getAsyncQueueSize();
945 numThreadsProcessing = longtermMemory->getNumThreadsProcessing();
946 const auto& asyncStats = longtermMemory->getAsyncStorageStatistics();
947 asyncItemsEnqueued = asyncStats.totalItemsEnqueued.load(std::memory_order_relaxed);
948 asyncItemsProcessed = asyncStats.totalItemsProcessed.load(std::memory_order_relaxed);
949 asyncSnapshotsStored = asyncStats.totalSnapshotsStored.load(std::memory_order_relaxed);
950 asyncSnapshotsDropped = asyncStats.snapshotsDropped.load(std::memory_order_relaxed);
951 asyncBackpressureEvents = asyncStats.backpressureEvents.load(std::memory_order_relaxed);
952 asyncAvgStorageTimeMs = asyncStats.getAvgStorageTimeMs();
953 asyncMaxStorageTimeMs = asyncStats.getMaxStorageTimeMs();
954 }
955
956 // Calculate average blocking times
// A zero count yields an average of 0.0 instead of dividing by zero.
957 uint64_t commitCount = statistics.totalCommitCount.load(std::memory_order_relaxed);
958 uint64_t queryCount = statistics.totalQueryCount.load(std::memory_order_relaxed);
959 double avgCommitBlockingMs = commitCount > 0
960 ? statistics.totalCommitBlockingTimeMs.load(std::memory_order_relaxed) / commitCount
961 : 0.0;
962 double avgQueryBlockingMs = queryCount > 0
963 ? statistics.totalQueryBlockingTimeMs.load(std::memory_order_relaxed) / queryCount
964 : 0.0;
965
// NOTE(review): the uint64_t counters loaded above are narrowed with static_cast<int> when
// they are published below; values beyond the int range would be misreported.
966 std::string channelName = workingMemory ? workingMemory->name() + "Memory" : "Memory";
967 debugObserver->setDebugChannel(
968 channelName,
969 {
970 // Rate metrics
971 {"Memory | writes/sec", new armarx::Variant(static_cast<float>(commitsPerSec))},
972 {"Memory | reads/sec", new armarx::Variant(static_cast<float>(queriesPerSec))},
973 // Count metrics
974 {"Memory | total commits", new armarx::Variant(static_cast<int>(statistics.totalCommitCount.load()))},
975 {"Memory | total queries", new armarx::Variant(static_cast<int>(statistics.totalQueryCount.load()))},
976 {"Memory | total entity updates", new armarx::Variant(static_cast<int>(statistics.totalEntityUpdates.load()))},
977 {"Memory | successful updates", new armarx::Variant(static_cast<int>(statistics.successfulUpdates.load()))},
978 {"Memory | failed updates", new armarx::Variant(static_cast<int>(statistics.failedUpdates.load()))},
979 // Timing metrics
980 {"Memory | Commit | avg blocked [ms]", new armarx::Variant(static_cast<float>(avgCommitBlockingMs))},
981 {"Memory | Commit | max blocked [ms]", new armarx::Variant(static_cast<float>(statistics.maxCommitBlockingTimeMs.load()))},
982 {"Memory | Query | avg blocked [ms]", new armarx::Variant(static_cast<float>(avgQueryBlockingMs))},
983 {"Memory | Query | max blocked [ms]", new armarx::Variant(static_cast<float>(statistics.maxQueryBlockingTimeMs.load()))},
984 // Async storage queue metrics
985 {"Memory | LTM async queue size", new armarx::Variant(static_cast<int>(queueSize))},
986 {"Memory | LTM threads processing", new armarx::Variant(static_cast<int>(numThreadsProcessing))},
987 {"Memory | LTM items enqueued", new armarx::Variant(static_cast<int>(asyncItemsEnqueued))},
988 {"Memory | LTM items processed", new armarx::Variant(static_cast<int>(asyncItemsProcessed))},
989 {"Memory | LTM snapshots stored", new armarx::Variant(static_cast<int>(asyncSnapshotsStored))},
990 {"Memory | LTM snapshots dropped", new armarx::Variant(static_cast<int>(asyncSnapshotsDropped))},
991 {"Memory | LTM backpressure events", new armarx::Variant(static_cast<int>(asyncBackpressureEvents))},
992 {"Memory | LTM avg storage [ms]", new armarx::Variant(static_cast<float>(asyncAvgStorageTimeMs))},
993 {"Memory | LTM max storage [ms]", new armarx::Variant(static_cast<float>(asyncMaxStorageTimeMs))},
994 });
995 }
996
997} // namespace armarx::armem::server
SpamFilterDataPtr deactivateSpam(SpamFilterDataPtr const &spamFilter, float deactivationDurationSec, const std::string &identifier, bool deactivate)
Definition Logging.cpp:75
constexpr T c
The Variant class is described here: Variants.
Definition Variant.h:224
std::string coreSegmentName
Definition MemoryID.h:51
std::string str(bool escapeDelimiters=true) const
Get a string representation of this memory ID.
Definition MemoryID.cpp:102
std::string memoryName
Definition MemoryID.h:50
std::string providerSegmentName
Definition MemoryID.h:52
ProviderSegmentT & getProviderSegment(const std::string &name)
CoreSegmentT * findCoreSegment(const std::string &name)
Definition MemoryBase.h:121
void append(const OtherDerivedT &other)
Merge another memory into this one.
Definition MemoryBase.h:363
std::vector< UpdateResult > update(const Commit &commit, const bool addMissingCoreSegmentDuringUpdate=false, const bool checkMemoryName=true)
Store all updates in commit.
Definition MemoryBase.h:310
void all()
Get all snapshots from all entities in all segments.
Definition Builder.cpp:54
Indicates that a name in a given ID does not match a container's own name.
Definition ArMemError.h:58
Indicates that a container did not have an entry under a given name.
Definition ArMemError.h:75
armem::structure::data::GetServerStructureResult getServerStructure()
void setMemoryListener(client::MemoryListenerInterfacePrx memoryListenerTopic)
dto::StartRecordResult startRecord(const dto::StartRecordInput &startRecordInput)
armem::CommitResult reloadFromLTMOnStartup()
Triggers a reload (.
armem::CommitResult reloadAllFromLTM()
Loads all core segments and their data from the LTM.
query::data::Result queryLTM(const armem::query::data::Input &input, bool storeIntoWM)
Query the LTMs of the memory server.
client::MemoryListenerInterfacePrx memoryListenerTopic
query::data::Result query(const armem::query::data::Input &input)
armem::CommitResult reloadCoreSegmentsFromLTM(std::list< std::string > &coreSegmentname)
Only load specific core segments and their data from the LTM.
prediction::data::PredictionResultSeq predict(prediction::data::PredictionRequestSeq requests)
dto::DirectlyStoreResult directlyStore(const dto::DirectlyStoreInput &directlStoreInput)
void reportDebugMetrics()
Report all debug metrics to the debug observer.
armem::CommitResult reloadPropertyDefinedCoreSegmentsFromLTM()
MemoryToIceAdapter(server::wm::Memory *workingMemory=nullptr, server::ltm::Memory *longtermMemory=nullptr)
Construct an MemoryToIceAdapter from an existing Memory.
data::AddSegmentResult addSegment(const data::AddSegmentInput &input, bool addCoreSegments=false)
data::CommitResult commitLocking(const data::Commit &commitIce, Time timeArrived)
prediction::data::EngineSupportMap getAvailableEngines()
data::AddSegmentsResult addSegments(const data::AddSegmentsInput &input, bool addCoreSegments=false)
data::CommitResult commit(const data::Commit &commitIce, Time timeArrived)
A memory storing data on the hard drive and in mongodb (needs 'armarx memory start' to start the mong...
Definition Memory.h:24
DebugObserverInterfacePrx getDebugObserver() const
Get the current debug observer (may be nullptr)
Definition MemoryBase.h:522
ResultMemoryT process(const armem::query::data::Input &input, const MemoryT &memory) const
ResultMemoryT process(const armem::query::data::Input &input, const MemoryT &memory) const
ProviderSegment & addProviderSegment(const std::string &name, Args... args)
auto doLockedExclusive(FunctionT &&function)
Execute function under exclusive (write) lock.
std::vector< Base::UpdateResult > updateLocking(const Commit &commit)
Perform the commit, locking the core segments.
Client-side working memory.
The AronNotValidException class.
Definition Exception.h:72
static DateTime Now()
Definition DateTime.cpp:51
#define ARMARX_CHECK_NOT_NULL(ptr)
This macro evaluates whether ptr is not null and if it turns out to be false it will throw an Express...
#define ARMARX_INFO
The normal logging level.
Definition Logging.h:181
#define ARMARX_IMPORTANT
The logging level for always important information, but expected behaviour (in contrast to ARMARX_WAR...
Definition Logging.h:190
#define ARMARX_DEBUG
The logging level for output that is only interesting while debugging.
Definition Logging.h:184
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
Definition Logging.h:193
#define TIMING_START(name)
Helper macro to do timing tests.
Definition TimeUtil.h:289
#define TIMING_END_STREAM(name, os)
Prints duration.
Definition TimeUtil.h:310
@ NoData
Just get the structure, but no ARON data.
Definition DataMode.h:8
DataMode boolToDataMode(bool withData)
Definition DataMode.cpp:6
void fromIce(const data::MemoryID &ice, MemoryID &id)
armarx::core::time::DateTime Time
Commit toCommit(const ContainerT &container)
Definition operations.h:23
void toIce(data::MemoryID &ice, const MemoryID &id)
void fromIce(const std::map< IceKeyT, IceValueT > &iceMap, boost::container::flat_map< CppKeyT, CppValueT > &cppMap)
void toIce(std::map< IceKeyT, IceValueT > &iceMap, const boost::container::flat_map< CppKeyT, CppValueT > &cppMap)
Result of a Commit.
Definition Commit.h:111
std::vector< EntityUpdateResult > results
Definition Commit.h:112
A bundle of updates to be sent to the memory.
Definition Commit.h:90
Result of an EntityUpdate.
Definition Commit.h:75
An update of an entity for a specific point in time.
Definition Commit.h:26
A query for parts of a memory.
Definition Query.h:24
Result of a QueryInput.
Definition Query.h:51
static QueryResult fromIce(const armem::query::data::Result &ice)
Definition Query.cpp:26
#define ARMARX_TRACE
Definition trace.h:77