FluxioCompositeExecutor.cpp
Go to the documentation of this file.
2
3#include <algorithm>
4#include <atomic>
5#include <chrono>
6#include <cstddef>
7#include <experimental/memory>
8#include <functional>
9#include <iterator>
10#include <list>
11#include <memory>
12#include <mutex>
13#include <optional>
14#include <shared_mutex>
15#include <string>
16#include <thread>
17#include <utility>
18#include <vector>
19
20#include <IceUtil/UUID.h>
21
24
37
39#include "../FluxioEdge.h"
40#include "../FluxioParameter.h"
41#include "FluxioLoopExecutor.h"
43
44namespace armarx::skills
45{
47 const std::string& id,
48 const skills::FluxioSkill& skill,
51 const std::string& skillId,
52 const std::string& profileId,
53 const std::string& executorName,
54 armarx::aron::data::DictPtr parameters)>&& executeFluxioSkillFunc,
56 const std::vector<std::string>& parameterIds)>&& addMergerExecutorToDCFunc,
58 const std::string& id,
59 const skills::FluxioSkill& skill,
60 bool isRetry)>&& addLoopExecutorToDCFunc) :
61 FluxioExecutor(id, false),
62 executeFluxioSkill(executeFluxioSkillFunc),
63 addMergerExecutorToDC(addMergerExecutorToDCFunc),
64 addLoopExecutorToDC(addLoopExecutorToDCFunc),
65 skill(skill)
66 {
67 std::scoped_lock l(possibleInputsMutex);
68 possibleInputs[skill.id] = nullptr;
69 for (const auto& node : skill.nodes)
70 {
71 possibleInputs[node.first] = nullptr;
72 }
73
74 std::scoped_lock resultLock(resultMutex);
75 result = std::make_shared<armarx::aron::data::Dict>();
76 }
77
78 void
82 {
83 ARMARX_INFO << "Running skill " << skill.name;
85 this->executorName = executorName;
86
87 std::unique_lock possibleInputsLock(possibleInputsMutex);
88 this->possibleInputs[skill.id] = parameters;
89
90 // fill in missing with default values
91 for (const auto& [key, param] : this->skill.parameters)
92 {
93 if (param.type->getShortName() == "Object<Event>" || !param.isInput ||
94 this->possibleInputs[this->skill.id]->hasElement(key))
95 {
96 continue;
97 }
98 const auto& fallBack = this->findParameterValue(profilePtr, param);
99
100 if (fallBack == nullptr)
101 {
102 ARMARX_INFO << "No fallback value found for parameter " << param.name;
103 if (!param.required)
104 {
106 << "Parameter is not required. Using nullptr (std::nullopt) instead.";
107 }
108 else
109 {
110 ARMARX_INFO << "Can't execute skill due to incomplete params.";
111 ARMARX_INFO << "Aborting skill execution.";
112 this->setStatus(skills::SkillStatus::Aborted);
113 return;
114 }
115 }
116
117 this->possibleInputs[this->skill.id]->addElement(key, fallBack);
118 }
119 possibleInputsLock.unlock();
120
121 skills::FluxioEdge startEdge;
122 if (!validateSkill(startEdge))
123 {
124
125 ARMARX_WARNING << "Skill execution cancelled.";
127 return;
128 }
129
131
132 std::unique_lock executionsLock(subExecutionsMapMutex);
133 subExecutionsMap.clear();
134 executionsLock.unlock();
135 std::unique_lock resultLock(resultMutex);
136 result = std::make_shared<armarx::aron::data::Dict>();
137 resultLock.unlock();
138
140
141 std::atomic_bool skillRunning = true;
142
143 // thread for polling status updates
144 std::thread(
145 [this, &skillRunning]
146 {
147 while (skillRunning)
148 {
149 this->pollSubStatuses();
150 std::this_thread::sleep_for(
151 std::chrono::milliseconds(pollingFrequency.toMilliSeconds()));
152 }
153 })
154 .detach();
155
156 // thread for the sub routines
157 const std::string newExecutorName = executorName + "/" + skill.name;
158 std::thread(
159 [this, &startEdge, &skillRunning, newExecutorName, profilePtr]
160 {
161 this->startSubRoutine(startEdge.toNodePtr,
162 startEdge.toParameterPtr,
163 skillRunning,
164 newExecutorName,
165 profilePtr);
166 })
167 .detach();
168
170 const bool useTimeout = skill.timeout.isPositive();
171 const auto timeStarted = armarx::DateTime::Now();
172
173 // if the skill has a timout smaller than the polling frequency of the executor, use the skill“s timeout
174 // as pollingfrequency instead to avoid missing the timeout ('overshoot')
175 const auto sleepTime =
176 useTimeout ? std::min(pollingFrequency, skill.timeout) : pollingFrequency;
177
178 // main loop
179 while (skillRunning)
180 {
181 std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime.toMilliSeconds()));
182
183 if (useTimeout && armarx::DateTime::Now() >= timeStarted + skill.timeout)
184 {
185 ARMARX_WARNING << "Skill " << skill.name << " timed out.";
186 abortSubExecutions();
188
189 // wait for the thread to update the status map one last time
190 std::this_thread::sleep_for(
191 std::chrono::milliseconds(pollingFrequency.toMilliSeconds()));
192 skillRunning = false;
193 return;
194 }
195
196 std::shared_lock l(statusMutex);
197 const auto s = this->status;
198 l.unlock();
199
200 if (s->status == skills::SkillStatus::Aborted ||
201 s->status == skills::SkillStatus::Failed ||
203 {
204 // the skill was aborted with the abort method, no need to abort the
205 // SubExecutions twice
206 if (s->status != skills::SkillStatus::Aborted)
207 {
208 abortSubExecutions();
209 }
210
211 // wait for the thread to update the status map one last time
212 std::this_thread::sleep_for(
213 std::chrono::milliseconds(pollingFrequency.toMilliSeconds()));
214 skillRunning = false;
215 return;
216 }
217 }
218 }
219
220 void
221 FluxioCompositeExecutor::startSubRoutine(
224 std::atomic_bool& running,
225 const std::string& executorName,
227 {
228 if (!running)
229 {
230 return;
231 }
232
233 if (startNode == nullptr)
234 {
235 ARMARX_WARNING << "Unexpected nullptr";
237 return;
238 }
239
240 if (startNode->nodeType == skills::FluxioNodeType::PARAMETER)
241 {
242 // cast to parameter node
243 const auto& paramNodePtr = std::experimental::make_observer(
244 dynamic_cast<const skills::FluxioParameterNode*>(startNode.get()));
245 if (paramNodePtr == nullptr)
246 {
247 ARMARX_WARNING << "Unexpected nullptr. Dynamic cast from FluxioNodePtr to "
248 "FluxioParameterNodePtr failed.";
250 return;
251 }
252
253 this->handleParameterRoutine(paramNodePtr, running, executorName);
254 }
255 else if (startNode->nodeType == skills::FluxioNodeType::CONTROL)
256 {
257 // cast to control node
258 const auto& controlNodePtr = std::experimental::make_observer(
259 dynamic_cast<const skills::FluxioControlNode*>(startNode.get()));
260
261 if (controlNodePtr == nullptr)
262 {
263 ARMARX_WARNING << "Unexpected nullptr. Dynamic cast from FluxioNodePtr to "
264 "FluxioControlNodePtr failed.";
266 return;
267 }
268
269 this->handleControlRoutine(
270 controlNodePtr, startParameter, running, executorName, profilePtr);
271 }
272 else if (startNode->nodeType == skills::FluxioNodeType::SUBSKILL)
273 {
274 // cast to subskill node
275 const auto& subSkillNodePtr = std::experimental::make_observer(
276 dynamic_cast<const skills::FluxioSubSkillNode*>(startNode.get()));
277 if (subSkillNodePtr == nullptr || subSkillNodePtr->skillPtr == nullptr)
278 {
279 ARMARX_WARNING << "Unexpected nullptr. Dynamic cast from FluxioNodePtr to "
280 "FluxioSubSkillNodePtr failed.";
282 return;
283 }
284
285 this->handleSubSkillRoutine(subSkillNodePtr, running, executorName, profilePtr);
286 }
287 else
288 {
290 << "Unexpected node type '"
291 << skills::FluxioNodeTypeToString(startNode->nodeType).value_or("UNKNOWN")
292 << "' for Node with id " << startNode->nodeId;
294 return;
295 }
296 }
297
298 void
299 FluxioCompositeExecutor::handleParameterRoutine(
300 std::experimental::observer_ptr<const skills::FluxioParameterNode> parameterNode,
301 std::atomic_bool& /*running*/,
302 const std::string& /*executorName*/)
303 {
304 if (parameterNode == nullptr || parameterNode->parameterPtr == nullptr)
305 {
306 ARMARX_WARNING << "Unexpected nullptr";
308 return;
309 }
310
311 // make sure it is an event parameter
312 if (parameterNode->parameterPtr->type->getShortName() != "Object<Event>")
313 {
314 ARMARX_WARNING << "Unexpected parameter type "
315 << parameterNode->parameterPtr->type->getShortName();
317 return;
318 }
319
320 const auto& skill = this->skill;
321 std::list<FluxioEdge> resEdges;
322 std::copy_if(skill.edges.begin(),
323 skill.edges.end(),
324 std::back_inserter(resEdges),
325 [](const skills::FluxioEdge& edge)
326 {
327 return (edge.isValid() &&
328 edge.toNodePtr->nodeType == FluxioNodeType::PARAMETER &&
329 edge.fromParameterPtr->type->getShortName() != "Object<Event>");
330 });
331
332 std::unique_lock l(possibleInputsMutex);
333 for (const auto& e : resEdges)
334 {
335 const auto& fromParam = e.fromParameterPtr;
336 const auto& toParam = e.toParameterPtr;
337 auto fromNodeId = e.fromNodePtr->nodeId;
338 if (e.fromNodePtr->nodeType == FluxioNodeType::PARAMETER)
339 {
340 fromNodeId = skill.id;
341 }
342
343 l.unlock();
345 this->getPossibleInputCopy(fromNodeId, fromParam->id);
346 l.lock();
347
348 this->result->addElement(toParam->id, value);
349 }
350 l.unlock();
351
352 // get the event type and set the status accordingly
353 const std::string& eventType = parameterNode->parameterPtr->name;
354 if (eventType == "Succeeded")
355 {
357 }
358 else if (eventType == "Failed")
359 {
361 }
362 else if (eventType == "Aborted")
363 {
365 }
366 else
367 {
368 ARMARX_WARNING << "Unexpected event type " << eventType << " for parameter "
369 << parameterNode->parameterPtr->name;
371 }
372 }
373
374 void
375 FluxioCompositeExecutor::handleSubSkillRoutine(
376 std::experimental::observer_ptr<const skills::FluxioSubSkillNode> subSkillNode,
377 std::atomic_bool& running,
378 const std::string& executorName,
379 const std::experimental::observer_ptr<const FluxioProfile> profilePtr)
380 {
381 if (subSkillNode == nullptr || subSkillNode->skillPtr == nullptr)
382 {
383 ARMARX_WARNING << "Unexpected nullptr";
385 return;
386 }
387
388 // gather parameters for the subskill
389 armarx::aron::data::DictPtr params = std::make_shared<armarx::aron::data::Dict>();
390
391 std::list<FluxioEdge> paramEdges;
392 std::copy_if(skill.edges.begin(),
393 skill.edges.end(),
394 std::back_inserter(paramEdges),
395 [&subSkillNode](const skills::FluxioEdge& edge)
396 {
397 return (edge.isValid() && edge.toNodePtr->nodeId == subSkillNode->nodeId &&
398 edge.fromParameterPtr->type->getShortName() != "Object<Event>");
399 });
400
401 for (const auto& e : paramEdges)
402 {
403 const auto& fromParam = e.fromParameterPtr;
404 const auto& toParam = e.toParameterPtr;
405 auto fromNodeId = e.fromNodePtr->nodeId;
406 if (e.fromNodePtr->nodeType == FluxioNodeType::PARAMETER)
407 {
408 fromNodeId = skill.id;
409 }
410 armarx::aron::data::VariantPtr value = getPossibleInputCopy(fromNodeId, fromParam->id);
411 if (value == nullptr)
412 {
413 ARMARX_WARNING << "Failed to get possible input for parameter " << fromParam->id
414 << " " << fromNodeId;
415 }
416 params->addElement(toParam->id, value);
417 }
418
419 // start skill execution
420 const auto& executorRes =
421 executeFluxioSkill(subSkillNode->skillPtr->id, profilePtr->id, executorName, params);
422 if (!executorRes.isSuccess())
423 {
424 ARMARX_WARNING << "Failed to execute subskill " << subSkillNode->skillPtr->id;
426 return;
427 }
428 auto executorPtr = executorRes.getResult();
429 if (executorPtr == nullptr)
430 {
431 ARMARX_WARNING << "Failed to execute subskill " << subSkillNode->skillPtr->id;
433 return;
434 }
435
436 std::unique_lock executionsLock(subExecutionsMapMutex);
437 subExecutionsMap[subSkillNode->nodeId] = executorPtr;
438 executionsLock.unlock();
439
440 // wait until the skill has finished (or the super skill is finished)
441 skills::FluxioSkillStatusUpdate statusUpdate;
442 while (running)
443 {
444 // sleep for a while
445 std::this_thread::sleep_for(std::chrono::milliseconds(250));
446 executorPtr->getStatusUpdate(); // FIXME: bad design
447 const auto& statusUpdateIt = executorPtr->getStatus();
448
449 if (!statusUpdateIt.has_value())
450 {
451 ARMARX_INFO << "No status update from skill " << subSkillNode->skillPtr->name
452 << " yet. Waiting...";
453 continue;
454 }
455
456 statusUpdate = statusUpdateIt.value();
457 // did the status change? update statusUpdates list
458 std::unique_lock statusMapLock(statusUpdatesMutex);
459 const auto& lastUpdate =
460 std::find_if(statusUpdates.begin(),
461 statusUpdates.end(),
462 [&subSkillNode](const skills::FluxioSkillStatusUpdate& statusUpdate)
463 { return statusUpdate.subSkillNodeId == subSkillNode->nodeId; });
464
465 if (lastUpdate == statusUpdates.end() || lastUpdate->status != statusUpdate.status)
466 {
468 executorPtr->id,
469 subSkillNode->nodeId,
470 statusUpdate.status});
471 }
472
473 statusMapLock.unlock();
474
475 // check subskill is finished
476 if (statusUpdate.status == skills::SkillStatus::Succeeded ||
477 statusUpdate.status == skills::SkillStatus::Failed ||
478 statusUpdate.status == skills::SkillStatus::Aborted)
479 {
480 break;
481 }
482 }
483
484 // check if the parent skill is still running
485 if (!running)
486 {
487 return;
488 }
489
490 std::unique_lock possibleInputsLock(this->possibleInputsMutex);
491 this->possibleInputs[subSkillNode->nodeId] = executorPtr->getResultsCopy();
492 possibleInputsLock.unlock();
493
494 // check the final skill status get the output event parameter
495 const std::string& outputEventName =
496 statusUpdate.status == skills::SkillStatus::Succeeded ? "Succeeded"
497 : statusUpdate.status == skills::SkillStatus::Failed ? "Failed"
498 : statusUpdate.status == skills::SkillStatus::Aborted ? "Aborted"
499 : "Undefined";
500 const auto& outputParam = std::find_if(
501 subSkillNode->skillPtr->parameters.begin(),
502 subSkillNode->skillPtr->parameters.end(),
503 [&outputEventName](const std::pair<std::string, skills::FluxioParameter>& param)
504 {
505 return (param.second.type->getShortName() == "Object<Event>" &&
506 !param.second.isInput && param.second.name == outputEventName);
507 });
508
509 if (outputParam == subSkillNode->skillPtr->parameters.end())
510 {
511 ARMARX_WARNING << "Skill " << subSkillNode->skillPtr->name
512 << " is missing the output event parameter " << outputEventName;
514 return;
515 }
516
517 // find the connected edge
518 const auto& edge =
519 std::find_if(skill.edges.begin(),
520 skill.edges.end(),
521 [&subSkillNode, &outputParam](const skills::FluxioEdge& edge)
522 {
523 return (edge.fromNodePtr->nodeId == subSkillNode->nodeId &&
524 edge.fromParameterPtr->id == outputParam->second.id);
525 });
526
527 if (edge == skill.edges.end())
528 {
529 if (outputEventName == "Failed")
530 {
532 return;
533 }
534
535 if (outputEventName == "Aborted")
536 {
538 return;
539 }
540
541 ARMARX_WARNING << "Skill " << skill.name
542 << " has no edge connected to the output event parameter "
543 << outputEventName;
545 return;
546 }
547
548 // start new subroutine
549 const std::string& nextExecutorName = executorName + "/" + subSkillNode->skillPtr->name;
550 startSubRoutine(
551 edge->toNodePtr, edge->toParameterPtr, running, nextExecutorName, profilePtr);
552 }
553
554 void
555 FluxioCompositeExecutor::handleControlRoutine(
556 std::experimental::observer_ptr<const skills::FluxioControlNode> controlNode,
557 const std::experimental::observer_ptr<const skills::FluxioParameter>& startParameter,
558 std::atomic_bool& running,
559 const std::string& executorName,
560 const std::experimental::observer_ptr<const FluxioProfile> profilePtr)
561 {
562 if (controlNode == nullptr)
563 {
564 ARMARX_WARNING << "Unexpected nullptr";
566 return;
567 }
568
569 // check the controlType
570 if (controlNode->controlType == skills::FluxioControlNodeType::SPLITTER)
571 {
572 // find connected nodes and store the relevant edges
573 std::vector<std::experimental::observer_ptr<const skills::FluxioEdge>> edgePtrs = {};
574 for (const auto& [id, param] : controlNode->parametersMap)
575 {
576 // ignore the input parameters
577 if (param.isInput)
578 {
579 continue;
580 }
581
582 for (const auto& edge : skill.edges)
583 {
584 if (edge.isValid() && edge.fromNodePtr->nodeId == controlNode->nodeId &&
585 edge.fromParameterPtr->id == id)
586 {
587 edgePtrs.push_back(std::experimental::make_observer(&edge));
588 }
589 }
590 }
591
592 const size_t allParams = edgePtrs.size();
593 int param = 1;
594
595 // start subroutines in separate threads
596 for (std::experimental::observer_ptr<const skills::FluxioEdge> edgePtr : edgePtrs)
597 {
598 const std::string newExecutorName = executorName + "/Splitter" + "(" +
599 std::to_string(param) + "/" +
600 std::to_string(allParams) + ")";
601 std::thread(
602 [this, edgePtr, &running, newExecutorName, profilePtr]
603 {
604 startSubRoutine(edgePtr->toNodePtr,
605 edgePtr->toParameterPtr,
606 running,
607 newExecutorName,
608 profilePtr);
609 })
610 .detach();
611 param++;
612 }
613 }
614 else if (controlNode->controlType == skills::FluxioControlNodeType::AND_MERGER)
615 {
616 // check the list of subexecutions for the node id
617 std::experimental::observer_ptr<FluxioMergerExecutor> mergerExecutorPtr = nullptr;
618 std::unique_lock executionsLock(subExecutionsMapMutex);
619 const auto& executorPtr = subExecutionsMap.find(controlNode->nodeId);
620 if (executorPtr == subExecutionsMap.end())
621 {
622 // assemble paramId vector
623 std::vector<std::string> paramIds = {};
624 for (const auto& [id, param] : controlNode->parametersMap)
625 {
626 if (param.isInput)
627 {
628 paramIds.push_back(id);
629 }
630 }
631
632 // there is no execution for the merger yet, let“s start one
633 mergerExecutorPtr = std::experimental::make_observer(
634 dynamic_cast<FluxioMergerExecutor*>(addMergerExecutorToDC(paramIds).get()));
635 subExecutionsMap.emplace(controlNode->nodeId, mergerExecutorPtr);
636 executionsLock.unlock();
637 }
638 else
639 {
640 executionsLock.unlock();
641
642 mergerExecutorPtr = std::experimental::make_observer(
643 dynamic_cast<FluxioMergerExecutor*>(executorPtr->second.get()));
644 }
645
646 if (mergerExecutorPtr == nullptr)
647 {
648 ARMARX_WARNING << "Unexpected nullptr. Dynamic cast from FluxioExecutorPtr to "
649 "FluxioMergerExecutorPtr failed.";
651 return;
652 }
653
654 // check in the token
655 mergerExecutorPtr->checkInToken(startParameter->id);
656
657 if (mergerExecutorPtr->getStatus() != std::nullopt)
658 {
659 return;
660 }
661
662 // reuse this thread
663 mergerExecutorPtr->run(executorName, nullptr, profilePtr);
664
665 const auto& outputParam =
666 std::find_if(controlNode->parametersMap.begin(),
667 controlNode->parametersMap.end(),
668 [](const std::pair<std::string, skills::FluxioParameter>& param)
669 { return !param.second.isInput; });
670
671 if (outputParam == controlNode->parametersMap.end())
672 {
673 ARMARX_WARNING << "Control node " << controlNode->nodeId
674 << " has no output parameter";
676 return;
677 }
678
679 // find the connected edge
680 const auto& edge =
681 std::find_if(skill.edges.begin(),
682 skill.edges.end(),
683 [&controlNode, &outputParam](const skills::FluxioEdge& edge)
684 {
685 return (edge.fromNodePtr->nodeId == controlNode->nodeId &&
686 edge.fromParameterPtr->id == outputParam->second.id);
687 });
688
689 if (edge == skill.edges.end())
690 {
692 << "Skill " << skill.name
693 << " has no edge connected to the output event parameter of the AND merger";
695 return;
696 }
697
698 // start new subroutine
699 startSubRoutine(
700 edge->toNodePtr, edge->toParameterPtr, running, executorName, profilePtr);
701 }
702 else if (controlNode->controlType == skills::FluxioControlNodeType::LOOP_REPEAT ||
703 controlNode->controlType == skills::FluxioControlNodeType::LOOP_RETRY)
704 {
705 if (controlNode->slottedNode == nullptr)
706 {
707 ARMARX_INFO << "Slot is empty, Can not execute loop node.";
709 return;
710 }
711
712 if (controlNode->slottedNode->nodeType != skills::FluxioNodeType::SUBSKILL)
713 {
714 ARMARX_INFO << "Slotted node is not a subskill node, can not execute loop node.";
716 return;
717 }
718
719 armarx::aron::data::DictPtr params = std::make_shared<armarx::aron::data::Dict>();
720 std::list<FluxioEdge> paramEdges;
721 std::copy_if(skill.edges.begin(),
722 skill.edges.end(),
723 std::back_inserter(paramEdges),
724 [&controlNode](const skills::FluxioEdge& edge)
725 {
726 return (edge.isValid() &&
727 edge.fromParameterPtr->type->getShortName() !=
728 "Object<Event>" &&
729 (edge.toNodePtr->nodeId == controlNode->slottedNode->nodeId ||
730 edge.toNodePtr->nodeId == controlNode->nodeId));
731 });
732
733 for (const auto& e : paramEdges)
734 {
735 const auto& fromParam = e.fromParameterPtr;
736 const auto& toParam = e.toParameterPtr;
737 const auto& toNode = e.toNodePtr;
738
739 if (!e.isValid())
740 {
741 // skip
742 }
743
744 auto fromNodeId = e.fromNodePtr->nodeId;
745 if (e.fromNodePtr->nodeType == FluxioNodeType::PARAMETER)
746 {
747 fromNodeId = skill.id;
748 }
750 getPossibleInputCopy(fromNodeId, fromParam->id);
751 if (value == nullptr)
752 {
753 ARMARX_WARNING << "Failed to get possible input for parameter " << fromParam->id
754 << " " << fromNodeId;
755 }
756
757 if (toNode->nodeId == controlNode->nodeId &&
758 toParam->name.find("Iterations") != std::string::npos)
759 {
760 params->addElement("Iterations", value);
761 }
762 else
763 {
764 params->addElement(toParam->id, value);
765 }
766 }
767
768 const bool isRetry =
769 controlNode->controlType == skills::FluxioControlNodeType::LOOP_RETRY;
770 auto loopExecutionId = IceUtil::generateUUID();
771 const auto& slottedSubskillNodePtr = std::experimental::make_observer(
772 dynamic_cast<const skills::FluxioSubSkillNode*>(controlNode->slottedNode.get()));
773
774 if (slottedSubskillNodePtr == nullptr)
775 {
776 ARMARX_INFO << "Unexpected nullptr. Dynamic cast from FluxioNodePtr to "
777 "FluxioSubSkillNodePtr failed.";
779 return;
780 }
781
782 if (slottedSubskillNodePtr->skillPtr == nullptr)
783 {
784 ARMARX_INFO << "Unexpected nullptr. Subskill node has no skill pointer.";
786 return;
787 }
788
789 std::shared_lock executionsLock(subExecutionsMapMutex);
790 const auto& loopExecutorPtr =
791 std::experimental::make_observer(dynamic_cast<FluxioLoopExecutor*>(
792 addLoopExecutorToDC(loopExecutionId, *slottedSubskillNodePtr->skillPtr, isRetry)
793 .get()));
794 subExecutionsMap.emplace(controlNode->nodeId, loopExecutorPtr);
795 executionsLock.unlock();
796
797 std::thread([&] { loopExecutorPtr->run(executorName, params, profilePtr); }).detach();
798
799 skills::FluxioSkillStatusUpdate statusUpdate;
800 skills::FluxioSkillStatusUpdate slottedStatusUpdate;
801 while (running)
802 {
803 // sleep for a while
804 std::this_thread::sleep_for(std::chrono::milliseconds(250));
805 loopExecutorPtr->getStatusUpdate();
806
807 const auto& statusUpdateOpt = loopExecutorPtr->getStatus();
808 if (!statusUpdateOpt.has_value())
809 {
810 ARMARX_INFO << "No status update from loop execution yet. Waiting...";
811 continue;
812 }
813 statusUpdate = statusUpdateOpt.value();
814
815 // did the status change? update statusUpdates list
816 std::unique_lock statusMapLock(statusUpdatesMutex);
817 const auto& lastUpdate =
818 std::find_if(statusUpdates.begin(),
819 statusUpdates.end(),
820 [&controlNode](const skills::FluxioSkillStatusUpdate& statusUpdate)
821 { return statusUpdate.subSkillNodeId == controlNode->nodeId; });
822
823 if (lastUpdate == statusUpdates.end() || lastUpdate->status != statusUpdate.status)
824 {
826 loopExecutorPtr->id,
827 controlNode->nodeId,
828 statusUpdate.status});
829 }
830
831 const auto& slottedStatusUpdateOpt = loopExecutorPtr->getSlottedStatus();
832 const auto& currentSlottedExecutionId = loopExecutorPtr->slottedExecutionId;
833 if (!slottedStatusUpdateOpt.has_value() || !currentSlottedExecutionId.has_value())
834 {
835 if (statusUpdate.status == skills::SkillStatus::Failed ||
836 statusUpdate.status == skills::SkillStatus::Aborted)
837 {
838 break;
839 }
840
841 // ARMARX_INFO << "No status update from slotted subskill yet. Waiting...";
842 continue;
843 }
844 slottedStatusUpdate = slottedStatusUpdateOpt.value();
845
846 const auto& lastSlottedUpdate = std::find_if(
847 statusUpdates.begin(),
848 statusUpdates.end(),
849 [&controlNode](const skills::FluxioSkillStatusUpdate& statusUpdate)
850 { return statusUpdate.subSkillNodeId == controlNode->slottedNode->nodeId; });
851
852 if (lastSlottedUpdate == statusUpdates.end() ||
853 lastSlottedUpdate->status != statusUpdate.status ||
854 slottedStatusUpdate.executionId != currentSlottedExecutionId.value())
855 {
857 currentSlottedExecutionId.value(),
858 controlNode->slottedNode->nodeId,
859 slottedStatusUpdate.status});
860 }
861
862 statusMapLock.unlock();
863
864 // check loop is finished
865 if (statusUpdate.status == skills::SkillStatus::Succeeded ||
866 statusUpdate.status == skills::SkillStatus::Failed ||
867 statusUpdate.status == skills::SkillStatus::Aborted)
868 {
869 break;
870 }
871 }
872
873 if (!running)
874 {
875 return;
876 }
877
878 std::unique_lock possibleInputsLock(this->possibleInputsMutex);
879 this->possibleInputs[controlNode->slottedNode->nodeId] =
880 loopExecutorPtr->getResultsCopy();
881 possibleInputsLock.unlock();
882
883 const std::string& outputEventName =
884 statusUpdate.status == skills::SkillStatus::Succeeded ? "Succeeded"
885 : statusUpdate.status == skills::SkillStatus::Failed ? "Failed"
886 : statusUpdate.status == skills::SkillStatus::Aborted ? "Aborted"
887 : "Undefined";
888
889 const auto& outputParam = std::find_if(
890 controlNode->parametersMap.begin(),
891 controlNode->parametersMap.end(),
892 [&outputEventName](const std::pair<std::string, skills::FluxioParameter>& param)
893 {
894 return (param.second.type->getShortName() == "Object<Event>" &&
895 !param.second.isInput && param.second.name == outputEventName);
896 });
897
898 if (outputParam == controlNode->parametersMap.end())
899 {
900 ARMARX_WARNING << "Loop with nodeId " << controlNode->nodeId
901 << " is missing the output event parameter " << outputEventName;
903 return;
904 }
905
906 // find the connected edge
907 const auto& edge =
908 std::find_if(skill.edges.begin(),
909 skill.edges.end(),
910 [&controlNode, &outputParam](const skills::FluxioEdge& edge)
911 {
912 return (edge.fromNodePtr->nodeId == controlNode->nodeId &&
913 edge.fromParameterPtr->id == outputParam->second.id);
914 });
915
916 if (edge == skill.edges.end())
917 {
918 if (outputEventName == "Failed")
919 {
921 return;
922 }
923
924 if (outputEventName == "Aborted")
925 {
927 return;
928 }
929
930 ARMARX_WARNING << "Skill " << skill.name
931 << " has no edge connected to the output event parameter "
932 << outputEventName;
934 return;
935 }
936
937 // start new subroutine
938 const std::string& nextExecutorName = executorName + "/Loop";
939 startSubRoutine(
940 edge->toNodePtr, edge->toParameterPtr, running, nextExecutorName, profilePtr);
941 }
942
943 else
944 {
945 ARMARX_WARNING << "Unexpected control type ";
947 return;
948 }
949 }
950
951 void
953 {
954 ARMARX_INFO << "Aborting skill " << skill.name;
956 abortSubExecutions();
957 }
958
959 void
960 FluxioCompositeExecutor::abortSubExecutions()
961 {
962 std::shared_lock executionsLock(subExecutionsMapMutex);
963 for (const auto& [nodeId, executorPtr] : subExecutionsMap)
964 {
965 auto s = executorPtr->getStatus();
966 if (!s.has_value() || s->status == skills::SkillStatus::Succeeded ||
967 s->status == skills::SkillStatus::Failed ||
968 s->status == skills::SkillStatus::Aborted)
969 {
970 continue;
971 }
972
973 executorPtr->abort();
974 std::unique_lock statusMapLock(statusUpdatesMutex);
975 statusUpdates.push_front(
976 {armarx::DateTime::Now(), executorPtr->id, nodeId, skills::SkillStatus::Aborted});
977 statusMapLock.unlock();
978 }
979 executionsLock.unlock();
980 }
981
982 std::optional<std::vector<skills::FluxioSkillStatusUpdate>>
984 {
985 ARMARX_INFO << "Getting status updates for skill " << skill.name;
986 // convert statusupdates list to vector
987 std::shared_lock statusMapLock(statusUpdatesMutex);
988 auto ret = std::vector<skills::FluxioSkillStatusUpdate>(statusUpdates.begin(),
989 statusUpdates.end());
990 statusMapLock.unlock();
991 return ret;
992 }
993
994 void
995 FluxioCompositeExecutor::pollSubStatuses()
996 {
997 std::scoped_lock l(subExecutionsMapMutex, statusUpdatesMutex);
998 for (const auto& [nodeId, executorPtr] : subExecutionsMap)
999 {
1000 executorPtr->getStatusUpdate();
1001 auto s = executorPtr->getStatus();
1002 if (!s.has_value())
1003 {
1004 continue;
1005 }
1006
1007 const auto& lastStatus =
1008 find_if(statusUpdates.begin(),
1009 statusUpdates.end(),
1010 [&](const skills::FluxioSkillStatusUpdate& statusUpdate)
1011 { return statusUpdate.subSkillNodeId == nodeId; })
1012 ->status;
1013
1014 if (lastStatus != s->status)
1015 {
1016 statusUpdates.push_front(
1017 {armarx::DateTime::Now(), executorPtr->id, nodeId, s->status});
1018 }
1019 }
1020 }
1021
1022 bool
1024 {
1025
1026 // get start parameter
1027 const auto& startParam =
1028 std::find_if(skill.parameters.begin(),
1029 skill.parameters.end(),
1030 [](const std::pair<std::string, skills::FluxioParameter>& param)
1031 {
1032 return (param.second.type->getShortName() == "Object<Event>" &&
1033 param.second.isInput && param.second.name == "Start");
1034 });
1035
1036 if (startParam == skill.parameters.end())
1037 {
1038 ARMARX_WARNING << "Skill has no start parameter";
1039 return false;
1040 }
1041
1042 // get all parameter nodes for the start parameter
1043 const auto& startNode = std::find_if(
1044 skill.nodes.begin(),
1045 skill.nodes.end(),
1046 [startParam](const std::pair<const std::string,
1047 const std::unique_ptr<skills::FluxioNode>>& nodeEntry)
1048 {
1049 if (nodeEntry.second->nodeType != skills::FluxioNodeType::PARAMETER)
1050 {
1051 return false;
1052 }
1053
1054 const auto& paramNode =
1055 dynamic_cast<const skills::FluxioParameterNode*>(nodeEntry.second.get());
1056 return (paramNode->parameterPtr->id == startParam->second.id);
1057 });
1058
1059 // there can only be one
1060 if (startNode == skill.nodes.end())
1061 {
1062 ARMARX_WARNING << "Skill has no start node";
1063 return false;
1064 }
1065
1066 // check if the start node is connected
1067 const auto& startEdge =
1068 std::find_if(skill.edges.begin(),
1069 skill.edges.end(),
1070 [startNode](const skills::FluxioEdge& edge)
1071 { return (edge.fromNodePtr->nodeId == startNode->second->nodeId); });
1072
1073 // there can only be one
1074 if (startEdge == skill.edges.end())
1075 {
1076 ARMARX_WARNING << "Skill has no edge connected to the start node";
1077 return false;
1078 }
1079
1080 // get the output event parameters
1081 const auto& outputParamsSuccess =
1082 std::find_if(skill.parameters.begin(),
1083 skill.parameters.end(),
1084 [](const std::pair<std::string, skills::FluxioParameter>& param)
1085 {
1086 return (param.second.type->getShortName() == "Object<Event>" &&
1087 !param.second.isInput && param.second.name == "Succeeded");
1088 });
1089 const auto& outputParamsFailed =
1090 std::find_if(skill.parameters.begin(),
1091 skill.parameters.end(),
1092 [](const std::pair<std::string, skills::FluxioParameter>& param)
1093 {
1094 return (param.second.type->getShortName() == "Object<Event>" &&
1095 !param.second.isInput && param.second.name == "Failed");
1096 });
1097 const auto& outputParamsAborted =
1098 std::find_if(skill.parameters.begin(),
1099 skill.parameters.end(),
1100 [](const std::pair<std::string, skills::FluxioParameter>& param)
1101 {
1102 return (param.second.type->getShortName() == "Object<Event>" &&
1103 !param.second.isInput && param.second.name == "Aborted");
1104 });
1105
1106 if (outputParamsSuccess == skill.parameters.end() ||
1107 outputParamsFailed == skill.parameters.end() ||
1108 outputParamsAborted == skill.parameters.end())
1109 {
1110 ARMARX_WARNING << "Skill is missing one or more output event parameters";
1111 return false;
1112 }
1113
1114 // TODO: the rest
1115
1116 ARMARX_INFO << "Skill validation is not fully implemented yet.";
1117 ret = *startEdge;
1118 return true;
1119 }
1120
1121 void
1122 FluxioCompositeExecutor::setStatus(skills::SkillStatus status, const std::string& /*nodeId*/)
1123 {
1124 FluxioExecutor::setStatus(status, skill.id);
1125 }
1126} // namespace armarx::skills
static DateTime Now()
Definition DateTime.cpp:51
void run(std::string executorName, armarx::aron::data::DictPtr parameters, std::experimental::observer_ptr< const FluxioProfile > profilePtr) override
FluxioCompositeExecutor(const std::string &id, const skills::FluxioSkill &skill, const std::function< skills::Result< std::experimental::observer_ptr< FluxioExecutor >, skills::error::FluxioException >(const std::string &skillId, const std::string &profileId, const std::string &executorName, armarx::aron::data::DictPtr parameters)> &&executeFluxioSkillFunc, const std::function< std::experimental::observer_ptr< FluxioExecutor >(const std::vector< std::string > &parameterIds)> &&addMergerExecutorToDCFunc, const std::function< std::experimental::observer_ptr< FluxioExecutor >(const std::string &id, const skills::FluxioSkill &skill, bool isRetry)> &&addLoopExecutorToDCFunc)
bool validateSkill(skills::FluxioEdge &ret) const
std::optional< std::vector< skills::FluxioSkillStatusUpdate > > getStatusUpdate() override
FluxioExecutor(const FluxioExecutor &)=delete
armarx::aron::data::DictPtr result
armarx::aron::data::VariantPtr findParameterValue(const std::experimental::observer_ptr< const FluxioProfile > profilePtr, const FluxioParameter &parameter) const
virtual armarx::aron::data::VariantPtr getPossibleInputCopy(const std::string nodeId, const std::string parameterId)
std::optional< skills::FluxioSkillStatusUpdate > status
std::list< skills::FluxioSkillStatusUpdate > statusUpdates
std::shared_mutex statusUpdatesMutex
std::map< std::string, armarx::aron::data::DictPtr > possibleInputs
std::optional< std::string > executorName
std::shared_mutex possibleInputsMutex
A base class for skill exceptions.
#define ARMARX_INFO
The normal logging level.
Definition Logging.h:181
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
Definition Logging.h:193
std::shared_ptr< Dict > DictPtr
Definition Dict.h:42
std::shared_ptr< Variant > VariantPtr
This file is part of ArmarX.
std::optional< std::string > FluxioNodeTypeToString(const FluxioNodeType &type)
std::shared_ptr< Value > value()
Definition cxxopts.hpp:855
observer_ptr< _Tp > make_observer(_Tp *__p) noexcept
std::experimental::observer_ptr< const FluxioParameter > toParameterPtr
Definition FluxioEdge.h:20
std::experimental::observer_ptr< const FluxioNode > toNodePtr
Definition FluxioEdge.h:19
std::map< std::string, FluxioParameter > parameters
Definition FluxioSkill.h:39
std::list< FluxioEdge > edges
Definition FluxioSkill.h:41
std::map< const std::string, const std::unique_ptr< FluxioNode > > nodes
Definition FluxioSkill.h:40