8#include <IceUtil/Time.h>
44 std::vector<ltm::detail::mixin::PendingConversion::SegmentMetadata>
45 extractSegmentMetadata(
const wm::Memory& structure,
46 const std::vector<wm::EntitySnapshot>& snapshots)
48 std::vector<ltm::detail::mixin::PendingConversion::SegmentMetadata> metadata;
51 std::map<std::string, std::set<std::string>> segmentMap;
52 for (
const auto& snapshot : snapshots)
54 segmentMap[snapshot.id().coreSegmentName].insert(snapshot.id().providerSegmentName);
58 for (
const auto& [coreSegmentName, providerNames] : segmentMap)
63 ARMARX_WARNING <<
"Core segment not found in structure: " << coreSegmentName;
69 meta.coreSegmentAronType = coreStructure->aronType();
71 for (
const auto& providerName : providerNames)
73 auto* providerStructure = coreStructure->findProviderSegment(providerName);
74 if (providerStructure)
76 meta.providerSegmentAronTypes[providerName] = providerStructure->aronType();
80 metadata.push_back(std::move(meta));
101 data::AddSegmentResult
107 ARMARX_DEBUG <<
"Adding segment using MemoryToIceAdapter";
109 data::AddSegmentResult output;
114 coreSegment = &
workingMemory->getCoreSegment(input.coreSegmentName);
120 coreSegment = &
workingMemory->addCoreSegment(input.coreSegmentName);
124 output.success =
false;
125 output.errorMessage = e.what();
131 if (input.providerSegmentName.size() > 0)
134 [&coreSegment, &input]()
143 if (input.clearWhenExists)
158 output.success =
true;
159 output.segmentID = segmentID.
str();
163 data::AddSegmentsResult
169 data::AddSegmentsResult output;
170 for (
const auto& i : input)
172 output.push_back(
addSegment(i, addCoreSegments));
182 auto handleException = [](
const std::string& what)
184 data::CommitResult result;
185 data::EntityUpdateResult& r = result.results.emplace_back();
187 r.errorMessage = what;
199 return handleException(e.what());
201 catch (
const Ice::Exception& e)
204 return handleException(e.what());
208 data::CommitResult resultIce;
209 toIce(resultIce, result);
225 return this->_commit(
commit,
false);
233 auto handleException = [](
const std::string& what)
235 data::CommitResult result;
236 data::EntityUpdateResult& r = result.results.emplace_back();
238 r.errorMessage = what;
250 return handleException(e.what());
252 catch (
const Ice::Exception& e)
255 return handleException(e.what());
259 data::CommitResult resultIce;
260 toIce(resultIce, result);
276 return this->_commit(
commit,
true);
280 MemoryToIceAdapter::_commit(
const armem::Commit& commit,
bool locking)
286 auto commitStartTime = std::chrono::steady_clock::now();
288 IceUtil::Time startTime;
292 startTime = IceUtil::Time::now();
299 std::vector<data::MemoryID> updatedIDs;
308 IceUtil::Time updateStartTime;
311 updateStartTime = IceUtil::Time::now();
326 IceUtil::Time updateEndTime = IceUtil::Time::now();
327 IceUtil::Time updateElapsed = updateEndTime - updateStartTime;
328 float updateElapsedMs = updateElapsed.toMilliSecondsDouble();
331 debugObserver->setDebugChannel(
334 {
"Memory | Commit | updateResult step [ms]",
new armarx::Variant(updateElapsedMs)},
338 for (
const auto& snapshot : updateResult.removedSnapshots)
340 ARMARX_DEBUG <<
"The id " << snapshot.id() <<
" was removed from wm";
354 IceUtil::Time storeStartTime;
357 storeStartTime = IceUtil::Time::now();
361 auto segmentMetadata = extractSegmentMetadata(*
workingMemory, updateResult.updatedSnapshots);
367 updateResult.updatedSnapshots,
372 IceUtil::Time storeEndTime = IceUtil::Time::now();
373 IceUtil::Time storeElapsed = storeEndTime - storeStartTime;
374 float storeElapsedMs = storeElapsed.toMilliSecondsDouble();
377 debugObserver->setDebugChannel(
380 {
"Memory | Commit | LTM enqueue (CONSOLIDATE_ALL) [ms]",
new armarx::Variant(storeElapsedMs)},
397 IceUtil::Time storeStartTime;
400 storeStartTime = IceUtil::Time::now();
404 auto segmentMetadata = extractSegmentMetadata(*
workingMemory, updateResult.removedSnapshots);
410 updateResult.removedSnapshots,
415 IceUtil::Time storeEndTime = IceUtil::Time::now();
416 IceUtil::Time storeElapsed = storeEndTime - storeStartTime;
417 float storeElapsedMs = storeElapsed.toMilliSecondsDouble();
420 debugObserver->setDebugChannel(
423 {
"Memory | Commit | LTM enqueue (CONSOLIDATE_REMOVED) [ms]",
new armarx::Variant(storeElapsedMs)},
437 data::MemoryID&
id = updatedIDs.emplace_back();
441 catch (
const error::ArMemError& e)
445 statistics.failedUpdates.fetch_add(1, std::memory_order_relaxed);
447 catch (
const aron::error::AronException& e)
451 statistics.failedUpdates.fetch_add(1, std::memory_order_relaxed);
453 catch (
const Ice::Exception& e)
457 statistics.failedUpdates.fetch_add(1, std::memory_order_relaxed);
462 statistics.failedUpdates.fetch_add(1, std::memory_order_relaxed);
472 auto commitEndTime = std::chrono::steady_clock::now();
473 double commitBlockingTimeMs = std::chrono::duration<double, std::milli>(commitEndTime - commitStartTime).count();
477 double currentTotal = statistics.totalCommitBlockingTimeMs.load(std::memory_order_relaxed);
478 statistics.totalCommitBlockingTimeMs.store(currentTotal + commitBlockingTimeMs, std::memory_order_relaxed);
480 double currentMax = statistics.maxCommitBlockingTimeMs.load(std::memory_order_relaxed);
481 while (commitBlockingTimeMs > currentMax)
483 if (statistics.maxCommitBlockingTimeMs.compare_exchange_weak(currentMax, commitBlockingTimeMs, std::memory_order_relaxed))
492 IceUtil::Time endTime = IceUtil::Time::now();
493 IceUtil::Time elapsed = endTime - startTime;
494 float elapsedMs = elapsed.toMilliSecondsDouble();
497 auto [commitsPerSec, queriesPerSec] = statistics.updateRates();
503 debugObserver->setDebugChannel(
507 {
"Memory | Commit | t blocked [ms]",
new armarx::Variant(elapsedMs)},
508 {
"Memory | Commit | max blocked [ms]",
new armarx::Variant(
static_cast<float>(statistics.maxCommitBlockingTimeMs.load()))},
510 {
"Memory | writes/sec",
new armarx::Variant(
static_cast<float>(commitsPerSec))},
511 {
"Memory | reads/sec",
new armarx::Variant(
static_cast<float>(queriesPerSec))},
513 {
"Memory | total commits",
new armarx::Variant(
static_cast<int>(statistics.totalCommitCount.load()))},
514 {
"Memory | total entity updates",
new armarx::Variant(
static_cast<int>(statistics.totalEntityUpdates.load()))},
515 {
"Memory | successful updates",
new armarx::Variant(
static_cast<int>(statistics.successfulUpdates.load()))},
516 {
"Memory | failed updates",
new armarx::Variant(
static_cast<int>(statistics.failedUpdates.load()))},
518 {
"Memory | LTM async queue size",
new armarx::Variant(
static_cast<int>(queueSize))},
527 armem::query::data::Result
535 auto queryStartTime = std::chrono::steady_clock::now();
538 statistics.totalQueryCount.fetch_add(1, std::memory_order_relaxed);
545 armem::query::data::Result result;
550 result.success =
true;
551 if (result.memory->coreSegments.size() == 0)
557 auto queryEndTime = std::chrono::steady_clock::now();
558 double queryBlockingTimeMs = std::chrono::duration<double, std::milli>(queryEndTime - queryStartTime).count();
562 double currentTotal = statistics.totalQueryBlockingTimeMs.load(std::memory_order_relaxed);
563 statistics.totalQueryBlockingTimeMs.store(currentTotal + queryBlockingTimeMs, std::memory_order_relaxed);
565 double currentMax = statistics.maxQueryBlockingTimeMs.load(std::memory_order_relaxed);
566 while (queryBlockingTimeMs > currentMax)
568 if (statistics.maxQueryBlockingTimeMs.compare_exchange_weak(currentMax, queryBlockingTimeMs, std::memory_order_relaxed))
580 debugObserver->setDebugChannel(
583 {
"Memory | Query | t blocked [ms]",
new armarx::Variant(
static_cast<float>(queryBlockingTimeMs))},
584 {
"Memory | Query | max blocked [ms]",
new armarx::Variant(
static_cast<float>(statistics.maxQueryBlockingTimeMs.load()))},
585 {
"Memory | total queries",
new armarx::Variant(
static_cast<int>(statistics.totalQueryCount.load()))},
592 armem::query::data::Result
615 armem::query::data::Result result;
619 result.success =
true;
620 if (result.memory->coreSegments.size() == 0)
636 armem::structure::data::GetServerStructureResult
643 armem::structure::data::GetServerStructureResult ret;
654 if (query_result.success)
656 structure.
append(query_result.memory);
672 ARMARX_INFO <<
"Reloading of all core segments from LTM into WM triggered";
674 int maxAmountOfSnapshots = this->
longtermMemory->p.maxAmountOfSnapshotsLoaded;
677 this->
longtermMemory->loadLatestNReferences(maxAmountOfSnapshots, m);
681 auto res = this->
commit(com);
692 ARMARX_INFO <<
"Reloading of specific core segments from LTM into WM triggered";
694 std::ostringstream namesStr;
695 for (
auto it = coreSegmentNames.begin(); it != coreSegmentNames.end(); ++it)
697 if (it != coreSegmentNames.begin())
702 ARMARX_INFO <<
"Loading core segments=" << namesStr.str();
705 int maxAmountOfSnapshots = this->
longtermMemory->p.maxAmountOfSnapshotsLoaded;
708 this->
longtermMemory->loadLatestNReferences(maxAmountOfSnapshots, m, coreSegmentNames);
712 auto res = this->
commit(com);
721 ARMARX_INFO <<
"Reloading of coresegment defined in 'loadedCoreSegments' from LTM into WM "
722 "on startup triggered";
726 ARMARX_INFO <<
"Loading core segments=" << coreNames
727 <<
" defined in property 'loadedCoreSegments'";
730 std::list<std::string> names;
731 std::stringstream ss(coreNames);
734 while (std::getline(ss, item,
','))
736 names.push_back(item);
748 ARMARX_INFO <<
"Reloading of data from LTM into WM on startup triggered";
756 ARMARX_INFO <<
"Not loading initial data from LTM due to importOnStartup being "
764 dto::DirectlyStoreResult
770 dto::DirectlyStoreResult output;
771 output.success =
true;
779 dto::StartRecordResult
787 dto::StartRecordResult ret;
793 dto::StopRecordResult
803 ARMARX_INFO <<
"Starting to save left-over WM data into LTM";
809 ARMARX_INFO <<
"Not storing WM data into LTM on stop, because storeOnStop is "
819 std::thread flushThread(
826 ARMARX_INFO <<
"All pending data stored successfully";
832 ltm->bufferFinished();
834 flushThread.detach();
837 <<
"Stopped all LTM recordings, flushing async queue in background. "
838 <<
"Please wait with stopping the component until all files are written";
840 dto::StopRecordResult ret;
846 dto::RecordStatusResult
849 dto::RecordStatusResult ret;
858 [&savedSnapshots, &totalSnapshots](
const auto&
c)
860 c.forEachProviderSegment(
861 [&savedSnapshots, &totalSnapshots](
const auto& p)
864 [&savedSnapshots, &totalSnapshots](
const auto& e)
866 savedSnapshots += e.getStatistics().recordedSnapshots;
868 e.forEachSnapshot([&totalSnapshots](
const auto&)
869 { totalSnapshots++; });
874 ret.status.savedSnapshots = savedSnapshots;
875 ret.status.totalSnapshots = totalSnapshots;
881 prediction::data::PredictionResultSeq
889 prediction::data::EngineSupportMap
892 prediction::data::EngineSupportMap result;
929 auto [commitsPerSec, queriesPerSec] = statistics.updateRates();
932 size_t queueSize = 0;
933 size_t numThreadsProcessing = 0;
934 uint64_t asyncItemsEnqueued = 0;
935 uint64_t asyncItemsProcessed = 0;
936 uint64_t asyncSnapshotsStored = 0;
937 uint64_t asyncSnapshotsDropped = 0;
938 uint64_t asyncBackpressureEvents = 0;
939 double asyncAvgStorageTimeMs = 0.0;
940 double asyncMaxStorageTimeMs = 0.0;
946 const auto& asyncStats =
longtermMemory->getAsyncStorageStatistics();
947 asyncItemsEnqueued = asyncStats.totalItemsEnqueued.load(std::memory_order_relaxed);
948 asyncItemsProcessed = asyncStats.totalItemsProcessed.load(std::memory_order_relaxed);
949 asyncSnapshotsStored = asyncStats.totalSnapshotsStored.load(std::memory_order_relaxed);
950 asyncSnapshotsDropped = asyncStats.snapshotsDropped.load(std::memory_order_relaxed);
951 asyncBackpressureEvents = asyncStats.backpressureEvents.load(std::memory_order_relaxed);
952 asyncAvgStorageTimeMs = asyncStats.getAvgStorageTimeMs();
953 asyncMaxStorageTimeMs = asyncStats.getMaxStorageTimeMs();
957 uint64_t commitCount = statistics.totalCommitCount.load(std::memory_order_relaxed);
958 uint64_t queryCount = statistics.totalQueryCount.load(std::memory_order_relaxed);
959 double avgCommitBlockingMs = commitCount > 0
960 ? statistics.totalCommitBlockingTimeMs.load(std::memory_order_relaxed) / commitCount
962 double avgQueryBlockingMs = queryCount > 0
963 ? statistics.totalQueryBlockingTimeMs.load(std::memory_order_relaxed) / queryCount
967 debugObserver->setDebugChannel(
971 {
"Memory | writes/sec",
new armarx::Variant(
static_cast<float>(commitsPerSec))},
972 {
"Memory | reads/sec",
new armarx::Variant(
static_cast<float>(queriesPerSec))},
974 {
"Memory | total commits",
new armarx::Variant(
static_cast<int>(statistics.totalCommitCount.load()))},
975 {
"Memory | total queries",
new armarx::Variant(
static_cast<int>(statistics.totalQueryCount.load()))},
976 {
"Memory | total entity updates",
new armarx::Variant(
static_cast<int>(statistics.totalEntityUpdates.load()))},
977 {
"Memory | successful updates",
new armarx::Variant(
static_cast<int>(statistics.successfulUpdates.load()))},
978 {
"Memory | failed updates",
new armarx::Variant(
static_cast<int>(statistics.failedUpdates.load()))},
980 {
"Memory | Commit | avg blocked [ms]",
new armarx::Variant(
static_cast<float>(avgCommitBlockingMs))},
981 {
"Memory | Commit | max blocked [ms]",
new armarx::Variant(
static_cast<float>(statistics.maxCommitBlockingTimeMs.load()))},
982 {
"Memory | Query | avg blocked [ms]",
new armarx::Variant(
static_cast<float>(avgQueryBlockingMs))},
983 {
"Memory | Query | max blocked [ms]",
new armarx::Variant(
static_cast<float>(statistics.maxQueryBlockingTimeMs.load()))},
985 {
"Memory | LTM async queue size",
new armarx::Variant(
static_cast<int>(queueSize))},
986 {
"Memory | LTM threads processing",
new armarx::Variant(
static_cast<int>(numThreadsProcessing))},
987 {
"Memory | LTM items enqueued",
new armarx::Variant(
static_cast<int>(asyncItemsEnqueued))},
988 {
"Memory | LTM items processed",
new armarx::Variant(
static_cast<int>(asyncItemsProcessed))},
989 {
"Memory | LTM snapshots stored",
new armarx::Variant(
static_cast<int>(asyncSnapshotsStored))},
990 {
"Memory | LTM snapshots dropped",
new armarx::Variant(
static_cast<int>(asyncSnapshotsDropped))},
991 {
"Memory | LTM backpressure events",
new armarx::Variant(
static_cast<int>(asyncBackpressureEvents))},
992 {
"Memory | LTM avg storage [ms]",
new armarx::Variant(
static_cast<float>(asyncAvgStorageTimeMs))},
993 {
"Memory | LTM max storage [ms]",
new armarx::Variant(
static_cast<float>(asyncMaxStorageTimeMs))},
SpamFilterDataPtr deactivateSpam(SpamFilterDataPtr const &spamFilter, float deactivationDurationSec, const std::string &identifier, bool deactivate)
The Variant class is described here: Variants.
std::string coreSegmentName
std::string str(bool escapeDelimiters=true) const
Get a string representation of this memory ID.
std::string providerSegmentName
ProviderSegmentT & getProviderSegment(const std::string &name)
CoreSegmentT * findCoreSegment(const std::string &name)
void append(const OtherDerivedT &other)
Merge another memory into this one.
std::vector< UpdateResult > update(const Commit &commit, const bool addMissingCoreSegmentDuringUpdate=false, const bool checkMemoryName=true)
Store all updates in commit.
void all()
Get all snapshots from all entities in all segments.
QueryInput buildQueryInput() const
Indicates that a name in a given ID does not match a container's own name.
Indicates that a container did not have an entry under a given name.
server::wm::Memory * workingMemory
armem::structure::data::GetServerStructureResult getServerStructure()
void setMemoryListener(client::MemoryListenerInterfacePrx memoryListenerTopic)
dto::StartRecordResult startRecord(const dto::StartRecordInput &startRecordInput)
armem::CommitResult reloadFromLTMOnStartup()
Triggers a reload (.
armem::CommitResult reloadAllFromLTM()
Loads all core segments and their data from the LTM.
query::data::Result queryLTM(const armem::query::data::Input &input, bool storeIntoWM)
Query the LTMs of the memory server.
client::MemoryListenerInterfacePrx memoryListenerTopic
dto::StopRecordResult stopRecord()
query::data::Result query(const armem::query::data::Input &input)
armem::CommitResult reloadCoreSegmentsFromLTM(std::list< std::string > &coreSegmentname)
Only load specific core segments and their data from the LTM.
prediction::data::PredictionResultSeq predict(prediction::data::PredictionRequestSeq requests)
dto::DirectlyStoreResult directlyStore(const dto::DirectlyStoreInput &directlStoreInput)
void reportDebugMetrics()
Report all debug metrics to the debug observer.
armem::CommitResult reloadPropertyDefinedCoreSegmentsFromLTM()
MemoryToIceAdapter(server::wm::Memory *workingMemory=nullptr, server::ltm::Memory *longtermMemory=nullptr)
Construct an MemoryToIceAdapter from an existing Memory.
server::ltm::Memory * longtermMemory
data::AddSegmentResult addSegment(const data::AddSegmentInput &input, bool addCoreSegments=false)
dto::RecordStatusResult getRecordStatus()
data::CommitResult commitLocking(const data::Commit &commitIce, Time timeArrived)
prediction::data::EngineSupportMap getAvailableEngines()
data::AddSegmentsResult addSegments(const data::AddSegmentsInput &input, bool addCoreSegments=false)
data::CommitResult commit(const data::Commit &commitIce, Time timeArrived)
A memory storing data on the hard drive and in mongodb (needs 'armarx memory start' to start the mong...
DebugObserverInterfacePrx getDebugObserver() const
Get the current debug observer (may be nullptr)
ResultMemoryT process(const armem::query::data::Input &input, const MemoryT &memory) const
ResultMemoryT process(const armem::query::data::Input &input, const MemoryT &memory) const
ProviderSegment & addProviderSegment(const std::string &name, Args... args)
auto doLockedExclusive(FunctionT &&function)
Execute function under exclusive (write) lock.
std::vector< Base::UpdateResult > updateLocking(const Commit &commit)
Perform the commit, locking the core segments.
Client-side working memory.
The AronNotValidException class.
#define ARMARX_CHECK_NOT_NULL(ptr)
This macro evaluates whether ptr is not null and if it turns out to be false it will throw an Express...
#define ARMARX_INFO
The normal logging level.
#define ARMARX_IMPORTANT
The logging level for always important information, but expected behaviour (in contrast to ARMARX_WAR...
#define ARMARX_DEBUG
The logging level for output that is only interesting while debugging.
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
#define TIMING_START(name)
Helper macro to do timing tests.
#define TIMING_END_STREAM(name, os)
Prints duration.
query::Builder QueryBuilder
@ NoData
Just get the structure, but no ARON data.
DataMode boolToDataMode(bool withData)
void fromIce(const data::MemoryID &ice, MemoryID &id)
armarx::core::time::DateTime Time
Commit toCommit(const ContainerT &container)
void toIce(data::MemoryID &ice, const MemoryID &id)
void fromIce(const std::map< IceKeyT, IceValueT > &iceMap, boost::container::flat_map< CppKeyT, CppValueT > &cppMap)
void toIce(std::map< IceKeyT, IceValueT > &iceMap, const boost::container::flat_map< CppKeyT, CppValueT > &cppMap)
std::vector< EntityUpdateResult > results
A bundle of updates to be sent to the memory.
Result of an EntityUpdate.
An update of an entity for a specific point in time.
static QueryResult fromIce(const armem::query::data::Result &ice)
std::atomic< uint64_t > totalEntityUpdates
std::atomic< uint64_t > successfulUpdates
std::atomic< uint64_t > totalCommitCount