InterventionObserver.cpp
Go to the documentation of this file.
1/*
2 * This file is part of ArmarX.
3 *
4 * ArmarX is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License version 2 as
6 * published by the Free Software Foundation.
7 *
8 * ArmarX is distributed in the hope that it will be useful, but
9 * WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License
14 * along with this program. If not, see <http://www.gnu.org/licenses/>.
15 *
16 * @package RobotAPI::ArmarXObjects::InterventionObserver
17 * @author Niklas Arlt ( niklas dot arlt at kit dot edu )
18 * @date 2026
19 * @copyright http://www.gnu.org/licenses/gpl-2.0.txt
20 * GNU General Public License
21 */
22
24
25#include <cmath>
26#include <sstream>
27
32
36#include <RobotAPI/libraries/armem_skills/aron/Skill.aron.generated.h>
38
39namespace armarx
40{
41 void
43 {
44 ARMARX_INFO << "oninit InterventionObserver";
45
46 usingTopic(getProperty<std::string>("GamepadTopicName").getValue());
47 usingTopic(getProperty<std::string>("EmergencyStopTopic").getValue());
48
49 agentName = getProperty<std::string>("AgentName").getValue();
50 assistanceSessionTimeoutSeconds =
51 getProperty<float>("AssistanceSessionTimeoutSeconds").getValue();
52
53 // usingProxy(getProperty<std::string>("PlatformUnitName").getValue());
54
55 ARMARX_INFO << "oninit InterventionObserver end";
56 }
57
58 void
60 {
61 ARMARX_INFO << "onConnect InterventionObserver";
62
63 // Try to connect immediately. If the memory is already up this succeeds.
64 // If not, ensureMemoryConnection() will retry on every incoming event.
65 ensureMemoryConnection();
66 }
67
68 bool
69 InterventionObserver::ensureMemoryConnection()
70 {
71 if (memoryConnected.load())
72 {
73 return true;
74 }
75
76 // Double-checked locking: try_to_lock avoids blocking event callbacks
77 // while another thread is already attempting connection.
78 std::unique_lock lock(memoryConnectionMutex, std::try_to_lock);
79 if (!lock.owns_lock())
80 {
81 return false;
82 }
83
84 if (memoryConnected.load())
85 {
86 return true;
87 }
88
89 try
90 {
91 // Use getWriter/getReader which resolve immediately (non-blocking).
92 // They throw if the memory server hasn't registered yet.
93 const auto memoryId = armem::MemoryID().withMemoryName("TaskOutcome");
94 memoryNameSystem().getWriter(memoryId);
95 memoryNameSystem().getReader(memoryId);
96
97 // Memory is available — now connect writer/reader properly.
98 // Since the server is already registered, waitForServer() returns immediately.
99 writer.connect(memoryNameSystem());
100 reader.connect(memoryNameSystem());
101
102 // Connect reader for the Skill memory to query latest skill events.
103 const auto skillMemoryId = armem::MemoryID().withMemoryName("Skill");
104 skillMemoryReader = memoryNameSystem().useReader(skillMemoryId);
105
106 memoryConnected.store(true);
107 ARMARX_INFO << "InterventionObserver: Memory connection established.";
108 }
109 catch (...)
110 {
112 << "InterventionObserver: TaskOutcome memory not available yet. "
113 "Will retry on next event.";
114 }
115
116 return memoryConnected.load();
117 }
118
119 void
121 {
122 memoryConnected = false;
123 }
124
125 void
127 {
128 ARMARX_INFO << "exit InterventionObserver";
129 }
130
139
140 // Called on every gamepad event. Handles three concerns:
141 // 1) Back button press -> commit a SUSPENDED TaskOutcome (skill was interrupted).
142 // 2) Start button press -> find the last SUSPENDED entry and mark it recovered.
143 // 3) Any analog/button input -> track as an "assistance session" and commit
144 // a summary after the operator goes idle for the configured timeout.
145 void
147 const std::string& name,
148 const GamepadData& data,
149 const TimestampBasePtr& timestamp,
150 const Ice::Current& c)
151 {
152 // Always update rising-edge detection state, even when the memory is
153 // not yet available, so we don't miss transitions or get stuck.
154 const bool backRisingEdge = data.backButton && !prevBackButton;
155 prevBackButton = data.backButton;
156
157 const bool startRisingEdge = data.startButton && !prevStartButton;
158 prevStartButton = data.startButton;
159
160 if (!ensureMemoryConnection())
161 {
162 return;
163 }
164
165 // --- 1) Back button: suspend the currently running skill ---
166 if (backRisingEdge)
167 {
168 ARMARX_INFO << "Back button pressed — committing SUSPENDED TaskOutcome";
169
170 const std::string suspendedSkillName = queryLatestSkillName();
171 ARMARX_INFO << "Skill suspended by gamepad back button: " << suspendedSkillName;
172
174 outcome.taskName = suspendedSkillName;
177 outcome.agent = agentName;
179 outcome.endTime = armarx::DateTime::Now();
180
183 outcome.failureInfo = fi;
184
185 outcome.context.skillName = suspendedSkillName;
186 outcome.context.additional = {
187 {"source", "gamepad"},
188 {"device", device},
189 {"name", name},
190 {"leftStickX", std::to_string(data.leftStickX)},
191 {"leftStickY", std::to_string(data.leftStickY)},
192 {"rightStickX", std::to_string(data.rightStickX)},
193 {"rightStickY", std::to_string(data.rightStickY)},
194 {"dPadX", std::to_string(data.dPadX)},
195 {"dPadY", std::to_string(data.dPadY)},
196 {"leftTrigger", std::to_string(data.leftTrigger)},
197 {"rightTrigger", std::to_string(data.rightTrigger)},
198 {"leftButton", std::to_string(data.leftButton)},
199 {"rightButton", std::to_string(data.rightButton)},
200 {"backButton", std::to_string(data.backButton)},
201 {"startButton", std::to_string(data.startButton)},
202 {"xButton", std::to_string(data.xButton)},
203 {"yButton", std::to_string(data.yButton)},
204 {"aButton", std::to_string(data.aButton)},
205 {"bButton", std::to_string(data.bButton)},
206 {"theMiddleButton", std::to_string(data.theMiddleButton)},
207 {"leftStickButton", std::to_string(data.leftStickButton)},
208 {"rightStickButton", std::to_string(data.rightStickButton)},
209 };
210
211 const bool success =
212 writer.commitTaskOutcome(outcome, providerName, armem::Time::Now());
213
214 if (success)
215 {
216 ARMARX_INFO << "Successfully committed SUSPENDED TaskOutcome for gamepad intervention.";
217 }
218 else
219 {
220 ARMARX_WARNING << "Failed to commit SUSPENDED TaskOutcome for gamepad intervention.";
221 }
222 }
223
224 // --- 2) Start button: mark the last gamepad-caused suspension as recovered ---
225 if (startRisingEdge)
226 {
227 ARMARX_INFO << "Start button pressed — checking for SUSPENDED entry to mark as recovered";
228
230 .providerName = providerName,
231 .taskTypeFilter = std::nullopt,
233 .timestamp = armem::Time::Now(),
234 .maxAge = armarx::Duration::Hours(24)};
235
236 const auto result = reader.query(q);
237
238 if (!result || result.outcomes.empty())
239 {
240 ARMARX_WARNING << "Start button pressed but no SUSPENDED TaskOutcome found in memory. Skipping recovery.";
241 }
242 else
243 {
244 // Find the most recent SUSPENDED outcome from the gamepad source.
245 const armem::task_outcome::TaskOutcome* lastSuspended = nullptr;
246 for (const auto& o : result.outcomes)
247 {
248 auto it = o.context.additional.find("source");
249 if (it == o.context.additional.end() || it->second != "gamepad")
250 {
251 continue;
252 }
253 if (!lastSuspended || o.endTime > lastSuspended->endTime)
254 {
255 lastSuspended = &o;
256 }
257 }
258
259 if (!lastSuspended)
260 {
261 ARMARX_WARNING << "Start button pressed but no gamepad-sourced SUSPENDED entry found. Skipping recovery.";
262 }
263 else
264 {
265 // Re-commit the suspended entry with recovery info attached. This
266 // creates a new snapshot rather than modifying the original, so
267 // the full history (suspension + recovery) is preserved.
268 armem::task_outcome::TaskOutcome recovered = *lastSuspended;
269 recovered.couldRecover = true;
271 .recoveryTime = armarx::DateTime::Now(),
272 .recoveryMeasure = "resume by gamepad triggered"};
273
274 const bool success =
275 writer.commitTaskOutcome(recovered, providerName, armem::Time::Now());
276
277 if (success)
278 {
279 ARMARX_INFO << "Successfully committed recovery for task '"
280 << recovered.taskName << "'.";
281 }
282 else
283 {
284 ARMARX_WARNING << "Failed to commit recovery TaskOutcome.";
285 }
286 }
287 }
288 }
289
290 // --- 3) Assistance session tracking ---
291 // Classify which gamepad inputs are active this frame, then accumulate
292 // them into a session. A session starts on the first non-idle input and
293 // ends (gets committed) after no input for `assistanceSessionTimeoutSeconds`.
294 lastGamepadDevice = device;
295 lastGamepadName = name;
296
297 constexpr float deadzone = 0.1f;
298
299 std::set<std::string> activeActions;
300
301 if (std::abs(data.leftTrigger) > deadzone || std::abs(data.rightTrigger) > deadzone)
302 {
303 activeActions.insert("manual gripper control");
304 }
305 if (std::abs(data.leftStickX) > deadzone || std::abs(data.leftStickY) > deadzone)
306 {
307 activeActions.insert("manual platform navigation");
308 }
309 if (std::abs(data.rightStickX) > deadzone || std::abs(data.rightStickY) > deadzone)
310 {
311 activeActions.insert("rightStick");
312 }
313 if (std::abs(data.dPadX) > deadzone || std::abs(data.dPadY) > deadzone)
314 {
315 activeActions.insert("dPad");
316 }
317 if (data.aButton)
318 {
319 activeActions.insert("manual action button");
320 }
321 if (data.bButton)
322 {
323 activeActions.insert("manual action button");
324 }
325 if (data.xButton)
326 {
327 activeActions.insert("manual action button");
328 }
329 if (data.yButton)
330 {
331 activeActions.insert("manual action button");
332 }
333 if (data.leftButton)
334 {
335 activeActions.insert("manual bumper action");
336 }
337 if (data.rightButton)
338 {
339 activeActions.insert("manual bumper action");
340 }
341 if (data.theMiddleButton)
342 {
343 activeActions.insert("theMiddleButton");
344 }
345 if (data.leftStickButton)
346 {
347 activeActions.insert("leftStickButton");
348 }
349 if (data.rightStickButton)
350 {
351 activeActions.insert("rightStickButton");
352 }
353
354 const auto now = armarx::DateTime::Now();
355 const bool hasInput = !activeActions.empty();
356
357 if (hasInput)
358 {
359 if (!assistanceSessionActive)
360 {
361 // Start a new session
362 assistanceSessionActive = true;
363 assistanceSessionStart = now;
364 assistanceActionsInSession = activeActions;
365 ARMARX_INFO << "Assistance session started";
366 }
367 else
368 {
369 // Merge actions into existing session
370 assistanceActionsInSession.insert(activeActions.begin(), activeActions.end());
371 }
372 assistanceLastActivityTime = now;
373 }
374 else if (assistanceSessionActive)
375 {
376 const auto elapsed = now - assistanceLastActivityTime;
377 if (elapsed > armarx::Duration::SecondsDouble(assistanceSessionTimeoutSeconds))
378 {
379 // Commit the session
380 std::ostringstream actionStr;
381 bool first = true;
382 for (const auto& a : assistanceActionsInSession)
383 {
384 if (!first)
385 {
386 actionStr << ", ";
387 }
388 actionStr << a;
389 first = false;
390 }
391
392 const auto durationMs =
393 (assistanceLastActivityTime - assistanceSessionStart).toMilliSeconds();
394
396 outcome.taskName = "GamepadIntervention";
399 outcome.agent = agentName;
400 outcome.startTime = assistanceSessionStart;
401 outcome.endTime = assistanceLastActivityTime;
402 outcome.context.skillName = "unknown";
403 outcome.context.additional = {
404 {"source", "gamepad_assistance"},
405 {"actions", actionStr.str()},
406 {"duration_ms", std::to_string(durationMs)},
407 {"device", lastGamepadDevice},
408 {"name", lastGamepadName},
409 };
410
411 const bool success =
412 writer.commitTaskOutcome(outcome, providerName, armem::Time::Now());
413
414 if (success)
415 {
416 ARMARX_INFO << "Committed assistance session: actions=[" << actionStr.str()
417 << "], duration=" << durationMs << "ms";
418 }
419 else
420 {
421 ARMARX_WARNING << "Failed to commit assistance session TaskOutcome.";
422 }
423
424 // Reset session state
425 assistanceSessionActive = false;
426 assistanceActionsInSession.clear();
427 }
428 }
429 }
430
431 std::string
432 InterventionObserver::queryLatestSkillName()
433 {
434 if (!skillMemoryReader)
435 {
436 return "unknown";
437 }
438
439 try
440 {
441 // Query the latest snapshot of every entity in the SkillEvent segment.
442 // Each entity represents one skill execution; its latest snapshot holds
443 // the most recent status update for that execution.
445 qb.coreSegments()
446 .withName("SkillEvent")
448 .all()
449 .entities()
450 .all()
451 .snapshots()
452 .latest();
453
454 const auto result = skillMemoryReader.query(qb);
455 if (!result.success)
456 {
457 ARMARX_WARNING << "Failed to query Skill memory for latest skill event.";
458 return "unknown";
459 }
460
461 const auto* cs = result.memory.findCoreSegment("SkillEvent");
462 if (!cs)
463 {
464 return "unknown";
465 }
466
467 // Among all skill executions, pick the one that was started most
468 // recently — that is the skill the operator most likely intended
469 // to interrupt.
470 std::string latestSkillName = "unknown";
471 armarx::core::time::DateTime latestTime = armarx::core::time::DateTime::Invalid();
472
473 cs->forEachEntity(
474 [&](const armem::wm::Entity& entity)
475 {
476 if (const auto* instance = entity.findLatestInstance(); instance)
477 {
478 auto aronUpdate =
479 skills::arondto::SkillStatusUpdate::FromAron(instance->data());
480 if (!latestTime.isValid() ||
481 aronUpdate.executionStartedTimestamp > latestTime)
482 {
483 latestTime = aronUpdate.executionStartedTimestamp;
484 latestSkillName = aronUpdate.skillId.skillName;
485 }
486 }
487 });
488
489 return latestSkillName;
490 }
491 catch (const std::exception& e)
492 {
493 ARMARX_WARNING << "Exception querying Skill memory: " << e.what();
494 return "unknown";
495 }
496 }
497
498 // Called on every emergency stop state change. Only reacts to transitions:
499 // INACTIVE -> ACTIVE: commit a SUSPENDED TaskOutcome for the currently running skill.
500 // ACTIVE -> INACTIVE: find the last emergency-stop-caused suspension and mark it recovered.
501 void
502 InterventionObserver::reportEmergencyStopState(EmergencyStopState state, const Ice::Current&)
503 {
504 if (!ensureMemoryConnection())
505 {
506 // Still track the state so we detect the correct transition once
507 // the memory becomes available.
508 prevEmergencyStopState = state;
509 return;
510 }
511
512 // Detect state transitions; ignore repeated reports of the same state.
513 const bool becameActive =
514 (state == EmergencyStopState::eEmergencyStopActive &&
515 prevEmergencyStopState == EmergencyStopState::eEmergencyStopInactive);
516 const bool becameInactive =
517 (state == EmergencyStopState::eEmergencyStopInactive &&
518 prevEmergencyStopState == EmergencyStopState::eEmergencyStopActive);
519 prevEmergencyStopState = state;
520
521 if (becameActive)
522 {
523 ARMARX_INFO << "Emergency stop activated — committing SUSPENDED TaskOutcome";
524
525 const std::string suspendedSkillName = queryLatestSkillName();
526 ARMARX_INFO << "Skill suspended by emergency stop: " << suspendedSkillName;
527
529 outcome.taskName = suspendedSkillName;
532 outcome.agent = agentName;
534 outcome.endTime = armarx::DateTime::Now();
535
538 outcome.failureInfo = fi;
539
540 outcome.context.skillName = suspendedSkillName;
541 outcome.context.additional = {
542 {"source", "emergency_stop"},
543 {"emergencyStopState", "active"}};
544
545 const bool success =
546 writer.commitTaskOutcome(outcome, providerName, armem::Time::Now());
547
548 if (success)
549 {
550 ARMARX_INFO << "Successfully committed SUSPENDED TaskOutcome for emergency stop.";
551 }
552 else
553 {
554 ARMARX_WARNING << "Failed to commit SUSPENDED TaskOutcome for emergency stop.";
555 }
556 }
557
558 if (becameInactive)
559 {
560 ARMARX_INFO << "Emergency stop released — checking for SUSPENDED entry to mark as recovered";
561
563 .providerName = providerName,
564 .taskTypeFilter = std::nullopt,
566 .timestamp = armem::Time::Now(),
567 .maxAge = armarx::Duration::Hours(24)};
568
569 const auto result = reader.query(q);
570
571 if (!result || result.outcomes.empty())
572 {
573 ARMARX_WARNING << "Emergency stop released but no SUSPENDED TaskOutcome found in memory. Skipping recovery.";
574 return;
575 }
576
577 // Find the most recent SUSPENDED outcome from the emergency_stop source.
578 const armem::task_outcome::TaskOutcome* lastSuspended = nullptr;
579 for (const auto& o : result.outcomes)
580 {
581 auto it = o.context.additional.find("source");
582 if (it == o.context.additional.end() || it->second != "emergency_stop")
583 {
584 continue;
585 }
586 if (!lastSuspended || o.endTime > lastSuspended->endTime)
587 {
588 lastSuspended = &o;
589 }
590 }
591
592 if (!lastSuspended)
593 {
594 ARMARX_WARNING << "Emergency stop released but no emergency-stop-sourced SUSPENDED entry found. Skipping recovery.";
595 return;
596 }
597
598 armem::task_outcome::TaskOutcome recovered = *lastSuspended;
599 recovered.couldRecover = true;
601 .recoveryTime = armarx::DateTime::Now(),
602 .recoveryMeasure = "resume by emergency stop release"};
603
604 const bool success =
605 writer.commitTaskOutcome(recovered, providerName, armem::Time::Now());
606
607 if (success)
608 {
609 ARMARX_INFO << "Successfully committed recovery for emergency stop intervention.";
610 }
611 else
612 {
613 ARMARX_WARNING << "Failed to commit recovery TaskOutcome for emergency stop.";
614 }
615 }
616 }
617
621} // namespace armarx
std::string timestamp()
constexpr T c
std::string getConfigIdentifier()
Retrieve config identifier for this component as set in constructor.
Definition Component.cpp:90
Property< PropertyType > getProperty(const std::string &name)
static DateTime Now()
Definition DateTime.cpp:51
static Duration Hours(std::int64_t hours)
Constructs a duration in hours.
Definition Duration.cpp:120
static Duration SecondsDouble(double seconds)
Constructs a duration in seconds.
Definition Duration.cpp:78
armarx::PropertyDefinitionsPtr createPropertyDefinitions() override
void reportEmergencyStopState(EmergencyStopState state, const Ice::Current &) override
void reportGamepadState(const std::string &device, const std::string &name, const GamepadData &data, const TimestampBasePtr &timestamp, const Ice::Current &c) override
SpamFilterDataPtr deactivateSpam(float deactivationDurationSec=10.0f, const std::string &identifier="", bool deactivate=true) const
disables the logging for the current line for the given amount of seconds.
Definition Logging.cpp:99
void usingTopic(const std::string &name, bool orderedPublishing=false)
Registers a proxy for subscription after initialization.
auto * findLatestInstance(int instanceIndex=0)
Definition EntityBase.h:356
CoreSegmentT * findCoreSegment(const std::string &name)
Definition MemoryBase.h:121
Reader getReader(const MemoryID &memoryID)
Get a reader to the given memory name.
Reader useReader(const MemoryID &memoryID)
Use a memory server and get a reader for it.
Writer getWriter(const MemoryID &memoryID)
Get a writer to the given memory name.
QueryResult query(const QueryInput &input) const
Perform a query on the WM.
Definition Reader.cpp:119
The query::Builder class provides a fluent-style specification of hierarchical queries.
Definition Builder.h:22
CoreSegmentSelector & coreSegments()
Start specifying core segments.
Definition Builder.cpp:42
CoreSegmentSelector & withName(const std::string &name) override
ProviderSegmentSelector & providerSegments()
Start specifying provider segments.
SnapshotSelector & snapshots()
Start specifying entity snapshots.
Definition selectors.cpp:92
EntitySelector & entities()
Start specifying entities.
ProviderSegmentSelector & all() override
static DateTime Now()
Definition DateTime.cpp:51
static DateTime Invalid()
Definition DateTime.cpp:57
#define ARMARX_INFO
The normal logging level.
Definition Logging.h:181
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
Definition Logging.h:193
#define q
This file offers overloads of toIce() and fromIce() functions for STL container types.
IceUtil::Handle< class PropertyDefinitionContainer > PropertyDefinitionsPtr
PropertyDefinitions smart pointer type.
wm::Memory memory
The slice of the memory that matched the query.
Definition Query.h:58
InterruptionFailures interruption
Definition types.h:92
std::map< std::string, std::string > additional
Definition types.h:112
armarx::core::time::DateTime startTime
Definition types.h:128
std::optional< RecoveryInfo > recoveryInfo
Definition types.h:131
armarx::core::time::DateTime endTime
Definition types.h:129
std::optional< bool > couldRecover
Definition types.h:130
std::optional< FailureInfo > failureInfo
Definition types.h:127