FluxioCompositeExecutor.cpp
Go to the documentation of this file.
2 
3 #include <algorithm>
4 #include <chrono>
5 #include <memory>
6 #include <mutex>
7 #include <optional>
8 #include <shared_mutex>
9 #include <string>
10 #include <thread>
11 #include <vector>
12 
13 #include <IceUtil/UUID.h>
14 
17 
24 
25 #include "../FluxioControlNode.h"
26 #include "../FluxioEdge.h"
27 #include "../FluxioParameter.h"
28 #include "FluxioMergerExecutor.h"
29 
30 namespace armarx::skills
31 {
33  const std::string& id,
34  const skills::FluxioSkill& skill,
35  const std::function<void(const std::string& executionId)>&& abortFluxioSkillFunc,
38  const std::string& skillId,
39  const std::string& profileId,
40  const std::string& executorName,
41  armarx::aron::data::DictPtr parameters)>&& executeFluxioSkillFunc,
43  const std::vector<std::string>& parameterIds)>&& addMergerExecutorToDCFunc) :
44  FluxioExecutor(id, false),
45  abortFluxioSkill(abortFluxioSkillFunc),
46  executeFluxioSkill(executeFluxioSkillFunc),
47  addMergerExecutorToDC(addMergerExecutorToDCFunc),
48  skill(skill)
49  {
50  std::scoped_lock l(possibleInputsMutex);
51  possibleInputs[skill.id] = nullptr;
52  for (const auto& node : skill.nodes)
53  {
54  possibleInputs[node.first] = nullptr;
55  }
56 
57  std::scoped_lock resultLock(resultMutex);
58  result = std::make_shared<armarx::aron::data::Dict>();
59  }
60 
61  void
62  FluxioCompositeExecutor::run(const std::string executorName,
63  armarx::aron::data::DictPtr parameters,
65  {
66  ARMARX_INFO << "Running skill " << skill.name;
68  this->executorName = executorName;
69 
70  std::unique_lock possibleInputsLock(possibleInputsMutex);
71  this->possibleInputs[skill.id] = parameters;
72 
73  // fill in missing with default values
74  for (const auto& [key, param] : this->skill.parameters)
75  {
76  if (param.type->getShortName() == "Object<Event>" || !param.isInput ||
77  this->possibleInputs[this->skill.id]->hasElement(key))
78  {
79  continue;
80  }
81  const auto& fallBack = this->findParameterValue(profilePtr, param);
82 
83  if (fallBack == nullptr)
84  {
85  ARMARX_WARNING << "No fallback value found for parameter " << param.name;
86  ARMARX_WARNING << "Aborting skill execution.";
87  this->abort();
88  return;
89  }
90 
91  this->possibleInputs[this->skill.id]->addElement(key, fallBack);
92  }
93  possibleInputsLock.unlock();
94 
95  skills::FluxioEdge startEdge;
96  if (!validateSkill(startEdge))
97  {
98 
99  ARMARX_WARNING << "Skill execution cancelled.";
100  setStatus(skills::SkillStatus::Failed);
101  return;
102  }
103 
105 
106  std::unique_lock executionsLock(subExecutionsMapMutex);
107  subExecutionsMap.clear();
108  executionsLock.unlock();
109  std::unique_lock resultLock(resultMutex);
110  result = std::make_shared<armarx::aron::data::Dict>();
111  resultLock.unlock();
112 
114 
115  std::atomic_bool skillRunning = true;
116 
117  // thread for polling status updates
118  std::thread(
119  [this, &skillRunning]
120  {
121  while (skillRunning)
122  {
123  this->pollSubStatuses();
124  std::this_thread::sleep_for(
125  std::chrono::milliseconds(pollingFrequency.toMilliSeconds()));
126  }
127  })
128  .detach();
129 
130  // thread for the sub routines
131  const std::string newExecutorName = executorName + "/" + skill.name;
132  std::thread(
133  [this, &startEdge, &skillRunning, newExecutorName, profilePtr]
134  {
135  this->startSubRoutine(startEdge.toNodePtr,
136  startEdge.toParameterPtr,
137  skillRunning,
138  newExecutorName,
139  profilePtr);
140  })
141  .detach();
142 
143  setStatus(skills::SkillStatus::Running);
144  const bool useTimeout = skill.timeout.isPositive();
145  const auto timeStarted = armarx::DateTime::Now();
146 
147  // if the skill has a timout smaller than the polling frequency of the executor, use the skill´s timeout
148  // as pollingfrequency instead to avoid missing the timeout ('overshoot')
149  const auto sleepTime =
150  useTimeout ? std::min(pollingFrequency, skill.timeout) : pollingFrequency;
151 
152  // main loop
153  while (skillRunning)
154  {
155  std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime.toMilliSeconds()));
156 
157  if (useTimeout && armarx::DateTime::Now() >= timeStarted + skill.timeout)
158  {
159  ARMARX_WARNING << "Skill " << skill.name << " timed out.";
160  abortSubExecutions();
161  setStatus(skills::SkillStatus::Aborted);
162 
163  // wait for the thread to update the status map one last time
164  std::this_thread::sleep_for(
165  std::chrono::milliseconds(pollingFrequency.toMilliSeconds()));
166  skillRunning = false;
167  return;
168  }
169 
170  std::shared_lock l(statusMutex);
171  const auto s = this->status;
172  l.unlock();
173 
174  if (s->status == skills::SkillStatus::Aborted ||
175  s->status == skills::SkillStatus::Failed ||
177  {
178  // the skill was aborted with the abort method, no need to abort the
179  // SubExecutions twice
180  if (s->status != skills::SkillStatus::Aborted)
181  {
182  abortSubExecutions();
183  }
184 
185  // wait for the thread to update the status map one last time
186  std::this_thread::sleep_for(
187  std::chrono::milliseconds(pollingFrequency.toMilliSeconds()));
188  skillRunning = false;
189  return;
190  }
191  }
192  }
193 
194  void
195  FluxioCompositeExecutor::startSubRoutine(
198  std::atomic_bool& running,
199  const std::string& executorName,
201  {
202  if (!running)
203  {
204  return;
205  }
206 
207  if (startNode == nullptr)
208  {
209  ARMARX_WARNING << "Unexpected nullptr";
210  setStatus(skills::SkillStatus::Failed);
211  return;
212  }
213 
214  if (startNode->nodeType == skills::FluxioNodeType::PARAMETER)
215  {
216  // cast to parameter node
217  const auto& paramNodePtr = std::experimental::make_observer(
218  dynamic_cast<const skills::FluxioParameterNode*>(startNode.get()));
219  if (paramNodePtr == nullptr)
220  {
221  ARMARX_WARNING << "Unexpected nullptr. Dynamic cast from FluxioNodePtr to "
222  "FluxioParameterNodePtr failed.";
223  setStatus(skills::SkillStatus::Failed);
224  return;
225  }
226 
227  this->handleParameterRoutine(paramNodePtr, running, executorName);
228  }
229  else if (startNode->nodeType == skills::FluxioNodeType::CONTROL)
230  {
231  // cast to control node
232  const auto& controlNodePtr = std::experimental::make_observer(
233  dynamic_cast<const skills::FluxioControlNode*>(startNode.get()));
234 
235  if (controlNodePtr == nullptr)
236  {
237  ARMARX_WARNING << "Unexpected nullptr. Dynamic cast from FluxioNodePtr to "
238  "FluxioControlNodePtr failed.";
239  setStatus(skills::SkillStatus::Failed);
240  return;
241  }
242 
243  this->handleControlRoutine(
244  controlNodePtr, startParameter, running, executorName, profilePtr);
245  }
246  else if (startNode->nodeType == skills::FluxioNodeType::SUBSKILL)
247  {
248  // cast to subskill node
249  const auto& subSkillNodePtr = std::experimental::make_observer(
250  dynamic_cast<const skills::FluxioSubSkillNode*>(startNode.get()));
251  if (subSkillNodePtr == nullptr || subSkillNodePtr->skillPtr == nullptr)
252  {
253  ARMARX_WARNING << "Unexpected nullptr. Dynamic cast from FluxioNodePtr to "
254  "FluxioSubSkillNodePtr failed.";
255  setStatus(skills::SkillStatus::Failed);
256  return;
257  }
258 
259  this->handleSubSkillRoutine(subSkillNodePtr, running, executorName, profilePtr);
260  }
261  else
262  {
264  << "Unexpected node type '"
265  << skills::FluxioNodeTypeToString(startNode->nodeType).value_or("UNKNOWN")
266  << "' for Node with id " << startNode->nodeId;
267  setStatus(skills::SkillStatus::Failed);
268  return;
269  }
270  }
271 
272  void
273  FluxioCompositeExecutor::handleParameterRoutine(
275  std::atomic_bool& /*running*/,
276  const std::string& /*executorName*/)
277  {
278  if (parameterNode == nullptr || parameterNode->parameterPtr == nullptr)
279  {
280  ARMARX_WARNING << "Unexpected nullptr";
281  setStatus(skills::SkillStatus::Failed);
282  return;
283  }
284 
285  // make sure it is an event parameter
286  if (parameterNode->parameterPtr->type->getShortName() != "Object<Event>")
287  {
288  ARMARX_WARNING << "Unexpected parameter type "
289  << parameterNode->parameterPtr->type->getShortName();
290  setStatus(skills::SkillStatus::Failed);
291  return;
292  }
293 
294  const auto& skill = this->skill;
295  std::list<FluxioEdge> resEdges;
296  std::copy_if(skill.edges.begin(),
297  skill.edges.end(),
298  std::back_inserter(resEdges),
299  [](const skills::FluxioEdge& edge)
300  {
301  return (edge.isValid() &&
302  edge.toNodePtr->nodeType == FluxioNodeType::PARAMETER &&
303  edge.fromParameterPtr->type->getShortName() != "Object<Event>");
304  });
305 
306  std::unique_lock l(possibleInputsMutex);
307  for (const auto& e : resEdges)
308  {
309  const auto& fromParam = e.fromParameterPtr;
310  const auto& toParam = e.toParameterPtr;
311  auto fromNodeId = e.fromNodePtr->nodeId;
312  if (e.fromNodePtr->nodeType == FluxioNodeType::PARAMETER)
313  {
314  fromNodeId = skill.id;
315  }
316 
317  l.unlock();
319  this->getPossibleInputCopy(fromNodeId, fromParam->id);
320  l.lock();
321 
322  this->result->addElement(toParam->id, value);
323  }
324  l.unlock();
325 
326  // get the event type and set the status accordingly
327  const std::string& eventType = parameterNode->parameterPtr->name;
328  if (eventType == "Succeeded")
329  {
331  }
332  else if (eventType == "Failed")
333  {
334  setStatus(skills::SkillStatus::Failed);
335  }
336  else if (eventType == "Aborted")
337  {
338  setStatus(skills::SkillStatus::Aborted);
339  }
340  else
341  {
342  ARMARX_WARNING << "Unexpected event type " << eventType << " for parameter "
343  << parameterNode->parameterPtr->name;
344  setStatus(skills::SkillStatus::Failed);
345  }
346  }
347 
348  void
349  FluxioCompositeExecutor::handleSubSkillRoutine(
351  std::atomic_bool& running,
352  const std::string& executorName,
354  {
355  if (subSkillNode == nullptr || subSkillNode->skillPtr == nullptr)
356  {
357  ARMARX_WARNING << "Unexpected nullptr";
358  setStatus(skills::SkillStatus::Failed);
359  return;
360  }
361 
362  // gather parameters for the subskill
363  armarx::aron::data::DictPtr params = std::make_shared<armarx::aron::data::Dict>();
364 
365  std::list<FluxioEdge> paramEdges;
366  std::copy_if(skill.edges.begin(),
367  skill.edges.end(),
368  std::back_inserter(paramEdges),
369  [&subSkillNode](const skills::FluxioEdge& edge)
370  {
371  return (edge.isValid() && edge.toNodePtr->nodeId == subSkillNode->nodeId &&
372  edge.fromParameterPtr->type->getShortName() != "Object<Event>");
373  });
374 
375  for (const auto& e : paramEdges)
376  {
377  const auto& fromParam = e.fromParameterPtr;
378  const auto& toParam = e.toParameterPtr;
379  auto fromNodeId = e.fromNodePtr->nodeId;
380  if (e.fromNodePtr->nodeType == FluxioNodeType::PARAMETER)
381  {
382  fromNodeId = skill.id;
383  }
384  armarx::aron::data::VariantPtr value = getPossibleInputCopy(fromNodeId, fromParam->id);
385  if (value == nullptr)
386  {
387  ARMARX_WARNING << "Failed to get possible input for parameter " << fromParam->id
388  << " " << fromNodeId;
389  }
390  params->addElement(toParam->id, value);
391  }
392 
393  // start skill execution
394  const auto& executorRes =
395  executeFluxioSkill(subSkillNode->skillPtr->id, profilePtr->id, executorName, params);
396  if (!executorRes.isSuccess())
397  {
398  ARMARX_WARNING << "Failed to execute subskill " << subSkillNode->skillPtr->id;
399  setStatus(skills::SkillStatus::Failed);
400  return;
401  }
402  auto executorPtr = executorRes.getResult();
403  if (executorPtr == nullptr)
404  {
405  ARMARX_WARNING << "Failed to execute subskill " << subSkillNode->skillPtr->id;
406  setStatus(skills::SkillStatus::Failed);
407  return;
408  }
409 
410  std::unique_lock executionsLock(subExecutionsMapMutex);
411  subExecutionsMap[subSkillNode->nodeId] = executorPtr;
412  executionsLock.unlock();
413 
414  // wait until the skill has finished (or the super skill is finished)
415  skills::FluxioSkillStatusUpdate statusUpdate;
416  while (running)
417  {
418  // sleep for a while
419  std::this_thread::sleep_for(std::chrono::milliseconds(250));
420  executorPtr->getStatusUpdate(); // FIXME: bad design
421  const auto& statusUpdateIt = executorPtr->getStatus();
422 
423  if (!statusUpdateIt.has_value())
424  {
425  ARMARX_INFO << "No status update from skill " << subSkillNode->skillPtr->name
426  << " yet. Waiting...";
427  continue;
428  }
429 
430  statusUpdate = statusUpdateIt.value();
431  // did the status change? update statusUpdates list
432  std::unique_lock statusMapLock(statusUpdatesMutex);
433  const auto& lastUpdate =
434  std::find_if(statusUpdates.begin(),
435  statusUpdates.end(),
436  [&subSkillNode](const skills::FluxioSkillStatusUpdate& statusUpdate)
437  { return statusUpdate.subSkillNodeId == subSkillNode->nodeId; });
438 
439  if (lastUpdate == statusUpdates.end() || lastUpdate->status != statusUpdate.status)
440  {
441  statusUpdates.push_front({armarx::DateTime::Now(),
442  executorPtr->id,
443  subSkillNode->nodeId,
444  statusUpdate.status});
445  }
446 
447  statusMapLock.unlock();
448 
449  // check subskill is finished
450  if (statusUpdate.status == skills::SkillStatus::Succeeded ||
451  statusUpdate.status == skills::SkillStatus::Failed ||
452  statusUpdate.status == skills::SkillStatus::Aborted)
453  {
454  break;
455  }
456  }
457 
458  // check if the parent skill is still running
459  if (!running)
460  {
461  return;
462  }
463 
464  std::unique_lock possibleInputsLock(this->possibleInputsMutex);
465  this->possibleInputs[subSkillNode->nodeId] = executorPtr->getResultsCopy();
466  possibleInputsLock.unlock();
467 
468  // check the final skill status get the output event parameter
469  const std::string& outputEventName =
470  statusUpdate.status == skills::SkillStatus::Succeeded ? "Succeeded"
471  : statusUpdate.status == skills::SkillStatus::Failed ? "Failed"
472  : statusUpdate.status == skills::SkillStatus::Aborted ? "Aborted"
473  : "Undefined";
474  const auto& outputParam = std::find_if(
475  subSkillNode->skillPtr->parameters.begin(),
476  subSkillNode->skillPtr->parameters.end(),
477  [&outputEventName](const std::pair<std::string, skills::FluxioParameter>& param)
478  {
479  return (param.second.type->getShortName() == "Object<Event>" && !param.second.isInput &&
480  param.second.name == outputEventName);
481  });
482 
483  if (outputParam == subSkillNode->skillPtr->parameters.end())
484  {
485  ARMARX_WARNING << "Skill " << subSkillNode->skillPtr->name
486  << " is missing the output event parameter " << outputEventName;
487  setStatus(skills::SkillStatus::Failed);
488  return;
489  }
490 
491  // find the connected edge
492  const auto& edge =
493  std::find_if(skill.edges.begin(),
494  skill.edges.end(),
495  [&subSkillNode, &outputParam](const skills::FluxioEdge& edge)
496  {
497  return (edge.fromNodePtr->nodeId == subSkillNode->nodeId &&
498  edge.fromParameterPtr->id == outputParam->second.id);
499  });
500 
501  if (edge == skill.edges.end())
502  {
503  if (outputEventName == "Failed") {
504  setStatus(skills::SkillStatus::Failed);
505  return;
506  }
507 
508  if (outputEventName == "Aborted") {
509  setStatus(skills::SkillStatus::Aborted);
510  return;
511  }
512 
513  ARMARX_WARNING << "Skill " << skill.name
514  << " has no edge connected to the output event parameter "
515  << outputEventName;
516  setStatus(skills::SkillStatus::Failed);
517  return;
518  }
519 
520  // start new subroutine
521  const std::string& nextExecutorName = executorName + "/" + subSkillNode->skillPtr->name;
522  startSubRoutine(
523  edge->toNodePtr, edge->toParameterPtr, running, nextExecutorName, profilePtr);
524  }
525 
526  void
527  FluxioCompositeExecutor::handleControlRoutine(
530  std::atomic_bool& running,
531  const std::string& executorName,
533  {
534  if (controlNode == nullptr)
535  {
536  ARMARX_WARNING << "Unexpected nullptr";
537  setStatus(skills::SkillStatus::Failed);
538  return;
539  }
540 
541  // check the controlType
542  if (controlNode->controlType == skills::FluxioControlNodeType::SPLITTER)
543  {
544  // find connected nodes and store the relevant edges
545  std::vector<std::experimental::observer_ptr<const skills::FluxioEdge>> edgePtrs = {};
546  for (const auto& [id, param] : controlNode->parametersMap)
547  {
548  // ignore the input parameters
549  if (param.isInput)
550  {
551  continue;
552  }
553 
554  for (const auto& edge : skill.edges)
555  {
556  if (edge.fromParameterPtr != nullptr && edge.fromParameterPtr->id == id)
557  {
558  edgePtrs.push_back(std::experimental::make_observer(&edge));
559  }
560  }
561  }
562 
563  const size_t allParams = edgePtrs.size();
564  int param = 1;
565 
566  // start subroutines in separate threads
568  {
569  const std::string newExecutorName = executorName + "/Splitter" + "(" +
570  std::to_string(param) + "/" +
571  std::to_string(allParams) + ")";
572  std::thread(
573  [this, edgePtr, &running, newExecutorName, profilePtr]
574  {
575  startSubRoutine(edgePtr->toNodePtr,
576  edgePtr->toParameterPtr,
577  running,
578  newExecutorName,
579  profilePtr);
580  })
581  .detach();
582  param++;
583  }
584  }
585  else if (controlNode->controlType == skills::FluxioControlNodeType::AND_MERGER)
586  {
587  // check the list of subexecutions for the node id
588  std::experimental::observer_ptr<FluxioMergerExecutor> mergerExecutorPtr = nullptr;
589  std::unique_lock executionsLock(subExecutionsMapMutex);
590  const auto& executorPtr = subExecutionsMap.find(controlNode->nodeId);
591  if (executorPtr == subExecutionsMap.end())
592  {
593  // assemble paramId vector
594  std::vector<std::string> paramIds = {};
595  for (const auto& [id, param] : controlNode->parametersMap)
596  {
597  if (param.isInput)
598  {
599  paramIds.push_back(id);
600  }
601  }
602 
603  // there is no execution for the merger yet, let´s start one
604  mergerExecutorPtr = std::experimental::make_observer(
605  dynamic_cast<FluxioMergerExecutor*>(addMergerExecutorToDC(paramIds).get()));
606  subExecutionsMap.emplace(controlNode->nodeId, mergerExecutorPtr);
607  executionsLock.unlock();
608  }
609  else
610  {
611  executionsLock.unlock();
612 
613  mergerExecutorPtr = std::experimental::make_observer(
614  dynamic_cast<FluxioMergerExecutor*>(executorPtr->second.get()));
615  }
616 
617  if (mergerExecutorPtr == nullptr)
618  {
619  ARMARX_WARNING << "Unexpected nullptr. Dynamic cast from FluxioExecutorPtr to "
620  "FluxioMergerExecutorPtr failed.";
621  setStatus(skills::SkillStatus::Failed);
622  return;
623  }
624 
625  // check in the token
626  mergerExecutorPtr->checkInToken(startParameter->id);
627 
628  if (mergerExecutorPtr->getStatus() != std::nullopt)
629  {
630  return;
631  }
632 
633  // reuse this thread
634  mergerExecutorPtr->run(executorName, nullptr, profilePtr);
635 
636  const auto& outputParam =
637  std::find_if(controlNode->parametersMap.begin(),
638  controlNode->parametersMap.end(),
639  [](const std::pair<std::string, skills::FluxioParameter>& param)
640  { return !param.second.isInput; });
641 
642  if (outputParam == controlNode->parametersMap.end())
643  {
644  ARMARX_WARNING << "Control node " << controlNode->nodeId
645  << " has no output parameter";
646  setStatus(skills::SkillStatus::Failed);
647  return;
648  }
649 
650  // find the connected edge
651  const auto& edge =
652  std::find_if(skill.edges.begin(),
653  skill.edges.end(),
654  [&controlNode, &outputParam](const skills::FluxioEdge& edge)
655  {
656  return (edge.fromNodePtr->nodeId == controlNode->nodeId &&
657  edge.fromParameterPtr->id == outputParam->second.id);
658  });
659 
660  if (edge == skill.edges.end())
661  {
663  << "Skill " << skill.name
664  << " has no edge connected to the output event parameter of the AND merger";
665  setStatus(skills::SkillStatus::Failed);
666  return;
667  }
668 
669  // start new subroutine
670  startSubRoutine(
671  edge->toNodePtr, edge->toParameterPtr, running, executorName, profilePtr);
672  }
673  else
674  {
675  ARMARX_WARNING << "Unexpected control type ";
676  setStatus(skills::SkillStatus::Failed);
677  return;
678  }
679  }
680 
681  void
683  {
684  ARMARX_INFO << "Aborting skill " << skill.name;
685  setStatus(skills::SkillStatus::Aborted);
686  abortSubExecutions();
687  }
688 
689  void
690  FluxioCompositeExecutor::abortSubExecutions()
691  {
692  std::shared_lock executionsLock(subExecutionsMapMutex);
693  for (const auto& [nodeId, executorPtr] : subExecutionsMap)
694  {
695  auto s = executorPtr->getStatus();
696  if (!s.has_value() || s->status == skills::SkillStatus::Succeeded ||
697  s->status == skills::SkillStatus::Failed ||
698  s->status == skills::SkillStatus::Aborted)
699  {
700  continue;
701  }
702 
703  executorPtr->abort();
704  std::unique_lock statusMapLock(statusUpdatesMutex);
705  statusUpdates.push_front(
706  {armarx::DateTime::Now(), executorPtr->id, nodeId, skills::SkillStatus::Aborted});
707  statusMapLock.unlock();
708  }
709  executionsLock.unlock();
710  }
711 
712  std::optional<std::vector<skills::FluxioSkillStatusUpdate>>
714  {
715  ARMARX_INFO << "Getting status updates for skill " << skill.name;
716  // convert statusupdates list to vector
717  std::shared_lock statusMapLock(statusUpdatesMutex);
718  auto ret = std::vector<skills::FluxioSkillStatusUpdate>(statusUpdates.begin(),
719  statusUpdates.end());
720  statusMapLock.unlock();
721  return ret;
722  }
723 
724  void
725  FluxioCompositeExecutor::pollSubStatuses()
726  {
727  std::scoped_lock l(subExecutionsMapMutex, statusUpdatesMutex);
728  for (const auto& [nodeId, executorPtr] : subExecutionsMap)
729  {
730  executorPtr->getStatusUpdate();
731  auto s = executorPtr->getStatus();
732  if (!s.has_value())
733  {
734  continue;
735  }
736 
737  const auto& lastStatus =
738  find_if(statusUpdates.begin(),
739  statusUpdates.end(),
740  [&](const skills::FluxioSkillStatusUpdate& statusUpdate)
741  { return statusUpdate.subSkillNodeId == nodeId; })
742  ->status;
743 
744  if (lastStatus != s->status)
745  {
746  statusUpdates.push_front(
747  {armarx::DateTime::Now(), executorPtr->id, nodeId, s->status});
748  }
749  }
750  }
751 
752  bool
754  {
755 
756  // get start parameter
757  const auto& startParam =
758  std::find_if(skill.parameters.begin(),
759  skill.parameters.end(),
760  [](const std::pair<std::string, skills::FluxioParameter>& param)
761  {
762  return (param.second.type->getShortName() == "Object<Event>" &&
763  param.second.isInput && param.second.name == "Start");
764  });
765 
766  if (startParam == skill.parameters.end())
767  {
768  ARMARX_WARNING << "Skill has no start parameter";
769  return false;
770  }
771 
772  // get all parameter nodes for the start parameter
773  const auto& startNode = std::find_if(
774  skill.nodes.begin(),
775  skill.nodes.end(),
776  [startParam](const std::pair<const std::string,
777  const std::unique_ptr<skills::FluxioNode>>& nodeEntry)
778  {
779  if (nodeEntry.second->nodeType != skills::FluxioNodeType::PARAMETER)
780  {
781  return false;
782  }
783 
784  const auto& paramNode =
785  dynamic_cast<const skills::FluxioParameterNode*>(nodeEntry.second.get());
786  return (paramNode->parameterPtr->id == startParam->second.id);
787  });
788 
789  // there can only be one
790  if (startNode == skill.nodes.end())
791  {
792  ARMARX_WARNING << "Skill has no start node";
793  return false;
794  }
795 
796  // check if the start node is connected
797  const auto& startEdge =
798  std::find_if(skill.edges.begin(),
799  skill.edges.end(),
800  [startNode](const skills::FluxioEdge& edge)
801  { return (edge.fromNodePtr->nodeId == startNode->second->nodeId); });
802 
803  // there can only be one
804  if (startEdge == skill.edges.end())
805  {
806  ARMARX_WARNING << "Skill has no edge connected to the start node";
807  return false;
808  }
809 
810  // get the output event parameters
811  const auto& outputParamsSuccess =
812  std::find_if(skill.parameters.begin(),
813  skill.parameters.end(),
814  [](const std::pair<std::string, skills::FluxioParameter>& param)
815  {
816  return (param.second.type->getShortName() == "Object<Event>" &&
817  !param.second.isInput && param.second.name == "Succeeded");
818  });
819  const auto& outputParamsFailed =
820  std::find_if(skill.parameters.begin(),
821  skill.parameters.end(),
822  [](const std::pair<std::string, skills::FluxioParameter>& param)
823  {
824  return (param.second.type->getShortName() == "Object<Event>" &&
825  !param.second.isInput && param.second.name == "Failed");
826  });
827  const auto& outputParamsAborted =
828  std::find_if(skill.parameters.begin(),
829  skill.parameters.end(),
830  [](const std::pair<std::string, skills::FluxioParameter>& param)
831  {
832  return (param.second.type->getShortName() == "Object<Event>" &&
833  !param.second.isInput && param.second.name == "Aborted");
834  });
835 
836  if (outputParamsSuccess == skill.parameters.end() ||
837  outputParamsFailed == skill.parameters.end() ||
838  outputParamsAborted == skill.parameters.end())
839  {
840  ARMARX_WARNING << "Skill is missing one or more output event parameters";
841  return false;
842  }
843 
844  // TODO: the rest
845 
846  ARMARX_INFO << "Skill validation is not fully implemented yet.";
847  ret = *startEdge;
848  return true;
849  }
850 
851  void
852  FluxioCompositeExecutor::setStatus(skills::SkillStatus status, const std::string& /*nodeId*/)
853  {
854  FluxioExecutor::setStatus(status, skill.id);
855  }
856 } // namespace armarx::skills
armarx::skills::FluxioSkill
Definition: FluxioSkill.h:25
FluxioCompositeExecutor.h
armarx::skills::FluxioEdge
Definition: FluxioEdge.h:15
armarx::skills::FluxioSkill::nodes
std::map< const std::string, const std::unique_ptr< FluxioNode > > nodes
Definition: FluxioSkill.h:40
armarx::skills::FluxioExecutor::status
std::optional< skills::FluxioSkillStatusUpdate > status
Definition: FluxioExecutor.h:62
armarx::aron::ret
ReaderT::InputType T & ret
Definition: rw.h:13
armarx::skills::FluxioExecutor::resultMutex
std::shared_mutex resultMutex
Definition: FluxioExecutor.h:57
armarx::skills::SkillStatus::Aborted
@ Aborted
armarx::skills::FluxioParameterNode
Definition: FluxioParameterNode.h:14
armarx::skills::FluxioSkill::id
std::string id
Definition: FluxioSkill.h:27
armarx::skills::error::FluxioException
A base class for skill exceptions.
Definition: FluxioException.h:24
armarx::skills::FluxioExecutor::result
armarx::aron::data::DictPtr result
Definition: FluxioExecutor.h:56
armarx::skills
This file is part of ArmarX.
Definition: PeriodicUpdateWidget.cpp:11
armarx::skills::FluxioNodeType::PARAMETER
@ PARAMETER
armarx::skills::FluxioExecutor::executorName
std::optional< std::string > executorName
Definition: FluxioExecutor.h:51
armarx::core::time::DateTime::Now
static DateTime Now()
Definition: DateTime.cpp:51
armarx::skills::FluxioExecutor::statusMutex
std::shared_mutex statusMutex
Definition: FluxioExecutor.h:63
DateTime.h
armarx::skills::FluxioExecutor::getPossibleInputCopy
virtual armarx::aron::data::VariantPtr getPossibleInputCopy(const std::string nodeId, const std::string parameterId)
Definition: FluxioExecutor.cpp:28
armarx::skills::FluxioExecutor::possibleInputsMutex
std::shared_mutex possibleInputsMutex
Definition: FluxioExecutor.h:60
std::experimental::fundamentals_v2::make_observer
observer_ptr< _Tp > make_observer(_Tp *__p) noexcept
armarx::skills::FluxioExecutor::possibleInputs
std::map< std::string, armarx::aron::data::DictPtr > possibleInputs
Definition: FluxioExecutor.h:59
FluxioProfile.h
armarx::skills::FluxioExecutor
Definition: FluxioExecutor.h:21
return
return() armarx_set_target("PointCloudViewerGuiPlugin") find_package(MPI REQUIRED) find_package(VTK REQUIRED) find_package(Coin QUIET) armarx_build_if(Coin_FOUND "Coin 3D not found") set(COMPONENT_LIBS VisionXInterfaces VisionXCore VisionXPointCloud VTK
Definition: CMakeLists.txt:1
std::experimental::fundamentals_v2::observer_ptr
Definition: ManagedIceObject.h:53
SkillManagerComponentPlugin.h
armarx::core::time::Duration::isPositive
bool isPositive() const
Tests whether the duration is positive (value in µs > 0).
Definition: Duration.cpp:168
armarx::skills::FluxioSkillStatusUpdate
Definition: FluxioSkillStatusUpdate.h:10
armarx::aron::data::VariantPtr
std::shared_ptr< Variant > VariantPtr
Definition: forward_declarations.h:11
armarx::status
status
Definition: FiniteStateMachine.h:244
armarx::skills::FluxioExecutor::findParameterValue
armarx::aron::data::VariantPtr findParameterValue(const std::experimental::observer_ptr< const FluxioProfile > profilePtr, const FluxioParameter &parameter) const
Definition: FluxioExecutor.cpp:75
cxxopts::value
std::shared_ptr< Value > value()
Definition: cxxopts.hpp:855
armarx::skills::FluxioNodeTypeToString
std::optional< std::string > FluxioNodeTypeToString(const FluxioNodeType &type)
Definition: FluxioNode.cpp:41
armarx::skills::FluxioNodeType::CONTROL
@ CONTROL
FluxioException.h
armarx::skills::SkillStatus::Preparing
@ Preparing
armarx::skills::FluxioControlNodeType::AND_MERGER
@ AND_MERGER
FluxioExecutor.h
armarx::to_string
const std::string & to_string(const std::string &s)
Definition: StringHelpers.h:41
armarx::skills::FluxioSkill::timeout
armarx::Duration timeout
How long (in ms) to wait for the skill to finish execution before timing out.
Definition: FluxioSkill.h:33
armarx::skills::FluxioEdge::toParameterPtr
std::experimental::observer_ptr< const FluxioParameter > toParameterPtr
Definition: FluxioEdge.h:20
armarx::aron::data::DictPtr
std::shared_ptr< Dict > DictPtr
Definition: Dict.h:41
armarx::skills::FluxioCompositeExecutor::abort
void abort() override
Definition: FluxioCompositeExecutor.cpp:682
armarx::skills::SkillStatus::Initializing
@ Initializing
Exception.h
armarx::skills::FluxioCompositeExecutor::FluxioCompositeExecutor
FluxioCompositeExecutor(const std::string &id, const skills::FluxioSkill &skill, const std::function< void(const std::string &executionId)> &&abortFluxioSkillFunc, const std::function< skills::Result< std::experimental::observer_ptr< FluxioExecutor >, skills::error::FluxioException >(const std::string &skillId, const std::string &profileId, const std::string &executorName, armarx::aron::data::DictPtr parameters)> &&executeFluxioSkillFunc, const std::function< std::experimental::observer_ptr< FluxioExecutor >(const std::vector< std::string > &parameterIds)> &&addMergerExecutorToDCFunc)
Definition: FluxioCompositeExecutor.cpp:32
ARMARX_INFO
#define ARMARX_INFO
Definition: Logging.h:181
armarx::skills::FluxioExecutor::statusUpdates
std::list< skills::FluxioSkillStatusUpdate > statusUpdates
Definition: FluxioExecutor.h:53
armarx::skills::FluxioExecutor::statusUpdatesMutex
std::shared_mutex statusUpdatesMutex
Definition: FluxioExecutor.h:54
armarx::skills::SkillStatus::Running
@ Running
armarx::skills::FluxioCompositeExecutor::run
void run(std::string executorName, armarx::aron::data::DictPtr parameters, std::experimental::observer_ptr< const FluxioProfile > profilePtr) override
Definition: FluxioCompositeExecutor.cpp:62
armarx::core::time::Duration::toMilliSeconds
std::int64_t toMilliSeconds() const
Returns the amount of milliseconds.
Definition: Duration.cpp:60
Logging.h
min
T min(T t1, T t2)
Definition: gdiam.h:44
ARMARX_WARNING
#define ARMARX_WARNING
Definition: Logging.h:193
armarx::skills::SkillStatus::Constructing
@ Constructing
armarx::skills::FluxioSkill::edges
std::list< FluxioEdge > edges
Definition: FluxioSkill.h:41
armarx::skills::FluxioNodeType::SUBSKILL
@ SUBSKILL
armarx::skills::FluxioCompositeExecutor::validateSkill
bool validateSkill(skills::FluxioEdge &ret) const
Definition: FluxioCompositeExecutor.cpp:753
armarx::skills::FluxioSkill::name
std::string name
Definition: FluxioSkill.h:28
FluxioMergerExecutor.h
armarx::ctrlutil::s
double s(double t, double s0, double v0, double a0, double j)
Definition: CtrlUtil.h:33
armarx::skills::FluxioEdge::toNodePtr
std::experimental::observer_ptr< const FluxioNode > toNodePtr
Definition: FluxioEdge.h:19
armarx::skills::Result
Definition: FluxioResult.h:12
FluxioNode.h
armarx::skills::FluxioCompositeExecutor::getStatusUpdate
std::optional< std::vector< skills::FluxioSkillStatusUpdate > > getStatusUpdate() override
Definition: FluxioCompositeExecutor.cpp:713
armarx::skills::SkillStatus
SkillStatus
Definition: SkillStatusUpdate.h:15
armarx::skills::SkillStatus::Succeeded
@ Succeeded
armarx::skills::FluxioSkill::parameters
std::map< std::string, FluxioParameter > parameters
Definition: FluxioSkill.h:39
armarx::skills::FluxioControlNodeType::SPLITTER
@ SPLITTER
armarx::skills::SkillStatus::Failed
@ Failed