Observer.cpp
Go to the documentation of this file.
1/*
2* This file is part of ArmarX.
3*
4* Copyright (C) 2011-2016, High Performance Humanoid Technologies (H2T), Karlsruhe Institute of Technology (KIT), all rights reserved.
5*
6* ArmarX is free software; you can redistribute it and/or modify
7* it under the terms of the GNU General Public License version 2 as
8* published by the Free Software Foundation.
9*
10* ArmarX is distributed in the hope that it will be useful, but
11* WITHOUT ANY WARRANTY; without even the implied warranty of
12* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13* GNU General Public License for more details.
14*
15* You should have received a copy of the GNU General Public License
16* along with this program. If not, see <http://www.gnu.org/licenses/>.
17*
18* @package ArmarX::Core
19* @author Kai Welke (welke@kit.edu)
20* @date 2011
21* @copyright http://www.gnu.org/licenses/gpl-2.0.txt
22* GNU General Public License
23*/
24#include "Observer.h"
25
26#include <algorithm>
27#include <condition_variable>
28
29#include <boost/circular_buffer.hpp>
30
41
42using namespace armarx;
43using namespace armarx::exceptions::local;
44
// Name of the meta channel used when channel update rates are logged; the
// component offers it with the description "Metachannel with the last channel
// update deltas of all channels in milliseconds".
45const std::string LAST_REFRESH_DELTA_CHANNEL = "_LastRefreshDelta";
46
48{
51
52
53 // available checks and channels
54 StringConditionCheckMap availableChecks;
55 std::mutex checksMutex;
56
57 // available channels and data fields
60 std::unordered_map<std::string,
61 boost::circular_buffer<std::pair<IceUtil::Time, ChannelRegistryEntry>>>;
63 mutable std::recursive_mutex historyMutex;
66
68 std::mutex idMutex;
69
70 struct FilterData : IceUtil::Shared
71 {
72 DatafieldFilterBasePtr filter;
75 };
76
78
79 std::multimap<std::string, FilterDataPtr> orignalToFiltered;
80 std::map<std::string, FilterDataPtr> filteredToOriginal;
81 std::recursive_mutex filterMutex;
82 std::map<std::string, IceUtil::Time> channelUpdateTimestamps;
84
86 {
88 std::string channelName;
89 std::string datafieldName;
90 };
91
93 std::map<std::string, std::set<std::string>> channelQueue;
95 std::condition_variable idleChannelCondition;
96
97 using FilterUpdateQueue = std::unordered_map<std::string, FilterQueueData>;
99 std::mutex filterQueueMutex;
100 std::condition_variable idleCondition;
101
102 std::recursive_mutex workerUpdatesMutex;
103
105 {
106 WorkerUpdate(const std::string& name, std::function<void(void)> f);
107 float ageInMs() const;
108
109 std::string name;
111 std::function<void(void)> f;
112 };
113
114 mutable std::deque<WorkerUpdate> workerUpdates;
115 std::thread worker;
116 std::atomic_bool stopWorker;
117};
118
120{
121}
122
126
127// *******************************************************
128// offering of channels, datafield, and checks
129// *******************************************************
/**
 * Registers a new, not yet initialized channel in the channel registry.
 *
 * @param channelName Name (registry key) of the channel to offer.
 * @param description Description stored with the channel.
 *
 * @throws LocalException if getState() is still below eManagedIceObjectInitialized.
 *
 * NOTE(review): the body of the "channel already exists" branch is not visible
 * in this listing (listing line 144 is missing) - presumably it throws; confirm
 * against the original file.
 */
130void
131Observer::offerChannel(std::string channelName, std::string description)
132{
133 if (getState() < eManagedIceObjectInitialized)
134 {
135 throw LocalException()
136 << "offerChannel() must not be called before the Observer is initalized (i.e. not in "
137 "onInitObserver(), use onConnectObserver()";
138 }
139
// The registry is only touched while channelsMutex is held.
140 std::unique_lock lock(channelsMutex);
141
// Duplicate check: is a channel with this name already registered?
142 if (impl->channelRegistry.find(channelName) != impl->channelRegistry.end())
143 {
145 }
146
// Build the registry entry; a fresh channel starts out as not initialized.
147 ChannelRegistryEntry channel;
148 channel.name = channelName;
149 channel.description = description;
150 channel.initialized = false;
151
152 std::pair<std::string, ChannelRegistryEntry> entry;
153 entry.first = channelName;
154 entry.second = channel;
155
156 impl->channelRegistry.insert(entry);
157}
158
159void
161 std::string datafieldName,
162 const Variant& defaultValue,
163 std::string description)
164{
165 if (getState() < eManagedIceObjectInitialized)
166 {
167 throw LocalException()
168 << "offerDataFieldWithDefault() must not be called before the Observer is initalized "
169 "(i.e. not in onInitObserver(), use onConnectObserver()";
170 }
171
172 std::unique_lock lock(channelsMutex);
173
174 auto channelIt = impl->channelRegistry.find(channelName);
175
176 if (channelIt == impl->channelRegistry.end())
177 {
179 }
180
181 if (channelIt->second.dataFields.find(datafieldName) != channelIt->second.dataFields.end())
182 {
183 throw exceptions::user::DatafieldExistsAlreadyException(channelName, datafieldName);
184 }
185
186 DataFieldRegistryEntry dataField;
187 dataField.identifier = new DatafieldRef(this, channelName, datafieldName, false);
188 dataField.description = description;
189 dataField.typeName = defaultValue.getTypeName();
190 dataField.value = new TimedVariant();
191 *dataField.value = defaultValue;
192
193 std::pair<std::string, DataFieldRegistryEntry> entry;
194 entry.first = datafieldName;
195 entry.second = dataField;
196
197 channelIt->second.dataFields.insert(entry);
198}
199
200void
201Observer::offerDataField(std::string channelName,
202 std::string datafieldName,
203 VariantTypeId type,
204 std::string description)
205{
206 if (getState() < eManagedIceObjectInitialized)
207 {
208 throw LocalException()
209 << "offerDataField() must not be called before the Observer is initalized (i.e. not in "
210 "onInitObserver(), use onConnectObserver()";
211 }
212
213 std::unique_lock lock(channelsMutex);
214
215 auto channelIt = impl->channelRegistry.find(channelName);
216
217 if (channelIt == impl->channelRegistry.end())
218 {
220 }
221
222 if (channelIt->second.dataFields.find(datafieldName) != channelIt->second.dataFields.end())
223 {
224 throw exceptions::user::InvalidDataFieldException(channelName, datafieldName);
225 }
226
227 DataFieldRegistryEntry dataField;
228 dataField.identifier = new DatafieldRef(this, channelName, datafieldName, false);
229 dataField.description = description;
230 dataField.typeName = Variant::typeToString(type);
231 dataField.value = new TimedVariant();
232 dataField.value->setType(type);
233
234 std::pair<std::string, DataFieldRegistryEntry> entry;
235 entry.first = datafieldName;
236 entry.second = dataField;
237
238 impl->channelRegistry[channelName].dataFields.insert(entry);
239}
240
241bool
242Observer::offerOrUpdateDataField(std::string channelName,
243 std::string datafieldName,
244 const Variant& value,
245 const std::string& description)
246{
247 if (!existsDataField(channelName, datafieldName))
248 {
249 offerDataFieldWithDefault(channelName, datafieldName, value, description);
250 return true;
251 }
252 else
253 {
254 setDataField(channelName, datafieldName, value);
255 return false;
256 }
257}
258
259void
260Observer::offerOrUpdateDataFieldsFlatCopy(const std::string& channelName,
261 const StringVariantBaseMap& valueMap)
262{
263 try
264 {
265 // fastest way is to just try to set the datafields
266 maybeOfferChannelAndSetDataFieldsFlatCopy(channelName, "", valueMap, true);
267 }
269 {
270 ARMARX_INFO << deactivateSpam() << "Creating datafields for channel " << channelName;
271 // on failure: do it the slow way
272 ARMARX_DEBUG << deactivateSpam(1) << "failure when seting datafield "
273 << "using slow fallback";
274 for (const auto& value : valueMap)
275 {
276 const std::string& datafieldName = value.first;
277 if (!existsDataField(channelName, datafieldName))
278 {
279 try
280 {
282 channelName, datafieldName, *VariantPtr::dynamicCast(value.second), "");
283 }
285 {
287 channelName, datafieldName, VariantPtr::dynamicCast(value.second));
288 }
289 }
290 else
291 {
293 channelName, datafieldName, VariantPtr::dynamicCast(value.second));
294 }
295 }
296 }
297 updateChannel(channelName);
298}
299
300void
301Observer::offerConditionCheck(std::string checkName, ConditionCheck* conditionCheck)
302{
303 std::unique_lock lock(impl->checksMutex);
304
305 if (impl->availableChecks.find(checkName) != impl->availableChecks.end())
306 {
307 throw InvalidCheckException(checkName);
308 }
309
310 std::pair<std::string, ConditionCheckPtr> entry;
311 entry.first = checkName;
312 entry.second = conditionCheck;
313
314 impl->availableChecks.insert(entry);
315}
316
317void
318Observer::removeChannel(std::string channelName)
319{
320 {
321 std::unique_lock lock(channelsMutex);
322
323 auto iter = impl->channelRegistry.find(channelName);
324
325 if (iter == impl->channelRegistry.end())
326 {
327 return;
328 }
329
332 removeDatafield(id);
333 impl->channelRegistry.erase(iter);
334 ARMARX_INFO << "Removed channel " << channelName;
335 }
336
337 {
338 std::unique_lock lock(impl->historyMutex);
339 impl->channelHistory.erase(channelName);
340 }
341}
342
343void
345{
346 {
347 std::unique_lock lock(impl->filterMutex);
348 // Remove filter
349 DataFieldIdentifierPtr idptr = DataFieldIdentifierPtr::dynamicCast(id);
350 std::string idStr = idptr->getIdentifierStr();
351 auto itFilter = impl->filteredToOriginal.find(idStr);
352
353 if (itFilter != impl->filteredToOriginal.end())
354 {
355 DatafieldRefPtr refPtr = DatafieldRefPtr::dynamicCast(itFilter->second->original);
356 auto range = impl->orignalToFiltered.equal_range(
357 refPtr->getDataFieldIdentifier()->getIdentifierStr());
358
359 for (auto it = range.first; it != range.second; it++)
360 if (it->second->filtered->getDataFieldIdentifier()->getIdentifierStr() == idStr)
361 {
362 impl->orignalToFiltered.erase(it);
363 break;
364 }
365
366 impl->filteredToOriginal.erase(itFilter);
367 }
368 }
369
370 std::unique_lock lock(channelsMutex);
371
372 auto itChannel = impl->channelRegistry.find(id->channelName);
373
374 if (itChannel == impl->channelRegistry.end())
375 {
376 return;
377 }
378
379 itChannel->second.dataFields.erase(id->datafieldName);
380}
381
382// *******************************************************
383// utility methods for sensordatalistener
384// *******************************************************
385std::set<std::string>
386Observer::updateDatafieldFilter(const std::string& channelName,
387 const std::string& datafieldName,
388 const VariantBasePtr& value)
389{
390 std::vector<Impl::FilterQueueData> filterData;
391 std::set<std::string> foundFilterFields;
392 {
393 std::unique_lock lock(impl->filterMutex);
394 const std::string id = getName() + "." + channelName + "." + datafieldName;
395 // DatafieldRefPtr ref = new DatafieldRef(this, channelName, datafieldName);
396 auto range = impl->orignalToFiltered.equal_range(id);
397
398 //IceUtil::Time start = IceUtil::Time::now();
399 //bool found = false;
400 TimedVariantPtr origTV = TimedVariantPtr::dynamicCast(value);
401 auto t = origTV ? origTV->getTime() : TimeUtil::GetTime();
402 long tLong = t.toMicroSeconds();
403
404 for (auto it = range.first; it != range.second; it++)
405 {
406 it->second->filter->update(tLong, value);
407
408 foundFilterFields.insert(it->second->filtered->datafieldName);
409
410 TimedVariantPtr tv =
411 new TimedVariant(VariantPtr::dynamicCast(it->second->filter->getValue()), t);
412 filterData.emplace_back(
414 it->second->filtered->channelRef->channelName,
415 it->second->filtered->datafieldName});
416 //found = true;
417 }
418 }
419
420 for (auto& elem : filterData)
421 {
423 elem.channelName, elem.datafieldName, VariantPtr::dynamicCast(elem.value));
424 }
425
426 /*if(found)
427 {
428 IceUtil::Time end = IceUtil::Time::now();
429 IceUtil::Time duration = end - start;
430 ARMARX_IMPORTANT << deactivateSpam(0.1f) << channelName << ":" << datafieldName << ": all filters calc microseconds: " << duration.toMicroSeconds();
431 }*/
432
433 return foundFilterFields;
434}
435
436void
437Observer::scheduleDatafieldFilterUpdate(const std::string& channelName,
438 const std::string& datafieldName,
439 const VariantBasePtr& value)
440{
441 if (impl->orignalToFiltered.size() == 0)
442 {
443 return; // no filters installed anyway ...nothing todo
444 }
445 const std::string id = getName() + "." + channelName + "." + datafieldName;
446 {
447 std::unique_lock lock(impl->filterMutex);
448 if (impl->orignalToFiltered.count(id) == 0)
449 {
450 return; // no filter for this datafield installed ...nothing todo
451 }
452 }
453 std::unique_lock lock(impl->filterQueueMutex);
454 impl->filterQueue[id] = {value, channelName, datafieldName};
455 impl->idleCondition.notify_all();
456}
457
/**
 * Loop that processes queued filter updates until impl->filterUpdateTask
 * reports that it is stopped.
 *
 * Each round takes over the pending filter queue, runs updateDatafieldFilter()
 * for every queued element, triggers updateChannel() for channels that got at
 * least one filtered datafield, and then waits on impl->idleCondition if no new
 * work has been queued in the meantime.
 *
 * NOTE(review): the declaration of `queue` (listing line 463) is not visible in
 * this listing.
 */
458void
459Observer::updateFilters()
460{
461 while (!impl->filterUpdateTask->isStopped())
462 {
464 {
// Take over all pending updates while holding the queue mutex only briefly.
465 std::unique_lock lock(impl->filterQueueMutex);
466 queue.swap(impl->filterQueue);
467 }
// channel name -> names of the filtered datafields reported by the filters
468 std::unordered_map<std::string, std::set<std::string>> channels;
469 for (const Impl::FilterUpdateQueue::value_type& elem : queue)
470 {
471 auto foundFilterFields = updateDatafieldFilter(
472 elem.second.channelName, elem.second.datafieldName, elem.second.value);
473 channels[elem.second.channelName].insert(foundFilterFields.begin(),
474 foundFilterFields.end());
475 }
// Only channels with at least one filtered datafield get a channel update.
476 for (const auto& channel : channels)
477 {
478 if (channel.second.size() > 0)
479 {
480 updateChannel(channel.first, channel.second);
481 }
482 }
// Sleep until scheduleDatafieldFilterUpdate() (or shutdown) notifies, unless
// new elements were queued while this round was being processed.
483 std::unique_lock lock(impl->filterQueueMutex);
484 if (impl->filterQueue.size() == 0)
485 {
486 impl->idleCondition.wait(lock);
487 }
488 }
489}
490
491void
492Observer::setDataFieldFlatCopy(const DataFieldRegistry::iterator& dataFieldIter,
493 const VariantPtr& value)
494{
495 ARMARX_CHECK_EXPRESSION(value) << "Datafieldvariant is NULL!";
496 TimedVariantPtr tval = TimedVariantPtr::dynamicCast(value);
497 if (tval)
498 {
499 dataFieldIter->second.value = std::move(tval);
500 }
501 else
502 {
503 dataFieldIter->second.value = new TimedVariant(value, TimeUtil::GetTime());
504 }
505}
506
507void
508Observer::setDataField(const std::string& channelName,
509 const std::string& datafieldName,
510 const Variant& value,
511 bool triggerFilterUpdate)
512{
513 VariantBasePtr valuePtr;
514 {
515 std::unique_lock lock(channelsMutex);
516 auto itChannel = impl->channelRegistry.find(channelName);
517
518 if (itChannel == impl->channelRegistry.end())
519 {
521 }
522
523 DataFieldRegistry::iterator itDF = itChannel->second.dataFields.find(datafieldName);
524
525 if (itDF == itChannel->second.dataFields.end())
526 {
527 throw exceptions::user::InvalidDataFieldException(channelName, datafieldName);
528 }
529
530 if (dynamic_cast<const TimedVariant*>(&value))
531 {
532 itDF->second.value = value.clone();
533 }
534 else
535 {
536 itDF->second.value = new TimedVariant(value, TimeUtil::GetTime());
537 }
538 valuePtr = itDF->second.value;
539 }
540 if (triggerFilterUpdate)
541 {
542 scheduleDatafieldFilterUpdate(channelName, datafieldName, valuePtr);
543 }
544 //*dataFieldValue = value;
545}
546
547void
548Observer::setDataFieldFlatCopy(const std::string& channelName,
549 const std::string& datafieldName,
550 const VariantPtr& value,
551 bool triggerFilterUpdate)
552{
553 {
554 std::unique_lock lock(channelsMutex);
555
556 auto itChannel = impl->channelRegistry.find(channelName);
557
558 if (itChannel == impl->channelRegistry.end())
559 {
561 }
562
563 DataFieldRegistry::iterator itDF = itChannel->second.dataFields.find(datafieldName);
564
565 if (itDF == itChannel->second.dataFields.end())
566 {
567 throw exceptions::user::InvalidDataFieldException(channelName, datafieldName);
568 }
569
570 setDataFieldFlatCopy(itDF, value);
571 }
572
573 if (triggerFilterUpdate)
574 {
575 scheduleDatafieldFilterUpdate(channelName, datafieldName, value);
576 }
577}
578
579void
581 const std::string& channelName,
582 const std::unordered_map<std::string, Ice::Long>& datafieldValues)
583{
584 std::unique_lock lock(channelsMutex);
585
586 auto itChannel = impl->channelRegistry.find(channelName);
587
588 if (itChannel == impl->channelRegistry.end())
589 {
591 }
592
593 for (const auto& elem : datafieldValues)
594 {
595 DataFieldRegistry::iterator itDF = itChannel->second.dataFields.find(elem.first);
596
597 if (itDF == itChannel->second.dataFields.end())
598 {
599 throw exceptions::user::InvalidDataFieldException(channelName, elem.first);
600 }
601
602
603 TimedVariantPtr oldValue = TimedVariantPtr::dynamicCast(itDF->second.value);
604 if (!oldValue || (elem.second > oldValue->getTimestamp() && oldValue->getInitialized()))
605 {
606 itDF->second.value = new TimedVariant(VariantPtr::dynamicCast(itDF->second.value),
607 IceUtil::Time::microSeconds(elem.second));
608 }
609 }
610}
611
612void
613Observer::updateDatafieldTimestamps(const std::string& channelName, Ice::Long timestamp)
614{
615 std::unique_lock lock(channelsMutex);
616
617 auto itChannel = impl->channelRegistry.find(channelName);
618
619 if (itChannel == impl->channelRegistry.end())
620 {
622 }
623
624 for (auto& elem : itChannel->second.dataFields)
625 {
626
627 TimedVariantPtr oldValue = TimedVariantPtr::dynamicCast(elem.second.value);
628 if (!oldValue || (timestamp > oldValue->getTimestamp() && oldValue->getInitialized()))
629 {
630 elem.second.value = new TimedVariant(VariantPtr::dynamicCast(elem.second.value),
631 IceUtil::Time::microSeconds(timestamp));
632 }
633 }
634}
635
636void
638 const std::string& description,
639 const StringVariantBaseMap& datafieldValues,
640 bool triggerFilterUpdate)
641{
642 {
643 std::lock_guard lock(channelsMutex);
644 auto itChannel = impl->channelRegistry.find(channelName);
645 if (itChannel == impl->channelRegistry.end())
646 {
647 //add
648 auto& channel = impl->channelRegistry[channelName];
649 channel.name = channelName;
650 channel.description = description;
651 channel.initialized = false;
652
653 itChannel = impl->channelRegistry.find(channelName);
654 }
655
656 //use map merge algorithm since it is faster (n+m instead n*log(m))
657 auto targ = itChannel->second.dataFields.begin();
658 auto vals = datafieldValues.begin();
659 while (targ != itChannel->second.dataFields.end() && vals != datafieldValues.end())
660 {
661 if (vals->first > targ->first)
662 {
663 ++targ;
664 }
665 else if (vals->first == targ->first)
666 {
667 setDataFieldFlatCopy(targ, VariantPtr::dynamicCast(vals->second));
668 ++vals;
669 ++targ;
670 }
671 else
672 {
673 throw exceptions::user::InvalidDataFieldException(channelName, vals->first);
674 }
675 }
676 if (vals != datafieldValues.end())
677 {
678 throw exceptions::user::InvalidDataFieldException(channelName, vals->first);
679 }
680 }
681 if (triggerFilterUpdate)
682 {
683 for (const auto& elem : datafieldValues)
684 {
685 scheduleDatafieldFilterUpdate(channelName, elem.first, elem.second);
686 }
687 }
688}
689
690void
692 const std::string& channelName,
693 const std::unordered_map<::std::string, ::armarx::VariantBasePtr>& datafieldValues,
694 bool triggerFilterUpdate)
695{
696 {
697 std::unique_lock lock(channelsMutex);
698
699 auto itChannel = impl->channelRegistry.find(channelName);
700
701 if (itChannel == impl->channelRegistry.end())
702 {
704 }
705
706 for (const auto& elem : datafieldValues)
707 {
708 DataFieldRegistry::iterator itDF = itChannel->second.dataFields.find(elem.first);
709
710 if (itDF == itChannel->second.dataFields.end())
711 {
712 throw exceptions::user::InvalidDataFieldException(channelName, elem.first);
713 }
714
715 setDataFieldFlatCopy(itDF, VariantPtr::dynamicCast(elem.second));
716 }
717 }
718 if (triggerFilterUpdate)
719 {
720 for (const auto& elem : datafieldValues)
721 {
722 scheduleDatafieldFilterUpdate(channelName, elem.first, elem.second);
723 }
724 }
725}
726
727void
728Observer::setDataFieldsFlatCopy(const std::string& channelName,
729 const StringVariantBaseMap& datafieldValues,
730 bool triggerFilterUpdate)
731{
732 {
733 std::unique_lock lock(channelsMutex);
734
735 auto itChannel = impl->channelRegistry.find(channelName);
736
737 if (itChannel == impl->channelRegistry.end())
738 {
740 }
741
742 for (const auto& elem : datafieldValues)
743 {
744 DataFieldRegistry::iterator itDF = itChannel->second.dataFields.find(elem.first);
745
746 if (itDF == itChannel->second.dataFields.end())
747 {
748 throw exceptions::user::InvalidDataFieldException(channelName, elem.first);
749 }
750 ARMARX_CHECK_EXPRESSION(elem.second) << "Datafieldname: " << elem.first;
751 setDataFieldFlatCopy(itDF, VariantPtr::dynamicCast(elem.second));
752 }
753 }
754
755 if (triggerFilterUpdate)
756 {
757 for (const auto& elem : datafieldValues)
758 {
759 scheduleDatafieldFilterUpdate(channelName, elem.first, elem.second);
760 }
761 }
762}
763
764void
765Observer::updateRefreshRateChannel(const std::string& channelName)
766{
767 auto& oldUpdateTime = impl->channelUpdateTimestamps[channelName];
768 auto now = IceUtil::Time::now();
769
770 try
771 {
773 channelName,
774 new Variant((now - oldUpdateTime).toMilliSecondsDouble()),
775 false);
776 }
778 {
780 channelName,
781 0.0,
782 "Update delta of channel '" + channelName + "'");
783 }
784 oldUpdateTime = now;
785}
786
787void
788Observer::updateChannel(const std::string& channelName,
789 const std::set<std::string>& updatedDatafields)
790{
791 if (!existsChannel(channelName))
792 {
794 }
795 std::unique_lock lock(impl->channelQueueMutex);
796 std::set<std::string>& dfs = impl->channelQueue[channelName];
797 dfs.insert(updatedDatafields.begin(), updatedDatafields.end());
798 impl->idleChannelCondition.notify_all();
799}
800
801void
802Observer::channelUpdateFunction()
803{
804
805 while (!impl->channelUpdateTask->isStopped())
806 {
807 std::map<std::string, std::set<std::string>> queue;
808 {
809 std::unique_lock lock(impl->channelQueueMutex);
810 queue.swap(impl->channelQueue);
811 }
812 for (const auto& elem : queue)
813 {
814 doChannelUpdate(elem.first, elem.second);
815 }
816 std::unique_lock lock(impl->channelQueueMutex);
817 if (impl->channelQueue.size() == 0)
818 {
819 impl->idleChannelCondition.wait(lock);
820 }
821 }
822}
823
824void
825Observer::addToChannelHistory(const std::pair<IceUtil::Time, ChannelRegistryEntry>& historyEntry,
826 const std::string& channelName)
827{
828 if (impl->maxHistorySize == 0)
829 {
830 return;
831 }
832 std::unique_lock lockHistory(impl->historyMutex);
833 auto historyIt = impl->channelHistory.find(channelName);
834 if (historyIt == impl->channelHistory.end())
835 {
836 impl->channelHistory[channelName].set_capacity(impl->maxHistorySize);
837 historyIt = impl->channelHistory.find(channelName);
838 }
839 auto now = TimeUtil::GetTime();
840 if (historyIt->second.empty() ||
841 now > historyIt->second.rbegin()->first +
842 IceUtil::Time::secondsDouble(1.0 / impl->maxHistoryRecordFrequency))
843 {
844 historyIt->second.push_back(historyEntry);
845 }
846}
847
848void
849Observer::doChannelUpdate(const std::string& channelName,
850 const std::set<std::string>& updatedDatafields)
851{
852
853
854 try
855 {
856 ChannelRegistryEntry entry;
857 std::pair<IceUtil::Time, ChannelRegistryEntry> historyEntry;
858 {
859 std::unique_lock lock_channels(channelsMutex);
860 // check if channels exists
861 auto iterChannel = impl->channelRegistry.find(channelName);
862
863 if (iterChannel == impl->channelRegistry.end())
864 {
865 throw exceptions::user::InvalidChannelException(channelName);
866 }
867
868
869 if (impl->logChannelUpdateRate)
870 {
871 updateRefreshRateChannel(channelName);
872 }
873
874 // update initialized state
875 const DataFieldRegistry& dataFields = iterChannel->second.dataFields;
876 DataFieldRegistry::const_iterator iterDataFields = dataFields.begin();
877 bool channelInitialized = true;
878
879 while (iterDataFields != dataFields.end())
880 {
881 channelInitialized &= iterDataFields->second.value->getInitialized();
882 iterDataFields++;
883 }
884
885 iterChannel->second.initialized = channelInitialized;
886 if (iterChannel->second.conditionChecks.size() > 0)
887 {
888 entry = iterChannel->second;
889 }
890 historyEntry = std::make_pair(TimeUtil::GetTime(), iterChannel->second);
891 }
892 addToChannelHistory(historyEntry, channelName);
893 // evaluate checks
894 ConditionCheckRegistry::iterator iterChecks = entry.conditionChecks.begin();
895
896 while (iterChecks != entry.conditionChecks.end())
897 {
898 bool found = false;
899 if (updatedDatafields.size() > 0)
900 {
901 // check if this check belongs to an updated datafield.
902 for (const auto& elem : entry.dataFields)
903 {
904 const DataFieldRegistryEntry& datafieldEntry = elem.second;
905 if (updatedDatafields.count(datafieldEntry.identifier->datafieldName))
906 {
907 found = true;
908 break;
909 }
910 }
911 }
912 else
913 {
914 found = true;
915 }
916
917 try
918 {
919 if (found)
920 {
921 evaluateCheck(ConditionCheckPtr::dynamicCast(iterChecks->second), entry);
922 }
923 }
924 catch (...)
925 {
926 ARMARX_ERROR << "Evaluating condition for channel " << channelName << " failed!";
928 }
929
930 iterChecks++;
931 }
932 }
933 catch (...)
934 {
935 // ARMARX_ERROR << "Upating channel " << channelName << " failed!";
936 // handleExceptions();
937 }
938}
939
940// *******************************************************
941// Component hooks
942// *******************************************************
943void
944Observer::onInitComponent()
945{
946 impl->maxHistorySize = getProperty<int>("MaxHistorySize");
947 impl->maxHistoryRecordFrequency = getProperty<float>("MaxHistoryRecordFrequency");
948
949 {
950 std::unique_lock lock(impl->idMutex);
951 impl->currentId = 0;
952 }
953
955 impl->filterUpdateTask =
956 new RunningTask<Observer>(this, &Observer::updateFilters, "Filter update task");
957 impl->filterUpdateTask->start();
958 impl->channelUpdateTask =
959 new RunningTask<Observer>(this, &Observer::channelUpdateFunction, "Channel update task");
960 impl->channelUpdateTask->start();
961}
962
963void
964Observer::onConnectComponent()
965{
966 impl->logChannelUpdateRate = getProperty<bool>("CreateUpdateFrequenciesChannel").getValue();
967 if (impl->logChannelUpdateRate && !existsChannel(LAST_REFRESH_DELTA_CHANNEL))
968 {
971 "Metachannel with the last channel update deltas of all channels in milliseconds");
972 }
973 impl->channelHistory.clear();
974 // subclass init
976 // offerDataFieldWithDefault(channelName, LAST_REFRESH_DELTA_CHANNEL, 0.0, );
977 // impl->channelUpdateTimestamps[channelName] = IceUtil::Time::now();
978
979 std::unique_lock lock(channelsMutex);
980 auto proxy = ObserverInterfacePrx::checkedCast(getProxy());
981 // update the proxy in all existing datafield refs
982 for (auto& channel : impl->channelRegistry)
983 {
984 ChannelRegistryEntry& c = channel.second;
985 for (auto& df : c.dataFields)
986 {
987 DataFieldRegistryEntry& d = df.second;
988 d.identifier->channelRef->observerProxy = proxy;
989 }
990 }
991 impl->metaTask = new PeriodicTask<Observer>(this, &Observer::metaUpdateTask, 50);
992 if (impl->logChannelUpdateRate)
993 {
994 impl->metaTask->start();
995 }
996}
997
998void
999Observer::postOnConnectComponent()
1000{
1001 ARMARX_INFO << "starting worker thread";
1002 impl->stopWorker = false;
1003 ARMARX_CHECK_EXPRESSION(!impl->worker.joinable());
1004 impl->worker = std::thread{[&] { runWorker(); }};
1005}
1006
1007void
1008Observer::preOnDisconnectComponent()
1009{
1010 ARMARX_INFO << "stopping worker thread";
1011 impl->stopWorker = true;
1012 if (impl->worker.joinable())
1013 {
1014 impl->worker.join();
1015 }
1016}
1017
/**
 * Shuts down the background tasks of the observer and clears the channel
 * history.
 *
 * The filter and channel update tasks are each stopped in the same sequence:
 * stop(false), then a notify_all() on the condition variable their loop waits
 * on, then stop(true).
 *
 * NOTE(review): listing line 1021 is not visible here; the exact semantics of
 * the stop() argument should be confirmed against the task class documentation.
 */
1018void
1019Observer::onExitComponent()
1020{
1022 if (impl->metaTask)
1023 {
1024 impl->metaTask->stop();
1025 }
1026 if (impl->filterUpdateTask)
1027 {
1028 impl->filterUpdateTask->stop(false);
1029 }
// Wake updateFilters() in case it is blocked in idleCondition.wait().
1030 impl->idleCondition.notify_all();
1031 if (impl->filterUpdateTask)
1032 {
1033 impl->filterUpdateTask->stop(true);
1034 }
1035 if (impl->channelUpdateTask)
1036 {
1037 impl->channelUpdateTask->stop(false);
1038 }
// Wake channelUpdateFunction() in case it is blocked in idleChannelCondition.wait().
1039 impl->idleChannelCondition.notify_all();
1040 if (impl->channelUpdateTask)
1041 {
1042 impl->channelUpdateTask->stop(true);
1043 }
1044 impl->channelHistory.clear();
1045}
1046
1048Observer::createPropertyDefinitions()
1049{
1050 return PropertyDefinitionsPtr(new ObserverPropertyDefinitions(getConfigIdentifier()));
1051}
1052
1053void
1058
1059// *******************************************************
1060// private methods
1061// *******************************************************
1062int
1064{
1065 std::unique_lock lock(impl->idMutex);
1066
1067 return impl->currentId++;
1068}
1069
1071Observer::createCheck(const CheckConfiguration& configuration) const
1072{
1073 std::string checkName = configuration.checkName;
1074
1075 // create check from elementary condition
1076 StringConditionCheckMap::const_iterator iterChecks = impl->availableChecks.find(checkName);
1077
1078 if (iterChecks == impl->availableChecks.end())
1079 {
1080 std::string reason =
1081 "Invalid condition check \"" + checkName + "\" for observer \"" + getName() + "\".";
1082 throw InvalidConditionException(reason.c_str());
1083 }
1084
1085 ARMARX_CHECK_EXPRESSION(iterChecks->second);
1086 ConditionCheckPtr check = ConditionCheckPtr::dynamicCast(iterChecks->second)
1087 ->createInstance(configuration, impl->channelRegistry);
1088
1089 return check;
1090}
1091
1092CheckIdentifier
1093Observer::registerCheck(const ConditionCheckPtr& check)
1094{
1096 ARMARX_CHECK_EXPRESSION(check->configuration.dataFieldIdentifier);
1097 // create identifier
1098 int id = generateId();
1099 CheckIdentifier identifier;
1100 identifier.uniqueId = id;
1101 identifier.channelName = check->configuration.dataFieldIdentifier->channelName;
1102 identifier.observerName = getName();
1103
1104 // add to conditions list
1105 std::pair<int, ConditionCheckPtr> entry;
1106 entry.first = id;
1107 entry.second = check;
1108 impl->channelRegistry[check->configuration.dataFieldIdentifier->channelName]
1109 .conditionChecks.insert(entry);
1110
1111 return identifier;
1112}
1113
1114void
1115Observer::evaluateCheck(const ConditionCheckPtr& check, const ChannelRegistryEntry& channel) const
1116{
1117 check->evaluateCondition(channel.dataFields);
1118}
1119
1120// ////////////////////////////////////////////////////////////////////////// //
1121void
1123{
1124 ARMARX_INFO << "observer worker->start";
1126 {
1127 ARMARX_INFO << "observer worker->stop";
1128 };
1129 std::deque<Impl::WorkerUpdate> toProcess;
1130 while (!impl->stopWorker)
1131 {
1132 preWorkerJobs();
1134 {
1136 };
1137 {
1138 std::lock_guard g{impl->workerUpdatesMutex};
1139 std::swap(impl->workerUpdates, toProcess);
1140 }
1141 if (toProcess.empty())
1142 {
1144 << "no worker jobs (message only posted every 10 seconds";
1145 std::this_thread::sleep_for(std::chrono::milliseconds{1});
1146 continue;
1147 }
1148
1149 float minage = std::numeric_limits<float>::infinity();
1150 float maxage = 0;
1151
1152 struct Counters
1153 {
1154 float age = 0;
1155 float time = 0;
1156 std::size_t n = 0;
1157 };
1158
1159 std::map<std::string, Counters> counters;
1160 {
1161 std::lock_guard lock(channelsMutex);
1162 for (auto& f : toProcess)
1163 {
1165 float age = f.ageInMs();
1166 Counters& counter = counters[f.name];
1167 counter.age += age;
1168 ++counter.n;
1169 TimepointT start = Now();
1170 f.f();
1171 counter.time += TimeDeltaInMs(start);
1172 minage = std::min(minage, age);
1173 maxage = std::max(maxage, age);
1174 }
1175 }
1176 if (maxage <= 0)
1177 {
1178 minage = 0;
1179 }
1180 ARMARX_DEBUG << "observer worker got " << toProcess.size() << "\tjobs ages min/max "
1181 << minage << "\t" << maxage << "\n"
1183 {
1184 for (const auto& [key, c] : counters)
1185 {
1186 out << " " << key << "\t# " << c.n << "\tavg age " << (c.age / c.n)
1187 << "\tavg time " << (c.time / c.n) << "\n";
1188 }
1189 };
1190 toProcess.clear();
1191 }
1192}
1193
1194void
1195Observer::addWorkerJob(const std::string& name, std::function<void(void)>&& f) const
1196{
1197 std::lock_guard g{impl->workerUpdatesMutex};
1198 impl->workerUpdates.emplace_back(name, std::move(f));
1199}
1200
1201void
1202Observer::addWorkerJob(const std::string& name, std::function<void(void)>&& f)
1203{
1204 std::lock_guard g{impl->workerUpdatesMutex};
1205 impl->workerUpdates.emplace_back(name, std::move(f));
1206}
1207
1208void
1212
1213void
1217
1220{
1221 return ClockT::now();
1222}
1223
1224float
1226{
1227 return std::chrono::duration_cast<std::chrono::nanoseconds>(Now() - t0).count() * 1e-6f;
1228}
1229
1230// ////////////////////////////////////////////////////////////////////////// //
1231
1232std::string
1234{
1235 return getName();
1236}
1237
1238void
1239Observer::getObserverName_async(const AMD_ObserverInterface_getObserverNamePtr& amd,
1240 const Ice::Current&) const
1241{
1242 addWorkerJob("Observer::getObserverName", amd, &Observer::getObserverName);
1243}
1244
1245// //////////////////
1246CheckIdentifier
1247Observer::installCheck(const CheckConfiguration& configuration)
1248{
1249 // create condition check
1250 ConditionCheckPtr check;
1251 {
1252 std::unique_lock lock_checks(impl->checksMutex);
1253 std::unique_lock lock_channels(channelsMutex);
1254 check = createCheck(configuration);
1255 }
1257 // insert into registry
1258 CheckIdentifier identifier;
1259 {
1260 std::unique_lock lock_conditions(channelsMutex);
1261 identifier = registerCheck(check);
1262 }
1263
1264 ARMARX_DEBUG << "Installed check " << identifier.uniqueId << flush;
1265
1266 // perform initial check
1267 {
1268 std::unique_lock lock_channels(channelsMutex);
1269 ARMARX_CHECK_EXPRESSION(configuration.dataFieldIdentifier);
1270 // retrieve channel
1271 ChannelRegistryEntry channel =
1272 impl->channelRegistry[configuration.dataFieldIdentifier->channelName];
1273
1274 // evaluate
1275 evaluateCheck(check, channel);
1276 }
1277
1278 return identifier;
1279}
1280
1281void
1282Observer::installCheck_async(const AMD_ObserverInterface_installCheckPtr& amd,
1283 const CheckConfiguration& configuration,
1284 const Ice::Current&)
1285{
1286 addWorkerJob("Observer::installCheck", amd, &Observer::installCheck, configuration);
1287}
1288
1289// //////////////////
1290void
1291Observer::removeCheck(const CheckIdentifier& id)
1292{
1293 std::unique_lock lock(channelsMutex);
1294
1295 // find channel
1296 auto iter = impl->channelRegistry.find(id.channelName);
1297
1298 if (iter == impl->channelRegistry.end())
1299 {
1300 return;
1301 }
1302
1303 // find condition
1304 ConditionCheckRegistry::iterator iterCheck = iter->second.conditionChecks.find(id.uniqueId);
1305
1306 // remove condition
1307 if (iterCheck != iter->second.conditionChecks.end())
1308 {
1309 iter->second.conditionChecks.erase(iterCheck);
1310 }
1311
1312 ARMARX_DEBUG << "Removed check " << id.uniqueId << flush;
1313}
1314
1315void
1316Observer::removeCheck_async(const AMD_ObserverInterface_removeCheckPtr& amd,
1317 const CheckIdentifier& id,
1318 const Ice::Current&)
1319{
1320 addWorkerJob("Observer::removeCheck", amd, &Observer::removeCheck, id);
1321}
1322
1323// //////////////////
1324TimedVariantBasePtr
1325Observer::getDataField(const DataFieldIdentifierBasePtr& identifier, const Ice::Current& c) const
1326{
1327 //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1328 return getDatafieldByName(identifier->channelName, identifier->datafieldName);
1329}
1330
1331void
1332Observer::getDataField_async(const AMD_ObserverInterface_getDataFieldPtr& amd,
1333 const DataFieldIdentifierBasePtr& identifier,
1334 const Ice::Current& c) const
1335{
1336 addWorkerJob("Observer::getDataField", amd, &Observer::getDataField, identifier, c);
1337}
1338
1339// //////////////////
1340TimedVariantBasePtr
1341Observer::getDatafieldByName(const std::string& channelName, const std::string& datafieldName) const
1342{
1343 //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1344 std::unique_lock lock(channelsMutex);
1345
1346 auto it = impl->channelRegistry.find(channelName);
1347 if (it == impl->channelRegistry.end())
1348 {
1350 }
1351
1352 auto itDF = it->second.dataFields.find(datafieldName);
1353 if (itDF == it->second.dataFields.end())
1354 {
1355 throw exceptions::user::InvalidDataFieldException(channelName, datafieldName);
1356 }
1357
1358 // VariantPtr var = VariantPtr::dynamicCast(impl->channelRegistry[identifier->channelName].dataFields[identifier->datafieldName].value);
1359 TimedVariantPtr tv = TimedVariantPtr::dynamicCast(itDF->second.value);
1360 if (!tv)
1361 {
1362 ARMARX_IMPORTANT << "could not cast timed variant: " << itDF->second.value->ice_id();
1363 }
1364 return tv;
1365}
1366
1367void
1368Observer::getDatafieldByName_async(const AMD_ObserverInterface_getDatafieldByNamePtr& amd,
1369 const std::string& channelName,
1370 const std::string& datafieldName,
1371 const Ice::Current&) const
1372{
1373 addWorkerJob("Observer::getDatafieldByName",
1374 amd,
1376 channelName,
1377 datafieldName);
1378}
1379
1380// //////////////////
1381DatafieldRefBasePtr
1383{
1384 //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1385 std::unique_lock lock(channelsMutex);
1386 auto it = impl->channelRegistry.find(identifier->channelName);
1387 if (it == impl->channelRegistry.end())
1388 {
1389 throw exceptions::user::InvalidChannelException(identifier->channelName);
1390 }
1391
1392 auto itDF = it->second.dataFields.find(identifier->datafieldName);
1393 if (itDF == it->second.dataFields.end())
1394 {
1395 throw exceptions::user::InvalidDataFieldException(identifier->channelName,
1396 identifier->datafieldName);
1397 }
1398 // itDF->second.identifier->channelRef->observerProxy = ObserverInterfacePrx::uncheckedCast(getProxy());
1399 return DatafieldRefPtr::dynamicCast(itDF->second.identifier);
1400}
1401
1402void
1403Observer::getDataFieldRef_async(const AMD_ObserverInterface_getDataFieldRefPtr& amd,
1404 const DataFieldIdentifierBasePtr& identifier,
1405 const Ice::Current&) const
1406{
1407 addWorkerJob("Observer::getDataFieldRef", amd, &Observer::getDataFieldRef, identifier);
1408}
1409
1410// //////////////////
1411DatafieldRefBasePtr
1412Observer::getDatafieldRefByName(const std::string& channelName,
1413 const std::string& datafieldName) const
1414{
1415 return getDataFieldRef(new DataFieldIdentifier(getName(), channelName, datafieldName));
1416}
1417
1418void
1419Observer::getDatafieldRefByName_async(const AMD_ObserverInterface_getDatafieldRefByNamePtr& amd,
1420 const std::string& channelName,
1421 const std::string& datafieldName,
1422 const Ice::Current&) const
1423{
1424 addWorkerJob("Observer::getDatafieldRefByName",
1425 amd,
1427 channelName,
1428 datafieldName);
1429}
1430
1431// //////////////////
1432TimedVariantBaseList
1434{
1435 TimedVariantBaseList result;
1436
1437 DataFieldIdentifierBaseList::const_iterator iter = identifiers.begin();
1438
1439 std::unique_lock lock(channelsMutex);
1440 while (iter != identifiers.end())
1441 {
1442 result.push_back(getDataField(*iter, c));
1443 iter++;
1444 }
1445
1446 return result;
1447}
1448
1449void
1450Observer::getDataFields_async(const AMD_ObserverInterface_getDataFieldsPtr& amd,
1452 const Ice::Current& c)
1453{
1454 addWorkerJob("Observer::getDataFields", amd, &Observer::getDataFields, identifiers, c);
1455}
1456
1457// //////////////////
1458StringTimedVariantBaseMap
1459Observer::getDatafieldsOfChannel(const std::string& channelName) const
1460{
1461 //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1462 StringTimedVariantBaseMap result;
1463 std::unique_lock lock(channelsMutex);
1464 auto it = impl->channelRegistry.find(channelName);
1465 if (it == impl->channelRegistry.end())
1466 {
1468 }
1469 DataFieldRegistry fields = it->second.dataFields;
1470 for (auto& field : fields)
1471 {
1472 result[field.first] = TimedVariantBasePtr::dynamicCast(field.second.value);
1473 }
1474 return result;
1475}
1476
1477void
1478Observer::getDatafieldsOfChannel_async(const AMD_ObserverInterface_getDatafieldsOfChannelPtr& amd,
1479 const std::string& channelName,
1480 const Ice::Current&) const
1481{
1483 "Observer::getDatafieldsOfChannel", amd, &Observer::getDatafieldsOfChannel, channelName);
1484}
1485
1486// //////////////////
1487ChannelRegistryEntry
1488Observer::getChannel(const std::string& channelName) const
1489{
1490 //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1491 std::unique_lock lock(channelsMutex);
1492 auto it = impl->channelRegistry.find(channelName);
1493 if (it == impl->channelRegistry.end())
1494 {
1496 }
1497 return it->second;
1498}
1499
1500void
1501Observer::getChannel_async(const AMD_ObserverInterface_getChannelPtr& amd,
1502 const std::string& channelName,
1503 const Ice::Current&) const
1504{
1505 addWorkerJob("Observer::getChannel", amd, &Observer::getChannel, channelName);
1506}
1507
1508// //////////////////
1509ChannelRegistry
1510Observer::getAvailableChannels(bool includeMetaChannels)
1511{
1512 //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1513 std::unique_lock lock(channelsMutex);
1514
1515 ChannelRegistry result(impl->channelRegistry.begin(), impl->channelRegistry.end());
1516 if (!includeMetaChannels)
1517 {
1518 result.erase(LAST_REFRESH_DELTA_CHANNEL);
1519 }
1520 return result;
1521}
1522
1523void
1524Observer::getAvailableChannels_async(const AMD_ObserverInterface_getAvailableChannelsPtr& amd,
1525 bool includeMetaChannels,
1526 const Ice::Current&)
1527{
1528 addWorkerJob("Observer::getAvailableChannels",
1529 amd,
1531 includeMetaChannels);
1532}
1533
1534// //////////////////
1535StringConditionCheckMap
1537{
1538 std::unique_lock lock(impl->checksMutex);
1539
1540 return impl->availableChecks;
1541}
1542
1543void
1544Observer::getAvailableChecks_async(const AMD_ObserverInterface_getAvailableChecksPtr& amd,
1545 const Ice::Current&)
1546{
1547 addWorkerJob("Observer::getAvailableChecks", amd, &Observer::getAvailableChecks);
1548}
1549
1550// //////////////////
1551bool
1552Observer::existsChannel(const std::string& channelName) const
1553{
1554 //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1555 std::unique_lock lock(channelsMutex);
1556 return impl->channelRegistry.find(channelName) != impl->channelRegistry.end();
1557}
1558
1559void
1560Observer::existsChannel_async(const AMD_ObserverInterface_existsChannelPtr& amd,
1561 const std::string& channelName,
1562 const Ice::Current&) const
1563{
1564 addWorkerJob("Observer::existsChannel", amd, &Observer::existsChannel, channelName);
1565}
1566
1567// //////////////////
1568bool
1569Observer::existsDataField(const std::string& channelName, const std::string& datafieldName) const
1570{
1571 //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1572 std::unique_lock lock(channelsMutex);
1573 auto itChannel = impl->channelRegistry.find(channelName);
1574
1575 if (itChannel == impl->channelRegistry.end())
1576 {
1577 return false;
1578 }
1579
1580 return itChannel->second.dataFields.find(datafieldName) != itChannel->second.dataFields.end();
1581}
1582
1583void
1584Observer::existsDataField_async(const AMD_ObserverInterface_existsDataFieldPtr& amd,
1585 const std::string& channelName,
1586 const std::string& datafieldName,
1587 const Ice::Current&) const
1588{
1590 "Observer::existsDataField", amd, &Observer::existsDataField, channelName, datafieldName);
1591}
1592
1593// //////////////////
1594DatafieldRefBasePtr
1595Observer::createFilteredDatafield(const DatafieldFilterBasePtr& filter,
1596 const DatafieldRefBasePtr& datafieldRef)
1597{
1598 //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1599 // if( auto it = impl->orignalToFiltered.find(datafieldRef) != impl->orignalToFiltered.end())
1600 // {
1601 // return it->second.filtered;
1602 // }
1603 ARMARX_CHECK_EXPRESSION(datafieldRef);
1604
1605
1606 std::string filteredName = datafieldRef->datafieldName + "_" + filter->ice_id();
1607
1608
1609 int i = 1;
1610
1611 while (existsDataField(datafieldRef->channelRef->channelName, filteredName))
1612 {
1613 // ARMARX_IMPORTANT << "Checking if datafield " << filteredName << " exists";
1614 filteredName =
1615 datafieldRef->datafieldName + "_" + filter->ice_id() + "_" + ValueToString(i);
1616 i++;
1617 }
1618
1619 return createNamedFilteredDatafield(filteredName, filter, datafieldRef);
1620}
1621
1622void
1623Observer::createFilteredDatafield_async(const AMD_ObserverInterface_createFilteredDatafieldPtr& amd,
1624 const DatafieldFilterBasePtr& filter,
1625 const DatafieldRefBasePtr& datafieldRef,
1626 const Ice::Current&)
1627{
1628 addWorkerJob("Observer::createFilteredDatafield",
1629 amd,
1631 filter,
1632 datafieldRef);
1633}
1634
1635// //////////////////
1636DatafieldRefBasePtr
1637Observer::createNamedFilteredDatafield(const std::string& filterDatafieldName,
1638 const DatafieldFilterBasePtr& filter,
1639 const DatafieldRefBasePtr& datafieldRef)
1640{
1641 //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1642 DatafieldRefPtr ref = DatafieldRefPtr::dynamicCast(datafieldRef);
1645 if (!filter->checkTypeSupport(ref->getDataField()->getType()))
1646 {
1647 auto types = filter->getSupportedTypes();
1648 std::string suppTypes = "supported types";
1649
1650 for (auto t : types)
1651 {
1652 suppTypes += Variant::typeToString(t) + ", ";
1653 }
1654
1655 ARMARX_WARNING << suppTypes;
1656 throw exceptions::user::UnsupportedTypeException(ref->getDataField()->getType());
1657 }
1658
1659 // if filter with that name exists -> remove it
1660 if (existsDataField(datafieldRef->channelRef->channelName, filterDatafieldName))
1661 {
1662 DatafieldRefPtr filteredRef = new DatafieldRef(ref->getChannelRef(), filterDatafieldName);
1663 removeFilteredDatafield(filteredRef);
1664 }
1665
1666 // calculate initial value
1667 auto var = ref->getDataField();
1668 filter->update(var->getTime().toMicroSeconds(), var);
1669
1670 // create datafield for new filter
1671 offerDataFieldWithDefault(datafieldRef->channelRef->channelName,
1672 filterDatafieldName,
1673 *VariantPtr::dynamicCast(filter->getValue()),
1674 "Filtered value of " +
1675 ref->getDataFieldIdentifier()->getIdentifierStr());
1676
1677 //create and store the refs for the filters
1678 ChannelRefPtr channel = ChannelRefPtr::dynamicCast(ref->channelRef);
1679 channel->refetchChannel();
1680 DatafieldRefPtr filteredRef = new DatafieldRef(ref->getChannelRef(), filterDatafieldName);
1682 data->filter = filter;
1683 data->original = ref;
1684 data->filtered = filteredRef;
1685 std::unique_lock lock(impl->filterMutex);
1686 impl->orignalToFiltered.insert(
1687 std::make_pair(ref->getDataFieldIdentifier()->getIdentifierStr(), data));
1688 impl->filteredToOriginal[filteredRef->getDataFieldIdentifier()->getIdentifierStr()] = data;
1689 return filteredRef;
1690}
1691
1692void
1694 const AMD_ObserverInterface_createNamedFilteredDatafieldPtr& amd,
1695 const std::string& filterDatafieldName,
1696 const DatafieldFilterBasePtr& filter,
1697 const DatafieldRefBasePtr& datafieldRef,
1698 const Ice::Current&)
1699{
1700 addWorkerJob("Observer::createNamedFilteredDatafield",
1701 amd,
1703 filterDatafieldName,
1704 filter,
1705 datafieldRef);
1706}
1707
1708// //////////////////
1709void
1710Observer::removeFilteredDatafield(const DatafieldRefBasePtr& datafieldRef)
1711{
1712 //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1713 bool remove = true;
1714 DatafieldRefPtr ref;
1715 {
1716 std::unique_lock lock(impl->filterMutex);
1717 ref = DatafieldRefPtr::dynamicCast(datafieldRef);
1718 const std::string idStr = ref->getDataFieldIdentifier()->getIdentifierStr();
1719 auto it = impl->filteredToOriginal.find(idStr);
1720 remove = (it != impl->filteredToOriginal.end());
1721 }
1722
1723 if (remove && ref)
1724 {
1725 removeDatafield(ref->getDataFieldIdentifier());
1726 }
1727}
1728
1729void
1730Observer::removeFilteredDatafield_async(const AMD_ObserverInterface_removeFilteredDatafieldPtr& amd,
1731 const DatafieldRefBasePtr& datafieldRef,
1732 const Ice::Current&)
1733{
1735 "Observer::removeFilteredDatafield", amd, &Observer::removeFilteredDatafield, datafieldRef);
1736}
1737
1738// //////////////////
1739ChannelHistory
1740Observer::getChannelHistory(const std::string& channelName,
1741 Ice::Float timestepMs,
1742 const Ice::Current& c) const
1743{
1744 //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1746 channelName, 0, std::numeric_limits<long>::max(), timestepMs, c);
1747}
1748
1749void
1750Observer::getChannelHistory_async(const AMD_ObserverInterface_getChannelHistoryPtr& amd,
1751 const std::string& channelName,
1752 Ice::Float timestepMs,
1753 const Ice::Current& c) const
1754{
1755 addWorkerJob("Observer::getChannelHistory",
1756 amd,
1758 channelName,
1759 timestepMs,
1760 c);
1761}
1762
1763// //////////////////
1764ChannelHistory
1765Observer::getPartialChannelHistory(const std::string& channelName,
1766 Ice::Long startTimestamp,
1767 Ice::Long endTimestamp,
1768 Ice::Float timestepMs,
1769 const Ice::Current& c) const
1770{
1771 //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1772 ARMARX_IMPORTANT << "Waiting for mutex";
1773 std::unique_lock lock_channels(impl->historyMutex);
1774 ARMARX_IMPORTANT << "GOT mutex";
1775 ChannelHistory result;
1776 auto historyIt = impl->channelHistory.find(channelName);
1777 if (historyIt == impl->channelHistory.end())
1778 {
1779 return result;
1780 }
1781 ARMARX_IMPORTANT << "found channel";
1782
1783 auto lastInsertIt = result.begin();
1784 Ice::Long lastUsedTimestep = 0;
1785 float timestepUs = timestepMs * 1000;
1786 for (auto& entry : historyIt->second)
1787 {
1788 long timestamp = entry.first.toMicroSeconds();
1789 if (timestamp > endTimestamp)
1790 {
1791 break;
1792 }
1793
1794 if (timestamp > startTimestamp && timestamp - lastUsedTimestep > timestepUs)
1795 {
1796 lastInsertIt = result.emplace_hint(result.end(),
1797 std::make_pair(timestamp, entry.second.dataFields));
1798 lastUsedTimestep = timestamp;
1799 }
1800 }
1801 return result;
1802}
1803
1804void
1806 const AMD_ObserverInterface_getPartialChannelHistoryPtr& amd,
1807 const std::string& channelName,
1808 Ice::Long startTimestamp,
1809 Ice::Long endTimestamp,
1810 Ice::Float timestepMs,
1811 const Ice::Current& c) const
1812{
1813 addWorkerJob("Observer::getPartialChannelHistory",
1814 amd,
1816 channelName,
1817 startTimestamp,
1818 endTimestamp,
1819 timestepMs,
1820 c);
1821}
1822
1823// //////////////////
1824TimedVariantBaseList
1825Observer::getDatafieldHistory(const std::string& channelName,
1826 const std::string& datafieldName,
1827 Ice::Float timestepMs,
1828 const Ice::Current& c) const
1829{
1830 //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1832 channelName, datafieldName, 0, std::numeric_limits<long>::max(), timestepMs, c);
1833}
1834
1835void
1836Observer::getDatafieldHistory_async(const AMD_ObserverInterface_getDatafieldHistoryPtr& amd,
1837 const std::string& channelName,
1838 const std::string& datafieldName,
1839 Ice::Float timestepMs,
1840 const Ice::Current& c) const
1841{
1842 addWorkerJob("Observer::getDatafieldHistory",
1843 amd,
1845 channelName,
1846 datafieldName,
1847 timestepMs,
1848 c);
1849}
1850
1851// //////////////////
1852TimedVariantBaseList
1853Observer::getPartialDatafieldHistory(const std::string& channelName,
1854 const std::string& datafieldName,
1855 Ice::Long startTimestamp,
1856 Ice::Long endTimestamp,
1857 Ice::Float timestepMs,
1858 const Ice::Current& c) const
1859{
1860 //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1861 std::unique_lock lock_channels(impl->historyMutex);
1862 TimedVariantBaseList result;
1863 auto historyIt = impl->channelHistory.find(channelName);
1864 if (historyIt == impl->channelHistory.end())
1865 {
1866 return result;
1867 }
1868 if (!existsDataField(channelName, datafieldName))
1869 {
1870 return result;
1871 }
1872
1873 Ice::Long lastUsedTimestep = 0;
1874 float timestepUs = timestepMs * 1000;
1875 for (auto& entry : historyIt->second)
1876 {
1877 long timestamp = entry.first.toMicroSeconds();
1878 if (timestamp > endTimestamp)
1879 {
1880 break;
1881 }
1882
1883 if (timestamp > startTimestamp && timestamp - lastUsedTimestep > timestepUs)
1884 {
1885 auto datafieldIt = entry.second.dataFields.find(datafieldName);
1886 if (datafieldIt == entry.second.dataFields.end())
1887 {
1888 continue;
1889 }
1890 else
1891 {
1892 TimedVariantPtr tvar = TimedVariantPtr::dynamicCast(datafieldIt->second.value);
1893 if (tvar)
1894 {
1895 result.emplace_back(tvar);
1896 lastUsedTimestep = timestamp;
1897 }
1898 }
1899 }
1900 }
1901 return result;
1902}
1903
1904void
1906 const AMD_ObserverInterface_getPartialDatafieldHistoryPtr& amd,
1907 const std::string& channelName,
1908 const std::string& datafieldName,
1909 Ice::Long startTimestamp,
1910 Ice::Long endTimestamp,
1911 Ice::Float timestepMs,
1912 const Ice::Current& c) const
1913{
1914 addWorkerJob("Observer::getPartialDatafieldHistory",
1915 amd,
1917 channelName,
1918 datafieldName,
1919 startTimestamp,
1920 endTimestamp,
1921 timestepMs,
1922 c);
1923}
1924
1925// ////////////////////////////////////////////////////////////////////////// //
1926Observer::Impl::WorkerUpdate::WorkerUpdate(const std::string& name, std::function<void(void)> f) :
1927 name{name}, time{Now()}, f{std::move(f)}
1928{
1929}
1930
1931float
std::string timestamp()
#define ARMARX_STREAM_PRINTER
use this macro to write output code that is executed when printed and thus not executed if the debug ...
Definition Logging.h:310
const std::string LAST_REFRESH_DELTA_CHANNEL
Definition Observer.cpp:45
constexpr T c
std::string getConfigIdentifier()
Retrieve config identifier for this component as set in constructor.
Definition Component.cpp:90
Property< PropertyType > getProperty(const std::string &name)
A ConditionCheck implements a check on the sensor data stream of a Sensor-Actor Unit.
armarx::ChannelRegistry ChannelRegistry
Creates a new ConditionCheck instance.
DataFieldIdentifier provide the basis to identify data field within a distributed ArmarX scenario.
The DatafieldRef class is similar to the ChannelRef, but points to a specific Datafield instead of to...
SpamFilterDataPtr deactivateSpam(float deactivationDurationSec=10.0f, const std::string &identifier="", bool deactivate=true) const
disables the logging for the current line for the given amount of seconds.
Definition Logging.cpp:99
std::string getName() const
Retrieve name of object.
Ice::ObjectPrx getProxy(long timeoutMs=0, bool waitForScheduler=true) const
Returns the proxy of this object (optionally it waits for the proxy)
int getState() const
Retrieve current state of the ManagedIceObject.
void removeFilteredDatafield(const DatafieldRefBasePtr &datafieldRef)
Removes a previously installed filter.
DatafieldRefBasePtr createFilteredDatafield(const DatafieldFilterBasePtr &filter, const DatafieldRefBasePtr &datafieldRef)
This function creates a new datafield with new filter on the given datafield.
std::string getObserverName() const
void getAvailableChecks_async(const AMD_ObserverInterface_getAvailableChecksPtr &amd, const Ice::Current &) override
bool existsChannel(const std::string &channelName) const
void offerChannel(std::string channelName, std::string description)
Offer a channel.
Definition Observer.cpp:131
void removeCheck_async(const AMD_ObserverInterface_removeCheckPtr &amd, const CheckIdentifier &id, const Ice::Current &) override
void existsChannel_async(const AMD_ObserverInterface_existsChannelPtr &amd, const std::string &channelName, const Ice::Current &) const override
void getDatafieldHistory_async(const AMD_ObserverInterface_getDatafieldHistoryPtr &amd, const std::string &channelName, const std::string &datafieldName, Ice::Float timestepMs, const Ice::Current &c) const override
virtual void postWorkerJobs()
void getPartialDatafieldHistory_async(const AMD_ObserverInterface_getPartialDatafieldHistoryPtr &amd, const std::string &channelName, const std::string &datafieldName, Ice::Long startTimestamp, Ice::Long endTimestamp, Ice::Float timestepMs, const Ice::Current &c) const override
void removeDatafield(DataFieldIdentifierBasePtr id)
Definition Observer.cpp:344
void scheduleDatafieldFilterUpdate(const std::string &channelName, const std::string &datafieldName, const VariantBasePtr &value)
Definition Observer.cpp:437
typename ClockT::time_point TimepointT
Definition Observer.h:593
void getDataFieldRef_async(const AMD_ObserverInterface_getDataFieldRefPtr &amd, const DataFieldIdentifierBasePtr &identifier, const Ice::Current &) const override
StringConditionCheckMap getAvailableChecks()
Retrieve list of available condition checks.
void removeChannel(std::string channelName)
Remove a channel.
Definition Observer.cpp:318
void removeCheck(const CheckIdentifier &id)
Removes a condition check from the observer.
static TimepointT Now()
void getChannelHistory_async(const AMD_ObserverInterface_getChannelHistoryPtr &amd, const std::string &channelName, Ice::Float timestepMs, const Ice::Current &c) const override
void getAvailableChannels_async(const AMD_ObserverInterface_getAvailableChannelsPtr &amd, bool includeMetaChannels, const Ice::Current &) override
void getDataFields_async(const AMD_ObserverInterface_getDataFieldsPtr &amd, const DataFieldIdentifierBaseList &identifiers, const Ice::Current &c) override
void offerDataField(std::string channelName, std::string datafieldName, VariantTypeId type, std::string description)
Offer a datafield without default value.
Definition Observer.cpp:201
void getDatafieldByName_async(const AMD_ObserverInterface_getDatafieldByNamePtr &amd, const std::string &channelName, const std::string &datafieldName, const Ice::Current &) const override
void removeFilteredDatafield_async(const AMD_ObserverInterface_removeFilteredDatafieldPtr &amd, const DatafieldRefBasePtr &datafieldRef, const Ice::Current &) override
void getChannel_async(const AMD_ObserverInterface_getChannelPtr &amd, const std::string &channelName, const Ice::Current &) const override
void offerConditionCheck(std::string checkName, ConditionCheck *conditionCheck)
Offer a condition check.
Definition Observer.cpp:301
ChannelRegistryEntry getChannel(const std::string &channelName) const
Retrieve information on all sensory data channels available from the observer.
void getDataField_async(const AMD_ObserverInterface_getDataFieldPtr &amd, const DataFieldIdentifierBasePtr &identifier, const Ice::Current &c) const override
void createNamedFilteredDatafield_async(const AMD_ObserverInterface_createNamedFilteredDatafieldPtr &amd, const std::string &filterDatafieldName, const DatafieldFilterBasePtr &filter, const DatafieldRefBasePtr &datafieldRef, const Ice::Current &) override
void getObserverName_async(const AMD_ObserverInterface_getObserverNamePtr &amd, const Ice::Current &) const override
virtual void onInitObserver()=0
Framework hook.
void updateChannel(const std::string &channelName, const std::set< std::string > &updatedDatafields=std::set< std::string >())
Update all conditions for a channel.
Definition Observer.cpp:788
TimedVariantBasePtr getDataField(const DataFieldIdentifierBasePtr &identifier, const Ice::Current &c=Ice::emptyCurrent) const
Retrieve data field from observer.
void addWorkerJob(const std::string &name, std::function< void(void)> &&f) const
void setDataFieldsFlatCopy(const std::string &channelName, const StringVariantBaseMap &datafieldValues, bool triggerFilterUpdate=true)
Definition Observer.cpp:728
TimedVariantBasePtr getDatafieldByName(const std::string &channelName, const std::string &datafieldName) const
ChannelHistory getChannelHistory(const std::string &channelName, Ice::Float timestepMs, const Ice::Current &c) const
std::unique_ptr< Impl > impl
Definition Observer.h:603
void maybeOfferChannelAndSetDataFieldsFlatCopy(const std::string &channelName, const std::string &description, const StringVariantBaseMap &datafieldValues, bool triggerFilterUpdate=true)
Definition Observer.cpp:637
TimedVariantBaseList getDataFields(const DataFieldIdentifierBaseList &identifiers, const Ice::Current &c)
Retrieve list of data field from observer.
void offerDataFieldWithDefault(std::string channelName, std::string datafieldName, const Variant &defaultValue, std::string description)
Offer a datafield with default value.
Definition Observer.cpp:160
void existsDataField_async(const AMD_ObserverInterface_existsDataFieldPtr &amd, const std::string &channelName, const std::string &datafieldName, const Ice::Current &) const override
virtual void preWorkerJobs()
void updateDatafieldTimestamps(const std::string &channelName, const std::unordered_map< std::string, Ice::Long > &datafieldValues)
Definition Observer.cpp:580
static float TimeDeltaInMs(TimepointT t0)
ChannelHistory getPartialChannelHistory(const std::string &channelName, Ice::Long startTimestamp, Ice::Long endTimestamp, Ice::Float timestepMs, const Ice::Current &c) const
virtual void onConnectObserver()=0
Framework hook.
void getDatafieldRefByName_async(const AMD_ObserverInterface_getDatafieldRefByNamePtr &amd, const std::string &channelName, const std::string &datafieldName, const Ice::Current &) const override
bool existsDataField(const std::string &channelName, const std::string &datafieldName) const
TimedVariantBaseList getDatafieldHistory(const std::string &channelName, const std::string &datafieldName, Ice::Float timestepMs, const Ice::Current &c) const
void getDatafieldsOfChannel_async(const AMD_ObserverInterface_getDatafieldsOfChannelPtr &amd, const std::string &channelName, const Ice::Current &) const override
TimedVariantBaseList getPartialDatafieldHistory(const std::string &channelName, const std::string &datafieldName, Ice::Long startTimestamp, Ice::Long endTimestamp, Ice::Float timestepMs, const Ice::Current &c) const
std::recursive_mutex channelsMutex
Definition Observer.h:600
DatafieldRefBasePtr getDatafieldRefByName(const std::string &channelName, const std::string &datafieldName) const
void createFilteredDatafield_async(const AMD_ObserverInterface_createFilteredDatafieldPtr &amd, const DatafieldFilterBasePtr &filter, const DatafieldRefBasePtr &datafieldRef, const Ice::Current &) override
ChannelRegistry getAvailableChannels(bool includeMetaChannels)
Retrieve information on all sensory data channels available from the observer.
void setDataField(const std::string &channelName, const std::string &datafieldName, const Variant &value, bool triggerFilterUpdate=true)
set datafield with datafieldName and in channel channelName
Definition Observer.cpp:508
void setDataFieldFlatCopy(const std::string &channelName, const std::string &datafieldName, const VariantPtr &value, bool triggerFilterUpdate=true)
Definition Observer.cpp:548
DatafieldRefBasePtr createNamedFilteredDatafield(const std::string &filterDatafieldName, const DatafieldFilterBasePtr &filter, const DatafieldRefBasePtr &datafieldRef)
CheckIdentifier installCheck(const CheckConfiguration &configuration)
Installs a condition check with the observer.
DatafieldRefBasePtr getDataFieldRef(const DataFieldIdentifierBasePtr &identifier) const
void getPartialChannelHistory_async(const AMD_ObserverInterface_getPartialChannelHistoryPtr &amd, const std::string &channelName, Ice::Long startTimestamp, Ice::Long endTimestamp, Ice::Float timestepMs, const Ice::Current &c) const override
virtual void onExitObserver()
Framework hook.
Definition Observer.h:463
StringTimedVariantBaseMap getDatafieldsOfChannel(const std::string &channelName) const
void offerOrUpdateDataFieldsFlatCopy(const std::string &channelName, const StringVariantBaseMap &valueMap)
Definition Observer.cpp:260
bool offerOrUpdateDataField(std::string channelName, std::string datafieldName, const Variant &value, const std::string &description)
Definition Observer.cpp:242
void installCheck_async(const AMD_ObserverInterface_installCheckPtr &amd, const CheckConfiguration &configuration, const Ice::Current &) override
std::set< std::string > updateDatafieldFilter(const std::string &channelName, const std::string &datafieldName, const VariantBasePtr &value)
Definition Observer.cpp:386
IceUtil::Handle< PeriodicTask< T > > pointer_type
Shared pointer type for convenience.
IceUtil::Handle< RunningTask< T > > pointer_type
Shared pointer type for convenience.
static IceUtil::Time GetTime(TimeMode timeMode=TimeMode::VirtualTime)
Get the current time.
Definition TimeUtil.cpp:42
The Variant class is described here: Variants.
Definition Variant.h:224
std::string getTypeName(const Ice::Current &c=Ice::emptyCurrent) const override
Return the Variant's internal type.
Definition Variant.cpp:685
static std::string typeToString(VariantTypeId typeId)
Return the name of the registered type typeId.
Definition Variant.cpp:848
#define ARMARX_CHECK_EXPRESSION(expression)
This macro evaluates the expression and if it turns out to be false it will throw an ExpressionExcept...
#define ARMARX_INFO
The normal logging level.
Definition Logging.h:181
#define ARMARX_IMPORTANT
The logging level for always important information, but expected behaviour (in contrast to ARMARX_WAR...
Definition Logging.h:190
#define ARMARX_ERROR
The logging level for unexpected behaviour that must be fixed.
Definition Logging.h:196
#define ARMARX_DEBUG
The logging level for output that is only interesting while debugging.
Definition Logging.h:184
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
Definition Logging.h:193
#define ARMARX_ON_SCOPE_EXIT
Executes given code when the enclosing scope is left.
const std::list< std::string > identifiers
This file offers overloads of toIce() and fromIce() functions for STL container types.
std::map< std::string, VariantBasePtr > StringVariantBaseMap
void handleExceptions()
IceInternal::Handle< DataFieldIdentifierBase > DataFieldIdentifierBasePtr
IceInternal::Handle< TimedVariant > TimedVariantPtr
IceInternal::Handle< Variant > VariantPtr
Definition Variant.h:41
IceInternal::Handle< ChannelRef > ChannelRefPtr
Definition ChannelRef.h:40
::std::vector<::armarx::DataFieldIdentifierBasePtr > DataFieldIdentifierBaseList
IceInternal::Handle< ConditionCheck > ConditionCheckPtr
IceInternal::Handle< DatafieldRef > DatafieldRefPtr
Definition Observer.h:43
IceUtil::Handle< class PropertyDefinitionContainer > PropertyDefinitionsPtr
PropertyDefinitions smart pointer type.
Ice::Int VariantTypeId
Definition Variant.h:43
const LogSender::manipulator flush
Definition LogSender.h:251
::IceInternal::Handle<::armarx::VariantBase > VariantBasePtr
IceInternal::Handle< DataFieldIdentifier > DataFieldIdentifierPtr
Typedef of DataFieldIdentifierPtr as IceInternal::Handle<DataFieldIdentifier> for convenience.
std::string ValueToString(const T &value)
DatafieldFilterBasePtr filter
Definition Observer.cpp:72
WorkerUpdate(const std::string &name, std::function< void(void)> f)
std::function< void(void)> f
Definition Observer.cpp:111
PeriodicTask< Observer >::pointer_type metaTask
Definition Observer.cpp:49
RunningTask< Observer >::pointer_type channelUpdateTask
Definition Observer.cpp:92
FilterUpdateQueue filterQueue
Definition Observer.cpp:98
std::condition_variable idleCondition
Definition Observer.cpp:100
std::multimap< std::string, FilterDataPtr > orignalToFiltered
Definition Observer.cpp:79
ChannelRegistryHistory channelHistory
Definition Observer.cpp:62
std::recursive_mutex historyMutex
Definition Observer.cpp:63
std::deque< WorkerUpdate > workerUpdates
Definition Observer.cpp:114
std::recursive_mutex filterMutex
Definition Observer.cpp:81
std::map< std::string, IceUtil::Time > channelUpdateTimestamps
Definition Observer.cpp:82
std::unordered_map< std::string, FilterQueueData > FilterUpdateQueue
Definition Observer.cpp:97
RunningTask< Observer >::pointer_type filterUpdateTask
Definition Observer.cpp:83
ConditionCheck::ChannelRegistry channelRegistry
Definition Observer.cpp:58
StringConditionCheckMap availableChecks
Definition Observer.cpp:54
std::atomic_bool stopWorker
Definition Observer.cpp:116
std::map< std::string, FilterDataPtr > filteredToOriginal
Definition Observer.cpp:80
std::map< std::string, std::set< std::string > > channelQueue
Definition Observer.cpp:93
std::unordered_map< std::string, boost::circular_buffer< std::pair< IceUtil::Time, ChannelRegistryEntry > > > ChannelRegistryHistory
Definition Observer.cpp:59
std::recursive_mutex workerUpdatesMutex
Definition Observer.cpp:102
std::mutex filterQueueMutex
Definition Observer.cpp:99
std::condition_variable idleChannelCondition
Definition Observer.cpp:95
std::mutex channelQueueMutex
Definition Observer.cpp:94
IceUtil::Handle< FilterData > FilterDataPtr
Definition Observer.cpp:77
#define ARMARX_TRACE
Definition trace.h:77