FluxioNativeExecutor.cpp
Go to the documentation of this file.
1 #include "FluxioNativeExecutor.h"
2 
3 #include <experimental/memory>
4 #include <memory>
5 #include <mutex>
6 #include <utility>
7 #include <vector>
8 
10 
17 
18 #include "../ProviderID.h"
19 #include "../SkillExecutionRequest.h"
20 #include "../SkillID.h"
21 
22 namespace armarx::skills
23 {
25  const std::string& id,
26  const skills::SkillID& skillId,
27  const FluxioSkill& fluxioSkill,
28  const std::function<bool(const skills::SkillExecutionID&)>&& abortSkillFunc,
29  const std::function<skills::SkillExecutionID(const skills::SkillExecutionRequest&)>&&
30  executeSkillAsyncFunc,
31  const std::function<std::optional<skills::SkillStatusUpdate>(
32  const skills::SkillExecutionID&)>&& getSkillExecutionStatusFunc) :
33  FluxioExecutor(id, true),
34  skillId(skillId),
35  fluxioSkill(fluxioSkill),
36  abortSkill(abortSkillFunc),
37  executeSkillAsync(executeSkillAsyncFunc),
38  getSkillExecutionStatus(getSkillExecutionStatusFunc)
39  {
40  std::scoped_lock resultLock(resultMutex);
41  result = std::make_shared<armarx::aron::data::Dict>();
42  }
43 
44  void
45  FluxioNativeExecutor::run(const std::string executorName,
46  armarx::aron::data::DictPtr parameters,
48  {
49  // reset results
50  std::unique_lock resultLock(resultMutex);
51  result = std::make_shared<armarx::aron::data::Dict>();
52  resultLock.unlock();
53 
54  std::unique_lock l(possibleInputsMutex);
55  this->possibleInputs[this->fluxioSkill.id] = parameters;
56 
57  const auto& type = std::make_shared<armarx::aron::type::Object>();
58 
59  // fill in missing with default values
60  for (const auto& [key, param] : this->fluxioSkill.parameters)
61  {
62  if (param.type->getShortName() == "Object<Event>" || !param.isInput ||
63  this->possibleInputs[this->fluxioSkill.id]->hasElement(key))
64  {
65  continue;
66  }
67  const auto& fallBack = findParameterValue(profilePtr, param);
68 
69  if (fallBack == nullptr)
70  {
71  ARMARX_WARNING << "No fallback value found for parameter " << param.name;
72  ARMARX_WARNING << "Can't execute skill due to incomplete params.";
73  this->setStatus(skills::SkillStatus::Failed);
74  return;
75  }
76 
77  this->possibleInputs[this->fluxioSkill.id]->addElement(key, fallBack);
78 
79  type->addMemberType(param.name, param.type);
80  }
81 
82  const auto& params = std::make_shared<armarx::aron::data::Dict>();
83 
84  for (const auto& [key, value] : this->possibleInputs[this->fluxioSkill.id]->getElements())
85  {
86  const auto& paramIt = this->fluxioSkill.parameters.find(key);
87  if (paramIt == this->fluxioSkill.parameters.end())
88  {
89  ARMARX_WARNING << "Parameter with ID " << key << " not found in skill "
90  << this->fluxioSkill.id;
91  ARMARX_WARNING << "Can't execute skill due to incomplete params.";
92  this->setStatus(skills::SkillStatus::Failed);
93  return;
94  }
95  if (value == nullptr)
96  {
97  ARMARX_WARNING << "Value for parameter " << paramIt->second.name << " is null";
98  continue;
99  }
100  params->addElement(paramIt->second.name, value);
101  }
102  l.unlock();
103 
104  const auto& isValide = params->fullfillsType(type);
105 
106  if (!isValide)
107  {
108  ARMARX_WARNING << "Parameters do not fullfill the type of the skill";
109  ARMARX_WARNING << "Can't execute skill due to incompatible params.";
110  this->setStatus(skills::SkillStatus::Failed);
111  return;
112  }
113 
115  req.skillId = skillId;
116  req.parameters = params;
118  this->executorName = executorName;
119 
120  ARMARX_WARNING << "Executing skill with the following parameters: "
122  req.parameters);
123  auto eid = executeSkillAsync(req);
124 
125  this->executionId = eid;
126  }
127 
128  void
130  {
131  std::scoped_lock l(statusMutex);
132  if (status->status == skills::SkillStatus::Aborted ||
135  {
136  // already terminated
137  return;
138  }
139 
140  if (!this->executionId.has_value())
141  {
142  // error
143  ARMARX_WARNING << "Execution ID not set. Cannot abort skill (it should not be running "
144  "at this point).";
145  return;
146  }
147 
148  abortSkill(this->executionId.value());
149  }
150 
151  std::optional<std::vector<skills::FluxioSkillStatusUpdate>>
153  {
154  if (!this->executionId.has_value())
155  {
156  // error
157  ARMARX_WARNING << "Execution ID or plugin is not set";
158  return std::vector<skills::FluxioSkillStatusUpdate>(this->statusUpdates.begin(),
159  this->statusUpdates.end());
160  }
161 
162  auto executionId = this->executionId.value();
163 
164  auto status = getSkillExecutionStatus(executionId);
165 
166  if (!status.has_value())
167  {
168  // error
169  ARMARX_WARNING << "No status with execution ID " << this->executionId->toString()
170  << " found";
171  return std::nullopt;
172  }
173 
175  armarx::DateTime::Now(), this->id, this->fluxioSkill.id, status.value().status};
176 
177  // newest statusupdate is aways at the end of the vector
178  if (this->statusUpdates.empty() || update.status != this->statusUpdates.front().status)
179  {
180  this->setStatus(update.status);
181  }
182 
183  return std::vector<skills::FluxioSkillStatusUpdate>(this->statusUpdates.begin(),
184  this->statusUpdates.end());
185  }
186 
187  void
188  FluxioNativeExecutor::setStatus(skills::SkillStatus status, const std::string& /*nodeId*/)
189  {
190  FluxioExecutor::setStatus(status, this->fluxioSkill.id);
191  }
192 
195  {
196  // grab latest status update and check if it contains a result
197  if (!executionId.has_value())
198  {
199  ARMARX_WARNING << "No execution ID available for skill " << this->skillId;
200  return nullptr;
201  }
202  std::scoped_lock l(resultMutex);
203  const auto& s = getSkillExecutionStatus(executionId.value());
204  if (!s.has_value() || s->result == nullptr)
205  {
206  ARMARX_WARNING << "No result available for skill " << this->skillId;
207  return nullptr;
208  }
209 
210  for (const auto& [key, value] : s->result->getElements())
211  {
212  // get param by name
213  const auto& params = this->fluxioSkill.parameters;
214  const auto& param =
215  std::find_if(params.begin(),
216  params.end(),
217  [&key](const std::pair<std::string, FluxioParameter>& it)
218  { return !it.second.isInput && it.second.name == key; });
219 
220  if (param == params.end())
221  {
222  ARMARX_WARNING << "Parameter with name " << key << " not found in skill "
223  << this->skillId;
224  return nullptr;
225  }
226  this->result->addElement(param->second.id, value);
227  }
228 
229  return this->result->clone();
230  }
231 } // namespace armarx::skills
armarx::skills::FluxioSkill
Definition: FluxioSkill.h:25
armarx::skills::FluxioNativeExecutor::run
void run(std::string executorName, armarx::aron::data::DictPtr parameters, std::experimental::observer_ptr< const FluxioProfile > profilePtr) override
Definition: FluxioNativeExecutor.cpp:45
armarx::skills::SkillExecutionID
Definition: SkillExecutionID.h:15
armarx::skills::FluxioExecutor::resultMutex
std::shared_mutex resultMutex
Definition: FluxioExecutor.h:57
SkillStatusUpdate.h
armarx::skills::FluxioNativeExecutor::FluxioNativeExecutor
FluxioNativeExecutor(const std::string &id, const skills::SkillID &skillId, const FluxioSkill &fluxioSkill, const std::function< bool(const skills::SkillExecutionID &)> &&abortSkillFunc, const std::function< skills::SkillExecutionID(const skills::SkillExecutionRequest &)> &&executeSkillAsyncFunc, const std::function< std::optional< skills::SkillStatusUpdate >(const skills::SkillExecutionID &)> &&getSkillExecutionStatusFunc)
Definition: FluxioNativeExecutor.cpp:24
armarx::skills::SkillStatus::Aborted
@ Aborted
armarx::skills::SkillExecutionRequest::skillId
skills::SkillID skillId
Definition: SkillExecutionRequest.h:25
armarx::skills::FluxioSkill::id
std::string id
Definition: FluxioSkill.h:27
armarx::skills::FluxioExecutor::result
armarx::aron::data::DictPtr result
Definition: FluxioExecutor.h:56
armarx::skills
This file is part of ArmarX.
Definition: PeriodicUpdateWidget.cpp:11
armarx::skills::FluxioExecutor::executorName
std::optional< std::string > executorName
Definition: FluxioExecutor.h:51
armarx::core::time::DateTime::Now
static DateTime Now()
Definition: DateTime.cpp:51
armarx::skills::FluxioExecutor::statusMutex
std::shared_mutex statusMutex
Definition: FluxioExecutor.h:63
armarx::aron::data::converter::AronNlohmannJSONConverter::ConvertToNlohmannJSON
static nlohmann::json ConvertToNlohmannJSON(const data::VariantPtr &)
Definition: NLohmannJSONConverter.cpp:10
armarx::skills::FluxioExecutor::possibleInputsMutex
std::shared_mutex possibleInputsMutex
Definition: FluxioExecutor.h:60
armarx::skills::FluxioExecutor::possibleInputs
std::map< std::string, armarx::aron::data::DictPtr > possibleInputs
Definition: FluxioExecutor.h:59
armarx::skills::FluxioSkillStatusUpdate::status
SkillStatus status
Definition: FluxioSkillStatusUpdate.h:15
armarx::skills::FluxioExecutor
Definition: FluxioExecutor.h:21
std::experimental::fundamentals_v2::observer_ptr
Definition: ManagedIceObject.h:53
Dict.h
armarx::skills::FluxioNativeExecutor::getStatusUpdate
std::optional< std::vector< skills::FluxioSkillStatusUpdate > > getStatusUpdate() override
Definition: FluxioNativeExecutor.cpp:152
NLohmannJSONConverter.h
armarx::skills::FluxioSkillStatusUpdate
Definition: FluxioSkillStatusUpdate.h:10
armarx::skills::SkillExecutionRequest
Definition: SkillExecutionRequest.h:13
Object.h
armarx::status
status
Definition: FiniteStateMachine.h:244
armarx::skills::FluxioExecutor::findParameterValue
armarx::aron::data::VariantPtr findParameterValue(const std::experimental::observer_ptr< const FluxioProfile > profilePtr, const FluxioParameter &parameter) const
Definition: FluxioExecutor.cpp:75
cxxopts::value
std::shared_ptr< Value > value()
Definition: cxxopts.hpp:855
FluxioParameter.h
armarx::skills::FluxioNativeExecutor::abort
void abort() override
Definition: FluxioNativeExecutor.cpp:129
armarx::skills::FluxioExecutor::id
const std::string id
Definition: FluxioExecutor.h:47
FluxioExecutor.h
armarx::skills::FluxioNativeExecutor::getResultsCopy
armarx::aron::data::DictPtr getResultsCopy() override
Definition: FluxioNativeExecutor.cpp:194
armarx::armem::server::ltm::util::mongodb::detail::update
bool update(mongocxx::collection &coll, const nlohmann::json &query, const nlohmann::json &update)
Definition: mongodb.cpp:68
armarx::aron::data::DictPtr
std::shared_ptr< Dict > DictPtr
Definition: Dict.h:41
armarx::skills::FluxioExecutor::statusUpdates
std::list< skills::FluxioSkillStatusUpdate > statusUpdates
Definition: FluxioExecutor.h:53
armarx::skills::SkillExecutionRequest::parameters
armarx::aron::data::DictPtr parameters
Definition: SkillExecutionRequest.h:27
armarx::skills::SkillExecutionRequest::executorName
std::string executorName
Definition: SkillExecutionRequest.h:26
Logging.h
ARMARX_WARNING
#define ARMARX_WARNING
Definition: Logging.h:193
armarx::skills::FluxioExecutor::setStatus
virtual void setStatus(skills::SkillStatus status, const std::string &nodeId="noId")
Definition: FluxioExecutor.cpp:17
armarx::skills::SkillID
Definition: SkillID.h:14
armarx::ctrlutil::s
double s(double t, double s0, double v0, double a0, double j)
Definition: CtrlUtil.h:33
FluxioNativeExecutor.h
armarx::skills::SkillStatus
SkillStatus
Definition: SkillStatusUpdate.h:15
armarx::skills::SkillStatus::Succeeded
@ Succeeded
armarx::skills::FluxioSkill::parameters
std::map< std::string, FluxioParameter > parameters
Definition: FluxioSkill.h:39
armarx::skills::SkillStatus::Failed
@ Failed