InterventionObserver.cpp
Go to the documentation of this file.
1/*
2 * This file is part of ArmarX.
3 *
4 * ArmarX is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License version 2 as
6 * published by the Free Software Foundation.
7 *
8 * ArmarX is distributed in the hope that it will be useful, but
9 * WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License
14 * along with this program. If not, see <http://www.gnu.org/licenses/>.
15 *
16 * @package RobotAPI::ArmarXObjects::InterventionObserver
17 * @author Niklas Arlt ( niklas dot arlt at kit dot edu )
18 * @date 2026
19 * @copyright http://www.gnu.org/licenses/gpl-2.0.txt
20 * GNU General Public License
21 */
22
24
25#include <cmath>
26#include <sstream>
27
32
36#include <RobotAPI/libraries/armem_skills/aron/Skill.aron.generated.h>
38
40
41namespace armarx
42{
// NOTE(review): the line carrying this function's qualified name (listing
// line 44) is not visible in this view; judging by the log text this is
// presumably the component's init hook — confirm against the header.
//
// Registers the topics named by the "GamepadTopicName" and
// "EmergencyStopTopic" properties via usingTopic(), and caches the
// "AgentName" and "AssistanceSessionTimeoutSeconds" property values in the
// members `agentName` and `assistanceSessionTimeoutSeconds`.
43 void
45 {
46 ARMARX_INFO << "oninit InterventionObserver";
47
48 usingTopic(getProperty<std::string>("GamepadTopicName").getValue());
49 usingTopic(getProperty<std::string>("EmergencyStopTopic").getValue());
50
// Read once here; the later callbacks use the cached members.
51 agentName = getProperty<std::string>("AgentName").getValue();
52 assistanceSessionTimeoutSeconds =
53 getProperty<float>("AssistanceSessionTimeoutSeconds").getValue();
54
55 // usingProxy(getProperty<std::string>("PlatformUnitName").getValue());
56
57 ARMARX_INFO << "oninit InterventionObserver end";
58 }
59
// NOTE(review): the line carrying this function's qualified name (listing
// line 61) is not visible in this view; judging by the log text this is
// presumably the component's connect hook — confirm against the header.
//
// Logs and makes one attempt to connect to the memory. The return value of
// ensureMemoryConnection() is deliberately ignored here: a failed attempt is
// not an error at this point.
60 void
62 {
63 ARMARX_INFO << "onConnect InterventionObserver";
64
65 // Try to connect immediately. If the memory is already up this succeeds.
66 // If not, ensureMemoryConnection() will retry on every incoming event.
67 ensureMemoryConnection();
68 }
69
// Makes sure the memory clients used by this component are connected.
//
// Returns true when `memoryConnected` is already set, or when this call
// manages to set it. Returns false when `memoryConnectionMutex` could not be
// acquired (the lock is only tried, never waited for) or when the connection
// attempt below throws. Exceptions are not propagated to the caller.
70 bool
71 InterventionObserver::ensureMemoryConnection()
72 {
// Fast path: nothing to do once the flag has been set.
73 if (memoryConnected.load())
74 {
75 return true;
76 }
77
78 // Double-checked locking: try_to_lock avoids blocking event callbacks
79 // while another thread is already attempting connection.
80 std::unique_lock lock(memoryConnectionMutex, std::try_to_lock);
81 if (!lock.owns_lock())
82 {
83 return false;
84 }
85
// Re-check under the lock: the flag may have been set between the first
// check and acquiring the mutex.
86 if (memoryConnected.load())
87 {
88 return true;
89 }
90
91 try
92 {
93 // Use getWriter/getReader which resolve immediately (non-blocking).
94 // They throw if the memory server hasn't registered yet.
95 const auto memoryId = armem::MemoryID().withMemoryName("TaskOutcome");
// The returned clients are discarded; these two calls only act as an
// availability probe.
96 memoryNameSystem().getWriter(memoryId);
97 memoryNameSystem().getReader(memoryId);
98
99 // Memory is available — now connect writer/reader properly.
100 // Since the server is already registered, waitForServer() returns immediately.
101 writer.connect(memoryNameSystem());
102 reader.connect(memoryNameSystem());
103
104 // Connect reader for the Skill memory to query latest skill events.
105 const auto skillMemoryId = armem::MemoryID().withMemoryName("Skill");
106 skillMemoryReader = memoryNameSystem().useReader(skillMemoryId);
107
// The flag is set only after every step above succeeded.
108 memoryConnected.store(true);
109 ARMARX_INFO << "InterventionObserver: Memory connection established.";
110 }
111 catch (...)
112 {
// NOTE(review): listing line 113 (the head of this log statement) is not
// visible in this view. Any exception from the try block ends up in this
// handler — including one thrown by the "Skill" useReader() call — although
// the message only mentions the TaskOutcome memory.
114 << "InterventionObserver: TaskOutcome memory not available yet. "
115 "Will retry on next event.";
116 }
117
// Still false here if the try block threw before reaching store(true).
118 return memoryConnected.load();
119 }
120
// NOTE(review): the line carrying this function's qualified name (listing
// line 122) is not visible in this view; presumably the component's
// disconnect hook — confirm against the header.
//
// Clears the connection flag so that the next ensureMemoryConnection() call
// no longer takes its early-return path and attempts to connect again.
121 void
123 {
124 memoryConnected = false;
125 }
126
// NOTE(review): the line carrying this function's qualified name (listing
// line 128) is not visible in this view; judging by the log text this is
// presumably the component's exit hook — confirm against the header.
//
// Only writes a log line; no state is changed here.
127 void
129 {
130 ARMARX_INFO << "exit InterventionObserver";
131 }
132
141
142 // Called on every gamepad event. Handles three concerns:
143 // 1) Back button press -> commit a SUSPENDED TaskOutcome (skill was interrupted).
144 // 2) Start button press -> find the last SUSPENDED entry and mark it recovered.
145 // 3) Any analog/button input -> track as an "assistance session" and commit
146 // a summary after the operator goes idle for the configured timeout.
//
// Parameters used by the body: `device` and `name` are stored in the
// committed context, `data` carries the button and axis values. `timestamp`
// and `c` are not read in the visible code.
//
// NOTE(review): the line carrying the qualified function name and the
// `device` parameter (listing line 148) is not visible in this view, and
// several declaration lines inside the body are missing as well; they are
// pointed out where they occur.
147 void
149 const std::string& name,
150 const GamepadData& data,
151 const TimestampBasePtr& timestamp,
152 const Ice::Current& c)
153 {
154 // Always update rising-edge detection state, even when the memory is
155 // not yet available, so we don't miss transitions or get stuck.
156 const bool backRisingEdge = data.backButton && !prevBackButton;
157 prevBackButton = data.backButton;
158
159 const bool startRisingEdge = data.startButton && !prevStartButton;
160 prevStartButton = data.startButton;
161
// Without a memory connection the whole event is dropped here, which also
// skips the assistance session tracking in section 3 below.
162 if (!ensureMemoryConnection())
163 {
164 return;
165 }
166
167 // --- 1) Back button: suspend the currently running skill ---
168 if (backRisingEdge)
169 {
170 ARMARX_INFO << "Back button pressed — committing SUSPENDED TaskOutcome";
171
172 const std::string suspendedSkillName = queryLatestSkillName();
173 ARMARX_INFO << "Skill suspended by gamepad back button: " << suspendedSkillName;
174
// NOTE(review): listing lines 175, 177-178, 180 and 183-184 are not visible
// in this view; they include the declarations of `outcome` and `fi` that
// are used below.
176 outcome.taskName = suspendedSkillName;
179 outcome.agent = agentName;
181 outcome.endTime = armarx::DateTime::Now();
182
185 outcome.failureInfo = fi;
186
187 outcome.context.skillName = suspendedSkillName;
// The complete gamepad state of this event is stored as strings. The
// "source" entry is what the start-button branch filters on.
188 outcome.context.additional = {
189 {"source", "gamepad"},
190 {"device", device},
191 {"name", name},
192 {"leftStickX", std::to_string(data.leftStickX)},
193 {"leftStickY", std::to_string(data.leftStickY)},
194 {"rightStickX", std::to_string(data.rightStickX)},
195 {"rightStickY", std::to_string(data.rightStickY)},
196 {"dPadX", std::to_string(data.dPadX)},
197 {"dPadY", std::to_string(data.dPadY)},
198 {"leftTrigger", std::to_string(data.leftTrigger)},
199 {"rightTrigger", std::to_string(data.rightTrigger)},
200 {"leftButton", std::to_string(data.leftButton)},
201 {"rightButton", std::to_string(data.rightButton)},
202 {"backButton", std::to_string(data.backButton)},
203 {"startButton", std::to_string(data.startButton)},
204 {"xButton", std::to_string(data.xButton)},
205 {"yButton", std::to_string(data.yButton)},
206 {"aButton", std::to_string(data.aButton)},
207 {"bButton", std::to_string(data.bButton)},
208 {"theMiddleButton", std::to_string(data.theMiddleButton)},
209 {"leftStickButton", std::to_string(data.leftStickButton)},
210 {"rightStickButton", std::to_string(data.rightStickButton)},
211 };
212
213 const bool success =
214 writer.commitTaskOutcome(outcome, providerName, armem::Time::Now());
215
216 if (success)
217 {
218 ARMARX_INFO << "Successfully committed SUSPENDED TaskOutcome for gamepad intervention.";
219 }
220 else
221 {
222 ARMARX_WARNING << "Failed to commit SUSPENDED TaskOutcome for gamepad intervention.";
223 }
224 }
225
226 // --- 2) Start button: mark the last gamepad-caused suspension as recovered ---
227 if (startRisingEdge)
228 {
229 ARMARX_INFO << "Start button pressed — checking for SUSPENDED entry to mark as recovered";
230
// NOTE(review): the opening of the query object `q` (listing line 231) and
// one of its fields (listing line 234) are not visible in this view.
232 .providerName = providerName,
233 .taskTypeFilter = std::nullopt,
235 .timestamp = armem::Time::Now(),
236 .maxAge = armarx::Duration::Hours(24)};
237
238 const auto result = reader.query(q);
239
240 if (!result || result.outcomes.empty())
241 {
242 ARMARX_WARNING << "Start button pressed but no SUSPENDED TaskOutcome found in memory. Skipping recovery.";
243 }
244 else
245 {
246 // Find the most recent SUSPENDED outcome from the gamepad source.
247 const armem::task_outcome::TaskOutcome* lastSuspended = nullptr;
248 for (const auto& o : result.outcomes)
249 {
// Skip entries written by other sources (e.g. the emergency stop).
250 auto it = o.context.additional.find("source");
251 if (it == o.context.additional.end() || it->second != "gamepad")
252 {
253 continue;
254 }
// Keep the candidate with the greatest endTime.
255 if (!lastSuspended || o.endTime > lastSuspended->endTime)
256 {
257 lastSuspended = &o;
258 }
259 }
260
261 if (!lastSuspended)
262 {
263 ARMARX_WARNING << "Start button pressed but no gamepad-sourced SUSPENDED entry found. Skipping recovery.";
264 }
265 else
266 {
267 // Re-commit the suspended entry with recovery info attached. This
268 // creates a new snapshot rather than modifying the original, so
269 // the full history (suspension + recovery) is preserved.
270 armem::task_outcome::TaskOutcome recovered = *lastSuspended;
271 recovered.couldRecover = true;
// NOTE(review): the head of this aggregate (listing line 272) is not
// visible in this view; presumably it assigns `recovered.recoveryInfo`
// — confirm.
273 .recoveryTime = armarx::DateTime::Now(),
274 .recoveryMeasure = "resume by gamepad triggered"};
275
276 const bool success =
277 writer.commitTaskOutcome(recovered, providerName, armem::Time::Now());
278
279 if (success)
280 {
281 ARMARX_INFO << "Successfully committed recovery for task '"
282 << recovered.taskName << "'.";
283 }
284 else
285 {
286 ARMARX_WARNING << "Failed to commit recovery TaskOutcome.";
287 }
288 }
289 }
290 }
291
292 // --- 3) Assistance session tracking ---
293 // Classify which gamepad inputs are active this frame, then accumulate
294 // them into a session. A session starts on the first non-idle input and
295 // ends (gets committed) after no input for `assistanceSessionTimeoutSeconds`.
// Remembered so that the commit below, which runs on a later event, can
// still report the device and name.
296 lastGamepadDevice = device;
297 lastGamepadName = name;
298
// Analog values whose magnitude does not exceed this are treated as idle.
299 constexpr float deadzone = 0.1f;
300
// A std::set, so a label shared by several inputs (e.g. the a/b/x/y
// buttons) is recorded only once. The back and start buttons are not
// classified here and therefore do not count as session activity.
301 std::set<std::string> activeActions;
302
303 if (std::abs(data.leftTrigger) > deadzone || std::abs(data.rightTrigger) > deadzone)
304 {
305 activeActions.insert("manual gripper control");
306 }
307 if (std::abs(data.leftStickX) > deadzone || std::abs(data.leftStickY) > deadzone)
308 {
309 activeActions.insert("manual platform navigation");
310 }
311 if (std::abs(data.rightStickX) > deadzone || std::abs(data.rightStickY) > deadzone)
312 {
313 activeActions.insert("rightStick");
314 }
315 if (std::abs(data.dPadX) > deadzone || std::abs(data.dPadY) > deadzone)
316 {
317 activeActions.insert("dPad");
318 }
319 if (data.aButton)
320 {
321 activeActions.insert("manual action button");
322 }
323 if (data.bButton)
324 {
325 activeActions.insert("manual action button");
326 }
327 if (data.xButton)
328 {
329 activeActions.insert("manual action button");
330 }
331 if (data.yButton)
332 {
333 activeActions.insert("manual action button");
334 }
335 if (data.leftButton)
336 {
337 activeActions.insert("manual bumper action");
338 }
339 if (data.rightButton)
340 {
341 activeActions.insert("manual bumper action");
342 }
343 if (data.theMiddleButton)
344 {
345 activeActions.insert("theMiddleButton");
346 }
347 if (data.leftStickButton)
348 {
349 activeActions.insert("leftStickButton");
350 }
351 if (data.rightStickButton)
352 {
353 activeActions.insert("rightStickButton");
354 }
355
356 const auto now = armarx::DateTime::Now();
357 const bool hasInput = !activeActions.empty();
358
359 if (hasInput)
360 {
361 if (!assistanceSessionActive)
362 {
363 // Start a new session
364 assistanceSessionActive = true;
365 assistanceSessionStart = now;
366 assistanceActionsInSession = activeActions;
367 ARMARX_INFO << "Assistance session started";
368 }
369 else
370 {
371 // Merge actions into existing session
372 assistanceActionsInSession.insert(activeActions.begin(), activeActions.end());
373 }
374 assistanceLastActivityTime = now;
375 }
// The timeout is only evaluated here, i.e. when an event without active
// input arrives; nothing in this function checks it between events.
376 else if (assistanceSessionActive)
377 {
378 const auto elapsed = now - assistanceLastActivityTime;
379 if (elapsed > armarx::Duration::SecondsDouble(assistanceSessionTimeoutSeconds))
380 {
381 // Commit the session
// Join the collected action labels with ", ".
382 std::ostringstream actionStr;
383 bool first = true;
384 for (const auto& a : assistanceActionsInSession)
385 {
386 if (!first)
387 {
388 actionStr << ", ";
389 }
390 actionStr << a;
391 first = false;
392 }
393
// Measured from session start to the last active input, so the idle
// timeout itself is not part of the reported duration.
394 const auto durationMs =
395 (assistanceLastActivityTime - assistanceSessionStart).toMilliSeconds();
396
// NOTE(review): listing lines 397 and 399-400 are not visible in this
// view; they include the declaration of `outcome` used below.
398 outcome.taskName = "GamepadIntervention";
401 outcome.agent = agentName;
402 outcome.startTime = assistanceSessionStart;
403 outcome.endTime = assistanceLastActivityTime;
404 outcome.context.skillName = "unknown";
405 outcome.context.additional = {
406 {"source", "gamepad_assistance"},
407 {"actions", actionStr.str()},
408 {"duration_ms", std::to_string(durationMs)},
409 {"device", lastGamepadDevice},
410 {"name", lastGamepadName},
411 };
412
413 const bool success =
414 writer.commitTaskOutcome(outcome, providerName, armem::Time::Now());
415
416 if (success)
417 {
418 ARMARX_INFO << "Committed assistance session: actions=[" << actionStr.str()
419 << "], duration=" << durationMs << "ms";
420 }
421 else
422 {
423 ARMARX_WARNING << "Failed to commit assistance session TaskOutcome.";
424 }
425
// The session is reset whether or not the commit succeeded.
426 // Reset session state
427 assistanceSessionActive = false;
428 assistanceActionsInSession.clear();
429 }
430 }
431 }
432
// Looks up the name of the most recently started skill in the "SkillEvent"
// core segment of the Skill memory.
//
// Returns the `skillId.skillName` of the entity whose latest instance has the
// greatest `executionStartedTimestamp`. Returns the literal "unknown" when
// `skillMemoryReader` is not set, the query reports failure, the core segment
// is missing, no entity has an instance, or a std::exception is thrown
// while querying or decoding. Exceptions not derived from std::exception
// are not caught here.
433 std::string
434 InterventionObserver::queryLatestSkillName()
435 {
436 if (!skillMemoryReader)
437 {
438 return "unknown";
439 }
440
441 try
442 {
443 // Query the latest snapshot of every entity in the SkillEvent segment.
444 // Each entity represents one skill execution; its latest snapshot holds
445 // the most recent status update for that execution.
// NOTE(review): the declaration of the query builder `qb` (listing line
// 446) and one selector line of the chain (listing line 449, presumably
// the provider segment selector) are not visible in this view.
447 qb.coreSegments()
448 .withName("SkillEvent")
450 .all()
451 .entities()
452 .all()
453 .snapshots()
454 .latest();
455
456 const auto result = skillMemoryReader.query(qb);
457 if (!result.success)
458 {
459 ARMARX_WARNING << "Failed to query Skill memory for latest skill event.";
460 return "unknown";
461 }
462
463 const auto* cs = result.memory.findCoreSegment("SkillEvent");
464 if (!cs)
465 {
466 return "unknown";
467 }
468
469 // Among all skill executions, pick the one that was started most
470 // recently — that is the skill the operator most likely intended
471 // to interrupt.
472 std::string latestSkillName = "unknown";
// Starts out invalid so that the first decoded instance is always taken.
473 armarx::core::time::DateTime latestTime = armarx::core::time::DateTime::Invalid();
474
475 cs->forEachEntity(
476 [&](const armem::wm::Entity& entity)
477 {
// Entities without any instance are skipped.
478 if (const auto* instance = entity.findLatestInstance(); instance)
479 {
480 auto aronUpdate =
481 skills::arondto::SkillStatusUpdate::FromAron(instance->data());
482 if (!latestTime.isValid() ||
483 aronUpdate.executionStartedTimestamp > latestTime)
484 {
485 latestTime = aronUpdate.executionStartedTimestamp;
486 latestSkillName = aronUpdate.skillId.skillName;
487 }
488 }
489 });
490
491 return latestSkillName;
492 }
493 catch (const std::exception& e)
494 {
495 ARMARX_WARNING << "Exception querying Skill memory: " << e.what();
496 return "unknown";
497 }
498 }
499
500 // Called on every emergency stop state change. Only reacts to transitions:
501 // INACTIVE -> ACTIVE: commit a SUSPENDED TaskOutcome for the currently running skill.
502 // ACTIVE -> INACTIVE: find the last emergency-stop-caused suspension and mark it recovered.
//
// `state` is the newly reported state; the Ice::Current argument is unused.
// A transition that is reported while the memory connection is unavailable
// is recorded in `prevEmergencyStopState` but not committed: the next report
// of the same state is then no longer seen as a transition.
//
// NOTE(review): several declaration lines inside the body are not visible in
// this view; they are pointed out where they occur.
503 void
504 InterventionObserver::reportEmergencyStopState(EmergencyStopState state, const Ice::Current&)
505 {
506 if (!ensureMemoryConnection())
507 {
508 // Still track the state so we detect the correct transition once
509 // the memory becomes available.
510 prevEmergencyStopState = state;
511 return;
512 }
513
514 // Detect state transitions; ignore repeated reports of the same state.
// Each flag requires the previous state to be exactly the opposite value
// named in its condition.
515 const bool becameActive =
516 (state == EmergencyStopState::eEmergencyStopActive &&
517 prevEmergencyStopState == EmergencyStopState::eEmergencyStopInactive);
518 const bool becameInactive =
519 (state == EmergencyStopState::eEmergencyStopInactive &&
520 prevEmergencyStopState == EmergencyStopState::eEmergencyStopActive);
521 prevEmergencyStopState = state;
522
523 if (becameActive)
524 {
525 ARMARX_INFO << "Emergency stop activated — committing SUSPENDED TaskOutcome";
526
527 const std::string suspendedSkillName = queryLatestSkillName();
528 ARMARX_INFO << "Skill suspended by emergency stop: " << suspendedSkillName;
529
// NOTE(review): listing lines 530, 532-533, 535 and 538-539 are not visible
// in this view; they include the declarations of `outcome` and `fi` that
// are used below.
531 outcome.taskName = suspendedSkillName;
534 outcome.agent = agentName;
536 outcome.endTime = armarx::DateTime::Now();
537
540 outcome.failureInfo = fi;
541
542 outcome.context.skillName = suspendedSkillName;
// The "source" entry is what the release branch below filters on.
543 outcome.context.additional = {
544 {"source", "emergency_stop"},
545 {"emergencyStopState", "active"}};
546
547 const bool success =
548 writer.commitTaskOutcome(outcome, providerName, armem::Time::Now());
549
550 if (success)
551 {
552 ARMARX_INFO << "Successfully committed SUSPENDED TaskOutcome for emergency stop.";
553 }
554 else
555 {
556 ARMARX_WARNING << "Failed to commit SUSPENDED TaskOutcome for emergency stop.";
557 }
558 }
559
560 if (becameInactive)
561 {
562 ARMARX_INFO << "Emergency stop released — checking for SUSPENDED entry to mark as recovered";
563
// NOTE(review): the opening of the query object `q` (listing line 564) and
// one of its fields (listing line 567) are not visible in this view.
565 .providerName = providerName,
566 .taskTypeFilter = std::nullopt,
568 .timestamp = armem::Time::Now(),
569 .maxAge = armarx::Duration::Hours(24)};
570
571 const auto result = reader.query(q);
572
573 if (!result || result.outcomes.empty())
574 {
575 ARMARX_WARNING << "Emergency stop released but no SUSPENDED TaskOutcome found in memory. Skipping recovery.";
576 return;
577 }
578
579 // Find the most recent SUSPENDED outcome from the emergency_stop source.
580 const armem::task_outcome::TaskOutcome* lastSuspended = nullptr;
581 for (const auto& o : result.outcomes)
582 {
// Skip entries written by other sources (e.g. the gamepad).
583 auto it = o.context.additional.find("source");
584 if (it == o.context.additional.end() || it->second != "emergency_stop")
585 {
586 continue;
587 }
// Keep the candidate with the greatest endTime.
588 if (!lastSuspended || o.endTime > lastSuspended->endTime)
589 {
590 lastSuspended = &o;
591 }
592 }
593
594 if (!lastSuspended)
595 {
596 ARMARX_WARNING << "Emergency stop released but no emergency-stop-sourced SUSPENDED entry found. Skipping recovery.";
597 return;
598 }
599
// A copy of the suspended entry is committed again with recovery data; the
// entry found by the query is not modified in place.
600 armem::task_outcome::TaskOutcome recovered = *lastSuspended;
601 recovered.couldRecover = true;
// NOTE(review): the head of this aggregate (listing line 602) is not
// visible in this view; presumably it assigns `recovered.recoveryInfo`
// — confirm.
603 .recoveryTime = armarx::DateTime::Now(),
604 .recoveryMeasure = "resume by emergency stop release"};
605
606 const bool success =
607 writer.commitTaskOutcome(recovered, providerName, armem::Time::Now());
608
609 if (success)
610 {
611 ARMARX_INFO << "Successfully committed recovery for emergency stop intervention.";
612 }
613 else
614 {
615 ARMARX_WARNING << "Failed to commit recovery TaskOutcome for emergency stop.";
616 }
617 }
618 }
619
623
625} // namespace armarx
std::string timestamp()
#define ARMARX_REGISTER_COMPONENT_EXECUTABLE(ComponentT, applicationName)
Definition Decoupled.h:29
constexpr T c
std::string getConfigIdentifier()
Retrieve config identifier for this component as set in constructor.
Definition Component.cpp:90
Property< PropertyType > getProperty(const std::string &name)
static DateTime Now()
Definition DateTime.cpp:51
static Duration Hours(std::int64_t hours)
Constructs a duration in hours.
Definition Duration.cpp:120
static Duration SecondsDouble(double seconds)
Constructs a duration in seconds.
Definition Duration.cpp:78
Observes human interventions during autonomous robot execution and records them as TaskOutcome entrie...
armarx::PropertyDefinitionsPtr createPropertyDefinitions() override
void reportEmergencyStopState(EmergencyStopState state, const Ice::Current &) override
void reportGamepadState(const std::string &device, const std::string &name, const GamepadData &data, const TimestampBasePtr &timestamp, const Ice::Current &c) override
SpamFilterDataPtr deactivateSpam(float deactivationDurationSec=10.0f, const std::string &identifier="", bool deactivate=true) const
disables the logging for the current line for the given amount of seconds.
Definition Logging.cpp:99
void usingTopic(const std::string &name, bool orderedPublishing=false)
Registers a proxy for subscription after initialization.
auto * findLatestInstance(int instanceIndex=0)
Definition EntityBase.h:356
CoreSegmentT * findCoreSegment(const std::string &name)
Definition MemoryBase.h:121
Reader getReader(const MemoryID &memoryID)
Get a reader to the given memory name.
Reader useReader(const MemoryID &memoryID)
Use a memory server and get a reader for it.
Writer getWriter(const MemoryID &memoryID)
Get a writer to the given memory name.
QueryResult query(const QueryInput &input) const
Perform a query on the WM.
Definition Reader.cpp:119
The query::Builder class provides a fluent-style specification of hierarchical queries.
Definition Builder.h:22
CoreSegmentSelector & coreSegments()
Start specifying core segments.
Definition Builder.cpp:42
CoreSegmentSelector & withName(const std::string &name) override
ProviderSegmentSelector & providerSegments()
Start specifying provider segments.
SnapshotSelector & snapshots()
Start specifying entity snapshots.
Definition selectors.cpp:92
EntitySelector & entities()
Start specifying entities.
ProviderSegmentSelector & all() override
static DateTime Now()
Definition DateTime.cpp:51
static DateTime Invalid()
Definition DateTime.cpp:57
#define ARMARX_INFO
The normal logging level.
Definition Logging.h:181
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
Definition Logging.h:193
#define q
This file offers overloads of toIce() and fromIce() functions for STL container types.
IceUtil::Handle< class PropertyDefinitionContainer > PropertyDefinitionsPtr
PropertyDefinitions smart pointer type.
wm::Memory memory
The slice of the memory that matched the query.
Definition Query.h:58
InterruptionFailures interruption
Definition types.h:92
std::map< std::string, std::string > additional
Definition types.h:112
armarx::core::time::DateTime startTime
Definition types.h:128
std::optional< RecoveryInfo > recoveryInfo
Definition types.h:131
armarx::core::time::DateTime endTime
Definition types.h:129
std::optional< bool > couldRecover
Definition types.h:130
std::optional< FailureInfo > failureInfo
Definition types.h:127