DiskPersistence.h
Go to the documentation of this file.
1#pragma once
2
3#include <atomic>
4#include <chrono>
5#include <condition_variable>
6#include <filesystem>
7#include <memory>
8#include <mutex>
9#include <string>
10#include <thread>
11#include <unordered_map>
12#include <vector>
13
16
18{
/**
 * @brief Statistics for batch write operations.
 *
 * Every counter is a std::atomic, so individual reads and writes are race-free.
 * reset() and the derived averages touch several counters one after another and
 * therefore do not form a consistent snapshot if updates happen concurrently.
 */
struct BatchWriteStatistics
{
    std::atomic<uint64_t> totalBatchesWritten{0};
    std::atomic<uint64_t> totalItemsBatched{0};
    std::atomic<uint64_t> totalBytesWritten{0};
    std::atomic<uint64_t> totalFlushTimeNs{0};
    std::atomic<uint64_t> maxBatchSize{0};
    std::atomic<uint64_t> flushBySize{0};     // Flushes triggered by size threshold
    std::atomic<uint64_t> flushByTime{0};     // Flushes triggered by time threshold
    std::atomic<uint64_t> flushByExplicit{0}; // Explicit flush calls

    /**
     * @brief Set every counter back to zero.
     *
     * All eight counters are cleared, including the totals used by the averages
     * and flushByExplicit, so the averages restart from 0 as well.
     */
    void reset()
    {
        totalBatchesWritten = 0;
        totalItemsBatched = 0;
        totalBytesWritten = 0;
        totalFlushTimeNs = 0;
        maxBatchSize = 0;
        flushBySize = 0;
        flushByTime = 0;
        flushByExplicit = 0;
    }

    /**
     * @brief Mean time spent per written batch.
     * @return totalFlushTimeNs / totalBatchesWritten converted to milliseconds,
     *         or 0.0 if no batch has been written yet.
     */
    double getAvgFlushTimeMs() const
    {
        const uint64_t count = totalBatchesWritten.load(std::memory_order_relaxed);
        if (count == 0)
        {
            return 0.0;
        }
        // 1 ms = 1e6 ns
        return totalFlushTimeNs.load(std::memory_order_relaxed) / (count * 1e6);
    }

    /**
     * @brief Mean number of items per written batch.
     * @return totalItemsBatched / totalBatchesWritten, or 0.0 if no batch has been written yet.
     */
    double getAvgBatchSize() const
    {
        const uint64_t count = totalBatchesWritten.load(std::memory_order_relaxed);
        if (count == 0)
        {
            return 0.0;
        }
        return static_cast<double>(totalItemsBatched.load(std::memory_order_relaxed)) / count;
    }
};
59
60 /**
61 * @brief A single item pending batch write.
62 */
64 {
66 std::string key;
67 std::vector<unsigned char> data;
68 std::chrono::steady_clock::time_point enqueuedTime;
69 };
70
72 {
73 public:
74 FileIdentifier(std::string& filename, std::string& fileType) :
75 filename_(filename),
76 fileType_(fileType){
77
78 };
79
81 {
82 }
83
84 std::string
85 getKey() override
86 {
87 return filename_ + fileType_;
88 }
89
90 private:
91 std::string filename_;
92 std::string fileType_;
93 };
94
95 /**
96 * @brief Persistence strategy that writes items (e.g. json files) to a specific container (a directory)
97 * Use it to write the data of a WM to disk.
98 *
99 * Where are the items written (=:location)?
100 * /memoryParentPath/exportName/path(id)/
101 *
102 * How is the file of an item named?
103 * -> Just the key (e.g. filename = key = "data.aron.json")
104 * If you want a more sophisticated naming scheme, feel free to implement your own ItemIdentifier!
105 */
107 {
108 public:
109 DiskPersistence() : DiskPersistence(std::filesystem::path("."))
110 {
111 }
112
113 DiskPersistence(const std::filesystem::path& memoryParentPath) :
114 DiskPersistence("Disk", "DefaultExport", memoryParentPath)
115 {
116 }
117
118 /**
119 * @param identifier basically a unique name for the strategy (important if you use different strategies @see RedundantPersistenceStrategy)
120 * @param exportName identifier for the exported memory. A new directory with name is created beneath the memoryParentPath. Everything is stored inside it.
121 * @param memoryParentPath path where the memory should be exported to
122 */
123 DiskPersistence(const std::string& identifier,
124 const std::string& exportName,
125 const std::filesystem::path& memoryParentPath) :
126 MemoryPersistenceStrategy(identifier, exportName), memoryParentPath_(memoryParentPath)
127 {
128 }
129
130 /**
131 * @brief Destructor - flushes pending batch and stops batch thread.
132 */
134 {
135 stopBatchWriter();
136 }
137
138 /**
139 * Returns all containers for the current id.
140 * @return containers <=> directories at current location (=/memoryParentPath/exportName/path(id)/)
141 */
142 std::vector<std::string> getContainerKeys(const armarx::armem::MemoryID& id) override;
143
144 /**
145 * Returns all items for the current id.
146 * @return items <=> files at the current location (=/memoryParentPath/exportName/path(id)/)
147 */
148 std::vector<std::string> getItemKeys(const armarx::armem::MemoryID& id) override;
149
150 /**
151 * Checks if the container is available for the current memory id.
152 * @return true if the current location contains the directory with name 'key'
153 */
154 bool containsContainer(const armarx::armem::MemoryID& id, std::string key) override;
155
156 /**
157 * Checks if current container contains the item defined by its key.
158 * @return true if the current location contains a file with the name 'key'
159 */
160 bool containsItem(const armarx::armem::MemoryID& id, std::string key) override;
161
162 /**
163 * Creates a new file named 'key' and stores the data inside it (when batch writing is enabled, the item is queued and written on the next flush instead).
164 */
165 void storeItem(const armarx::armem::MemoryID& id,
166 std::string key,
167 std::vector<unsigned char>& data) override;
168
169 /**
170 * Reads the data of the file with name 'key' at the current location.
171 * @return data if a file was found, an empty vector if the file is empty or was not found
172 */
173 std::vector<unsigned char> retrieveItem(const armarx::armem::MemoryID& id,
174 std::string key) override;
175
176 void
177 createPropertyDefinitions(PropertyDefinitionsPtr& defs, const std::string& prefix) override
178 {
179 // Nothing to do
180 }
181
182 void
184 {
185 this->minDiskSpace = minDiskSpace;
186 if (!enoughDiskSpaceLeft())
187 {
188 ARMARX_WARNING << "Not enough available disk space for DiskPersistance Strategy. "
189 "You need at least "
190 << this->minDiskSpace
191 << " GB available disk space to record into LTM using this strategy";
192 }
193 }
194
195 int minDiskSpace = 50; // in GB
196
197 // ==================== Batch Write Configuration ====================
198
199 /**
200 * @brief Enable or disable batch writing.
201 * When enabled, writes are accumulated and flushed in batches for better I/O performance.
202 * @param enable True to enable batching, false for immediate writes (default behavior)
203 */
204 void setBatchWriteEnabled(bool enable);
205
206 /**
207 * @brief Check if batch writing is enabled.
208 */
209 bool isBatchWriteEnabled() const { return batchWriteEnabled_.load(std::memory_order_acquire); }
210
211 /**
212 * @brief Set the maximum number of items to accumulate before auto-flushing.
213 * @param size Maximum batch size (default: 100)
214 */
215 void setBatchSizeThreshold(size_t size) { batchSizeThreshold_ = size; }
216
217 /**
218 * @brief Set the maximum time to hold items before auto-flushing.
219 * @param ms Maximum time in milliseconds (default: 100ms)
220 */
221 void setBatchTimeThresholdMs(size_t ms) { batchTimeThresholdMs_ = ms; }
222
223 /**
224 * @brief Explicitly flush all pending batch writes to disk.
225 * This is automatically called when batch thresholds are reached.
226 */
227 void flushBatch();
228
229 /**
230 * @brief Get the current number of items pending in the batch buffer.
231 */
232 size_t getBatchPendingCount() const;
233
234 /**
235 * @brief Get batch write statistics.
236 */
237 const BatchWriteStatistics& getBatchStatistics() const { return batchStats_; }
238
239 /**
240 * @brief Reset batch write statistics.
241 */
242 void resetBatchStatistics() { batchStats_.reset(); }
243
244
245 public:
246 size_t getStorageErrorCount() const
247 {
248 return storageErrorCount_.load(std::memory_order_acquire);
249 }
250
252 {
253 storageErrorCount_.store(0, std::memory_order_release);
254 }
255
257
258 private:
259 std::filesystem::path memoryParentPath_;
260
261 // THREAD-SAFETY: Per-directory mutexes for parallel writes
262 // Allows concurrent writes to different directories (different entities)
263 // while preventing corruption within the same directory
264 mutable std::mutex directoryMutexMapLock_;
265 mutable std::unordered_map<std::string, std::unique_ptr<std::mutex>> directoryMutexes_;
266
267 // Error tracking
268 std::atomic<size_t> storageErrorCount_{0};
269
270 /* Internal disk logic */
271
272 bool fullPathExists(const armarx::armem::MemoryID& id);
273
274 std::vector<std::filesystem::path> getAllFiles(const armarx::armem::MemoryID& id);
275
276 std::vector<std::filesystem::path> getAllDirectories(const armarx::armem::MemoryID& id);
277
278 bool fileExists(const armarx::armem::MemoryID& id, const std::string& filename);
279
280 std::filesystem::path getFullPath(const armarx::armem::MemoryID& id);
281
282 void ensureFullPathExists(const armarx::armem::MemoryID& id,
283 bool createIfNotExistent = false);
284
285 void ensureFileExists(const armarx::armem::MemoryID& id,
286 const std::string& filename,
287 bool createIfNotExistent = false);
288
289 void writeDataToFile(const armarx::armem::MemoryID& id,
290 const std::string& filename,
291 const std::vector<unsigned char>& data);
292
293 std::vector<unsigned char> readDataFromFile(const armarx::armem::MemoryID& id,
294 const std::string& filename);
295
296 std::filesystem::path getMemoryParentPath();
297
298 bool enoughDiskSpaceLeft();
299
300 // ==================== Batch Write Implementation ====================
301
302 /**
303 * @brief Add an item to the batch buffer (called by storeItem when batching is enabled).
304 */
305 void enqueueBatchItem(const armarx::armem::MemoryID& id,
306 const std::string& key,
307 std::vector<unsigned char> data);
308
309 /**
310 * @brief Internal method to flush the batch buffer.
311 * @param reason Statistics tracking reason (size/time/explicit)
312 */
313 void flushBatchInternal(int reason);
314
315 /**
316 * @brief Write a batch of items to disk efficiently.
317 * Groups items by directory, creates directories once, writes files, syncs once.
318 */
319 void writeBatch(std::vector<BatchWriteItem>& items);
320
321 /**
322 * @brief Start the background batch flush thread.
323 */
324 void startBatchWriter();
325
326 /**
327 * @brief Stop the background batch flush thread.
328 */
329 void stopBatchWriter();
330
331 /**
332 * @brief Background thread function for time-based batch flushing.
333 */
334 void batchWriterThread();
335
336 // Batch write state
337 std::atomic<bool> batchWriteEnabled_{false};
338 size_t batchSizeThreshold_ = 100; // Flush when batch reaches this size
339 size_t batchTimeThresholdMs_ = 100; // Flush after this many ms
340
341 // Batch buffer
342 mutable std::mutex batchMutex_;
343 std::vector<BatchWriteItem> batchBuffer_;
344 std::chrono::steady_clock::time_point batchStartTime_;
345
346 // Background flush thread
347 std::thread batchWriterThread_;
348 std::atomic<bool> stopBatchWriter_{false};
349 std::condition_variable batchCondition_;
350
351 // Statistics
352 mutable BatchWriteStatistics batchStats_;
353 };
354} // namespace armarx::armem::server::ltm::persistence
bool containsContainer(const armarx::armem::MemoryID &id, std::string key) override
Checks if the container is available for the current memory id.
virtual ~DiskPersistence()
Destructor - flushes pending batch and stops batch thread.
void setBatchSizeThreshold(size_t size)
Set the maximum number of items to accumulate before auto-flushing.
void setBatchWriteEnabled(bool enable)
Enable or disable batch writing.
std::vector< std::string > getItemKeys(const armarx::armem::MemoryID &id) override
Returns all items for the current id.
void storeItem(const armarx::armem::MemoryID &id, std::string key, std::vector< unsigned char > &data) override
Creates a new file named 'key' and stores the data inside it.
bool containsItem(const armarx::armem::MemoryID &id, std::string key) override
Checks if current container contains the item defined by its key.
std::vector< std::string > getContainerKeys(const armarx::armem::MemoryID &id) override
Returns all containers for the current id.
void resetBatchStatistics()
Reset batch write statistics.
void flushBatch()
Explicitly flush all pending batch writes to disk.
const BatchWriteStatistics & getBatchStatistics() const
Get batch write statistics.
void setBatchTimeThresholdMs(size_t ms)
Set the maximum time to hold items before auto-flushing.
DiskPersistence(const std::filesystem::path &memoryParentPath)
bool isBatchWriteEnabled() const
Check if batch writing is enabled.
size_t getBatchPendingCount() const
Get the current number of items pending in the batch buffer.
std::vector< unsigned char > retrieveItem(const armarx::armem::MemoryID &id, std::string key) override
Reads the data of the file with name 'key' at the current location.
void createPropertyDefinitions(PropertyDefinitionsPtr &defs, const std::string &prefix) override
DiskPersistence(const std::string &identifier, const std::string &exportName, const std::filesystem::path &memoryParentPath)
FileIdentifier(std::string &filename, std::string &fileType)
For usage if you might want to create the key using some logic defined with your strategy rather than...
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
Definition Logging.h:193
IceUtil::Handle< class PropertyDefinitionContainer > PropertyDefinitionsPtr
PropertyDefinitions smart pointer type.
std::chrono::steady_clock::time_point enqueuedTime