BufferedMemoryMixin.h
Go to the documentation of this file.
1 #pragma once
2 
3 #include <atomic>
4 #include <chrono>
5 #include <thread>
6 
7 #include <SimoxUtility/json.h>
8 
10 
14 
16 {
17  template <class _CoreSegmentT>
19  {
20 
21  public:
23  buffer(std::make_unique<armem::wm::Memory>(id)),
24  to_store(std::make_unique<armem::wm::Memory>(id))
25  {
26  }
27 
28  virtual ~BufferedMemoryMixin() = default;
29 
30  void
31  directlyStore(const armem::wm::Memory& memory, bool simulatedVersion = false)
32  {
33  std::lock_guard l(storeMutex);
34 
35  TIMING_START(LTM_Memory_DirectlyStore);
36  _directlyStore(memory, simulatedVersion);
37  TIMING_END_STREAM(LTM_Memory_DirectlyStore, ARMARX_DEBUG);
38  }
39 
40  void
41  directlyStore(const armem::server::wm::Memory& serverMemory, bool simulatedVersion = false)
42  {
44  memory.setName(serverMemory.name());
45  auto ids = memory.update(armem::toCommit(serverMemory), true);
46  ARMARX_DEBUG << "Amount of ids in update: " << ids.size();
47  this->directlyStore(memory, simulatedVersion);
48  }
49 
50  void
52  {
53  //use this to count how much work is still left
54  }
55 
56  protected:
57  void
59  {
60  ARMARX_CHECK_NOT_EMPTY(id.memoryName) << " The full id was: " << id.str();
61 
62  buffer->id() = id.getMemoryID();
63  to_store->id() = id.getMemoryID();
64  }
65 
66  void
68  {
69  // create task if not already exists
70  if (!task)
71  {
72  int waitingTimeMs = 1000.f / storeFrequency;
74  this, &BufferedMemoryMixin::storeBuffer, waitingTimeMs);
75  task->start();
76  task->setDelayWarningTolerance(
77  waitingTimeMs); //a warning will be issued if the task takes longer than the waitingTime
78  }
79  }
80 
81  void
82  stop()
83  {
84  if (task)
85  {
86  task->stop();
87  task = nullptr;
88  }
89  }
90 
92  getBuffer() const
93  {
94  std::lock_guard l(bufferMutex);
95  return *buffer;
96  }
97 
98  void
100  {
101  std::lock_guard l(storeMutex);
102  {
103  std::lock_guard l(bufferMutex);
104  to_store = std::move(buffer);
105  buffer = std::make_unique<armem::wm::Memory>(to_store->id());
106  }
107 
108  if (to_store->empty())
109  {
110  ARMARX_DEBUG << deactivateSpam(120) << "Cannot store an empty buffer. Ignoring.";
111  return;
112  }
113 
114  while (storeFlag.test_and_set(std::memory_order_acquire))
115  {
116  std::this_thread::yield();
117  }
118  this->directlyStore(*to_store);
119  storeFlag.clear(std::memory_order_release);
120  }
121 
122  /// configuration
123  void
124  configureMixin(const nlohmann::json& json)
125  {
126  if (json.find("BufferedMemory.storeFrequency") != json.end())
127  {
128  storeFrequency = json.at("BufferedMemory.storeFrequency");
129  ARMARX_INFO << "Setting store frequency from configuration json to "
130  << storeFrequency;
131  }
132  }
133 
134  void
135  createPropertyDefinitions(PropertyDefinitionsPtr& defs, const std::string& prefix)
136  {
137  defs->optional(storeFrequency, prefix + "storeFrequency");
138  }
139 
140  virtual void _directlyStore(const armem::wm::Memory& memory,
141  bool simulatedVersion = false) = 0;
142 
143  void
145  {
146  std::lock_guard l(bufferMutex);
147  buffer->append(memory);
148  }
149 
150 
151  protected:
152  /// Internal memory for data consolidated from wm to ltm (double-buffer)
153  /// The to-put-to-ltm buffer (contains data in plain text)
154  /// This buffer may still be filtered (e.g. snapshot filters).
155  /// This means that it is not guaranteed that all data in the buffer will be stored in the ltm
156  std::unique_ptr<armem::wm::Memory> buffer;
157  std::unique_ptr<armem::wm::Memory> to_store;
158  std::atomic_flag storeFlag = ATOMIC_FLAG_INIT;
159 
160  /// The frequency (Hz) to store data to the ltm
161  float storeFrequency = 10;
162 
163  private:
164  /// The periodic'task to store the content of the buffer to the ltm
166 
167  /// a mutex to access the buffer object
168  mutable std::recursive_mutex bufferMutex;
169  mutable std::recursive_mutex storeMutex;
170  };
171 } // namespace armarx::armem::server::ltm::detail::mixin
armarx::armem::server::ltm::detail::mixin::BufferedMemoryMixin::stop
void stop()
Definition: BufferedMemoryMixin.h:82
armarx::armem::server::ltm::detail::mixin::BufferedMemoryMixin::setMixinMemoryID
void setMixinMemoryID(const MemoryID &id)
Definition: BufferedMemoryMixin.h:58
armarx::armem::server::ltm::detail::mixin::BufferedMemoryMixin::storeBuffer
void storeBuffer()
Definition: BufferedMemoryMixin.h:99
TIMING_START
#define TIMING_START(name)
Definition: TimeUtil.h:289
armarx::armem::server::ltm::detail::mixin::BufferedMemoryMixin::buffer
std::unique_ptr< armem::wm::Memory > buffer
Internal memory for data consolidated from wm to ltm (double-buffer) The to-put-to-ltm buffer (contai...
Definition: BufferedMemoryMixin.h:156
armarx::armem::server::ltm::detail::mixin::BufferedMemoryMixin::storeFlag
std::atomic_flag storeFlag
Definition: BufferedMemoryMixin.h:158
armarx::armem::server::ltm::detail::mixin::BufferedMemoryMixin::getBuffer
armem::wm::Memory getBuffer() const
Definition: BufferedMemoryMixin.h:92
armarx::armem::server::ltm::detail::mixin::BufferedMemoryMixin::to_store
std::unique_ptr< armem::wm::Memory > to_store
Definition: BufferedMemoryMixin.h:157
armarx::armem
Definition: LegacyRobotStateMemoryAdapter.cpp:32
PeriodicTask.h
armarx::armem::base::MemoryBase::name
std::string & name()
Definition: MemoryBase.h:92
ARMARX_CHECK_NOT_EMPTY
#define ARMARX_CHECK_NOT_EMPTY(c)
Definition: ExpressionException.h:224
armarx::armem::server::ltm::detail::mixin::BufferedMemoryMixin::~BufferedMemoryMixin
virtual ~BufferedMemoryMixin()=default
armarx::memory
Brief description of class memory.
Definition: memory.h:38
TIMING_END_STREAM
#define TIMING_END_STREAM(name, os)
Definition: TimeUtil.h:310
armarx::armem::server::ltm::detail::mixin::BufferedMemoryMixin::_directlyStore
virtual void _directlyStore(const armem::wm::Memory &memory, bool simulatedVersion=false)=0
armarx::armem::server::ltm::Memory
A memory storing data on the hard drive and in mongodb (needs 'armarx memory start' to start the mong...
Definition: Memory.h:19
memory_definitions.h
deactivateSpam
SpamFilterDataPtr deactivateSpam(SpamFilterDataPtr const &spamFilter, float deactivationDurationSec, const std::string &identifier, bool deactivate)
Definition: Logging.cpp:75
armarx::armem::server::ltm::detail::mixin::BufferedMemoryMixin::addToBuffer
void addToBuffer(const armem::wm::Memory &memory)
Definition: BufferedMemoryMixin.h:144
armarx::armem::server::wm::Memory
Definition: memory_definitions.h:122
armarx::armem::MemoryID
A memory ID.
Definition: MemoryID.h:47
armarx::armem::server::ltm::detail::mixin::BufferedMemoryMixin::start
void start()
Definition: BufferedMemoryMixin.h:67
ARMARX_DEBUG
#define ARMARX_DEBUG
Definition: Logging.h:184
armarx::armem::wm::Memory
Client-side working memory.
Definition: memory_definitions.h:133
operations.h
armarx::armem::server::ltm::detail::mixin::BufferedMemoryMixin::directlyStore
void directlyStore(const armem::wm::Memory &memory, bool simulatedVersion=false)
Definition: BufferedMemoryMixin.h:31
armarx::armem::server::ltm::detail::mixin::BufferedMemoryMixin::BufferedMemoryMixin
BufferedMemoryMixin(const MemoryID &id)
Definition: BufferedMemoryMixin.h:22
memory_definitions.h
armarx::armem::server::ltm::detail::mixin::BufferedMemoryMixin::directlyStore
void directlyStore(const armem::server::wm::Memory &serverMemory, bool simulatedVersion=false)
Definition: BufferedMemoryMixin.h:41
armarx::armem::server::ltm::detail::mixin::BufferedMemoryMixin::configureMixin
void configureMixin(const nlohmann::json &json)
configuration
Definition: BufferedMemoryMixin.h:124
ARMARX_INFO
#define ARMARX_INFO
Definition: Logging.h:181
armarx::armem::server::ltm::detail::mixin
Definition: BufferedMemoryMixin.cpp:3
std
Definition: Application.h:66
armarx::armem::laser_scans::constants::memoryName
const std::string memoryName
Definition: constants.h:28
IceUtil::Handle< class PropertyDefinitionContainer >
armarx::armem::server::ltm::detail::mixin::BufferedMemoryMixin
Definition: BufferedMemoryMixin.h:18
armarx::armem::server::ltm::detail::mixin::BufferedMemoryMixin::createPropertyDefinitions
void createPropertyDefinitions(PropertyDefinitionsPtr &defs, const std::string &prefix)
Definition: BufferedMemoryMixin.h:135
armarx::PeriodicTask
Definition: ArmarXManager.h:70
armarx::armem::toCommit
Commit toCommit(const ContainerT &container)
Definition: operations.h:23
armarx::armem::server::ltm::detail::mixin::BufferedMemoryMixin::storeFrequency
float storeFrequency
The frequency (Hz) to store data to the ltm.
Definition: BufferedMemoryMixin.h:161
armarx::armem::server::ltm::detail::mixin::BufferedMemoryMixin::bufferFinished
void bufferFinished()
Definition: BufferedMemoryMixin.h:51