20#include <ArmarXCore/interface/core/UserException.h>
21#include <ArmarXCore/interface/observers/ObserverInterface.h>
28#include <RobotAPI/libraries/armem/server/test/ForgettingExperiments.h>
48 template <
class _CoreSegmentT>
75 if (m ==
"CONSOLIDATE_REMOVED")
79 else if (m ==
"CONSOLIDATE_ALL")
83 else if (m ==
"CONSOLIDATE_LATEST")
109 bool en =
p.enabled_on_startup;
119 const auto json = nlohmann::json::parse(
p.configuration_on_startup);
128 ARMARX_WARNING <<
"Failed to parse `" <<
p.configuration_on_startup <<
"`";
142 processors->resetFilterStatisticsForNewEpisode();
145 ARMARX_INFO <<
"Enabling LTM " <<
id().
str() <<
" at " << now.toDateTimeString();
162 ARMARX_INFO <<
"Disabling LTM " <<
id().
str() <<
" at " << now.toDateTimeString();
263 IceUtil::Time startTime;
266 startTime = IceUtil::Time::now();
275 <<
"Ignoring to put a Memory into the LTM because it got filtered.";
285 IceUtil::Time endTime = IceUtil::Time::now();
286 IceUtil::Time elapsed = endTime - startTime;
287 float elapsedMs = elapsed.toMilliSecondsDouble();
289 std::string channelName =
id().
memoryName +
"Memory_LTM";
293 {
"LTM | t store [ms]",
new Variant(elapsedMs)},
304 auto memory = std::make_shared<armem::wm::Memory>(serverMemory.
name());
317 const std::vector<wm::EntitySnapshot>& snapshots,
318 const std::vector<mixin::PendingConversion::SegmentMetadata>& segmentMetadata)
326 std::vector<wm::EntitySnapshot> filteredSnapshots;
329 filteredSnapshots.reserve(snapshots.size());
330 for (
const auto& snapshot : snapshots)
332 bool accepted =
true;
335 if (!filter->accept(snapshot,
false))
343 filteredSnapshots.push_back(snapshot);
348 if (filteredSnapshots.empty())
351 <<
" snapshots were pre-filtered in storeSnapshotsAsync";
358 filteredSnapshots = snapshots;
364 pending.
snapshots = std::move(filteredSnapshots);
386 std::shared_ptr<CoreSegmentT>
396 defs->optional(
p.enabled_on_startup, prefix +
"enabled");
397 defs->optional(
p.recordingMode, prefix +
"recordingMode");
398 defs->optional(
p.configuration_on_startup, prefix +
"configuration");
399 defs->optional(
p.export_name, prefix +
"exportName");
400 defs->optional(
p.storeOnStop, prefix +
"storeOnStop");
401 defs->optional(
p.export_path, prefix +
"exportPath");
403 defs->optional(
p.importOnStartUp, prefix +
"importOnStartUp");
404 defs->optional(
p.maxAmountOfSnapshotsLoaded, prefix +
"maxAmountSnapshotsLoaded");
405 defs->optional(
p.coreSegmentsToLoad, prefix +
"loadedCoreSegments");
407 defs->optional(
p.persistenceStrategyIdentifier,
408 prefix +
"persistenceStrategyIdentifier");
409 defs->optional(
p.persistenceStrategies, prefix +
"persistenceStrategies");
410 defs->optional(
p.restHost, prefix +
"rest.host");
411 defs->optional(
p.restPort, prefix +
"rest.port");
412 defs->optional(
p.restDisableIfNotAvailable, prefix +
"rest.disableIfNotAvailable");
414 defs->optional(
p.minDiskSpace, prefix +
"minDiskSpace");
417 defs->optional(
p.batchWriteEnabled, prefix +
"batchWriteEnabled",
418 "Enable batch writing for improved I/O throughput");
419 defs->optional(
p.batchSizeThreshold, prefix +
"batchSizeThreshold",
420 "Number of items to accumulate before flushing to disk");
421 defs->optional(
p.batchTimeThresholdMs, prefix +
"batchTimeThresholdMs",
422 "Maximum time in ms to hold items before flushing");
462 std::map<std::string, processor::SnapshotFilter::FilterStatistics>
468 auto stats =
processors->getSnapshotFilterStatistics();
471 catch (InvalidArgumentException& e)
479 std::map<std::string, processor::SnapshotFilter::FilterStatistics> emptyStatistics;
480 return emptyStatistics;
552 std::list<std::string> coreSegNames) = 0;
556 bool simulatedVersion =
false) = 0;
561 std::list<std::string> coreSegNames) = 0;
594 "{ \"SnapshotFrequencyFilter\": "
595 "{\"WaitingTimeInMsForFilter\" : 50}}";
SpamFilterDataPtr deactivateSpam(SpamFilterDataPtr const &spamFilter, float deactivationDurationSec, const std::string &identifier, bool deactivate)
The Variant class is described here: Variants.
std::string str(bool escapeDelimiters=true) const
Get a string representation of this memory ID.
virtual bool _implHasCoreSegment(const std::string &coreSegmentName) const =0
virtual void _resolve(armem::wm::Memory &memory)=0
void loadLatestNReferences(int n, armem::wm::Memory &memory)
virtual void _directlyStore(const armem::wm::Memory &memory, bool simulatedVersion=false)=0
void loadLatestNReferences(int n, armem::wm::Memory &memory, std::list< std::string > coreSegNames)
DebugObserverInterfacePrx getDebugObserver() const
Get the current debug observer (may be nullptr)
virtual void _loadAllReferences(armem::wm::Memory &memory, std::list< std::string > coreSegNames)=0
void setRecordingMode(const std::string &m)
virtual void createPropertyDefinitions(PropertyDefinitionsPtr &defs, const std::string &prefix)
default parameters. Implementation should use the configuration to configure
void enable()
enable this LTM
std::shared_ptr< CoreSegmentT > findCoreSegment(const std::string &coreSegmentName) const
find core segment
virtual void _enqueuePendingConversion(mixin::PendingConversion pending)=0
bool forEachCoreSegment(std::function< void(CoreSegmentT &)> func) const
iterate over all core segments of this ltm
void store(std::shared_ptr< const armem::wm::Memory > memory)
append a wm::Memory instance to the ltm (asynchronously via thread pool)
virtual void _setExportName(const std::string &)
RecordingMode recordingMode
void setDebugObserver(DebugObserverInterfacePrx observer)
Set an optional debug observer for timing measurements.
void loadAllAndResolve(armem::wm::Memory &memory, std::list< std::string > coreSegmentNames)
armem::wm::Memory loadLatestNReferences(int n)
armem::wm::Memory loadAllReferences()
return the full ltm as a wm::Memory with only references the ltm may be huge, use with caution
void storeSnapshotsAsync(const std::string &memoryName, const std::vector< wm::EntitySnapshot > &snapshots, const std::vector< mixin::PendingConversion::SegmentMetadata > &segmentMetadata)
Store snapshots with deferred conversion (happens in async thread) This is more efficient than callin...
virtual void _loadOnStartup()=0
virtual void _store(const armem::wm::Memory &memory)=0
virtual std::shared_ptr< CoreSegmentT > _implFindCoreSegment(const std::string &coreSegmentName) const =0
void disable()
disable this LTM
virtual void _loadLatestNReferences(int n, armem::wm::Memory &memory, std::list< std::string > coreSegNames)=0
void setRecordingMode(const RecordingMode m)
virtual void _enqueueForAsyncStorage(std::shared_ptr< const armem::wm::Memory > memory)=0
DebugObserverInterfacePrx debugObserver
static std::string getLevelName()
get level name1
bool hasCoreSegment(const std::string &coreSegmentName) const
check if core segment exists
void startRecording()
enable/disable
virtual void resetStatistics()
statistics
armem::wm::Memory loadAllAndResolve()
return the full ltm as a wm::Memory and resolves the references the ltm may be huge,...
MemoryBase(const std::string &exportName, const MemoryID &id)
virtual void _loadAllReferences(armem::wm::Memory &memory)=0
virtual void _configure(const nlohmann::json &)
configuration
virtual bool _implForEachCoreSegment(std::function< void(CoreSegmentT &)> func) const =0
RecordingMode getRecordingMode() const
struct armarx::armem::server::ltm::detail::MemoryBase::Properties p
void configure()
initialize config
void loadAllReferences(armem::wm::Memory &memory)
void resolve(armem::wm::Memory &memory)
convert the references of the input into a wm::Memory
Statistics getStatistics() const
_CoreSegmentT CoreSegmentT
void store(const armem::server::wm::Memory &serverMemory)
append a wm::Memory instance to the ltm
virtual void _loadLatestNReferences(int n, armem::wm::Memory &memory)=0
void loadAllAndResolve(armem::wm::Memory &memory)
std::map< std::string, processor::SnapshotFilter::FilterStatistics > getFilterStatistics()
virtual std::string getExportName() const
void setExportName(const std::string &n)
std::shared_ptr< Processors > processors
MemoryItem(const std::string &exportName, const MemoryID &)
Client-side working memory.
Represents a point in time.
static DateTime Invalid()
Brief description of class memory.
#define ARMARX_INFO
The normal logging level.
#define ARMARX_DEBUG
The logging level for output that is only interesting while debugging.
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
#define TIMING_START(name)
Helper macro to do timing tests.
#define TIMING_END_STREAM(name, os)
Prints duration.
Commit toCommit(const ContainerT &container)
::IceInternal::ProxyHandle<::IceProxy::armarx::DebugObserverInterface > DebugObserverInterfacePrx
IceUtil::Handle< class PropertyDefinitionContainer > PropertyDefinitionsPtr
PropertyDefinitions smart pointer type.
std::string recordingMode
int maxAmountOfSnapshotsLoaded
std::string persistenceStrategies
bool overwriteIdentifierOnLtmLoad
std::string configuration_on_startup
std::string persistenceStrategyIdentifier
std::string coreSegmentsToLoad
bool restDisableIfNotAvailable
armarx::core::time::DateTime lastStarted
armarx::core::time::DateTime lastStopped
armarx::core::time::DateTime lastEnabled
armarx::core::time::DateTime firstStopped
long recordedCoreSegments
armarx::core::time::DateTime firstStarted
Holds snapshots and metadata for deferred conversion in async thread This allows us to defer the expe...
std::vector< SegmentMetadata > segmentMetadata
std::vector< wm::EntitySnapshot > snapshots