DiskPersistence.cpp
Go to the documentation of this file.
2
3#include <fcntl.h>
4#include <unistd.h>
5
6#include <set>
7
9
11{
12
13 std::vector<std::string>
15 {
16 std::vector<std::string> containers;
17
18 if (!enabled_)
19 {
20 return containers;
21 }
22
23 // If it has a nice form
24 if (!id.hasEntityName() || id.hasTimestamp())
25 {
26 std::vector<std::filesystem::path> dirs = getAllDirectories(id);
27
28
29 for (auto& path : dirs)
30 {
31 std::string container = path.filename().string();
32
33 if (!container.empty())
34 {
35 containers.emplace_back(container);
36 }
37 }
38 }
39 // If it has not a nice form
40 else
41 {
42 std::vector<std::filesystem::path> dayDirs = getAllDirectories(id);
43
44 for (std::filesystem::path& dayDir : dayDirs)
45 {
46 if (!util::fs::detail::isDateString(dayDir.filename()))
47 {
48 ARMARX_WARNING << "Found a non-date folder inside an entity '" << id.str()
49 << "' with name '" << dayDir.filename() << "'. "
50 << "Ignoring this folder, however this is a bad situation.";
51 continue;
52 }
53
54 std::vector<std::filesystem::path> secondDirs = util::fs::getAllDirectories(dayDir);
55
56 for (std::filesystem::path& secondDir : secondDirs)
57 {
58 if (!util::fs::detail::isNumberString(secondDir.filename()))
59 {
60 ARMARX_WARNING << "Found a non-timestamp folder inside an entity '"
61 << id.str() << "' hours folder with name '"
62 << secondDir.filename() << "'. "
63 << "Ignoring this folder, however this is a bad situation.";
64 continue;
65 }
66
67 std::vector<std::filesystem::path> timestampDirs =
69
70 for (std::filesystem::path& timestampDir : timestampDirs)
71 {
72 if (!util::fs::detail::isNumberString(timestampDir.filename()))
73 {
75 << "Found a non-timestamp folder inside an entity '" << id.str()
76 << "' seconds folder with name '" << timestampDir.filename()
77 << "'. "
78 << "Ignoring this folder, however this is a bad situation.";
79 continue;
80 }
81
82 std::string container = timestampDir.filename().string();
83
84 if (!container.empty())
85 {
86 containers.emplace_back(container);
87 }
88 }
89 }
90 }
91 }
92
93 return containers;
94 }
95
96 std::vector<std::string>
98 {
99 if (!enabled_)
100 {
101 return std::vector<std::string>();
102 }
103
104 // THREAD-SAFETY: Acquire per-directory mutex to get consistent file listing
105 std::string directoryPath = getFullPath(id).string();
106
107 std::mutex* dirMutex = nullptr;
108 {
109 std::lock_guard mapLock(directoryMutexMapLock_);
110 auto& mutexPtr = directoryMutexes_[directoryPath];
111 if (!mutexPtr)
112 {
113 mutexPtr = std::make_unique<std::mutex>();
114 }
115 dirMutex = mutexPtr.get();
116 }
117
118 std::lock_guard dirLock(*dirMutex);
119
120 std::vector<std::filesystem::path> files = getAllFiles(id);
121 std::vector<std::string> filesStr;
122
123 for (auto& path : files)
124 {
125 std::string item = path.filename().string();
126
127 if (!item.empty())
128 {
129 filesStr.emplace_back(item);
130 }
131 }
132
133 return filesStr;
134 }
135
136 bool
138 {
139 if (!enabled_)
140 {
141 return false;
142 }
143
144 //from id get the directory for this memory item and append the key to it, then check if the directory exists
145 auto path_to_id = getFullPath(id);
146 auto correct_container_path = path_to_id / key;
147 bool contains_container = util::fs::directoryExists(correct_container_path);
148
149 return contains_container;
150 }
151
152 bool
154 {
155 if (!enabled_)
156 {
157 return false;
158 }
159
160 return fileExists(id, key);
161 }
162
163 void
165 std::string key,
166 std::vector<unsigned char>& data)
167 {
168 if (!enabled_)
169 {
170 return;
171 }
172
173 // If batch writing is enabled, add to batch buffer instead of writing immediately
174 if (batchWriteEnabled_.load(std::memory_order_acquire))
175 {
176 enqueueBatchItem(id, key, std::move(data));
177 return;
178 }
179
180 // Original immediate write path
181 // THREAD-SAFETY: Get per-directory mutex for fine-grained locking
182 // This allows parallel writes to different directories (different entities)
183 // while preventing filesystem corruption within the same directory
184 std::string directoryPath = getFullPath(id).string();
185
186 // Get or create mutex for this directory
187 std::mutex* dirMutex = nullptr;
188 {
189 std::lock_guard mapLock(directoryMutexMapLock_);
190 auto& mutexPtr = directoryMutexes_[directoryPath];
191 if (!mutexPtr)
192 {
193 mutexPtr = std::make_unique<std::mutex>();
194 }
195 dirMutex = mutexPtr.get();
196 }
197
198 // Lock this specific directory for the duration of the write
199 // Other threads can write to different directories in parallel
200 std::lock_guard dirLock(*dirMutex);
201
202 try
203 {
204 auto dir = getFullPath(id);
205 auto parentDir = dir.parent_path();
206
207 // Check permissions before attempting write
208 if (util::fs::directoryExists(parentDir) && !util::fs::hasWritePermission(parentDir))
209 {
210 ARMARX_ERROR << "No write permission for directory: " << parentDir
211 << ". Cannot store " << id.str() << "/" << key;
212 storageErrorCount_.fetch_add(1, std::memory_order_relaxed);
213 return;
214 }
215
216 ensureFullPathExists(id, true);
217
218 if (enoughDiskSpaceLeft())
219 {
220 writeDataToFile(id, key, data);
221 }
222 else
223 {
224 ARMARX_ERROR << "Not enough disk space available for DiskPersistence. "
225 << "Skipping storage of " << id.str() << "/" << key;
226 storageErrorCount_.fetch_add(1, std::memory_order_relaxed);
227 }
228 }
229 catch (const armarx::LocalException& e)
230 {
231 ARMARX_ERROR << "ArmarX exception while storing " << id.str() << "/" << key
232 << ": " << e.what();
233 storageErrorCount_.fetch_add(1, std::memory_order_relaxed);
234 }
235 catch (const std::filesystem::filesystem_error& e)
236 {
237 ARMARX_ERROR << "Filesystem error while storing " << id.str() << "/" << key
238 << ": " << e.what() << " (error code: " << e.code() << ")";
239 storageErrorCount_.fetch_add(1, std::memory_order_relaxed);
240 }
241 catch (const std::system_error& e)
242 {
243 ARMARX_ERROR << "System error while storing " << id.str() << "/" << key
244 << ": " << e.what() << " (error code: " << e.code() << ")";
245 storageErrorCount_.fetch_add(1, std::memory_order_relaxed);
246 }
247 catch (const std::exception& e)
248 {
249 ARMARX_ERROR << "Unexpected exception while storing " << id.str() << "/" << key
250 << ": " << e.what();
251 storageErrorCount_.fetch_add(1, std::memory_order_relaxed);
252 }
253 catch (...)
254 {
255 ARMARX_ERROR << "Unknown exception while storing " << id.str() << "/" << key;
256 storageErrorCount_.fetch_add(1, std::memory_order_relaxed);
257 }
258 }
259
260 std::vector<unsigned char>
262 {
263 if (!enabled_)
264 {
265 return std::vector<unsigned char>();
266 }
267
268 // THREAD-SAFETY: Acquire per-directory mutex to prevent reading during writes
269 // This ensures we don't read partial/empty files while another thread is writing
270 std::string directoryPath = getFullPath(id).string();
271
272 std::mutex* dirMutex = nullptr;
273 {
274 std::lock_guard mapLock(directoryMutexMapLock_);
275 auto& mutexPtr = directoryMutexes_[directoryPath];
276 if (!mutexPtr)
277 {
278 mutexPtr = std::make_unique<std::mutex>();
279 }
280 dirMutex = mutexPtr.get();
281 }
282
283 std::lock_guard dirLock(*dirMutex);
284
285 if (fileExists(id, key))
286 {
287 return readDataFromFile(id, key);
288 }
289
290 return std::vector<unsigned char>();
291 }
292
293 std::filesystem::path
294 DiskPersistence::getMemoryParentPath()
295 {
296 std::string p = memoryParentPath_.string();
297
299
300 return p;
301 }
302
303 std::filesystem::path
304 DiskPersistence::getFullPath(const armarx::armem::MemoryID& id)
305 {
306 auto p = getMemoryParentPath() / getExportName();
307
308 auto cleanID =
309 id.cleanID(); //somehow, the iDs are jumbled when loading the LTM from disk, this solves it for now
310
311 auto fullPath = util::fs::toPath(p, cleanID);
312
313 return fullPath;
314 }
315
316 bool
317 DiskPersistence::fullPathExists(const armarx::armem::MemoryID& id)
318 {
319 auto p = getFullPath(id);
321 }
322
323 bool
324 DiskPersistence::fileExists(const armarx::armem::MemoryID& id, const std::string& filename)
325 {
326 auto p = getFullPath(id) / filename;
327 return util::fs::fileExists(p);
328 }
329
330 void
331 DiskPersistence::ensureFullPathExists(const armarx::armem::MemoryID& id,
332 bool createIfNotExistent)
333 {
334 auto p = getFullPath(id);
335 util::fs::ensureDirectoryExists(p, createIfNotExistent);
336 }
337
338 void
339 DiskPersistence::ensureFileExists(const armarx::armem::MemoryID& id,
340 const std::string& filename,
341 bool createIfNotExistent)
342 {
343 auto p = getFullPath(id) / filename;
344 util::fs::ensureFileExists(p, createIfNotExistent);
345 }
346
347 void
348 DiskPersistence::writeDataToFile(const armarx::armem::MemoryID& id,
349 const std::string& filename,
350 const std::vector<unsigned char>& data)
351 {
352 auto p = getFullPath(id) / filename;
354 }
355
356 std::vector<unsigned char>
357 DiskPersistence::readDataFromFile(const armarx::armem::MemoryID& id,
358 const std::string& filename)
359 {
360 auto p = getFullPath(id) / filename;
362 }
363
364 std::vector<std::filesystem::path>
365 DiskPersistence::getAllFiles(const armarx::armem::MemoryID& id)
366 {
367 if (fullPathExists(id))
368 {
369 auto p = getFullPath(id);
370 return util::fs::getAllFiles(p);
371 }
372
373 return std::vector<std::filesystem::path>();
374 }
375
376 std::vector<std::filesystem::path>
377 DiskPersistence::getAllDirectories(const armarx::armem::MemoryID& id)
378 {
379 if (fullPathExists(id))
380 {
381 auto p = getFullPath(id);
383 }
384
385 return std::vector<std::filesystem::path>();
386 }
387
388 bool
389 DiskPersistence::enoughDiskSpaceLeft()
390 {
391 std::string path_to_disk = this->getMemoryParentPath();
392 bool debug_info_output_enabled = false;
393
394 if (std::filesystem::exists(path_to_disk))
395 {
396 try
397 {
398 auto space_info = std::filesystem::space(path_to_disk);
399 int const conversion_factor = 1024;
400
401 auto available_space = space_info.available /
402 (conversion_factor * conversion_factor * conversion_factor);
403
404 if (debug_info_output_enabled)
405 {
406 ARMARX_DEBUG << "Capacity: "
407 << space_info.capacity /
408 (conversion_factor * conversion_factor * conversion_factor)
409 << " GB\n";
410 ARMARX_DEBUG << "Free space: "
411 << space_info.free /
412 (conversion_factor * conversion_factor * conversion_factor)
413 << " GB\n";
414 ARMARX_DEBUG << "Available space: "
415 << space_info.available /
416 (conversion_factor * conversion_factor * conversion_factor)
417 << " GB\n";
418
419 ARMARX_DEBUG << "Min disk space: " << this->minDiskSpace << " GB\n";
420 }
421 return static_cast<bool>(available_space >= this->minDiskSpace);
422 }
423 catch (const std::filesystem::filesystem_error& e)
424 {
425 ARMARX_WARNING << "Error: " << e.what() << '\n';
426 return false;
427 }
428 catch (...)
429 {
430 ARMARX_DEBUG << "Error while trying to get info on available disk space";
431 return false;
432 }
433 }
434 else
435 {
436 ARMARX_DEBUG << "Cannot find path to disk and thus cannot check if enough space is "
437 "still available";
438 return true;
439 }
440 }
441
442 void
444 {
445 auto basePath = getMemoryParentPath();
446
447 if (!util::fs::canCreateFiles(basePath))
448 {
449 ARMARX_ERROR << "LTM storage path '" << basePath
450 << "' is not writable! Data will not be persisted.";
451 ARMARX_ERROR << "Please check:\n"
452 << " 1. Directory exists\n"
453 << " 2. Current user has write permissions\n"
454 << " 3. Filesystem is not read-only\n"
455 << " 4. No SELinux/AppArmor restrictions";
456 }
457 else
458 {
459 ARMARX_INFO << "LTM storage path validated: " << basePath;
460 }
461 }
462
463 // ==================== Batch Write Implementation ====================
464
465 void
467 {
468 bool wasEnabled = batchWriteEnabled_.exchange(enable, std::memory_order_acq_rel);
469
470 if (enable && !wasEnabled)
471 {
472 // Starting batch mode - start the background flush thread
473 startBatchWriter();
474 ARMARX_INFO << "Batch write mode ENABLED (threshold: " << batchSizeThreshold_
475 << " items or " << batchTimeThresholdMs_ << "ms)";
476 }
477 else if (!enable && wasEnabled)
478 {
479 // Stopping batch mode - flush remaining items and stop thread
480 flushBatch();
481 stopBatchWriter();
482 ARMARX_INFO << "Batch write mode DISABLED";
483 }
484 }
485
486 void
487 DiskPersistence::enqueueBatchItem(const armarx::armem::MemoryID& id,
488 const std::string& key,
489 std::vector<unsigned char> data)
490 {
491 bool shouldFlush = false;
492 {
493 std::lock_guard<std::mutex> lock(batchMutex_);
494
495 // Set batch start time on first item
496 if (batchBuffer_.empty())
497 {
498 batchStartTime_ = std::chrono::steady_clock::now();
499 }
500
501 // Add item to batch
502 batchBuffer_.push_back({id, key, std::move(data), std::chrono::steady_clock::now()});
503
504 // Check if we should flush due to size threshold
505 if (batchBuffer_.size() >= batchSizeThreshold_)
506 {
507 shouldFlush = true;
508 }
509 }
510
511 // Flush outside the lock if needed
512 if (shouldFlush)
513 {
514 flushBatchInternal(0); // 0 = flush by size
515 }
516 else
517 {
518 // Notify the background thread that there's work
519 batchCondition_.notify_one();
520 }
521 }
522
523 void
525 {
526 flushBatchInternal(2); // 2 = explicit flush
527 }
528
529 size_t
531 {
532 std::lock_guard<std::mutex> lock(batchMutex_);
533 return batchBuffer_.size();
534 }
535
536 void
537 DiskPersistence::flushBatchInternal(int reason)
538 {
539 std::vector<BatchWriteItem> itemsToWrite;
540
541 {
542 std::lock_guard<std::mutex> lock(batchMutex_);
543 if (batchBuffer_.empty())
544 {
545 return;
546 }
547 itemsToWrite = std::move(batchBuffer_);
548 batchBuffer_.clear();
549 }
550
551 // Update statistics for flush reason
552 switch (reason)
553 {
554 case 0: batchStats_.flushBySize.fetch_add(1, std::memory_order_relaxed); break;
555 case 1: batchStats_.flushByTime.fetch_add(1, std::memory_order_relaxed); break;
556 case 2: batchStats_.flushByExplicit.fetch_add(1, std::memory_order_relaxed); break;
557 }
558
559 // Write the batch
560 auto startTime = std::chrono::steady_clock::now();
561 writeBatch(itemsToWrite);
562 auto endTime = std::chrono::steady_clock::now();
563
564 // Update statistics
565 uint64_t durationNs = std::chrono::duration_cast<std::chrono::nanoseconds>(endTime - startTime).count();
566 batchStats_.totalBatchesWritten.fetch_add(1, std::memory_order_relaxed);
567 batchStats_.totalItemsBatched.fetch_add(itemsToWrite.size(), std::memory_order_relaxed);
568 batchStats_.totalFlushTimeNs.fetch_add(durationNs, std::memory_order_relaxed);
569
570 // Update max batch size
571 uint64_t currentMax = batchStats_.maxBatchSize.load(std::memory_order_relaxed);
572 while (itemsToWrite.size() > currentMax)
573 {
574 if (batchStats_.maxBatchSize.compare_exchange_weak(currentMax, itemsToWrite.size(), std::memory_order_relaxed))
575 {
576 break;
577 }
578 }
579
580 ARMARX_DEBUG << "Batch flush: " << itemsToWrite.size() << " items in "
581 << (durationNs / 1e6) << "ms (reason=" << reason << ")";
582 }
583
584 void
585 DiskPersistence::writeBatch(std::vector<BatchWriteItem>& items)
586 {
587 if (items.empty())
588 {
589 return;
590 }
591
592 // Check disk space once for the whole batch
593 if (!enoughDiskSpaceLeft())
594 {
595 ARMARX_ERROR << "Not enough disk space for batch write of " << items.size() << " items. Dropping batch!";
596 storageErrorCount_.fetch_add(items.size(), std::memory_order_relaxed);
597 return;
598 }
599
600 // Group items by directory path for efficient directory creation
601 std::unordered_map<std::string, std::vector<BatchWriteItem*>> itemsByDirectory;
602 for (auto& item : items)
603 {
604 std::string dirPath = getFullPath(item.id).string();
605 itemsByDirectory[dirPath].push_back(&item);
606 }
607
608 // Process each directory group
609 std::set<std::string> createdDirectories; // Track directories we've created
610 std::vector<std::string> directoriesToSync; // Directories that need fsync
611
612 for (auto& [dirPath, dirItems] : itemsByDirectory)
613 {
614 // Get or create directory mutex
615 std::mutex* dirMutex = nullptr;
616 {
617 std::lock_guard mapLock(directoryMutexMapLock_);
618 auto& mutexPtr = directoryMutexes_[dirPath];
619 if (!mutexPtr)
620 {
621 mutexPtr = std::make_unique<std::mutex>();
622 }
623 dirMutex = mutexPtr.get();
624 }
625
626 // Lock this directory for the batch write
627 std::lock_guard dirLock(*dirMutex);
628
629 try
630 {
631 // Create directory once for all items in this group
632 if (createdDirectories.find(dirPath) == createdDirectories.end())
633 {
634 // Use the first item's ID to create the directory
635 ensureFullPathExists(dirItems[0]->id, true);
636 createdDirectories.insert(dirPath);
637 }
638
639 // Write all files in this directory without individual fsync
640 for (auto* item : dirItems)
641 {
642 auto filePath = getFullPath(item->id) / item->key;
643
644 // Use non-atomic write for batch mode (we'll sync at the end)
645 util::fs::writeDataToFile(filePath, item->data, false);
646
647 batchStats_.totalBytesWritten.fetch_add(item->data.size(), std::memory_order_relaxed);
648 }
649
650 // Track this directory for final sync
651 directoriesToSync.push_back(dirPath);
652 }
653 catch (const std::exception& e)
654 {
655 ARMARX_ERROR << "Error writing batch to directory " << dirPath << ": " << e.what();
656 storageErrorCount_.fetch_add(dirItems.size(), std::memory_order_relaxed);
657 }
658 }
659
660 // Single fsync for all directories at the end (deferred sync)
661 for (const auto& dirPath : directoriesToSync)
662 {
663 int dfd = ::open(dirPath.c_str(), O_DIRECTORY | O_RDONLY);
664 if (dfd >= 0)
665 {
666 ::fsync(dfd);
667 ::close(dfd);
668 }
669 }
670 }
671
672 void
673 DiskPersistence::startBatchWriter()
674 {
675 if (batchWriterThread_.joinable())
676 {
677 return; // Already running
678 }
679
680 stopBatchWriter_.store(false, std::memory_order_release);
681 batchWriterThread_ = std::thread(&DiskPersistence::batchWriterThread, this);
682 }
683
684 void
685 DiskPersistence::stopBatchWriter()
686 {
687 stopBatchWriter_.store(true, std::memory_order_release);
688 batchCondition_.notify_all();
689
690 if (batchWriterThread_.joinable())
691 {
692 batchWriterThread_.join();
693 }
694
695 // Flush any remaining items
696 flushBatchInternal(2);
697 }
698
699 void
700 DiskPersistence::batchWriterThread()
701 {
702 ARMARX_DEBUG << "Batch writer thread started";
703
704 while (!stopBatchWriter_.load(std::memory_order_acquire))
705 {
706 bool shouldFlush = false;
707
708 {
709 std::unique_lock<std::mutex> lock(batchMutex_);
710
711 // Wait for items or timeout
712 auto timeout = std::chrono::milliseconds(batchTimeThresholdMs_);
713 batchCondition_.wait_for(lock, timeout, [this]() {
714 return stopBatchWriter_.load(std::memory_order_acquire) ||
715 batchBuffer_.size() >= batchSizeThreshold_;
716 });
717
718 // Check if we should flush due to time threshold
719 if (!batchBuffer_.empty())
720 {
721 auto now = std::chrono::steady_clock::now();
722 auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
723 now - batchStartTime_).count();
724
725 if (elapsed >= static_cast<long long>(batchTimeThresholdMs_) ||
726 batchBuffer_.size() >= batchSizeThreshold_)
727 {
728 shouldFlush = true;
729 }
730 }
731 }
732
733 if (shouldFlush)
734 {
735 flushBatchInternal(1); // 1 = flush by time
736 }
737 }
738
739 ARMARX_DEBUG << "Batch writer thread stopped";
740 }
741
742} // namespace armarx::armem::server::ltm::persistence
static bool ReplaceEnvVars(std::string &string)
ReplaceEnvVars replaces environment variables in a string with their values, if the env. variable is set.
bool containsContainer(const armarx::armem::MemoryID &id, std::string key) override
Checks if the container is available for the current memory id.
void setBatchWriteEnabled(bool enable)
Enable or disable batch writing.
std::vector< std::string > getItemKeys(const armarx::armem::MemoryID &id) override
Returns all items for the current id.
void storeItem(const armarx::armem::MemoryID &id, std::string key, std::vector< unsigned char > &data) override
Create a new file with name 'key' and stores the data inside it.
bool containsItem(const armarx::armem::MemoryID &id, std::string key) override
Checks if current container contains the item defined by its key.
std::vector< std::string > getContainerKeys(const armarx::armem::MemoryID &id) override
Returns all containers for the current id.
void flushBatch()
Explicitly flush all pending batch writes to disk.
size_t getBatchPendingCount() const
Get the current number of items pending in the batch buffer.
std::vector< unsigned char > retrieveItem(const armarx::armem::MemoryID &id, std::string key) override
Reads the data of the file with name 'key' at the current location.
bool enabled_
If false, the strategy is not writing or reading anything.
#define ARMARX_INFO
The normal logging level.
Definition Logging.h:181
#define ARMARX_ERROR
The logging level for unexpected behaviour, that must be fixed.
Definition Logging.h:196
#define ARMARX_DEBUG
The logging level for output that is only interesting while debugging.
Definition Logging.h:184
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
Definition Logging.h:193
bool isDateString(const std::string &s)
bool isNumberString(const std::string &s)
std::vector< unsigned char > readDataFromFile(const std::filesystem::path &p)
void ensureFileExists(const std::filesystem::path &p, bool createIfNotExistent=false)
bool directoryExists(const std::filesystem::path &p)
std::vector< std::filesystem::path > getAllFiles(const std::filesystem::path &p)
bool canCreateFiles(const std::filesystem::path &dir)
Check if we can create files in this directory.
std::filesystem::path toPath(const std::filesystem::path &base, const armem::MemoryID &id)
bool hasWritePermission(const std::filesystem::path &p)
Check if directory has write permission.
void writeDataToFile(const std::filesystem::path &p, const std::vector< unsigned char > &data, bool write_atomic=true)
bool fileExists(const std::filesystem::path &p)
void ensureDirectoryExists(const std::filesystem::path &p, bool createIfNotExistent=false)
std::vector< std::filesystem::path > getAllDirectories(const std::filesystem::path &p)