Entity.cpp
Go to the documentation of this file.
1 // Header
2 #include "Entity.h"
3 #include <memory>
4 
5 // Simox
6 #include <SimoxUtility/algorithm/string.h>
7 
8 // ArmarX
11 
15 
17 
19 {
20  Entity::Entity(const std::string& exportName,
21  const armem::MemoryID& memoryId /* UNESCAPED */,
22  const std::shared_ptr<Processors>& filters,
23  const std::shared_ptr<persistence::MemoryPersistenceStrategy>& persistenceStrategy) :
24  EntityBase(exportName, memoryId, filters), persistenceStrategy_(persistenceStrategy) {}
25 
26  bool
27  Entity::forEachSnapshot(std::function<void(EntitySnapshot&)> func) const
28  {
29  std::lock_guard l(ltm_mutex);
30 
32 
33  ARMARX_DEBUG << "For each entity snapshot (entity id=" << id().cleanID().str() << ")";
34 
35  // Some weird sideeffects are happening here. Prob. some references are mixed up somewhere
36  id().clearTimestamp();
37  // persistenceStrategy_->setMemoryID(id());
38 
39  for (auto& entitySnapshotKey : persistenceStrategy_->getContainerKeys(id()))
40  {
41  ARMARX_DEBUG << "Found entity snapshot=" << entitySnapshotKey;
42 
43  std::shared_ptr<persistence::MemoryPersistenceStrategy> entitySnapshotPersistenceStrategy(persistenceStrategy_);
44 
45  EntitySnapshot ltmEntitySnapshot(
46  getExportName(),
47  id().withTimestamp(timeFromStringMicroSeconds(entitySnapshotKey)),
48  processors,
49  entitySnapshotPersistenceStrategy);
50 
51 
52  func(ltmEntitySnapshot);
53  }
54 
55  return true;
56  }
57 
// Entity::forEachSnapshotInIndexRange (name taken from the reference index of
// this listing; the line carrying the method name and the `first` parameter is
// not part of the listing).
// Calls `func` for every snapshot whose running position, counted from 0 in
// the order forEachSnapshot() visits them, lies in [first, last] (inclusive).
// Returns the result of forEachSnapshot().
58  bool
60  long last,
61  std::function<void(EntitySnapshot&)> func) const
62  {
63  std::lock_guard l(ltm_mutex);
64 
66 
67  // ARMARX_WARNING << "PLEASE NOTE THAT QUERYING THE LTM INDEX WISE MAY BE BUGGY BECAUSE THE FILESYSTEM ITERATOR IS UNSORTED!";
68 
// A negative bound triggers a full pass over all snapshots just to count them.
69  if (first < 0 or last < 0)
70  {
71  // We need to know what the size of the memory is... May be slow
72  unsigned long size = 0;
73  auto countFunc = [&](EntitySnapshot& ltmEntitySnapshot) { size++; };
74  forEachSnapshot(std::move(countFunc));
75 
// NOTE(review): the statements that consume `size` are not visible in this
// listing; presumably they map negative first/last onto valid indices - confirm.
78  }
79 
// Second pass: `checked` is the position of the snapshot currently visited.
80  long checked = 0;
81  auto runInIndexRangeFunc = [&](EntitySnapshot& ltmEntitySnapshot)
82  {
83  if (checked >= first && checked <= last)
84  {
85  func(ltmEntitySnapshot);
86  }
87  checked++;
88  };
89 
90  return forEachSnapshot(std::move(runInIndexRangeFunc));
91  }
92 
// Entity::forEachSnapshotInTimeRange (name taken from the reference index of
// this listing; the line carrying the method name and the `min` parameter is
// not part of the listing).
// Visits every snapshot via forEachSnapshot() and forwards to `func` those
// whose id timestamp satisfies min <= ts <= max (both bounds inclusive).
// Returns the result of forEachSnapshot().
93  bool
95  const Time& max,
96  std::function<void(EntitySnapshot&)> func) const
97  {
98  std::lock_guard l(ltm_mutex);
99 
100  ARMARX_TRACE;
101 
// Filter wrapper: only snapshots inside the closed interval reach `func`.
102  auto runInTimeRange = [&](EntitySnapshot& ltmEntitySnapshot)
103  {
104  Time ts = ltmEntitySnapshot.id().timestamp;
105  if (ts >= min && ts <= max)
106  {
107  func(ltmEntitySnapshot);
108  }
109  };
110 
111  return forEachSnapshot(std::move(runInTimeRange));
112  }
113 
// Entity::forEachSnapshotBeforeOrAt (name taken from the reference index of
// this listing; the line carrying the method name and the `time` parameter is
// not part of the listing).
// Visits every snapshot via forEachSnapshot() and forwards to `func` those
// whose id timestamp is less than or equal to `time`.
// Returns the result of forEachSnapshot().
114  bool
116  std::function<void(EntitySnapshot&)> func) const
117  {
118  std::lock_guard l(ltm_mutex);
119 
120  ARMARX_TRACE;
121 
// Filter wrapper: the comparison is inclusive (ts == time is forwarded).
122  auto runBeforeOrAtFunc = [&](EntitySnapshot& ltmEntitySnapshot)
123  {
124  Time ts = ltmEntitySnapshot.id().timestamp;
125  if (ts <= time)
126  {
127  func(ltmEntitySnapshot);
128  }
129  };
130 
131  return forEachSnapshot(std::move(runBeforeOrAtFunc));
132  }
133 
134  bool
135  Entity::forEachSnapshotBefore(const Time& time, std::function<void(EntitySnapshot&)> func) const
136  {
137  std::lock_guard l(ltm_mutex);
138 
139  ARMARX_TRACE;
140 
141  auto runBeforeFunc = [&](EntitySnapshot& ltmEntitySnapshot)
142  {
143  Time ts = ltmEntitySnapshot.id().timestamp;
144  if (ts < time)
145  {
146  func(ltmEntitySnapshot);
147  }
148  };
149 
150  return forEachSnapshot(std::move(runBeforeFunc));
151  }
152 
153  bool
154  Entity::hasSnapshot(const Time& snapshotTime) const
155  {
156  std::lock_guard l(ltm_mutex);
157 
158  ARMARX_TRACE;
159 
160  // Just in case
161  id().clearTimestamp();
162 
163  bool foundSnapshot = persistenceStrategy_->containsContainer(id(), std::to_string(snapshotTime.toMicroSecondsSinceEpoch()));
164 
165  return foundSnapshot;
166  }
167 
168  std::shared_ptr<EntitySnapshot>
169  Entity::findSnapshot(const Time& snapshotTime) const
170  {
171  std::lock_guard l(ltm_mutex);
172 
173  ARMARX_TRACE;
174 
175  if (!hasSnapshot(snapshotTime))
176  {
177  return nullptr;
178  }
179 
180  // // Make a copy, otherwise we would also alter the entity id
181  MemoryID entitySnapshotId = id().withTimestamp(snapshotTime);
182 
183  return std::make_shared<EntitySnapshot>(getExportName(), entitySnapshotId, processors, persistenceStrategy_);
184  }
185 
// Entity::findLatestSnapshot (name taken from the reference index of this
// listing; the line carrying the method name is not part of the listing).
// Scans all snapshots and returns a handle to the one with the greatest
// timestamp, or nullptr when no timestamp replaced the initial
// Time::Invalid() value.
186  std::shared_ptr<EntitySnapshot>
188  {
189  std::lock_guard l(ltm_mutex);
190 
191  ARMARX_TRACE;
192 
// Running maximum over all snapshot timestamps; Time::Invalid() means "none yet".
// NOTE(review): relies on `ts > Time::Invalid()` holding for valid timestamps - confirm.
193  Time bestMatch = Time::Invalid();
194  auto runLatestFunc = [&](EntitySnapshot& ltmEntitySnapshot)
195  {
196  Time ts = ltmEntitySnapshot.id().timestamp;
197  if (ts > bestMatch)
198  {
199  bestMatch = ts;
200  }
201  };
202 
203  forEachSnapshot(std::move(runLatestFunc));
204 
205  if (bestMatch == Time::Invalid())
206  {
207  return nullptr;
208  }
209 
210  // Just in case
// NOTE(review): the reference index of this listing shows the LTM
// MemoryItem::id() const returning MemoryID by value; if that overload is the
// one used here, this call only changes a temporary - confirm.
211  id().clearTimestamp();
212 
213  MemoryID entitySnapshotId = id().withTimestamp(bestMatch);
214 
215  return std::make_shared<EntitySnapshot>(getExportName(),
216  entitySnapshotId,
217  processors,
218  persistenceStrategy_);
219  }
220 
// Entity::findLatestSnapshotBefore (name taken from the reference index of
// this listing; the line carrying the method name and the `time` parameter is
// not part of the listing).
// Scans all snapshots and returns a handle to the one with the greatest
// timestamp that is strictly less than `time`, or nullptr when no timestamp
// replaced the initial Time::Invalid() value.
221  std::shared_ptr<EntitySnapshot>
223  {
224  std::lock_guard l(ltm_mutex);
225 
226  ARMARX_TRACE;
227 
// Running maximum among timestamps below `time`; Time::Invalid() means "none yet".
228  Time bestMatch = Time::Invalid();
229  auto findLatestSnapshotBeforeFunc = [&](EntitySnapshot& e)
230  {
231  auto ts = e.id().timestamp;
232  if (ts < time && ts > bestMatch)
233  {
234  bestMatch = ts;
235  }
236  };
237 
238  forEachSnapshot(std::move(findLatestSnapshotBeforeFunc));
239 
240  if (bestMatch == Time::Invalid())
241  {
242  return nullptr;
243  }
244 
245  // Just in case
// NOTE(review): the reference index of this listing shows the LTM
// MemoryItem::id() const returning MemoryID by value; if that overload is the
// one used here, this call only changes a temporary - confirm.
246  id().clearTimestamp();
247 
248  MemoryID entitySnapshotId = id().withTimestamp(bestMatch);
249 
250  return std::make_shared<EntitySnapshot>(getExportName(),
251  entitySnapshotId,
252  processors,
253  persistenceStrategy_);
254  }
255 
// Entity::findLatestSnapshotBeforeOrAt (name taken from the reference index of
// this listing; the line carrying the method name and the `time` parameter is
// not part of the listing).
// Scans all snapshots and returns a handle to the one with the greatest
// timestamp that is less than or equal to `time`, or nullptr when no timestamp
// replaced the initial Time::Invalid() value.
256  std::shared_ptr<EntitySnapshot>
258  {
259  std::lock_guard l(ltm_mutex);
260 
261  ARMARX_TRACE;
262 
// Running maximum among timestamps <= `time`; Time::Invalid() means "none yet".
263  Time bestMatch = Time::Invalid();
264  auto findLatestSnapshotBeforeOrAtFunc = [&](EntitySnapshot& e)
265  {
266  auto ts = e.id().timestamp;
267  if (ts <= time && ts > bestMatch)
268  {
269  bestMatch = ts;
270  }
271  };
272 
273  forEachSnapshot(std::move(findLatestSnapshotBeforeOrAtFunc));
274 
275  if (bestMatch == Time::Invalid())
276  {
277  return nullptr;
278  }
279 
280 
// The snapshot id is this entity's id with the best timestamp applied.
281  MemoryID entitySnapshotId = id().withTimestamp(bestMatch);
282 
283  return std::make_shared<EntitySnapshot>(getExportName(),
284  entitySnapshotId,
285  processors,
286  persistenceStrategy_);
287  }
288 
// Entity::findFirstSnapshotAfter (name taken from the reference index of this
// listing; the line carrying the method name and the `time` parameter is not
// part of the listing).
// Scans all snapshots for the smallest timestamp strictly greater than `time`
// and returns a handle to that snapshot; the visible early exit returns nullptr.
// NOTE(review): the declaration/initial value of `bestMatch` and the condition
// guarding the nullptr return are not visible in this listing - confirm them
// against the original file.
289  std::shared_ptr<EntitySnapshot>
291  {
292  std::lock_guard l(ltm_mutex);
293 
294  ARMARX_TRACE;
295 
// Running minimum among timestamps above `time`.
297  auto findFirstSnapshotAfterFunc = [&](EntitySnapshot& e)
298  {
299  auto ts = e.id().timestamp;
300  if (ts > time && ts < bestMatch)
301  {
302  bestMatch = ts;
303  }
304  };
305 
306  forEachSnapshot(std::move(findFirstSnapshotAfterFunc));
307 
309  {
310  return nullptr;
311  }
312 
313 
// entitySnapshotId is not used by the return statement below, which evaluates
// the same id().withTimestamp(bestMatch) expression again.
314  MemoryID entitySnapshotId = id().withTimestamp(bestMatch);
315 
316  return std::make_shared<EntitySnapshot>(getExportName(),
317  id().withTimestamp(bestMatch),
318  processors,
319  persistenceStrategy_);
320  }
321 
// Entity::findFirstSnapshotAfterOrAt (name taken from the reference index of
// this listing; the line carrying the method name and the `time` parameter is
// not part of the listing).
// Scans all snapshots for the smallest timestamp greater than or equal to
// `time` and returns a handle to that snapshot; the visible early exit returns
// nullptr.
// NOTE(review): the declaration/initial value of `bestMatch` and the condition
// guarding the nullptr return are not visible in this listing - confirm them
// against the original file.
322  std::shared_ptr<EntitySnapshot>
324  {
325  std::lock_guard l(ltm_mutex);
326 
327  ARMARX_TRACE;
328 
// Running minimum among timestamps >= `time`.
330  auto findFirstSnapshotAfterOrAtFunc = [&](EntitySnapshot& e)
331  {
332  auto ts = e.id().timestamp;
333  if (ts >= time && ts < bestMatch)
334  {
335  bestMatch = ts;
336  }
337  };
338 
339  forEachSnapshot(std::move(findFirstSnapshotAfterOrAtFunc));
340 
342  {
343  return nullptr;
344  }
345 
// The snapshot id is this entity's id with the best timestamp applied.
346  MemoryID entitySnapshotId = id().withTimestamp(bestMatch);
347 
348  return std::make_shared<EntitySnapshot>(getExportName(),
349  entitySnapshotId,
350  processors,
351  persistenceStrategy_);
352  }
353 
// Entity::_loadAllReferences(armem::wm::Entity& wmEntity) (name and parameter
// taken from the reference index of this listing; the signature line is not
// part of the listing).
// Gives `wmEntity` this entity's cleaned entity id and adds a working-memory
// snapshot, filled through loadAllReferences(), for every LTM snapshot
// timestamp that `wmEntity` does not contain yet.
354  void
356  {
357  std::lock_guard l(ltm_mutex);
358 
359  ARMARX_TRACE;
360 
361  wmEntity.id() = id().getEntityID().cleanID();
362 
// NOTE(review): the start of the call receiving this lambda is not visible in
// this listing; presumably it is forEachSnapshot( - confirm.
364  [&wmEntity](auto& ltmEntitySnapshot)
365  {
366  if (!wmEntity.hasSnapshot(
367  ltmEntitySnapshot.id()
368  .timestamp)) // we only load the references if the snapshot is not existant
369  {
370  armem::wm::EntitySnapshot s;
371  ltmEntitySnapshot.loadAllReferences(s);
372  wmEntity.addSnapshot(s);
373  }
374  });
375  }
376 
// Entity::_loadLatestNReferences(int n, armem::wm::Entity& wmEntity) (name and
// parameters taken from the reference index of this listing; the signature
// line is not part of the listing).
// Gives `wmEntity` this entity's cleaned entity id, counts all LTM snapshots,
// then loads and resolves into `wmEntity` those missing snapshots whose
// running position `current` is at least `count - n`.
377  void
379  {
380  std::lock_guard l(ltm_mutex);
381 
382  ARMARX_TRACE;
383 
384  wmEntity.id() = id().getEntityID().cleanID();
385  int count = 0;
386  ARMARX_DEBUG << "Entity::Load latestNReference";
387  ARMARX_DEBUG << "EntityID=" << id().cleanID().str();
388 
389  //this is a little bit ugly, TODO: find an easier way to count the snapshots
// First pass: count every snapshot, including ones wmEntity already holds.
390  forEachSnapshot([&count](auto& entitySnapshot) { count++; });
391 
392  int current = 0;
393 
394  ARMARX_DEBUG << "Counted =" << count;
395 
// NOTE(review): the start of the call receiving this lambda is not visible in
// this listing; presumably it is forEachSnapshot( - confirm.
397  [&wmEntity, &n, &current, &count](auto& ltmEntitySnapshot)
398  {
399  if (!wmEntity.hasSnapshot(
400  ltmEntitySnapshot.id()
401  .timestamp)) // we only load the references if the snapshot is not existant
402  {
403  if (current >= (count - n))
404  {
405  armem::wm::EntitySnapshot wmEntitySnapshot;
406  ltmEntitySnapshot.loadAllReferences(wmEntitySnapshot);
407  ltmEntitySnapshot.resolve(wmEntitySnapshot);
408  wmEntity.addSnapshot(wmEntitySnapshot);
409  }
410  else
411  {
412  ARMARX_DEBUG << "Skipping snapshot with timestamp " << ltmEntitySnapshot.id().timestamp;
413  }
// `current` only advances for snapshots that wmEntity did not already contain,
// whereas `count` above covers every snapshot.
414  current++;
415  }
416  });
417  }
418 
419  void
420  Entity::_resolve(armem::wm::Entity& wmEntity)
421  {
422  std::lock_guard l(ltm_mutex);
423 
424  ARMARX_TRACE;
425 
426  ARMARX_DEBUG << "Resolve entity id=" << id().cleanID().str();
427 
428  wmEntity.forEachSnapshot(
429  [&](auto& wmEntitySnapshot)
430  {
431  // Just in case
432  id().clearTimestamp();
433 
434  std::shared_ptr<persistence::MemoryPersistenceStrategy> entitySnapshotPersistenceStrategy(persistenceStrategy_);
435 
436 
437  EntitySnapshot ltmEntitySnapshot(getExportName(),
438  id().withTimestamp(wmEntitySnapshot.id().timestamp),
439  processors,
440  entitySnapshotPersistenceStrategy);
441  ltmEntitySnapshot.resolve(wmEntitySnapshot);
442  });
443  }
444 
// Stores the snapshots of `wmEntity` in the long-term memory.
// For each working-memory snapshot: skip it when hasSnapshot() already reports
// its timestamp, skip it when any entry of processors->snapFilters rejects it,
// otherwise call store() on a temporary LTM EntitySnapshot and increment
// statistics.recordedSnapshots.
// `simulatedVersion` is only passed on to the filters' accept() call.
445  void
446  Entity::_store(const armem::wm::Entity& wmEntity, bool simulatedVersion)
447  {
448  std::lock_guard l(ltm_mutex);
449 
450  ARMARX_TRACE;
451 
// Fallback for an LTM entity without a name: take over the WM entity's name.
// NOTE(review): the start of the streamed statement below is not visible in
// this listing; judging by the message it is presumably ARMARX_WARNING - confirm.
// NOTE(review): the reference index of this listing shows the LTM
// MemoryItem::id() const returning MemoryID by value; if that is the overload
// used here, the assignment to id().entityName only changes a temporary - confirm.
452  if (id().entityName.empty())
453  {
455  << "During storage of segment '" << wmEntity.id().str()
456  << "' I noticed that the corresponding LTM has no id set. "
457  << "I set the id of the LTM to the same name, however this should not happen!";
458  id().entityName = wmEntity.id().entityName;
459  }
460 
461  ARMARX_DEBUG << "Directly store entity";
462 
463 
464  /*if (!connected())
465  {
466  ARMARX_WARNING << "LTM ENTITY NOT CONNECTED ALTHOUGH ENABLED " << id().str();
467  return;
468  }*/
469 
470  //writeForeignKeyToPreviousDocument();
471 
472  wmEntity.forEachSnapshot(
473  [&](const auto& wmEntitySnapshot)
474  {
475  // Just in case
476  id().clearTimestamp();
477 
478  std::shared_ptr<persistence::MemoryPersistenceStrategy> entitySnapshotPersistenceStrategy(persistenceStrategy_);
479 
// The LTM handle is created before the existence and filter checks below.
480  EntitySnapshot ltmEntitySnapshot(getExportName(),
481  id().withTimestamp(wmEntitySnapshot.id().timestamp),
482  processors,
483  entitySnapshotPersistenceStrategy);
484 
485  // check if snapshot already exists
486  if (hasSnapshot(wmEntitySnapshot.id().timestamp))
487  {
488  ARMARX_DEBUG << "Ignoring to put an EntitiySnapshot into the LTM because "
489  "the timestamp already existed (we assume snapshots are "
490  "const and do not change outside the ltm).";
491  return;
492  }
493 
// Every snapshot filter must accept; the first rejection ends this snapshot.
494  for (auto& filters : processors->snapFilters)
495  {
496  bool accepted = filters->accept(wmEntitySnapshot, simulatedVersion);
497 
498  if (!accepted)
499  {
500  ARMARX_DEBUG << "Ignoring to put an EntitySnapshot into the LTM because it "
501  "got filtered.";
502  return;
503  }
504  else
505  {
506  //ARMARX_INFO << "Storing EntitySnapshot";
507  }
508  }
509 
510  ltmEntitySnapshot.store(wmEntitySnapshot);
511  statistics.recordedSnapshots++;
512  });
513  }
514 } // namespace armarx::armem::server::ltm
armarx::armem::server::ltm::Entity::forEachSnapshotBefore
bool forEachSnapshotBefore(const Time &time, std::function< void(EntitySnapshot &)> func) const override
Definition: Entity.cpp:135
armarx::armem::server::ltm::detail::MemoryItem::processors
std::shared_ptr< Processors > processors
Definition: MemoryItem.h:54
armarx::armem::server::ltm::Entity::findLatestSnapshotBefore
std::shared_ptr< EntitySnapshot > findLatestSnapshotBefore(const Time &time) const override
Definition: Entity.cpp:222
Entity.h
MemoryPersistenceStrategy.h
armarx::armem::server::ltm::Entity::findFirstSnapshotAfterOrAt
std::shared_ptr< EntitySnapshot > findFirstSnapshotAfterOrAt(const Time &time) const override
Definition: Entity.cpp:323
armarx::armem::MemoryID::str
std::string str(bool escapeDelimiters=true) const
Get a string representation of this memory ID.
Definition: MemoryID.cpp:102
armarx::armem::timeFromStringMicroSeconds
Time timeFromStringMicroSeconds(const std::string &microSeconds)
Get a Time from the microseconds as text.
Definition: Time.cpp:50
armarx::max
std::vector< T > max(const std::vector< T > &v1, const std::vector< T > &v2)
Definition: VectorHelpers.h:297
armarx::armem::server::ltm::Entity::findFirstSnapshotAfter
std::shared_ptr< EntitySnapshot > findFirstSnapshotAfter(const Time &time) const override
Definition: Entity.cpp:290
armarx::armem::server::ltm::Entity::forEachSnapshot
bool forEachSnapshot(std::function< void(EntitySnapshot &)> func) const override
iterate over all entity snapshots of this ltm
Definition: Entity.cpp:27
magic_enum::detail::n
constexpr auto n() noexcept
Definition: magic_enum.hpp:418
armarx::armem::server::ltm::Entity::forEachSnapshotInTimeRange
bool forEachSnapshotInTimeRange(const Time &min, const Time &max, std::function< void(EntitySnapshot &)> func) const override
Definition: Entity.cpp:94
memory_definitions.h
ARMARX_TRACE
#define ARMARX_TRACE
Definition: trace.h:77
armarx::armem::base::detail::negativeIndexSemantics
size_t negativeIndexSemantics(long index, size_t size)
Definition: negative_index_semantics.cpp:6
armarx::armem::MemoryID
A memory ID.
Definition: MemoryID.h:47
ARMARX_DEBUG
#define ARMARX_DEBUG
Definition: Logging.h:184
armarx::armem::server::ltm::Entity::forEachSnapshotBeforeOrAt
bool forEachSnapshotBeforeOrAt(const Time &time, std::function< void(EntitySnapshot &)> func) const override
Definition: Entity.cpp:115
armarx::armem::server::ltm::Entity::_loadLatestNReferences
void _loadLatestNReferences(int n, armem::wm::Entity &wmEntity) override
Definition: Entity.cpp:378
max
T max(T t1, T t2)
Definition: gdiam.h:51
armarx::armem::server::ltm::detail::MemoryItem::getExportName
virtual std::string getExportName() const
Definition: MemoryItem.h:27
armarx::armem::server::ltm::detail::EntitySnapshotBase::resolve
void resolve(armem::wm::EntitySnapshot &e) const
convert the references of the input into a wm::Memory
Definition: EntitySnapshotBase.h:38
armarx::armem::server::ltm::detail::EntityBase< EntitySnapshot >::ltm_mutex
std::recursive_mutex ltm_mutex
Definition: EntityBase.h:114
armarx::core::time::DateTime::toMicroSecondsSinceEpoch
std::int64_t toMicroSecondsSinceEpoch() const
Definition: DateTime.cpp:87
armarx::armem::base::detail::MemoryItem::id
MemoryID & id()
Definition: MemoryItem.h:25
negative_index_semantics.h
armarx::armem::Time
armarx::core::time::DateTime Time
Definition: forward_declarations.h:13
armarx::to_string
const std::string & to_string(const std::string &s)
Definition: StringHelpers.h:41
armarx::armem::server::ltm::Entity::Entity
Entity(const std::string &exportName, const MemoryID &memoryId, const std::shared_ptr< Processors > &filters, const std::shared_ptr< persistence::MemoryPersistenceStrategy > &persistenceStrategy)
Definition: Entity.cpp:20
armarx::armem::MemoryID::entityName
std::string entityName
Definition: MemoryID.h:53
armarx::armem::server::ltm
Definition: forward_declarations.h:20
armarx::core::time::DateTime
Represents a point in time.
Definition: DateTime.h:24
armarx::armem::MemoryID::clearTimestamp
void clearTimestamp()
Definition: MemoryID.h:133
armarx::armem::MemoryID::getEntityID
MemoryID getEntityID() const
Definition: MemoryID.cpp:310
armarx::armem::server::ltm::detail::MemoryItem::id
MemoryID id() const
Definition: MemoryItem.cpp:37
armarx::armem::server::ltm::Entity::forEachSnapshotInIndexRange
bool forEachSnapshotInIndexRange(long first, long last, std::function< void(EntitySnapshot &)> func) const override
Definition: Entity.cpp:59
armarx::armem::MemoryID::cleanID
MemoryID cleanID() const
Definition: MemoryID.cpp:133
TimeUtil.h
armarx::armem::base::EntityBase::hasSnapshot
bool hasSnapshot(const Time &time) const
Indicate whether a snapshot at the given time exists.
Definition: EntityBase.h:114
FrequencyFilter.h
armarx::min
std::vector< T > min(const std::vector< T > &v1, const std::vector< T > &v2)
Definition: VectorHelpers.h:327
armarx::armem::server::ltm::Entity::findLatestSnapshot
std::shared_ptr< EntitySnapshot > findLatestSnapshot() const override
Definition: Entity.cpp:187
armarx::armem::MemoryID::withTimestamp
MemoryID withTimestamp(Time time) const
Definition: MemoryID.cpp:433
armarx::armem::server::ltm::Entity::findLatestSnapshotBeforeOrAt
std::shared_ptr< EntitySnapshot > findLatestSnapshotBeforeOrAt(const Time &time) const override
Definition: Entity.cpp:257
armarx::armem::server::ltm::Entity::findSnapshot
std::shared_ptr< EntitySnapshot > findSnapshot(const Time &snapshotTime) const override
find entity snapshot segment
Definition: Entity.cpp:169
armarx::armem::base::EntityBase::forEachSnapshot
bool forEachSnapshot(SnapshotFunctionT &&func)
Definition: EntityBase.h:390
armarx::armem::wm::Entity
Client-side working memory entity.
Definition: memory_definitions.h:93
armarx::armem::server::ltm::Entity::_loadAllReferences
void _loadAllReferences(armem::wm::Entity &wmEntity) override
Definition: Entity.cpp:355
Logging.h
ARMARX_WARNING
#define ARMARX_WARNING
Definition: Logging.h:193
armarx::core::time::Duration::MicroSeconds
static Duration MicroSeconds(std::int64_t microSeconds)
Constructs a duration in microseconds.
Definition: Duration.cpp:24
armarx::armem::server::ltm::EntitySnapshot
Definition: EntitySnapshot.h:14
armarx::core::time::DateTime::Invalid
static DateTime Invalid()
Definition: DateTime.cpp:57
armarx::armem::server::ltm::Entity::hasSnapshot
bool hasSnapshot(const Time &snapshotTime) const override
check if snapshot segment exists
Definition: Entity.cpp:154
armarx::armem::server::ltm::detail::EntitySnapshotBase::store
void store(const armem::wm::EntitySnapshot &e)
encode the content of a wm::Memory and store
Definition: EntitySnapshotBase.h:45