memory_definitions.h
Go to the documentation of this file.
1#pragma once
2
3#include <atomic>
4#include <chrono>
5#include <mutex>
6#include <shared_mutex>
7
16
18#include "detail/Prediction.h"
19
21{
/**
 * @brief Statistics for CoreSegment lock operations.
 *
 * Thread-safe statistics for tracking read/write lock contention and timing.
 * Every counter is a `std::atomic`, so each individual update or read is
 * safe from any thread. The counters are independent of each other: reading
 * or resetting several of them is not one atomic snapshot, so values that
 * are observed while other threads are locking may be slightly out of step.
 *
 * Wait times are accumulated in nanoseconds; the getters report milliseconds.
 */
struct CoreSegmentLockStatistics
{
    // Read lock statistics
    std::atomic<uint64_t> readLockCount{0};       ///< Number of read lock acquisitions.
    std::atomic<uint64_t> totalReadLockWaitNs{0}; ///< Summed read lock wait time [ns].
    std::atomic<uint64_t> maxReadLockWaitNs{0};   ///< Longest single read lock wait [ns].

    // Write lock statistics
    std::atomic<uint64_t> writeLockCount{0};       ///< Number of write lock acquisitions.
    std::atomic<uint64_t> totalWriteLockWaitNs{0}; ///< Summed write lock wait time [ns].
    std::atomic<uint64_t> maxWriteLockWaitNs{0};   ///< Longest single write lock wait [ns].

    /**
     * @brief Set all read and write counters back to zero.
     *
     * Counts, totals and maxima are cleared together; clearing only the
     * counts would leave stale totals/maxima behind and skew the averages
     * reported after the reset.
     */
    void reset()
    {
        // Pure statistics counters: relaxed ordering, as in the getters.
        readLockCount.store(0, std::memory_order_relaxed);
        totalReadLockWaitNs.store(0, std::memory_order_relaxed);
        maxReadLockWaitNs.store(0, std::memory_order_relaxed);

        writeLockCount.store(0, std::memory_order_relaxed);
        totalWriteLockWaitNs.store(0, std::memory_order_relaxed);
        maxWriteLockWaitNs.store(0, std::memory_order_relaxed);
    }

    /// @return Mean read lock wait in milliseconds, or 0 if no read lock was taken yet.
    double getAvgReadLockWaitMs() const
    {
        const uint64_t count = readLockCount.load(std::memory_order_relaxed);
        if (count == 0)
        {
            return 0.0; // Avoid dividing by zero before the first acquisition.
        }
        const uint64_t totalNs = totalReadLockWaitNs.load(std::memory_order_relaxed);
        return static_cast<double>(totalNs) / (static_cast<double>(count) * 1e6);
    }

    /// @return Mean write lock wait in milliseconds, or 0 if no write lock was taken yet.
    double getAvgWriteLockWaitMs() const
    {
        const uint64_t count = writeLockCount.load(std::memory_order_relaxed);
        if (count == 0)
        {
            return 0.0; // Avoid dividing by zero before the first acquisition.
        }
        const uint64_t totalNs = totalWriteLockWaitNs.load(std::memory_order_relaxed);
        return static_cast<double>(totalNs) / (static_cast<double>(count) * 1e6);
    }

    /// @return Longest recorded read lock wait in milliseconds.
    double getMaxReadLockWaitMs() const
    {
        return static_cast<double>(maxReadLockWaitNs.load(std::memory_order_relaxed)) / 1e6;
    }

    /// @return Longest recorded write lock wait in milliseconds.
    double getMaxWriteLockWaitMs() const
    {
        return static_cast<double>(maxWriteLockWaitNs.load(std::memory_order_relaxed)) / 1e6;
    }
};
73
77
80
81 /// @see base::EntityBase
82 class Entity :
83 public base::EntityBase<EntitySnapshot, Entity>,
86 {
87 public:
89
90
91 /**
92 * @brief Sets the maximum history size.
93 *
94 * The current history is truncated if necessary.
95 */
96 void setMaxHistorySize(long maxSize);
97
98 UpdateResult update(const EntityUpdate& update);
99
100
101 protected:
102 /// If a maximum size is set, ensure that the size of `history` does not exceed it.
103 std::vector<EntitySnapshotT> truncate();
104 };
105
106 /// @see base::ProviderSegmentBase
108 public base::ProviderSegmentBase<Entity, ProviderSegment>,
109 public detail::MaxHistorySizeParent<ProviderSegment>,
110 public armem::wm::detail::FindInstanceDataMixin<ProviderSegment>,
111 public armem::server::wm::detail::Prediction<ProviderSegment>
112 {
113 public:
115
116
117 using ProviderSegmentBase::addEntity;
118
119 template <class... Args>
120 Entity&
121 addEntity(const std::string& name, Args... args)
122 {
123 Entity& added = ProviderSegmentBase::addEntity(name, args...);
127 return added;
128 }
129 };
130
131 /// @see base::CoreSegmentBase
133 public base::CoreSegmentBase<ProviderSegment, CoreSegment>,
134 public detail::MaxHistorySizeParent<CoreSegment>,
137 {
139
140 public:
142
143 /// @see base::CoreSegmentBase::addProviderSegment()
144 using CoreSegmentBase::addProviderSegment;
145
146 template <class... Args>
148 addProviderSegment(const std::string& name, Args... args)
149 {
150 ProviderSegmentT& added = CoreSegmentBase::addProviderSegment(name, args...);
151 int maxHistorySize = this->getMaxHistorySize();
152 if (maxHistorySize < 0)
153 {
154 ARMARX_INFO << "The maxHistorySize for this core segment is set to < 0. "
155 << "This means nothing will ever be forgotten in working memory. "
156 << "This may slow down the memory server. \n"
157 << "Core Segment Name: " << (this->id()).str();
158 }
159 added.setMaxHistorySize(maxHistorySize);
160 added.setUnlimitedHistoryWarningThreshold(this->getUnlimitedHistoryWarningThreshold());
161 added.setTruncateMaxBatchSize(this->getTruncateMaxBatchSize());
162 return added;
163 }
164
165 // Locking interface
166
167 /**
168 * @brief Execute function under shared (read) lock.
169 * Multiple readers can hold the lock simultaneously.
170 * Use this for read-only operations (queries, GUI, etc.)
171 * Tracks lock acquisition time for debugging.
172 */
173 template <class FunctionT>
174 auto
175 doLocked(FunctionT&& function) const
176 {
177 auto startTime = std::chrono::steady_clock::now();
178 std::shared_lock lock(_mutex);
179 auto lockAcquiredTime = std::chrono::steady_clock::now();
180
181 // Track lock wait time
182 uint64_t waitNs = std::chrono::duration_cast<std::chrono::nanoseconds>(
183 lockAcquiredTime - startTime).count();
184 _lockStats.readLockCount.fetch_add(1, std::memory_order_relaxed);
185 _lockStats.totalReadLockWaitNs.fetch_add(waitNs, std::memory_order_relaxed);
186
187 // Update max wait time (using compare-exchange)
188 uint64_t currentMax = _lockStats.maxReadLockWaitNs.load(std::memory_order_relaxed);
189 while (waitNs > currentMax)
190 {
191 if (_lockStats.maxReadLockWaitNs.compare_exchange_weak(currentMax, waitNs, std::memory_order_relaxed))
192 {
193 break;
194 }
195 }
196
197 return function();
198 }
199
200 /**
201 * @brief Execute function under exclusive (write) lock.
202 * Only one writer can hold the lock, and no readers.
203 * Use this for write operations (update, add, clear, etc.)
204 * Tracks lock acquisition time for debugging.
205 */
206 template <class FunctionT>
207 auto
208 doLockedExclusive(FunctionT&& function)
209 {
210 auto startTime = std::chrono::steady_clock::now();
211 std::unique_lock lock(_mutex);
212 auto lockAcquiredTime = std::chrono::steady_clock::now();
213
214 // Track lock wait time
215 uint64_t waitNs = std::chrono::duration_cast<std::chrono::nanoseconds>(
216 lockAcquiredTime - startTime).count();
217 _lockStats.writeLockCount.fetch_add(1, std::memory_order_relaxed);
218 _lockStats.totalWriteLockWaitNs.fetch_add(waitNs, std::memory_order_relaxed);
219
220 // Update max wait time (using compare-exchange)
221 uint64_t currentMax = _lockStats.maxWriteLockWaitNs.load(std::memory_order_relaxed);
222 while (waitNs > currentMax)
223 {
224 if (_lockStats.maxWriteLockWaitNs.compare_exchange_weak(currentMax, waitNs, std::memory_order_relaxed))
225 {
226 break;
227 }
228 }
229
230 return function();
231 }
232
233 /// Get lock statistics for this core segment
234 const CoreSegmentLockStatistics& getLockStatistics() const { return _lockStats; }
235
236 /// Reset lock statistics
237 void resetLockStatistics() { _lockStats.reset(); }
238
239
240 private:
241 mutable std::shared_mutex _mutex;
242 mutable CoreSegmentLockStatistics _lockStats;
243 };
244
245 /// @see base::MemoryBase
246 class Memory :
247 public base::MemoryBase<CoreSegment, Memory>,
250 {
252
253 public:
254 using Base::MemoryBase;
255
256
257 /**
258 * @brief Perform the commit, locking the core segments.
259 *
260 * Groups the commits by core segment, and updates each core segment
261 * in a batch, locking the core segment.
262 */
263 std::vector<Base::UpdateResult> updateLocking(const Commit& commit);
264
265 /**
266 * @brief Update the memory, locking the updated core segment.
267 */
268 Base::UpdateResult updateLocking(const EntityUpdate& update);
269
270 /// @see base::MemoryBase::addCoreSegment()
271 using MemoryBase::addCoreSegment;
272
273 template <class... Args>
275 addCoreSegment(const std::string& name, Args... args)
276 {
277 CoreSegmentT& added = MemoryBase::addCoreSegment(name, args...);
278 return added;
279 }
280 };
281
282} // namespace armarx::armem::server::wm
std::string str(const T &t)
Data of a core segment containing multiple provider segments.
An entity over a period of time.
Definition EntityBase.h:49
Data of a memory consisting of multiple core segments.
Definition MemoryBase.h:30
std::vector< UpdateResult > update(const Commit &commit, const bool addMissingCoreSegmentDuringUpdate=false, const bool checkMemoryName=true)
Definition MemoryBase.h:310
Data of a provider segment containing multiple entities.
const CoreSegmentLockStatistics & getLockStatistics() const
Get lock statistics for this core segment.
ProviderSegment & addProviderSegment(const std::string &name, Args... args)
auto doLocked(FunctionT &&function) const
Execute function under shared (read) lock.
void resetLockStatistics()
Reset lock statistics.
auto doLockedExclusive(FunctionT &&function)
Execute function under exclusive (write) lock.
void setMaxHistorySize(long maxSize)
Sets the maximum history size.
std::vector< EntitySnapshotT > truncate()
If maximum size is set, ensure history's is not higher.
UpdateResult update(const EntityUpdate &update)
std::vector< Base::UpdateResult > updateLocking(const Commit &commit)
Perform the commit, locking the core segments.
CoreSegment & addCoreSegment(const std::string &name, Args... args)
Entity & addEntity(const std::string &name, Args... args)
void setTruncateMaxBatchSize(size_t batchSize)
Set the maximum number of snapshots to remove per truncate() call.
void setUnlimitedHistoryWarningThreshold(size_t threshold)
Set the threshold for warning about unlimited history growth.
Can do predictions itself and has children it could delegate predictions to.
Definition Prediction.h:158
Can do predictions, but has no children it could delegate predictions to.
Definition Prediction.h:52
Client-side working entity instance.
Client-side working memory entity snapshot.
#define ARMARX_INFO
The normal logging level.
Definition Logging.h:181
armem::wm::EntitySnapshot EntitySnapshot
base::EntityInstanceMetadata EntityInstanceMetadata
armem::wm::EntityInstance EntityInstance
armarx::aron::data::Dict EntityInstanceData
armarx::aron::data::DictPtr EntityInstanceDataPtr
std::shared_ptr< Dict > DictPtr
Definition Dict.h:42
A bundle of updates to be sent to the memory.
Definition Commit.h:90
An update of an entity for a specific point in time.
Definition Commit.h:26
Statistics for CoreSegment lock operations.