FluxioMergerExecutor.cpp
Go to the documentation of this file.
1 #include "FluxioMergerExecutor.h"
2 
3 #include <mutex>
4 #include <optional>
5 #include <string>
6 #include <thread>
7 #include <vector>
8 
10 
11 #include "../SkillStatusUpdate.h"
12 
13 namespace armarx::skills
14 {
16  const std::vector<std::string>& parameterIds) :
17  FluxioExecutor(id, false)
18  {
// Constructor body: registers every entry of parameterIds in tokenHasArrivedMap
// with the value false, i.e. "token not yet arrived".
// NOTE(review): the first line of this signature is not visible in this listing.
// NOTE(review): the loop variable `id` below shadows the `id` that is passed to
// FluxioExecutor(id, false) above; a distinct name would avoid confusion.
19  for (const auto& id : parameterIds)
20  {
21  tokenHasArrivedMap[id] = false;
22  }
// Logs the list of awaited ids. NOTE(review): emitted at warning level although
// it looks like a purely informational message — confirm the intended level.
23  ARMARX_WARNING << "Fluxio Merger is waiting for the following tokens to arrive: "
24  << parameterIds;
25  }
26 
// Blocks the calling thread in a polling loop until every entry of
// tokenHasArrivedMap is true, or until the status check inside the loop matches.
// The `parameters` argument is unused (its name is commented out).
// NOTE(review): the last line of the signature (listing line 30) is not visible
// in this extract.
27  void
28  FluxioMergerExecutor::run(const std::string executorName,
29  armarx::aron::data::DictPtr /*parameters*/,
31  {
32  // set status running
// NOTE(review): listing line 33 is absent from this extract; presumably the
// call that actually sets the running status — confirm against the real file.
34  this->executorName = executorName;
35 
36  // wait for all tokens to arrive
37  while (true)
38  {
// Poll interval: the loop sleeps 250 ms before each check.
39  std::this_thread::sleep_for(std::chrono::milliseconds(250));
40 
41  // check if all tokens have arrived
42  bool allTokensArrived = true;
// NOTE(review): tokenHasArrivedMap is read here without locking
// tokenHasArrivedMapMutex, while checkInToken() writes to the map under that
// mutex — this looks like a data race if checkInToken() runs on another
// thread; confirm and guard this scan with the same mutex.
43  for (const auto& [id, hasArrived] : tokenHasArrivedMap)
44  {
45  if (!hasArrived)
46  {
47  allTokensArrived = false;
48  break;
49  }
50  }
51 
// Early exit when the executor's status is Aborted.
// NOTE(review): the rest of this condition (listing lines 53-54) is missing
// from this extract, so the other terminating states cannot be seen here.
52  if (status->status == skills::SkillStatus::Aborted ||
55  {
56  return;
57  }
58 
// All expected tokens have been checked in: leave the loop and return.
// NOTE(review): listing line 61 is missing; presumably the final status is set
// there before returning — confirm.
59  if (allTokensArrived)
60  {
62  return;
63  }
64  }
65  }
66 
// NOTE(review): both the signature line (listing line 68) and the body line
// (listing line 70) of this definition are missing from this extract; only the
// return type and the braces are visible. Presumably this is abort() and it
// moves the executor into an aborted state so that run()'s polling loop
// terminates — confirm against the real file.
67  void
69  {
71  }
72 
73  void
74  FluxioMergerExecutor::checkInToken(const std::string& parameterId)
75  {
76  std::unique_lock l(tokenHasArrivedMapMutex);
77  const auto it = tokenHasArrivedMap.find(parameterId);
78 
79  if (it == tokenHasArrivedMap.end())
80  {
81  ARMARX_WARNING << "Unexpected parameterId: " << parameterId;
82  }
83  else
84  {
85  ARMARX_WARNING << "Fluxio Merger received token for parameterId: " << parameterId;
86  tokenHasArrivedMap[parameterId] = true;
87  }
88 
89  l.unlock();
90  }
91 
// Always returns std::nullopt; the visible body produces no status updates.
// NOTE(review): the signature line (listing line 93) is missing from this
// extract; only the return type is visible here.
92  std::optional<std::vector<skills::FluxioSkillStatusUpdate>>
94  {
95  // unused method
96  return std::nullopt;
97  }
98 } // namespace armarx::skills
armarx::skills::FluxioMergerExecutor::getStatusUpdate
std::optional< std::vector< skills::FluxioSkillStatusUpdate > > getStatusUpdate() override
Definition: FluxioMergerExecutor.cpp:93
armarx::skills::SkillStatus::Aborted
@ Aborted
armarx::skills
This file is part of ArmarX.
Definition: PeriodicUpdateWidget.cpp:11
armarx::skills::FluxioExecutor::executorName
std::optional< std::string > executorName
Definition: FluxioExecutor.h:51
armarx::skills::FluxioMergerExecutor::run
void run(std::string executorName, armarx::aron::data::DictPtr parameters, std::experimental::observer_ptr< const FluxioProfile > profilePtr) override
Definition: FluxioMergerExecutor.cpp:28
armarx::skills::FluxioExecutor
Definition: FluxioExecutor.h:21
armarx::skills::FluxioMergerExecutor::FluxioMergerExecutor
FluxioMergerExecutor(const std::string &id, const std::vector< std::string > &parameterIds)
Definition: FluxioMergerExecutor.cpp:15
std::experimental::fundamentals_v2::observer_ptr
Definition: ManagedIceObject.h:53
armarx::status
status
Definition: FiniteStateMachine.h:244
armarx::skills::FluxioMergerExecutor::checkInToken
void checkInToken(const std::string &parameterId)
Definition: FluxioMergerExecutor.cpp:74
armarx::skills::FluxioExecutor::id
const std::string id
Definition: FluxioExecutor.h:47
armarx::skills::FluxioMergerExecutor::abort
void abort() override
Definition: FluxioMergerExecutor.cpp:68
armarx::aron::data::DictPtr
std::shared_ptr< Dict > DictPtr
Definition: Dict.h:41
armarx::skills::SkillStatus::Running
@ Running
Logging.h
ARMARX_WARNING
#define ARMARX_WARNING
Definition: Logging.h:193
armarx::skills::FluxioExecutor::setStatus
virtual void setStatus(skills::SkillStatus status, const std::string &nodeId="noId")
Definition: FluxioExecutor.cpp:17
FluxioMergerExecutor.h
armarx::skills::SkillStatus::Succeeded
@ Succeeded
armarx::skills::SkillStatus::Failed
@ Failed