FluxioNativeExecutor.cpp
Go to the documentation of this file.
2
3#include <experimental/memory>
4#include <memory>
5#include <mutex>
6#include <utility>
7#include <vector>
8
10
17
18#include "../ProviderID.h"
20#include "../SkillID.h"
21
22namespace armarx::skills
23{
// Constructor of FluxioNativeExecutor. The line carrying the qualified
// constructor name is not part of this listing; the parameter list starts here.
//
// id                          - forwarded to the FluxioExecutor base constructor.
// skillId                     - initialises the member of the same name.
// fluxioSkill                 - initialises the member of the same name.
// abortSkillFunc              - initialises abortSkill.
// executeSkillAsyncFunc       - initialises executeSkillAsync.
// getSkillExecutionStatusFunc - initialises getSkillExecutionStatus.
//
// NOTE(review): the three callbacks are declared as `const std::function<...>&&`.
// A const rvalue reference cannot be moved from, and the initialisers below pass
// the named parameters as lvalues, so they are copied if the members are
// std::function values - confirm against the header.
25 const std::string& id,
26 const skills::SkillID& skillId,
27 const FluxioSkill& fluxioSkill,
28 const std::function<bool(const skills::SkillExecutionID&)>&& abortSkillFunc,
29 const std::function<skills::SkillExecutionID(const skills::SkillExecutionRequest&)>&&
30 executeSkillAsyncFunc,
31 const std::function<std::optional<skills::SkillStatusUpdate>(
32 const skills::SkillExecutionID&)>&& getSkillExecutionStatusFunc) :
// The meaning of the second base-constructor argument (`true`) is not visible
// here - TODO confirm against FluxioExecutor.
33 FluxioExecutor(id, true),
34 skillId(skillId),
35 fluxioSkill(fluxioSkill),
36 abortSkill(abortSkillFunc),
37 executeSkillAsync(executeSkillAsyncFunc),
38 getSkillExecutionStatus(getSkillExecutionStatusFunc)
39 {
// Start with an empty result dictionary; the assignment is done under resultMutex.
40 std::scoped_lock resultLock(resultMutex);
41 result = std::make_shared<armarx::aron::data::Dict>();
42 }
43
// run: starts an execution of the wrapped skill. The signature lines are not part
// of this listing; the body uses `executorName`, `parameters` and `profilePtr`.
//
// Steps performed below:
//   1. replace `result` with an empty dictionary,
//   2. store `parameters` in possibleInputs and add fallback values for missing
//      input parameters,
//   3. build a second dictionary keyed by parameter name instead of map key,
//   4. check that dictionary with fullfillsType,
//   5. fill the request and pass it to executeSkillAsync, keeping the returned id.
//
// On a missing required fallback the status is set to Aborted; on an unknown
// parameter key or a failed type check it is set to Failed. In all three cases the
// function returns without calling executeSkillAsync.
44 void
48 {
49 // reset results
50 std::unique_lock resultLock(resultMutex);
51 result = std::make_shared<armarx::aron::data::Dict>();
// resultMutex is released explicitly before possibleInputsMutex is taken.
52 resultLock.unlock();
53
// Held until the explicit unlock after the second loop; the early returns in
// between release it through the unique_lock destructor.
54 std::unique_lock l(possibleInputsMutex);
// assumes `parameters` is non-null - it is dereferenced via hasElement/addElement
// below; TODO confirm against callers.
55 this->possibleInputs[this->fluxioSkill.id] = parameters;
56
// Collects member types only for the parameters that are filled in from a
// fallback in the loop below.
57 const auto& type = std::make_shared<armarx::aron::type::Object>();
58
59 // fill in missing with default values
60 for (const auto& [key, param] : this->fluxioSkill.parameters)
61 {
// Skip parameters whose type short name is "Object<Event>", non-input parameters,
// and parameters the caller already supplied.
62 if (param.type->getShortName() == "Object<Event>" || !param.isInput ||
63 this->possibleInputs[this->fluxioSkill.id]->hasElement(key))
64 {
65 continue;
66 }
67 const auto& fallBack = findParameterValue(profilePtr, param);
68
69 if (fallBack == nullptr)
70 {
71 ARMARX_INFO << "No fallback value found for parameter " << param.name;
72 if (!param.required)
73 {
// Non-required parameter: execution continues and the null fallback is added below.
74 ARMARX_INFO << "Parameter is not required (MAYBE type). Using nullptr "
75 "(std::nullopt) instead.";
76 }
77 else
78 {
// Required parameter without any value: give up before contacting the skill.
79 ARMARX_INFO << "Can't execute skill due to incomplete params.";
80 ARMARX_INFO << "Aborting skill execution.";
81 this->setStatus(skills::SkillStatus::Aborted);
82 return;
83 }
84 }
85
// Stored under the map key here; the second loop re-keys by param.name.
86 this->possibleInputs[this->fluxioSkill.id]->addElement(key, fallBack);
87
// NOTE(review): caller-supplied parameters never reach this line (they `continue`
// above), so `type` has no members for them - confirm that checking them against
// this partial type further down is intended.
88 type->addMemberType(param.name, param.type);
89 }
90
91 const auto& params = std::make_shared<armarx::aron::data::Dict>();
92
// Translate the stored inputs (keyed by map key) into `params` (keyed by name).
93 for (const auto& [key, value] : this->possibleInputs[this->fluxioSkill.id]->getElements())
94 {
95 const auto& paramIt = this->fluxioSkill.parameters.find(key);
96 if (paramIt == this->fluxioSkill.parameters.end())
97 {
98 ARMARX_WARNING << "Parameter with ID " << key << " not found in skill "
99 << this->fluxioSkill.id;
100 ARMARX_WARNING << "Can't execute skill due to incomplete params.";
101 this->setStatus(skills::SkillStatus::Failed);
102 return;
103 }
// NOTE(review): a null value for a *required* parameter is logged and left out of
// `params`, while a null value for a non-required parameter is added - confirm
// this is the intended way round.
104 if (value == nullptr && paramIt->second.required)
105 {
106 ARMARX_INFO << "Value for parameter " << paramIt->second.name << " is null.";
107 continue;
108 }
109 params->addElement(paramIt->second.name, value);
110 }
111 l.unlock();
112
113 const auto& isValid = params->fullfillsType(type);
114
115 if (!isValid)
116 {
117 ARMARX_WARNING << "Parameters do not fullfill the type of the skill";
118 ARMARX_WARNING << "Can't execute skill due to incompatible params.";
119 this->setStatus(skills::SkillStatus::Failed);
120 return;
121 }
122
// The declaration of `req` and at least one further statement fall on lines that
// are not part of this listing.
124 req.skillId = skillId;
125 req.parameters = params;
127 this->executorName = executorName;
128
// NOTE(review): this message is emitted at WARNING level although it reports the
// normal path - presumably INFO was meant; confirm. The expression that formats
// req.parameters starts on a line missing from this listing.
129 ARMARX_WARNING << "Executing skill with the following parameters: "
131 req.parameters);
// Hand the request over and remember the returned id; abort, getStatusUpdate and
// getResultsCopy all depend on it.
132 auto eid = executeSkillAsync(req);
133
134 this->executionId = eid;
135 }
136
// Requests an abort of the running execution through the abortSkill callback.
// The line with the qualified function name is not part of this listing.
//
// Does nothing when the current status is already a terminal one, and only logs a
// warning when no execution id has been stored. The value returned by abortSkill
// is not used.
137 void
139 {
140 std::scoped_lock l(statusMutex);
// NOTE(review): `status` is dereferenced without a has_value() check - assumes a
// status has been set before abort is called; confirm. The remainder of this
// condition is on lines missing from this listing.
141 if (status->status == skills::SkillStatus::Aborted ||
144 {
145 // already terminated
146 return;
147 }
148
149 if (!this->executionId.has_value())
150 {
// Without an execution id there is nothing that could be aborted.
151 // error
152 ARMARX_WARNING << "Execution ID not set. Cannot abort skill (it should not be running "
153 "at this point).";
154 return;
155 }
156
157 abortSkill(this->executionId.value());
158 }
159
// Queries the current execution status through getSkillExecutionStatus, records
// it via setStatus when it differs from the first stored entry, and returns a
// vector copy of statusUpdates. The line with the qualified function name is not
// part of this listing.
//
// Returns:
//   - a copy of statusUpdates when no execution id is set (after a warning),
//   - std::nullopt when the callback reports no status for the execution id,
//   - otherwise a copy of statusUpdates taken after the optional setStatus call.
//
// NOTE(review): no lock is taken in this function while statusUpdates is read -
// confirm the thread-safety expectations.
160 std::optional<std::vector<skills::FluxioSkillStatusUpdate>>
162 {
163 if (!this->executionId.has_value())
164 {
165 // error
166 ARMARX_WARNING << "Execution ID is not set";
167 return std::vector<skills::FluxioSkillStatusUpdate>(this->statusUpdates.begin(),
168 this->statusUpdates.end());
169 }
170
// Local copy; it shadows the member of the same name for the rest of the function.
171 auto executionId = this->executionId.value();
172
173 auto status = getSkillExecutionStatus(executionId);
174
175 if (!status.has_value())
176 {
177 // error
178 ARMARX_WARNING << "No status with execution ID " << this->executionId->toString()
179 << " found";
180 return std::nullopt;
181 }
182
// The first line of the `update` initialiser is missing from this listing; the
// visible part supplies the current time, this->id, the skill id and the status.
184 armarx::DateTime::Now(), this->id, this->fluxioSkill.id, status.value().status};
185
186 // Only record a new entry when the status differs from statusUpdates.front().
// NOTE(review): the previous comment stated that the newest status update is
// always at the end of the container, but the comparison uses front() - confirm
// at which end FluxioExecutor::setStatus inserts.
187 if (this->statusUpdates.empty() || update.status != this->statusUpdates.front().status)
188 {
189 this->setStatus(update.status);
190 }
191
192 return std::vector<skills::FluxioSkillStatusUpdate>(this->statusUpdates.begin(),
193 this->statusUpdates.end());
194 }
195
// Forwards the status to FluxioExecutor::setStatus. The nodeId argument is
// ignored (its name is commented out); the id of the wrapped skill is always
// passed as the node id instead.
196 void
197 FluxioNativeExecutor::setStatus(skills::SkillStatus status, const std::string& /*nodeId*/)
198 {
199 FluxioExecutor::setStatus(status, this->fluxioSkill.id);
200 }
201
// getResultsCopy: the return type and name lines are not part of this listing.
// Fetches the execution status, copies every element of its result into
// this->result (re-keyed from parameter name to parameter id) and returns a clone
// of this->result.
//
// Returns nullptr when no execution id is set, when no status or no result is
// available, or when a result key does not match the name of any output parameter.
204 {
205 // grab latest status update and check if it contains a result
206 if (!executionId.has_value())
207 {
208 ARMARX_WARNING << "No execution ID available for skill " << this->skillId;
209 return nullptr;
210 }
// Held for the rest of the function, including the callback invocation below.
211 std::scoped_lock l(resultMutex);
212 const auto& s = getSkillExecutionStatus(executionId.value());
213 if (!s.has_value() || s->result == nullptr)
214 {
215 ARMARX_WARNING << "No result available for skill " << this->skillId;
216 return nullptr;
217 }
218
219 for (const auto& [key, value] : s->result->getElements())
220 {
221 // get param by name
// Linear search for an output parameter (!isInput) whose name equals the result key.
// NOTE(review): the lambda takes std::pair<std::string, FluxioParameter>, whereas
// the elements of a std::map<std::string, FluxioParameter> are
// std::pair<const std::string, FluxioParameter>; binding therefore creates a
// temporary copy of each visited element. `const auto&` would avoid that.
222 const auto& params = this->fluxioSkill.parameters;
223 const auto& param =
224 std::find_if(params.begin(),
225 params.end(),
226 [&key](const std::pair<std::string, FluxioParameter>& it)
227 { return !it.second.isInput && it.second.name == key; });
228
229 if (param == params.end())
230 {
// NOTE(review): elements added in earlier iterations stay in this->result when
// returning here.
231 ARMARX_WARNING << "Parameter with name " << key << " not found in skill "
232 << this->skillId;
233 return nullptr;
234 }
// NOTE(review): this->result is only replaced in the constructor and in run, so a
// second call for the same execution adds the same keys again - the behaviour of
// addElement for an existing key is not visible here; confirm.
235 this->result->addElement(param->second.id, value);
236 }
237
238 return this->result->clone();
239 }
240} // namespace armarx::skills
static DateTime Now()
Definition DateTime.cpp:51
static nlohmann::json ConvertToNlohmannJSON(const data::VariantPtr &)
FluxioExecutor(const FluxioExecutor &)=delete
armarx::aron::data::DictPtr result
armarx::aron::data::VariantPtr findParameterValue(const std::experimental::observer_ptr< const FluxioProfile > profilePtr, const FluxioParameter &parameter) const
std::optional< skills::FluxioSkillStatusUpdate > status
std::list< skills::FluxioSkillStatusUpdate > statusUpdates
std::map< std::string, armarx::aron::data::DictPtr > possibleInputs
std::optional< std::string > executorName
virtual void setStatus(skills::SkillStatus status, const std::string &nodeId="noId")
std::shared_mutex possibleInputsMutex
void run(std::string executorName, armarx::aron::data::DictPtr parameters, std::experimental::observer_ptr< const FluxioProfile > profilePtr) override
FluxioNativeExecutor(const std::string &id, const skills::SkillID &skillId, const FluxioSkill &fluxioSkill, const std::function< bool(const skills::SkillExecutionID &)> &&abortSkillFunc, const std::function< skills::SkillExecutionID(const skills::SkillExecutionRequest &)> &&executeSkillAsyncFunc, const std::function< std::optional< skills::SkillStatusUpdate >(const skills::SkillExecutionID &)> &&getSkillExecutionStatusFunc)
std::optional< std::vector< skills::FluxioSkillStatusUpdate > > getStatusUpdate() override
armarx::aron::data::DictPtr getResultsCopy() override
#define ARMARX_INFO
The normal logging level.
Definition Logging.h:181
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
Definition Logging.h:193
std::shared_ptr< Dict > DictPtr
Definition Dict.h:42
This file is part of ArmarX.
std::map< std::string, FluxioParameter > parameters
Definition FluxioSkill.h:39