8 #include <shared_mutex>
13 #include <IceUtil/UUID.h>
25 #include "../FluxioControlNode.h"
26 #include "../FluxioEdge.h"
27 #include "../FluxioParameter.h"
33 const std::string&
id,
35 const std::function<
void(
const std::string& executionId)>&& abortFluxioSkillFunc,
38 const std::string& skillId,
39 const std::string& profileId,
40 const std::string& executorName,
43 const std::vector<std::string>& parameterIds)>&& addMergerExecutorToDCFunc) :
45 abortFluxioSkill(abortFluxioSkillFunc),
46 executeFluxioSkill(executeFluxioSkillFunc),
47 addMergerExecutorToDC(addMergerExecutorToDCFunc),
52 for (
const auto& node : skill.
nodes)
58 result = std::make_shared<armarx::aron::data::Dict>();
74 for (
const auto& [key, param] : this->skill.
parameters)
76 if (param.type->getShortName() ==
"Object<Event>" || !param.isInput ||
77 this->possibleInputs[this->skill.id]->hasElement(key))
83 if (fallBack ==
nullptr)
85 ARMARX_WARNING <<
"No fallback value found for parameter " << param.name;
93 possibleInputsLock.unlock();
106 std::unique_lock executionsLock(subExecutionsMapMutex);
107 subExecutionsMap.clear();
108 executionsLock.unlock();
110 result = std::make_shared<armarx::aron::data::Dict>();
115 std::atomic_bool skillRunning =
true;
119 [
this, &skillRunning]
123 this->pollSubStatuses();
124 std::this_thread::sleep_for(
133 [
this, &startEdge, &skillRunning, newExecutorName, profilePtr]
135 this->startSubRoutine(startEdge.
toNodePtr,
149 const auto sleepTime =
150 useTimeout ?
std::min(pollingFrequency, skill.
timeout) : pollingFrequency;
155 std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime.toMilliSeconds()));
160 abortSubExecutions();
164 std::this_thread::sleep_for(
166 skillRunning =
false;
182 abortSubExecutions();
186 std::this_thread::sleep_for(
188 skillRunning =
false;
// Dispatches execution to the routine that matches the concrete type of
// `startNode`: parameter nodes go to handleParameterRoutine, control nodes to
// handleControlRoutine and sub-skill nodes to handleSubSkillRoutine. `running`
// and `executorName` are forwarded unchanged to the selected handler.
195 FluxioCompositeExecutor::startSubRoutine(
198 std::atomic_bool& running,
199 const std::string& executorName,
// Guard: nothing can be dispatched without a start node.
207 if (startNode ==
nullptr)
218 dynamic_cast<const skills::FluxioParameterNode*
>(startNode.get()));
// A failed downcast yields nullptr; it is reported with a warning.
219 if (paramNodePtr ==
nullptr)
221 ARMARX_WARNING <<
"Unexpected nullptr. Dynamic cast from FluxioNodePtr to "
222 "FluxioParameterNodePtr failed.";
227 this->handleParameterRoutine(paramNodePtr, running,
executorName);
233 dynamic_cast<const skills::FluxioControlNode*
>(startNode.get()));
235 if (controlNodePtr ==
nullptr)
237 ARMARX_WARNING <<
"Unexpected nullptr. Dynamic cast from FluxioNodePtr to "
238 "FluxioControlNodePtr failed.";
// Control nodes additionally receive the parameter the token arrived on and
// the active profile.
243 this->handleControlRoutine(
244 controlNodePtr, startParameter, running,
executorName, profilePtr);
250 dynamic_cast<const skills::FluxioSubSkillNode*
>(startNode.get()));
// NOTE(review): this condition is also true when the cast succeeded but
// `skillPtr` is null, yet the warning below only mentions a failed cast.
251 if (subSkillNodePtr ==
nullptr || subSkillNodePtr->skillPtr ==
nullptr)
253 ARMARX_WARNING <<
"Unexpected nullptr. Dynamic cast from FluxioNodePtr to "
254 "FluxioSubSkillNodePtr failed.";
259 this->handleSubSkillRoutine(subSkillNodePtr, running,
executorName, profilePtr);
// Fallback diagnostic for a node type that none of the branches above handles.
264 <<
"Unexpected node type '"
266 <<
"' for Node with id " << startNode->nodeId;
// Handles a token that reached a parameter node of the composite skill. The
// node must reference an event parameter ("Object<Event>"); its name
// ("Succeeded", "Failed" or "Aborted") selects the branch taken at the end.
273 FluxioCompositeExecutor::handleParameterRoutine(
// Guard: both the node and the parameter it refers to are required.
278 if (parameterNode ==
nullptr || parameterNode->parameterPtr ==
nullptr)
// Only event parameters are accepted here; the type name of anything else is
// written to the log.
286 if (parameterNode->parameterPtr->type->getShortName() !=
"Object<Event>")
289 << parameterNode->parameterPtr->type->getShortName();
// Local alias with the same name as the member it refers to.
294 const auto& skill = this->skill;
// Collect the valid, non-event edges whose target is a PARAMETER node.
295 std::list<FluxioEdge> resEdges;
296 std::copy_if(skill.
edges.begin(),
298 std::back_inserter(resEdges),
299 [](
const skills::FluxioEdge& edge)
301 return (edge.isValid() &&
302 edge.toNodePtr->nodeType == FluxioNodeType::PARAMETER &&
303 edge.fromParameterPtr->type->getShortName() !=
"Object<Event>");
307 for (
const auto& e : resEdges)
309 const auto& fromParam = e.fromParameterPtr;
310 const auto& toParam = e.toParameterPtr;
311 auto fromNodeId = e.fromNodePtr->nodeId;
// Under a condition that is not visible here, the source node id is replaced
// by the id of the composite skill itself.
314 fromNodeId = skill.
id;
// The event parameter's name decides how the skill terminates.
327 const std::string& eventType = parameterNode->parameterPtr->name;
328 if (eventType ==
"Succeeded")
332 else if (eventType ==
"Failed")
336 else if (eventType ==
"Aborted")
// NOTE(review): `eventType` is bound to parameterNode->parameterPtr->name, so
// this warning prints the same string twice.
342 ARMARX_WARNING <<
"Unexpected event type " << eventType <<
" for parameter "
343 << parameterNode->parameterPtr->name;
// Runs one sub-skill node of the composite skill: gathers its input values
// from the incoming non-event edges, starts it through executeFluxioSkill,
// polls its status, stores its results in possibleInputs and finally follows
// the edge attached to the sub-skill's output event parameter.
349 FluxioCompositeExecutor::handleSubSkillRoutine(
351 std::atomic_bool& running,
352 const std::string& executorName,
// Guard: the node and the skill it wraps are both required.
355 if (subSkillNode ==
nullptr || subSkillNode->skillPtr ==
nullptr)
// Collect the valid, non-event edges that end at this sub-skill node.
365 std::list<FluxioEdge> paramEdges;
366 std::copy_if(skill.
edges.begin(),
368 std::back_inserter(paramEdges),
369 [&subSkillNode](
const skills::FluxioEdge& edge)
371 return (edge.isValid() && edge.toNodePtr->nodeId == subSkillNode->nodeId &&
372 edge.fromParameterPtr->type->getShortName() !=
"Object<Event>");
// For every such edge, the value obtained for the source parameter is stored
// in `params` under the id of the target parameter.
375 for (
const auto& e : paramEdges)
377 const auto& fromParam = e.fromParameterPtr;
378 const auto& toParam = e.toParameterPtr;
379 auto fromNodeId = e.fromNodePtr->nodeId;
// Under a condition that is not visible here, the source node id is replaced
// by the id of the composite skill itself.
382 fromNodeId = skill.
id;
// NOTE(review): a missing value is logged; whether the addElement call below
// is skipped in that case cannot be determined from the visible lines.
385 if (
value ==
nullptr)
387 ARMARX_WARNING <<
"Failed to get possible input for parameter " << fromParam->id
388 <<
" " << fromNodeId;
390 params->addElement(toParam->id,
value);
// NOTE(review): `profilePtr` is dereferenced here and no null check is visible
// in this function - confirm callers never pass nullptr.
394 const auto& executorRes =
395 executeFluxioSkill(subSkillNode->skillPtr->id, profilePtr->id,
executorName, params);
396 if (!executorRes.isSuccess())
398 ARMARX_WARNING <<
"Failed to execute subskill " << subSkillNode->skillPtr->id;
// NOTE(review): a null executor logs exactly the same text as the failed
// result above, so the two causes cannot be told apart in the log.
402 auto executorPtr = executorRes.getResult();
403 if (executorPtr ==
nullptr)
405 ARMARX_WARNING <<
"Failed to execute subskill " << subSkillNode->skillPtr->id;
// Register the executor under the node id; the map is guarded by
// subExecutionsMapMutex.
410 std::unique_lock executionsLock(subExecutionsMapMutex);
411 subExecutionsMap[subSkillNode->nodeId] = executorPtr;
412 executionsLock.unlock();
415 skills::FluxioSkillStatusUpdate statusUpdate;
// Polling step: wait 250 ms, refresh the executor and read its status. The
// return value of getStatusUpdate() is discarded; the status is read through
// getStatus().
419 std::this_thread::sleep_for(std::chrono::milliseconds(250));
420 executorPtr->getStatusUpdate();
421 const auto& statusUpdateIt = executorPtr->getStatus();
423 if (!statusUpdateIt.has_value())
425 ARMARX_INFO <<
"No status update from skill " << subSkillNode->skillPtr->name
426 <<
" yet. Waiting...";
430 statusUpdate = statusUpdateIt.value();
// Look up an existing status entry for this node.
433 const auto& lastUpdate =
436 [&subSkillNode](
const skills::FluxioSkillStatusUpdate& statusUpdate)
437 { return statusUpdate.subSkillNodeId == subSkillNode->nodeId; });
// React only when no entry exists yet or the status differs from the entry
// found (presumably by recording a new entry - the call itself is not visible).
439 if (lastUpdate ==
statusUpdates.end() || lastUpdate->status != statusUpdate.status)
443 subSkillNode->nodeId,
444 statusUpdate.status});
447 statusMapLock.unlock();
// Store a copy of the sub-skill's results in possibleInputs under its node id.
465 this->
possibleInputs[subSkillNode->nodeId] = executorPtr->getResultsCopy();
466 possibleInputsLock.unlock();
469 const std::string& outputEventName =
// Find the sub-skill's output event parameter whose name equals
// `outputEventName`.
474 const auto& outputParam = std::find_if(
475 subSkillNode->skillPtr->parameters.begin(),
476 subSkillNode->skillPtr->parameters.end(),
477 [&outputEventName](
const std::pair<std::string, skills::FluxioParameter>& param)
479 return (param.second.type->getShortName() ==
"Object<Event>" && !param.second.isInput &&
480 param.second.name == outputEventName);
483 if (outputParam == subSkillNode->skillPtr->parameters.end())
486 <<
" is missing the output event parameter " << outputEventName;
// Find the edge that leaves this node from that output event parameter.
493 std::find_if(skill.
edges.begin(),
495 [&subSkillNode, &outputParam](
const skills::FluxioEdge& edge)
497 return (edge.fromNodePtr->nodeId == subSkillNode->nodeId &&
498 edge.fromParameterPtr->id == outputParam->second.id);
// No such edge: "Failed" and "Aborted" are treated separately before the
// generic diagnostic.
501 if (edge == skill.
edges.end())
503 if (outputEventName ==
"Failed") {
508 if (outputEventName ==
"Aborted") {
514 <<
" has no edge connected to the output event parameter "
// Continue at the edge's target under an executor name extended by the
// sub-skill's name.
521 const std::string& nextExecutorName =
executorName +
"/" + subSkillNode->skillPtr->name;
523 edge->toNodePtr, edge->toParameterPtr, running, nextExecutorName, profilePtr);
// Handles a token that reached a control node. The visible code covers two
// behaviours: a splitter part that starts a sub-routine for outgoing edges,
// and an AND-merger part that checks the incoming token into a
// FluxioMergerExecutor and then continues along the node's output edge.
527 FluxioCompositeExecutor::handleControlRoutine(
530 std::atomic_bool& running,
531 const std::string& executorName,
534 if (controlNode ==
nullptr)
// Gather pointers to the edges that start at one of the control node's
// parameters.
545 std::vector<std::experimental::observer_ptr<const skills::FluxioEdge>> edgePtrs = {};
546 for (
const auto& [
id, param] : controlNode->parametersMap)
554 for (
const auto& edge : skill.
edges)
556 if (edge.fromParameterPtr !=
nullptr && edge.fromParameterPtr->id ==
id)
563 const size_t allParams = edgePtrs.size();
569 const std::string newExecutorName =
executorName +
"/Splitter" +
"(" +
// NOTE(review): the lambda captures `running` by reference and `this` by
// pointer; if it runs on another thread (not visible here), both must outlive
// that thread - confirm.
573 [
this, edgePtr, &running, newExecutorName, profilePtr]
575 startSubRoutine(edgePtr->toNodePtr,
576 edgePtr->toParameterPtr,
// Merger part: look up the executor registered for this control node while
// holding subExecutionsMapMutex.
589 std::unique_lock executionsLock(subExecutionsMapMutex);
590 const auto& executorPtr = subExecutionsMap.find(controlNode->nodeId);
// Not registered yet: a merger executor is obtained from addMergerExecutorToDC
// and stored in the map under the control node's id.
591 if (executorPtr == subExecutionsMap.end())
594 std::vector<std::string> paramIds = {};
595 for (
const auto& [
id, param] : controlNode->parametersMap)
599 paramIds.push_back(
id);
605 dynamic_cast<FluxioMergerExecutor*
>(addMergerExecutorToDC(paramIds).get()));
606 subExecutionsMap.emplace(controlNode->nodeId, mergerExecutorPtr);
607 executionsLock.unlock();
// NOTE(review): the iterator `executorPtr` is dereferenced below after the
// lock has been released; confirm that no other thread can modify
// subExecutionsMap in between.
611 executionsLock.unlock();
614 dynamic_cast<FluxioMergerExecutor*
>(executorPtr->second.get()));
617 if (mergerExecutorPtr ==
nullptr)
619 ARMARX_WARNING <<
"Unexpected nullptr. Dynamic cast from FluxioExecutorPtr to "
620 "FluxioMergerExecutorPtr failed.";
// Register the arriving token under the id of the parameter it came in on.
626 mergerExecutorPtr->checkInToken(startParameter->id);
628 if (mergerExecutorPtr->getStatus() != std::nullopt)
634 mergerExecutorPtr->run(
executorName,
nullptr, profilePtr);
// The first non-input parameter of the control node is taken as its output.
636 const auto& outputParam =
637 std::find_if(controlNode->parametersMap.begin(),
638 controlNode->parametersMap.end(),
639 [](
const std::pair<std::string, skills::FluxioParameter>& param)
640 { return !param.second.isInput; });
642 if (outputParam == controlNode->parametersMap.end())
645 <<
" has no output parameter";
// Find the edge that leaves the control node from that output parameter.
652 std::find_if(skill.
edges.begin(),
654 [&controlNode, &outputParam](
const skills::FluxioEdge& edge)
656 return (edge.fromNodePtr->nodeId == controlNode->nodeId &&
657 edge.fromParameterPtr->id == outputParam->second.id);
660 if (edge == skill.
edges.end())
663 <<
"Skill " << skill.
name
664 <<
" has no edge connected to the output event parameter of the AND merger";
// Continue at the edge's target with the unchanged executor name.
671 edge->toNodePtr, edge->toParameterPtr, running,
executorName, profilePtr);
686 abortSubExecutions();
// Walks over every executor registered in subExecutionsMap while holding a
// shared lock on subExecutionsMapMutex, reads its status and calls abort() on
// it. The condition guarding the abort() call is not visible here.
690 FluxioCompositeExecutor::abortSubExecutions()
692 std::shared_lock executionsLock(subExecutionsMapMutex);
693 for (
const auto& [nodeId, executorPtr] : subExecutionsMap)
695 auto s = executorPtr->getStatus();
703 executorPtr->abort();
// `statusMapLock` is acquired on a line that is not visible here.
707 statusMapLock.unlock();
709 executionsLock.unlock();
712 std::optional<std::vector<skills::FluxioSkillStatusUpdate>>
718 auto ret = std::vector<skills::FluxioSkillStatusUpdate>(
statusUpdates.begin(),
720 statusMapLock.unlock();
// Refreshes every executor in subExecutionsMap and compares its current status
// with the last status found for the same node id.
// NOTE(review): no lock on subExecutionsMapMutex is visible around this
// iteration - confirm one is taken on a line not shown here.
725 FluxioCompositeExecutor::pollSubStatuses()
728 for (
const auto& [nodeId, executorPtr] : subExecutionsMap)
// The return value of getStatusUpdate() is discarded; the status is read
// through getStatus().
730 executorPtr->getStatusUpdate();
731 auto s = executorPtr->getStatus();
737 const auto& lastStatus =
741 { return statusUpdate.subSkillNodeId == nodeId; })
// Only a status that differs from the last one found is acted upon.
744 if (lastStatus !=
s->status)
757 const auto& startParam =
760 [](
const std::pair<std::string, skills::FluxioParameter>& param)
762 return (param.second.type->getShortName() ==
"Object<Event>" &&
763 param.second.isInput && param.second.name ==
"Start");
773 const auto& startNode = std::find_if(
776 [startParam](
const std::pair<
const std::string,
777 const std::unique_ptr<skills::FluxioNode>>& nodeEntry)
779 if (nodeEntry.second->nodeType != skills::FluxioNodeType::PARAMETER)
784 const auto& paramNode =
786 return (paramNode->parameterPtr->id == startParam->second.id);
790 if (startNode == skill.
nodes.end())
797 const auto& startEdge =
798 std::find_if(skill.
edges.begin(),
801 { return (edge.fromNodePtr->nodeId == startNode->second->nodeId); });
804 if (startEdge == skill.
edges.end())
806 ARMARX_WARNING <<
"Skill has no edge connected to the start node";
811 const auto& outputParamsSuccess =
814 [](
const std::pair<std::string, skills::FluxioParameter>& param)
816 return (param.second.type->getShortName() ==
"Object<Event>" &&
817 !param.second.isInput && param.second.name ==
"Succeeded");
819 const auto& outputParamsFailed =
822 [](
const std::pair<std::string, skills::FluxioParameter>& param)
824 return (param.second.type->getShortName() ==
"Object<Event>" &&
825 !param.second.isInput && param.second.name ==
"Failed");
827 const auto& outputParamsAborted =
830 [](
const std::pair<std::string, skills::FluxioParameter>& param)
832 return (param.second.type->getShortName() ==
"Object<Event>" &&
833 !param.second.isInput && param.second.name ==
"Aborted");
836 if (outputParamsSuccess == skill.
parameters.end() ||
837 outputParamsFailed == skill.
parameters.end() ||
838 outputParamsAborted == skill.
parameters.end())
840 ARMARX_WARNING <<
"Skill is missing one or more output event parameters";
846 ARMARX_INFO <<
"Skill validation is not fully implemented yet.";
854 FluxioExecutor::setStatus(
status, skill.
id);