MemoryBase.h
Go to the documentation of this file.
1#pragma once
2
3#include <functional>
4#include <list>
5#include <map>
6#include <set>
7#include <string>
8#include <vector>
9
10
11// BaseClass
12#include "MemoryItem.h"
13
14// ChildType
15
16// ArmarX
20#include <ArmarXCore/interface/core/UserException.h>
21#include <ArmarXCore/interface/observers/ObserverInterface.h>
23
28#include <RobotAPI/libraries/armem/server/test/ForgettingExperiments.h>
30
31// PendingConversion is defined separately to avoid coupling MemoryBase to BufferedMemoryMixin
32// This allows different storage backends (disk, MongoDB, etc.) to be used independently
33#include "PendingConversion.h"
34
36{
37 enum class RecordingMode
38 {
39 CONSOLIDATE_REMOVED, // only store whats removed from the WM
40 CONSOLIDATE_ALL, // store whats in the WM (on commit)
41 CONSOLIDATE_LATEST // store the latest snapshots at a fixed frequency (for now 1Hz)
42 };
43} // namespace armarx::armem::server::ltm
44
46{
47 /// @brief Interface functions for the longterm memory classes
48 template <class _CoreSegmentT>
49 class MemoryBase : public MemoryItem
50 {
51 public:
63
64 public:
65 using CoreSegmentT = _CoreSegmentT;
66
67 MemoryBase(const std::string& exportName, const MemoryID& id) :
68 MemoryItem(exportName, id), enabled(false)
69 {
70 }
71
72 void
73 setRecordingMode(const std::string& m)
74 {
75 if (m == "CONSOLIDATE_REMOVED")
76 {
78 }
79 else if (m == "CONSOLIDATE_ALL")
80 {
82 }
83 else if (m == "CONSOLIDATE_LATEST")
84 {
86 }
87 else
88 {
89 ARMARX_WARNING << "Unknown recording mode: " << m;
90 }
91 }
92
93 void
95 {
96 this->recordingMode = m;
97 }
98
101 {
102 return recordingMode;
103 }
104
105 /// initialize config
106 void
108 {
109 bool en = p.enabled_on_startup;
110 ARMARX_INFO << VAROUT(p.configuration_on_startup);
111 ARMARX_INFO << VAROUT(p.export_name);
112 ARMARX_INFO << VAROUT(p.export_path);
113
114 this->setExportName(p.export_name);
115 this->setRecordingMode(p.recordingMode);
116
117 try
118 {
119 const auto json = nlohmann::json::parse(p.configuration_on_startup);
120
121 // Processors are shared. So we only need to configure the root
122 processors->configure(json);
123
124 this->_configure(json);
125 }
126 catch (...)
127 {
128 ARMARX_WARNING << "Failed to parse `" << p.configuration_on_startup << "`";
129 en = false;
130 }
131
132 if (en)
133 {
134 this->startRecording();
135 }
136 }
137
138 /// enable this LTM
139 void
141 {
142 processors->resetFilterStatisticsForNewEpisode(); //make sure the statistics are
143 //resetted before the recording starts
145 ARMARX_INFO << "Enabling LTM " << id().str() << " at " << now.toDateTimeString();
146 ARMARX_INFO << "Storing information at " << this->getExportName();
147 enabled = true;
148 if (statistics.firstStart)
149 {
150 statistics.firstStarted = now;
151 statistics.firstStart = false;
152 }
153 statistics.lastStarted = now;
154 this->_enable();
155 }
156
157 /// disable this LTM
158 void
160 {
162 ARMARX_INFO << "Disabling LTM " << id().str() << " at " << now.toDateTimeString();
163 enabled = false;
164 if (statistics.firstStop)
165 {
166 statistics.firstStopped = now;
167 statistics.firstStop = false;
168 }
169 statistics.lastStopped = now;
170 this->_disable();
171 ARMARX_INFO << "Disabling of LTM finished";
172 }
173
174 /// return the full ltm as a wm::Memory with only references
175 /// the ltm may be huge, use with caution
178 {
181 return ret;
182 }
183
184 void
186 {
187 TIMING_START(LTM_Memory_LoadAll);
189 TIMING_END_STREAM(LTM_Memory_LoadAll, ARMARX_DEBUG);
190 }
191
192 //return the newest part of the ltm as a wm::Memory with only references
193 //will return the newest n snapshots of each entity
196 {
198 loadLatestNReferences(n, ret);
199 return ret;
200 }
201
202 //return the newest part of the ltm as a wm::Memory with only references
203 //will return the newest n snapshots of each entity for only the CoreSegments matching one of the names in the list
204 void
205 loadLatestNReferences(int n, armem::wm::Memory& memory, std::list<std::string> coreSegNames)
206 {
207 TIMING_START(LTM_Memory_loadNewestn_someCoreSegs);
208 _loadLatestNReferences(n, memory, coreSegNames);
209 TIMING_END_STREAM(LTM_Memory_loadNewestn_someCoreSegs, ARMARX_DEBUG);
210 }
211
212 void
214 {
215 TIMING_START(LTM_Memory_loadNewestn);
217 TIMING_END_STREAM(LTM_Memory_loadNewestn, ARMARX_DEBUG);
218 }
219
220 /// return the full ltm as a wm::Memory and resolves the references
221 /// the ltm may be huge, use with caution
224 {
227 return ret;
228 }
229
230 void
232 {
233 TIMING_START(LTM_Memory_LoadAllAndResolve);
236 TIMING_END_STREAM(LTM_Memory_LoadAllAndResolve, ARMARX_DEBUG);
237 }
238
239 void
240 loadAllAndResolve(armem::wm::Memory& memory, std::list<std::string> coreSegmentNames)
241 {
242 TIMING_START(LTM_Memory_LoadAllAndResolve);
243 _loadAllReferences(memory, coreSegmentNames);
245 TIMING_END_STREAM(LTM_Memory_LoadAllAndResolve, ARMARX_DEBUG);
246 }
247
248 /// convert the references of the input into a wm::Memory
249 void
251 {
252 TIMING_START(LTM_Memory_Load);
254 TIMING_END_STREAM(LTM_Memory_Load, ARMARX_DEBUG);
255 }
256
257 /// append a wm::Memory instance to the ltm (asynchronously via thread pool)
258 void
259 store(std::shared_ptr<const armem::wm::Memory> memory)
260 {
261 TIMING_START(LTM_Memory_Append);
262
263 IceUtil::Time startTime;
264 if (debugObserver)
265 {
266 startTime = IceUtil::Time::now();
267 }
268
269 for (auto& f : processors->memFilters)
270 {
271 if (!f->accept(*memory))
272 {
274 << deactivateSpam(60)
275 << "Ignoring to put a Memory into the LTM because it got filtered.";
276 return;
277 }
278 }
279 // Store asynchronously using the thread pool in BufferedMemoryMixin
280 // No copy needed - just move the shared_ptr
282
283 if (debugObserver)
284 {
285 IceUtil::Time endTime = IceUtil::Time::now();
286 IceUtil::Time elapsed = endTime - startTime;
287 float elapsedMs = elapsed.toMilliSecondsDouble();
288
289 std::string channelName = id().memoryName + "Memory_LTM";
290 debugObserver->setDebugChannel(
291 channelName,
292 {
293 {"LTM | t store [ms]", new Variant(elapsedMs)},
294 });
295 }
296
297 TIMING_END_STREAM(LTM_Memory_Append, ARMARX_DEBUG);
298 }
299
300 /// append a wm::Memory instance to the ltm
301 void
302 store(const armem::server::wm::Memory& serverMemory)
303 {
304 auto memory = std::make_shared<armem::wm::Memory>(serverMemory.name());
305 memory->update(armem::toCommit(serverMemory), true, false);
306 this->store(std::move(memory));
307 }
308
309 /// Store snapshots with deferred conversion (happens in async thread)
310 /// This is more efficient than calling store() with a pre-converted Memory
311 /// because the expensive toMemory() conversion happens off the critical path
312 ///
313 /// IMPORTANT: This method does NOT access the WM structure to avoid deadlocks.
314 /// The caller must pre-extract metadata before calling this method.
315 void
316 storeSnapshotsAsync(const std::string& memoryName,
317 const std::vector<wm::EntitySnapshot>& snapshots,
318 const std::vector<mixin::PendingConversion::SegmentMetadata>& segmentMetadata)
319 {
320 if (!enabled)
321 {
322 return;
323 }
324
325 // Pre-filter snapshots before enqueuing to reduce queue pressure
326 std::vector<wm::EntitySnapshot> filteredSnapshots;
327 if (processors && !processors->snapFilters.empty())
328 {
329 filteredSnapshots.reserve(snapshots.size());
330 for (const auto& snapshot : snapshots)
331 {
332 bool accepted = true;
333 for (auto& filter : processors->snapFilters)
334 {
335 if (!filter->accept(snapshot, false))
336 {
337 accepted = false;
338 break;
339 }
340 }
341 if (accepted)
342 {
343 filteredSnapshots.push_back(snapshot);
344 }
345 }
346
347 // If all snapshots were filtered, skip enqueuing
348 if (filteredSnapshots.empty())
349 {
350 ARMARX_DEBUG << deactivateSpam(10) << "All " << snapshots.size()
351 << " snapshots were pre-filtered in storeSnapshotsAsync";
352 return;
353 }
354 }
355 else
356 {
357 // No filters, use all snapshots
358 filteredSnapshots = snapshots;
359 }
360
361 // Build pending conversion with pre-filtered snapshots
363 pending.memoryName = memoryName;
364 pending.snapshots = std::move(filteredSnapshots);
365 pending.segmentMetadata = segmentMetadata;
366
367 // Enqueue for async conversion and storage
368 this->_enqueuePendingConversion(std::move(pending));
369 }
370
371 /// iterate over all core segments of this ltm
372 bool
373 forEachCoreSegment(std::function<void(CoreSegmentT&)> func) const
374 {
375 return _implForEachCoreSegment(func);
376 }
377
378 /// check if core segment exists
379 bool
380 hasCoreSegment(const std::string& coreSegmentName) const
381 {
382 return _implHasCoreSegment(coreSegmentName);
383 }
384
385 /// find core segment
386 std::shared_ptr<CoreSegmentT>
387 findCoreSegment(const std::string& coreSegmentName) const
388 {
389 return _implFindCoreSegment(coreSegmentName);
390 }
391
392 /// default parameters. Implementation should use the configuration to configure
393 virtual void
394 createPropertyDefinitions(PropertyDefinitionsPtr& defs, const std::string& prefix)
395 {
396 defs->optional(p.enabled_on_startup, prefix + "enabled");
397 defs->optional(p.recordingMode, prefix + "recordingMode");
398 defs->optional(p.configuration_on_startup, prefix + "configuration");
399 defs->optional(p.export_name, prefix + "exportName");
400 defs->optional(p.storeOnStop, prefix + "storeOnStop");
401 defs->optional(p.export_path, prefix + "exportPath");
402
403 defs->optional(p.importOnStartUp, prefix + "importOnStartUp");
404 defs->optional(p.maxAmountOfSnapshotsLoaded, prefix + "maxAmountSnapshotsLoaded");
405 defs->optional(p.coreSegmentsToLoad, prefix + "loadedCoreSegments");
406
407 defs->optional(p.persistenceStrategyIdentifier,
408 prefix + "persistenceStrategyIdentifier");
409 defs->optional(p.persistenceStrategies, prefix + "persistenceStrategies");
410 defs->optional(p.restHost, prefix + "rest.host");
411 defs->optional(p.restPort, prefix + "rest.port");
412 defs->optional(p.restDisableIfNotAvailable, prefix + "rest.disableIfNotAvailable");
413
414 defs->optional(p.minDiskSpace, prefix + "minDiskSpace");
415
416 // Batch write configuration
417 defs->optional(p.batchWriteEnabled, prefix + "batchWriteEnabled",
418 "Enable batch writing for improved I/O throughput");
419 defs->optional(p.batchSizeThreshold, prefix + "batchSizeThreshold",
420 "Number of items to accumulate before flushing to disk");
421 defs->optional(p.batchTimeThresholdMs, prefix + "batchTimeThresholdMs",
422 "Maximum time in ms to hold items before flushing");
423 }
424
425 /// enable/disable
426 void
428 {
430
431 enable();
432 }
433
434 void
436 {
437
438 disable();
439 }
440
441 bool
443 {
444 return enabled;
445 }
446
447 /// statistics
448 virtual void
450 {
451 // enabled stays the same
453 statistics.recordedCoreSegments = 0;
454 }
455
456 Statistics
458 {
459 return statistics;
460 }
461
462 std::map<std::string, processor::SnapshotFilter::FilterStatistics>
464 {
465 try
466 {
467 ARMARX_INFO << "Trying to save statistics";
468 auto stats = processors->getSnapshotFilterStatistics();
469 return stats;
470 }
471 catch (InvalidArgumentException& e)
472 { //no data was actually recorded
473 ARMARX_INFO << e.what();
474 }
475 catch (...)
476 {
477 ARMARX_WARNING << "Saving statistics did not work";
478 }
479 std::map<std::string, processor::SnapshotFilter::FilterStatistics> emptyStatistics;
480 return emptyStatistics;
481 }
482
483 /// get level name1
484 static std::string
486 {
487 return "LT-Memory";
488 }
489
490 void
492 {
494 }
495
496 /**
497 * @brief Set an optional debug observer for timing measurements
498 * @param observer The debug observer proxy (can be nullptr to disable)
499 *
500 * This allows optional timing reporting to a debug observer.
501 * If set, you can report timing information like:
502 *
503 * @code
504 * if (debugObserver)
505 * {
506 * debugObserver->setDebugChannel("ChannelName", {
507 * {"MetricName [ms]", new Variant(timeInMs)}
508 * });
509 * }
510 * @endcode
511 */
512 void
514 {
515 debugObserver = observer;
516 }
517
518 /**
519 * @brief Get the current debug observer (may be nullptr)
520 */
523 {
524 return debugObserver;
525 }
526
527
528 protected:
529 /// configuration
530 virtual void
531 _configure(const nlohmann::json&)
532 {
533 }
534
535 virtual void
537 {
538 }
539
540 virtual void
542 {
543 }
544
545 virtual void
546 _setExportName(const std::string&)
547 {
548 }
549
552 std::list<std::string> coreSegNames) = 0;
554 virtual void _store(const armem::wm::Memory& memory) = 0;
556 bool simulatedVersion = false) = 0;
557 virtual void _loadOnStartup() = 0;
559 virtual void _loadLatestNReferences(int n,
561 std::list<std::string> coreSegNames) = 0;
562 virtual void _enqueueForAsyncStorage(std::shared_ptr<const armem::wm::Memory> memory) = 0;
564
565 // Implementation methods - subclasses override these
566 // Thread safety is handled by DiskPersistence's per-directory mutexes
567 virtual bool _implForEachCoreSegment(std::function<void(CoreSegmentT&)> func) const = 0;
568 virtual bool _implHasCoreSegment(const std::string& coreSegmentName) const = 0;
569 virtual std::shared_ptr<CoreSegmentT> _implFindCoreSegment(const std::string& coreSegmentName) const = 0;
570
571 public:
572 // stuff for scenario parameters
574 {
575 // whether the LTM is enabled on startup
576 bool enabled_on_startup = false;
577
578 // How data is stored, if the LTM is enabled
579 std::string recordingMode = "CONSOLIDATE_REMOVED";
580
581 // Name and export path (TODO: Should this be part of the base class? Export path is a disk memory thing)
582 std::string export_name = "MemoryExport";
583 std::string export_path = "/tmp/ltm";
584 bool storeOnStop = false;
585 int minDiskSpace = 50; //GB
586
587 // TODO: belong more to WM, but no good solution for that currently:
588 bool importOnStartUp = false;
590 std::string coreSegmentsToLoad = "";
591
592 // Other configuration
594 "{ \"SnapshotFrequencyFilter\": "
595 "{\"WaitingTimeInMsForFilter\" : 50}}"; //record with 20 fps as standard
596
597 std::string persistenceStrategies = "disk";
598 std::string persistenceStrategyIdentifier = "localDisk";
599 std::string restHost = "localhost";
600 int restPort = 8080;
602
604
605 // Batch write configuration for disk persistence
606 bool batchWriteEnabled = false; // Enable batch writing for improved I/O throughput
607 int batchSizeThreshold = 100; // Flush batch when it reaches this many items
608 int batchTimeThresholdMs = 100; // Flush batch after this many milliseconds
609
610 } p;
611
612 protected:
613 mutable Statistics statistics;
614
615 std::atomic_bool enabled = true;
616
618
619 /// Optional debug observer for timing measurements
621 };
622} // namespace armarx::armem::server::ltm::detail
SpamFilterDataPtr deactivateSpam(SpamFilterDataPtr const &spamFilter, float deactivationDurationSec, const std::string &identifier, bool deactivate)
Definition Logging.cpp:75
#define VAROUT(x)
The Variant class is described here: Variants.
Definition Variant.h:224
std::string str(bool escapeDelimiters=true) const
Get a string representation of this memory ID.
Definition MemoryID.cpp:102
std::string memoryName
Definition MemoryID.h:50
virtual bool _implHasCoreSegment(const std::string &coreSegmentName) const =0
virtual void _resolve(armem::wm::Memory &memory)=0
void loadLatestNReferences(int n, armem::wm::Memory &memory)
Definition MemoryBase.h:213
virtual void _directlyStore(const armem::wm::Memory &memory, bool simulatedVersion=false)=0
void loadLatestNReferences(int n, armem::wm::Memory &memory, std::list< std::string > coreSegNames)
Definition MemoryBase.h:205
DebugObserverInterfacePrx getDebugObserver() const
Get the current debug observer (may be nullptr)
Definition MemoryBase.h:522
virtual void _loadAllReferences(armem::wm::Memory &memory, std::list< std::string > coreSegNames)=0
void setRecordingMode(const std::string &m)
Definition MemoryBase.h:73
virtual void createPropertyDefinitions(PropertyDefinitionsPtr &defs, const std::string &prefix)
default parameters. Implementation should use the configuration to configure
Definition MemoryBase.h:394
std::shared_ptr< CoreSegmentT > findCoreSegment(const std::string &coreSegmentName) const
find core segment
Definition MemoryBase.h:387
virtual void _enqueuePendingConversion(mixin::PendingConversion pending)=0
bool forEachCoreSegment(std::function< void(CoreSegmentT &)> func) const
iterate over all core segments of this ltm
Definition MemoryBase.h:373
void store(std::shared_ptr< const armem::wm::Memory > memory)
append a wm::Memory instance to the ltm (asynchronously via thread pool)
Definition MemoryBase.h:259
virtual void _setExportName(const std::string &)
Definition MemoryBase.h:546
void setDebugObserver(DebugObserverInterfacePrx observer)
Set an optional debug observer for timing measurements.
Definition MemoryBase.h:513
void loadAllAndResolve(armem::wm::Memory &memory, std::list< std::string > coreSegmentNames)
Definition MemoryBase.h:240
armem::wm::Memory loadLatestNReferences(int n)
Definition MemoryBase.h:195
armem::wm::Memory loadAllReferences()
return the full ltm as a wm::Memory with only references the ltm may be huge, use with caution
Definition MemoryBase.h:177
void storeSnapshotsAsync(const std::string &memoryName, const std::vector< wm::EntitySnapshot > &snapshots, const std::vector< mixin::PendingConversion::SegmentMetadata > &segmentMetadata)
Store snapshots with deferred conversion (happens in async thread) This is more efficient than callin...
Definition MemoryBase.h:316
virtual void _store(const armem::wm::Memory &memory)=0
virtual std::shared_ptr< CoreSegmentT > _implFindCoreSegment(const std::string &coreSegmentName) const =0
virtual void _loadLatestNReferences(int n, armem::wm::Memory &memory, std::list< std::string > coreSegNames)=0
void setRecordingMode(const RecordingMode m)
Definition MemoryBase.h:94
virtual void _enqueueForAsyncStorage(std::shared_ptr< const armem::wm::Memory > memory)=0
static std::string getLevelName()
get level name1
Definition MemoryBase.h:485
bool hasCoreSegment(const std::string &coreSegmentName) const
check if core segment exists
Definition MemoryBase.h:380
armem::wm::Memory loadAllAndResolve()
return the full ltm as a wm::Memory and resolves the references the ltm may be huge,...
Definition MemoryBase.h:223
MemoryBase(const std::string &exportName, const MemoryID &id)
Definition MemoryBase.h:67
virtual void _loadAllReferences(armem::wm::Memory &memory)=0
virtual void _configure(const nlohmann::json &)
configuration
Definition MemoryBase.h:531
virtual bool _implForEachCoreSegment(std::function< void(CoreSegmentT &)> func) const =0
struct armarx::armem::server::ltm::detail::MemoryBase::Properties p
void loadAllReferences(armem::wm::Memory &memory)
Definition MemoryBase.h:185
void resolve(armem::wm::Memory &memory)
convert the references of the input into a wm::Memory
Definition MemoryBase.h:250
void store(const armem::server::wm::Memory &serverMemory)
append a wm::Memory instance to the ltm
Definition MemoryBase.h:302
virtual void _loadLatestNReferences(int n, armem::wm::Memory &memory)=0
void loadAllAndResolve(armem::wm::Memory &memory)
Definition MemoryBase.h:231
std::map< std::string, processor::SnapshotFilter::FilterStatistics > getFilterStatistics()
Definition MemoryBase.h:463
virtual std::string getExportName() const
Definition MemoryItem.h:27
std::shared_ptr< Processors > processors
Definition MemoryItem.h:54
MemoryItem(const std::string &exportName, const MemoryID &)
Client-side working memory.
Represents a point in time.
Definition DateTime.h:25
static DateTime Now()
Definition DateTime.cpp:51
static DateTime Invalid()
Definition DateTime.cpp:57
Brief description of class memory.
Definition memory.h:39
#define ARMARX_INFO
The normal logging level.
Definition Logging.h:181
#define ARMARX_DEBUG
The logging level for output that is only interesting while debugging.
Definition Logging.h:184
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
Definition Logging.h:193
#define TIMING_START(name)
Helper macro to do timing tests.
Definition TimeUtil.h:289
#define TIMING_END_STREAM(name, os)
Prints duration.
Definition TimeUtil.h:310
Commit toCommit(const ContainerT &container)
Definition operations.h:23
::IceInternal::ProxyHandle<::IceProxy::armarx::DebugObserverInterface > DebugObserverInterfacePrx
IceUtil::Handle< class PropertyDefinitionContainer > PropertyDefinitionsPtr
PropertyDefinitions smart pointer type.
Holds snapshots and metadata for deferred conversion in async thread This allows us to defer the expe...