25 std::string exportPath =
p.export_path;
27 std::string persistenceStrategiesStr =
p.persistenceStrategies;
29 ARMARX_INFO <<
"Persistence strategies=" << persistenceStrategiesStr;
31 std::vector<std::string> persistenceStrategies = split(persistenceStrategiesStr,
',');
33 ARMARX_INFO <<
"Found " << persistenceStrategies.size() <<
" persistence strategies";
36 std::string identifiersStr =
p.persistenceStrategyIdentifier;
37 std::vector<std::string>
identifiers = split(identifiersStr,
',');
38 std::string identifier =
"defaultStrategy";
39 ARMARX_INFO <<
"Persistence identifiers=" << identifiersStr;
43 std::shared_ptr<armem::server::ltm::persistence::RedundantPersistenceStrategy>
44 redundantPersistence =
45 std::make_shared<armem::server::ltm::persistence::RedundantPersistenceStrategy>();
48 for (
size_t i = 0; i < persistenceStrategies.size(); i++)
50 std::string s = persistenceStrategies.at(i);
64 ARMARX_INFO <<
"Min available disk space=" <<
p.minDiskSpace;
66 std::shared_ptr<armem::server::ltm::persistence::DiskPersistence> diskPersistence =
67 std::make_shared<armem::server::ltm::persistence::DiskPersistence>(
68 std::filesystem::path(exportPath));
69 diskPersistence->setIdentifier(identifier);
71 diskPersistence->setMinAvailableDiskSpace(
p.minDiskSpace);
74 if (
p.batchWriteEnabled)
76 diskPersistence->setBatchSizeThreshold(
static_cast<size_t>(
p.batchSizeThreshold));
77 diskPersistence->setBatchTimeThresholdMs(
static_cast<size_t>(
p.batchTimeThresholdMs));
78 diskPersistence->setBatchWriteEnabled(
true);
79 ARMARX_INFO <<
"Batch write mode enabled: threshold=" <<
p.batchSizeThreshold
80 <<
" items or " <<
p.batchTimeThresholdMs <<
"ms";
83 redundantPersistence->addStrategy(diskPersistence);
88 ARMARX_WARNING <<
"Persistence strategy=" << s <<
" currently deactivated";
109 else if (s ==
"mongodb")
112 ARMARX_WARNING <<
"Persistence strategy=" << s <<
" not implemented";
136 const std::string& exportName,
137 const std::string& memoryName,
138 const std::shared_ptr<persistence::RedundantPersistenceStrategy>& persistenceStrategy) :
142 persistenceStrategy_(persistenceStrategy)
151 if (persistenceStrategy_)
153 persistenceStrategy_->setExportName(memoryName);
171 ARMARX_IMPORTANT <<
"Storing of data finished, starting to generate and save statistics...";
195 for (
auto& core_segment : persistenceStrategy_->getContainerKeys(
id()))
199 std::shared_ptr<persistence::MemoryPersistenceStrategy> coreSegmentPersistenceStrategy(
200 persistenceStrategy_);
203 id().withCoreSegmentName(core_segment),
205 coreSegmentPersistenceStrategy);
219 persistenceStrategy_->containsContainer(
id(), coreSegmentName);
221 return foundCoreSegment;
224 std::shared_ptr<CoreSegment>
234 std::shared_ptr<persistence::MemoryPersistenceStrategy> coreSegmentPersistenceStrategy(
235 persistenceStrategy_);
238 id().withCoreSegmentName(coreSegmentName),
240 coreSegmentPersistenceStrategy);
262 [&wmMemory](
auto& ltmCoreSegment)
265 ltmCoreSegment.loadAllReferences(wmCoreSegment);
272 existingWmCoreSegment.
append(wmCoreSegment);
292 [&wmMemory, &coreSegmentNames](
auto& ltmCoreSegment)
295 (std::find(coreSegmentNames.begin(),
296 coreSegmentNames.end(),
297 ltmCoreSegment.id().coreSegmentName) != coreSegmentNames.end());
301 ltmCoreSegment.loadAllReferences(wmCoreSegment);
308 existingWmCoreSegment.
append(wmCoreSegment);
317 ARMARX_DEBUG <<
"Skipping loading CoreSegment with name "
319 <<
" from LTM into WM as it is not in the defined list";
330 ARMARX_DEBUG <<
"Memory: Load latest N references for all core segments (id="
334 [&wmMemory, &n](
auto& ltmCoreSegment)
337 ltmCoreSegment.loadLatestNReferences(n, wmCoreSegment);
344 existingWmCoreSegment.
append(wmCoreSegment);
356 std::list<std::string> coreSegNames)
361 ARMARX_DEBUG <<
"Memory: Load latest references for set of core segments (id="
365 [&wmMemory, &n, &coreSegNames](
auto& ltmCoreSegment)
368 (std::find(coreSegNames.begin(),
370 ltmCoreSegment.id().coreSegmentName) != coreSegNames.end());
373 ARMARX_DEBUG <<
"Load core segment=" << ltmCoreSegment.id().coreSegmentName;
375 ltmCoreSegment.loadLatestNReferences(n, wmCoreSegment);
382 existingWmCoreSegment.
append(wmCoreSegment);
391 ARMARX_DEBUG <<
"Skipping loading CoreSegment with name "
393 <<
" from LTM into WM as it is not in the defined list";
406 [&](
auto& wmCoreSegment)
408 std::shared_ptr<persistence::MemoryPersistenceStrategy>
409 coreSegmentPersistenceStrategy(persistenceStrategy_);
413 id().withCoreSegmentName(wmCoreSegment.id().coreSegmentName),
415 coreSegmentPersistenceStrategy);
417 ltmCoreSegment.
resolve(wmCoreSegment);
433 if (
id().memoryName.empty())
436 <<
"During storage of memory '" << wmMemory.
id().
str()
437 <<
"' I noticed that the corresponding LTM has no id set. "
438 <<
"I set the id of the LTM to the same name, however this should not happen!";
446 [&](
const auto& wmCoreSegment)
448 std::shared_ptr<persistence::MemoryPersistenceStrategy>
449 coreSegmentPersistenceStrategy(persistenceStrategy_);
453 id().withCoreSegmentName(wmCoreSegment.id().coreSegmentName),
455 coreSegmentPersistenceStrategy);
458 ltmCoreSegment.
store(wmCoreSegment, simulatedVersion);
469 std::shared_ptr<armem::wm::Memory>
471 uint64_t& filteredCount,
472 uint64_t& passedCount)
483 memory.forEachCoreSegment([&](
const auto& cs) {
484 cs.forEachProviderSegment([&](
const auto& ps) {
485 ps.forEachEntity([&](
const auto& entity) {
486 entity.forEachSnapshot([&](
const auto&) {
494 return std::make_shared<armem::wm::Memory>(
memory);
498 auto filteredMemory = std::make_shared<armem::wm::Memory>(
memory.id());
501 memory.forEachCoreSegment([&](
const auto& wmCoreSegment) {
502 wmCoreSegment.forEachProviderSegment([&](
const auto& wmProviderSegment) {
503 wmProviderSegment.forEachEntity([&](
const auto& wmEntity) {
504 wmEntity.forEachSnapshot([&](
const auto& wmSnapshot) {
506 bool accepted =
true;
509 if (!filter->accept(wmSnapshot,
false))
520 auto& cs = filteredMemory->hasCoreSegment(wmCoreSegment.name())
521 ? filteredMemory->getCoreSegment(wmCoreSegment.name())
522 : filteredMemory->addCoreSegment(wmCoreSegment.name());
524 auto& ps = cs.hasProviderSegment(wmProviderSegment.name())
525 ? cs.getProviderSegment(wmProviderSegment.name())
526 : cs.addProviderSegment(wmProviderSegment.name());
528 auto& entity = ps.hasEntity(wmEntity.name())
529 ? ps.getEntity(wmEntity.name())
530 : ps.addEntity(wmEntity.name());
532 entity.addSnapshot(wmSnapshot);
544 ARMARX_DEBUG <<
"Pre-filter: " << passedCount <<
" passed, "
545 << filteredCount <<
" filtered out";
547 return filteredMemory;
576 auto firstTimeStarted = this->
statistics.firstStarted;
577 if (!firstTimeStarted.isValid())
580 ARMARX_DEBUG <<
"No Statistics will be saved because firstStarted is invalid: "
589 if (first_stats.empty())
592 ARMARX_DEBUG <<
"No Statistics will be saved because no actual data was recorded.";
595 std::map<std::string,
596 std::map<std::string, ltm::processor::SnapshotFilter::FilterStatistics>>
598 std::map<std::string, armarx::core::time::DateTime> times;
604 information[
"LTM"] = first_stats;
608 ARMARX_DEBUG <<
"Something went wrong after getting the statistics";
624 ARMARX_DEBUG <<
"Something went wrong with the statistics saving process";
628 std::vector<std::string>
629 Memory::split(std::string
str,
char delimiter)
633 std::stringstream ss(
str);
634 std::vector<std::string> res;
636 while (std::getline(ss, token, delimiter))
638 res.push_back(token);
std::string coreSegmentName
MemoryID getMemoryID() const
std::string str(bool escapeDelimiters=true) const
Get a string representation of this memory ID.
void append(const OtherDerivedT &other)
bool forEachCoreSegment(CoreSegmentFunctionT &&func)
bool hasCoreSegment(const std::string &name) const
CoreSegmentT & addCoreSegment(const std::string &name, aron::type::ObjectPtr coreSegmentType=nullptr, const std::vector< PredictionEngine > &predictionEngines={})
Add an empty core segment with the given name, type and prediction engines.
std::vector< std::string > getCoreSegmentNames() const
CoreSegmentT & getCoreSegment(const std::string &name)
void getAndSaveStatistics()
getAndSaveStatistics generates and saves statistics for an LTM recording.
void _setMemoryID(const MemoryID &memoryId) final
void _enqueueForAsyncStorage(std::shared_ptr< const armem::wm::Memory > memory) final
void _resolve(armem::wm::Memory &wmMemory) final
detail::mixin::BufferedMemoryMixin< CoreSegment > BufferedBase
detail::mixin::CachedMemoryMixin< CoreSegment > CachedBase
void _loadLatestNReferences(int n, armem::wm::Memory &wmMemory) final
void _directlyStore(const armem::wm::Memory &wmMemory, bool simulatedVersion) final
void _setExportName(const std::string &memoryName) final
bool _implHasCoreSegment(const std::string &coreSegmentName) const final
bool _implForEachCoreSegment(std::function< void(CoreSegment &)> func) const final
std::shared_ptr< armem::wm::Memory > _preFilterMemory(const armem::wm::Memory &memory, uint64_t &filteredCount, uint64_t &passedCount) final
Pre-filter a memory object before enqueuing for async storage.
void setPersistenceStrategy(std::shared_ptr< persistence::RedundantPersistenceStrategy > persistenceStrategy)
std::shared_ptr< CoreSegment > _implFindCoreSegment(const std::string &coreSegmentName) const final
void _loadAllReferences(armem::wm::Memory &wmMemory) final
void _store(const armem::wm::Memory &wmMemory) final
void _loadOnStartup() final
void createPropertyDefinitions(PropertyDefinitionsPtr &defs, const std::string &prefix) override
default parameters. Implementation should use the configuration to configure
void _configure(const nlohmann::json &config) final
configuration
void _enqueuePendingConversion(detail::mixin::PendingConversion pending) final
detail::MemoryBase< CoreSegment > MemoryBase
void store(const armem::wm::CoreSegment &coreSeg, bool simulatedVersion)
encode the content of a wm::Memory and store
void resolve(armem::wm::CoreSegment &coreSeg)
convert the references of the input into a wm::Memory
virtual void createPropertyDefinitions(PropertyDefinitionsPtr &defs, const std::string &prefix)
bool forEachCoreSegment(std::function< void(CoreSegmentT &)> func) const
struct armarx::armem::server::ltm::detail::MemoryBase::Properties p
Statistics getStatistics() const
std::map< std::string, processor::SnapshotFilter::FilterStatistics > getFilterStatistics()
MemoryID getMemoryID() const
virtual std::string getExportName() const
void setMemoryID(const MemoryID &)
std::shared_ptr< Processors > processors
void createPropertyDefinitions(PropertyDefinitionsPtr &defs, const std::string &prefix)
void addToBuffer(const armem::wm::Memory &memory)
void configureMixin(const nlohmann::json &json)
void enqueuePendingConversion(PendingConversion pending)
void enqueueForAsyncStoragePublic(std::shared_ptr< const armem::wm::Memory > memory)
void setMixinMemoryID(const MemoryID &id)
bool cacheHasCoreSegment(const std::string &n) const
void configureMixin(const nlohmann::json &json)
void setMixinMemoryID(const MemoryID &id)
Client-side working memory core segment.
Client-side working memory.
Brief description of class memory.
#define ARMARX_INFO
The normal logging level.
#define ARMARX_IMPORTANT
The logging level for always important information, but expected behaviour (in contrast to ARMARX_WAR...
#define ARMARX_DEBUG
The logging level for output that is only interesting while debugging.
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
const std::list< std::string > identifiers
auto make_shared(Args &&... args)
IceUtil::Handle< class PropertyDefinitionContainer > PropertyDefinitionsPtr
PropertyDefinitions smart pointer type.
Holds snapshots and metadata for deferred conversion in an async thread. This allows us to defer the expe...