SkillEventSegment.cpp
Go to the documentation of this file.
1#include "SkillEventSegment.h"
2
3#include <atomic>
4
5#include <SimoxUtility/algorithm/string.h>
6
9#include <RobotAPI/libraries/armem_skills/aron/Skill.aron.generated.h>
12
14{
15
22
// Property definition hook of the skill-event core segment. The signature
// line is missing from this listing; the body uses the names `defs` and
// `prefix`.
//
// Steps, in order:
//  1. set the default maximum history size to 300,
//  2. delegate to Base::defineProperties(defs, prefix),
//  3. call defs->topic() with the proxy member p.skillEventTopic, the topic
//     name "SkillEventListener" and the property name
//     prefix + "tpc.pub.SkillEventListener".
// NOTE(review): the default is set before the base call, presumably so the
// base class picks it up when it defines its own properties - confirm.
23 void
25 {
26 this->setDefaultMaxHistorySize(300);
27 Base::defineProperties(defs, prefix);
28
29 defs->topic(p.skillEventTopic, "SkillEventListener", prefix + "tpc.pub.SkillEventListener");
30 }
31
32 void
37
// Stores one skill status update (`update`) in this segment and tries to
// publish it on the skill event topic. The signature line is missing from
// this listing.
//
// Steps:
//  1. convert `update` to its aron DTO,
//  2. build an armem::Commit holding exactly one entity update addressed to
//     id() / <provider name> / <skill name>,
//  3. best-effort publish via p.skillEventTopic->reportSkillEvent(),
//  4. commit through iceMemory.commitLocking().
//
// A null topic proxy, or a std::exception thrown while publishing, is
// logged once and does not stop step 4. The conversions in steps 1-2 and the
// final commit are outside the try block, so their exceptions propagate.
38 void
40 {
41 // create commit about new update
42 armarx::skills::arondto::SkillStatusUpdate event;
43 armem::toAron(event, update);
44
// Start from this segment's own id and refine it: the provider name becomes
// the provider segment name, the skill name becomes the entity name.
45 armem::MemoryID commitId = id();
46 commitId.providerSegmentName = event.skillId.providerId.providerName;
47 commitId.entityName = event.skillId.skillName;
48
49 auto aron = event.toAron();
50
// Single entity update: full confidence, referenced time = now, and the
// aron-converted event as its only instance.
51 armem::Commit comm;
52 auto& entityUpdate = comm.add();
53 entityUpdate.confidence = 1.0;
54 entityUpdate.referencedTime = armem::Time::Now();
55 entityUpdate.instancesData = {aron};
56 entityUpdate.entityID = commitId;
57
58 // Publish to the skill event topic. Tolerate two failure modes that
59 // would otherwise crash the SkillMemory component:
60 // 1. The topic proxy is null (e.g. topic was never wired up).
61 // 2. The Ice RPC throws (network error, marshalling, etc.).
62 // In both cases we skip the publish, log a warning ONCE per failure
63 // mode, and still commit the update to local memory below.
64 if (p.skillEventTopic)
65 {
66 try
67 {
// update.toProviderIce() is evaluated inside the try, so a std::exception
// raised by that conversion is handled like a failed publish.
68 p.skillEventTopic->reportSkillEvent(
69 update.toProviderIce(), event.skillId.providerId.providerName);
70 }
// Only exceptions derived from std::exception are caught here; any other
// thrown type propagates to the caller.
71 catch (const std::exception& e)
72 {
// Function-local static flag: test_and_set() returns the previous value, so
// the condition below is true only for the first failure that reaches here.
// This flag is separate from the one in the else branch.
73 static std::atomic_flag warned = ATOMIC_FLAG_INIT;
74 if (!warned.test_and_set())
75 {
76 ARMARX_WARNING << "Failed to publish skill event on topic '"
77 << "SkillEventListener"
78 << "': " << e.what()
79 << ". Subsequent failures of this kind will be "
80 "silently ignored. The local memory commit "
81 "still proceeds.";
82 }
83 }
84 }
85 else
86 {
// Separate once-only flag for the null-proxy case (same mechanism as above).
87 static std::atomic_flag warned = ATOMIC_FLAG_INIT;
88 if (!warned.test_and_set())
89 {
90 ARMARX_WARNING << "SkillEventListener topic proxy is null; "
91 "skill events will not be published. "
92 "This warning is only logged once. "
93 "The local memory commit still proceeds.";
94 }
95 }
96
// Reached whether or not the publish above succeeded or was skipped.
97 iceMemory.commitLocking(comm);
98 }
99
// Collects one SkillStatusUpdate per skill execution id from the instances
// stored in this core segment. The line carrying the function name is
// missing from this listing.
//
// Every entity instance is visited inside coreSegment->doLocked(). Each one
// is converted from its aron DTO; a conversion that throws std::exception is
// logged and that instance is skipped. For an execution id already in the
// result, the stored update is replaced unless the new one compares less
// than it (`up < it->second`).
// NOTE(review): the ordering defined by operator< on SkillStatusUpdate is
// not visible here - presumably "older / less advanced"; confirm.
//
// Returns the map from execution id to the update selected for it.
100 std::map<skills::SkillExecutionID, skills::SkillStatusUpdate>
102 {
103 std::map<skills::SkillExecutionID, skills::SkillStatusUpdate> ret;
104 auto coreSegment = this->segmentPtr;
// The segment pointer must be set before it is dereferenced below.
105 ARMARX_CHECK(coreSegment);
106
107 coreSegment->doLocked(
108 [&]()
109 {
110 coreSegment->forEachInstance(
111 [&](const armem::wm::EntityInstance& i)
112 {
113 // dataAs<>() throws on type mismatch / corrupted aron
114 // data. Skip the bad entry instead of crashing the
115 // whole getSkillStatusUpdates() call.
// NOTE(review): `up` is not declared in the visible lines (listing line 116
// is missing); presumably a skills::SkillStatusUpdate declared just before
// the try - confirm against the original file.
117 try
118 {
119 auto event =
120 i.dataAs<armarx::skills::arondto::SkillStatusUpdate>();
121 armem::fromAron(event, up);
122 }
123 catch (const std::exception& e)
124 {
125 ARMARX_WARNING << "Skipping malformed SkillStatusUpdate "
126 "instance in SkillEvent segment: "
127 << e.what();
// Leaves only this per-instance callback; presumably forEachInstance then
// continues with the next instance.
128 return;
129 }
130
// Keep the entry already stored for this execution id when the new update
// compares less than it.
131 if (auto it = ret.find(up.executionId); it != ret.end() && up < it->second)
132 {
133 return;
134 }
135
136 // set or replace
137 ret[up.executionId] = up;
138 });
139 });
140
141 // for (const auto& [k, v] : ret)
142 // {
143 // ARMARX_IMPORTANT << "Skill " << k.skillId << " has stati: " << int(v.status);
144 // }
145
146 return ret;
147 }
148
// Looks up the status update stored for one skill execution, identified by
// `id`. The line carrying the function name and parameter is missing from
// this listing.
//
// Every entity instance is visited inside coreSegment->doLocked(). Instances
// whose aron data cannot be converted (std::exception) are logged and
// skipped. Among the instances whose executionId equals `id`, the first one
// is taken, and a later one replaces it whenever the current candidate
// compares less than it (`*ret < up`).
// NOTE(review): the ordering defined by operator< on SkillStatusUpdate is
// not visible here - confirm what "less" means.
//
// Returns the selected update, or std::nullopt if no instance matched `id`.
149 std::optional<skills::SkillStatusUpdate>
151 {
152 std::optional<skills::SkillStatusUpdate> ret = std::nullopt;
153 auto coreSegment = this->segmentPtr;
// The segment pointer must be set before it is dereferenced below.
154 ARMARX_CHECK(coreSegment);
155
156 coreSegment->doLocked(
157 [&]()
158 {
159 coreSegment->forEachInstance(
160 [&](const armem::wm::EntityInstance& i)
161 {
162 // dataAs<>() throws on type mismatch / corrupted aron
163 // data. Skip the bad entry instead of crashing the
164 // whole getSkillStatusUpdate() call.
// NOTE(review): `up` is not declared in the visible lines (listing line 165
// is missing); presumably a skills::SkillStatusUpdate declared just before
// the try - confirm against the original file.
166 try
167 {
168 auto event =
169 i.dataAs<armarx::skills::arondto::SkillStatusUpdate>();
170 armem::fromAron(event, up);
171 }
172 catch (const std::exception& e)
173 {
174 ARMARX_WARNING << "Skipping malformed SkillStatusUpdate "
175 "instance in SkillEvent segment: "
176 << e.what();
// Leaves only this per-instance callback; presumably forEachInstance then
// continues with the next instance.
177 return;
178 }
179
180 if (up.executionId == id)
181 {
// First match, or the current candidate compares less than `up`. The
// has_value() test is already implied by the preceding `!ret` being false.
182 if (!ret || (ret.has_value() && *ret < up))
183 {
184 ret = up;
185 }
186 }
187 });
188 });
189 return ret;
190 }
191
// Clears this core segment, first writing its content to the long-term
// memory `ltm` when `ltm` is recording and the segment holds at least one
// snapshot. The line carrying the function name and the `ltm` parameter is
// missing from this listing.
//
// Parameters visible here:
//   memoryName    name assigned to the temporary working-memory slice that
//                 is handed to ltm.directlyStore().
//   consolidated  out-parameter; reset to false on entry and set to true
//                 only after ltm.directlyStore() has returned.
//
// Returns the number of snapshots counted before the segment was cleared.
//
// Counting, storing and clearing all run inside one
// coreSegment->doLockedExclusive() call.
192 std::size_t
194 const std::string& memoryName,
195 bool& consolidated)
196 {
197 consolidated = false;
198 std::size_t snapshotCount = 0;
199 auto coreSegment = this->segmentPtr;
// The segment pointer must be set before it is dereferenced below.
200 ARMARX_CHECK(coreSegment);
201
202 coreSegment->doLockedExclusive(
203 [&]()
204 {
// Count the snapshots first; the count is both the return value and the
// "anything to store?" test below.
205 coreSegment->forEachSnapshot(
206 [&](const armem::wm::EntitySnapshot&) { ++snapshotCount; });
207
208 if (ltm.isRecording() && snapshotCount > 0)
209 {
210 // Build a WM slice holding only this core segment so that
211 // only the data we are about to clear is written to LTM.
212 armem::wm::Memory slice;
213 slice.setName(memoryName);
// NOTE(review): the meaning of the second argument (`true`) is not visible
// in this listing - confirm against the update() declaration.
214 slice.update(armem::toCommit(*coreSegment), true);
// NOTE(review): there is no exception handling around this call here; if it
// throws, `consolidated` stays false and the clear() below is not reached.
215 ltm.directlyStore(slice);
216 consolidated = true;
217 }
218
// Runs whether or not anything was written to the LTM above.
219 coreSegment->clear();
220 });
221
222 return snapshotCount;
223 }
224} // namespace armarx::skills::segment
std::string entityName
Definition MemoryID.h:53
std::string providerSegmentName
Definition MemoryID.h:52
AronDtoT dataAs() const
Get the data converted to a generated Aron DTO class.
Helps connecting a Memory server to the Ice interface.
A memory storing data on the hard drive and in mongodb (needs 'armarx memory start' to start the mong...
Definition Memory.h:24
void directlyStore(const armem::wm::Memory &memory, bool simulatedVersion=false)
virtual void defineProperties(armarx::PropertyDefinitionsPtr defs, const std::string &prefix="") override
Client-side working entity instance.
Client-side working memory entity snapshot.
Client-side working memory.
static DateTime Now()
Definition DateTime.cpp:51
void addSkillUpdateEvent(const skills::SkillStatusUpdate &update)
std::size_t clearAndConsolidate(armem::server::ltm::Memory &ltm, const std::string &memoryName, bool &consolidated)
Clear all snapshots from the working memory of this core segment, consolidating them into ltm first i...
SkillEventCoreSegment(armem::server::MemoryToIceAdapter &iceMemory)
std::map< skills::SkillExecutionID, skills::SkillStatusUpdate > getSkillStatusUpdates()
std::optional< skills::SkillStatusUpdate > getSkillStatusUpdate(const skills::SkillExecutionID &id)
void defineProperties(PropertyDefinitionsPtr defs, const std::string &prefix)
#define ARMARX_CHECK(expression)
Shortcut for ARMARX_CHECK_EXPRESSION.
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
Definition Logging.h:193
void fromAron(const arondto::MemoryID &dto, MemoryID &bo)
Commit toCommit(const ContainerT &container)
Definition operations.h:23
void toAron(arondto::MemoryID &dto, const MemoryID &bo)
This file offers overloads of toIce() and fromIce() functions for STL container types.
IceUtil::Handle< class PropertyDefinitionContainer > PropertyDefinitionsPtr
PropertyDefinitions smart pointer type.
This file is part of ArmarX.
A bundle of updates to be sent to the memory.
Definition Commit.h:90
EntityUpdate & add()
Definition Commit.cpp:80
float confidence
An optional confidence, may be used for things like decay.
Definition Commit.h:43