5#include <condition_variable>
11#include <unordered_map>
48 if (count == 0)
return 0.0;
55 if (count == 0)
return 0.0;
56 return static_cast<double>(
totalItemsBatched.load(std::memory_order_relaxed)) / count;
67 std::vector<unsigned char>
data;
87 return filename_ + fileType_;
91 std::string filename_;
92 std::string fileType_;
124 const std::string& exportName,
125 const std::filesystem::path& memoryParentPath) :
167 std::vector<unsigned char>&
data)
override;
174 std::string key)
override;
186 if (!enoughDiskSpaceLeft())
188 ARMARX_WARNING <<
"Not enough available disk space for DiskPersistance Strategy. "
190 << this->minDiskSpace
191 <<
" GB available disk space to record into LTM using this strategy";
248 return storageErrorCount_.load(std::memory_order_acquire);
253 storageErrorCount_.store(0, std::memory_order_release);
259 std::filesystem::path memoryParentPath_;
264 mutable std::mutex directoryMutexMapLock_;
265 mutable std::unordered_map<std::string, std::unique_ptr<std::mutex>> directoryMutexes_;
268 std::atomic<size_t> storageErrorCount_{0};
283 bool createIfNotExistent =
false);
286 const std::string& filename,
287 bool createIfNotExistent =
false);
290 const std::string& filename,
291 const std::vector<unsigned char>&
data);
294 const std::string& filename);
296 std::filesystem::path getMemoryParentPath();
298 bool enoughDiskSpaceLeft();
306 const std::string& key,
307 std::vector<unsigned char>
data);
313 void flushBatchInternal(
int reason);
319 void writeBatch(std::vector<BatchWriteItem>& items);
324 void startBatchWriter();
329 void stopBatchWriter();
334 void batchWriterThread();
337 std::atomic<bool> batchWriteEnabled_{
false};
338 size_t batchSizeThreshold_ = 100;
339 size_t batchTimeThresholdMs_ = 100;
342 mutable std::mutex batchMutex_;
343 std::vector<BatchWriteItem> batchBuffer_;
344 std::chrono::steady_clock::time_point batchStartTime_;
347 std::thread batchWriterThread_;
348 std::atomic<bool> stopBatchWriter_{
false};
349 std::condition_variable batchCondition_;
352 mutable BatchWriteStatistics batchStats_;
bool containsContainer(const armarx::armem::MemoryID &id, std::string key) override
Checks if the container is available for the current memory id.
virtual ~DiskPersistence()
Destructor - flushes pending batch and stops batch thread.
void setBatchSizeThreshold(size_t size)
Set the maximum number of items to accumulate before auto-flushing.
void setBatchWriteEnabled(bool enable)
Enable or disable batch writing.
std::vector< std::string > getItemKeys(const armarx::armem::MemoryID &id) override
Returns all items for the current id.
void storeItem(const armarx::armem::MemoryID &id, std::string key, std::vector< unsigned char > &data) override
Create a new file with name 'key' and stores the data inside it.
void validateStoragePermissions()
size_t getStorageErrorCount() const
bool containsItem(const armarx::armem::MemoryID &id, std::string key) override
Checks if current container contains the item defined by its key.
std::vector< std::string > getContainerKeys(const armarx::armem::MemoryID &id) override
Returns all containers for the current id.
void resetBatchStatistics()
Reset batch write statistics.
void flushBatch()
Explicitly flush all pending batch writes to disk.
const BatchWriteStatistics & getBatchStatistics() const
Get batch write statistics.
void setBatchTimeThresholdMs(size_t ms)
Set the maximum time to hold items before auto-flushing.
DiskPersistence(const std::filesystem::path &memoryParentPath)
bool isBatchWriteEnabled() const
Check if batch writing is enabled.
size_t getBatchPendingCount() const
Get the current number of items pending in the batch buffer.
void resetStorageErrorCount()
std::vector< unsigned char > retrieveItem(const armarx::armem::MemoryID &id, std::string key) override
Reads the data of the file with name 'key' at the current location.
void createPropertyDefinitions(PropertyDefinitionsPtr &defs, const std::string &prefix) override
void setMinAvailableDiskSpace(const int minDiskSpace)
DiskPersistence(const std::string &identifier, const std::string &exportName, const std::filesystem::path &memoryParentPath)
virtual ~FileIdentifier()
FileIdentifier(std::string &filename, std::string &fileType)
std::string getKey() override
For usage if you might want to create the key using some logic defined with your strategy rather than...
MemoryPersistenceStrategy()
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
IceUtil::Handle< class PropertyDefinitionContainer > PropertyDefinitionsPtr
PropertyDefinitions smart pointer type.
A single item pending batch write.
armarx::armem::MemoryID id
std::chrono::steady_clock::time_point enqueuedTime
std::vector< unsigned char > data
Statistics for batch write operations.
std::atomic< uint64_t > totalFlushTimeNs
std::atomic< uint64_t > flushByExplicit
std::atomic< uint64_t > totalItemsBatched
std::atomic< uint64_t > totalBytesWritten
std::atomic< uint64_t > totalBatchesWritten
double getAvgFlushTimeMs() const
std::atomic< uint64_t > flushBySize
double getAvgBatchSize() const
std::atomic< uint64_t > flushByTime
std::atomic< uint64_t > maxBatchSize