13 std::vector<std::string>
16 std::vector<std::string> containers;
24 if (!
id.hasEntityName() ||
id.hasTimestamp())
26 std::vector<std::filesystem::path> dirs = getAllDirectories(
id);
29 for (
auto& path : dirs)
31 std::string container = path.filename().string();
33 if (!container.empty())
35 containers.emplace_back(container);
42 std::vector<std::filesystem::path> dayDirs = getAllDirectories(
id);
44 for (std::filesystem::path& dayDir : dayDirs)
48 ARMARX_WARNING <<
"Found a non-date folder inside an entity '" <<
id.str()
49 <<
"' with name '" << dayDir.filename() <<
"'. "
50 <<
"Ignoring this folder, however this is a bad situation.";
56 for (std::filesystem::path& secondDir : secondDirs)
60 ARMARX_WARNING <<
"Found a non-timestamp folder inside an entity '"
61 <<
id.str() <<
"' hours folder with name '"
62 << secondDir.filename() <<
"'. "
63 <<
"Ignoring this folder, however this is a bad situation.";
67 std::vector<std::filesystem::path> timestampDirs =
70 for (std::filesystem::path& timestampDir : timestampDirs)
75 <<
"Found a non-timestamp folder inside an entity '" <<
id.str()
76 <<
"' seconds folder with name '" << timestampDir.filename()
78 <<
"Ignoring this folder, however this is a bad situation.";
82 std::string container = timestampDir.filename().string();
84 if (!container.empty())
86 containers.emplace_back(container);
96 std::vector<std::string>
101 return std::vector<std::string>();
105 std::string directoryPath = getFullPath(
id).string();
107 std::mutex* dirMutex =
nullptr;
109 std::lock_guard mapLock(directoryMutexMapLock_);
110 auto& mutexPtr = directoryMutexes_[directoryPath];
113 mutexPtr = std::make_unique<std::mutex>();
115 dirMutex = mutexPtr.get();
118 std::lock_guard dirLock(*dirMutex);
120 std::vector<std::filesystem::path> files = getAllFiles(
id);
121 std::vector<std::string> filesStr;
123 for (
auto& path : files)
125 std::string item = path.filename().string();
129 filesStr.emplace_back(item);
145 auto path_to_id = getFullPath(
id);
146 auto correct_container_path = path_to_id / key;
149 return contains_container;
160 return fileExists(
id, key);
166 std::vector<unsigned char>&
data)
174 if (batchWriteEnabled_.load(std::memory_order_acquire))
176 enqueueBatchItem(
id, key, std::move(
data));
184 std::string directoryPath = getFullPath(
id).string();
187 std::mutex* dirMutex =
nullptr;
189 std::lock_guard mapLock(directoryMutexMapLock_);
190 auto& mutexPtr = directoryMutexes_[directoryPath];
193 mutexPtr = std::make_unique<std::mutex>();
195 dirMutex = mutexPtr.get();
200 std::lock_guard dirLock(*dirMutex);
204 auto dir = getFullPath(
id);
205 auto parentDir = dir.parent_path();
210 ARMARX_ERROR <<
"No write permission for directory: " << parentDir
211 <<
". Cannot store " <<
id.str() <<
"/" << key;
212 storageErrorCount_.fetch_add(1, std::memory_order_relaxed);
216 ensureFullPathExists(
id,
true);
218 if (enoughDiskSpaceLeft())
220 writeDataToFile(
id, key,
data);
224 ARMARX_ERROR <<
"Not enough disk space available for DiskPersistence. "
225 <<
"Skipping storage of " <<
id.str() <<
"/" << key;
226 storageErrorCount_.fetch_add(1, std::memory_order_relaxed);
229 catch (
const armarx::LocalException& e)
231 ARMARX_ERROR <<
"ArmarX exception while storing " <<
id.str() <<
"/" << key
233 storageErrorCount_.fetch_add(1, std::memory_order_relaxed);
235 catch (
const std::filesystem::filesystem_error& e)
237 ARMARX_ERROR <<
"Filesystem error while storing " <<
id.str() <<
"/" << key
238 <<
": " << e.what() <<
" (error code: " << e.code() <<
")";
239 storageErrorCount_.fetch_add(1, std::memory_order_relaxed);
241 catch (
const std::system_error& e)
243 ARMARX_ERROR <<
"System error while storing " <<
id.str() <<
"/" << key
244 <<
": " << e.what() <<
" (error code: " << e.code() <<
")";
245 storageErrorCount_.fetch_add(1, std::memory_order_relaxed);
247 catch (
const std::exception& e)
249 ARMARX_ERROR <<
"Unexpected exception while storing " <<
id.str() <<
"/" << key
251 storageErrorCount_.fetch_add(1, std::memory_order_relaxed);
255 ARMARX_ERROR <<
"Unknown exception while storing " <<
id.str() <<
"/" << key;
256 storageErrorCount_.fetch_add(1, std::memory_order_relaxed);
260 std::vector<unsigned char>
265 return std::vector<unsigned char>();
270 std::string directoryPath = getFullPath(
id).string();
272 std::mutex* dirMutex =
nullptr;
274 std::lock_guard mapLock(directoryMutexMapLock_);
275 auto& mutexPtr = directoryMutexes_[directoryPath];
278 mutexPtr = std::make_unique<std::mutex>();
280 dirMutex = mutexPtr.get();
283 std::lock_guard dirLock(*dirMutex);
285 if (fileExists(
id, key))
287 return readDataFromFile(
id, key);
290 return std::vector<unsigned char>();
// Returns a std::filesystem::path based on the memoryParentPath_ member.
// Only the first statement is visible here: the member is converted to a
// std::string `p`. How `p` is processed before being returned is not part of
// this excerpt.
293 std::filesystem::path
294 DiskPersistence::getMemoryParentPath()
296 std::string p = memoryParentPath_.string();
303 std::filesystem::path
319 auto p = getFullPath(
id);
326 auto p = getFullPath(
id) / filename;
332 bool createIfNotExistent)
334 auto p = getFullPath(
id);
340 const std::string& filename,
341 bool createIfNotExistent)
343 auto p = getFullPath(
id) / filename;
348 DiskPersistence::writeDataToFile(
const armarx::armem::MemoryID&
id,
349 const std::string& filename,
350 const std::vector<unsigned char>& data)
// The destination path is the directory returned by getFullPath(id) joined
// with `filename`. The statements that write `data` to that path are not
// part of this excerpt.
352 auto p = getFullPath(
id) / filename;
// Returns a byte vector for the file `filename` belonging to memory ID `id`.
// The source path is the directory returned by getFullPath(id) joined with
// `filename`; the statements that read the file are not part of this excerpt.
356 std::vector<unsigned char>
357 DiskPersistence::readDataFromFile(
const armarx::armem::MemoryID&
id,
358 const std::string& filename)
360 auto p = getFullPath(
id) / filename;
// Returns filesystem paths for the memory ID `id`.
// When fullPathExists(id) holds, the directory from getFullPath(id) is used;
// the code that enumerates it is not part of this excerpt.
// NOTE(review): the final return of an empty vector is presumably the
// fallback for a non-existent path -- confirm against the full source.
364 std::vector<std::filesystem::path>
365 DiskPersistence::getAllFiles(
const armarx::armem::MemoryID&
id)
367 if (fullPathExists(
id))
369 auto p = getFullPath(
id);
373 return std::vector<std::filesystem::path>();
// Returns filesystem paths for the memory ID `id`; judging by the name these
// are the sub-directories (the enumeration code is not part of this excerpt).
// When fullPathExists(id) holds, the directory from getFullPath(id) is used.
// NOTE(review): the final return of an empty vector is presumably the
// fallback for a non-existent path -- confirm against the full source.
376 std::vector<std::filesystem::path>
377 DiskPersistence::getAllDirectories(
const armarx::armem::MemoryID&
id)
379 if (fullPathExists(
id))
381 auto p = getFullPath(
id);
385 return std::vector<std::filesystem::path>();
389 DiskPersistence::enoughDiskSpaceLeft()
// Checks the free space of the filesystem holding getMemoryParentPath()
// against the minDiskSpace member.
// The values returned on the error paths (filesystem_error, path not found)
// are not visible in this excerpt.
391 std::string path_to_disk = this->getMemoryParentPath();
// Hard-coded switch for the verbose capacity/available output below; it is
// initialised to false here.
392 bool debug_info_output_enabled =
false;
394 if (std::filesystem::exists(path_to_disk))
398 auto space_info = std::filesystem::space(path_to_disk);
// std::filesystem::space reports byte counts; dividing by 1024^3 expresses
// them in GiB. This is integer division, so fractions of a GiB are dropped.
399 int const conversion_factor = 1024;
401 auto available_space = space_info.available /
402 (conversion_factor * conversion_factor * conversion_factor);
404 if (debug_info_output_enabled)
407 << space_info.capacity /
408 (conversion_factor * conversion_factor * conversion_factor)
412 (conversion_factor * conversion_factor * conversion_factor)
415 << space_info.available /
416 (conversion_factor * conversion_factor * conversion_factor)
// available_space is in GiB (see above), so minDiskSpace is presumably a GiB
// threshold as well -- TODO confirm against its declaration.
421 return static_cast<bool>(available_space >= this->
minDiskSpace);
// A failure while querying the filesystem is only logged at debug level.
423 catch (
const std::filesystem::filesystem_error& e)
430 ARMARX_DEBUG <<
"Error while trying to get info on available disk space";
// Debug message for the case that the space check cannot be performed because
// the path was not found (presumably the else-branch of the exists() check).
436 ARMARX_DEBUG <<
"Cannot find path to disk and thus cannot check if enough space is "
445 auto basePath = getMemoryParentPath();
450 <<
"' is not writable! Data will not be persisted.";
452 <<
" 1. Directory exists\n"
453 <<
" 2. Current user has write permissions\n"
454 <<
" 3. Filesystem is not read-only\n"
455 <<
" 4. No SELinux/AppArmor restrictions";
459 ARMARX_INFO <<
"LTM storage path validated: " << basePath;
468 bool wasEnabled = batchWriteEnabled_.exchange(
enable, std::memory_order_acq_rel);
470 if (
enable && !wasEnabled)
474 ARMARX_INFO <<
"Batch write mode ENABLED (threshold: " << batchSizeThreshold_
475 <<
" items or " << batchTimeThresholdMs_ <<
"ms)";
477 else if (!
enable && wasEnabled)
488 const std::string& key,
489 std::vector<unsigned char>
data)
491 bool shouldFlush =
false;
493 std::lock_guard<std::mutex> lock(batchMutex_);
496 if (batchBuffer_.empty())
498 batchStartTime_ = std::chrono::steady_clock::now();
502 batchBuffer_.push_back({id, key, std::move(
data), std::chrono::steady_clock::now()});
505 if (batchBuffer_.size() >= batchSizeThreshold_)
514 flushBatchInternal(0);
519 batchCondition_.notify_one();
526 flushBatchInternal(2);
532 std::lock_guard<std::mutex> lock(batchMutex_);
533 return batchBuffer_.size();
537 DiskPersistence::flushBatchInternal(
int reason)
// Takes everything currently queued in batchBuffer_, passes it to
// writeBatch() and updates the batchStats_ counters.
// `reason` selects which flush counter is incremented (see the case labels
// below): 0 -> flushBySize, 1 -> flushByTime, 2 -> flushByExplicit.
539 std::vector<BatchWriteItem> itemsToWrite;
// batchBuffer_ is accessed while holding batchMutex_. What happens for an
// empty buffer is not visible in this excerpt (presumably an early return).
// NOTE(review): the scope of this lock is not visible either; confirm it is
// released before writeBatch() runs.
542 std::lock_guard<std::mutex> lock(batchMutex_);
543 if (batchBuffer_.empty())
// Move the queued items out, then clear() so the moved-from buffer is in a
// defined empty state for later enqueues.
547 itemsToWrite = std::move(batchBuffer_);
548 batchBuffer_.clear();
// Count this flush under the statistic matching `reason`.
554 case 0: batchStats_.
flushBySize.fetch_add(1, std::memory_order_relaxed);
break;
555 case 1: batchStats_.
flushByTime.fetch_add(1, std::memory_order_relaxed);
break;
556 case 2: batchStats_.
flushByExplicit.fetch_add(1, std::memory_order_relaxed);
break;
// Time only the writeBatch() call, using the monotonic steady_clock.
560 auto startTime = std::chrono::steady_clock::now();
561 writeBatch(itemsToWrite);
562 auto endTime = std::chrono::steady_clock::now();
565 uint64_t durationNs = std::chrono::duration_cast<std::chrono::nanoseconds>(endTime - startTime).count();
567 batchStats_.
totalItemsBatched.fetch_add(itemsToWrite.size(), std::memory_order_relaxed);
568 batchStats_.
totalFlushTimeNs.fetch_add(durationNs, std::memory_order_relaxed);
// Raise maxBatchSize to this batch's size if it is larger. When
// compare_exchange_weak fails it stores the current value into currentMax,
// so the loop condition is re-checked against the latest maximum.
571 uint64_t currentMax = batchStats_.
maxBatchSize.load(std::memory_order_relaxed);
572 while (itemsToWrite.size() > currentMax)
574 if (batchStats_.
maxBatchSize.compare_exchange_weak(currentMax, itemsToWrite.size(), std::memory_order_relaxed))
// durationNs / 1e6 converts nanoseconds to milliseconds for the log line.
580 ARMARX_DEBUG <<
"Batch flush: " << itemsToWrite.size() <<
" items in "
581 << (durationNs / 1e6) <<
"ms (reason=" << reason <<
")";
585 DiskPersistence::writeBatch(std::vector<BatchWriteItem>& items)
// Processes a batch of items grouped by their target directory.
// Errors are logged and added to storageErrorCount_.
// The statements that write each file are not part of this excerpt.
//
// Disk space is checked once for the whole batch. If it is insufficient, an
// error is logged and every item of the batch is counted as a storage error.
593 if (!enoughDiskSpaceLeft())
595 ARMARX_ERROR <<
"Not enough disk space for batch write of " << items.size() <<
" items. Dropping batch!";
596 storageErrorCount_.fetch_add(items.size(), std::memory_order_relaxed);
// Group the items by the string form of getFullPath(item.id). The map holds
// pointers into `items`, so `items` must not be resized while it is in use.
601 std::unordered_map<std::string, std::vector<BatchWriteItem*>> itemsByDirectory;
602 for (
auto& item : items)
604 std::string dirPath = getFullPath(item.id).string();
605 itemsByDirectory[dirPath].push_back(&item);
// createdDirectories records directories already handed to
// ensureFullPathExists(); directoriesToSync collects the directory paths
// that are reopened in the final loop below.
609 std::set<std::string> createdDirectories;
610 std::vector<std::string> directoriesToSync;
612 for (
auto& [dirPath, dirItems] : itemsByDirectory)
// Fetch the mutex for this directory from directoryMutexes_ while holding
// directoryMutexMapLock_. A new std::mutex is created via make_unique; the
// condition guarding that creation is not visible in this excerpt.
615 std::mutex* dirMutex =
nullptr;
617 std::lock_guard mapLock(directoryMutexMapLock_);
618 auto& mutexPtr = directoryMutexes_[dirPath];
621 mutexPtr = std::make_unique<std::mutex>();
623 dirMutex = mutexPtr.get();
// Hold the per-directory mutex while working on this directory.
627 std::lock_guard dirLock(*dirMutex);
// NOTE(review): itemsByDirectory has one entry per dirPath, so within the
// visible code this lookup never finds an existing entry -- confirm whether
// the set is also filled elsewhere.
632 if (createdDirectories.find(dirPath) == createdDirectories.end())
635 ensureFullPathExists(dirItems[0]->
id,
true);
636 createdDirectories.insert(dirPath);
640 for (
auto* item : dirItems)
642 auto filePath = getFullPath(item->id) / item->key;
// Add the payload size of this item to the running byte counter.
647 batchStats_.totalBytesWritten.fetch_add(item->data.size(), std::memory_order_relaxed);
// Remember this directory for the final loop over directoriesToSync.
651 directoriesToSync.push_back(dirPath);
// On an exception, all items of this directory are counted as storage errors.
653 catch (
const std::exception& e)
655 ARMARX_ERROR <<
"Error writing batch to directory " << dirPath <<
": " << e.what();
656 storageErrorCount_.fetch_add(dirItems.size(), std::memory_order_relaxed);
// Open each collected directory read-only; O_DIRECTORY makes open() fail if
// the path is not a directory. What is done with the descriptor (presumably
// fsync and close) is not visible in this excerpt.
661 for (
const auto& dirPath : directoriesToSync)
663 int dfd = ::open(dirPath.c_str(), O_DIRECTORY | O_RDONLY);
673 DiskPersistence::startBatchWriter()
// Starts the background thread that runs batchWriterThread().
// The action taken when a thread is already joinable is not visible in this
// excerpt (presumably an early return so that no second thread is started).
675 if (batchWriterThread_.joinable())
// Reset the stop flag before the new thread is created.
680 stopBatchWriter_.store(
false, std::memory_order_release);
681 batchWriterThread_ = std::thread(&DiskPersistence::batchWriterThread,
this);
685 DiskPersistence::stopBatchWriter()
// Stops the background writer thread and flushes what is left in the buffer.
// Set the stop flag first, then wake every waiter on batchCondition_.
687 stopBatchWriter_.store(
true, std::memory_order_release);
688 batchCondition_.notify_all();
// Join the writer thread if one is joinable.
690 if (batchWriterThread_.joinable())
692 batchWriterThread_.join();
// Final flush, counted under reason 2 (flushByExplicit in
// flushBatchInternal).
696 flushBatchInternal(2);
700 DiskPersistence::batchWriterThread()
704 while (!stopBatchWriter_.load(std::memory_order_acquire))
706 bool shouldFlush =
false;
709 std::unique_lock<std::mutex> lock(batchMutex_);
712 auto timeout = std::chrono::milliseconds(batchTimeThresholdMs_);
713 batchCondition_.wait_for(lock, timeout, [
this]() {
714 return stopBatchWriter_.load(std::memory_order_acquire) ||
715 batchBuffer_.size() >= batchSizeThreshold_;
719 if (!batchBuffer_.empty())
721 auto now = std::chrono::steady_clock::now();
722 auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
723 now - batchStartTime_).count();
725 if (elapsed >=
static_cast<long long>(batchTimeThresholdMs_) ||
726 batchBuffer_.size() >= batchSizeThreshold_)
735 flushBatchInternal(1);