30 #include <SimoxUtility/json/json.hpp>
37 static std::filesystem::path getAbsolutePath(
const std::filesystem::path& path)
39 if (path.is_absolute())
62 return "ArVizStorage";
70 defs->optional(topicName,
"TopicName",
71 "Layer updates are sent over this topic.");
73 defs->optional(maxHistorySize,
"MaxHistorySize",
74 "How many layer updates are saved in the history until they are compressed")
77 defs->defineOptionalProperty<std::string>(
78 "HistoryPath",
"RobotAPI/ArVizStorage",
79 "Destination path where the history is serialized to");
87 std::filesystem::path historyPathProp = getProperty<std::string>(
"HistoryPath").getValue();
88 historyPath = getAbsolutePath(historyPathProp);
89 if (!std::filesystem::exists(historyPath))
91 ARMARX_INFO <<
"Creating history path: " << historyPath;
92 std::error_code error;
93 std::filesystem::create_directory(historyPath, error);
108 currentState.clear();
110 recordingInitialState.clear();
112 recordingBuffer.clear();
113 recordingMetaData.id =
"";
121 recordingTask->stop();
122 recordingTask =
nullptr;
134 std::unique_lock<std::mutex> lock(historyMutex);
138 long nowInMicroSeconds = now.toMicroSeconds();
140 for (
auto&
update : updates)
142 if (
update.component.empty())
145 <<
"Discarding ArViz update with empty component name. Check whether "
146 <<
"you correctly create your ArViz client (`armarx::viz::Client`) "
151 auto& historyEntry = history.emplace_back();
152 historyEntry.revision = revision;
153 historyEntry.timestampInMicroseconds = nowInMicroSeconds;
154 historyEntry.update =
update;
158 for (
auto& layer : currentState)
160 if (layer.update.component ==
update.component
161 && layer.update.name ==
update.name)
163 layer = historyEntry;
170 currentState.push_back(historyEntry);
174 long currentHistorySize = history.size();
175 if (currentHistorySize >= maxHistorySize)
178 std::unique_lock<std::mutex> lock(recordingMutex);
179 if (recordingMetaData.id.size() > 0)
181 auto& newBatch = recordingBuffer.emplace_back();
182 newBatch.initialState = recordingInitialState;
183 newBatch.updates = std::move(history);
184 recordingInitialState = currentState;
186 recordingCondition.notify_one();
195 viz::data::CommitResult result;
198 std::unique_lock<std::mutex> lock(historyMutex);
201 result.revision = revision;
204 long nowInMicroSeconds = now.toMicroSeconds();
207 for (viz::data::LayerUpdate
const&
update :
input.updates)
209 viz::data::TimestampedLayerUpdate& historyEntry = history.emplace_back();
210 historyEntry.revision = revision;
211 historyEntry.timestampInMicroseconds = nowInMicroSeconds;
212 historyEntry.update =
update;
216 for (viz::data::TimestampedLayerUpdate& layer : currentState)
218 if (layer.update.component ==
update.component
219 && layer.update.name ==
update.name)
221 layer = historyEntry;
228 currentState.push_back(historyEntry);
233 long currentHistorySize = history.size();
234 if (currentHistorySize >= maxHistorySize)
237 std::unique_lock<std::mutex> lock(recordingMutex);
238 if (recordingMetaData.id.size() > 0)
240 auto& newBatch = recordingBuffer.emplace_back();
241 newBatch.initialState = recordingInitialState;
242 newBatch.updates = std::move(history);
243 recordingInitialState = currentState;
245 recordingCondition.notify_one();
252 if (
input.interactionComponent.size() > 0)
254 auto interactionsEnd = interactionBuffer.end();
255 auto foundInteractionsBegin = std::partition(interactionBuffer.begin(), interactionsEnd,
258 if (interaction.component == input.interactionComponent)
260 for (std::string const& layer : input.interactionLayers)
262 if (interaction.layer == layer)
271 result.interactions.assign(foundInteractionsBegin, interactionsEnd);
272 interactionBuffer.erase(foundInteractionsBegin, interactionsEnd);
281 viz::data::LayerUpdates ArVizStorage::pullUpdatesSince(
Ice::Long revision,
const Ice::Current&)
283 viz::data::LayerUpdates result;
285 std::unique_lock<std::mutex> lock(historyMutex);
287 result.updates.reserve(currentState.size());
288 for (
auto& layer : currentState)
290 if (layer.revision > revision)
292 result.updates.push_back(layer.update);
295 result.revision = this->revision;
300 static const int ALL_TRANSFORM_FLAGS = viz::data::InteractionFeedbackType::TRANSFORM_BEGIN_FLAG
301 | viz::data::InteractionFeedbackType::TRANSFORM_DURING_FLAG
302 | viz::data::InteractionFeedbackType::TRANSFORM_END_FLAG;
304 viz::data::LayerUpdates ArVizStorage::pullUpdatesSinceAndSendInteractions(
305 Ice::Long revision, viz::data::InteractionFeedbackSeq
const& interactions,
const Ice::Current&
c)
307 viz::data::LayerUpdates result;
309 std::unique_lock<std::mutex> lock(historyMutex);
311 for (viz::data::InteractionFeedback
const&
interaction : interactions)
313 for (viz::data::InteractionFeedback& entry : interactionBuffer)
319 int previousTransformFlags = entry.type & ALL_TRANSFORM_FLAGS;
320 int interactionTransformFlags =
interaction.type & ALL_TRANSFORM_FLAGS;
325 if (previousTransformFlags != 0 && interactionTransformFlags != 0)
327 entry.type |= previousTransformFlags;
339 result.updates.reserve(currentState.size());
340 for (viz::data::TimestampedLayerUpdate& layer : currentState)
342 if (layer.revision > revision)
344 result.updates.push_back(layer.update);
347 result.revision = this->revision;
352 void ArVizStorage::record()
354 while (!recordingTask->isStopped())
356 std::unique_lock<std::mutex> lock(recordingMutex);
357 while (!recordingTask->isStopped() && recordingBuffer.empty())
359 recordingCondition.wait_for(lock, std::chrono::milliseconds(10));
361 for (
auto& batch : recordingBuffer)
365 recordingBuffer.clear();
373 void to_json(nlohmann::json& j, RecordingBatchHeader
const& batch)
375 j[
"index"] = batch.index;
376 j[
"firstRevision"] = batch.firstRevision;
377 j[
"lastRevision"] = batch.lastRevision;
378 j[
"firstTimestampInMicroSeconds"] = batch.firstTimestampInMicroSeconds;
379 j[
"lastTimestampInMicroSeconds"] = batch.lastTimestampInMicroSeconds;
382 void from_json(nlohmann::json
const& j, RecordingBatchHeader& batch)
384 batch.index = j[
"index"] ;
385 batch.firstRevision = j[
"firstRevision"];
386 batch.lastRevision = j[
"lastRevision"];
387 batch.firstTimestampInMicroSeconds = j[
"firstTimestampInMicroSeconds"];
388 batch.lastTimestampInMicroSeconds = j[
"lastTimestampInMicroSeconds"];
391 void to_json(nlohmann::json& j, Recording
const& recording)
393 j[
"id"] = recording.id;
394 j[
"firstRevision"] = recording.firstRevision;
395 j[
"lastRevision"] = recording.lastRevision;
396 j[
"firstTimestampInMicroSeconds"] = recording.firstTimestampInMicroSeconds;
397 j[
"lastTimestampInMicroSeconds"] = recording.lastTimestampInMicroSeconds;
398 j[
"batchHeaders"] = recording.batchHeaders;
401 void from_json(nlohmann::json
const& j, Recording& recording)
403 recording.id = j[
"id"];
404 recording.firstRevision = j[
"firstRevision"];
405 recording.lastRevision = j[
"lastRevision"];
406 recording.firstTimestampInMicroSeconds = j[
"firstTimestampInMicroSeconds"];
407 recording.lastTimestampInMicroSeconds = j[
"lastTimestampInMicroSeconds"];
408 j[
"batchHeaders"].get_to(recording.batchHeaders);
413 static bool writeCompleteFile(std::string
const&
filename,
414 const void* data, std::size_t size)
416 FILE* file = fopen(
filename.c_str(),
"wb");
421 std::size_t written = std::fwrite(data, 1, size, file);
430 static std::string readCompleteFile(std::filesystem::path
const& path)
432 FILE* f = fopen(path.string().c_str(),
"rb");
433 fseek(f, 0, SEEK_END);
434 long fsize = ftell(f);
435 fseek(f, 0, SEEK_SET);
437 std::string result(fsize,
'\0');
438 std::size_t
read = fread(result.data(), 1, fsize, f);
445 static std::optional<armarx::viz::data::Recording> readRecordingInfo(std::filesystem::path
const& recordingDirectory)
447 std::optional<::armarx::viz::data::Recording> result;
449 std::filesystem::path recordingFilePath = recordingDirectory /
"recording.json";
450 if (!std::filesystem::exists(recordingFilePath))
452 ARMARX_INFO <<
"No recording.json found in directory: " << recordingDirectory;
458 std::string recordingString = readCompleteFile(recordingFilePath);
459 nlohmann::json recordingJson = nlohmann::json::parse(recordingString);
462 recordingJson.get_to(recording);
464 result = std::move(recording);
467 catch (std::exception
const& ex)
469 ARMARX_WARNING <<
"Could not parse JSON file: " << recordingFilePath
470 <<
"\nReason: " << ex.what();
475 static std::string batchFileName(armarx::viz::data::RecordingBatchHeader
const& batchHeader)
480 void armarx::ArVizStorage::recordBatch(armarx::viz::data::RecordingBatch& batch)
482 if (batch.updates.empty())
487 auto& first = batch.updates.front();
488 auto& last = batch.updates.back();
490 batch.header.index = -1;
491 batch.header.firstRevision = first.revision;
492 batch.header.lastRevision = last.revision;
493 batch.header.firstTimestampInMicroSeconds = first.timestampInMicroseconds;
494 batch.header.lastTimestampInMicroSeconds = last.timestampInMicroseconds;
497 batch.initialState = currentState;
502 std::string
filename = batchFileName(batch.header);
503 std::filesystem::path filePath = recordingPath /
filename;
505 ObjectToIceBlobSerializer ser{batch};
507 if (!writeCompleteFile(filePath.string(), ser.begin(), ser.size()))
514 if (recordingMetaData.firstRevision < 0)
516 recordingMetaData.firstRevision = first.revision;
518 recordingMetaData.lastRevision = last.revision;
520 if (recordingMetaData.firstTimestampInMicroSeconds < 0)
522 recordingMetaData.firstTimestampInMicroSeconds = first.timestampInMicroseconds;
524 recordingMetaData.lastTimestampInMicroSeconds = last.timestampInMicroseconds;
526 armarx::viz::data::RecordingBatchHeader& newBatch = recordingMetaData.batchHeaders.emplace_back();
527 newBatch.index = recordingMetaData.batchHeaders.size() - 1;
528 newBatch.firstRevision = first.revision;
529 newBatch.lastRevision = last.revision;
530 newBatch.firstTimestampInMicroSeconds = first.timestampInMicroseconds;
531 newBatch.lastTimestampInMicroSeconds = last.timestampInMicroseconds;
534 nlohmann::json j = recordingMetaData;
535 std::string jString = j.dump(2);
536 std::filesystem::path recordingFile = recordingPath /
"recording.json";
537 if (!writeCompleteFile(recordingFile.string(), jString.data(), jString.size()))
539 ARMARX_WARNING <<
"Could not write recording file: " << recordingFile;
543 ARMARX_INFO <<
"Recorded ArViz batch to: " << filePath;
549 std::unique_lock<std::mutex> lock(recordingMutex);
550 if (recordingMetaData.id.size() > 0)
552 ARMARX_WARNING <<
"Could not start recording with prefix " << newRecordingPrefix
553 <<
"\nbecause there is already a recording running for the recording ID: "
554 << recordingMetaData.id;
555 return recordingMetaData.id;
559 std::ostringstream id;
560 id << newRecordingPrefix
562 << now.toString(
"%Y-%m-%d_%H-%M-%S");
563 std::string newRecordingID =
id.str();
565 recordingPath = historyPath / newRecordingID;
566 if (!std::filesystem::exists(recordingPath))
568 ARMARX_INFO <<
"Creating directory for recording with ID '" << newRecordingID
569 <<
"'\nPath: " << recordingPath;
570 std::filesystem::create_directory(recordingPath);
573 recordingBuffer.clear();
575 recordingMetaData.id = newRecordingID;
577 std::unique_lock<std::mutex> lock(historyMutex);
578 if (history.size() > 0)
580 auto& mostRecent = history.back();
581 recordingMetaData.firstRevision = mostRecent.revision;
582 recordingMetaData.firstTimestampInMicroSeconds = mostRecent.timestampInMicroseconds;
585 recordingMetaData.lastRevision = 0;
586 recordingMetaData.lastTimestampInMicroSeconds = 0;
587 recordingMetaData.batchHeaders.clear();
592 recordingTask->start();
604 recordingTask->stop();
605 recordingTask =
nullptr;
607 std::unique_lock<std::mutex> lock(recordingMutex);
609 viz::data::RecordingBatch lastBatch;
610 lastBatch.initialState = recordingInitialState;
611 lastBatch.updates = std::move(history);
612 recordBatch(lastBatch);
614 recordingMetaData.id =
"";
615 recordingMetaData.firstRevision = -1;
616 recordingMetaData.firstTimestampInMicroSeconds = -1;
621 viz::data::RecordingsInfo recordingsInfo;
622 viz::data::RecordingSeq result;
624 for (std::filesystem::directory_entry
const& entry : std::filesystem::directory_iterator(historyPath))
628 if (!entry.is_directory())
633 std::optional<viz::data::Recording> recording = readRecordingInfo(entry.path());
636 result.push_back(std::move(*recording));
640 recordingsInfo.recordings = result;
641 recordingsInfo.recordingsPath = historyPath;
643 return recordingsInfo;
648 viz::data::RecordingBatch result;
649 result.header.index = -1;
651 std::filesystem::path recordingPath = historyPath / recordingID;
652 std::optional<viz::data::Recording> recording = readRecordingInfo(recordingPath);
655 ARMARX_WARNING <<
"Could not read recording information for '" << recordingID <<
"'"
656 <<
"\nPath: " << recordingPath;
660 if (batchIndex < 0 || batchIndex >= (
long)recording->batchHeaders.size())
662 ARMARX_WARNING <<
"Batch index is not valid. Index = " << batchIndex
663 <<
"Batch count: " << recording->batchHeaders.size();
667 viz::data::RecordingBatchHeader
const& batchHeader = recording->batchHeaders[batchIndex];
668 std::filesystem::path batchFile = recordingPath / batchFileName(batchHeader);
669 if (!std::filesystem::exists(batchFile))
671 ARMARX_WARNING <<
"Could not find batch file for recording '" << recordingID
672 <<
"' with index " << batchIndex
673 <<
"\nPath: " << batchFile;
679 result.header.index = batchIndex;