FluxioLoopExecutor.cpp
Go to the documentation of this file.
2
3#include <algorithm>
4#include <chrono>
5#include <experimental/memory>
6#include <functional>
7#include <memory>
8#include <mutex>
9#include <optional>
10#include <shared_mutex>
11#include <string>
12#include <thread>
13#include <vector>
14
17
27
28namespace armarx::skills
29{
// FluxioLoopExecutor constructor.
// NOTE(review): the opening line of the signature and part of the std::function
// parameter type are absent from this listing (the embedded numbering skips
// 30, 34 and 35), so the parameter list below starts mid-declaration.
// Visible parameters: the executor id, the slotted skill, the retry flag and the
// callback that is later invoked to start an execution of the slotted skill.
31 const std::string& id,
32 const skills::FluxioSkill& skill,
33 bool isRetry,
36 const std::string& skillId,
37 const std::string& profileId,
38 const std::string& executorName,
39 armarx::aron::data::DictPtr parameters)>&& executeFluxioSkillFunc) :
// The id is forwarded to the base class; the meaning of the `false` argument is
// not visible in this file.
40 FluxioExecutor(id, false),
41 executeFluxioSkill(executeFluxioSkillFunc),
42 skill(skill),
43 isRetry(isRetry)
44 {
// Register an empty (nullptr) input slot for the skill itself and for each of
// its nodes, while holding possibleInputsMutex.
45 std::scoped_lock l(possibleInputsMutex);
46 possibleInputs[skill.id] = nullptr;
47 for (const auto& node : skill.nodes)
48 {
49 possibleInputs[node.first] = nullptr;
50 }
51
// Start with an empty result dictionary, assigned while holding resultMutex.
52 std::scoped_lock resultLock(resultMutex);
53 result = std::make_shared<armarx::aron::data::Dict>();
54 }
55
// Executes the slotted skill in a loop: in retry mode (isRetry) until the first
// success or until the iteration budget is used up, otherwise for a fixed number
// of iterations. Afterwards the results of the last execution are copied into
// `result`.
// NOTE(review): the function header is absent from this listing (numbering skips
// 57-59) — presumably FluxioLoopExecutor::run(executorName, parameters, profilePtr).
// Several interior lines are absent as well (e.g. 62, 79, 91, 135, 143, 150, 163,
// 168, 180); the early returns below may have been preceded by statements that
// are not visible here.
56 void
60 {
61 ARMARX_INFO << "Running loop with slotted skill " << skill.name;
63 this->executorName = executorName;
64 this->slottedExecutionId = std::nullopt;
65
66 // get loop parameter
67 int iterations = -1; // default value
68 bool infiniteLoop = false;
// No "Iterations" entry: retry mode loops without an upper bound, repeat mode
// logs the problem and returns.
69 if (!parameters->hasElement("Iterations"))
70 {
71 if (isRetry)
72 {
73 ARMARX_INFO << "max Iterations not set, using default (infinite)";
74 infiniteLoop = true;
75 }
76 else
77 {
78 ARMARX_INFO << "Missing 'Iterations' parameter for loop node.";
80 return;
81 }
82 }
83 else
84 {
85 // retrieve iterations value
// A nullptr from the cast is treated as an invalid parameter: log and return.
86 const auto& valuePtr =
87 armarx::aron::data::Int::DynamicCastAndCheck(parameters->getElement("Iterations"));
88 if (valuePtr == nullptr)
89 {
90 ARMARX_INFO << "Invalid 'Iterations' parameter for loop node.";
92 return;
93 }
94 iterations = valuePtr->getValue();
95 }
96
97 // gather parameters for the slotted skill
// Every parameter id listed in skill.parameters that is present in `parameters`
// is cloned into a fresh dictionary; missing ones only produce a warning.
98 armarx::aron::data::DictPtr slottedSkillParams =
99 std::make_shared<armarx::aron::data::Dict>();
100 for (const auto& paramIt : skill.parameters)
101 {
102 const auto& paramId = paramIt.first;
103 if (!parameters->hasElement(paramId))
104 {
105 ARMARX_WARNING << "Failed to get possible input for parameter " << paramId;
106 // slottedSkillParams->addElement(paramId, nullptr);
107 }
108 else
109 {
110 const auto& value = parameters->getElement(paramId);
111 slottedSkillParams->addElement(paramId, value->cloneAsVariant());
112 }
113 }
114
116
117
// The executor name passed to the slotted skill gets a suffix naming the loop mode.
119 const std::string newExecutorName =
120 executorName + "/Loop (" + (isRetry ? "Retry" : "Repeat") + ")";
121 bool loopRunning = true;
122 bool allIterationsSucceeded = true;
// loopIndex is zero-based; executeSlottedSkill uses it as the key of the current
// iteration in subExecutionsMap.
123 loopIndex = 0;
124
125 // main loop
// Each pass runs the slotted skill once; executeSlottedSkill only returns after
// the execution reached a terminal status or could not be started.
127 while (loopRunning)
128 {
129 const auto& ret =
130 executeSlottedSkill(profilePtr->id, newExecutorName, slottedSkillParams);
131
132 if (!ret.has_value())
133 {
134 ARMARX_INFO << "Failed to execute slotted skill " << skill.name;
136 return;
137 }
138
139 const auto& skillStatus = ret->status;
140
// Retry mode: the first successful iteration ends the loop.
141 if (skillStatus == SkillStatus::Succeeded && isRetry)
142 {
144 loopRunning = false;
145 break;
146 }
147
// An aborted iteration leaves run() immediately, without collecting results.
148 if (skillStatus == SkillStatus::Aborted)
149 {
151 return;
152 }
153
// Remember failures; the flag feeds the final status chosen in repeat mode below.
154 if (skillStatus == SkillStatus::Failed)
155 {
156 allIterationsSucceeded = false;
157 }
158
// Iteration budget used up: the last permitted pass has index iterations - 1.
// NOTE(review): the check runs after the pass, so for iterations <= 0 the slotted
// skill has still been executed once — confirm this is intended.
159 if (!infiniteLoop && loopIndex >= iterations - 1)
160 {
161 if (isRetry)
162 {
164 }
165 else
166 {
// NOTE(review): this statement is truncated in the listing (line 168 is absent).
167 setStatus(allIterationsSucceeded ? SkillStatus::Succeeded
169 }
170 loopRunning = false;
171 break;
172 }
173
174 loopIndex += 1;
175 }
176
// Look up the execution registered for the final loopIndex and copy its results
// into `result`, element by element.
// NOTE(review): subExecutionsMap and result are accessed here without taking
// subExecutionsMapMutex / resultMutex, unlike the other accesses in this file —
// confirm that no concurrent access is possible at this point.
177 const auto& res = subExecutionsMap.find(std::to_string(loopIndex));
178 if (res == subExecutionsMap.end() || res->second == nullptr)
179 {
181 return;
182 }
183 const auto& resultsCopy = res->second->getResultsCopy();
184 if (resultsCopy != nullptr)
185 {
186 for (const auto& [k, v] : resultsCopy->getElements())
187 {
188 result->setElementCopy(k, v);
189 }
190 }
191 }
192
193 std::optional<FluxioSkillStatusUpdate>
194 FluxioLoopExecutor::executeSlottedSkill(const std::string& profileId,
195 const std::string& executorName,
196 armarx::aron::data::DictPtr& parameters)
197 {
198 const std::string loopIndexStr = std::to_string(loopIndex);
199
200 // start slotted skill execution
201 const auto& executorRes = executeFluxioSkill(skill.id, profileId, executorName, parameters);
202 if (!executorRes.isSuccess())
203 {
204 ARMARX_INFO << "Failed to execute slotted skill " << skill.name;
205 return std::nullopt;
206 }
207 auto executorPtr = executorRes.getResult();
208 if (executorPtr == nullptr)
209 {
210 ARMARX_INFO << "Failed to get executor pointer";
211 return std::nullopt;
212 }
213
214 slottedExecutionId = executorPtr->id;
215 std::unique_lock executionsLock(subExecutionsMapMutex);
216 subExecutionsMap[loopIndexStr] = executorPtr;
217 executionsLock.unlock();
218 bool skillRunning = true;
219 skills::FluxioSkillStatusUpdate statusUpdate;
220 while (skillRunning)
221 {
222 std::this_thread::sleep_for(std::chrono::milliseconds(250));
223 executorPtr->getStatusUpdate(); // FIXME: bad design (use call-backs instead)
224 const auto& statusUpdateIt = executorPtr->getStatus();
225
226 if (!statusUpdateIt.has_value())
227 {
228 ARMARX_INFO << "No status update from slotted skill " << skill.name
229 << " yet. Waiting...";
230 continue;
231 }
232
233 statusUpdate = statusUpdateIt.value();
234 // did the status change? update statusUpdates list
235 std::unique_lock statusMapLock(statusUpdatesMutex);
236 const auto& lastUpdate =
237 std::find_if(statusUpdates.begin(),
238 statusUpdates.end(),
239 [&loopIndexStr](const skills::FluxioSkillStatusUpdate& statusUpdate)
240 { return statusUpdate.subSkillNodeId == loopIndexStr; });
241
242 if (lastUpdate == statusUpdates.end() || lastUpdate->status != statusUpdate.status)
243 {
244 statusUpdates.push_front(
245 {armarx::DateTime::Now(), executorPtr->id, loopIndexStr, statusUpdate.status});
246 }
247
248 statusMapLock.unlock();
249
250 // check subskill is finished
251 if (statusUpdate.status == skills::SkillStatus::Succeeded ||
252 statusUpdate.status == skills::SkillStatus::Failed ||
253 statusUpdate.status == skills::SkillStatus::Aborted)
254 {
255 skillRunning = false;
256 break;
257 }
258 }
259 return statusUpdate;
260 }
261
262 void
269
// Returns a snapshot of the recorded status updates as a vector, in the order in
// which they are stored in the statusUpdates list.
// NOTE(review): the line naming the function is absent from this listing
// (numbering skips 271) — presumably FluxioLoopExecutor::getStatusUpdate().
270 std::optional<std::vector<skills::FluxioSkillStatusUpdate>>
272 {
273 // convert statusupdates list to vector
// The copy is made while holding a shared lock on statusUpdatesMutex; the lock is
// released before returning.
274 std::shared_lock statusMapLock(statusUpdatesMutex);
275 auto ret = std::vector<skills::FluxioSkillStatusUpdate>(statusUpdates.begin(),
276 statusUpdates.end());
277 statusMapLock.unlock();
278 return ret;
279 }
280
// Aborts every registered sub-execution that reports a status which is not yet
// terminal (Succeeded, Failed, Aborted) and records an Aborted status entry for
// each of them. Sub-executions that report no status at all are skipped as well.
// NOTE(review): the line naming the function is absent from this listing
// (numbering skips 282) — presumably FluxioLoopExecutor::abort().
281 void
283 {
// The map is only read here, so a shared lock is held for the whole iteration.
284 std::shared_lock executionsLock(subExecutionsMapMutex);
285 for (const auto& [nodeId, executorPtr] : subExecutionsMap)
286 {
// NOTE(review): executorPtr is dereferenced without a nullptr check, whereas other
// lookups of subExecutionsMap in this file test the mapped pointer for nullptr —
// confirm that null entries cannot occur.
287 auto s = executorPtr->getStatus();
288 if (!s.has_value() || s->status == skills::SkillStatus::Succeeded ||
289 s->status == skills::SkillStatus::Failed ||
290 s->status == skills::SkillStatus::Aborted)
291 {
292 continue;
293 }
294
295 executorPtr->abort();
// The Aborted entry is pushed to the front of statusUpdates under an exclusive
// lock, taken while the shared lock on the executions map is still held.
296 std::unique_lock statusMapLock(statusUpdatesMutex);
297 statusUpdates.push_front(
298 {armarx::DateTime::Now(), executorPtr->id, nodeId, skills::SkillStatus::Aborted});
299 statusMapLock.unlock();
300 }
301 executionsLock.unlock();
302 }
303
// Returns the status reported by the sub-execution registered for the current
// loopIndex, or std::nullopt when no (non-null) execution is registered for it.
// NOTE(review): the line naming the function is absent from this listing
// (numbering skips 305) — presumably FluxioLoopExecutor::getSlottedStatus().
304 std::optional<skills::FluxioSkillStatusUpdate>
306 {
// NOTE(review): scoped_lock takes subExecutionsMapMutex exclusively although the
// map is only read here; other readers in this file use std::shared_lock.
307 std::scoped_lock l(subExecutionsMapMutex);
308
// Sub-executions are keyed by the loop index converted to a string.
309 const auto& loopIndexStr = std::to_string(loopIndex);
310 const auto& ret = subExecutionsMap.find(loopIndexStr);
311 if (ret == subExecutionsMap.end() || ret->second == nullptr)
312 {
313 return std::nullopt;
314 }
315
316 return ret->second->getStatus();
317 }
318} // namespace armarx::skills
static DateTime Now()
Definition DateTime.cpp:51
FluxioExecutor(const FluxioExecutor &)=delete
armarx::aron::data::DictPtr result
std::list< skills::FluxioSkillStatusUpdate > statusUpdates
std::shared_mutex statusUpdatesMutex
std::map< std::string, armarx::aron::data::DictPtr > possibleInputs
std::optional< std::string > executorName
virtual void setStatus(skills::SkillStatus status, const std::string &nodeId="noId")
std::shared_mutex possibleInputsMutex
void run(std::string executorName, armarx::aron::data::DictPtr parameters, std::experimental::observer_ptr< const FluxioProfile > profilePtr) override
std::optional< skills::FluxioSkillStatusUpdate > getSlottedStatus()
FluxioLoopExecutor(const std::string &id, const skills::FluxioSkill &skill, bool isRetry, const std::function< skills::Result< std::experimental::observer_ptr< FluxioExecutor >, skills::error::FluxioException >(const std::string &skillId, const std::string &profileId, const std::string &executorName, armarx::aron::data::DictPtr parameters)> &&executeFluxioSkillFunc)
std::optional< std::string > slottedExecutionId
std::optional< std::vector< skills::FluxioSkillStatusUpdate > > getStatusUpdate() override
A base class for skill exceptions.
#define ARMARX_INFO
The normal logging level.
Definition Logging.h:181
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
Definition Logging.h:193
std::shared_ptr< Dict > DictPtr
Definition Dict.h:42
This file is part of ArmarX.