8 #include <shared_mutex>
13 #include <IceUtil/UUID.h>
25 #include "../FluxioControlNode.h"
26 #include "../FluxioEdge.h"
27 #include "../FluxioParameter.h"
33 const std::string&
id,
35 const std::function<
void(
const std::string& executionId)>&& abortFluxioSkillFunc,
38 const std::string& skillId,
39 const std::string& profileId,
40 const std::string& executorName,
43 const std::vector<std::string>& parameterIds)>&& addMergerExecutorToDCFunc) :
45 abortFluxioSkill(abortFluxioSkillFunc),
46 executeFluxioSkill(executeFluxioSkillFunc),
47 addMergerExecutorToDC(addMergerExecutorToDCFunc),
52 for (
const auto& node : skill.
nodes)
58 result = std::make_shared<armarx::aron::data::Dict>();
74 for (
const auto& [key, param] : this->skill.
parameters)
76 if (param.type->getShortName() ==
"Object<Event>" || !param.isInput ||
77 this->possibleInputs[this->skill.id]->hasElement(key))
83 if (fallBack ==
nullptr)
85 ARMARX_INFO <<
"No fallback value found for parameter " << param.name;
89 <<
"Parameter is not required. Using nullptr (std::nullopt) instead.";
93 ARMARX_INFO <<
"Can't execute skill due to incomplete params.";
102 possibleInputsLock.unlock();
115 std::unique_lock executionsLock(subExecutionsMapMutex);
116 subExecutionsMap.clear();
117 executionsLock.unlock();
119 result = std::make_shared<armarx::aron::data::Dict>();
124 std::atomic_bool skillRunning =
true;
128 [
this, &skillRunning]
132 this->pollSubStatuses();
133 std::this_thread::sleep_for(
142 [
this, &startEdge, &skillRunning, newExecutorName, profilePtr]
144 this->startSubRoutine(startEdge.
toNodePtr,
158 const auto sleepTime =
159 useTimeout ?
std::min(pollingFrequency, skill.
timeout) : pollingFrequency;
164 std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime.toMilliSeconds()));
169 abortSubExecutions();
173 std::this_thread::sleep_for(
175 skillRunning =
false;
191 abortSubExecutions();
195 std::this_thread::sleep_for(
197 skillRunning =
false;
204 FluxioCompositeExecutor::startSubRoutine(
207 std::atomic_bool& running,
208 const std::string& executorName,
216 if (startNode ==
nullptr)
227 dynamic_cast<const skills::FluxioParameterNode*
>(startNode.get()));
228 if (paramNodePtr ==
nullptr)
230 ARMARX_WARNING <<
"Unexpected nullptr. Dynamic cast from FluxioNodePtr to "
231 "FluxioParameterNodePtr failed.";
236 this->handleParameterRoutine(paramNodePtr, running,
executorName);
242 dynamic_cast<const skills::FluxioControlNode*
>(startNode.get()));
244 if (controlNodePtr ==
nullptr)
246 ARMARX_WARNING <<
"Unexpected nullptr. Dynamic cast from FluxioNodePtr to "
247 "FluxioControlNodePtr failed.";
252 this->handleControlRoutine(
253 controlNodePtr, startParameter, running,
executorName, profilePtr);
259 dynamic_cast<const skills::FluxioSubSkillNode*
>(startNode.get()));
260 if (subSkillNodePtr ==
nullptr || subSkillNodePtr->skillPtr ==
nullptr)
262 ARMARX_WARNING <<
"Unexpected nullptr. Dynamic cast from FluxioNodePtr to "
263 "FluxioSubSkillNodePtr failed.";
268 this->handleSubSkillRoutine(subSkillNodePtr, running,
executorName, profilePtr);
273 <<
"Unexpected node type '"
275 <<
"' for Node with id " << startNode->nodeId;
282 FluxioCompositeExecutor::handleParameterRoutine(
287 if (parameterNode ==
nullptr || parameterNode->parameterPtr ==
nullptr)
295 if (parameterNode->parameterPtr->type->getShortName() !=
"Object<Event>")
298 << parameterNode->parameterPtr->type->getShortName();
303 const auto& skill = this->skill;
304 std::list<FluxioEdge> resEdges;
305 std::copy_if(skill.
edges.begin(),
307 std::back_inserter(resEdges),
308 [](
const skills::FluxioEdge& edge)
310 return (edge.isValid() &&
311 edge.toNodePtr->nodeType == FluxioNodeType::PARAMETER &&
312 edge.fromParameterPtr->type->getShortName() !=
"Object<Event>");
316 for (
const auto& e : resEdges)
318 const auto& fromParam = e.fromParameterPtr;
319 const auto& toParam = e.toParameterPtr;
320 auto fromNodeId = e.fromNodePtr->nodeId;
323 fromNodeId = skill.
id;
336 const std::string& eventType = parameterNode->parameterPtr->name;
337 if (eventType ==
"Succeeded")
341 else if (eventType ==
"Failed")
345 else if (eventType ==
"Aborted")
351 ARMARX_WARNING <<
"Unexpected event type " << eventType <<
" for parameter "
352 << parameterNode->parameterPtr->name;
358 FluxioCompositeExecutor::handleSubSkillRoutine(
360 std::atomic_bool& running,
361 const std::string& executorName,
364 if (subSkillNode ==
nullptr || subSkillNode->skillPtr ==
nullptr)
374 std::list<FluxioEdge> paramEdges;
375 std::copy_if(skill.
edges.begin(),
377 std::back_inserter(paramEdges),
378 [&subSkillNode](
const skills::FluxioEdge& edge)
380 return (edge.isValid() && edge.toNodePtr->nodeId == subSkillNode->nodeId &&
381 edge.fromParameterPtr->type->getShortName() !=
"Object<Event>");
384 for (
const auto& e : paramEdges)
386 const auto& fromParam = e.fromParameterPtr;
387 const auto& toParam = e.toParameterPtr;
388 auto fromNodeId = e.fromNodePtr->nodeId;
391 fromNodeId = skill.
id;
394 if (
value ==
nullptr)
396 ARMARX_WARNING <<
"Failed to get possible input for parameter " << fromParam->id
397 <<
" " << fromNodeId;
399 params->addElement(toParam->id,
value);
403 const auto& executorRes =
404 executeFluxioSkill(subSkillNode->skillPtr->id, profilePtr->id,
executorName, params);
405 if (!executorRes.isSuccess())
407 ARMARX_WARNING <<
"Failed to execute subskill " << subSkillNode->skillPtr->id;
411 auto executorPtr = executorRes.getResult();
412 if (executorPtr ==
nullptr)
414 ARMARX_WARNING <<
"Failed to execute subskill " << subSkillNode->skillPtr->id;
419 std::unique_lock executionsLock(subExecutionsMapMutex);
420 subExecutionsMap[subSkillNode->nodeId] = executorPtr;
421 executionsLock.unlock();
424 skills::FluxioSkillStatusUpdate statusUpdate;
428 std::this_thread::sleep_for(std::chrono::milliseconds(250));
429 executorPtr->getStatusUpdate();
430 const auto& statusUpdateIt = executorPtr->getStatus();
432 if (!statusUpdateIt.has_value())
434 ARMARX_INFO <<
"No status update from skill " << subSkillNode->skillPtr->name
435 <<
" yet. Waiting...";
439 statusUpdate = statusUpdateIt.value();
442 const auto& lastUpdate =
445 [&subSkillNode](
const skills::FluxioSkillStatusUpdate& statusUpdate)
446 { return statusUpdate.subSkillNodeId == subSkillNode->nodeId; });
448 if (lastUpdate ==
statusUpdates.end() || lastUpdate->status != statusUpdate.status)
452 subSkillNode->nodeId,
453 statusUpdate.status});
456 statusMapLock.unlock();
474 this->
possibleInputs[subSkillNode->nodeId] = executorPtr->getResultsCopy();
475 possibleInputsLock.unlock();
478 const std::string& outputEventName =
483 const auto& outputParam = std::find_if(
484 subSkillNode->skillPtr->parameters.begin(),
485 subSkillNode->skillPtr->parameters.end(),
486 [&outputEventName](
const std::pair<std::string, skills::FluxioParameter>& param)
488 return (param.second.type->getShortName() ==
"Object<Event>" &&
489 !param.second.isInput && param.second.name == outputEventName);
492 if (outputParam == subSkillNode->skillPtr->parameters.end())
495 <<
" is missing the output event parameter " << outputEventName;
502 std::find_if(skill.
edges.begin(),
504 [&subSkillNode, &outputParam](
const skills::FluxioEdge& edge)
506 return (edge.fromNodePtr->nodeId == subSkillNode->nodeId &&
507 edge.fromParameterPtr->id == outputParam->second.id);
510 if (edge == skill.
edges.end())
512 if (outputEventName ==
"Failed")
518 if (outputEventName ==
"Aborted")
525 <<
" has no edge connected to the output event parameter "
532 const std::string& nextExecutorName =
executorName +
"/" + subSkillNode->skillPtr->name;
534 edge->toNodePtr, edge->toParameterPtr, running, nextExecutorName, profilePtr);
538 FluxioCompositeExecutor::handleControlRoutine(
541 std::atomic_bool& running,
542 const std::string& executorName,
545 if (controlNode ==
nullptr)
556 std::vector<std::experimental::observer_ptr<const skills::FluxioEdge>> edgePtrs = {};
557 for (
const auto& [
id, param] : controlNode->parametersMap)
565 for (
const auto& edge : skill.
edges)
567 if (edge.isValid() && edge.fromNodePtr->nodeId == controlNode->nodeId &&
568 edge.fromParameterPtr->id ==
id)
575 const size_t allParams = edgePtrs.size();
581 const std::string newExecutorName =
executorName +
"/Splitter" +
"(" +
585 [
this, edgePtr, &running, newExecutorName, profilePtr]
587 startSubRoutine(edgePtr->toNodePtr,
588 edgePtr->toParameterPtr,
601 std::unique_lock executionsLock(subExecutionsMapMutex);
602 const auto& executorPtr = subExecutionsMap.find(controlNode->nodeId);
603 if (executorPtr == subExecutionsMap.end())
606 std::vector<std::string> paramIds = {};
607 for (
const auto& [
id, param] : controlNode->parametersMap)
611 paramIds.push_back(
id);
617 dynamic_cast<FluxioMergerExecutor*
>(addMergerExecutorToDC(paramIds).get()));
618 subExecutionsMap.emplace(controlNode->nodeId, mergerExecutorPtr);
619 executionsLock.unlock();
623 executionsLock.unlock();
626 dynamic_cast<FluxioMergerExecutor*
>(executorPtr->second.get()));
629 if (mergerExecutorPtr ==
nullptr)
631 ARMARX_WARNING <<
"Unexpected nullptr. Dynamic cast from FluxioExecutorPtr to "
632 "FluxioMergerExecutorPtr failed.";
638 mergerExecutorPtr->checkInToken(startParameter->id);
640 if (mergerExecutorPtr->getStatus() != std::nullopt)
646 mergerExecutorPtr->run(
executorName,
nullptr, profilePtr);
648 const auto& outputParam =
649 std::find_if(controlNode->parametersMap.begin(),
650 controlNode->parametersMap.end(),
651 [](
const std::pair<std::string, skills::FluxioParameter>& param)
652 { return !param.second.isInput; });
654 if (outputParam == controlNode->parametersMap.end())
657 <<
" has no output parameter";
664 std::find_if(skill.
edges.begin(),
666 [&controlNode, &outputParam](
const skills::FluxioEdge& edge)
668 return (edge.fromNodePtr->nodeId == controlNode->nodeId &&
669 edge.fromParameterPtr->id == outputParam->second.id);
672 if (edge == skill.
edges.end())
675 <<
"Skill " << skill.
name
676 <<
" has no edge connected to the output event parameter of the AND merger";
683 edge->toNodePtr, edge->toParameterPtr, running,
executorName, profilePtr);
698 abortSubExecutions();
702 FluxioCompositeExecutor::abortSubExecutions()
704 std::shared_lock executionsLock(subExecutionsMapMutex);
705 for (
const auto& [nodeId, executorPtr] : subExecutionsMap)
707 auto s = executorPtr->getStatus();
715 executorPtr->abort();
719 statusMapLock.unlock();
721 executionsLock.unlock();
724 std::optional<std::vector<skills::FluxioSkillStatusUpdate>>
730 auto ret = std::vector<skills::FluxioSkillStatusUpdate>(
statusUpdates.begin(),
732 statusMapLock.unlock();
737 FluxioCompositeExecutor::pollSubStatuses()
740 for (
const auto& [nodeId, executorPtr] : subExecutionsMap)
742 executorPtr->getStatusUpdate();
743 auto s = executorPtr->getStatus();
749 const auto& lastStatus =
753 { return statusUpdate.subSkillNodeId == nodeId; })
756 if (lastStatus !=
s->status)
769 const auto& startParam =
772 [](
const std::pair<std::string, skills::FluxioParameter>& param)
774 return (param.second.type->getShortName() ==
"Object<Event>" &&
775 param.second.isInput && param.second.name ==
"Start");
785 const auto& startNode = std::find_if(
788 [startParam](
const std::pair<
const std::string,
789 const std::unique_ptr<skills::FluxioNode>>& nodeEntry)
791 if (nodeEntry.second->nodeType != skills::FluxioNodeType::PARAMETER)
796 const auto& paramNode =
798 return (paramNode->parameterPtr->id == startParam->second.id);
802 if (startNode == skill.
nodes.end())
809 const auto& startEdge =
810 std::find_if(skill.
edges.begin(),
813 { return (edge.fromNodePtr->nodeId == startNode->second->nodeId); });
816 if (startEdge == skill.
edges.end())
818 ARMARX_WARNING <<
"Skill has no edge connected to the start node";
823 const auto& outputParamsSuccess =
826 [](
const std::pair<std::string, skills::FluxioParameter>& param)
828 return (param.second.type->getShortName() ==
"Object<Event>" &&
829 !param.second.isInput && param.second.name ==
"Succeeded");
831 const auto& outputParamsFailed =
834 [](
const std::pair<std::string, skills::FluxioParameter>& param)
836 return (param.second.type->getShortName() ==
"Object<Event>" &&
837 !param.second.isInput && param.second.name ==
"Failed");
839 const auto& outputParamsAborted =
842 [](
const std::pair<std::string, skills::FluxioParameter>& param)
844 return (param.second.type->getShortName() ==
"Object<Event>" &&
845 !param.second.isInput && param.second.name ==
"Aborted");
848 if (outputParamsSuccess == skill.
parameters.end() ||
849 outputParamsFailed == skill.
parameters.end() ||
850 outputParamsAborted == skill.
parameters.end())
852 ARMARX_WARNING <<
"Skill is missing one or more output event parameters";
858 ARMARX_INFO <<
"Skill validation is not fully implemented yet.";
866 FluxioExecutor::setStatus(
status, skill.
id);