ReadWritePluginUser.cpp
Go to the documentation of this file.
2
3#include <SimoxUtility/algorithm/string/string_tools.h> //for loadExportToLTM
4
7
14
15#include "Plugin.h"
16
18{
19
24
28
29 void
30 ReadWritePluginUser::setMemoryName(const std::string& memoryName)
31 {
32 plugin->setMemoryName(memoryName);
33 }
34
35 // WRITING
36 data::AddSegmentsResult
37 ReadWritePluginUser::addSegments(const data::AddSegmentsInput& input, const Ice::Current&)
38 {
40 bool addCoreSegmentOnUsage = false;
41 return addSegments(input, addCoreSegmentOnUsage);
42 }
43
44 data::AddSegmentsResult
45 ReadWritePluginUser::addSegments(const data::AddSegmentsInput& input, bool addCoreSegments)
46 {
48 data::AddSegmentsResult result = iceAdapter().addSegments(input, addCoreSegments);
49 return result;
50 }
51
52 data::CommitResult
53 ReadWritePluginUser::commit(const data::Commit& commitIce, const Ice::Current&)
54 {
56 return iceAdapter().commitLocking(commitIce);
57 }
58
59 void
61 {
63
64 ARMARX_DEBUG << "Clear working memory id="
66
67 iceAdapter().workingMemory->forEachCoreSegment([](auto& coreSegment)
68 { coreSegment.clear(); });
69 }
70
71 // READING
72 armem::query::data::Result
73 ReadWritePluginUser::query(const armem::query::data::Input& input, const Ice::Current&)
74 {
76 return iceAdapter().query(input);
77 }
78
79 armem::query::data::Result
80 ReadWritePluginUser::queryLTM(const armem::query::data::Input& input,
81 bool storeIntoWM,
82 const Ice::Current&)
83 {
85 return iceAdapter().queryLTM(input, storeIntoWM);
86 }
88 structure::data::GetServerStructureResult
90 {
93 }
94
95 // LTM STORING AND RECORDING
96 dto::DirectlyStoreResult
97 ReadWritePluginUser::directlyStore(const dto::DirectlyStoreInput& input, const Ice::Current&)
98 {
100 return iceAdapter().directlyStore(input);
101 }
102
103 dto::StartRecordResult
104 ReadWritePluginUser::startRecord(const dto::StartRecordInput& startRecordInput,
105 const Ice::Current&)
106 {
108 return iceAdapter().startRecord(startRecordInput);
109 }
110
111 dto::StopRecordResult
113 {
115 return iceAdapter().stopRecord();
116 }
117
118 dto::RecordStatusResult
120 {
122 return iceAdapter().getRecordStatus();
123 }
124
125 Plugin&
127 {
128 return *plugin;
129 }
130
133 {
134 return plugin->workingMemory;
135 }
136
139 {
140 return plugin->iceAdapter;
141 }
142
145 {
146 return plugin->longtermMemory;
147 }
148
149 // ACTIONS
150 armem::actions::GetActionsOutputSeq
151 ReadWritePluginUser::getActions(const armem::actions::GetActionsInputSeq& inputs,
152 const ::Ice::Current& /*unused*/)
153 {
154 return getActions(inputs);
155 }
156
157 armem::actions::GetActionsOutputSeq
158 ReadWritePluginUser::getActions(const armem::actions::GetActionsInputSeq& inputs)
159 {
160 (void)inputs;
161 return {};
162 }
163
164 armem::actions::ExecuteActionOutputSeq
165 ReadWritePluginUser::executeActions(const armem::actions::ExecuteActionInputSeq& inputs,
166 const ::Ice::Current& /*unused*/)
167 {
168 return executeActions(inputs);
169 }
170
171 armem::actions::ExecuteActionOutputSeq
172 ReadWritePluginUser::executeActions(const armem::actions::ExecuteActionInputSeq& inputs)
173 {
174 return {};
175 }
176
177 // PREDICTIONS
178 armem::prediction::data::PredictionResultSeq
179 ReadWritePluginUser::predict(const armem::prediction::data::PredictionRequestSeq& requests)
180 {
181 return iceAdapter().predict(requests);
182 }
183
184 armem::prediction::data::EngineSupportMap
189
190 armem::prediction::data::PredictionResultSeq
191 ReadWritePluginUser::predict(const armem::prediction::data::PredictionRequestSeq& requests,
192 const ::Ice::Current& /*unused*/)
193 {
194 return predict(requests);
195 }
196
197 armem::prediction::data::EngineSupportMap
198 ReadWritePluginUser::getAvailableEngines(const ::Ice::Current& /*unused*/)
199 {
200 return getAvailableEngines();
201 }
202
205 {
206 // Only take the core segments defined in the property 'loadedCoreSegments'
207 // Why not full? Legacy code, previously also only retrieved not all core segments.
209 return armem::CommitResult();
210 }
211
212 void
214 const std::string& exportName,
215 const std::string& host,
216 int port,
217 bool overwritePrevStrategy,
218 const ::Ice::Current&)
219 {
220 ARMARX_IMPORTANT << "Setting persistence strategy=rest via PROXY";
221
222 std::shared_ptr<armem::server::ltm::persistence::RestPersistence> restPersistence =
223 std::make_shared<armem::server::ltm::persistence::RestPersistence>(
224 identifier, exportName, host, port);
225
226 ARMARX_INFO << "MemoryID=" << iceAdapter().longtermMemory->getMemoryID().str();
227 ARMARX_INFO << "Identifier=" << identifier;
228 ARMARX_INFO << "Export name=" << exportName;
229 ARMARX_INFO << "Host=" << host;
230 ARMARX_INFO << "Port=" << port;
231 ARMARX_INFO << "OverwritePrevStrategy=" << overwritePrevStrategy;
232
233 if (overwritePrevStrategy)
234 {
235 iceAdapter().longtermMemory->getPersistenceStrategy()->clearStrategies();
236 iceAdapter().longtermMemory->getPersistenceStrategy()->addStrategy(restPersistence);
237 }
238 else
239 {
240 iceAdapter().longtermMemory->getPersistenceStrategy()->addStrategy(restPersistence);
241 }
242 }
243
244 void
246 const std::string& exportName,
247 const std::string& exportPath,
248 bool overwritePrevStrategy,
249 const ::Ice::Current&)
250 {
251 ARMARX_IMPORTANT << "Setting persistence strategy=disk via PROXY";
252
253 std::shared_ptr<armem::server::ltm::persistence::DiskPersistence> diskPersistence =
254 std::make_shared<armem::server::ltm::persistence::DiskPersistence>(
255 identifier, exportName, std::filesystem::path(exportPath));
256
257 ARMARX_INFO << "MemoryID=" << iceAdapter().longtermMemory->getMemoryID().str();
258 ARMARX_INFO << "Identifier=" << identifier;
259 ARMARX_INFO << "Export name=" << exportName;
260 ARMARX_INFO << "Export path=" << exportPath;
261 ARMARX_INFO << "OverwritePrevStrategy=" << overwritePrevStrategy;
262
263 if (overwritePrevStrategy)
264 {
265 iceAdapter().longtermMemory->getPersistenceStrategy()->clearStrategies();
266 iceAdapter().longtermMemory->getPersistenceStrategy()->addStrategy(diskPersistence);
267 }
268 else
269 {
270 iceAdapter().longtermMemory->getPersistenceStrategy()->addStrategy(diskPersistence);
271 }
272 }
273
274 void
276 const std::string& exportName,
277 bool overwritePrevStrategy,
278 const ::Ice::Current&)
279 {
280 // TODO
281 }
282
283 void
284 ReadWritePluginUser::loadLTMintoWM(bool complete, const ::Ice::Current&)
285 {
286 if (complete)
287 {
289 }
290 else
291 {
292 // Only take the core segments defined in the property 'loadedCoreSegments'
294 }
295 }
296
297 bool
298 ReadWritePluginUser::loadExportIntoWM(const std::string& export_path,
299 const std::string& memoryName,
300 const std::vector<std::string>& coreSegmentNames,
301 bool addNonExistingCoreSegments,
302 int amountOfSnapshotsPerSegmentToLoad,
303 const ::Ice::Current&)
304 {
305 ARMARX_INFO << "at ReadWritePluginUser";
306 //we only build the directy and exportName from the export_path and then use the original function:
307 auto splitAtLastSlash =
308 [this, &export_path](const std::string& input) -> std::pair<std::string, std::string>
309 {
310 size_t lastSlash = input.rfind('/');
311
312 if (lastSlash == std::string::npos)
313 {
315 << "Could not load LTM into WM, as the given export_path does not contain '/', "
316 "which it needs to in order to split into directory_path and exportName. \n"
317 << "The path you have provided is: \n"
318 << export_path;
319 return {"", ""};
320 }
321
322 std::string first = input.substr(0, lastSlash);
323 std::string second = input.substr(lastSlash + 1);
324
325 if (!second.empty())
326 {
327 return {first, second};
328 }
329
330 // If second part is empty, try finding an earlier slash
331 size_t secondLastSlash = input.rfind('/', lastSlash - 1);
332 if (secondLastSlash == std::string::npos)
333 {
334 ARMARX_ERROR << "The path you provided to load the LTM into the WM ends with a '/' "
335 "but does not contain any further '/'. "
336 << "Check if you provided the correct path. The path needs to consist "
337 "of a path to a directory, a '/' as a divider, and the name of the "
338 "export directory.\n"
339 << "You have given the following path: \n"
340 << export_path;
341 return {"", ""};
342 }
343
344 return {input.substr(0, secondLastSlash),
345 input.substr(secondLastSlash + 1, lastSlash - secondLastSlash - 1)};
346 };
347
348 auto [directory, exportName] = splitAtLastSlash(export_path);
349 if (directory == "" && exportName == "")
350 {
351 return false;
352 }
353 return this->loadExportIntoWM(directory,
354 exportName,
355 memoryName,
356 coreSegmentNames,
357 addNonExistingCoreSegments,
358 amountOfSnapshotsPerSegmentToLoad);
359 }
360
361 bool
362 ReadWritePluginUser::loadExportIntoWM(const std::string& directory,
363 const std::string& exportName,
364 const std::string& memoryName,
365 const std::vector<std::string>& coreSegmentNames,
366 bool addNonExistingCoreSegments,
367 int amountOfSnapshotsPerSegmentToLoad)
368 {
369 ARMARX_IMPORTANT << "Starting to load data from disk into WM...";
370 ARMARX_DEBUG << VAROUT(directory);
371 ARMARX_DEBUG << VAROUT(exportName);
372 ARMARX_DEBUG << VAROUT(memoryName);
373 ARMARX_DEBUG << VAROUT(coreSegmentNames);
374
375 ARMARX_DEBUG << VAROUT(addNonExistingCoreSegments);
376 if (addNonExistingCoreSegments)
377 {
379 << "You tried to set addNonExistingCoreSegments to true, this is not supported yet";
380 }
381 ARMARX_DEBUG << VAROUT(amountOfSnapshotsPerSegmentToLoad);
382
383 std::filesystem::path path(directory);
384 path.append(exportName);
385 path.append(memoryName);
386
387 //check existance and depth and contains memory data
388 if (not std::filesystem::is_directory(path))
389 {
390 ARMARX_WARNING << "You are trying to load data from a path which does not exist";
391 ARMARX_WARNING << VAROUT(path);
392 return false;
393 }
394
395 // Find out whether this is a single memory or a collection of memories by searching
396 // for a data.aron.* or metadata.aron.* file at depth 5 (if 6 then it is collection of memories)
397 bool isSingleMemory = false;
398 for (auto i = std::filesystem::recursive_directory_iterator(path);
399 i != std::filesystem::recursive_directory_iterator();
400 ++i)
401 {
402 if (i.depth() >
404 {
405 // In case we do not find any memory data in here
407 << "You are trying to import a directory which does not contain memory data";
408 ARMARX_WARNING << VAROUT(path);
409 return false;
410 }
411
412 auto& dir = *i;
413
414 // if one matches it is enough to check
415 if (std::filesystem::is_regular_file(dir.path()) &&
416 simox::alg::starts_with(dir.path().filename(), "data.aron"))
417 {
418 isSingleMemory = (i.depth() == armem::server::ltm::persistence::
419 MemoryPersistenceStrategy::DEPTH_TO_DATA_FILES);
420 if (!isSingleMemory)
421 {
422 ARMARX_INFO << "Depth = " << i.depth();
423 ARMARX_INFO << "Depth to data files: "
424 << armem::server::ltm::persistence::MemoryPersistenceStrategy::
425 DEPTH_TO_DATA_FILES;
426 }
427 break;
428 }
429 }
430
431 if (!isSingleMemory)
432 {
433 ARMARX_WARNING << "Could not load data from LTM at " << directory
434 << " and write it to the current WM"
435 << "\n"
436 << "Reason: the path you have given is not a single memory but contains "
437 "more than one memory";
438 return false;
439 }
440
441 ARMARX_DEBUG << "Now loading data from filesystem LTM...";
442
443 armem::wm::Memory memoryData;
444
445 if (std::filesystem::is_directory(directory))
446 {
447 std::string defaultIdent = "DefaultDisk";
448 ARMARX_INFO << "Path of the folder trying to import: " << path;
449 std::shared_ptr<armem::server::ltm::persistence::DiskPersistence> diskPersistence =
450 std::make_shared<armem::server::ltm::persistence::DiskPersistence>(
451 defaultIdent, exportName, path.parent_path().parent_path());
452
453 armem::server::ltm::Memory ltm(exportName, memoryName);
454 if (ltm.getPersistenceStrategy())
455 {
456 ltm.getPersistenceStrategy()->addStrategy(diskPersistence);
457 }
458 else
459 {
460 std::shared_ptr<armem::server::ltm::persistence::RedundantPersistenceStrategy>
461 redundantPersistence = std::make_shared<
463
464 ltm.setPersistenceStrategy(redundantPersistence);
465 ltm.getPersistenceStrategy()->addStrategy(diskPersistence);
466 }
467
469
470 if (amountOfSnapshotsPerSegmentToLoad != -1)
471 {
472 ARMARX_INFO << "You specified amountOfSnapshotsPerSegmentToLoad = "
473 << std::to_string(amountOfSnapshotsPerSegmentToLoad);
474 if (coreSegmentNames.size() != 0)
475 {
476 std::list<std::string> lst(coreSegmentNames.begin(), coreSegmentNames.end());
477 ltm.loadLatestNReferences(amountOfSnapshotsPerSegmentToLoad, memory, lst);
478 }
479 else
480 {
481 memory = ltm.loadLatestNReferences(amountOfSnapshotsPerSegmentToLoad);
482 }
483 ltm.resolve(memory);
484 }
485 else
486 {
487 if (coreSegmentNames.size() != 0)
488 {
489 std::list<std::string> lst(coreSegmentNames.begin(), coreSegmentNames.end());
490 ltm.loadAllAndResolve(memory, lst);
491 }
492 else
493 {
494 memory = ltm.loadAllAndResolve(); // load list of references and load data
495 }
496 }
497
498 memoryData = std::move(memory);
499 }
500
501 // we need to merge the current memory with the one from the disk.
502 // Please note that this may ignores coresegments, if not already existent
503 auto commit = armem::toCommit(memoryData);
505 data::Commit iceCommit;
506 toIce(iceCommit, commit);
507 this->commit(iceCommit);
508
509 ARMARX_INFO << "Successfully loaded data from " << path
510 << " into the current working memory";
511
512 return true;
513 }
514
515} // namespace armarx::armem::server::plugins
#define VAROUT(x)
PluginT * addPlugin(const std::string prefix="", ParamsT &&... params)
MemoryID cleanID() const
Definition MemoryID.cpp:133
std::string str(bool escapeDelimiters=true) const
Get a string representation of this memory ID.
Definition MemoryID.cpp:102
bool forEachCoreSegment(CoreSegmentFunctionT &&func)
Definition MemoryBase.h:188
Helps connecting a Memory server to the Ice interface.
armem::structure::data::GetServerStructureResult getServerStructure()
dto::StartRecordResult startRecord(const dto::StartRecordInput &startRecordInput)
armem::CommitResult reloadFromLTMOnStartup()
Triggers a reload (.
armem::CommitResult reloadAllFromLTM()
Loads all core segments and their data from the LTM.
query::data::Result queryLTM(const armem::query::data::Input &input, bool storeIntoWM)
Query the LTMs of the memory server.
query::data::Result query(const armem::query::data::Input &input)
prediction::data::PredictionResultSeq predict(prediction::data::PredictionRequestSeq requests)
dto::DirectlyStoreResult directlyStore(const dto::DirectlyStoreInput &directlStoreInput)
armem::CommitResult reloadPropertyDefinedCoreSegmentsFromLTM()
data::CommitResult commitLocking(const data::Commit &commitIce, Time timeArrived)
prediction::data::EngineSupportMap getAvailableEngines()
data::AddSegmentsResult addSegments(const data::AddSegmentsInput &input, bool addCoreSegments=false)
A memory storing data on the hard drive and in mongodb (needs 'armarx memory start' to start the mong...
Definition Memory.h:24
std::shared_ptr< persistence::RedundantPersistenceStrategy > getPersistenceStrategy() const
Definition Memory.h:96
Basically the option to use multiple ltm sinks as source or target.
virtual void useDiskPersistenceStrategy(const std::string &identifier, const std::string &exportName, const std::string &exportPath, bool overwritePrevStrategy, const ::Ice::Current &) override
Use a disk persistence strategy for storing the wm as ltm Uses the disk as ltm sink.
virtual dto::StopRecordResult stopRecord(const Ice::Current &=Ice::emptyCurrent) override
virtual armem::query::data::Result query(const armem::query::data::Input &input, const Ice::Current &=Ice::emptyCurrent) override
virtual data::CommitResult commit(const data::Commit &commit, const Ice::Current &=Ice::emptyCurrent) override
virtual armem::query::data::Result queryLTM(const armem::query::data::Input &input, bool storeIntoWM, const Ice::Current &=Ice::emptyCurrent) override
Queries only the LTMs of the memory server.
virtual void loadLTMintoWM(bool complete, const ::Ice::Current &) override
Loads the data of the LTMs into the WM.
virtual dto::RecordStatusResult getRecordStatus(const Ice::Current &=Ice::emptyCurrent) override
virtual armem::actions::ExecuteActionOutputSeq executeActions(const armem::actions::ExecuteActionInputSeq &inputs)
virtual data::AddSegmentsResult addSegments(const data::AddSegmentsInput &input, const Ice::Current &=Ice::emptyCurrent) override
bool loadExportIntoWM(const std::string &export_path, const std::string &memoryName, const std::vector< std::string > &coreSegmentName={}, bool addNonExistingCoreSegments=false, int amountOfSnapshotsPerSegmentToLoad=-1, const ::Ice::Current &=Ice::emptyCurrent) override
loads data from a specified LTM export into the currently running WM (needs to already have the CoreS...
virtual dto::StartRecordResult startRecord(const dto::StartRecordInput &startRecordInput, const Ice::Current &=Ice::emptyCurrent) override
virtual dto::DirectlyStoreResult directlyStore(const dto::DirectlyStoreInput &, const Ice::Current &=Ice::emptyCurrent) override
virtual void useMongoDbPersistenceStrategy(const std::string &identifier, const std::string &exportName, bool overwritePrevStrategy, const ::Ice::Current &) override
Use a mongodb persistence strategy for storing the wm as ltm Uses a mongodb as ltm sink.
virtual armem::actions::GetActionsOutputSeq getActions(const armem::actions::GetActionsInputSeq &inputs)
virtual void clearWorkingMemory(const Ice::Current &=Ice::emptyCurrent) override
Clears the working memory usings its clear feature.
virtual armem::structure::data::GetServerStructureResult getServerStructure(const Ice::Current &=Ice::emptyCurrent) override
virtual void useRestPersistenceStrategy(const std::string &identifier, const std::string &exportName, const std::string &host, int port, bool overwritePrevStrategy, const ::Ice::Current &) override
Use a rest persistence strategy for storing the wm as ltm Uses a rest server as ltm sink.
virtual armem::prediction::data::EngineSupportMap getAvailableEngines()
virtual armem::prediction::data::PredictionResultSeq predict(const armem::prediction::data::PredictionRequestSeq &requests)
Client-side working memory.
Brief description of class memory.
Definition memory.h:39
#define ARMARX_INFO
The normal logging level.
Definition Logging.h:181
#define ARMARX_IMPORTANT
The logging level for always important information, but expected behaviour (in contrast to ARMARX_WAR...
Definition Logging.h:190
#define ARMARX_ERROR
The logging level for unexpected behaviour, that must be fixed.
Definition Logging.h:196
#define ARMARX_DEBUG
The logging level for output that is only interesting while debugging.
Definition Logging.h:184
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
Definition Logging.h:193
Commit toCommit(const ContainerT &container)
Definition operations.h:23
void toIce(data::MemoryID &ice, const MemoryID &id)
Result of a Commit.
Definition Commit.h:111
#define ARMARX_TRACE
Definition trace.h:77