Entity.cpp
Go to the documentation of this file.
1// Header
2#include "Entity.h"
3#include <memory>
4
5 // Simox
6#include <SimoxUtility/algorithm/string.h>
7
8// ArmarX
11
15
17
19{
// Constructs an LTM entity.
// exportName, memoryId and filters are forwarded unchanged to EntityBase;
// persistenceStrategy is stored in persistenceStrategy_. The body is empty.
20 Entity::Entity(const std::string& exportName,
21 const armem::MemoryID& memoryId /* UNESCAPED */,
22 const std::shared_ptr<Processors>& filters,
23 const std::shared_ptr<persistence::MemoryPersistenceStrategy>& persistenceStrategy) :
24 EntityBase(exportName, memoryId, filters), persistenceStrategy_(persistenceStrategy) {}
25
// Invokes func once per snapshot key that persistenceStrategy_ reports for id().
// For every key a temporary EntitySnapshot is constructed whose ID is id() with
// the timestamp parsed from the key via timeFromStringMicroSeconds.
// Always returns true.
26 bool
27 Entity::_implForEachSnapshot(std::function<void(EntitySnapshot&)> func) const
28 {
30
31 ARMARX_DEBUG << "For each entity snapshot (entity id=" << id().cleanID().str() << ")";
32
33 // Some weird side effects are happening here. Probably some references are mixed up somewhere
35 // persistenceStrategy_->setMemoryID(id());
36
37 for (auto& entitySnapshotKey : persistenceStrategy_->getContainerKeys(id()))
38 {
39 ARMARX_DEBUG << "Found entity snapshot=" << entitySnapshotKey;
40
// Copy of the shared_ptr: the snapshot shares the same strategy object.
41 std::shared_ptr<persistence::MemoryPersistenceStrategy> entitySnapshotPersistenceStrategy(persistenceStrategy_);
42
// NOTE(review): only two constructor arguments are visible in this listing, while
// _implFindSnapshot passes (getExportName(), id, processors, strategy) - presumably
// the export name and processors arguments were lost here; confirm against the original file.
43 EntitySnapshot ltmEntitySnapshot(
45 id().withTimestamp(timeFromStringMicroSeconds(entitySnapshotKey)),
47 entitySnapshotPersistenceStrategy);
48
49
50 func(ltmEntitySnapshot);
51 }
52
53 return true;
54 }
55
// Index-based iteration: func is invoked for every snapshot whose 0-based running
// position, in the order produced by _implForEachSnapshot, lies in [first, last].
// Returns whatever _implForEachSnapshot returns.
// NOTE(review): the line carrying the function name and the `first` parameter is not
// visible in this listing - presumably
// Entity::_implForEachSnapshotInIndexRange(long first, ...); confirm.
56 bool
58 long last,
59 std::function<void(EntitySnapshot&)> func) const
60 {
62
63 // ARMARX_WARNING << "PLEASE NOTE THAT QUERYING THE LTM INDEX WISE MAY BE BUGGY BECAUSE THE FILESYSTEM ITERATOR IS UNSORTED!";
64
65 if (first < 0 or last < 0)
66 {
67 // We need to know what the size of the memory is... May be slow
68 unsigned long size = 0;
69 auto countFunc = [&](EntitySnapshot& ltmEntitySnapshot) { size++; };
70 _implForEachSnapshot(std::move(countFunc)); // Direct call to _impl, no locking!
71
// NOTE(review): no visible line consumes `size`; presumably first/last are
// normalized here (e.g. via negativeIndexSemantics(index, size)) on lines
// missing from this listing - confirm. Without that step a negative bound
// is compared as-is in the range check below.
74 }
75
// Running position of the snapshot currently being visited.
76 long checked = 0;
77 auto runInIndexRangeFunc = [&](EntitySnapshot& ltmEntitySnapshot)
78 {
// Both bounds are inclusive.
79 if (checked >= first && checked <= last)
80 {
81 func(ltmEntitySnapshot);
82 }
83 checked++;
84 };
85
86 return _implForEachSnapshot(std::move(runInIndexRangeFunc)); // Direct call to _impl!
87 }
88
// Invokes func for every snapshot whose timestamp ts satisfies min <= ts <= max
// (both bounds inclusive). All snapshots are visited via _implForEachSnapshot and
// filtered here; the return value is the one of _implForEachSnapshot.
// NOTE(review): the line with the function name and the `min` parameter is not
// visible in this listing - presumably
// Entity::_implForEachSnapshotInTimeRange(const Time& min, ...); confirm.
89 bool
91 const Time& max,
92 std::function<void(EntitySnapshot&)> func) const
93 {
95
96 auto runInTimeRange = [&](EntitySnapshot& ltmEntitySnapshot)
97 {
98 Time ts = ltmEntitySnapshot.id().timestamp;
99 if (ts >= min && ts <= max)
100 {
101 func(ltmEntitySnapshot);
102 }
103 };
104
105 return _implForEachSnapshot(std::move(runInTimeRange));
106 }
107
// Invokes func for every snapshot whose timestamp is <= time.
// All snapshots are visited via _implForEachSnapshot and filtered here;
// the return value is the one of _implForEachSnapshot.
// NOTE(review): the line with the function name and the `time` parameter is not
// visible in this listing - presumably
// Entity::_implForEachSnapshotBeforeOrAt(const Time& time, ...); confirm.
108 bool
110 std::function<void(EntitySnapshot&)> func) const
111 {
113
114 auto runBeforeOrAtFunc = [&](EntitySnapshot& ltmEntitySnapshot)
115 {
116 Time ts = ltmEntitySnapshot.id().timestamp;
117 if (ts <= time)
118 {
119 func(ltmEntitySnapshot);
120 }
121 };
122
123 return _implForEachSnapshot(std::move(runBeforeOrAtFunc));
124 }
125
// Invokes func for every snapshot whose timestamp is strictly earlier than time.
// All snapshots are visited via _implForEachSnapshot and filtered here;
// the return value is the one of _implForEachSnapshot.
126 bool
127 Entity::_implForEachSnapshotBefore(const Time& time, std::function<void(EntitySnapshot&)> func) const
128 {
130
131 auto runBeforeFunc = [&](EntitySnapshot& ltmEntitySnapshot)
132 {
133 Time ts = ltmEntitySnapshot.id().timestamp;
134 if (ts < time)
135 {
136 func(ltmEntitySnapshot);
137 }
138 };
139
140 return _implForEachSnapshot(std::move(runBeforeFunc));
141 }
142
// Visits the snapshots with timestamp strictly earlier than time, newest first.
// func returns a bool; iteration stops as soon as func returns false.
// Always returns true, also when iteration was stopped early.
143 bool
144 Entity::_implForEachSnapshotBeforeReverse(const Time& time, std::function<bool(EntitySnapshot&)> func) const
145 {
147
148 // Get all snapshot keys and sort them in reverse order (newest first)
// NOTE(review): this is a const method; whether clearTimestamp() affects the
// stored id or only a temporary depends on what id() returns here - confirm.
149 id().clearTimestamp();
150 auto keys = persistenceStrategy_->getContainerKeys(id());
151
152 // Parse and sort timestamps in descending order
153 std::vector<Time> timestamps;
154 timestamps.reserve(keys.size());
155 for (const auto& key : keys)
156 {
// NOTE(review): the declaration of `ts` is not visible in this listing -
// presumably Time ts = timeFromStringMicroSeconds(key); confirm.
158 if (ts < time)
159 {
160 timestamps.push_back(ts);
161 }
162 }
163 std::sort(timestamps.begin(), timestamps.end(), std::greater<Time>());
164
165 // Iterate in reverse order (newest first), with early termination
166 for (const Time& ts : timestamps)
167 {
168 std::shared_ptr<persistence::MemoryPersistenceStrategy> entitySnapshotPersistenceStrategy(persistenceStrategy_);
169
// NOTE(review): only two constructor arguments are visible in this listing;
// other call sites pass the export name and processors as well - confirm.
170 EntitySnapshot ltmEntitySnapshot(
172 id().withTimestamp(ts),
174 entitySnapshotPersistenceStrategy);
175
176 if (!func(ltmEntitySnapshot))
177 {
178 break; // Early termination
179 }
180 }
181
182 return true;
183 }
184
// Reports whether a snapshot with the given time exists in the persistence layer.
// The lookup key is snapshotTime's microseconds-since-epoch value formatted with
// std::to_string; the result of containsContainer(id(), key) is returned as-is.
185 bool
186 Entity::_implHasSnapshot(const Time& snapshotTime) const
187 {
189
190 // Just in case
// NOTE(review): in a const method the effect of clearTimestamp() depends on
// what id() returns (reference vs. copy) - confirm it does what is intended.
191 id().clearTimestamp();
192
193 bool foundSnapshot = persistenceStrategy_->containsContainer(id(), std::to_string(snapshotTime.toMicroSecondsSinceEpoch()));
194
195 return foundSnapshot;
196 }
197
// Returns a newly allocated EntitySnapshot handle for snapshotTime, or nullptr
// when _implHasSnapshot(snapshotTime) is false. The handle is built from
// getExportName(), id() with the requested timestamp, processors and
// persistenceStrategy_.
198 std::shared_ptr<EntitySnapshot>
199 Entity::_implFindSnapshot(const Time& snapshotTime) const
200 {
202
203 if (!_implHasSnapshot(snapshotTime)) // Call _impl version since we're inside _impl method
204 {
205 return nullptr;
206 }
207
208 // Make a copy, otherwise we would also alter the entity id
209 MemoryID entitySnapshotId = id().withTimestamp(snapshotTime);
210
211 return std::make_shared<EntitySnapshot>(getExportName(), entitySnapshotId, processors, persistenceStrategy_);
212 }
213
// Scans all snapshots via _implForEachSnapshot, keeps the largest timestamp seen
// and returns a newly allocated EntitySnapshot handle for it. Returns nullptr
// when bestMatch is still Time::Invalid() after the scan.
// NOTE(review): the line carrying the function name is not visible in this
// listing - presumably Entity::_implFindLatestSnapshot() const; confirm.
214 std::shared_ptr<EntitySnapshot>
216 {
218
// Time::Invalid() doubles as the "nothing found yet" marker.
// NOTE(review): this relies on `ts > Time::Invalid()` being true for every
// stored timestamp - confirm against the Time comparison semantics.
219 Time bestMatch = Time::Invalid();
220 auto runLatestFunc = [&](EntitySnapshot& ltmEntitySnapshot)
221 {
222 Time ts = ltmEntitySnapshot.id().timestamp;
223 if (ts > bestMatch)
224 {
225 bestMatch = ts;
226 }
227 };
228
229 _implForEachSnapshot(std::move(runLatestFunc)); // Call _impl version
230
231 if (bestMatch == Time::Invalid())
232 {
233 return nullptr;
234 }
235
236 // Just in case
237 id().clearTimestamp();
238
239 MemoryID entitySnapshotId = id().withTimestamp(bestMatch);
240
// NOTE(review): one constructor argument appears to be missing between
// entitySnapshotId and persistenceStrategy_ in this listing (presumably
// `processors`, as in _implFindSnapshot) - confirm.
241 return std::make_shared<EntitySnapshot>(getExportName(),
242 entitySnapshotId,
244 persistenceStrategy_);
245 }
246
// Scans all snapshots via _implForEachSnapshot and returns a newly allocated
// handle for the newest snapshot whose timestamp is strictly earlier than `time`.
// Returns nullptr when bestMatch is still Time::Invalid() after the scan.
// NOTE(review): the line carrying the function name and the `time` parameter is
// not visible in this listing - presumably
// Entity::_implFindLatestSnapshotBefore(const Time& time) const; confirm.
247 std::shared_ptr<EntitySnapshot>
249 {
251
// Time::Invalid() doubles as the "nothing found yet" marker.
252 Time bestMatch = Time::Invalid();
253 auto findLatestSnapshotBeforeFunc = [&](EntitySnapshot& e)
254 {
255 auto ts = e.id().timestamp;
256 if (ts < time && ts > bestMatch)
257 {
258 bestMatch = ts;
259 }
260 };
261
262 _implForEachSnapshot(std::move(findLatestSnapshotBeforeFunc)); // Call _impl version
263
264 if (bestMatch == Time::Invalid())
265 {
266 return nullptr;
267 }
268
269 // Just in case
270 id().clearTimestamp();
271
272 MemoryID entitySnapshotId = id().withTimestamp(bestMatch);
273
// NOTE(review): one constructor argument appears to be missing between
// entitySnapshotId and persistenceStrategy_ in this listing (presumably
// `processors`) - confirm.
274 return std::make_shared<EntitySnapshot>(getExportName(),
275 entitySnapshotId,
277 persistenceStrategy_);
278 }
279
// Scans all snapshots via _implForEachSnapshot and returns a newly allocated
// handle for the newest snapshot whose timestamp is <= `time`.
// Returns nullptr when bestMatch is still Time::Invalid() after the scan.
// NOTE(review): the line carrying the function name and the `time` parameter is
// not visible in this listing - presumably
// Entity::_implFindLatestSnapshotBeforeOrAt(const Time& time) const; confirm.
280 std::shared_ptr<EntitySnapshot>
282 {
284
// Time::Invalid() doubles as the "nothing found yet" marker.
285 Time bestMatch = Time::Invalid();
286 auto findLatestSnapshotBeforeOrAtFunc = [&](EntitySnapshot& e)
287 {
288 auto ts = e.id().timestamp;
289 if (ts <= time && ts > bestMatch)
290 {
291 bestMatch = ts;
292 }
293 };
294
295 _implForEachSnapshot(std::move(findLatestSnapshotBeforeOrAtFunc)); // Call _impl version
296
297 if (bestMatch == Time::Invalid())
298 {
299 return nullptr;
300 }
301
302
303 MemoryID entitySnapshotId = id().withTimestamp(bestMatch);
304
// NOTE(review): one constructor argument appears to be missing between
// entitySnapshotId and persistenceStrategy_ in this listing (presumably
// `processors`) - confirm.
305 return std::make_shared<EntitySnapshot>(getExportName(),
306 entitySnapshotId,
308 persistenceStrategy_);
309 }
310
// Scans all snapshots via _implForEachSnapshot and returns a newly allocated
// handle for the oldest snapshot whose timestamp is strictly later than `time`.
// Returns nullptr when no candidate replaced the initial sentinel value.
// NOTE(review): the line carrying the function name and the `time` parameter is
// not visible in this listing - presumably
// Entity::_implFindFirstSnapshotAfter(const Time& time) const; confirm.
311 std::shared_ptr<EntitySnapshot>
313 {
315
// Sentinel: the largest `long` number of microseconds marks "nothing found yet".
// A snapshot stored with exactly this timestamp can never satisfy
// `ts < bestMatch` and is therefore reported as not found.
316 Time bestMatch{Duration::MicroSeconds(std::numeric_limits<long>::max())};
317 auto findFirstSnapshotAfterFunc = [&](EntitySnapshot& e)
318 {
319 auto ts = e.id().timestamp;
320 if (ts > time && ts < bestMatch)
321 {
322 bestMatch = ts;
323 }
324 };
325
326 _implForEachSnapshot(std::move(findFirstSnapshotAfterFunc)); // Call _impl version
327
328 if (bestMatch == Time(Duration::MicroSeconds(std::numeric_limits<long>::max())))
329 {
330 return nullptr;
331 }
332
333
// NOTE(review): entitySnapshotId is computed here, but the visible call below
// recomputes id().withTimestamp(bestMatch) instead of using it, unlike the
// sibling _implFind* functions. Same value, but the local looks unused.
334 MemoryID entitySnapshotId = id().withTimestamp(bestMatch);
335
// NOTE(review): one constructor argument appears to be missing before
// persistenceStrategy_ in this listing (presumably `processors`) - confirm.
336 return std::make_shared<EntitySnapshot>(getExportName(),
337 id().withTimestamp(bestMatch),
339 persistenceStrategy_);
340 }
341
// Scans all snapshots via _implForEachSnapshot and returns a newly allocated
// handle for the oldest snapshot whose timestamp is >= `time`.
// Returns nullptr when no candidate replaced the initial sentinel value.
// NOTE(review): the line carrying the function name and the `time` parameter is
// not visible in this listing - presumably
// Entity::_implFindFirstSnapshotAfterOrAt(const Time& time) const; confirm.
342 std::shared_ptr<EntitySnapshot>
344 {
346
// Sentinel: the largest `long` number of microseconds marks "nothing found yet".
// A snapshot stored with exactly this timestamp can never satisfy
// `ts < bestMatch` and is therefore reported as not found.
347 Time bestMatch{Duration::MicroSeconds(std::numeric_limits<long>::max())};
348 auto findFirstSnapshotAfterOrAtFunc = [&](EntitySnapshot& e)
349 {
350 auto ts = e.id().timestamp;
351 if (ts >= time && ts < bestMatch)
352 {
353 bestMatch = ts;
354 }
355 };
356
357 _implForEachSnapshot(std::move(findFirstSnapshotAfterOrAtFunc)); // Call _impl version
358
359 if (bestMatch == Time(Duration::MicroSeconds(std::numeric_limits<long>::max())))
360 {
361 return nullptr;
362 }
363
364 MemoryID entitySnapshotId = id().withTimestamp(bestMatch);
365
// NOTE(review): one constructor argument appears to be missing between
// entitySnapshotId and persistenceStrategy_ in this listing (presumably
// `processors`) - confirm.
366 return std::make_shared<EntitySnapshot>(getExportName(),
367 entitySnapshotId,
369 persistenceStrategy_);
370 }
371
// Fills wmEntity with every LTM snapshot it does not already contain.
// First sets wmEntity.id() to this entity's cleaned entity ID, then, for each
// LTM snapshot whose timestamp is absent in wmEntity, calls loadAllReferences
// on it and adds the result to wmEntity.
// NOTE(review): the line carrying the function name and parameter is not visible
// in this listing - presumably
// Entity::_loadAllReferences(armem::wm::Entity& wmEntity); confirm.
372 void
374 {
376
377 wmEntity.id() = id().getEntityID().cleanID();
378
379 _implForEachSnapshot( // Call _impl version
380 [&wmEntity](auto& ltmEntitySnapshot)
381 {
382 if (!wmEntity.hasSnapshot(
383 ltmEntitySnapshot.id()
384 .timestamp)) // we only load the references if the snapshot is not existent
385 {
// NOTE(review): the declaration of `s` is not visible in this listing -
// presumably a local armem::wm::EntitySnapshot; confirm.
387 ltmEntitySnapshot.loadAllReferences(s);
388 wmEntity.addSnapshot(s);
389 }
390 });
391 }
392
// Loads snapshots from the tail of the iteration order into wmEntity.
// Sets wmEntity.id() to this entity's cleaned entity ID, counts all LTM snapshots
// in a first pass, then in a second pass loads and resolves those snapshots that
// wmEntity lacks and whose running index `current` is >= count - n.
// NOTE(review): the line carrying the function name and parameters is not visible
// in this listing - presumably
// Entity::_loadLatestNReferences(int n, armem::wm::Entity& wmEntity); confirm.
// NOTE(review): "latest" holds only if _implForEachSnapshot yields snapshots in
// ascending time order; nothing in this function sorts them - confirm.
393 void
395 {
397
398 wmEntity.id() = id().getEntityID().cleanID();
399 int count = 0;
400 ARMARX_DEBUG << "Entity::Load latestNReference";
401 ARMARX_DEBUG << "EntityID=" << id().cleanID().str();
402
403 //this is a little bit ugly, TODO: find an easier way to count the snapshots
404 _implForEachSnapshot([&count](auto& entitySnapshot) { count++; }); // Call _impl version
405
// Index among the snapshots that are NOT yet present in wmEntity (see below).
406 int current = 0;
407
408 ARMARX_DEBUG << "Counted =" << count;
409
410 _implForEachSnapshot( // Call _impl version
411 [&wmEntity, &n, &current, &count](auto& ltmEntitySnapshot)
412 {
413 if (!wmEntity.hasSnapshot(
414 ltmEntitySnapshot.id()
415 .timestamp)) // we only load the references if the snapshot is not existent
416 {
// NOTE(review): `count` includes every LTM snapshot, whereas `current`
// only advances for snapshots missing in wmEntity. If wmEntity already
// holds some snapshots, fewer than n may be loaded - confirm intended.
417 if (current >= (count - n))
418 {
419 armem::wm::EntitySnapshot wmEntitySnapshot;
420 ltmEntitySnapshot.loadAllReferences(wmEntitySnapshot);
421 ltmEntitySnapshot.resolve(wmEntitySnapshot);
422 wmEntity.addSnapshot(wmEntitySnapshot);
423 }
424 else
425 {
426 ARMARX_DEBUG << "Skipping snapshot with timestamp " << ltmEntitySnapshot.id().timestamp;
427 }
428 current++;
429 }
430 });
431 }
432
// Resolves every snapshot of wmEntity through the LTM: for each working-memory
// snapshot a temporary LTM EntitySnapshot with the same timestamp is created and
// its resolve() is called on the working-memory snapshot.
// NOTE(review): the line carrying the function name and parameter is not visible
// in this listing - presumably Entity::_resolve(armem::wm::Entity& wmEntity); confirm.
433 void
435 {
437
438 ARMARX_DEBUG << "Resolve entity id=" << id().cleanID().str();
439
440 wmEntity.forEachSnapshot(
441 [&](auto& wmEntitySnapshot)
442 {
443 // Just in case
444 id().clearTimestamp();
445
// Copy of the shared_ptr: the snapshot shares the same strategy object.
446 std::shared_ptr<persistence::MemoryPersistenceStrategy> entitySnapshotPersistenceStrategy(persistenceStrategy_);
447
448
// NOTE(review): one constructor argument appears to be missing before the
// strategy in this listing (presumably `processors`) - confirm.
449 EntitySnapshot ltmEntitySnapshot(getExportName(),
450 id().withTimestamp(wmEntitySnapshot.id().timestamp),
452 entitySnapshotPersistenceStrategy);
453 ltmEntitySnapshot.resolve(wmEntitySnapshot);
454 });
455 }
456
// Stores the snapshots of wmEntity in the LTM.
// If this entity's entityName is empty it is taken over from wmEntity. Then each
// working-memory snapshot is stored through a temporary LTM EntitySnapshot,
// unless _implHasSnapshot reports that its timestamp already exists.
// statistics.recordedSnapshots is incremented once per stored snapshot.
// NOTE(review): simulatedVersion is not referenced in the visible body.
457 void
458 Entity::_store(const armem::wm::Entity& wmEntity, bool simulatedVersion)
459 {
461
462 if (id().entityName.empty())
463 {
// NOTE(review): the start of this streaming statement is not visible in this
// listing - presumably a logging macro such as ARMARX_WARNING; confirm.
465 << "During storage of segment '" << wmEntity.id().str()
466 << "' I noticed that the corresponding LTM has no id set. "
467 << "I set the id of the LTM to the same name, however this should not happen!";
468 id().entityName = wmEntity.id().entityName;
469 }
470
471 ARMARX_DEBUG << "Directly store entity";
472
473
474 /*if (!connected())
475 {
476 ARMARX_WARNING << "LTM ENTITY NOT CONNECTED ALTHOUGH ENABLED " << id().str();
477 return;
478 }*/
479
480 //writeForeignKeyToPreviousDocument();
481
482 wmEntity.forEachSnapshot(
483 [&](const auto& wmEntitySnapshot)
484 {
485 // Just in case
486 id().clearTimestamp();
487
// Copy of the shared_ptr: the snapshot shares the same strategy object.
488 std::shared_ptr<persistence::MemoryPersistenceStrategy> entitySnapshotPersistenceStrategy(persistenceStrategy_);
489
// NOTE(review): one constructor argument appears to be missing before the
// strategy in this listing (presumably `processors`) - confirm.
// The LTM snapshot is constructed before the existence check below, so it
// is also built for snapshots that end up being skipped.
490 EntitySnapshot ltmEntitySnapshot(getExportName(),
491 id().withTimestamp(wmEntitySnapshot.id().timestamp),
493 entitySnapshotPersistenceStrategy);
494
495 // check if snapshot already exists
496 if (_implHasSnapshot(wmEntitySnapshot.id().timestamp)) // Call _impl version
497 {
498 ARMARX_DEBUG << "Ignoring to put an EntitiySnapshot into the LTM because "
499 "the timestamp already existed (we assume snapshots are "
500 "const and do not change outside the ltm).";
// Leaves only this lambda invocation; the remaining snapshots are still processed.
501 return;
502 }
503
504 // Note: Filter evaluation is now done in Memory::_preFilterMemory() before
505 // snapshots are enqueued for async storage. This avoids duplicate filter
506 // evaluation and issues with stateful filters (e.g., FrequencyFilter updating
507 // its lastTimesPerEntity state twice for the same snapshot).
508
509 ltmEntitySnapshot.store(wmEntitySnapshot);
510 statistics.recordedSnapshots++;
511 });
512 }
513} // namespace armarx::armem::server::ltm
MemoryID cleanID() const
Definition MemoryID.cpp:133
std::string str(bool escapeDelimiters=true) const
Get a string representation of this memory ID.
Definition MemoryID.cpp:102
std::string entityName
Definition MemoryID.h:53
MemoryID withTimestamp(Time time) const
Definition MemoryID.cpp:433
MemoryID getEntityID() const
Definition MemoryID.cpp:310
EntitySnapshotT & addSnapshot(const Time &timestamp)
Add a snapshot at the given time.
Definition EntityBase.h:673
bool forEachSnapshot(SnapshotFunctionT &&func)
Definition EntityBase.h:391
bool hasSnapshot(const Time &time) const
Indicate whether a snapshot at the given time exists.
Definition EntityBase.h:115
bool _implForEachSnapshotBeforeReverse(const Time &time, std::function< bool(EntitySnapshot &)> func) const override
Definition Entity.cpp:144
bool _implForEachSnapshotInTimeRange(const Time &min, const Time &max, std::function< void(EntitySnapshot &)> func) const override
Definition Entity.cpp:90
bool _implHasSnapshot(const Time &snapshotTime) const override
Definition Entity.cpp:186
bool _implForEachSnapshotInIndexRange(long first, long last, std::function< void(EntitySnapshot &)> func) const override
Definition Entity.cpp:57
bool _implForEachSnapshot(std::function< void(EntitySnapshot &)> func) const override
Definition Entity.cpp:27
bool _implForEachSnapshotBefore(const Time &time, std::function< void(EntitySnapshot &)> func) const override
Definition Entity.cpp:127
std::shared_ptr< EntitySnapshot > _implFindLatestSnapshot() const override
Definition Entity.cpp:215
bool _implForEachSnapshotBeforeOrAt(const Time &time, std::function< void(EntitySnapshot &)> func) const override
Definition Entity.cpp:109
void _store(const armem::wm::Entity &wmEntity, bool simulatedVersion) override
Definition Entity.cpp:458
std::shared_ptr< EntitySnapshot > _implFindFirstSnapshotAfterOrAt(const Time &time) const override
Definition Entity.cpp:343
void _resolve(armem::wm::Entity &wmEntity) override
Definition Entity.cpp:434
std::shared_ptr< EntitySnapshot > _implFindFirstSnapshotAfter(const Time &time) const override
Definition Entity.cpp:312
void _loadLatestNReferences(int n, armem::wm::Entity &wmEntity) override
Definition Entity.cpp:394
Entity(const std::string &exportName, const MemoryID &memoryId, const std::shared_ptr< Processors > &filters, const std::shared_ptr< persistence::MemoryPersistenceStrategy > &persistenceStrategy)
Definition Entity.cpp:20
void _loadAllReferences(armem::wm::Entity &wmEntity) override
Definition Entity.cpp:373
std::shared_ptr< EntitySnapshot > _implFindLatestSnapshotBefore(const Time &time) const override
Definition Entity.cpp:248
std::shared_ptr< EntitySnapshot > _implFindSnapshot(const Time &snapshotTime) const override
Definition Entity.cpp:199
std::shared_ptr< EntitySnapshot > _implFindLatestSnapshotBeforeOrAt(const Time &time) const override
Definition Entity.cpp:281
void resolve(armem::wm::EntitySnapshot &e) const
convert the references of the input into a wm::Memory
void store(const armem::wm::EntitySnapshot &e)
encode the content of a wm::Memory and store
virtual std::string getExportName() const
Definition MemoryItem.h:27
std::shared_ptr< Processors > processors
Definition MemoryItem.h:54
Client-side working memory entity snapshot.
Client-side working memory entity.
static DateTime Invalid()
Definition DateTime.cpp:57
std::int64_t toMicroSecondsSinceEpoch() const
Definition DateTime.cpp:87
static Duration MicroSeconds(std::int64_t microSeconds)
Constructs a duration in microseconds.
Definition Duration.cpp:24
#define ARMARX_DEBUG
The logging level for output that is only interesting while debugging.
Definition Logging.h:184
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
Definition Logging.h:193
size_t negativeIndexSemantics(long index, size_t size)
armarx::core::time::DateTime Time
Time timeFromStringMicroSeconds(const std::string &microSeconds)
Get a Time from the microseconds as text.
Definition Time.cpp:50
std::vector< T > max(const std::vector< T > &v1, const std::vector< T > &v2)
std::vector< T > min(const std::vector< T > &v1, const std::vector< T > &v2)
#define ARMARX_TRACE
Definition trace.h:77