31#include <SimoxUtility/json/json.hpp>
44 static std::filesystem::path
45 getAbsolutePath(
const std::filesystem::path& path)
47 if (path.is_absolute())
70 return "ArVizStorage";
80 properties_.topicName,
"TopicName",
"Layer updates are sent over this topic.");
82 defs->optional(properties_.maxHistorySize,
84 "How many layer updates are saved in the history until they are compressed")
87 defs->optional(properties_.historyPath,
89 "Destination path where the history is serialized to");
92 defs->optional(properties_.componentWarnFrequency,
93 "ComponentWarnFrequency",
94 "Define a frequency in Hz above which the compnent raises a warning. As you "
95 "should not send data at a too high rate.");
103 properties_.historyPath = getAbsolutePath(properties_.historyPath);
104 if (!std::filesystem::exists(properties_.historyPath))
106 ARMARX_INFO <<
"Creating history path: " << properties_.historyPath;
107 std::error_code error;
108 std::filesystem::create_directory(properties_.historyPath, error);
111 ARMARX_WARNING <<
"Could not create directory for history: \n" << error.message();
122 currentState.clear();
124 recordingInitialState.clear();
126 recordingBuffer.clear();
127 recordingMetaData.id =
"";
135 recordingTask->stop();
136 recordingTask =
nullptr;
148 std::unique_lock<std::mutex> lock(historyMutex);
152 IceUtil::Time now = IceUtil::Time::now();
153 long nowInMicroSeconds = now.toMicroSeconds();
155 if (not updates.empty())
157 const std::string& componentName = updates.front().component;
159 auto& history = updateHistoryForComponents[componentName];
165 const auto isOutdated = [&referenceNow,
167 {
return (referenceNow -
timestamp) > maxHistoryDur; };
170 history.erase(std::remove_if(history.begin(), history.end(), isOutdated),
174 << history.size() / maxHistoryDur.toSecondsDouble() <<
" Hz";
176 if (history.size() > properties_.componentWarnFrequency)
179 <<
"sends data at a too high rate ("
180 << history.size() / maxHistoryDur.toSecondsDouble() <<
")"
186 for (
auto& update : updates)
188 if (update.component.empty())
191 <<
"Discarding ArViz update with empty component name. Check whether "
192 <<
"you correctly create your ArViz client (`armarx::viz::Client`) "
197 auto& historyEntry = history.emplace_back();
198 historyEntry.revision = revision;
199 historyEntry.timestampInMicroseconds = nowInMicroSeconds;
200 historyEntry.update = update;
204 for (
auto& layer : currentState)
206 if (layer.update.component == update.component && layer.update.name == update.name)
208 layer = historyEntry;
215 currentState.push_back(historyEntry);
219 long currentHistorySize = history.size();
220 if (currentHistorySize >= properties_.maxHistorySize)
223 std::unique_lock<std::mutex> lock(recordingMutex);
224 if (recordingMetaData.id.size() > 0)
226 auto& newBatch = recordingBuffer.emplace_back();
227 newBatch.initialState = recordingInitialState;
228 newBatch.updates = std::move(history);
229 recordingInitialState = currentState;
231 recordingCondition.notify_one();
238 viz::data::CommitResult
242 viz::data::CommitResult result;
245 std::unique_lock<std::mutex> lock(historyMutex);
248 result.revision = revision;
250 IceUtil::Time now = IceUtil::Time::now();
251 long nowInMicroSeconds = now.toMicroSeconds();
254 for (viz::data::LayerUpdate
const& update : input.updates)
256 viz::data::TimestampedLayerUpdate& historyEntry = history.emplace_back();
257 historyEntry.revision = revision;
258 historyEntry.timestampInMicroseconds = nowInMicroSeconds;
259 historyEntry.update = update;
263 for (viz::data::TimestampedLayerUpdate& layer : currentState)
265 if (layer.update.component == update.component &&
266 layer.update.name == update.name)
268 layer = historyEntry;
275 currentState.push_back(historyEntry);
280 long currentHistorySize = history.size();
281 if (currentHistorySize >= properties_.maxHistorySize)
284 std::unique_lock<std::mutex> lock(recordingMutex);
285 if (recordingMetaData.id.size() > 0)
287 auto& newBatch = recordingBuffer.emplace_back();
288 newBatch.initialState = recordingInitialState;
289 newBatch.updates = std::move(history);
290 recordingInitialState = currentState;
292 recordingCondition.notify_one();
299 if (input.interactionComponent.size() > 0)
301 auto interactionsEnd = interactionBuffer.end();
302 auto foundInteractionsBegin =
303 std::partition(interactionBuffer.begin(),
305 [&input](viz::data::InteractionFeedback
const& interaction)
307 if (interaction.component == input.interactionComponent)
309 for (std::string const& layer : input.interactionLayers)
311 if (interaction.layer == layer)
320 result.interactions.assign(foundInteractionsBegin, interactionsEnd);
321 interactionBuffer.erase(foundInteractionsBegin, interactionsEnd);
328 viz::data::LayerUpdates
331 viz::data::LayerUpdates result;
333 std::unique_lock<std::mutex> lock(historyMutex);
335 result.updates.reserve(currentState.size());
336 for (
auto& layer : currentState)
338 if (layer.revision > revision)
340 result.updates.push_back(layer.update);
343 result.revision = this->revision;
348 static const int ALL_TRANSFORM_FLAGS =
349 viz::data::InteractionFeedbackType::TRANSFORM_BEGIN_FLAG |
350 viz::data::InteractionFeedbackType::TRANSFORM_DURING_FLAG |
351 viz::data::InteractionFeedbackType::TRANSFORM_END_FLAG;
353 viz::data::LayerUpdates
356 viz::data::InteractionFeedbackSeq
const& interactions,
357 const Ice::Current&
c)
359 viz::data::LayerUpdates result;
361 std::unique_lock<std::mutex> lock(historyMutex);
363 for (viz::data::InteractionFeedback
const& interaction : interactions)
365 for (viz::data::InteractionFeedback& entry : interactionBuffer)
367 if (entry.component == interaction.component && entry.layer == interaction.layer &&
368 entry.element == interaction.element)
370 int previousTransformFlags = entry.type & ALL_TRANSFORM_FLAGS;
371 int interactionTransformFlags = interaction.type & ALL_TRANSFORM_FLAGS;
376 if (previousTransformFlags != 0 && interactionTransformFlags != 0)
378 entry.type |= previousTransformFlags;
385 interactionBuffer.push_back(interaction);
390 result.updates.reserve(currentState.size());
391 for (viz::data::TimestampedLayerUpdate& layer : currentState)
393 if (layer.revision > revision)
395 result.updates.push_back(layer.update);
398 result.revision = this->revision;
404 ArVizStorage::record()
406 while (!recordingTask->isStopped())
408 std::unique_lock<std::mutex> lock(recordingMutex);
409 while (!recordingTask->isStopped() && recordingBuffer.empty())
411 recordingCondition.wait_for(lock, std::chrono::milliseconds(10));
413 for (
auto& batch : recordingBuffer)
417 recordingBuffer.clear();
426 to_json(nlohmann::json& j, RecordingBatchHeader
const& batch)
428 j[
"index"] = batch.index;
429 j[
"firstRevision"] = batch.firstRevision;
430 j[
"lastRevision"] = batch.lastRevision;
431 j[
"firstTimestampInMicroSeconds"] = batch.firstTimestampInMicroSeconds;
432 j[
"lastTimestampInMicroSeconds"] = batch.lastTimestampInMicroSeconds;
436 from_json(nlohmann::json
const& j, RecordingBatchHeader& batch)
438 batch.index = j[
"index"];
439 batch.firstRevision = j[
"firstRevision"];
440 batch.lastRevision = j[
"lastRevision"];
441 batch.firstTimestampInMicroSeconds = j[
"firstTimestampInMicroSeconds"];
442 batch.lastTimestampInMicroSeconds = j[
"lastTimestampInMicroSeconds"];
448 j[
"id"] = recording.id;
449 j[
"firstRevision"] = recording.firstRevision;
450 j[
"lastRevision"] = recording.lastRevision;
451 j[
"firstTimestampInMicroSeconds"] = recording.firstTimestampInMicroSeconds;
452 j[
"lastTimestampInMicroSeconds"] = recording.lastTimestampInMicroSeconds;
453 j[
"batchHeaders"] = recording.batchHeaders;
459 recording.id = j[
"id"];
460 recording.firstRevision = j[
"firstRevision"];
461 recording.lastRevision = j[
"lastRevision"];
462 recording.firstTimestampInMicroSeconds = j[
"firstTimestampInMicroSeconds"];
463 recording.lastTimestampInMicroSeconds = j[
"lastTimestampInMicroSeconds"];
464 j[
"batchHeaders"].get_to(recording.batchHeaders);
/// Writes a buffer to a file in binary mode, replacing any existing content.
///
/// The file handle is closed on every path. Buffered data may only reach the
/// disk when the stream is closed, so a failing `fclose` is reported as a
/// write failure as well.
///
/// @param filename Path of the file to create or truncate.
/// @param data     Pointer to the first byte to write (may be null if @p size is 0).
/// @param size     Number of bytes to write.
/// @return true if the file could be opened, all @p size bytes were written
///         and the file was closed successfully; false otherwise.
static bool
writeCompleteFile(std::string const& filename, const void* data, std::size_t size)
{
    FILE* file = std::fopen(filename.c_str(), "wb");
    if (file == nullptr)
    {
        return false;
    }

    // Skip the write for empty buffers so a null data pointer is never passed on.
    const std::size_t written = (size > 0) ? std::fwrite(data, 1, size, file) : 0;

    // Always close, even after a short write, so the handle cannot leak.
    const bool closed = (std::fclose(file) == 0);

    return written == size && closed;
}
/// Reads the entire content of a file into a string (binary mode).
///
/// Every failure (file cannot be opened, seeking fails, size cannot be
/// determined) yields an empty string instead of dereferencing a null
/// `FILE*` or constructing a string from a negative size. A short read
/// returns only the bytes that were actually read.
///
/// @param path File to read.
/// @return The file content, or an empty string if the file is empty or
///         could not be read.
static std::string
readCompleteFile(std::filesystem::path const& path)
{
    std::string result;

    FILE* f = std::fopen(path.string().c_str(), "rb");
    if (f == nullptr)
    {
        return result;
    }

    // Determine the size by seeking to the end; each step may fail.
    if (std::fseek(f, 0, SEEK_END) == 0)
    {
        const long fsize = std::ftell(f); // -1 on error
        if (fsize > 0 && std::fseek(f, 0, SEEK_SET) == 0)
        {
            result.resize(static_cast<std::size_t>(fsize));
            const std::size_t bytesRead = std::fread(result.data(), 1, result.size(), f);
            // Drop the zero padding that was not overwritten on a short read.
            result.resize(bytesRead);
        }
    }

    std::fclose(f);
    return result;
}
502static std::optional<armarx::viz::data::Recording>
503readRecordingInfo(std::filesystem::path
const& recordingDirectory)
505 std::optional<::armarx::viz::data::Recording> result;
507 std::filesystem::path recordingFilePath = recordingDirectory /
"recording.json";
508 if (!std::filesystem::exists(recordingFilePath))
510 ARMARX_INFO <<
"No recording.json found in directory: " << recordingDirectory;
516 std::string recordingString = readCompleteFile(recordingFilePath);
517 nlohmann::json recordingJson = nlohmann::json::parse(recordingString);
519 ::armarx::viz::data::Recording recording;
520 recordingJson.get_to(recording);
522 result = std::move(recording);
525 catch (std::exception
const& ex)
527 ARMARX_WARNING <<
"Could not parse JSON file: " << recordingFilePath
528 <<
"\nReason: " << ex.what();
534batchFileName(armarx::viz::data::RecordingBatchHeader
const& batchHeader)
536 return std::to_string(batchHeader.firstRevision) +
".bin";
540armarx::ArVizStorage::recordBatch(armarx::viz::data::RecordingBatch& batch)
542 if (batch.updates.empty())
547 auto& first = batch.updates.front();
548 auto& last = batch.updates.back();
550 batch.header.index = -1;
551 batch.header.firstRevision = first.revision;
552 batch.header.lastRevision = last.revision;
553 batch.header.firstTimestampInMicroSeconds = first.timestampInMicroseconds;
554 batch.header.lastTimestampInMicroSeconds = last.timestampInMicroseconds;
557 batch.initialState = currentState;
562 std::string filename = batchFileName(batch.header);
563 std::filesystem::path filePath = recordingPath / filename;
565 ObjectToIceBlobSerializer ser{batch};
567 if (!writeCompleteFile(filePath.string(), ser.begin(), ser.size()))
574 if (recordingMetaData.firstRevision < 0)
576 recordingMetaData.firstRevision = first.revision;
578 recordingMetaData.lastRevision = last.revision;
580 if (recordingMetaData.firstTimestampInMicroSeconds < 0)
582 recordingMetaData.firstTimestampInMicroSeconds = first.timestampInMicroseconds;
584 recordingMetaData.lastTimestampInMicroSeconds = last.timestampInMicroseconds;
586 armarx::viz::data::RecordingBatchHeader& newBatch =
587 recordingMetaData.batchHeaders.emplace_back();
588 newBatch.index = recordingMetaData.batchHeaders.size() - 1;
589 newBatch.firstRevision = first.revision;
590 newBatch.lastRevision = last.revision;
591 newBatch.firstTimestampInMicroSeconds = first.timestampInMicroseconds;
592 newBatch.lastTimestampInMicroSeconds = last.timestampInMicroseconds;
595 nlohmann::json j = recordingMetaData;
596 std::string jString = j.dump(2);
597 std::filesystem::path recordingFile = recordingPath /
"recording.json";
598 if (!writeCompleteFile(recordingFile.string(), jString.data(), jString.size()))
600 ARMARX_WARNING <<
"Could not write recording file: " << recordingFile;
604 ARMARX_INFO <<
"Recorded ArViz batch to: " << filePath;
611 std::unique_lock<std::mutex> lock(recordingMutex);
612 if (recordingMetaData.id.size() > 0)
615 <<
"Could not start recording with prefix " << newRecordingPrefix
616 <<
"\nbecause there is already a recording running for the recording ID: "
617 << recordingMetaData.id;
618 return recordingMetaData.id;
621 IceUtil::Time now = IceUtil::Time::now();
622 std::ostringstream id;
623 id << newRecordingPrefix <<
'_' << now.toString(
"%Y-%m-%d_%H-%M-%S");
624 std::string newRecordingID =
id.str();
626 recordingPath = properties_.historyPath / newRecordingID;
627 if (!std::filesystem::exists(recordingPath))
629 ARMARX_INFO <<
"Creating directory for recording with ID '" << newRecordingID
630 <<
"'\nPath: " << recordingPath;
631 std::filesystem::create_directory(recordingPath);
634 recordingBuffer.clear();
636 recordingMetaData.id = newRecordingID;
638 std::unique_lock<std::mutex> lock(historyMutex);
639 if (history.size() > 0)
641 auto& mostRecent = history.back();
642 recordingMetaData.firstRevision = mostRecent.revision;
643 recordingMetaData.firstTimestampInMicroSeconds = mostRecent.timestampInMicroseconds;
646 recordingMetaData.lastRevision = 0;
647 recordingMetaData.lastTimestampInMicroSeconds = 0;
648 recordingMetaData.batchHeaders.clear();
653 recordingTask->start();
663 std::unique_lock<std::mutex> lock(recordingMutex);
664 if (recordingMetaData.id.size() > 0)
667 <<
"Could not start recording with prefix " << recordingPath
668 <<
"\nbecause there is already a recording running for the recording ID: "
669 << recordingMetaData.id;
670 return recordingMetaData.id;
673 const std::string& newRecordingID = recordingPath;
675 this->recordingPath = recordingPath;
676 if (!std::filesystem::exists(recordingPath))
678 ARMARX_INFO <<
"Creating directory for recording with ID and path: "
679 <<
QUOTED(newRecordingID);
680 std::filesystem::create_directory(recordingPath);
683 recordingBuffer.clear();
685 recordingMetaData.id = newRecordingID;
687 std::unique_lock<std::mutex> lock(historyMutex);
688 if (history.size() > 0)
690 auto& mostRecent = history.back();
691 recordingMetaData.firstRevision = mostRecent.revision;
692 recordingMetaData.firstTimestampInMicroSeconds = mostRecent.timestampInMicroseconds;
695 recordingMetaData.lastRevision = 0;
696 recordingMetaData.lastTimestampInMicroSeconds = 0;
697 recordingMetaData.batchHeaders.clear();
702 recordingTask->start();
715 recordingTask->stop();
716 recordingTask =
nullptr;
718 std::unique_lock<std::mutex> lock(recordingMutex);
720 viz::data::RecordingBatch lastBatch;
721 lastBatch.initialState = recordingInitialState;
722 lastBatch.updates = std::move(history);
723 recordBatch(lastBatch);
725 recordingMetaData.id =
"";
726 recordingMetaData.firstRevision = -1;
727 recordingMetaData.firstTimestampInMicroSeconds = -1;
730armarx::viz::data::RecordingsInfo
733 viz::data::RecordingsInfo recordingsInfo;
734 viz::data::RecordingSeq result;
736 for (std::filesystem::directory_entry
const& entry :
737 std::filesystem::directory_iterator(properties_.historyPath))
741 if (!entry.is_directory())
746 std::optional<viz::data::Recording> recording = readRecordingInfo(entry.path());
749 result.push_back(std::move(*recording));
753 recordingsInfo.recordings = result;
754 recordingsInfo.recordingsPath = properties_.historyPath;
756 return recordingsInfo;
759armarx::viz::data::RecordingBatch
761 Ice::Long batchIndex,
764 viz::data::RecordingBatch result;
765 result.header.index = -1;
767 std::filesystem::path recordingPath = properties_.historyPath / recordingID;
768 std::optional<viz::data::Recording> recording = readRecordingInfo(recordingPath);
771 ARMARX_WARNING <<
"Could not read recording information for '" << recordingID <<
"'"
772 <<
"\nPath: " << recordingPath;
776 if (batchIndex < 0 || batchIndex >= (
long)recording->batchHeaders.size())
778 ARMARX_WARNING <<
"Batch index is not valid. Index = " << batchIndex
779 <<
"Batch count: " << recording->batchHeaders.size();
783 viz::data::RecordingBatchHeader
const& batchHeader = recording->batchHeaders[batchIndex];
784 std::filesystem::path batchFile = recordingPath / batchFileName(batchHeader);
785 if (!std::filesystem::exists(batchFile))
787 ARMARX_WARNING <<
"Could not find batch file for recording '" << recordingID
788 <<
"' with index " << batchIndex <<
"\nPath: " << batchFile;
794 result.header.index = batchIndex;
viz::data::CommitResult commitAndReceiveInteractions(viz::data::CommitInput const &input, const Ice::Current &) override
void onInitComponent() override
armarx::ManagedIceObject::onInitComponent()
std::string startRecording(std::string const &prefix, const Ice::Current &) override
void updateLayers(viz::data::LayerUpdateSeq const &updates, const Ice::Current &) override
void onDisconnectComponent() override
armarx::ManagedIceObject::onDisconnectComponent()
viz::data::LayerUpdates pullUpdatesSinceAndSendInteractions(Ice::Long revision, viz::data::InteractionFeedbackSeq const &interactions, const Ice::Current &) override
viz::data::LayerUpdates pullUpdatesSince(Ice::Long revision, const Ice::Current &) override
armarx::PropertyDefinitionsPtr createPropertyDefinitions() override
PropertyUser::createPropertyDefinitions()
std::string startRecordingToDirectory(std::string const &recordingPath, const Ice::Current &) override
viz::data::RecordingsInfo getAllRecordings(const Ice::Current &) override
void onConnectComponent() override
armarx::ManagedIceObject::onConnectComponent()
viz::data::RecordingBatch getRecordingBatch(const std::string &, Ice::Long, const Ice::Current &) override
void stopRecording(const Ice::Current &) override
void onExitComponent() override
armarx::ManagedIceObject::onExitComponent()
std::string getDefaultName() const override
armarx::ManagedIceObject::getDefaultName()
static bool getAbsolutePath(const std::string &relativeFilename, std::string &storeAbsoluteFilename, const std::vector< std::string > &additionalSearchPaths={}, bool verbose=true)
static DateTime Now()
Current time on the virtual clock.
Default component property definition container.
std::string getConfigIdentifier()
Retrieve config identifier for this component as set in constructor.
static Duration SecondsDouble(double seconds)
Constructs a duration in seconds.
SpamFilterDataPtr deactivateSpam(float deactivationDurationSec=10.0f, const std::string &identifier="", bool deactivate=true) const
disables the logging for the current line for the given amount of seconds.
void usingTopic(const std::string &name, bool orderedPublishing=false)
Registers a proxy for subscription after initialization.
Represents a point in time.
#define ARMARX_INFO
The normal logging level.
#define ARMARX_DEBUG
The logging level for output that is only interesting while debugging.
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
#define ARMARX_VERBOSE
The logging level for verbose information.
#define ARMARX_WARNING_S
The logging level for unexpected behaviour, but not a serious problem.
void to_json(nlohmann::json &j, RecordingBatchHeader const &batch)
void from_json(nlohmann::json const &j, RecordingBatchHeader &batch)
This file offers overloads of toIce() and fromIce() functions for STL container types.
void read(auto &eigen, auto *table)
IceUtil::Handle< class PropertyDefinitionContainer > PropertyDefinitionsPtr
PropertyDefinitions smart pointer type.
void iceBlobToObject(T &result, const std::string_view &sv)