FluxioCompositeExecutor.cpp
Go to the documentation of this file.
2 
3 #include <algorithm>
4 #include <chrono>
5 #include <memory>
6 #include <mutex>
7 #include <optional>
8 #include <shared_mutex>
9 #include <string>
10 #include <thread>
11 #include <vector>
12 
13 #include <IceUtil/UUID.h>
14 
17 
24 
25 #include "../FluxioControlNode.h"
26 #include "../FluxioEdge.h"
27 #include "../FluxioParameter.h"
28 #include "FluxioMergerExecutor.h"
29 
30 namespace armarx::skills
31 {
33  const std::string& id,
34  const skills::FluxioSkill& skill,
35  const std::function<void(const std::string& executionId)>&& abortFluxioSkillFunc,
38  const std::string& skillId,
39  const std::string& profileId,
40  const std::string& executorName,
41  armarx::aron::data::DictPtr parameters)>&& executeFluxioSkillFunc,
43  const std::vector<std::string>& parameterIds)>&& addMergerExecutorToDCFunc) :
44  FluxioExecutor(id, false),
45  abortFluxioSkill(abortFluxioSkillFunc),
46  executeFluxioSkill(executeFluxioSkillFunc),
47  addMergerExecutorToDC(addMergerExecutorToDCFunc),
48  skill(skill)
49  {
50  std::scoped_lock l(possibleInputsMutex);
51  possibleInputs[skill.id] = nullptr;
52  for (const auto& node : skill.nodes)
53  {
54  possibleInputs[node.first] = nullptr;
55  }
56 
57  std::scoped_lock resultLock(resultMutex);
58  result = std::make_shared<armarx::aron::data::Dict>();
59  }
60 
61  void
62  FluxioCompositeExecutor::run(const std::string executorName,
63  armarx::aron::data::DictPtr parameters,
65  {
66  ARMARX_INFO << "Running skill " << skill.name;
68  this->executorName = executorName;
69 
70  std::unique_lock possibleInputsLock(possibleInputsMutex);
71  this->possibleInputs[skill.id] = parameters;
72 
73  // fill in missing with default values
74  for (const auto& [key, param] : this->skill.parameters)
75  {
76  if (param.type->getShortName() == "Object<Event>" || !param.isInput ||
77  this->possibleInputs[this->skill.id]->hasElement(key))
78  {
79  continue;
80  }
81  const auto& fallBack = this->findParameterValue(profilePtr, param);
82 
83  if (fallBack == nullptr)
84  {
85  ARMARX_INFO << "No fallback value found for parameter " << param.name;
86  if (!param.required)
87  {
89  << "Parameter is not required. Using nullptr (std::nullopt) instead.";
90  }
91  else
92  {
93  ARMARX_INFO << "Can't execute skill due to incomplete params.";
94  ARMARX_INFO << "Aborting skill execution.";
95  this->setStatus(skills::SkillStatus::Aborted);
96  return;
97  }
98  }
99 
100  this->possibleInputs[this->skill.id]->addElement(key, fallBack);
101  }
102  possibleInputsLock.unlock();
103 
104  skills::FluxioEdge startEdge;
105  if (!validateSkill(startEdge))
106  {
107 
108  ARMARX_WARNING << "Skill execution cancelled.";
109  setStatus(skills::SkillStatus::Failed);
110  return;
111  }
112 
114 
115  std::unique_lock executionsLock(subExecutionsMapMutex);
116  subExecutionsMap.clear();
117  executionsLock.unlock();
118  std::unique_lock resultLock(resultMutex);
119  result = std::make_shared<armarx::aron::data::Dict>();
120  resultLock.unlock();
121 
123 
124  std::atomic_bool skillRunning = true;
125 
126  // thread for polling status updates
127  std::thread(
128  [this, &skillRunning]
129  {
130  while (skillRunning)
131  {
132  this->pollSubStatuses();
133  std::this_thread::sleep_for(
134  std::chrono::milliseconds(pollingFrequency.toMilliSeconds()));
135  }
136  })
137  .detach();
138 
139  // thread for the sub routines
140  const std::string newExecutorName = executorName + "/" + skill.name;
141  std::thread(
142  [this, &startEdge, &skillRunning, newExecutorName, profilePtr]
143  {
144  this->startSubRoutine(startEdge.toNodePtr,
145  startEdge.toParameterPtr,
146  skillRunning,
147  newExecutorName,
148  profilePtr);
149  })
150  .detach();
151 
152  setStatus(skills::SkillStatus::Running);
153  const bool useTimeout = skill.timeout.isPositive();
154  const auto timeStarted = armarx::DateTime::Now();
155 
156  // if the skill has a timeout smaller than the polling frequency of the executor, use the skill's timeout
157  // as polling frequency instead to avoid missing the timeout ('overshoot')
158  const auto sleepTime =
159  useTimeout ? std::min(pollingFrequency, skill.timeout) : pollingFrequency;
160 
161  // main loop
162  while (skillRunning)
163  {
164  std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime.toMilliSeconds()));
165 
166  if (useTimeout && armarx::DateTime::Now() >= timeStarted + skill.timeout)
167  {
168  ARMARX_WARNING << "Skill " << skill.name << " timed out.";
169  abortSubExecutions();
170  setStatus(skills::SkillStatus::Aborted);
171 
172  // wait for the thread to update the status map one last time
173  std::this_thread::sleep_for(
174  std::chrono::milliseconds(pollingFrequency.toMilliSeconds()));
175  skillRunning = false;
176  return;
177  }
178 
179  std::shared_lock l(statusMutex);
180  const auto s = this->status;
181  l.unlock();
182 
183  if (s->status == skills::SkillStatus::Aborted ||
184  s->status == skills::SkillStatus::Failed ||
186  {
187  // the skill was aborted with the abort method, no need to abort the
188  // SubExecutions twice
189  if (s->status != skills::SkillStatus::Aborted)
190  {
191  abortSubExecutions();
192  }
193 
194  // wait for the thread to update the status map one last time
195  std::this_thread::sleep_for(
196  std::chrono::milliseconds(pollingFrequency.toMilliSeconds()));
197  skillRunning = false;
198  return;
199  }
200  }
201  }
202 
203  void
204  FluxioCompositeExecutor::startSubRoutine(
207  std::atomic_bool& running,
208  const std::string& executorName,
210  {
211  if (!running)
212  {
213  return;
214  }
215 
216  if (startNode == nullptr)
217  {
218  ARMARX_WARNING << "Unexpected nullptr";
219  setStatus(skills::SkillStatus::Failed);
220  return;
221  }
222 
223  if (startNode->nodeType == skills::FluxioNodeType::PARAMETER)
224  {
225  // cast to parameter node
226  const auto& paramNodePtr = std::experimental::make_observer(
227  dynamic_cast<const skills::FluxioParameterNode*>(startNode.get()));
228  if (paramNodePtr == nullptr)
229  {
230  ARMARX_WARNING << "Unexpected nullptr. Dynamic cast from FluxioNodePtr to "
231  "FluxioParameterNodePtr failed.";
232  setStatus(skills::SkillStatus::Failed);
233  return;
234  }
235 
236  this->handleParameterRoutine(paramNodePtr, running, executorName);
237  }
238  else if (startNode->nodeType == skills::FluxioNodeType::CONTROL)
239  {
240  // cast to control node
241  const auto& controlNodePtr = std::experimental::make_observer(
242  dynamic_cast<const skills::FluxioControlNode*>(startNode.get()));
243 
244  if (controlNodePtr == nullptr)
245  {
246  ARMARX_WARNING << "Unexpected nullptr. Dynamic cast from FluxioNodePtr to "
247  "FluxioControlNodePtr failed.";
248  setStatus(skills::SkillStatus::Failed);
249  return;
250  }
251 
252  this->handleControlRoutine(
253  controlNodePtr, startParameter, running, executorName, profilePtr);
254  }
255  else if (startNode->nodeType == skills::FluxioNodeType::SUBSKILL)
256  {
257  // cast to subskill node
258  const auto& subSkillNodePtr = std::experimental::make_observer(
259  dynamic_cast<const skills::FluxioSubSkillNode*>(startNode.get()));
260  if (subSkillNodePtr == nullptr || subSkillNodePtr->skillPtr == nullptr)
261  {
262  ARMARX_WARNING << "Unexpected nullptr. Dynamic cast from FluxioNodePtr to "
263  "FluxioSubSkillNodePtr failed.";
264  setStatus(skills::SkillStatus::Failed);
265  return;
266  }
267 
268  this->handleSubSkillRoutine(subSkillNodePtr, running, executorName, profilePtr);
269  }
270  else
271  {
273  << "Unexpected node type '"
274  << skills::FluxioNodeTypeToString(startNode->nodeType).value_or("UNKNOWN")
275  << "' for Node with id " << startNode->nodeId;
276  setStatus(skills::SkillStatus::Failed);
277  return;
278  }
279  }
280 
281  void
282  FluxioCompositeExecutor::handleParameterRoutine(
284  std::atomic_bool& /*running*/,
285  const std::string& /*executorName*/)
286  {
287  if (parameterNode == nullptr || parameterNode->parameterPtr == nullptr)
288  {
289  ARMARX_WARNING << "Unexpected nullptr";
290  setStatus(skills::SkillStatus::Failed);
291  return;
292  }
293 
294  // make sure it is an event parameter
295  if (parameterNode->parameterPtr->type->getShortName() != "Object<Event>")
296  {
297  ARMARX_WARNING << "Unexpected parameter type "
298  << parameterNode->parameterPtr->type->getShortName();
299  setStatus(skills::SkillStatus::Failed);
300  return;
301  }
302 
303  const auto& skill = this->skill;
304  std::list<FluxioEdge> resEdges;
305  std::copy_if(skill.edges.begin(),
306  skill.edges.end(),
307  std::back_inserter(resEdges),
308  [](const skills::FluxioEdge& edge)
309  {
310  return (edge.isValid() &&
311  edge.toNodePtr->nodeType == FluxioNodeType::PARAMETER &&
312  edge.fromParameterPtr->type->getShortName() != "Object<Event>");
313  });
314 
315  std::unique_lock l(possibleInputsMutex);
316  for (const auto& e : resEdges)
317  {
318  const auto& fromParam = e.fromParameterPtr;
319  const auto& toParam = e.toParameterPtr;
320  auto fromNodeId = e.fromNodePtr->nodeId;
321  if (e.fromNodePtr->nodeType == FluxioNodeType::PARAMETER)
322  {
323  fromNodeId = skill.id;
324  }
325 
326  l.unlock();
328  this->getPossibleInputCopy(fromNodeId, fromParam->id);
329  l.lock();
330 
331  this->result->addElement(toParam->id, value);
332  }
333  l.unlock();
334 
335  // get the event type and set the status accordingly
336  const std::string& eventType = parameterNode->parameterPtr->name;
337  if (eventType == "Succeeded")
338  {
340  }
341  else if (eventType == "Failed")
342  {
343  setStatus(skills::SkillStatus::Failed);
344  }
345  else if (eventType == "Aborted")
346  {
347  setStatus(skills::SkillStatus::Aborted);
348  }
349  else
350  {
351  ARMARX_WARNING << "Unexpected event type " << eventType << " for parameter "
352  << parameterNode->parameterPtr->name;
353  setStatus(skills::SkillStatus::Failed);
354  }
355  }
356 
357  void
358  FluxioCompositeExecutor::handleSubSkillRoutine(
360  std::atomic_bool& running,
361  const std::string& executorName,
363  {
364  if (subSkillNode == nullptr || subSkillNode->skillPtr == nullptr)
365  {
366  ARMARX_WARNING << "Unexpected nullptr";
367  setStatus(skills::SkillStatus::Failed);
368  return;
369  }
370 
371  // gather parameters for the subskill
372  armarx::aron::data::DictPtr params = std::make_shared<armarx::aron::data::Dict>();
373 
374  std::list<FluxioEdge> paramEdges;
375  std::copy_if(skill.edges.begin(),
376  skill.edges.end(),
377  std::back_inserter(paramEdges),
378  [&subSkillNode](const skills::FluxioEdge& edge)
379  {
380  return (edge.isValid() && edge.toNodePtr->nodeId == subSkillNode->nodeId &&
381  edge.fromParameterPtr->type->getShortName() != "Object<Event>");
382  });
383 
384  for (const auto& e : paramEdges)
385  {
386  const auto& fromParam = e.fromParameterPtr;
387  const auto& toParam = e.toParameterPtr;
388  auto fromNodeId = e.fromNodePtr->nodeId;
389  if (e.fromNodePtr->nodeType == FluxioNodeType::PARAMETER)
390  {
391  fromNodeId = skill.id;
392  }
393  armarx::aron::data::VariantPtr value = getPossibleInputCopy(fromNodeId, fromParam->id);
394  if (value == nullptr)
395  {
396  ARMARX_WARNING << "Failed to get possible input for parameter " << fromParam->id
397  << " " << fromNodeId;
398  }
399  params->addElement(toParam->id, value);
400  }
401 
402  // start skill execution
403  const auto& executorRes =
404  executeFluxioSkill(subSkillNode->skillPtr->id, profilePtr->id, executorName, params);
405  if (!executorRes.isSuccess())
406  {
407  ARMARX_WARNING << "Failed to execute subskill " << subSkillNode->skillPtr->id;
408  setStatus(skills::SkillStatus::Failed);
409  return;
410  }
411  auto executorPtr = executorRes.getResult();
412  if (executorPtr == nullptr)
413  {
414  ARMARX_WARNING << "Failed to execute subskill " << subSkillNode->skillPtr->id;
415  setStatus(skills::SkillStatus::Failed);
416  return;
417  }
418 
419  std::unique_lock executionsLock(subExecutionsMapMutex);
420  subExecutionsMap[subSkillNode->nodeId] = executorPtr;
421  executionsLock.unlock();
422 
423  // wait until the skill has finished (or the super skill is finished)
424  skills::FluxioSkillStatusUpdate statusUpdate;
425  while (running)
426  {
427  // sleep for a while
428  std::this_thread::sleep_for(std::chrono::milliseconds(250));
429  executorPtr->getStatusUpdate(); // FIXME: bad design
430  const auto& statusUpdateIt = executorPtr->getStatus();
431 
432  if (!statusUpdateIt.has_value())
433  {
434  ARMARX_INFO << "No status update from skill " << subSkillNode->skillPtr->name
435  << " yet. Waiting...";
436  continue;
437  }
438 
439  statusUpdate = statusUpdateIt.value();
440  // did the status change? update statusUpdates list
441  std::unique_lock statusMapLock(statusUpdatesMutex);
442  const auto& lastUpdate =
443  std::find_if(statusUpdates.begin(),
444  statusUpdates.end(),
445  [&subSkillNode](const skills::FluxioSkillStatusUpdate& statusUpdate)
446  { return statusUpdate.subSkillNodeId == subSkillNode->nodeId; });
447 
448  if (lastUpdate == statusUpdates.end() || lastUpdate->status != statusUpdate.status)
449  {
450  statusUpdates.push_front({armarx::DateTime::Now(),
451  executorPtr->id,
452  subSkillNode->nodeId,
453  statusUpdate.status});
454  }
455 
456  statusMapLock.unlock();
457 
458  // check subskill is finished
459  if (statusUpdate.status == skills::SkillStatus::Succeeded ||
460  statusUpdate.status == skills::SkillStatus::Failed ||
461  statusUpdate.status == skills::SkillStatus::Aborted)
462  {
463  break;
464  }
465  }
466 
467  // check if the parent skill is still running
468  if (!running)
469  {
470  return;
471  }
472 
473  std::unique_lock possibleInputsLock(this->possibleInputsMutex);
474  this->possibleInputs[subSkillNode->nodeId] = executorPtr->getResultsCopy();
475  possibleInputsLock.unlock();
476 
477  // check the final skill status get the output event parameter
478  const std::string& outputEventName =
479  statusUpdate.status == skills::SkillStatus::Succeeded ? "Succeeded"
480  : statusUpdate.status == skills::SkillStatus::Failed ? "Failed"
481  : statusUpdate.status == skills::SkillStatus::Aborted ? "Aborted"
482  : "Undefined";
483  const auto& outputParam = std::find_if(
484  subSkillNode->skillPtr->parameters.begin(),
485  subSkillNode->skillPtr->parameters.end(),
486  [&outputEventName](const std::pair<std::string, skills::FluxioParameter>& param)
487  {
488  return (param.second.type->getShortName() == "Object<Event>" &&
489  !param.second.isInput && param.second.name == outputEventName);
490  });
491 
492  if (outputParam == subSkillNode->skillPtr->parameters.end())
493  {
494  ARMARX_WARNING << "Skill " << subSkillNode->skillPtr->name
495  << " is missing the output event parameter " << outputEventName;
496  setStatus(skills::SkillStatus::Failed);
497  return;
498  }
499 
500  // find the connected edge
501  const auto& edge =
502  std::find_if(skill.edges.begin(),
503  skill.edges.end(),
504  [&subSkillNode, &outputParam](const skills::FluxioEdge& edge)
505  {
506  return (edge.fromNodePtr->nodeId == subSkillNode->nodeId &&
507  edge.fromParameterPtr->id == outputParam->second.id);
508  });
509 
510  if (edge == skill.edges.end())
511  {
512  if (outputEventName == "Failed")
513  {
514  setStatus(skills::SkillStatus::Failed);
515  return;
516  }
517 
518  if (outputEventName == "Aborted")
519  {
520  setStatus(skills::SkillStatus::Aborted);
521  return;
522  }
523 
524  ARMARX_WARNING << "Skill " << skill.name
525  << " has no edge connected to the output event parameter "
526  << outputEventName;
527  setStatus(skills::SkillStatus::Failed);
528  return;
529  }
530 
531  // start new subroutine
532  const std::string& nextExecutorName = executorName + "/" + subSkillNode->skillPtr->name;
533  startSubRoutine(
534  edge->toNodePtr, edge->toParameterPtr, running, nextExecutorName, profilePtr);
535  }
536 
537  void
538  FluxioCompositeExecutor::handleControlRoutine(
541  std::atomic_bool& running,
542  const std::string& executorName,
544  {
545  if (controlNode == nullptr)
546  {
547  ARMARX_WARNING << "Unexpected nullptr";
548  setStatus(skills::SkillStatus::Failed);
549  return;
550  }
551 
552  // check the controlType
553  if (controlNode->controlType == skills::FluxioControlNodeType::SPLITTER)
554  {
555  // find connected nodes and store the relevant edges
556  std::vector<std::experimental::observer_ptr<const skills::FluxioEdge>> edgePtrs = {};
557  for (const auto& [id, param] : controlNode->parametersMap)
558  {
559  // ignore the input parameters
560  if (param.isInput)
561  {
562  continue;
563  }
564 
565  for (const auto& edge : skill.edges)
566  {
567  if (edge.isValid() && edge.fromNodePtr->nodeId == controlNode->nodeId &&
568  edge.fromParameterPtr->id == id)
569  {
570  edgePtrs.push_back(std::experimental::make_observer(&edge));
571  }
572  }
573  }
574 
575  const size_t allParams = edgePtrs.size();
576  int param = 1;
577 
578  // start subroutines in separate threads
580  {
581  const std::string newExecutorName = executorName + "/Splitter" + "(" +
582  std::to_string(param) + "/" +
583  std::to_string(allParams) + ")";
584  std::thread(
585  [this, edgePtr, &running, newExecutorName, profilePtr]
586  {
587  startSubRoutine(edgePtr->toNodePtr,
588  edgePtr->toParameterPtr,
589  running,
590  newExecutorName,
591  profilePtr);
592  })
593  .detach();
594  param++;
595  }
596  }
597  else if (controlNode->controlType == skills::FluxioControlNodeType::AND_MERGER)
598  {
599  // check the list of subexecutions for the node id
600  std::experimental::observer_ptr<FluxioMergerExecutor> mergerExecutorPtr = nullptr;
601  std::unique_lock executionsLock(subExecutionsMapMutex);
602  const auto& executorPtr = subExecutionsMap.find(controlNode->nodeId);
603  if (executorPtr == subExecutionsMap.end())
604  {
605  // assemble paramId vector
606  std::vector<std::string> paramIds = {};
607  for (const auto& [id, param] : controlNode->parametersMap)
608  {
609  if (param.isInput)
610  {
611  paramIds.push_back(id);
612  }
613  }
614 
615  // there is no execution for the merger yet, let´s start one
616  mergerExecutorPtr = std::experimental::make_observer(
617  dynamic_cast<FluxioMergerExecutor*>(addMergerExecutorToDC(paramIds).get()));
618  subExecutionsMap.emplace(controlNode->nodeId, mergerExecutorPtr);
619  executionsLock.unlock();
620  }
621  else
622  {
623  executionsLock.unlock();
624 
625  mergerExecutorPtr = std::experimental::make_observer(
626  dynamic_cast<FluxioMergerExecutor*>(executorPtr->second.get()));
627  }
628 
629  if (mergerExecutorPtr == nullptr)
630  {
631  ARMARX_WARNING << "Unexpected nullptr. Dynamic cast from FluxioExecutorPtr to "
632  "FluxioMergerExecutorPtr failed.";
633  setStatus(skills::SkillStatus::Failed);
634  return;
635  }
636 
637  // check in the token
638  mergerExecutorPtr->checkInToken(startParameter->id);
639 
640  if (mergerExecutorPtr->getStatus() != std::nullopt)
641  {
642  return;
643  }
644 
645  // reuse this thread
646  mergerExecutorPtr->run(executorName, nullptr, profilePtr);
647 
648  const auto& outputParam =
649  std::find_if(controlNode->parametersMap.begin(),
650  controlNode->parametersMap.end(),
651  [](const std::pair<std::string, skills::FluxioParameter>& param)
652  { return !param.second.isInput; });
653 
654  if (outputParam == controlNode->parametersMap.end())
655  {
656  ARMARX_WARNING << "Control node " << controlNode->nodeId
657  << " has no output parameter";
658  setStatus(skills::SkillStatus::Failed);
659  return;
660  }
661 
662  // find the connected edge
663  const auto& edge =
664  std::find_if(skill.edges.begin(),
665  skill.edges.end(),
666  [&controlNode, &outputParam](const skills::FluxioEdge& edge)
667  {
668  return (edge.fromNodePtr->nodeId == controlNode->nodeId &&
669  edge.fromParameterPtr->id == outputParam->second.id);
670  });
671 
672  if (edge == skill.edges.end())
673  {
675  << "Skill " << skill.name
676  << " has no edge connected to the output event parameter of the AND merger";
677  setStatus(skills::SkillStatus::Failed);
678  return;
679  }
680 
681  // start new subroutine
682  startSubRoutine(
683  edge->toNodePtr, edge->toParameterPtr, running, executorName, profilePtr);
684  }
685  else
686  {
687  ARMARX_WARNING << "Unexpected control type ";
688  setStatus(skills::SkillStatus::Failed);
689  return;
690  }
691  }
692 
693  void
695  {
696  ARMARX_INFO << "Aborting skill " << skill.name;
697  setStatus(skills::SkillStatus::Aborted);
698  abortSubExecutions();
699  }
700 
701  void
702  FluxioCompositeExecutor::abortSubExecutions()
703  {
704  std::shared_lock executionsLock(subExecutionsMapMutex);
705  for (const auto& [nodeId, executorPtr] : subExecutionsMap)
706  {
707  auto s = executorPtr->getStatus();
708  if (!s.has_value() || s->status == skills::SkillStatus::Succeeded ||
709  s->status == skills::SkillStatus::Failed ||
710  s->status == skills::SkillStatus::Aborted)
711  {
712  continue;
713  }
714 
715  executorPtr->abort();
716  std::unique_lock statusMapLock(statusUpdatesMutex);
717  statusUpdates.push_front(
718  {armarx::DateTime::Now(), executorPtr->id, nodeId, skills::SkillStatus::Aborted});
719  statusMapLock.unlock();
720  }
721  executionsLock.unlock();
722  }
723 
724  std::optional<std::vector<skills::FluxioSkillStatusUpdate>>
726  {
727  ARMARX_INFO << "Getting status updates for skill " << skill.name;
728  // convert statusupdates list to vector
729  std::shared_lock statusMapLock(statusUpdatesMutex);
730  auto ret = std::vector<skills::FluxioSkillStatusUpdate>(statusUpdates.begin(),
731  statusUpdates.end());
732  statusMapLock.unlock();
733  return ret;
734  }
735 
736  void
737  FluxioCompositeExecutor::pollSubStatuses()
738  {
739  std::scoped_lock l(subExecutionsMapMutex, statusUpdatesMutex);
740  for (const auto& [nodeId, executorPtr] : subExecutionsMap)
741  {
742  executorPtr->getStatusUpdate();
743  auto s = executorPtr->getStatus();
744  if (!s.has_value())
745  {
746  continue;
747  }
748 
749  const auto& lastStatus =
750  find_if(statusUpdates.begin(),
751  statusUpdates.end(),
752  [&](const skills::FluxioSkillStatusUpdate& statusUpdate)
753  { return statusUpdate.subSkillNodeId == nodeId; })
754  ->status;
755 
756  if (lastStatus != s->status)
757  {
758  statusUpdates.push_front(
759  {armarx::DateTime::Now(), executorPtr->id, nodeId, s->status});
760  }
761  }
762  }
763 
764  bool
766  {
767 
768  // get start parameter
769  const auto& startParam =
770  std::find_if(skill.parameters.begin(),
771  skill.parameters.end(),
772  [](const std::pair<std::string, skills::FluxioParameter>& param)
773  {
774  return (param.second.type->getShortName() == "Object<Event>" &&
775  param.second.isInput && param.second.name == "Start");
776  });
777 
778  if (startParam == skill.parameters.end())
779  {
780  ARMARX_WARNING << "Skill has no start parameter";
781  return false;
782  }
783 
784  // get all parameter nodes for the start parameter
785  const auto& startNode = std::find_if(
786  skill.nodes.begin(),
787  skill.nodes.end(),
788  [startParam](const std::pair<const std::string,
789  const std::unique_ptr<skills::FluxioNode>>& nodeEntry)
790  {
791  if (nodeEntry.second->nodeType != skills::FluxioNodeType::PARAMETER)
792  {
793  return false;
794  }
795 
796  const auto& paramNode =
797  dynamic_cast<const skills::FluxioParameterNode*>(nodeEntry.second.get());
798  return (paramNode->parameterPtr->id == startParam->second.id);
799  });
800 
801  // there can only be one
802  if (startNode == skill.nodes.end())
803  {
804  ARMARX_WARNING << "Skill has no start node";
805  return false;
806  }
807 
808  // check if the start node is connected
809  const auto& startEdge =
810  std::find_if(skill.edges.begin(),
811  skill.edges.end(),
812  [startNode](const skills::FluxioEdge& edge)
813  { return (edge.fromNodePtr->nodeId == startNode->second->nodeId); });
814 
815  // there can only be one
816  if (startEdge == skill.edges.end())
817  {
818  ARMARX_WARNING << "Skill has no edge connected to the start node";
819  return false;
820  }
821 
822  // get the output event parameters
823  const auto& outputParamsSuccess =
824  std::find_if(skill.parameters.begin(),
825  skill.parameters.end(),
826  [](const std::pair<std::string, skills::FluxioParameter>& param)
827  {
828  return (param.second.type->getShortName() == "Object<Event>" &&
829  !param.second.isInput && param.second.name == "Succeeded");
830  });
831  const auto& outputParamsFailed =
832  std::find_if(skill.parameters.begin(),
833  skill.parameters.end(),
834  [](const std::pair<std::string, skills::FluxioParameter>& param)
835  {
836  return (param.second.type->getShortName() == "Object<Event>" &&
837  !param.second.isInput && param.second.name == "Failed");
838  });
839  const auto& outputParamsAborted =
840  std::find_if(skill.parameters.begin(),
841  skill.parameters.end(),
842  [](const std::pair<std::string, skills::FluxioParameter>& param)
843  {
844  return (param.second.type->getShortName() == "Object<Event>" &&
845  !param.second.isInput && param.second.name == "Aborted");
846  });
847 
848  if (outputParamsSuccess == skill.parameters.end() ||
849  outputParamsFailed == skill.parameters.end() ||
850  outputParamsAborted == skill.parameters.end())
851  {
852  ARMARX_WARNING << "Skill is missing one or more output event parameters";
853  return false;
854  }
855 
856  // TODO: the rest
857 
858  ARMARX_INFO << "Skill validation is not fully implemented yet.";
859  ret = *startEdge;
860  return true;
861  }
862 
863  void
864  FluxioCompositeExecutor::setStatus(skills::SkillStatus status, const std::string& /*nodeId*/)
865  {
866  FluxioExecutor::setStatus(status, skill.id);
867  }
868 } // namespace armarx::skills
armarx::skills::FluxioSkill
Definition: FluxioSkill.h:25
FluxioCompositeExecutor.h
armarx::skills::FluxioEdge
Definition: FluxioEdge.h:15
armarx::skills::FluxioSkill::nodes
std::map< const std::string, const std::unique_ptr< FluxioNode > > nodes
Definition: FluxioSkill.h:40
armarx::skills::FluxioExecutor::status
std::optional< skills::FluxioSkillStatusUpdate > status
Definition: FluxioExecutor.h:62
armarx::aron::ret
ReaderT::InputType T & ret
Definition: rw.h:13
armarx::skills::FluxioExecutor::resultMutex
std::shared_mutex resultMutex
Definition: FluxioExecutor.h:57
armarx::skills::SkillStatus::Aborted
@ Aborted
armarx::skills::FluxioParameterNode
Definition: FluxioParameterNode.h:14
armarx::skills::FluxioSkill::id
std::string id
Definition: FluxioSkill.h:27
armarx::skills::error::FluxioException
A base class for skill exceptions.
Definition: FluxioException.h:24
armarx::skills::FluxioExecutor::result
armarx::aron::data::DictPtr result
Definition: FluxioExecutor.h:56
armarx::skills
This file is part of ArmarX.
Definition: PeriodicUpdateWidget.cpp:11
armarx::skills::FluxioNodeType::PARAMETER
@ PARAMETER
armarx::skills::FluxioExecutor::executorName
std::optional< std::string > executorName
Definition: FluxioExecutor.h:51
armarx::core::time::DateTime::Now
static DateTime Now()
Definition: DateTime.cpp:51
armarx::skills::FluxioExecutor::statusMutex
std::shared_mutex statusMutex
Definition: FluxioExecutor.h:63
DateTime.h
armarx::skills::FluxioExecutor::getPossibleInputCopy
virtual armarx::aron::data::VariantPtr getPossibleInputCopy(const std::string nodeId, const std::string parameterId)
Definition: FluxioExecutor.cpp:28
armarx::skills::FluxioExecutor::possibleInputsMutex
std::shared_mutex possibleInputsMutex
Definition: FluxioExecutor.h:60
std::experimental::fundamentals_v2::make_observer
observer_ptr< _Tp > make_observer(_Tp *__p) noexcept
armarx::skills::FluxioExecutor::possibleInputs
std::map< std::string, armarx::aron::data::DictPtr > possibleInputs
Definition: FluxioExecutor.h:59
FluxioProfile.h
armarx::skills::FluxioExecutor
Definition: FluxioExecutor.h:21
return
return() armarx_set_target("PointCloudViewerGuiPlugin") find_package(MPI REQUIRED) find_package(VTK REQUIRED) find_package(Coin QUIET) armarx_build_if(Coin_FOUND "Coin 3D not found") set(COMPONENT_LIBS VisionXInterfaces VisionXCore VisionXPointCloud VTK
Definition: CMakeLists.txt:1
std::experimental::fundamentals_v2::observer_ptr
Definition: ManagedIceObject.h:53
SkillManagerComponentPlugin.h
armarx::core::time::Duration::isPositive
bool isPositive() const
Tests whether the duration is positive (value in µs > 0).
Definition: Duration.cpp:168
armarx::skills::FluxioSkillStatusUpdate
Definition: FluxioSkillStatusUpdate.h:10
armarx::aron::data::VariantPtr
std::shared_ptr< Variant > VariantPtr
Definition: forward_declarations.h:11
armarx::status
status
Definition: FiniteStateMachine.h:244
armarx::skills::FluxioExecutor::findParameterValue
armarx::aron::data::VariantPtr findParameterValue(const std::experimental::observer_ptr< const FluxioProfile > profilePtr, const FluxioParameter &parameter) const
Definition: FluxioExecutor.cpp:75
cxxopts::value
std::shared_ptr< Value > value()
Definition: cxxopts.hpp:855
armarx::skills::FluxioNodeTypeToString
std::optional< std::string > FluxioNodeTypeToString(const FluxioNodeType &type)
Definition: FluxioNode.cpp:41
armarx::skills::FluxioNodeType::CONTROL
@ CONTROL
FluxioException.h
armarx::skills::SkillStatus::Preparing
@ Preparing
armarx::skills::FluxioControlNodeType::AND_MERGER
@ AND_MERGER
FluxioExecutor.h
armarx::to_string
const std::string & to_string(const std::string &s)
Definition: StringHelpers.h:41
armarx::skills::FluxioSkill::timeout
armarx::Duration timeout
How long (in ms) to wait for the skill to finish execution before timing out.
Definition: FluxioSkill.h:33
armarx::skills::FluxioEdge::toParameterPtr
std::experimental::observer_ptr< const FluxioParameter > toParameterPtr
Definition: FluxioEdge.h:20
armarx::aron::data::DictPtr
std::shared_ptr< Dict > DictPtr
Definition: Dict.h:41
armarx::skills::FluxioCompositeExecutor::abort
void abort() override
Definition: FluxioCompositeExecutor.cpp:694
armarx::skills::SkillStatus::Initializing
@ Initializing
Exception.h
armarx::skills::FluxioCompositeExecutor::FluxioCompositeExecutor
FluxioCompositeExecutor(const std::string &id, const skills::FluxioSkill &skill, const std::function< void(const std::string &executionId)> &&abortFluxioSkillFunc, const std::function< skills::Result< std::experimental::observer_ptr< FluxioExecutor >, skills::error::FluxioException >(const std::string &skillId, const std::string &profileId, const std::string &executorName, armarx::aron::data::DictPtr parameters)> &&executeFluxioSkillFunc, const std::function< std::experimental::observer_ptr< FluxioExecutor >(const std::vector< std::string > &parameterIds)> &&addMergerExecutorToDCFunc)
Definition: FluxioCompositeExecutor.cpp:32
ARMARX_INFO
#define ARMARX_INFO
Definition: Logging.h:181
armarx::skills::FluxioExecutor::statusUpdates
std::list< skills::FluxioSkillStatusUpdate > statusUpdates
Definition: FluxioExecutor.h:53
armarx::skills::FluxioExecutor::statusUpdatesMutex
std::shared_mutex statusUpdatesMutex
Definition: FluxioExecutor.h:54
armarx::skills::SkillStatus::Running
@ Running
armarx::skills::FluxioCompositeExecutor::run
void run(std::string executorName, armarx::aron::data::DictPtr parameters, std::experimental::observer_ptr< const FluxioProfile > profilePtr) override
Definition: FluxioCompositeExecutor.cpp:62
armarx::core::time::Duration::toMilliSeconds
std::int64_t toMilliSeconds() const
Returns the amount of milliseconds.
Definition: Duration.cpp:60
Logging.h
min
T min(T t1, T t2)
Definition: gdiam.h:44
ARMARX_WARNING
#define ARMARX_WARNING
Definition: Logging.h:193
armarx::skills::SkillStatus::Constructing
@ Constructing
armarx::skills::FluxioSkill::edges
std::list< FluxioEdge > edges
Definition: FluxioSkill.h:41
armarx::skills::FluxioNodeType::SUBSKILL
@ SUBSKILL
armarx::skills::FluxioCompositeExecutor::validateSkill
bool validateSkill(skills::FluxioEdge &ret) const
Definition: FluxioCompositeExecutor.cpp:765
armarx::skills::FluxioSkill::name
std::string name
Definition: FluxioSkill.h:28
FluxioMergerExecutor.h
armarx::ctrlutil::s
double s(double t, double s0, double v0, double a0, double j)
Definition: CtrlUtil.h:33
armarx::skills::FluxioEdge::toNodePtr
std::experimental::observer_ptr< const FluxioNode > toNodePtr
Definition: FluxioEdge.h:19
armarx::skills::Result
Definition: FluxioResult.h:12
FluxioNode.h
armarx::skills::FluxioCompositeExecutor::getStatusUpdate
std::optional< std::vector< skills::FluxioSkillStatusUpdate > > getStatusUpdate() override
Definition: FluxioCompositeExecutor.cpp:725
armarx::skills::SkillStatus
SkillStatus
Definition: SkillStatusUpdate.h:15
armarx::skills::SkillStatus::Succeeded
@ Succeeded
armarx::skills::FluxioSkill::parameters
std::map< std::string, FluxioParameter > parameters
Definition: FluxioSkill.h:39
armarx::skills::FluxioControlNodeType::SPLITTER
@ SPLITTER
armarx::skills::SkillStatus::Failed
@ Failed