28 #include <SimoxUtility/json/json.hpp>
// File-local helper that takes a filesystem path and returns a
// std::filesystem::path. The only logic visible in this excerpt is the check
// whether the input is already absolute.
// NOTE(review): how relative paths are resolved is not visible here — confirm
// against the full function body.
40 static std::filesystem::path
41 getAbsolutePath(
const std::filesystem::path& path)
// Branch on whether the caller already supplied an absolute path.
43 if (path.is_absolute())
// Returns the fixed string "ArVizStorage".
// NOTE(review): presumably the component's default name; the enclosing
// function header is not visible in this excerpt — confirm.
66 return "ArVizStorage";
// Property registration: each entry binds a member of `properties_` to a
// property name and a human-readable description.
// NOTE(review): the enclosing function header and the start of the first call
// are not visible in this excerpt.

// Property "TopicName", stored in properties_.topicName.
76 properties_.topicName,
"TopicName",
"Layer updates are sent over this topic.");
// History size limit, stored in properties_.maxHistorySize (the property name
// line is not visible here).
78 defs->optional(properties_.maxHistorySize,
80 "How many layer updates are saved in the history until they are compressed")
// Directory used for serialized history, stored in properties_.historyPath
// (the property name line is not visible here).
83 defs->optional(properties_.historyPath,
85 "Destination path where the history is serialized to");
// Property "ComponentWarnFrequency", stored in properties_.componentWarnFrequency.
// NOTE(review): the description string contains the typo "compnent"; it is a
// runtime string and is left untouched here.
88 defs->optional(properties_.componentWarnFrequency,
89 "ComponentWarnFrequency",
90 "Define a frequency in Hz above which the compnent raises a warning. As you "
91 "should not send data at a too high rate.");
// Prepare the history directory.
// NOTE(review): the enclosing function header is not visible in this excerpt.

// Replace the configured history path with the result of getAbsolutePath().
99 properties_.historyPath = getAbsolutePath(properties_.historyPath);
// Only attempt to create the directory when it does not exist yet.
100 if (!std::filesystem::exists(properties_.historyPath))
102 ARMARX_INFO <<
"Creating history path: " << properties_.historyPath;
// The std::error_code overload reports failure through `error` instead of
// throwing. std::filesystem::create_directory creates a single directory
// level only (it does not create missing parents).
103 std::error_code error;
104 std::filesystem::create_directory(properties_.historyPath, error);
// A failure is logged as a warning together with the error message.
107 ARMARX_WARNING <<
"Could not create directory for history: \n" << error.message();
// Reset the stored state: the current layer state, the initial-state snapshot
// used for recording, and the buffer of pending recording batches.
// NOTE(review): the enclosing function header is not visible in this excerpt.
118 currentState.clear();
120 recordingInitialState.clear();
122 recordingBuffer.clear();
// An empty recording ID marks "no recording active": other code in this file
// tests `recordingMetaData.id.size() > 0` before recording.
123 recordingMetaData.id =
"";
// Stop the recording task and drop the reference to it.
// NOTE(review): no null check on recordingTask is visible in this excerpt —
// confirm that the surrounding (hidden) code guards this call.
131 recordingTask->stop();
132 recordingTask =
nullptr;
// Handles a batch of incoming layer updates (`updates`): checks the sender's
// update rate, appends every update to the history, mirrors it into
// `currentState`, and hands the history over to the recording buffer once it
// has grown to the configured size.
// NOTE(review): the enclosing function header and several lines (braces,
// else-branches, log statement heads) are not visible in this excerpt.

// All accesses to history/currentState below happen under historyMutex.
144 std::unique_lock<std::mutex> lock(historyMutex);
// One timestamp (in microseconds) is shared by all updates of this call.
149 long nowInMicroSeconds = now.toMicroSeconds();
// Rate check: only possible when there is at least one update to read the
// component name from.
151 if (not updates.empty())
// The component name of the first update is used for the whole batch.
153 const std::string& componentName = updates.front().component;
// Per-component list whose entries are passed to the DateTime predicate below.
// NOTE(review): this local `history` reference presumably lives in the scope
// of the `if` above and shadows the `history` used further down — confirm.
155 auto& history = updateHistoryForComponents[componentName];
// Predicate: true for entries older than maxHistoryDur relative to referenceNow.
161 const auto isOutdated = [&referenceNow,
162 &maxHistoryDur](
const DateTime& timestamp) ->
bool
163 {
return (referenceNow - timestamp) > maxHistoryDur; };
// Erase-remove idiom: drop all outdated entries.
166 history.erase(std::remove_if(history.begin(), history.end(), isOutdated),
// Logged rate = remaining entries divided by the window length in seconds.
170 << history.size() / maxHistoryDur.toSecondsDouble() <<
" Hz";
// NOTE(review): this compares an entry count with a frequency; both agree
// only if maxHistoryDur is one second — confirm.
172 if (history.size() > properties_.componentWarnFrequency)
175 <<
"sends data at a too high rate ("
176 << history.size() / maxHistoryDur.toSecondsDouble() <<
")"
// Process every update of the batch.
182 for (
auto&
update : updates)
// Updates without a component name trigger the "Discarding" log message below.
184 if (
update.component.empty())
187 <<
"Discarding ArViz update with empty component name. Check whether "
188 <<
"you correctly create your ArViz client (`armarx::viz::Client`) "
// Append a new history entry stamped with the current revision and timestamp.
193 auto& historyEntry = history.emplace_back();
194 historyEntry.revision = revision;
195 historyEntry.timestampInMicroseconds = nowInMicroSeconds;
196 historyEntry.update =
update;
// Look for an existing layer with the same component and layer name and
// overwrite it with the new entry.
200 for (
auto& layer : currentState)
202 if (layer.update.component ==
update.component && layer.update.name ==
update.name)
204 layer = historyEntry;
// NOTE(review): presumably only reached when no matching layer was found; the
// guarding condition is not visible in this excerpt.
211 currentState.push_back(historyEntry);
// Once the history has reached the configured maximum size, hand it over.
215 long currentHistorySize = history.size();
216 if (currentHistorySize >= properties_.maxHistorySize)
// recordingMutex is acquired while historyMutex is still held.
219 std::unique_lock<std::mutex> lock(recordingMutex);
// A non-empty recording ID means a recording is currently active.
220 if (recordingMetaData.id.size() > 0)
// Queue a new batch: the snapshot taken at the previous hand-over plus all
// history entries collected since then (moved, not copied).
222 auto& newBatch = recordingBuffer.emplace_back();
223 newBatch.initialState = recordingInitialState;
224 newBatch.updates = std::move(history);
// The current state becomes the initial state of the next batch.
225 recordingInitialState = currentState;
// Wake up a thread waiting on recordingCondition.
227 recordingCondition.notify_one();
// Commits the layer updates in `input.updates` and returns a
// viz::data::CommitResult carrying the current revision and the buffered
// interactions selected via `input.interactionComponent` /
// `input.interactionLayers`.
// NOTE(review): the function name line, braces and several statements are not
// visible in this excerpt.
234 viz::data::CommitResult
238 viz::data::CommitResult result;
// history, currentState and interactionBuffer are accessed under historyMutex.
241 std::unique_lock<std::mutex> lock(historyMutex);
// Report the revision under which these updates are stored.
244 result.revision = revision;
// One timestamp (in microseconds) is shared by all updates of this commit.
247 long nowInMicroSeconds = now.toMicroSeconds();
// Store every update in the history and mirror it into currentState.
250 for (viz::data::LayerUpdate
const&
update :
input.updates)
252 viz::data::TimestampedLayerUpdate& historyEntry = history.emplace_back();
253 historyEntry.revision = revision;
254 historyEntry.timestampInMicroseconds = nowInMicroSeconds;
255 historyEntry.update =
update;
// Overwrite an existing layer with the same component and layer name.
259 for (viz::data::TimestampedLayerUpdate& layer : currentState)
261 if (layer.update.component ==
update.component &&
262 layer.update.name ==
update.name)
264 layer = historyEntry;
// NOTE(review): presumably only reached when no matching layer was found; the
// guarding condition is not visible in this excerpt.
271 currentState.push_back(historyEntry);
// Once the history has reached the configured maximum size, hand it over.
276 long currentHistorySize = history.size();
277 if (currentHistorySize >= properties_.maxHistorySize)
// recordingMutex is acquired while historyMutex is still held.
280 std::unique_lock<std::mutex> lock(recordingMutex);
// A non-empty recording ID means a recording is currently active.
281 if (recordingMetaData.id.size() > 0)
// Queue a batch consisting of the previous snapshot and the moved history.
283 auto& newBatch = recordingBuffer.emplace_back();
284 newBatch.initialState = recordingInitialState;
285 newBatch.updates = std::move(history);
// The current state becomes the initial state of the next batch.
286 recordingInitialState = currentState;
// Wake up a thread waiting on recordingCondition.
288 recordingCondition.notify_one();
// Interactions are only collected when the caller names a component.
295 if (
input.interactionComponent.size() > 0)
// std::partition reorders interactionBuffer and returns the boundary between
// the two groups; the predicate compares each interaction's component and
// layer with the requested ones.
// NOTE(review): the predicate's return statements are not visible; presumably
// matching interactions end up in [foundInteractionsBegin, interactionsEnd) —
// confirm.
297 auto interactionsEnd = interactionBuffer.end();
298 auto foundInteractionsBegin =
299 std::partition(interactionBuffer.begin(),
303 if (interaction.component == input.interactionComponent)
305 for (std::string const& layer : input.interactionLayers)
307 if (interaction.layer == layer)
// Copy the selected range into the result, then remove it from the buffer.
316 result.interactions.assign(foundInteractionsBegin, interactionsEnd);
317 interactionBuffer.erase(foundInteractionsBegin, interactionsEnd);
// Collects all layers of the current state whose revision is newer than the
// given `revision`, together with the storage's current revision.
// The Ice::Current parameter is unnamed and therefore unused.
// NOTE(review): braces and the return statement are not visible in this excerpt.
324 viz::data::LayerUpdates
325 ArVizStorage::pullUpdatesSince(
Ice::Long revision,
const Ice::Current&)
327 viz::data::LayerUpdates result;
// currentState is read under historyMutex.
329 std::unique_lock<std::mutex> lock(historyMutex);
// Upper bound for the result size: every layer could be newer.
331 result.updates.reserve(currentState.size());
332 for (
auto& layer : currentState)
// Strictly newer only: layers at exactly `revision` are not included.
334 if (layer.revision > revision)
336 result.updates.push_back(layer.update);
// `this->` is required because the parameter `revision` shadows the member.
339 result.revision = this->revision;
// Bit mask combining the three transform-related interaction feedback flags
// (begin, during, end). Used to extract the transform bits from an
// interaction's `type` field.
344 static const int ALL_TRANSFORM_FLAGS =
345 viz::data::InteractionFeedbackType::TRANSFORM_BEGIN_FLAG |
346 viz::data::InteractionFeedbackType::TRANSFORM_DURING_FLAG |
347 viz::data::InteractionFeedbackType::TRANSFORM_END_FLAG;
// Merges the given `interactions` into the interaction buffer and then, like
// pullUpdatesSince, returns all layers of the current state that are newer
// than the requested revision.
// NOTE(review): the first parameter line, braces, the matching condition for
// buffer entries and the return statement are not visible in this excerpt.
349 viz::data::LayerUpdates
350 ArVizStorage::pullUpdatesSinceAndSendInteractions(
352 viz::data::InteractionFeedbackSeq
const& interactions,
353 const Ice::Current&
c)
355 viz::data::LayerUpdates result;
// interactionBuffer and currentState are accessed under historyMutex.
357 std::unique_lock<std::mutex> lock(historyMutex);
// Compare every incoming interaction with the entries already buffered.
359 for (viz::data::InteractionFeedback
const&
interaction : interactions)
361 for (viz::data::InteractionFeedback& entry : interactionBuffer)
// Extract the transform bits of the buffered entry and of the new interaction.
366 int previousTransformFlags = entry.type & ALL_TRANSFORM_FLAGS;
367 int interactionTransformFlags =
interaction.type & ALL_TRANSFORM_FLAGS;
// When both carry transform bits, the bits captured from the buffered entry
// are OR-ed into the entry's type.
// NOTE(review): presumably `entry` is overwritten with `interaction` on a line
// not visible here, which is why the previous flags were saved first — confirm.
372 if (previousTransformFlags != 0 && interactionTransformFlags != 0)
374 entry.type |= previousTransformFlags;
// Collect all layers strictly newer than the requested revision.
386 result.updates.reserve(currentState.size());
387 for (viz::data::TimestampedLayerUpdate& layer : currentState)
389 if (layer.revision > revision)
391 result.updates.push_back(layer.update);
// Report the storage's current revision (member, not the request parameter).
394 result.revision = this->revision;
// Recording loop: runs until recordingTask reports that it was stopped, waits
// for batches in recordingBuffer and processes them.
// NOTE(review): the return type, braces and the loop body that handles each
// batch are not visible in this excerpt.
400 ArVizStorage::record()
402 while (!recordingTask->isStopped())
// recordingBuffer is accessed under recordingMutex.
404 std::unique_lock<std::mutex> lock(recordingMutex);
// Wait with a 10 ms timeout so the stop flag is re-checked regularly even
// when no notification arrives.
405 while (!recordingTask->isStopped() && recordingBuffer.empty())
407 recordingCondition.wait_for(lock, std::chrono::milliseconds(10));
// Handle every queued batch, then empty the buffer.
409 for (
auto& batch : recordingBuffer)
413 recordingBuffer.clear();
// JSON serialization hook for RecordingBatchHeader: writes the header fields
// into `j` under the keys "index", "firstRevision", "lastRevision",
// "firstTimestampInMicroSeconds" and "lastTimestampInMicroSeconds".
422 to_json(nlohmann::json& j, RecordingBatchHeader
const& batch)
424 j[
"index"] = batch.index;
425 j[
"firstRevision"] = batch.firstRevision;
426 j[
"lastRevision"] = batch.lastRevision;
427 j[
"firstTimestampInMicroSeconds"] = batch.firstTimestampInMicroSeconds;
428 j[
"lastTimestampInMicroSeconds"] = batch.lastTimestampInMicroSeconds;
// JSON deserialization hook for RecordingBatchHeader: reads the header fields
// from the same keys that the matching to_json writes.
// NOTE(review): the values are accessed with operator[] on a const json, which
// does not report a missing key via an exception — confirm that inputs always
// contain all five keys.
432 from_json(nlohmann::json
const& j, RecordingBatchHeader& batch)
434 batch.index = j[
"index"];
435 batch.firstRevision = j[
"firstRevision"];
436 batch.lastRevision = j[
"lastRevision"];
437 batch.firstTimestampInMicroSeconds = j[
"firstTimestampInMicroSeconds"];
438 batch.lastTimestampInMicroSeconds = j[
"lastTimestampInMicroSeconds"];
// JSON serialization hook for Recording: writes the recording ID, the
// revision and timestamp range, and the list of batch headers into `j`.
442 to_json(nlohmann::json& j, Recording
const& recording)
444 j[
"id"] = recording.id;
445 j[
"firstRevision"] = recording.firstRevision;
446 j[
"lastRevision"] = recording.lastRevision;
447 j[
"firstTimestampInMicroSeconds"] = recording.firstTimestampInMicroSeconds;
448 j[
"lastTimestampInMicroSeconds"] = recording.lastTimestampInMicroSeconds;
// The batch headers are converted element-wise through the
// RecordingBatchHeader serialization hook.
449 j[
"batchHeaders"] = recording.batchHeaders;
// JSON deserialization hook for Recording: reads the fields from the same
// keys that the matching to_json writes.
// NOTE(review): the values are accessed with operator[] on a const json, which
// does not report a missing key via an exception — confirm that inputs always
// contain all keys.
453 from_json(nlohmann::json
const& j, Recording& recording)
455 recording.id = j[
"id"];
456 recording.firstRevision = j[
"firstRevision"];
457 recording.lastRevision = j[
"lastRevision"];
458 recording.firstTimestampInMicroSeconds = j[
"firstTimestampInMicroSeconds"];
459 recording.lastTimestampInMicroSeconds = j[
"lastTimestampInMicroSeconds"];
// get_to fills the existing container in place instead of assigning a temporary.
460 j[
"batchHeaders"].get_to(recording.batchHeaders);
/**
 * @brief Writes `size` bytes from `data` to the file `filename`.
 *
 * The file is opened in binary write mode ("wb"), so an existing file is
 * truncated. The handle is closed on every path, including a short write, and
 * the result of closing is checked because buffered data may only be flushed
 * (and fail) at that point.
 *
 * @param filename Path of the file to create or overwrite.
 * @param data     Pointer to the bytes to write; not accessed when `size` is 0.
 * @param size     Number of bytes to write.
 * @return true if the file could be opened, all bytes were written and the
 *         file was closed without error; false otherwise.
 */
static bool
writeCompleteFile(std::string const& filename, const void* data, std::size_t size)
{
    FILE* file = std::fopen(filename.c_str(), "wb");
    if (file == nullptr)
    {
        return false;
    }

    bool success = true;
    if (size > 0)
    {
        const std::size_t written = std::fwrite(data, 1, size, file);
        success = (written == size);
    }

    // Close unconditionally so a failed write does not leak the handle.
    if (std::fclose(file) != 0)
    {
        success = false;
    }
    return success;
}
/**
 * @brief Reads the entire content of the file at `path` into a string.
 *
 * The file is opened in binary mode ("rb"). Its size is determined by seeking
 * to the end; the content is then read in one call and the result is trimmed
 * to the number of bytes actually read.
 *
 * Failure handling: if the file cannot be opened, or its size cannot be
 * determined, an empty string is returned instead of dereferencing a null
 * handle or allocating a buffer from a negative size. The handle is always
 * closed.
 *
 * @param path Path of the file to read.
 * @return The file content (may contain embedded '\0' bytes), or an empty
 *         string if the file is empty or could not be read.
 */
static std::string
readCompleteFile(std::filesystem::path const& path)
{
    std::string result;

    FILE* f = fopen(path.string().c_str(), "rb");
    if (f == nullptr)
    {
        return result;
    }

    if (fseek(f, 0, SEEK_END) == 0)
    {
        // ftell returns -1 on error; only a positive size leads to a read.
        const long fsize = ftell(f);
        if (fsize > 0 && fseek(f, 0, SEEK_SET) == 0)
        {
            result.resize(static_cast<std::size_t>(fsize), '\0');
            const std::size_t bytesRead = fread(result.data(), 1, result.size(), f);
            // Keep only what was actually read (short read or read error).
            result.resize(bytesRead);
        }
    }

    fclose(f);
    return result;
}
// Loads the recording metadata stored in "recording.json" inside
// `recordingDirectory`. The result is an optional Recording, declared empty and
// assigned once the JSON has been converted.
// NOTE(review): braces, the `try` keyword, the declaration of `recording` and
// the return statements are not visible in this excerpt.
498 static std::optional<armarx::viz::data::Recording>
499 readRecordingInfo(std::filesystem::path
const& recordingDirectory)
501 std::optional<::armarx::viz::data::Recording> result;
// The metadata file has a fixed name inside the recording directory.
503 std::filesystem::path recordingFilePath = recordingDirectory /
"recording.json";
// A directory without metadata file is reported at info level.
504 if (!std::filesystem::exists(recordingFilePath))
506 ARMARX_INFO <<
"No recording.json found in directory: " << recordingDirectory;
// Read the whole file and parse it as JSON.
512 std::string recordingString = readCompleteFile(recordingFilePath);
513 nlohmann::json recordingJson = nlohmann::json::parse(recordingString);
// Convert the JSON document into a Recording (uses the from_json hook).
516 recordingJson.get_to(recording);
518 result = std::move(recording);
// Any std::exception raised in the guarded section is logged as a warning
// together with the file path and the exception message.
521 catch (std::exception
const& ex)
523 ARMARX_WARNING <<
"Could not parse JSON file: " << recordingFilePath
524 <<
"\nReason: " << ex.what();
// Derives the file name of a recording batch from its header. Callers in this
// file append the result to a recording directory path.
// NOTE(review): the return type and the body are not visible in this excerpt,
// so the naming scheme cannot be stated here.
530 batchFileName(armarx::viz::data::RecordingBatchHeader
const& batchHeader)
// Writes one recording batch to disk and updates the recording metadata
// ("recording.json") accordingly.
// NOTE(review): the return type, braces, early returns and some conditions are
// not visible in this excerpt.
536 armarx::ArVizStorage::recordBatch(armarx::viz::data::RecordingBatch& batch)
// Guard for batches without updates (front()/back() below need at least one).
538 if (batch.updates.empty())
// The first and last update define the batch's revision and time range.
543 auto& first = batch.updates.front();
544 auto& last = batch.updates.back();
// Fill the header stored inside the batch file. The index written here is -1;
// the index within the recording is assigned to the metadata entry further down.
546 batch.header.index = -1;
547 batch.header.firstRevision = first.revision;
548 batch.header.lastRevision = last.revision;
549 batch.header.firstTimestampInMicroSeconds = first.timestampInMicroseconds;
550 batch.header.lastTimestampInMicroSeconds = last.timestampInMicroseconds;
// NOTE(review): the condition guarding this assignment (if any) is not visible.
553 batch.initialState = currentState;
// Target file: <recordingPath>/<batch file name derived from the header>.
558 std::string
filename = batchFileName(batch.header);
559 std::filesystem::path filePath = recordingPath /
filename;
// Serialize the batch into a binary blob and write it in one go.
561 ObjectToIceBlobSerializer ser{batch};
563 if (!writeCompleteFile(filePath.string(), ser.begin(), ser.size()))
// A negative value marks "not set yet": the first recorded batch defines the
// start of the recording's revision range ...
570 if (recordingMetaData.firstRevision < 0)
572 recordingMetaData.firstRevision = first.revision;
// ... while the end of the range is advanced with every batch.
574 recordingMetaData.lastRevision = last.revision;
// Same scheme for the timestamp range.
576 if (recordingMetaData.firstTimestampInMicroSeconds < 0)
578 recordingMetaData.firstTimestampInMicroSeconds = first.timestampInMicroseconds;
580 recordingMetaData.lastTimestampInMicroSeconds = last.timestampInMicroseconds;
// Register the batch in the metadata; its index is its position in the list.
582 armarx::viz::data::RecordingBatchHeader& newBatch =
583 recordingMetaData.batchHeaders.emplace_back();
584 newBatch.index = recordingMetaData.batchHeaders.size() - 1;
585 newBatch.firstRevision = first.revision;
586 newBatch.lastRevision = last.revision;
587 newBatch.firstTimestampInMicroSeconds = first.timestampInMicroseconds;
588 newBatch.lastTimestampInMicroSeconds = last.timestampInMicroseconds;
// Rewrite the metadata file as JSON with an indentation of 2.
591 nlohmann::json j = recordingMetaData;
592 std::string jString = j.dump(2);
593 std::filesystem::path recordingFile = recordingPath /
"recording.json";
594 if (!writeCompleteFile(recordingFile.string(), jString.data(), jString.size()))
596 ARMARX_WARNING <<
"Could not write recording file: " << recordingFile;
600 ARMARX_INFO <<
"Recorded ArViz batch to: " << filePath;
// Starts a new recording whose ID is built from `newRecordingPrefix` and the
// current time. If a recording is already running, its ID is returned instead.
// NOTE(review): the enclosing function header, braces, the creation of
// recordingTask and the final return are not visible in this excerpt.

// The recording state is modified under recordingMutex.
607 std::unique_lock<std::mutex> lock(recordingMutex);
// A non-empty ID means a recording is already active: log and return that ID.
608 if (recordingMetaData.id.size() > 0)
611 <<
"Could not start recording with prefix " << newRecordingPrefix
612 <<
"\nbecause there is already a recording running for the recording ID: "
613 << recordingMetaData.id;
614 return recordingMetaData.id;
// Recording ID format: <prefix>_<YYYY-mm-dd_HH-MM-SS>.
618 std::ostringstream id;
619 id << newRecordingPrefix <<
'_' << now.toString(
"%Y-%m-%d_%H-%M-%S");
620 std::string newRecordingID =
id.str();
// Each recording gets its own directory below the history path.
622 recordingPath = properties_.historyPath / newRecordingID;
623 if (!std::filesystem::exists(recordingPath))
625 ARMARX_INFO <<
"Creating directory for recording with ID '" << newRecordingID
626 <<
"'\nPath: " << recordingPath;
// NOTE(review): this overload throws std::filesystem::filesystem_error on
// failure; no handler is visible in this excerpt.
627 std::filesystem::create_directory(recordingPath);
// Discard batches left over in the buffer.
630 recordingBuffer.clear();
// Setting a non-empty ID marks the recording as active.
632 recordingMetaData.id = newRecordingID;
// historyMutex is acquired here while recordingMutex is still held.
// NOTE(review): other code in this file locks historyMutex first and then
// recordingMutex — confirm that the differing lock order cannot deadlock.
634 std::unique_lock<std::mutex> lock(historyMutex);
// If there is history, the most recent entry defines the start of the recording.
635 if (history.size() > 0)
637 auto& mostRecent = history.back();
638 recordingMetaData.firstRevision = mostRecent.revision;
639 recordingMetaData.firstTimestampInMicroSeconds = mostRecent.timestampInMicroseconds;
// Reset the end of the range and the batch list for the new recording.
642 recordingMetaData.lastRevision = 0;
643 recordingMetaData.lastTimestampInMicroSeconds = 0;
644 recordingMetaData.batchHeaders.clear();
// Start the recording task.
649 recordingTask->start();
// Stops the running recording: stops the recording task, writes the remaining
// history as a final batch and resets the recording metadata.
// NOTE(review): the enclosing function header and any guard on recordingTask
// are not visible in this excerpt.
662 recordingTask->stop();
663 recordingTask =
nullptr;
// The recording state is modified under recordingMutex.
665 std::unique_lock<std::mutex> lock(recordingMutex);
// Final batch: the last snapshot plus all history entries not yet handed over.
// NOTE(review): `history` is moved from here while only recordingMutex is
// visibly held — confirm whether historyMutex is also required.
667 viz::data::RecordingBatch lastBatch;
668 lastBatch.initialState = recordingInitialState;
669 lastBatch.updates = std::move(history);
670 recordBatch(lastBatch);
// Empty ID = no recording active; -1 = range start not set (recordBatch
// checks for negative values).
672 recordingMetaData.id =
"";
673 recordingMetaData.firstRevision = -1;
674 recordingMetaData.firstTimestampInMicroSeconds = -1;
// Scans the history directory and returns the metadata of all recordings
// found there, together with the directory path itself.
// NOTE(review): the function name line, braces and the check of the optional
// before dereferencing are not visible in this excerpt.
677 armarx::viz::data::RecordingsInfo
680 viz::data::RecordingsInfo recordingsInfo;
681 viz::data::RecordingSeq result;
// Iterate over the direct children of the history directory (non-recursive).
683 for (std::filesystem::directory_entry
const& entry :
684 std::filesystem::directory_iterator(properties_.historyPath))
// Entries are tested for being directories; each recording lives in its own
// directory.
688 if (!entry.is_directory())
// Try to load the recording metadata from this directory.
693 std::optional<viz::data::Recording> recording = readRecordingInfo(entry.path());
696 result.push_back(std::move(*recording));
700 recordingsInfo.recordings = result;
701 recordingsInfo.recordingsPath = properties_.historyPath;
703 return recordingsInfo;
706 armarx::viz::data::RecordingBatch
711 viz::data::RecordingBatch result;
712 result.header.index = -1;
714 std::filesystem::path recordingPath = properties_.historyPath / recordingID;
715 std::optional<viz::data::Recording> recording = readRecordingInfo(recordingPath);
718 ARMARX_WARNING <<
"Could not read recording information for '" << recordingID <<
"'"
719 <<
"\nPath: " << recordingPath;
723 if (batchIndex < 0 || batchIndex >= (
long)recording->batchHeaders.size())
725 ARMARX_WARNING <<
"Batch index is not valid. Index = " << batchIndex
726 <<
"Batch count: " << recording->batchHeaders.size();
730 viz::data::RecordingBatchHeader
const& batchHeader = recording->batchHeaders[batchIndex];
731 std::filesystem::path batchFile = recordingPath / batchFileName(batchHeader);
732 if (!std::filesystem::exists(batchFile))
734 ARMARX_WARNING <<
"Could not find batch file for recording '" << recordingID
735 <<
"' with index " << batchIndex <<
"\nPath: " << batchFile;
741 result.header.index = batchIndex;