SystemObserver.cpp
Go to the documentation of this file.
1/*
2* This file is part of ArmarX.
3*
4* Copyright (C) 2011-2016, High Performance Humanoid Technologies (H2T), Karlsruhe Institute of Technology (KIT), all rights reserved.
5*
6* ArmarX is free software; you can redistribute it and/or modify
7* it under the terms of the GNU General Public License version 2 as
8* published by the Free Software Foundation.
9*
10* ArmarX is distributed in the hope that it will be useful, but
11* WITHOUT ANY WARRANTY; without even the implied warranty of
12* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13* GNU General Public License for more details.
14*
15* You should have received a copy of the GNU General Public License
16* along with this program. If not, see <http://www.gnu.org/licenses/>.
17*
18* @package ArmarX::Core
19* @author Kai Welke (welke _at_ kit _dot_ edu)
20* @date 2012
21* @copyright http://www.gnu.org/licenses/gpl-2.0.txt
22* GNU General Public License
23*/
24
25#include "SystemObserver.h"
26
27#include <IceUtil/Time.h>
28
39
40namespace armarx
41{
42 // ********************************************************************
43 // observer framework hooks
44 // ********************************************************************
// Observer framework hook executed on initialisation.
// Resets the timer id counter (maxTimerId) to 0 and offers one condition
// check per check string: updated, valid, equals, inrange, larger, smaller.
// NOTE(review): this listing is incomplete - the function-name line (source
// line 46, presumably SystemObserver::onInitObserver()) and the second
// argument of every offerConditionCheck() call (source lines 52, 54, ...)
// are not visible here; confirm against the real file.
// NOTE(review): only maxTimerId is reset here; maxCounterId is not touched
// in the visible lines - confirm it is initialised elsewhere.
45 void
47 {
48 maxTimerId = 0;
49
50 // register all checks
51 offerConditionCheck(checks::SystemObserver::updated->getCheckStr(),
53 offerConditionCheck(checks::SystemObserver::valid->getCheckStr(),
55 offerConditionCheck(checks::SystemObserver::equals->getCheckStr(),
57 offerConditionCheck(checks::SystemObserver::inrange->getCheckStr(),
59 offerConditionCheck(checks::SystemObserver::larger->getCheckStr(),
61 offerConditionCheck(checks::SystemObserver::smaller->getCheckStr(),
63 }
64
// Refreshes every registered timer and publishes its elapsed time.
// While holding timersMutex, each entry of `timers` is updated through
// updateTimer(), its elapsedMs is written (converted to int) to the
// "elapsedMs" data field of the channel named after the timer, and the
// channel is updated.
// NOTE(review): this listing is incomplete - the function-name line (source
// line 66, presumably SystemObserver::postWorkerJobs()) and source line 68
// (first statement of the body) are not visible here; confirm against the
// real file.
65 void
67 {
69 std::unique_lock lock(timersMutex);
70 SystemObserverTimerMap::iterator iter = timers.begin();
71
72 while (iter != timers.end())
73 {
74 try
75 {
76 // update the timer
77 updateTimer(iter->second);
78
79 // update data field and channel
80 setDataField(iter->second.timerName, "elapsedMs", int(iter->second.elapsedMs));
81 updateChannel(iter->second.timerName);
82 }
// A failure for one timer is only logged; the loop then continues with the
// next timer.
83 catch (armarx::LocalException& e)
84 {
85 ARMARX_FATAL << e.what();
86 }
87 catch (armarx::UserException& e)
88 {
89 ARMARX_FATAL << e.reason;
90 }
91 iter++;
92 }
93 }
94
95 // ********************************************************************
96 // timers interface implementation
97 // ********************************************************************
98 void
99 SystemObserver::startTimer_async(const AMD_SystemObserverInterface_startTimerPtr& amd,
100 const std::string& timerBaseName,
101 const Ice::Current& c)
102 {
103 addWorkerJob("SystemObserver::startTimer",
104 [this, amd, timerBaseName]
105 {
106 std::stringstream temp;
107 temp << ++maxTimerId << "_" << timerBaseName;
108 std::string timerName = temp.str();
109
110 std::unique_lock lock(timersMutex);
111
112 if (timers.find(timerName) != timers.end())
113 {
114 std::string reason = "Timer " + timerName + " already exists";
115 amd->ice_exception(InvalidTimerException(reason.c_str()));
116 }
117
118 // create new timer
120 timer.timerName = timerName;
121 resetTimer(timer);
122
123 std::pair<std::string, SystemObserverTimer> entry;
124 entry.first = timerName;
125 entry.second = timer;
126
127 timers.insert(entry);
128
129 std::unique_lock registryLock(channelsMutex);
130
131 // add channel and datafield
132 offerChannel(timerName, "Timer " + timerName);
134 "elapsedMs",
135 int(0),
136 "Elapsed milliseconds of Timer " + timerName);
137 updateChannel(timerName);
138
139 // return datafield identifier for easy condition installation
140 amd->ice_response(new ChannelRef(this, timerName));
141 });
142 }
143
144 void
145 SystemObserver::resetTimer_async(const AMD_SystemObserverInterface_resetTimerPtr& amd,
146 const ChannelRefBasePtr& timer,
147 const Ice::Current& c)
148 {
149 addWorkerJob("SystemObserver::resetTimer",
150 [this, amd, timer]
151 {
152 std::unique_lock lock(timersMutex);
153 std::string timerName =
154 ChannelRefPtr::dynamicCast(timer)->getChannelName();
155
156 SystemObserverTimerMap::iterator iter = timers.find(timerName);
157
158 if (iter != timers.end())
159 {
160 resetTimer(iter->second);
161 }
162 else
163 {
164 std::string reason = "Timer " + timerName + " unknown";
165 amd->ice_exception(InvalidTimerException(reason.c_str()));
166 }
167 amd->ice_response();
168 });
169 }
170
171 void
172 SystemObserver::pauseTimer_async(const AMD_SystemObserverInterface_pauseTimerPtr& amd,
173 const ChannelRefBasePtr& timer,
174 const Ice::Current& c)
175 {
176 addWorkerJob("SystemObserver::pauseTimer",
177 [this, amd, timer]
178 {
179 std::unique_lock lock(timersMutex);
180 std::string timerName =
181 ChannelRefPtr::dynamicCast(timer)->getChannelName();
182
183 SystemObserverTimerMap::iterator iter = timers.find(timerName);
184
185 if (iter != timers.end())
186 {
187 iter->second.paused = true;
188 }
189 else
190 {
191 std::string reason = "Timer " + timerName + " unknown";
192 amd->ice_exception(InvalidTimerException(reason.c_str()));
193 }
194 amd->ice_response();
195 });
196 }
197
198 void
199 SystemObserver::unpauseTimer_async(const AMD_SystemObserverInterface_unpauseTimerPtr& amd,
200 const ChannelRefBasePtr& timer,
201 const Ice::Current& c)
202 {
203 addWorkerJob("SystemObserver::unpauseTimer",
204 [this, amd, timer]
205 {
206 std::unique_lock lock(timersMutex);
207 std::string timerName =
208 ChannelRefPtr::dynamicCast(timer)->getChannelName();
209
210 SystemObserverTimerMap::iterator iter = timers.find(timerName);
211
212 if (iter != timers.end())
213 {
214 iter->second.paused = false;
215 iter->second.startTimeMs += getElapsedTimeMs(iter->second.startTimeMs +
216 iter->second.elapsedMs);
217 }
218 else
219 {
220 std::string reason = "Timer " + timerName + " unknown";
221 amd->ice_exception(InvalidTimerException(reason.c_str()));
222 }
223 amd->ice_response();
224 });
225 }
226
227 void
228 SystemObserver::removeTimer_async(const AMD_SystemObserverInterface_removeTimerPtr& amd,
229 const ChannelRefBasePtr& timer,
230 const Ice::Current& c)
231 {
232 addWorkerJob("SystemObserver::removeTimer",
233 [this, amd, timer]
234 {
235 std::unique_lock lock(timersMutex);
236 std::string timerName =
237 ChannelRefPtr::dynamicCast(timer)->getChannelName();
238
239 SystemObserverTimerMap::iterator iter = timers.find(timerName);
240
241 if (iter != timers.end())
242 {
243 removeChannel(timerName);
244 timers.erase(iter);
245 }
246 amd->ice_response();
247 });
248 }
249
250 // ********************************************************************
251 // counters implementation
252 // ********************************************************************
253 void
254 SystemObserver::startCounter_async(const AMD_SystemObserverInterface_startCounterPtr& amd,
255 int initialValue,
256 const std::string& counterBaseName,
257 const Ice::Current& c)
258 {
259 addWorkerJob("SystemObserver::startCounter",
260 [this, amd, initialValue, counterBaseName]
261 {
262 std::stringstream temp;
263 temp << ++maxCounterId << "_" << counterBaseName;
264 std::string counterName = temp.str();
265
266 std::unique_lock lock(countersMutex);
267
268 if (counters.find(counterName) != counters.end())
269 {
270 std::string reason = "Counter " + counterName + " already exists";
271 amd->ice_exception(InvalidCounterException(reason.c_str()));
272 }
273
274
275 // create new counter
276 SystemObserverCounter counter;
277 counter.counterName = counterName;
278 counter.value = initialValue;
279
280 std::pair<std::string, SystemObserverCounter> entry;
281 entry.first = counterName;
282 entry.second = counter;
283
284
285 SystemObserverCounterMap::iterator iter;
286 iter = (counters.insert(entry)).first;
287
288
289 // add channel and datafield
290 offerChannel(counterName, "Counter " + counterName);
291 offerDataField(counterName,
292 "value",
294 "Current value of counter " + counterName);
295
296 updateCounter(iter);
297
298 // return datafield identifier for easy condition installation
299 amd->ice_response(new ChannelRef(this, counterName));
300 });
301 }
302
303 void
305 const AMD_SystemObserverInterface_incrementCounterPtr& amd,
306 const ChannelRefBasePtr& counter,
307 const Ice::Current& c)
308 {
309 addWorkerJob("SystemObserver::incrementCounter",
310 [this, amd, counter]
311 {
312 std::unique_lock lock(countersMutex);
313 std::string counterName =
314 ChannelRefPtr::dynamicCast(counter)->getChannelName();
315
316 SystemObserverCounterMap::iterator iter = counters.find(counterName);
317
318 if (iter != counters.end())
319 {
320 iter->second.value++;
321 }
322 else
323 {
324 std::string reason = "Counter " + counterName + " unknown";
325 amd->ice_exception(InvalidCounterException(reason.c_str()));
326 }
327
328 updateCounter(iter);
329 amd->ice_response(iter->second.value);
330 });
331 }
332
333 void
335 const AMD_SystemObserverInterface_decrementCounterPtr& amd,
336 const ChannelRefBasePtr& counter,
337 const Ice::Current& c)
338 {
339 addWorkerJob("SystemObserver::decrementCounter",
340 [this, amd, counter]
341 {
342 std::unique_lock lock(countersMutex);
343 std::string counterName =
344 ChannelRefPtr::dynamicCast(counter)->getChannelName();
345
346 SystemObserverCounterMap::iterator iter = counters.find(counterName);
347
348 if (iter != counters.end())
349 {
350 iter->second.value--;
351 }
352 else
353 {
354 std::string reason = "Counter " + counterName + " unknown";
355 amd->ice_exception(InvalidCounterException(reason.c_str()));
356 }
357
358 updateCounter(iter);
359 amd->ice_response(iter->second.value);
360 });
361 }
362
363 void
364 SystemObserver::resetCounter_async(const AMD_SystemObserverInterface_resetCounterPtr& amd,
365 const ChannelRefBasePtr& counter,
366 const Ice::Current& c)
367 {
368 addWorkerJob("SystemObserver::resetCounter",
369 [this, amd, counter]
370 {
371 std::unique_lock lock(countersMutex);
372 std::string counterName =
373 ChannelRefPtr::dynamicCast(counter)->getChannelName();
374
375 SystemObserverCounterMap::iterator iter = counters.find(counterName);
376
377 if (iter != counters.end())
378 {
379 iter->second.value = 0;
380 }
381 else
382 {
383 std::string reason = "Counter " + counterName + " unknown";
384 amd->ice_exception(InvalidCounterException(reason.c_str()));
385 }
386
387 updateCounter(iter);
388 amd->ice_response();
389 });
390 }
391
392 void
393 SystemObserver::setCounter_async(const AMD_SystemObserverInterface_setCounterPtr& amd,
394 const ChannelRefBasePtr& counter,
395 int counterValue,
396 const Ice::Current& c)
397 {
398 addWorkerJob("SystemObserver::setCounter",
399 [this, amd, counter, counterValue]
400 {
401 std::unique_lock lock(countersMutex);
402 std::string counterName =
403 ChannelRefPtr::dynamicCast(counter)->getChannelName();
404
405 SystemObserverCounterMap::iterator iter = counters.find(counterName);
406
407 if (iter != counters.end())
408 {
409 iter->second.value = counterValue;
410 }
411 else
412 {
413 std::string reason = "Counter " + counterName + " unknown";
414 amd->ice_exception(InvalidCounterException(reason.c_str()));
415 }
416
417 updateCounter(iter);
418 amd->ice_response();
419 });
420 }
421
422 void
423 SystemObserver::removeCounter_async(const AMD_SystemObserverInterface_removeCounterPtr& amd,
424 const ChannelRefBasePtr& counter,
425 const Ice::Current& c)
426 {
427 addWorkerJob("SystemObserver::removeCounter",
428 [this, amd, counter]
429 {
430 std::unique_lock lock(countersMutex);
431 std::string counterName =
432 ChannelRefPtr::dynamicCast(counter)->getChannelName();
433
434 SystemObserverCounterMap::iterator iter = counters.find(counterName);
435
436 if (iter != counters.end())
437 {
438 removeChannel(counterName);
439 counters.erase(iter);
440 }
441 amd->ice_response();
442 });
443 }
444
445 // ********************************************************************
446 // private methods
447 // ********************************************************************
448 void
449 SystemObserver::resetTimer(SystemObserverTimer& timer)
450 {
451 timer.startTimeMs = getCurrentTimeMs();
452 timer.elapsedMs = getElapsedTimeMs(timer.startTimeMs);
453 timer.paused = false;
454 }
455
456 void
457 SystemObserver::updateTimer(SystemObserverTimer& timer)
458 {
459 if (!timer.paused)
460 {
461 timer.elapsedMs = getElapsedTimeMs(timer.startTimeMs);
462 }
463 }
464
465 int
466 SystemObserver::getCurrentTimeMs()
467 {
468 IceUtil::Time current = TimeUtil::GetTime();
469
470 return current.toMilliSeconds();
471 }
472
473 int
474 SystemObserver::getElapsedTimeMs(int referenceTimeMs)
475 {
476 IceUtil::Time current = TimeUtil::GetTime();
477 IceUtil::Time reference = IceUtil::Time::milliSeconds(referenceTimeMs);
478 IceUtil::Time interval = current - reference;
479
480 return interval.toMilliSeconds();
481 }
482
483 void
484 SystemObserver::updateCounter(SystemObserverCounterMap::iterator& iterCounter)
485 {
486 // update data field and channel
487 setDataField(iterCounter->second.counterName, "value", iterCounter->second.value);
488 updateChannel(iterCounter->second.counterName);
489 }
490} // namespace armarx
constexpr T c
The ChannelRef class is a reference to a channel on an Observer.
Definition ChannelRef.h:51
Checks if the numbers published in the relevant data fields equal a reference value.
Checks if the numbers published in the relevant data fields are within a reference range.
Checks if the numbers published in the relevant data fields are larger than a reference value.
Checks if the numbers published in the relevant data fields are smaller than a reference value.
Checks if the relevant data fields have been updated since the installation of this condition.
Checks if the relevant data fields contain valid values.
void offerChannel(std::string channelName, std::string description)
Offer a channel.
Definition Observer.cpp:131
virtual void postWorkerJobs()
void removeChannel(std::string channelName)
Remove a channel.
Definition Observer.cpp:318
void offerDataField(std::string channelName, std::string datafieldName, VariantTypeId type, std::string description)
Offer a datafield without default value.
Definition Observer.cpp:201
void offerConditionCheck(std::string checkName, ConditionCheck *conditionCheck)
Offer a condition check.
Definition Observer.cpp:301
void updateChannel(const std::string &channelName, const std::set< std::string > &updatedDatafields=std::set< std::string >())
Update all conditions for a channel.
Definition Observer.cpp:788
void addWorkerJob(const std::string &name, std::function< void(void)> &&f) const
void offerDataFieldWithDefault(std::string channelName, std::string datafieldName, const Variant &defaultValue, std::string description)
Offer a datafield with default value.
Definition Observer.cpp:160
std::recursive_mutex channelsMutex
Definition Observer.h:600
void setDataField(const std::string &channelName, const std::string &datafieldName, const Variant &value, bool triggerFilterUpdate=true)
set datafield with datafieldName and in channel channelName
Definition Observer.cpp:508
void resetTimer_async(const AMD_SystemObserverInterface_resetTimerPtr &amd, const ChannelRefBasePtr &timer, const Ice::Current &c=Ice::emptyCurrent) override
resetTimer sets the start time of timer to Time.Now() and start the timer
void removeCounter_async(const AMD_SystemObserverInterface_removeCounterPtr &amd, const ChannelRefBasePtr &counter, const Ice::Current &c=Ice::emptyCurrent) override
void decrementCounter_async(const AMD_SystemObserverInterface_decrementCounterPtr &amd, const ChannelRefBasePtr &counter, const Ice::Current &c=Ice::emptyCurrent) override
void postWorkerJobs() override
void startTimer_async(const AMD_SystemObserverInterface_startTimerPtr &amd, const std::string &timerBaseName, const Ice::Current &c=Ice::emptyCurrent) override
Creates a new timer with name timerBaseName and starts it.
void incrementCounter_async(const AMD_SystemObserverInterface_incrementCounterPtr &amd, const ChannelRefBasePtr &counter, const Ice::Current &c=Ice::emptyCurrent) override
void removeTimer_async(const AMD_SystemObserverInterface_removeTimerPtr &amd, const ChannelRefBasePtr &timer, const Ice::Current &c=Ice::emptyCurrent) override
removeTimer stops timer and removes it from the SystemObserver.
void setCounter_async(const AMD_SystemObserverInterface_setCounterPtr &amd, const ChannelRefBasePtr &counter, int counterValue, const Ice::Current &c=Ice::emptyCurrent) override
void pauseTimer_async(const AMD_SystemObserverInterface_pauseTimerPtr &amd, const ChannelRefBasePtr &timer, const Ice::Current &c=Ice::emptyCurrent) override
pauseTimer pauses the advance of time in timer.
void onInitObserver() override
Framework hook.
void unpauseTimer_async(const AMD_SystemObserverInterface_unpauseTimerPtr &amd, const ChannelRefBasePtr &timer, const Ice::Current &c=Ice::emptyCurrent) override
unpauseTimer resumes the advancing in time of timer.
void startCounter_async(const AMD_SystemObserverInterface_startCounterPtr &amd, int initialValue, const std::string &counterBaseName, const Ice::Current &c=Ice::emptyCurrent) override
Creates a new counter and starts it.
void resetCounter_async(const AMD_SystemObserverInterface_resetCounterPtr &amd, const ChannelRefBasePtr &counter, const Ice::Current &c=Ice::emptyCurrent) override
static IceUtil::Time GetTime(TimeMode timeMode=TimeMode::VirtualTime)
Get the current time.
Definition TimeUtil.cpp:42
#define ARMARX_FATAL
The logging level for unexpected behaviour, that will lead to a seriously malfunctioning program and ...
Definition Logging.h:199
const VariantTypeId Int
Definition Variant.h:917
This file offers overloads of toIce() and fromIce() functions for STL container types.
Interval< T > interval(T lo, T hi)