7#include <experimental/memory>
14#include <shared_mutex>
20#include <IceUtil/UUID.h>
47 const std::string&
id,
51 const std::string& skillId,
52 const std::string& profileId,
56 const std::vector<std::string>& parameterIds)>&& addMergerExecutorToDCFunc,
58 const std::string&
id,
60 bool isRetry)>&& addLoopExecutorToDCFunc) :
62 executeFluxioSkill(executeFluxioSkillFunc),
63 addMergerExecutorToDC(addMergerExecutorToDCFunc),
64 addLoopExecutorToDC(addLoopExecutorToDCFunc),
69 for (
const auto& node : skill.nodes)
75 result = std::make_shared<armarx::aron::data::Dict>();
91 for (
const auto& [key, param] : this->skill.
parameters)
93 if (param.type->getShortName() ==
"Object<Event>" || !param.isInput ||
94 this->possibleInputs[this->skill.id]->hasElement(key))
100 if (fallBack ==
nullptr)
102 ARMARX_INFO <<
"No fallback value found for parameter " << param.name;
106 <<
"Parameter is not required. Using nullptr (std::nullopt) instead.";
110 ARMARX_INFO <<
"Can't execute skill due to incomplete params.";
119 possibleInputsLock.unlock();
132 std::unique_lock executionsLock(subExecutionsMapMutex);
133 subExecutionsMap.clear();
134 executionsLock.unlock();
136 result = std::make_shared<armarx::aron::data::Dict>();
141 std::atomic_bool skillRunning =
true;
145 [
this, &skillRunning]
149 this->pollSubStatuses();
150 std::this_thread::sleep_for(
151 std::chrono::milliseconds(pollingFrequency.toMilliSeconds()));
157 const std::string newExecutorName =
executorName +
"/" + skill.name;
159 [
this, &startEdge, &skillRunning, newExecutorName, profilePtr]
161 this->startSubRoutine(startEdge.
toNodePtr,
170 const bool useTimeout = skill.timeout.isPositive();
175 const auto sleepTime =
176 useTimeout ? std::min(pollingFrequency, skill.timeout) : pollingFrequency;
181 std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime.toMilliSeconds()));
186 abortSubExecutions();
190 std::this_thread::sleep_for(
191 std::chrono::milliseconds(pollingFrequency.toMilliSeconds()));
192 skillRunning =
false;
197 const auto s = this->
status;
208 abortSubExecutions();
212 std::this_thread::sleep_for(
213 std::chrono::milliseconds(pollingFrequency.toMilliSeconds()));
214 skillRunning =
false;
221 FluxioCompositeExecutor::startSubRoutine(
224 std::atomic_bool& running,
225 const std::string& executorName,
233 if (startNode ==
nullptr)
244 dynamic_cast<const skills::FluxioParameterNode*
>(startNode.get()));
245 if (paramNodePtr ==
nullptr)
247 ARMARX_WARNING <<
"Unexpected nullptr. Dynamic cast from FluxioNodePtr to "
248 "FluxioParameterNodePtr failed.";
253 this->handleParameterRoutine(paramNodePtr, running,
executorName);
259 dynamic_cast<const skills::FluxioControlNode*
>(startNode.get()));
261 if (controlNodePtr ==
nullptr)
263 ARMARX_WARNING <<
"Unexpected nullptr. Dynamic cast from FluxioNodePtr to "
264 "FluxioControlNodePtr failed.";
269 this->handleControlRoutine(
270 controlNodePtr, startParameter, running,
executorName, profilePtr);
276 dynamic_cast<const skills::FluxioSubSkillNode*
>(startNode.get()));
277 if (subSkillNodePtr ==
nullptr || subSkillNodePtr->skillPtr ==
nullptr)
279 ARMARX_WARNING <<
"Unexpected nullptr. Dynamic cast from FluxioNodePtr to "
280 "FluxioSubSkillNodePtr failed.";
285 this->handleSubSkillRoutine(subSkillNodePtr, running,
executorName, profilePtr);
290 <<
"Unexpected node type '"
292 <<
"' for Node with id " << startNode->nodeId;
299 FluxioCompositeExecutor::handleParameterRoutine(
300 std::experimental::observer_ptr<const skills::FluxioParameterNode> parameterNode,
304 if (parameterNode ==
nullptr || parameterNode->parameterPtr ==
nullptr)
312 if (parameterNode->parameterPtr->type->getShortName() !=
"Object<Event>")
315 << parameterNode->parameterPtr->type->getShortName();
320 const auto& skill = this->skill;
321 std::list<FluxioEdge> resEdges;
322 std::copy_if(skill.edges.begin(),
324 std::back_inserter(resEdges),
325 [](
const skills::FluxioEdge& edge)
327 return (edge.isValid() &&
328 edge.toNodePtr->nodeType == FluxioNodeType::PARAMETER &&
329 edge.fromParameterPtr->type->getShortName() !=
"Object<Event>");
333 for (
const auto& e : resEdges)
335 const auto& fromParam = e.fromParameterPtr;
336 const auto& toParam = e.toParameterPtr;
337 auto fromNodeId = e.fromNodePtr->nodeId;
340 fromNodeId = skill.id;
348 this->
result->addElement(toParam->id, value);
353 const std::string& eventType = parameterNode->parameterPtr->name;
354 if (eventType ==
"Succeeded")
358 else if (eventType ==
"Failed")
362 else if (eventType ==
"Aborted")
368 ARMARX_WARNING <<
"Unexpected event type " << eventType <<
" for parameter "
369 << parameterNode->parameterPtr->name;
375 FluxioCompositeExecutor::handleSubSkillRoutine(
376 std::experimental::observer_ptr<const skills::FluxioSubSkillNode> subSkillNode,
377 std::atomic_bool& running,
378 const std::string& executorName,
379 const std::experimental::observer_ptr<const FluxioProfile> profilePtr)
381 if (subSkillNode ==
nullptr || subSkillNode->skillPtr ==
nullptr)
391 std::list<FluxioEdge> paramEdges;
392 std::copy_if(skill.edges.begin(),
394 std::back_inserter(paramEdges),
395 [&subSkillNode](
const skills::FluxioEdge& edge)
397 return (edge.isValid() && edge.toNodePtr->nodeId == subSkillNode->nodeId &&
398 edge.fromParameterPtr->type->getShortName() !=
"Object<Event>");
401 for (
const auto& e : paramEdges)
403 const auto& fromParam = e.fromParameterPtr;
404 const auto& toParam = e.toParameterPtr;
405 auto fromNodeId = e.fromNodePtr->nodeId;
408 fromNodeId = skill.id;
411 if (value ==
nullptr)
413 ARMARX_WARNING <<
"Failed to get possible input for parameter " << fromParam->id
414 <<
" " << fromNodeId;
416 params->addElement(toParam->id, value);
420 const auto& executorRes =
421 executeFluxioSkill(subSkillNode->skillPtr->id, profilePtr->id,
executorName, params);
422 if (!executorRes.isSuccess())
424 ARMARX_WARNING <<
"Failed to execute subskill " << subSkillNode->skillPtr->id;
428 auto executorPtr = executorRes.getResult();
429 if (executorPtr ==
nullptr)
431 ARMARX_WARNING <<
"Failed to execute subskill " << subSkillNode->skillPtr->id;
436 std::unique_lock executionsLock(subExecutionsMapMutex);
437 subExecutionsMap[subSkillNode->nodeId] = executorPtr;
438 executionsLock.unlock();
441 skills::FluxioSkillStatusUpdate statusUpdate;
445 std::this_thread::sleep_for(std::chrono::milliseconds(250));
446 executorPtr->getStatusUpdate();
447 const auto& statusUpdateIt = executorPtr->getStatus();
449 if (!statusUpdateIt.has_value())
451 ARMARX_INFO <<
"No status update from skill " << subSkillNode->skillPtr->name
452 <<
" yet. Waiting...";
456 statusUpdate = statusUpdateIt.value();
459 const auto& lastUpdate =
462 [&subSkillNode](
const skills::FluxioSkillStatusUpdate& statusUpdate)
463 { return statusUpdate.subSkillNodeId == subSkillNode->nodeId; });
465 if (lastUpdate ==
statusUpdates.end() || lastUpdate->status != statusUpdate.status)
469 subSkillNode->nodeId,
470 statusUpdate.status});
473 statusMapLock.unlock();
491 this->
possibleInputs[subSkillNode->nodeId] = executorPtr->getResultsCopy();
492 possibleInputsLock.unlock();
495 const std::string& outputEventName =
500 const auto& outputParam = std::find_if(
501 subSkillNode->skillPtr->parameters.begin(),
502 subSkillNode->skillPtr->parameters.end(),
503 [&outputEventName](
const std::pair<std::string, skills::FluxioParameter>& param)
505 return (param.second.type->getShortName() ==
"Object<Event>" &&
506 !param.second.isInput && param.second.name == outputEventName);
509 if (outputParam == subSkillNode->skillPtr->parameters.end())
512 <<
" is missing the output event parameter " << outputEventName;
519 std::find_if(skill.edges.begin(),
521 [&subSkillNode, &outputParam](
const skills::FluxioEdge& edge)
523 return (edge.fromNodePtr->nodeId == subSkillNode->nodeId &&
524 edge.fromParameterPtr->id == outputParam->second.id);
527 if (edge == skill.edges.end())
529 if (outputEventName ==
"Failed")
535 if (outputEventName ==
"Aborted")
542 <<
" has no edge connected to the output event parameter "
549 const std::string& nextExecutorName =
executorName +
"/" + subSkillNode->skillPtr->name;
551 edge->toNodePtr, edge->toParameterPtr, running, nextExecutorName, profilePtr);
555 FluxioCompositeExecutor::handleControlRoutine(
556 std::experimental::observer_ptr<const skills::FluxioControlNode> controlNode,
557 const std::experimental::observer_ptr<const skills::FluxioParameter>& startParameter,
558 std::atomic_bool& running,
559 const std::string& executorName,
560 const std::experimental::observer_ptr<const FluxioProfile> profilePtr)
562 if (controlNode ==
nullptr)
573 std::vector<std::experimental::observer_ptr<const skills::FluxioEdge>> edgePtrs = {};
574 for (
const auto& [
id, param] : controlNode->parametersMap)
582 for (
const auto& edge : skill.edges)
584 if (edge.isValid() && edge.fromNodePtr->nodeId == controlNode->nodeId &&
585 edge.fromParameterPtr->id ==
id)
592 const size_t allParams = edgePtrs.size();
596 for (std::experimental::observer_ptr<const skills::FluxioEdge> edgePtr : edgePtrs)
598 const std::string newExecutorName =
executorName +
"/Splitter" +
"(" +
599 std::to_string(param) +
"/" +
600 std::to_string(allParams) +
")";
602 [
this, edgePtr, &running, newExecutorName, profilePtr]
604 startSubRoutine(edgePtr->toNodePtr,
605 edgePtr->toParameterPtr,
617 std::experimental::observer_ptr<FluxioMergerExecutor> mergerExecutorPtr =
nullptr;
618 std::unique_lock executionsLock(subExecutionsMapMutex);
619 const auto& executorPtr = subExecutionsMap.find(controlNode->nodeId);
620 if (executorPtr == subExecutionsMap.end())
623 std::vector<std::string> paramIds = {};
624 for (
const auto& [
id, param] : controlNode->parametersMap)
628 paramIds.push_back(
id);
634 dynamic_cast<FluxioMergerExecutor*
>(addMergerExecutorToDC(paramIds).get()));
635 subExecutionsMap.emplace(controlNode->nodeId, mergerExecutorPtr);
636 executionsLock.unlock();
640 executionsLock.unlock();
643 dynamic_cast<FluxioMergerExecutor*
>(executorPtr->second.get()));
646 if (mergerExecutorPtr ==
nullptr)
648 ARMARX_WARNING <<
"Unexpected nullptr. Dynamic cast from FluxioExecutorPtr to "
649 "FluxioMergerExecutorPtr failed.";
655 mergerExecutorPtr->checkInToken(startParameter->id);
657 if (mergerExecutorPtr->getStatus() != std::nullopt)
663 mergerExecutorPtr->run(
executorName,
nullptr, profilePtr);
665 const auto& outputParam =
666 std::find_if(controlNode->parametersMap.begin(),
667 controlNode->parametersMap.end(),
668 [](
const std::pair<std::string, skills::FluxioParameter>& param)
669 { return !param.second.isInput; });
671 if (outputParam == controlNode->parametersMap.end())
674 <<
" has no output parameter";
681 std::find_if(skill.edges.begin(),
683 [&controlNode, &outputParam](
const skills::FluxioEdge& edge)
685 return (edge.fromNodePtr->nodeId == controlNode->nodeId &&
686 edge.fromParameterPtr->id == outputParam->second.id);
689 if (edge == skill.edges.end())
692 <<
"Skill " << skill.name
693 <<
" has no edge connected to the output event parameter of the AND merger";
700 edge->toNodePtr, edge->toParameterPtr, running,
executorName, profilePtr);
705 if (controlNode->slottedNode ==
nullptr)
707 ARMARX_INFO <<
"Slot is empty, Can not execute loop node.";
714 ARMARX_INFO <<
"Slotted node is not a subskill node, can not execute loop node.";
720 std::list<FluxioEdge> paramEdges;
721 std::copy_if(skill.edges.begin(),
723 std::back_inserter(paramEdges),
724 [&controlNode](
const skills::FluxioEdge& edge)
726 return (edge.isValid() &&
727 edge.fromParameterPtr->type->getShortName() !=
729 (edge.toNodePtr->nodeId == controlNode->slottedNode->nodeId ||
730 edge.toNodePtr->nodeId == controlNode->nodeId));
733 for (
const auto& e : paramEdges)
735 const auto& fromParam = e.fromParameterPtr;
736 const auto& toParam = e.toParameterPtr;
737 const auto& toNode = e.toNodePtr;
744 auto fromNodeId = e.fromNodePtr->nodeId;
747 fromNodeId = skill.id;
751 if (value ==
nullptr)
753 ARMARX_WARNING <<
"Failed to get possible input for parameter " << fromParam->id
754 <<
" " << fromNodeId;
757 if (toNode->nodeId == controlNode->nodeId &&
758 toParam->name.find(
"Iterations") != std::string::npos)
760 params->addElement(
"Iterations", value);
764 params->addElement(toParam->id, value);
770 auto loopExecutionId = IceUtil::generateUUID();
772 dynamic_cast<const skills::FluxioSubSkillNode*
>(controlNode->slottedNode.get()));
774 if (slottedSubskillNodePtr ==
nullptr)
776 ARMARX_INFO <<
"Unexpected nullptr. Dynamic cast from FluxioNodePtr to "
777 "FluxioSubSkillNodePtr failed.";
782 if (slottedSubskillNodePtr->skillPtr ==
nullptr)
784 ARMARX_INFO <<
"Unexpected nullptr. Subskill node has no skill pointer.";
789 std::shared_lock executionsLock(subExecutionsMapMutex);
790 const auto& loopExecutorPtr =
792 addLoopExecutorToDC(loopExecutionId, *slottedSubskillNodePtr->skillPtr, isRetry)
794 subExecutionsMap.emplace(controlNode->nodeId, loopExecutorPtr);
795 executionsLock.unlock();
797 std::thread([&] { loopExecutorPtr->run(
executorName, params, profilePtr); }).detach();
799 skills::FluxioSkillStatusUpdate statusUpdate;
800 skills::FluxioSkillStatusUpdate slottedStatusUpdate;
804 std::this_thread::sleep_for(std::chrono::milliseconds(250));
805 loopExecutorPtr->getStatusUpdate();
807 const auto& statusUpdateOpt = loopExecutorPtr->getStatus();
808 if (!statusUpdateOpt.has_value())
810 ARMARX_INFO <<
"No status update from loop execution yet. Waiting...";
813 statusUpdate = statusUpdateOpt.value();
817 const auto& lastUpdate =
820 [&controlNode](
const skills::FluxioSkillStatusUpdate& statusUpdate)
821 { return statusUpdate.subSkillNodeId == controlNode->nodeId; });
823 if (lastUpdate ==
statusUpdates.end() || lastUpdate->status != statusUpdate.status)
828 statusUpdate.status});
831 const auto& slottedStatusUpdateOpt = loopExecutorPtr->getSlottedStatus();
832 const auto& currentSlottedExecutionId = loopExecutorPtr->slottedExecutionId;
833 if (!slottedStatusUpdateOpt.has_value() || !currentSlottedExecutionId.has_value())
844 slottedStatusUpdate = slottedStatusUpdateOpt.value();
846 const auto& lastSlottedUpdate = std::find_if(
849 [&controlNode](
const skills::FluxioSkillStatusUpdate& statusUpdate)
850 { return statusUpdate.subSkillNodeId == controlNode->slottedNode->nodeId; });
853 lastSlottedUpdate->status != statusUpdate.status ||
854 slottedStatusUpdate.executionId != currentSlottedExecutionId.value())
857 currentSlottedExecutionId.value(),
858 controlNode->slottedNode->nodeId,
859 slottedStatusUpdate.status});
862 statusMapLock.unlock();
880 loopExecutorPtr->getResultsCopy();
881 possibleInputsLock.unlock();
883 const std::string& outputEventName =
889 const auto& outputParam = std::find_if(
890 controlNode->parametersMap.begin(),
891 controlNode->parametersMap.end(),
892 [&outputEventName](
const std::pair<std::string, skills::FluxioParameter>& param)
894 return (param.second.type->getShortName() ==
"Object<Event>" &&
895 !param.second.isInput && param.second.name == outputEventName);
898 if (outputParam == controlNode->parametersMap.end())
901 <<
" is missing the output event parameter " << outputEventName;
908 std::find_if(skill.edges.begin(),
910 [&controlNode, &outputParam](
const skills::FluxioEdge& edge)
912 return (edge.fromNodePtr->nodeId == controlNode->nodeId &&
913 edge.fromParameterPtr->id == outputParam->second.id);
916 if (edge == skill.edges.end())
918 if (outputEventName ==
"Failed")
924 if (outputEventName ==
"Aborted")
931 <<
" has no edge connected to the output event parameter "
938 const std::string& nextExecutorName =
executorName +
"/Loop";
940 edge->toNodePtr, edge->toParameterPtr, running, nextExecutorName, profilePtr);
956 abortSubExecutions();
960 FluxioCompositeExecutor::abortSubExecutions()
962 std::shared_lock executionsLock(subExecutionsMapMutex);
963 for (
const auto& [nodeId, executorPtr] : subExecutionsMap)
965 auto s = executorPtr->getStatus();
973 executorPtr->abort();
977 statusMapLock.unlock();
979 executionsLock.unlock();
982 std::optional<std::vector<skills::FluxioSkillStatusUpdate>>
985 ARMARX_INFO <<
"Getting status updates for skill " << skill.name;
988 auto ret = std::vector<skills::FluxioSkillStatusUpdate>(
statusUpdates.begin(),
990 statusMapLock.unlock();
995 FluxioCompositeExecutor::pollSubStatuses()
998 for (
const auto& [nodeId, executorPtr] : subExecutionsMap)
1000 executorPtr->getStatusUpdate();
1001 auto s = executorPtr->getStatus();
1007 const auto& lastStatus =
1011 { return statusUpdate.subSkillNodeId == nodeId; })
1014 if (lastStatus != s->status)
1027 const auto& startParam =
1028 std::find_if(skill.parameters.begin(),
1029 skill.parameters.end(),
1030 [](
const std::pair<std::string, skills::FluxioParameter>& param)
1032 return (param.second.type->getShortName() ==
"Object<Event>" &&
1033 param.second.isInput && param.second.name ==
"Start");
1036 if (startParam == skill.parameters.end())
1043 const auto& startNode = std::find_if(
1044 skill.nodes.begin(),
1046 [startParam](
const std::pair<
const std::string,
1047 const std::unique_ptr<skills::FluxioNode>>& nodeEntry)
1049 if (nodeEntry.second->nodeType != skills::FluxioNodeType::PARAMETER)
1054 const auto& paramNode =
1056 return (paramNode->parameterPtr->id == startParam->second.id);
1060 if (startNode == skill.
nodes.end())
1067 const auto& startEdge =
1068 std::find_if(skill.
edges.begin(),
1071 { return (edge.fromNodePtr->nodeId == startNode->second->nodeId); });
1074 if (startEdge == skill.
edges.end())
1076 ARMARX_WARNING <<
"Skill has no edge connected to the start node";
1081 const auto& outputParamsSuccess =
1084 [](
const std::pair<std::string, skills::FluxioParameter>& param)
1086 return (param.second.type->getShortName() ==
"Object<Event>" &&
1087 !param.second.isInput && param.second.name ==
"Succeeded");
1089 const auto& outputParamsFailed =
1092 [](
const std::pair<std::string, skills::FluxioParameter>& param)
1094 return (param.second.type->getShortName() ==
"Object<Event>" &&
1095 !param.second.isInput && param.second.name ==
"Failed");
1097 const auto& outputParamsAborted =
1100 [](
const std::pair<std::string, skills::FluxioParameter>& param)
1102 return (param.second.type->getShortName() ==
"Object<Event>" &&
1103 !param.second.isInput && param.second.name ==
"Aborted");
1106 if (outputParamsSuccess == skill.
parameters.end() ||
1107 outputParamsFailed == skill.
parameters.end() ||
1108 outputParamsAborted == skill.
parameters.end())
1110 ARMARX_WARNING <<
"Skill is missing one or more output event parameters";
1116 ARMARX_INFO <<
"Skill validation is not fully implemented yet.";
1124 FluxioExecutor::setStatus(
status, skill.
id);
void run(std::string executorName, armarx::aron::data::DictPtr parameters, std::experimental::observer_ptr< const FluxioProfile > profilePtr) override
FluxioCompositeExecutor(const std::string &id, const skills::FluxioSkill &skill, const std::function< skills::Result< std::experimental::observer_ptr< FluxioExecutor >, skills::error::FluxioException >(const std::string &skillId, const std::string &profileId, const std::string &executorName, armarx::aron::data::DictPtr parameters)> &&executeFluxioSkillFunc, const std::function< std::experimental::observer_ptr< FluxioExecutor >(const std::vector< std::string > &parameterIds)> &&addMergerExecutorToDCFunc, const std::function< std::experimental::observer_ptr< FluxioExecutor >(const std::string &id, const skills::FluxioSkill &skill, bool isRetry)> &&addLoopExecutorToDCFunc)
bool validateSkill(skills::FluxioEdge &ret) const
std::optional< std::vector< skills::FluxioSkillStatusUpdate > > getStatusUpdate() override
FluxioExecutor(const FluxioExecutor &)=delete
armarx::aron::data::DictPtr result
armarx::aron::data::VariantPtr findParameterValue(const std::experimental::observer_ptr< const FluxioProfile > profilePtr, const FluxioParameter &parameter) const
virtual armarx::aron::data::VariantPtr getPossibleInputCopy(const std::string nodeId, const std::string parameterId)
std::optional< skills::FluxioSkillStatusUpdate > status
std::shared_mutex resultMutex
std::list< skills::FluxioSkillStatusUpdate > statusUpdates
std::shared_mutex statusMutex
std::shared_mutex statusUpdatesMutex
std::map< std::string, armarx::aron::data::DictPtr > possibleInputs
std::optional< std::string > executorName
std::shared_mutex possibleInputsMutex
A base class for skill exceptions.
#define ARMARX_INFO
The normal logging level.
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
std::shared_ptr< Dict > DictPtr
std::shared_ptr< Variant > VariantPtr
This file is part of ArmarX.
std::optional< std::string > FluxioNodeTypeToString(const FluxioNodeType &type)
std::shared_ptr< Value > value()
observer_ptr< _Tp > make_observer(_Tp *__p) noexcept
std::experimental::observer_ptr< const FluxioParameter > toParameterPtr
std::experimental::observer_ptr< const FluxioNode > toNodePtr
std::map< std::string, FluxioParameter > parameters
std::list< FluxioEdge > edges
std::map< const std::string, const std::unique_ptr< FluxioNode > > nodes