memory_definitions.cpp
Go to the documentation of this file.
2
3#include <limits>
4#include <map>
5#include <vector>
6
9
12
13#include "error.h"
14
16{
17
// Forwards maxSize, together with the string form of this object's ID, to
// MaxHistorySize::setMaxHistorySize() and then calls truncate().
//
// NOTE(review): the declarator line between the return type and the opening
// brace is not present in this listing. Presumably it is
// Entity::setMaxHistorySize(long maxSize) -- confirm against the original file.
18 void
20 {
// String representation of this object's ID, handed on as the container identifier.
21 std::string containerID = (this->id()).str();
22 MaxHistorySize::setMaxHistorySize(maxSize, containerID);
// truncate() runs immediately afterwards; presumably so that a smaller limit
// takes effect on the already stored history -- its return value is discarded here.
23 truncate();
24 }
25
26 auto
27 Entity::update(const EntityUpdate& update) -> UpdateResult
28 {
29 UpdateResult result = EntityBase::update(update);
30 result.removedSnapshots = this->truncate();
31 return result;
32 }
33
// Removes elements from the front of _container while it holds more than
// _maxHistorySize elements, and returns the removed snapshots in the order in
// which they were removed. A negative _maxHistorySize skips the removal branch.
//
// NOTE(review): several lines of this function are not present in this listing
// (the declarator, the true-branch of the ternary below, the header of the second
// branch and the start of the log statement). The comments here only describe
// what is visible; confirm the gaps against the original file.
34 std::vector<EntitySnapshot>
36 {
// Collects the snapshots that get erased so the caller can inspect them.
37 std::vector<EntitySnapshot> removedElements;
38 if (_maxHistorySize >= 0)
39 {
40 // Limit removals per call to avoid long blocking
41 // _truncateMaxBatchSize == 0 means unlimited
// NOTE(review): the value chosen when _truncateMaxBatchSize > 0 is on a line
// missing from this listing (presumably _truncateMaxBatchSize itself -- confirm).
42 const size_t maxRemovals = (_truncateMaxBatchSize > 0)
44 : std::numeric_limits<size_t>::max();
45 size_t removed = 0;
46
// Stops as soon as the size limit is met or the per-call removal budget is used up;
// in the latter case the container may still exceed _maxHistorySize on return.
47 while (this->_container.size() > size_t(_maxHistorySize) && removed < maxRemovals)
48 {
// Move the first element's value out before erasing it, so it is not copied.
// Presumably the first element is the oldest snapshot (container ordered by
// time) -- TODO confirm.
49 removedElements.push_back(std::move(this->_container.begin()->second));
50 this->_container.erase(this->_container.begin());
51 ++removed;
52 }
53 }
// NOTE(review): the header of this second branch is missing from the listing
// (presumably an else / else-if that checks the container size against a warning
// threshold -- confirm). Nothing is removed in this branch; it only emits a message.
56 {
57 // Unlimited history mode - warn if growing too large
// NOTE(review): the beginning of this streaming statement (the log target) is
// missing from the listing.
59 << "Entity '" << this->id().str() << "' has "
60 << this->_container.size()
61 << " snapshots with unlimited history (maxHistorySize=-1). "
62 << "Consider setting a limit to prevent memory exhaustion.";
63 }
64 return removedElements;
65 }
66
// Applies all updates of a commit, holding each affected core segment's exclusive
// lock once for the whole batch of updates that target it.
//
// Returns one result per applied update. Because the updates are grouped in a
// std::map keyed by core segment name, the results are ordered by core segment
// name first and by position in the commit only within one core segment.
//
// Throws armem::error::MissingEntry if at least one update names a core segment
// that is not in _container. The throw happens only after all updates for existing
// core segments have been applied; in that case the collected results are not
// returned to the caller.
//
// NOTE(review): the declarator line between the return type and the opening brace
// is not present in this listing; the body reads a parameter named `commit`.
67 // TODO: add core segment if param is set
68 std::vector<Memory::Base::UpdateResult>
70 {
71 // Group updates by core segment, then update each core segment in a batch to only lock it once.
// The map stores pointers into commit.updates, so no EntityUpdate is copied.
72 std::map<std::string, std::vector<const EntityUpdate*>> updatesPerCoreSegment;
73 for (const EntityUpdate& update : commit.updates)
74 {
75 updatesPerCoreSegment[update.entityID.coreSegmentName].push_back(&update);
76 }
77
78 std::vector<Memory::Base::UpdateResult> result;
79 // To throw an exception after the commit if a core segment is missing and the memory should not create new ones
80 std::vector<std::string> missingCoreSegmentNames;
81 for (const auto& [coreSegmentName, updates] : updatesPerCoreSegment)
82 {
83 auto it = this->_container.find(coreSegmentName);
84 if (it != this->_container.end())
85 {
86 CoreSegment& coreSegment = it->second;
87
88 // Lock the core segment for the whole batch (exclusive lock for writes).
// `updates` is handed to the lambda as a pointer through an init-capture
// (presumably because a structured binding cannot be captured directly in the
// language version used here -- confirm).
89 coreSegment.doLockedExclusive(
90 [&result, &coreSegment, updates = &updates]()
91 {
92 for (const EntityUpdate* update : *updates)
93 {
94 auto r = coreSegment.update(*update);
95 Base::UpdateResult ret{r};
// Every result is tagged as an update of existing data, since this function
// never creates core segments.
96 ret.memoryUpdateType = UpdateType::UpdatedExisting;
// NOTE(review): ret is copied into result here; moving it may avoid a copy.
97 result.push_back(ret);
98 }
99 });
100 }
101 else
102 {
103 // Perform the other updates first, then throw afterwards.
104 missingCoreSegmentNames.push_back(coreSegmentName);
105 }
106 }
107 // Throw an exception if something went wrong.
108 if (not missingCoreSegmentNames.empty())
109 {
110 // Just throw an exception for the first entry. We can extend this exception in the future.
// "First" follows the map's iteration order, i.e. the smallest missing name.
111 throw armem::error::MissingEntry::create<CoreSegment>(missingCoreSegmentNames.front(),
112 *this);
113 }
114 return result;
115 }
116
// Applies a single entity update to the core segment named in the update's entity
// ID, holding that core segment's exclusive lock while the update runs.
//
// Returns the core segment's update result with memoryUpdateType set to
// UpdateType::UpdatedExisting.
//
// NOTE(review): the declarator line between the return type and the opening brace
// is not present in this listing; the body reads a parameter named `update`.
117 // TODO: Add core segment if param is set
118 Memory::Base::UpdateResult
120 {
// Compare the memory name carried by the update against this memory's name.
// Presumably this throws on a mismatch -- confirm in _checkContainerName().
121 this->_checkContainerName(update.entityID.memoryName, this->name());
122
// Presumably getCoreSegment() throws if no core segment of that name exists
// (see the TODO above) -- confirm.
123 CoreSegment& segment = getCoreSegment(update.entityID.coreSegmentName);
124 Base::UpdateResult result;
125 segment.doLockedExclusive([&result, &segment, &update]() { result = segment.update(update); });
// Set after the locked section; this function never creates a core segment.
126 result.memoryUpdateType = UpdateType::UpdatedExisting;
127 return result;
128 }
129} // namespace armarx::armem::server::wm
SpamFilterDataPtr deactivateSpam(SpamFilterDataPtr const &spamFilter, float deactivationDurationSec, const std::string &identifier, bool deactivate)
Definition Logging.cpp:75
std::string str(const T &t)
std::string str(bool escapeDelimiters=true) const
Get a string representation of this memory ID.
Definition MemoryID.cpp:102
std::string memoryName
Definition MemoryID.h:50
UpdateResult update(const EntityUpdate &update)
Updates an entity's history.
std::vector< UpdateResult > update(const Commit &commit, const bool addMissingCoreSegmentDuringUpdate=false, const bool checkMemoryName=true)
Definition MemoryBase.h:310
CoreSegmentT & getCoreSegment(const std::string &name)
Definition MemoryBase.h:134
void _checkContainerName(const std::string &gottenName, const std::string &actualName, bool emptyOk=true) const
static MissingEntry create(const std::string &missingKey, const ContainerT &container)
Definition ArMemError.h:79
auto doLockedExclusive(FunctionT &&function)
Execute function under exclusive (write) lock.
void setMaxHistorySize(long maxSize)
Sets the maximum history size.
std::vector< EntitySnapshotT > truncate()
If a maximum size is set, ensure the history's size does not exceed it.
UpdateResult update(const EntityUpdate &update)
std::vector< Base::UpdateResult > updateLocking(const Commit &commit)
Perform the commit, locking the core segments.
long _maxHistorySize
Maximum size of entity histories.
size_t _truncateMaxBatchSize
Maximum number of snapshots to remove per truncate() call.
size_t _unlimitedHistoryWarningThreshold
Threshold for warning about unlimited history growth.
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
Definition Logging.h:193
A bundle of updates to be sent to the memory.
Definition Commit.h:90
std::vector< EntityUpdate > updates
The entity updates.
Definition Commit.h:97
An update of an entity for a specific point in time.
Definition Commit.h:26
MemoryID entityID
The entity's ID.
Definition Commit.h:28