BufferedMemoryMixin.h
Go to the documentation of this file.
1 #pragma once
2 
3 #include <atomic>
4 #include <chrono>
5 #include <thread>
6 
7 #include <SimoxUtility/json.h>
8 
10 
14 
16 {
17  template <class _CoreSegmentT>
19  {
20 
21  public:
23  buffer(std::make_unique<armem::wm::Memory>(id)),
24  to_store(std::make_unique<armem::wm::Memory>(id))
25  {
26  }
27 
28  virtual ~BufferedMemoryMixin() = default;
29 
30  void
32  {
33  std::lock_guard l(storeMutex);
34 
35  TIMING_START(LTM_Memory_DirectlyStore);
37  TIMING_END_STREAM(LTM_Memory_DirectlyStore, ARMARX_DEBUG);
38  }
39 
40  void
42  {
44  memory.setName(serverMemory.name());
45  memory.update(armem::toCommit(serverMemory));
46  this->directlyStore(memory);
47  }
48 
49  void
51  {
52  //use this to count how much work is still left
53  }
54 
55  protected:
56  void
58  {
59  ARMARX_CHECK_NOT_EMPTY(id.memoryName) << " The full id was: " << id.str();
60 
61  buffer->id() = id.getMemoryID();
62  to_store->id() = id.getMemoryID();
63  }
64 
65  void
67  {
68  // create task if not already exists
69  if (!task)
70  {
71  int waitingTimeMs = 1000.f / storeFrequency;
73  this, &BufferedMemoryMixin::storeBuffer, waitingTimeMs);
74  task->start();
75  task->setDelayWarningTolerance(
76  waitingTimeMs); //a warning will be issued if the task takes longer than the waitingTime
77  }
78  }
79 
80  void
81  stop()
82  {
83  if (task)
84  {
85  task->stop();
86  task = nullptr;
87  }
88 
89  }
90 
92  getBuffer() const
93  {
94  std::lock_guard l(bufferMutex);
95  return *buffer;
96  }
97 
98  void
100  {
101  std::lock_guard l(storeMutex);
102  {
103  std::lock_guard l(bufferMutex);
104  to_store = std::move(buffer);
105  buffer = std::make_unique<armem::wm::Memory>(to_store->id());
106  }
107 
108  if (to_store->empty())
109  {
110  ARMARX_INFO << deactivateSpam() << "Cannot store an empty buffer. Ignoring.";
111  return;
112  }
113 
114  while (storeFlag.test_and_set(std::memory_order_acquire))
115  {
116  std::this_thread::yield();
117  }
118  this->directlyStore(*to_store);
119  storeFlag.clear(std::memory_order_release);
120  }
121 
122  /// configuration
123  void
124  configureMixin(const nlohmann::json& json)
125  {
126  if (json.find("BufferedMemory.storeFrequency") != json.end())
127  {
128  storeFrequency = json.at("BufferedMemory.storeFrequency");
129  ARMARX_INFO << "Setting store frequency from configuration json to "
130  << storeFrequency;
131  }
132  }
133 
134  void
135  createPropertyDefinitions(PropertyDefinitionsPtr& defs, const std::string& prefix)
136  {
137  defs->optional(storeFrequency, prefix + "storeFrequency");
138  }
139 
140  virtual void _directlyStore(const armem::wm::Memory& memory) = 0;
141 
142  void
144  {
145  std::lock_guard l(bufferMutex);
146  buffer->append(memory);
147  }
148 
149 
150  protected:
151  /// Internal memory for data consolidated from wm to ltm (double-buffer)
152  /// The to-put-to-ltm buffer (contains data in plain text)
153  /// This buffer may still be filtered (e.g. snapshot filters).
154  /// This means that it is not guaranteed that all data in the buffer will be stored in the ltm
155  std::unique_ptr<armem::wm::Memory> buffer;
156  std::unique_ptr<armem::wm::Memory> to_store;
157  std::atomic_flag storeFlag = ATOMIC_FLAG_INIT;
158 
159  /// The frequency (Hz) to store data to the ltm
160  float storeFrequency = 10;
161 
162  private:
163  /// The periodic'task to store the content of the buffer to the ltm
165 
166  /// a mutex to access the buffer object
167  mutable std::recursive_mutex bufferMutex;
168  mutable std::recursive_mutex storeMutex;
169  };
170 } // namespace armarx::armem::server::ltm::detail::mixin
armarx::armem::server::ltm::detail::mixin::BufferedMemoryMixin::stop
void stop()
Definition: BufferedMemoryMixin.h:81
armarx::armem::server::ltm::detail::mixin::BufferedMemoryMixin::setMixinMemoryID
void setMixinMemoryID(const MemoryID &id)
Definition: BufferedMemoryMixin.h:57
armarx::armem::server::ltm::detail::mixin::BufferedMemoryMixin::storeBuffer
void storeBuffer()
Definition: BufferedMemoryMixin.h:99
TIMING_START
#define TIMING_START(name)
Definition: TimeUtil.h:280
armarx::armem::server::ltm::detail::mixin::BufferedMemoryMixin::directlyStore
void directlyStore(const armem::server::wm::Memory &serverMemory)
Definition: BufferedMemoryMixin.h:41
armarx::armem::server::ltm::detail::mixin::BufferedMemoryMixin::buffer
std::unique_ptr< armem::wm::Memory > buffer
Internal memory for data consolidated from wm to ltm (double-buffer) The to-put-to-ltm buffer (contai...
Definition: BufferedMemoryMixin.h:155
armarx::armem::server::ltm::detail::mixin::BufferedMemoryMixin::storeFlag
std::atomic_flag storeFlag
Definition: BufferedMemoryMixin.h:157
armarx::armem::server::ltm::detail::mixin::BufferedMemoryMixin::getBuffer
armem::wm::Memory getBuffer() const
Definition: BufferedMemoryMixin.h:92
armarx::armem::server::ltm::detail::mixin::BufferedMemoryMixin::to_store
std::unique_ptr< armem::wm::Memory > to_store
Definition: BufferedMemoryMixin.h:156
armarx::armem
Definition: LegacyRobotStateMemoryAdapter.cpp:31
PeriodicTask.h
armarx::armem::base::MemoryBase::name
std::string & name()
Definition: MemoryBase.h:92
ARMARX_CHECK_NOT_EMPTY
#define ARMARX_CHECK_NOT_EMPTY(c)
Definition: ExpressionException.h:224
armarx::armem::server::ltm::detail::mixin::BufferedMemoryMixin::~BufferedMemoryMixin
virtual ~BufferedMemoryMixin()=default
armarx::memory
Brief description of class memory.
Definition: memory.h:39
TIMING_END_STREAM
#define TIMING_END_STREAM(name, os)
Definition: TimeUtil.h:300
armarx::armem::server::ltm::Memory
A memory storing data on the hard drive and in mongodb (needs 'armarx memory start' to start the mong...
Definition: Memory.h:19
memory_definitions.h
deactivateSpam
SpamFilterDataPtr deactivateSpam(SpamFilterDataPtr const &spamFilter, float deactivationDurationSec, const std::string &identifier, bool deactivate)
Definition: Logging.cpp:72
armarx::armem::server::ltm::detail::mixin::BufferedMemoryMixin::addToBuffer
void addToBuffer(const armem::wm::Memory &memory)
Definition: BufferedMemoryMixin.h:143
armarx::armem::server::wm::Memory
Definition: memory_definitions.h:136
armarx::armem::MemoryID
A memory ID.
Definition: MemoryID.h:47
armarx::armem::server::ltm::detail::mixin::BufferedMemoryMixin::start
void start()
Definition: BufferedMemoryMixin.h:66
armarx::armem::server::ltm::detail::mixin::BufferedMemoryMixin::_directlyStore
virtual void _directlyStore(const armem::wm::Memory &memory)=0
ARMARX_DEBUG
#define ARMARX_DEBUG
Definition: Logging.h:177
armarx::armem::wm::Memory
Client-side working memory.
Definition: memory_definitions.h:133
operations.h
armarx::armem::server::ltm::detail::mixin::BufferedMemoryMixin::BufferedMemoryMixin
BufferedMemoryMixin(const MemoryID &id)
Definition: BufferedMemoryMixin.h:22
memory_definitions.h
armarx::armem::server::ltm::detail::mixin::BufferedMemoryMixin::configureMixin
void configureMixin(const nlohmann::json &json)
configuration
Definition: BufferedMemoryMixin.h:124
ARMARX_INFO
#define ARMARX_INFO
Definition: Logging.h:174
armarx::armem::server::ltm::detail::mixin
Definition: BufferedMemoryMixin.cpp:3
std
Definition: Application.h:66
armarx::armem::laser_scans::constants::memoryName
const std::string memoryName
Definition: constants.h:28
IceUtil::Handle< class PropertyDefinitionContainer >
armarx::armem::server::ltm::detail::mixin::BufferedMemoryMixin
Definition: BufferedMemoryMixin.h:18
armarx::armem::server::ltm::detail::mixin::BufferedMemoryMixin::createPropertyDefinitions
void createPropertyDefinitions(PropertyDefinitionsPtr &defs, const std::string &prefix)
Definition: BufferedMemoryMixin.h:135
armarx::PeriodicTask
Definition: ArmarXManager.h:70
armarx::armem::toCommit
Commit toCommit(const ContainerT &container)
Definition: operations.h:27
armarx::armem::server::ltm::detail::mixin::BufferedMemoryMixin::storeFrequency
float storeFrequency
The frequency (Hz) to store data to the ltm.
Definition: BufferedMemoryMixin.h:160
armarx::armem::server::ltm::detail::mixin::BufferedMemoryMixin::bufferFinished
void bufferFinished()
Definition: BufferedMemoryMixin.h:50
armarx::armem::server::ltm::detail::mixin::BufferedMemoryMixin::directlyStore
void directlyStore(const armem::wm::Memory &memory)
Definition: BufferedMemoryMixin.h:31