FluxioMergerExecutor.cpp
Go to the documentation of this file.
2
3#include <mutex>
4#include <optional>
5#include <string>
6#include <thread>
7#include <vector>
8
10
12
13namespace armarx::skills
14{
16 const std::vector<std::string>& parameterIds) :
17 FluxioExecutor(id, false)
18 {
19 for (const auto& id : parameterIds)
20 {
21 tokenHasArrivedMap[id] = false;
22 }
23 ARMARX_WARNING << "Fluxio Merger is waiting for the following tokens to arrive: "
24 << parameterIds;
25 }
26
27 void
29 armarx::aron::data::DictPtr /*parameters*/,
31 {
32 // set status running
34 this->executorName = executorName;
35
36 // wait for all tokens to arrive
37 while (true)
38 {
39 std::this_thread::sleep_for(std::chrono::milliseconds(250));
40
41 // check if all tokens have arrived
42 bool allTokensArrived = true;
43 for (const auto& [id, hasArrived] : tokenHasArrivedMap)
44 {
45 if (!hasArrived)
46 {
47 allTokensArrived = false;
48 break;
49 }
50 }
51
52 if (status->status == skills::SkillStatus::Aborted ||
55 {
56 return;
57 }
58
59 if (allTokensArrived)
60 {
62 return;
63 }
64 }
65 }
66
67 void
72
73 void
74 FluxioMergerExecutor::checkInToken(const std::string& parameterId)
75 {
76 std::unique_lock l(tokenHasArrivedMapMutex);
77 const auto it = tokenHasArrivedMap.find(parameterId);
78
79 if (it == tokenHasArrivedMap.end())
80 {
81 ARMARX_WARNING << "Unexpected parameterId: " << parameterId;
82 }
83 else
84 {
85 ARMARX_WARNING << "Fluxio Merger received token for parameterId: " << parameterId;
86 tokenHasArrivedMap[parameterId] = true;
87 }
88
89 l.unlock();
90 }
91
92 std::optional<std::vector<skills::FluxioSkillStatusUpdate>>
94 {
95 // unused method
96 return std::nullopt;
97 }
98} // namespace armarx::skills
FluxioExecutor(const FluxioExecutor &)=delete
std::optional< skills::FluxioSkillStatusUpdate > status
std::optional< std::string > executorName
virtual void setStatus(skills::SkillStatus status, const std::string &nodeId="noId")
void run(std::string executorName, armarx::aron::data::DictPtr parameters, std::experimental::observer_ptr< const FluxioProfile > profilePtr) override
std::optional< std::vector< skills::FluxioSkillStatusUpdate > > getStatusUpdate() override
void checkInToken(const std::string &parameterId)
FluxioMergerExecutor(const std::string &id, const std::vector< std::string > &parameterIds)
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
Definition Logging.h:193
std::shared_ptr< Dict > DictPtr
Definition Dict.h:42
This file is part of ArmarX.