#include <SimoxUtility/json/json.hpp>
static std::filesystem::path
getAbsolutePath(const std::filesystem::path& path)
{
    if (path.is_absolute())
    {
        return path;
    }
    return "ArVizStorage";
    defs->optional(properties_.topicName,
                   "TopicName",
                   "Layer updates are sent over this topic.");
    defs->optional(properties_.maxHistorySize,
                   "MaxHistorySize",
                   "How many layer updates are saved in the history until they are compressed.");
    defs->optional(properties_.historyPath,
                   "HistoryPath",
                   "Destination path where the history is serialized to.");
    defs->optional(properties_.componentWarnFrequency,
                   "ComponentWarnFrequency",
                   "Frequency in Hz above which the component raises a warning, "
                   "since data should not be sent at too high a rate.");
    properties_.historyPath = getAbsolutePath(properties_.historyPath);
    if (!std::filesystem::exists(properties_.historyPath))
    {
        ARMARX_INFO << "Creating history path: " << properties_.historyPath;
        std::error_code error;
        std::filesystem::create_directory(properties_.historyPath, error);
        if (error)
        {
            ARMARX_WARNING << "Could not create directory for history: \n" << error.message();
        }
    }
    currentState.clear();
    recordingInitialState.clear();
    recordingBuffer.clear();
    recordingMetaData.id = "";

    recordingTask->stop();
    recordingTask = nullptr;
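// Handles a batch of incoming layer updates from the topic: appends them to the
// history, refreshes the current state, and tracks the per-component update rate.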
    std::unique_lock<std::mutex> lock(historyMutex);

    long nowInMicroSeconds = now.toMicroSeconds();

    if (not updates.empty())
    {
        const std::string& componentName = updates.front().component;

        // Per-component timestamp history used to estimate this component's update rate
        // over the window given by referenceNow and maxHistoryDur.
        auto& history = updateHistoryForComponents[componentName];

        const auto isOutdated = [&referenceNow, &maxHistoryDur](const DateTime& timestamp) -> bool
        { return (referenceNow - timestamp) > maxHistoryDur; };

        history.erase(std::remove_if(history.begin(), history.end(), isOutdated), history.end());

        ARMARX_VERBOSE << "Update rate of component '" << componentName << "': "
                       << history.size() / maxHistoryDur.toSecondsDouble() << " Hz";

        if (history.size() > properties_.componentWarnFrequency)
        {
            ARMARX_WARNING << "Component '" << componentName << "' "
                           << "sends data at a too high rate ("
                           << history.size() / maxHistoryDur.toSecondsDouble() << ")";
        }
    }
    for (auto& update : updates)
    {
        if (update.component.empty())
        {
            ARMARX_WARNING << "Discarding ArViz update with empty component name. Check whether "
                           << "you correctly create your ArViz client (`armarx::viz::Client`).";
            continue;
        }

        auto& historyEntry = history.emplace_back();
        historyEntry.revision = revision;
        historyEntry.timestampInMicroseconds = nowInMicroSeconds;
        historyEntry.update = update;

        // Replace the layer in the current state if it already exists, otherwise append it.
        bool found = false;
        for (auto& layer : currentState)
        {
            if (layer.update.component == update.component && layer.update.name == update.name)
            {
                layer = historyEntry;
                found = true;
                break;
            }
        }
        if (!found)
        {
            currentState.push_back(historyEntry);
        }
    }

    // If the history has grown large enough, hand it over to the recording thread as a new batch.
    long currentHistorySize = history.size();
    if (currentHistorySize >= properties_.maxHistorySize)
    {
        std::unique_lock<std::mutex> lock(recordingMutex);
        if (recordingMetaData.id.size() > 0)
        {
            auto& newBatch = recordingBuffer.emplace_back();
            newBatch.initialState = recordingInitialState;
            newBatch.updates = std::move(history);
            recordingInitialState = currentState;

            recordingCondition.notify_one();
        }
    }
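// Commits layer updates over Ice and returns the interaction feedback queued for the
// committing component and its requested layers.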
viz::data::CommitResult

    viz::data::CommitResult result;

    std::unique_lock<std::mutex> lock(historyMutex);

    result.revision = revision;
    long nowInMicroSeconds = now.toMicroSeconds();

    for (viz::data::LayerUpdate const& update : input.updates)
    {
        viz::data::TimestampedLayerUpdate& historyEntry = history.emplace_back();
        historyEntry.revision = revision;
        historyEntry.timestampInMicroseconds = nowInMicroSeconds;
        historyEntry.update = update;

        // Replace the layer in the current state if it already exists, otherwise append it.
        bool found = false;
        for (viz::data::TimestampedLayerUpdate& layer : currentState)
        {
            if (layer.update.component == update.component &&
                layer.update.name == update.name)
            {
                layer = historyEntry;
                found = true;
                break;
            }
        }
        if (!found)
        {
            currentState.push_back(historyEntry);
        }
    }

    long currentHistorySize = history.size();
    if (currentHistorySize >= properties_.maxHistorySize)
    {
        std::unique_lock<std::mutex> lock(recordingMutex);
        if (recordingMetaData.id.size() > 0)
        {
            auto& newBatch = recordingBuffer.emplace_back();
            newBatch.initialState = recordingInitialState;
            newBatch.updates = std::move(history);
            recordingInitialState = currentState;
            recordingCondition.notify_one();
        }
    }

    if (input.interactionComponent.size() > 0)
    {
        // Partition the interaction buffer so that feedback addressed to the committing
        // component and one of its requested layers ends up at the back.
        auto interactionsEnd = interactionBuffer.end();
        auto foundInteractionsBegin =
            std::partition(interactionBuffer.begin(), interactionsEnd,
                           [&input](viz::data::InteractionFeedback const& interaction)
                           {
                               if (interaction.component == input.interactionComponent)
                               {
                                   for (std::string const& layer : input.interactionLayers)
                                   {
                                       if (interaction.layer == layer)
                                       {
                                           return false;
                                       }
                                   }
                               }
                               return true;
                           });

        result.interactions.assign(foundInteractionsBegin, interactionsEnd);
        interactionBuffer.erase(foundInteractionsBegin, interactionsEnd);
    }

    return result;
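// Returns the layers of the current state whose revision is newer than the given revision.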
viz::data::LayerUpdates
ArVizStorage::pullUpdatesSince(Ice::Long revision, const Ice::Current&)
{
    viz::data::LayerUpdates result;

    std::unique_lock<std::mutex> lock(historyMutex);

    result.updates.reserve(currentState.size());
    for (auto& layer : currentState)
    {
        if (layer.revision > revision)
        {
            result.updates.push_back(layer.update);
        }
    }
    result.revision = this->revision;

    return result;
}
static const int ALL_TRANSFORM_FLAGS =
    viz::data::InteractionFeedbackType::TRANSFORM_BEGIN_FLAG |
    viz::data::InteractionFeedbackType::TRANSFORM_DURING_FLAG |
    viz::data::InteractionFeedbackType::TRANSFORM_END_FLAG;
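// Like pullUpdatesSince, but additionally takes the client's interaction feedback and
// merges consecutive transform feedback entries in the interaction buffer.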
viz::data::LayerUpdates
ArVizStorage::pullUpdatesSinceAndSendInteractions(
    Ice::Long revision,
    viz::data::InteractionFeedbackSeq const& interactions,
    const Ice::Current& c)
{
    viz::data::LayerUpdates result;

    std::unique_lock<std::mutex> lock(historyMutex);

    for (viz::data::InteractionFeedback const& interaction : interactions)
    {
        for (viz::data::InteractionFeedback& entry : interactionBuffer)
        {
            int previousTransformFlags = entry.type & ALL_TRANSFORM_FLAGS;
            int interactionTransformFlags = interaction.type & ALL_TRANSFORM_FLAGS;

            // Merge consecutive transform feedback into a single buffered entry.
            if (previousTransformFlags != 0 && interactionTransformFlags != 0)
            {
                entry.type |= previousTransformFlags;
            }
        }
    }

    result.updates.reserve(currentState.size());
    for (viz::data::TimestampedLayerUpdate& layer : currentState)
    {
        if (layer.revision > revision)
        {
            result.updates.push_back(layer.update);
        }
    }
    result.revision = this->revision;

    return result;
}
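// Recording thread: waits until full batches appear in the recording buffer and
// writes each of them to disk via recordBatch().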
void
ArVizStorage::record()
{
    while (!recordingTask->isStopped())
    {
        std::unique_lock<std::mutex> lock(recordingMutex);
        while (!recordingTask->isStopped() && recordingBuffer.empty())
        {
            recordingCondition.wait_for(lock, std::chrono::milliseconds(10));
        }
        for (auto& batch : recordingBuffer)
        {
            recordBatch(batch);
        }
        recordingBuffer.clear();
    }
}
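// JSON (de)serialization of the recording metadata stored in recording.json.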
void
to_json(nlohmann::json& j, RecordingBatchHeader const& batch)
{
    j["index"] = batch.index;
    j["firstRevision"] = batch.firstRevision;
    j["lastRevision"] = batch.lastRevision;
    j["firstTimestampInMicroSeconds"] = batch.firstTimestampInMicroSeconds;
    j["lastTimestampInMicroSeconds"] = batch.lastTimestampInMicroSeconds;
}

void
from_json(nlohmann::json const& j, RecordingBatchHeader& batch)
{
    batch.index = j["index"];
    batch.firstRevision = j["firstRevision"];
    batch.lastRevision = j["lastRevision"];
    batch.firstTimestampInMicroSeconds = j["firstTimestampInMicroSeconds"];
    batch.lastTimestampInMicroSeconds = j["lastTimestampInMicroSeconds"];
}

void
to_json(nlohmann::json& j, Recording const& recording)
{
    j["id"] = recording.id;
    j["firstRevision"] = recording.firstRevision;
    j["lastRevision"] = recording.lastRevision;
    j["firstTimestampInMicroSeconds"] = recording.firstTimestampInMicroSeconds;
    j["lastTimestampInMicroSeconds"] = recording.lastTimestampInMicroSeconds;
    j["batchHeaders"] = recording.batchHeaders;
}

void
from_json(nlohmann::json const& j, Recording& recording)
{
    recording.id = j["id"];
    recording.firstRevision = j["firstRevision"];
    recording.lastRevision = j["lastRevision"];
    recording.firstTimestampInMicroSeconds = j["firstTimestampInMicroSeconds"];
    recording.lastTimestampInMicroSeconds = j["lastTimestampInMicroSeconds"];
    j["batchHeaders"].get_to(recording.batchHeaders);
}
static bool
writeCompleteFile(std::string const& filename, const void* data, std::size_t size)
{
    FILE* file = fopen(filename.c_str(), "wb");
    if (!file)
    {
        return false;
    }

    std::size_t written = std::fwrite(data, 1, size, file);
    fclose(file);

    return written == size;
}

static std::string
readCompleteFile(std::filesystem::path const& path)
{
    FILE* f = fopen(path.string().c_str(), "rb");
    fseek(f, 0, SEEK_END);
    long fsize = ftell(f);
    fseek(f, 0, SEEK_SET);

    std::string result(fsize, '\0');
    std::size_t read = fread(result.data(), 1, fsize, f);
    fclose(f);
    (void)read;

    return result;
}
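// Reads and parses the recording.json of a recording directory, if present.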
static std::optional<armarx::viz::data::Recording>
readRecordingInfo(std::filesystem::path const& recordingDirectory)
{
    std::optional<::armarx::viz::data::Recording> result;

    std::filesystem::path recordingFilePath = recordingDirectory / "recording.json";
    if (!std::filesystem::exists(recordingFilePath))
    {
        ARMARX_INFO << "No recording.json found in directory: " << recordingDirectory;
        return result;
    }

    std::string recordingString = readCompleteFile(recordingFilePath);
    nlohmann::json recordingJson = nlohmann::json::parse(recordingString);
    try
    {
        ::armarx::viz::data::Recording recording;
        recordingJson.get_to(recording);

        result = std::move(recording);
    }
    catch (std::exception const& ex)
    {
        ARMARX_WARNING << "Could not parse JSON file: " << recordingFilePath
                       << "\nReason: " << ex.what();
    }

    return result;
}
static std::string
batchFileName(armarx::viz::data::RecordingBatchHeader const& batchHeader)
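// Serializes one batch of layer updates to its own file and updates recording.json with
// the new batch header and revision/timestamp bounds.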
void
armarx::ArVizStorage::recordBatch(armarx::viz::data::RecordingBatch& batch)
{
    if (batch.updates.empty())
    {
        return;
    }

    auto& first = batch.updates.front();
    auto& last = batch.updates.back();

    batch.header.index = -1;
    batch.header.firstRevision = first.revision;
    batch.header.lastRevision = last.revision;
    batch.header.firstTimestampInMicroSeconds = first.timestampInMicroseconds;
    batch.header.lastTimestampInMicroSeconds = last.timestampInMicroseconds;

    batch.initialState = currentState;

    std::string filename = batchFileName(batch.header);
    std::filesystem::path filePath = recordingPath / filename;

    ObjectToIceBlobSerializer ser{batch};
    if (!writeCompleteFile(filePath.string(), ser.begin(), ser.size()))
    {
        ARMARX_WARNING << "Could not write batch file: " << filePath;
        return;
    }

    if (recordingMetaData.firstRevision < 0)
    {
        recordingMetaData.firstRevision = first.revision;
    }
    recordingMetaData.lastRevision = last.revision;

    if (recordingMetaData.firstTimestampInMicroSeconds < 0)
    {
        recordingMetaData.firstTimestampInMicroSeconds = first.timestampInMicroseconds;
    }
    recordingMetaData.lastTimestampInMicroSeconds = last.timestampInMicroseconds;

    armarx::viz::data::RecordingBatchHeader& newBatch =
        recordingMetaData.batchHeaders.emplace_back();
    newBatch.index = recordingMetaData.batchHeaders.size() - 1;
    newBatch.firstRevision = first.revision;
    newBatch.lastRevision = last.revision;
    newBatch.firstTimestampInMicroSeconds = first.timestampInMicroseconds;
    newBatch.lastTimestampInMicroSeconds = last.timestampInMicroseconds;

    nlohmann::json j = recordingMetaData;
    std::string jString = j.dump(2);
    std::filesystem::path recordingFile = recordingPath / "recording.json";
    if (!writeCompleteFile(recordingFile.string(), jString.data(), jString.size()))
    {
        ARMARX_WARNING << "Could not write recording file: " << recordingFile;
    }

    ARMARX_INFO << "Recorded ArViz batch to: " << filePath;
}
    std::unique_lock<std::mutex> lock(recordingMutex);
    if (recordingMetaData.id.size() > 0)
    {
        ARMARX_WARNING << "Could not start recording with prefix " << newRecordingPrefix
                       << "\nbecause there is already a recording running for the recording ID: "
                       << recordingMetaData.id;
        return recordingMetaData.id;
    }

    std::ostringstream id;
    id << newRecordingPrefix << '_' << now.toString("%Y-%m-%d_%H-%M-%S");
    std::string newRecordingID = id.str();

    recordingPath = properties_.historyPath / newRecordingID;
    if (!std::filesystem::exists(recordingPath))
    {
        ARMARX_INFO << "Creating directory for recording with ID '" << newRecordingID
                    << "'\nPath: " << recordingPath;
        std::filesystem::create_directory(recordingPath);
    }

    recordingBuffer.clear();
    recordingMetaData.id = newRecordingID;
    {
        std::unique_lock<std::mutex> lock(historyMutex);
        if (history.size() > 0)
        {
            auto& mostRecent = history.back();
            recordingMetaData.firstRevision = mostRecent.revision;
            recordingMetaData.firstTimestampInMicroSeconds = mostRecent.timestampInMicroseconds;
        }
    }
    recordingMetaData.lastRevision = 0;
    recordingMetaData.lastTimestampInMicroSeconds = 0;
    recordingMetaData.batchHeaders.clear();

    recordingTask->start();

    return newRecordingID;
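// Stops the recording: shuts down the recording task and flushes the remaining history
// as a final batch before clearing the recording metadata.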
    recordingTask->stop();
    recordingTask = nullptr;

    std::unique_lock<std::mutex> lock(recordingMutex);

    viz::data::RecordingBatch lastBatch;
    lastBatch.initialState = recordingInitialState;
    lastBatch.updates = std::move(history);
    recordBatch(lastBatch);

    recordingMetaData.id = "";
    recordingMetaData.firstRevision = -1;
    recordingMetaData.firstTimestampInMicroSeconds = -1;
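// Collects the metadata of all recordings found in the history directory.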
armarx::viz::data::RecordingsInfo

    viz::data::RecordingsInfo recordingsInfo;
    viz::data::RecordingSeq result;

    for (std::filesystem::directory_entry const& entry :
         std::filesystem::directory_iterator(properties_.historyPath))
    {
        if (!entry.is_directory())
        {
            continue;
        }

        std::optional<viz::data::Recording> recording = readRecordingInfo(entry.path());
        if (recording)
        {
            result.push_back(std::move(*recording));
        }
    }

    recordingsInfo.recordings = result;
    recordingsInfo.recordingsPath = properties_.historyPath;

    return recordingsInfo;
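// Loads a single batch of a recording, identified by recording ID and batch index.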
armarx::viz::data::RecordingBatch

    viz::data::RecordingBatch result;
    result.header.index = -1;

    std::filesystem::path recordingPath = properties_.historyPath / recordingID;
    std::optional<viz::data::Recording> recording = readRecordingInfo(recordingPath);
    if (!recording)
    {
        ARMARX_WARNING << "Could not read recording information for '" << recordingID << "'"
                       << "\nPath: " << recordingPath;
        return result;
    }

    if (batchIndex < 0 || batchIndex >= (long)recording->batchHeaders.size())
    {
        ARMARX_WARNING << "Batch index is not valid. Index = " << batchIndex
                       << "\nBatch count: " << recording->batchHeaders.size();
        return result;
    }

    viz::data::RecordingBatchHeader const& batchHeader = recording->batchHeaders[batchIndex];
    std::filesystem::path batchFile = recordingPath / batchFileName(batchHeader);
    if (!std::filesystem::exists(batchFile))
    {
        ARMARX_WARNING << "Could not find batch file for recording '" << recordingID
                       << "' with index " << batchIndex << "\nPath: " << batchFile;
        return result;
    }

    result.header.index = batchIndex;