Memory.cpp
Go to the documentation of this file.
1#include "Memory.h"
2
3#include <memory>
4
7
14
16{
17 void
18 Memory::_configure(const nlohmann::json& json)
19 {
21
24
25 std::string exportPath = p.export_path;
26
27 std::string persistenceStrategiesStr = p.persistenceStrategies;
28
29 ARMARX_INFO << "Persistence strategies=" << persistenceStrategiesStr;
30
31 std::vector<std::string> persistenceStrategies = split(persistenceStrategiesStr, ',');
32
33 ARMARX_INFO << "Found " << persistenceStrategies.size() << " persistence strategies";
34
35
36 std::string identifiersStr = p.persistenceStrategyIdentifier;
37 std::vector<std::string> identifiers = split(identifiersStr, ',');
38 std::string identifier = "defaultStrategy";
39 ARMARX_INFO << "Persistence identifiers=" << identifiersStr;
40 ARMARX_INFO << "Found " << identifiers.size() << " persistence strategies identifiers";
41
42
43 std::shared_ptr<armem::server::ltm::persistence::RedundantPersistenceStrategy>
44 redundantPersistence =
45 std::make_shared<armem::server::ltm::persistence::RedundantPersistenceStrategy>();
46 redundantPersistence->setExportName(getExportName());
47
48 for (size_t i = 0; i < persistenceStrategies.size(); i++)
49 {
50 std::string s = persistenceStrategies.at(i);
51
52 if (i < identifiers.size())
53 {
54 identifier = identifiers.at(i);
55 }
56
57 if (s == "disk")
58 {
59 ARMARX_IMPORTANT << "Using persistence strategy=" << s;
60 ARMARX_INFO << "MemoryID=" << getMemoryID().str();
61 ARMARX_INFO << "Identifier=" << identifier;
62 ARMARX_INFO << "Export name=" << getExportName();
63 ARMARX_INFO << "Export path=" << exportPath;
64 ARMARX_INFO << "Min available disk space=" << p.minDiskSpace;
65
66 std::shared_ptr<armem::server::ltm::persistence::DiskPersistence> diskPersistence =
67 std::make_shared<armem::server::ltm::persistence::DiskPersistence>(
68 std::filesystem::path(exportPath));
69 diskPersistence->setIdentifier(identifier);
70 diskPersistence->setExportName(getExportName());
71 diskPersistence->setMinAvailableDiskSpace(p.minDiskSpace);
72
73 // Configure batch writing if enabled
74 if (p.batchWriteEnabled)
75 {
76 diskPersistence->setBatchSizeThreshold(static_cast<size_t>(p.batchSizeThreshold));
77 diskPersistence->setBatchTimeThresholdMs(static_cast<size_t>(p.batchTimeThresholdMs));
78 diskPersistence->setBatchWriteEnabled(true);
79 ARMARX_INFO << "Batch write mode enabled: threshold=" << p.batchSizeThreshold
80 << " items or " << p.batchTimeThresholdMs << "ms";
81 }
82
83 redundantPersistence->addStrategy(diskPersistence);
84 }
85 else if (s == "rest")
86 {
87 ARMARX_IMPORTANT << "Using persistence strategy=" << s;
88 ARMARX_WARNING << "Persistence strategy=" << s << " currently deactivated";
89 /**
90 ARMARX_INFO << "MemoryID=" << getMemoryID().str();
91 ARMARX_INFO << "Identifier=" << identifier;
92 ARMARX_INFO << "Export name=" << getExportName();
93 ARMARX_INFO << "Host=" << p.restHost;
94 ARMARX_INFO << "Port=" << p.restPort;
95 ARMARX_INFO << "DisableIfNotAvailable=" << p.restDisableIfNotAvailable;
96
97 std::shared_ptr<armem::server::ltm::persistence::RestPersistence> restPersistence =
98 std::make_shared<armem::server::ltm::persistence::RestPersistence>(
99 identifier,
100 getExportName(),
101 p.restHost,
102 p.restPort,
103 p.restDisableIfNotAvailable);
104
105
106 redundantPersistence->addStrategy(restPersistence);
107 */
108 }
109 else if (s == "mongodb")
110 {
111 ARMARX_IMPORTANT << "Using persistence strategy=" << s;
112 ARMARX_WARNING << "Persistence strategy=" << s << " not implemented";
113 }
114 else
115 {
116 ARMARX_WARNING << "Persistence strategy=" << s << " unknown";
117 }
118 }
119
120 setPersistenceStrategy(redundantPersistence);
121 }
122
123 Memory::Memory() : Memory("MemoryExport", "Test")
124 {
125 }
126
127 Memory::Memory(const std::string& exportName, const std::string& memoryName) :
128 MemoryBase(exportName, MemoryID(memoryName, "")),
129 BufferedBase(MemoryID(memoryName, "")),
130 CachedBase(MemoryID(memoryName, "")),
131 persistenceStrategy_(std::make_shared<persistence::RedundantPersistenceStrategy>())
132 {
133 }
134
136 const std::string& exportName,
137 const std::string& memoryName,
138 const std::shared_ptr<persistence::RedundantPersistenceStrategy>& persistenceStrategy) :
139 MemoryBase(exportName, MemoryID(memoryName, "")),
140 BufferedBase(MemoryID(memoryName, "")),
141 CachedBase(MemoryID(memoryName, "")),
142 persistenceStrategy_(persistenceStrategy)
143 {
144 }
145
146 void
147 Memory::_setExportName(const std::string& memoryName)
148 {
150
151 if (persistenceStrategy_)
152 {
153 persistenceStrategy_->setExportName(memoryName);
154 }
155 }
156
157 void
159 {
161
163 }
164
165 void
167 {
169
171 ARMARX_IMPORTANT << "Storing of data finished, starting to generate and save statistics...";
173 }
174
175 void
177 {
179
180 ARMARX_DEBUG << "Setting memory=" << memoryId.memoryName
181 << ", id=" << memoryId.cleanID().str();
182
185 }
186
187 bool
188 Memory::_implForEachCoreSegment(std::function<void(CoreSegment&)> func) const
189 {
191
192 ARMARX_DEBUG << "For each core segment (mem id=" << id().cleanID().str() << ")";
193
194
195 for (auto& core_segment : persistenceStrategy_->getContainerKeys(id()))
196 {
197 ARMARX_DEBUG << "Found core segment=" << core_segment;
198
199 std::shared_ptr<persistence::MemoryPersistenceStrategy> coreSegmentPersistenceStrategy(
200 persistenceStrategy_);
201
203 id().withCoreSegmentName(core_segment),
205 coreSegmentPersistenceStrategy);
206 func(c);
207 }
208
209
210 return true;
211 }
212
213 bool
214 Memory::_implHasCoreSegment(const std::string& coreSegmentName) const
215 {
217
218 bool foundCoreSegment = cacheHasCoreSegment(coreSegmentName) ||
219 persistenceStrategy_->containsContainer(id(), coreSegmentName);
220
221 return foundCoreSegment;
222 }
223
224 std::shared_ptr<CoreSegment>
225 Memory::_implFindCoreSegment(const std::string& coreSegmentName) const
226 {
228
229 if (!_implHasCoreSegment(coreSegmentName)) // Call _impl version
230 {
231 return nullptr;
232 }
233
234 std::shared_ptr<persistence::MemoryPersistenceStrategy> coreSegmentPersistenceStrategy(
235 persistenceStrategy_);
236
237 return std::make_shared<CoreSegment>(getExportName(),
238 id().withCoreSegmentName(coreSegmentName),
240 coreSegmentPersistenceStrategy);
241 }
242
243 void
245 {
247
250 }
251
252 void
254 {
256
257 wmMemory.id() = id().getMemoryID().cleanID();
258
259 ARMARX_DEBUG << "Memory: Load all references (id=" << wmMemory.id().cleanID().str() << ")";
260
262 [&wmMemory](auto& ltmCoreSegment)
263 {
264 armem::wm::CoreSegment wmCoreSegment;
265 ltmCoreSegment.loadAllReferences(wmCoreSegment);
266
267 // Because there might be multiple LTMs providers with the same provider segment
268 if (wmMemory.hasCoreSegment(wmCoreSegment.name()))
269 {
270 armem::wm::CoreSegment existingWmCoreSegment =
271 wmMemory.getCoreSegment(wmCoreSegment.name());
272 existingWmCoreSegment.append(wmCoreSegment);
273 }
274 else
275 {
276 wmMemory.addCoreSegment(wmCoreSegment);
277 }
278 });
279 }
280
281 void
282 Memory::_loadAllReferences(armem::wm::Memory& wmMemory, std::list<std::string> coreSegmentNames)
283 {
284 //TODO
286
287 wmMemory.id() = id().getMemoryID().cleanID();
288
289 ARMARX_DEBUG << "Memory: Load all references (id=" << wmMemory.id().cleanID().str() << ")";
290
292 [&wmMemory, &coreSegmentNames](auto& ltmCoreSegment)
293 {
294 bool loadCoreSeg =
295 (std::find(coreSegmentNames.begin(),
296 coreSegmentNames.end(),
297 ltmCoreSegment.id().coreSegmentName) != coreSegmentNames.end());
298 if (loadCoreSeg)
299 {
300 armem::wm::CoreSegment wmCoreSegment;
301 ltmCoreSegment.loadAllReferences(wmCoreSegment);
302
303 // Because there might be multiple LTMs providers with the same provider segment
304 if (wmMemory.hasCoreSegment(wmCoreSegment.name()))
305 {
306 armem::wm::CoreSegment existingWmCoreSegment =
307 wmMemory.getCoreSegment(wmCoreSegment.name());
308 existingWmCoreSegment.append(wmCoreSegment);
309 }
310 else
311 {
312 wmMemory.addCoreSegment(wmCoreSegment);
313 }
314 }
315 else
316 {
317 ARMARX_DEBUG << "Skipping loading CoreSegment with name "
318 << wmMemory.id().coreSegmentName
319 << " from LTM into WM as it is not in the defined list";
320 }
321 });
322 }
323
324 void
326 {
328 wmMemory.id() = id().getMemoryID().cleanID();
329
330 ARMARX_DEBUG << "Memory: Load latest N references for all core segments (id="
331 << wmMemory.id().cleanID().str() << ")";
332
334 [&wmMemory, &n](auto& ltmCoreSegment)
335 {
336 armem::wm::CoreSegment wmCoreSegment;
337 ltmCoreSegment.loadLatestNReferences(n, wmCoreSegment);
338
339 // Because there might be multiple LTMs providers with the same provider segment
340 if (wmMemory.hasCoreSegment(wmCoreSegment.name()))
341 {
342 armem::wm::CoreSegment existingWmCoreSegment =
343 wmMemory.getCoreSegment(wmCoreSegment.name());
344 existingWmCoreSegment.append(wmCoreSegment);
345 }
346 else
347 {
348 wmMemory.addCoreSegment(wmCoreSegment);
349 }
350 });
351 }
352
353 void
355 armem::wm::Memory& wmMemory,
356 std::list<std::string> coreSegNames)
357 {
359 wmMemory.id() = id().getMemoryID().cleanID();
360
361 ARMARX_DEBUG << "Memory: Load latest references for set of core segments (id="
362 << wmMemory.id().cleanID().str() << ")";
363
365 [&wmMemory, &n, &coreSegNames](auto& ltmCoreSegment)
366 {
367 bool loadCoreSeg =
368 (std::find(coreSegNames.begin(),
369 coreSegNames.end(),
370 ltmCoreSegment.id().coreSegmentName) != coreSegNames.end());
371 if (loadCoreSeg)
372 {
373 ARMARX_DEBUG << "Load core segment=" << ltmCoreSegment.id().coreSegmentName;
374 armem::wm::CoreSegment wmCoreSegment;
375 ltmCoreSegment.loadLatestNReferences(n, wmCoreSegment);
376
377 // Because there might be multiple LTMs providers with the same provider segment
378 if (wmMemory.hasCoreSegment(wmCoreSegment.name()))
379 {
380 armem::wm::CoreSegment existingWmCoreSegment =
381 wmMemory.getCoreSegment(wmCoreSegment.name());
382 existingWmCoreSegment.append(wmCoreSegment);
383 }
384 else
385 {
386 wmMemory.addCoreSegment(wmCoreSegment);
387 }
388 }
389 else
390 {
391 ARMARX_DEBUG << "Skipping loading CoreSegment with name "
392 << wmMemory.id().coreSegmentName
393 << " from LTM into WM as it is not in the defined list";
394 }
395 });
396 }
397
398 void
400 {
402
403 ARMARX_DEBUG << "Resolve memory id=" << id().cleanID().str();
404
405 wmMemory.forEachCoreSegment(
406 [&](auto& wmCoreSegment)
407 {
408 std::shared_ptr<persistence::MemoryPersistenceStrategy>
409 coreSegmentPersistenceStrategy(persistenceStrategy_);
410
411 CoreSegment ltmCoreSegment(
413 id().withCoreSegmentName(wmCoreSegment.id().coreSegmentName),
415 coreSegmentPersistenceStrategy);
416
417 ltmCoreSegment.resolve(wmCoreSegment);
418 });
419 }
420
421 void
423 {
426 }
427
428 void
429 Memory::_directlyStore(const armem::wm::Memory& wmMemory, bool simulatedVersion)
430 {
432
433 if (id().memoryName.empty())
434 {
436 << "During storage of memory '" << wmMemory.id().str()
437 << "' I noticed that the corresponding LTM has no id set. "
438 << "I set the id of the LTM to the same name, however this should not happen!";
439 setMemoryID(wmMemory.id());
440 }
441
442 ARMARX_DEBUG << "Directly store memory id=" << id().cleanID().str();
443 ARMARX_DEBUG << "CoreSegments: " << wmMemory.getCoreSegmentNames().size();
444
445 wmMemory.forEachCoreSegment(
446 [&](const auto& wmCoreSegment)
447 {
448 std::shared_ptr<persistence::MemoryPersistenceStrategy>
449 coreSegmentPersistenceStrategy(persistenceStrategy_);
450
451 CoreSegment ltmCoreSegment(
453 id().withCoreSegmentName(wmCoreSegment.id().coreSegmentName),
455 coreSegmentPersistenceStrategy);
456
457 // 2. store data
458 ltmCoreSegment.store(wmCoreSegment, simulatedVersion);
459
460
461 // 3. update statistics
462 statistics.recordedCoreSegments++;
463 });
464
465 // 4. update cache
466 //CachedBase::addToCache(memory);
467 }
468
469 std::shared_ptr<armem::wm::Memory>
471 uint64_t& filteredCount,
472 uint64_t& passedCount)
473 {
475
476 filteredCount = 0;
477 passedCount = 0;
478
479 // If no filters configured, return a copy of the original memory
480 if (!processors || processors->snapFilters.empty())
481 {
482 // Count all snapshots as passed
483 memory.forEachCoreSegment([&](const auto& cs) {
484 cs.forEachProviderSegment([&](const auto& ps) {
485 ps.forEachEntity([&](const auto& entity) {
486 entity.forEachSnapshot([&](const auto&) {
487 passedCount++;
488 });
489 });
490 });
491 });
492
493 // Return a copy wrapped in shared_ptr
494 return std::make_shared<armem::wm::Memory>(memory);
495 }
496
497 // Create a new memory for filtered snapshots
498 auto filteredMemory = std::make_shared<armem::wm::Memory>(memory.id());
499
500 // Iterate through the memory hierarchy and apply filters
501 memory.forEachCoreSegment([&](const auto& wmCoreSegment) {
502 wmCoreSegment.forEachProviderSegment([&](const auto& wmProviderSegment) {
503 wmProviderSegment.forEachEntity([&](const auto& wmEntity) {
504 wmEntity.forEachSnapshot([&](const auto& wmSnapshot) {
505 // Apply all snapshot filters
506 bool accepted = true;
507 for (auto& filter : processors->snapFilters)
508 {
509 if (!filter->accept(wmSnapshot, false))
510 {
511 accepted = false;
512 break;
513 }
514 }
515
516 if (accepted)
517 {
518 // Add snapshot to filtered memory
519 // Need to ensure the hierarchy exists
520 auto& cs = filteredMemory->hasCoreSegment(wmCoreSegment.name())
521 ? filteredMemory->getCoreSegment(wmCoreSegment.name())
522 : filteredMemory->addCoreSegment(wmCoreSegment.name());
523
524 auto& ps = cs.hasProviderSegment(wmProviderSegment.name())
525 ? cs.getProviderSegment(wmProviderSegment.name())
526 : cs.addProviderSegment(wmProviderSegment.name());
527
528 auto& entity = ps.hasEntity(wmEntity.name())
529 ? ps.getEntity(wmEntity.name())
530 : ps.addEntity(wmEntity.name());
531
532 entity.addSnapshot(wmSnapshot);
533 passedCount++;
534 }
535 else
536 {
537 filteredCount++;
538 }
539 });
540 });
541 });
542 });
543
544 ARMARX_DEBUG << "Pre-filter: " << passedCount << " passed, "
545 << filteredCount << " filtered out";
546
547 return filteredMemory;
548 }
549
550 void
552 {
553 //not used any more
554 }
555
556 void
557 Memory::_enqueueForAsyncStorage(std::shared_ptr<const armem::wm::Memory> memory)
558 {
560 // Forward to the BufferedMemoryMixin's async storage queue
562 }
563
564 void
566 {
568 // Forward to the BufferedMemoryMixin's async storage queue
569 BufferedBase::enqueuePendingConversion(std::move(pending));
570 }
571
572 void
574 {
576 auto firstTimeStarted = this->statistics.firstStarted;
577 if (!firstTimeStarted.isValid())
578 {
579 //No statistics to be saved, as recording was never started
580 ARMARX_DEBUG << "No Statistics will be saved because firstStarted is invalid: "
581 << VAROUT(this->statistics.firstStarted);
582 return;
583 }
584
585 ARMARX_DEBUG << "Preparing to save statistics for " << this->name();
586 try
587 {
588 auto first_stats = this->getFilterStatistics();
589 if (first_stats.empty())
590 {
591 //empty statistics mean no data was recorded and no statistics should be saved
592 ARMARX_DEBUG << "No Statistics will be saved because no actual data was recorded.";
593 return;
594 }
595 std::map<std::string,
596 std::map<std::string, ltm::processor::SnapshotFilter::FilterStatistics>>
597 information;
598 std::map<std::string, armarx::core::time::DateTime> times;
599
600 try
601 {
602 times["Started LTM1"] = this->getStatistics().lastStarted;
603 times["Stopped LTM1"] = this->getStatistics().lastStopped;
604 information["LTM"] = first_stats;
605 }
606 catch (...)
607 {
608 ARMARX_DEBUG << "Something went wrong after getting the statistics";
609 }
610 // auto exportPath = this->getMemoryBasePath();
611 auto exportName = this->getExportName();
612 auto recording_started = this->getStatistics().lastStarted;
613 auto recording_stopped = this->getStatistics().lastStopped;
614 // test::save_statistics(information,
615 // times,
616 // recording_started,
617 // recording_stopped,
618 // exportPath,
619 // exportName,
620 // this->name());
621 }
622 catch (...)
623 {
624 ARMARX_DEBUG << "Something went wrong with the statistics saving process";
625 }
626 }
627
628 std::vector<std::string>
629 Memory::split(std::string str, char delimiter)
630 {
632 // Using str in a string stream
633 std::stringstream ss(str);
634 std::vector<std::string> res;
635 std::string token;
636 while (std::getline(ss, token, delimiter))
637 {
638 res.push_back(token);
639 }
640 return res;
641 }
642} // namespace armarx::armem::server::ltm
#define VAROUT(x)
constexpr T c
std::string str(const T &t)
std::string coreSegmentName
Definition MemoryID.h:51
MemoryID getMemoryID() const
Definition MemoryID.cpp:286
MemoryID cleanID() const
Definition MemoryID.cpp:133
std::string str(bool escapeDelimiters=true) const
Get a string representation of this memory ID.
Definition MemoryID.cpp:102
std::string memoryName
Definition MemoryID.h:50
void append(const OtherDerivedT &other)
bool forEachCoreSegment(CoreSegmentFunctionT &&func)
Definition MemoryBase.h:188
bool hasCoreSegment(const std::string &name) const
Definition MemoryBase.h:107
CoreSegmentT & addCoreSegment(const std::string &name, aron::type::ObjectPtr coreSegmentType=nullptr, const std::vector< PredictionEngine > &predictionEngines={})
Add an empty core segment with the given name, type and prediction engines.
Definition MemoryBase.h:261
std::vector< std::string > getCoreSegmentNames() const
Definition MemoryBase.h:240
CoreSegmentT & getCoreSegment(const std::string &name)
Definition MemoryBase.h:134
void getAndSaveStatistics()
getAndSaveStatistics generates and saves statistics for a LTM recording
Definition Memory.cpp:573
void _setMemoryID(const MemoryID &memoryId) final
Definition Memory.cpp:176
void _enqueueForAsyncStorage(std::shared_ptr< const armem::wm::Memory > memory) final
Definition Memory.cpp:557
void _resolve(armem::wm::Memory &wmMemory) final
Definition Memory.cpp:399
detail::mixin::BufferedMemoryMixin< CoreSegment > BufferedBase
Definition Memory.h:27
detail::mixin::CachedMemoryMixin< CoreSegment > CachedBase
Definition Memory.h:28
void _loadLatestNReferences(int n, armem::wm::Memory &wmMemory) final
Definition Memory.cpp:325
void _directlyStore(const armem::wm::Memory &wmMemory, bool simulatedVersion) final
Definition Memory.cpp:429
void _setExportName(const std::string &memoryName) final
Definition Memory.cpp:147
bool _implHasCoreSegment(const std::string &coreSegmentName) const final
Definition Memory.cpp:214
bool _implForEachCoreSegment(std::function< void(CoreSegment &)> func) const final
Definition Memory.cpp:188
std::shared_ptr< armem::wm::Memory > _preFilterMemory(const armem::wm::Memory &memory, uint64_t &filteredCount, uint64_t &passedCount) final
Pre-filter a memory object before enqueuing for async storage.
Definition Memory.cpp:470
void setPersistenceStrategy(std::shared_ptr< persistence::RedundantPersistenceStrategy > persistenceStrategy)
Definition Memory.h:89
std::shared_ptr< CoreSegment > _implFindCoreSegment(const std::string &coreSegmentName) const final
Definition Memory.cpp:225
void _loadAllReferences(armem::wm::Memory &wmMemory) final
Definition Memory.cpp:253
void _store(const armem::wm::Memory &wmMemory) final
Definition Memory.cpp:422
void createPropertyDefinitions(PropertyDefinitionsPtr &defs, const std::string &prefix) override
default parameters. Implementation should use the configuration to configure
Definition Memory.cpp:244
void _configure(const nlohmann::json &config) final
configuration
Definition Memory.cpp:18
void _enqueuePendingConversion(detail::mixin::PendingConversion pending) final
Definition Memory.cpp:565
detail::MemoryBase< CoreSegment > MemoryBase
Definition Memory.h:26
void store(const armem::wm::CoreSegment &coreSeg, bool simulatedVersion)
encode the content of a wm::Memory and store
void resolve(armem::wm::CoreSegment &coreSeg)
convert the references of the input into a wm::Memory
virtual void createPropertyDefinitions(PropertyDefinitionsPtr &defs, const std::string &prefix)
Definition MemoryBase.h:394
bool forEachCoreSegment(std::function< void(CoreSegmentT &)> func) const
Definition MemoryBase.h:373
struct armarx::armem::server::ltm::detail::MemoryBase::Properties p
std::map< std::string, processor::SnapshotFilter::FilterStatistics > getFilterStatistics()
Definition MemoryBase.h:463
virtual std::string getExportName() const
Definition MemoryItem.h:27
std::shared_ptr< Processors > processors
Definition MemoryItem.h:54
void createPropertyDefinitions(PropertyDefinitionsPtr &defs, const std::string &prefix)
void enqueueForAsyncStoragePublic(std::shared_ptr< const armem::wm::Memory > memory)
Client-side working memory core segment.
Client-side working memory.
Brief description of class memory.
Definition memory.h:39
#define ARMARX_INFO
The normal logging level.
Definition Logging.h:181
#define ARMARX_IMPORTANT
The logging level for always important information, but expected behaviour (in contrast to ARMARX_WAR...
Definition Logging.h:190
#define ARMARX_DEBUG
The logging level for output that is only interesting while debugging.
Definition Logging.h:184
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
Definition Logging.h:193
const std::list< std::string > identifiers
auto make_shared(Args &&... args)
IceUtil::Handle< class PropertyDefinitionContainer > PropertyDefinitionsPtr
PropertyDefinitions smart pointer type.
Holds snapshots and metadata for deferred conversion in async thread This allows us to defer the expe...
#define ARMARX_TRACE
Definition trace.h:77