5#include <experimental/memory>
10#include <shared_mutex>
31 const std::string&
id,
36 const std::string& skillId,
37 const std::string& profileId,
41 executeFluxioSkill(executeFluxioSkillFunc),
47 for (
const auto& node : skill.nodes)
53 result = std::make_shared<armarx::aron::data::Dict>();
61 ARMARX_INFO <<
"Running loop with slotted skill " << skill.name;
68 bool infiniteLoop =
false;
69 if (!parameters->hasElement(
"Iterations"))
73 ARMARX_INFO <<
"max Iterations not set, using default (infinite)";
78 ARMARX_INFO <<
"Missing 'Iterations' parameter for loop node.";
86 const auto& valuePtr =
88 if (valuePtr ==
nullptr)
90 ARMARX_INFO <<
"Invalid 'Iterations' parameter for loop node.";
94 iterations = valuePtr->getValue();
99 std::make_shared<armarx::aron::data::Dict>();
100 for (
const auto& paramIt : skill.parameters)
102 const auto& paramId = paramIt.first;
103 if (!parameters->hasElement(paramId))
105 ARMARX_WARNING <<
"Failed to get possible input for parameter " << paramId;
110 const auto& value = parameters->getElement(paramId);
111 slottedSkillParams->addElement(paramId, value->cloneAsVariant());
119 const std::string newExecutorName =
120 executorName +
"/Loop (" + (isRetry ?
"Retry" :
"Repeat") +
")";
121 bool loopRunning =
true;
122 bool allIterationsSucceeded =
true;
130 executeSlottedSkill(profilePtr->id, newExecutorName, slottedSkillParams);
132 if (!ret.has_value())
134 ARMARX_INFO <<
"Failed to execute slotted skill " << skill.name;
139 const auto& skillStatus = ret->status;
156 allIterationsSucceeded =
false;
159 if (!infiniteLoop && loopIndex >= iterations - 1)
177 const auto& res = subExecutionsMap.find(std::to_string(loopIndex));
178 if (res == subExecutionsMap.end() || res->second ==
nullptr)
183 const auto& resultsCopy = res->second->getResultsCopy();
184 if (resultsCopy !=
nullptr)
186 for (
const auto& [k, v] : resultsCopy->getElements())
188 result->setElementCopy(k, v);
193 std::optional<FluxioSkillStatusUpdate>
194 FluxioLoopExecutor::executeSlottedSkill(
const std::string& profileId,
195 const std::string& executorName,
198 const std::string loopIndexStr = std::to_string(loopIndex);
201 const auto& executorRes = executeFluxioSkill(skill.
id, profileId,
executorName, parameters);
202 if (!executorRes.isSuccess())
207 auto executorPtr = executorRes.getResult();
208 if (executorPtr ==
nullptr)
215 std::unique_lock executionsLock(subExecutionsMapMutex);
216 subExecutionsMap[loopIndexStr] = executorPtr;
217 executionsLock.unlock();
218 bool skillRunning =
true;
219 skills::FluxioSkillStatusUpdate statusUpdate;
222 std::this_thread::sleep_for(std::chrono::milliseconds(250));
223 executorPtr->getStatusUpdate();
224 const auto& statusUpdateIt = executorPtr->getStatus();
226 if (!statusUpdateIt.has_value())
229 <<
" yet. Waiting...";
233 statusUpdate = statusUpdateIt.value();
236 const auto& lastUpdate =
239 [&loopIndexStr](
const skills::FluxioSkillStatusUpdate& statusUpdate)
240 { return statusUpdate.subSkillNodeId == loopIndexStr; });
242 if (lastUpdate ==
statusUpdates.end() || lastUpdate->status != statusUpdate.status)
248 statusMapLock.unlock();
255 skillRunning =
false;
270 std::optional<std::vector<skills::FluxioSkillStatusUpdate>>
275 auto ret = std::vector<skills::FluxioSkillStatusUpdate>(
statusUpdates.begin(),
277 statusMapLock.unlock();
284 std::shared_lock executionsLock(subExecutionsMapMutex);
285 for (
const auto& [nodeId, executorPtr] : subExecutionsMap)
287 auto s = executorPtr->getStatus();
295 executorPtr->abort();
299 statusMapLock.unlock();
301 executionsLock.unlock();
304 std::optional<skills::FluxioSkillStatusUpdate>
307 std::scoped_lock l(subExecutionsMapMutex);
309 const auto& loopIndexStr = std::to_string(loopIndex);
310 const auto& ret = subExecutionsMap.find(loopIndexStr);
311 if (ret == subExecutionsMap.end() || ret->second ==
nullptr)
316 return ret->second->getStatus();
static PointerType DynamicCastAndCheck(const VariantPtr &n)
FluxioExecutor(const FluxioExecutor &)=delete
armarx::aron::data::DictPtr result
std::shared_mutex resultMutex
std::list< skills::FluxioSkillStatusUpdate > statusUpdates
std::shared_mutex statusUpdatesMutex
std::map< std::string, armarx::aron::data::DictPtr > possibleInputs
std::optional< std::string > executorName
virtual void setStatus(skills::SkillStatus status, const std::string &nodeId="noId")
std::shared_mutex possibleInputsMutex
void run(std::string executorName, armarx::aron::data::DictPtr parameters, std::experimental::observer_ptr< const FluxioProfile > profilePtr) override
std::optional< skills::FluxioSkillStatusUpdate > getSlottedStatus()
FluxioLoopExecutor(const std::string &id, const skills::FluxioSkill &skill, bool isRetry, const std::function< skills::Result< std::experimental::observer_ptr< FluxioExecutor >, skills::error::FluxioException >(const std::string &skillId, const std::string &profileId, const std::string &executorName, armarx::aron::data::DictPtr parameters)> &&executeFluxioSkillFunc)
std::optional< std::string > slottedExecutionId
void abortSubExecutions()
std::optional< std::vector< skills::FluxioSkillStatusUpdate > > getStatusUpdate() override
A base class for skill exceptions.
#define ARMARX_INFO
The normal logging level.
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
std::shared_ptr< Dict > DictPtr
This file is part of ArmarX.