Observer.cpp
Go to the documentation of this file.
1 /*
2 * This file is part of ArmarX.
3 *
4 * Copyright (C) 2011-2016, High Performance Humanoid Technologies (H2T), Karlsruhe Institute of Technology (KIT), all rights reserved.
5 *
6 * ArmarX is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 as
8 * published by the Free Software Foundation.
9 *
10 * ArmarX is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 *
18 * @package ArmarX::Core
19 * @author Kai Welke (welke@kit.edu)
20 * @date 2011
21 * @copyright http://www.gnu.org/licenses/gpl-2.0.txt
22 * GNU General Public License
23 */
24 #include "Observer.h"
25 
26 #include <algorithm>
27 #include <condition_variable>
28 
29 #include <boost/circular_buffer.hpp>
30 
41 
42 using namespace armarx;
43 using namespace armarx::exceptions::local;
44 
45 const std::string LAST_REFRESH_DELTA_CHANNEL = "_LastRefreshDelta";
46 
// Private implementation state used by Observer through `impl->`.
// NOTE: the line opening this struct and several member declarations are not
// visible in this listing; only the visible members are documented here.
48 {
     // When true, doChannelUpdate() also records per-channel refresh deltas.
50  bool logChannelUpdateRate = false;
51 
52 
53  // available checks and channels
54  StringConditionCheckMap availableChecks;
     // Locked by offerConditionCheck() before availableChecks is modified.
55  std::mutex checksMutex;
56 
57  // available channels and data fields
60  std::unordered_map<std::string,
61  boost::circular_buffer<std::pair<IceUtil::Time, ChannelRegistryEntry>>>;
     // Locked around channelHistory accesses in removeChannel() and addToChannelHistory().
63  mutable std::recursive_mutex historyMutex;
66 
     // Id counter: reset to 0 in onInitComponent() and post-incremented under idMutex.
67  int currentId;
68  std::mutex idMutex;
69 
     // Bookkeeping entry for one installed datafield filter (further members are
     // not visible in this listing).
70  struct FilterData : IceUtil::Shared
71  {
72  DatafieldFilterBasePtr filter;
75  };
76 
78 
     // Keyed by the identifier string of the original (unfiltered) datafield.
     // The misspelling "orignal" is part of the identifier used throughout the file.
79  std::multimap<std::string, FilterDataPtr> orignalToFiltered;
     // Keyed by the identifier string of the filtered datafield.
80  std::map<std::string, FilterDataPtr> filteredToOriginal;
     // Locked around accesses to the two filter maps above.
81  std::recursive_mutex filterMutex;
     // Time of the previous update per channel, used by updateRefreshRateChannel().
82  std::map<std::string, IceUtil::Time> channelUpdateTimestamps;
84 
86  {
88  std::string channelName;
89  std::string datafieldName;
90  };
91 
     // Channel name -> names of updated datafields; filled by updateChannel() and
     // drained by channelUpdateFunction().
93  std::map<std::string, std::set<std::string>> channelQueue;
94  std::mutex channelQueueMutex;
     // Notified by updateChannel(); waited on by channelUpdateFunction().
95  std::condition_variable idleChannelCondition;
96 
97  using FilterUpdateQueue = std::unordered_map<std::string, FilterQueueData>;
99  std::mutex filterQueueMutex;
     // Notified by scheduleDatafieldFilterUpdate(); waited on by updateFilters().
100  std::condition_variable idleCondition;
101 
     // Locked around accesses to workerUpdates.
102  std::recursive_mutex workerUpdatesMutex;
103 
     // One queued job for the worker thread: a name plus the callable to run.
105  {
106  WorkerUpdate(const std::string& name, std::function<void(void)> f);
107  float ageInMs() const;
108 
109  std::string name;
111  std::function<void(void)> f;
112  };
113 
     // Jobs appended by addWorkerJob() and swapped out by the worker loop.
114  mutable std::deque<WorkerUpdate> workerUpdates;
115  std::thread worker;
     // Set to false before the worker thread starts and to true to request its exit.
116  std::atomic_bool stopWorker;
117 };
118 
120 {
121 }
122 
124 {
125 }
126 
127 // *******************************************************
128 // offering of channels, datafield, and checks
129 // *******************************************************
// Registers a new channel named `channelName` with the given description.
// The new entry starts with initialized == false and is inserted into
// impl->channelRegistry while channelsMutex is held.
// Throws LocalException when getState() < eManagedIceObjectInitialized.
130 void
131 Observer::offerChannel(std::string channelName, std::string description)
132 {
133  if (getState() < eManagedIceObjectInitialized)
134  {
135  throw LocalException()
136  << "offerChannel() must not be called before the Observer is initalized (i.e. not in "
137  "onInitObserver(), use onConnectObserver()";
138  }
139 
140  std::unique_lock lock(channelsMutex);
141 
     // A channel with this name is already registered. The statement handling
     // this case (line 144) is not visible in this listing.
142  if (impl->channelRegistry.find(channelName) != impl->channelRegistry.end())
143  {
145  }
146 
147  ChannelRegistryEntry channel;
148  channel.name = channelName;
149  channel.description = description;
150  channel.initialized = false;
151 
152  std::pair<std::string, ChannelRegistryEntry> entry;
153  entry.first = channelName;
154  entry.second = channel;
155 
156  impl->channelRegistry.insert(entry);
157 }
158 
// Adds datafield `datafieldName` to an existing channel and initializes it with
// a copy of `defaultValue`; the stored type name is taken from defaultValue.
// Throws LocalException when getState() < eManagedIceObjectInitialized and
// DatafieldExistsAlreadyException when the channel already has this datafield.
159 void
160 Observer::offerDataFieldWithDefault(std::string channelName,
161  std::string datafieldName,
162  const Variant& defaultValue,
163  std::string description)
164 {
165  if (getState() < eManagedIceObjectInitialized)
166  {
167  throw LocalException()
168  << "offerDataFieldWithDefault() must not be called before the Observer is initalized "
169  "(i.e. not in onInitObserver(), use onConnectObserver()";
170  }
171 
172  std::unique_lock lock(channelsMutex);
173 
174  auto channelIt = impl->channelRegistry.find(channelName);
175 
     // Unknown channel. The statement handling this case (line 178) is not
     // visible in this listing.
176  if (channelIt == impl->channelRegistry.end())
177  {
179  }
180 
181  if (channelIt->second.dataFields.find(datafieldName) != channelIt->second.dataFields.end())
182  {
183  throw exceptions::user::DatafieldExistsAlreadyException(channelName, datafieldName);
184  }
185 
186  DataFieldRegistryEntry dataField;
187  dataField.identifier = new DatafieldRef(this, channelName, datafieldName, false);
188  dataField.description = description;
189  dataField.typeName = defaultValue.getTypeName();
     // Allocate a fresh TimedVariant, then assign the default value into it.
190  dataField.value = new TimedVariant();
191  *dataField.value = defaultValue;
192 
193  std::pair<std::string, DataFieldRegistryEntry> entry;
194  entry.first = datafieldName;
195  entry.second = dataField;
196 
197  channelIt->second.dataFields.insert(entry);
198 }
199 
// Adds datafield `datafieldName` of variant type `type` to an existing channel.
// The stored value is a new TimedVariant whose type is set to `type`.
// Throws LocalException when getState() < eManagedIceObjectInitialized and
// InvalidDataFieldException when the channel already has this datafield.
200 void
201 Observer::offerDataField(std::string channelName,
202  std::string datafieldName,
203  VariantTypeId type,
204  std::string description)
205 {
206  if (getState() < eManagedIceObjectInitialized)
207  {
208  throw LocalException()
209  << "offerDataField() must not be called before the Observer is initalized (i.e. not in "
210  "onInitObserver(), use onConnectObserver()";
211  }
212 
213  std::unique_lock lock(channelsMutex);
214 
215  auto channelIt = impl->channelRegistry.find(channelName);
216 
     // Unknown channel. The statement handling this case (line 219) is not
     // visible in this listing.
217  if (channelIt == impl->channelRegistry.end())
218  {
220  }
221 
     // NOTE(review): offerDataFieldWithDefault() throws DatafieldExistsAlreadyException
     // for this same "datafield already exists" condition; confirm whether the
     // different exception type here is intentional.
222  if (channelIt->second.dataFields.find(datafieldName) != channelIt->second.dataFields.end())
223  {
224  throw exceptions::user::InvalidDataFieldException(channelName, datafieldName);
225  }
226 
227  DataFieldRegistryEntry dataField;
228  dataField.identifier = new DatafieldRef(this, channelName, datafieldName, false);
229  dataField.description = description;
230  dataField.typeName = Variant::typeToString(type);
231  dataField.value = new TimedVariant();
232  dataField.value->setType(type);
233 
234  std::pair<std::string, DataFieldRegistryEntry> entry;
235  entry.first = datafieldName;
236  entry.second = dataField;
237 
     // Looks the channel up again by name instead of reusing channelIt.
238  impl->channelRegistry[channelName].dataFields.insert(entry);
239 }
240 
241 bool
242 Observer::offerOrUpdateDataField(std::string channelName,
243  std::string datafieldName,
244  const Variant& value,
245  const std::string& description)
246 {
247  if (!existsDataField(channelName, datafieldName))
248  {
249  offerDataFieldWithDefault(channelName, datafieldName, value, description);
250  return true;
251  }
252  else
253  {
254  setDataField(channelName, datafieldName, value);
255  return false;
256  }
257 }
258 
// Sets all datafields in `valueMap` on channel `channelName` and then calls
// updateChannel(). A bulk call is tried first; the block after it iterates the
// entries one by one. NOTE: the catch clauses and the per-entry calls (lines
// 268, 281, 284, 286, 292) are not visible in this listing.
259 void
260 Observer::offerOrUpdateDataFieldsFlatCopy(const std::string& channelName,
261  const StringVariantBaseMap& valueMap)
262 {
263  try
264  {
265  // fastest way is to just try to set the datafields
266  maybeOfferChannelAndSetDataFieldsFlatCopy(channelName, "", valueMap, true);
267  }
269  {
270  ARMARX_INFO << deactivateSpam() << "Creating datafields for channel " << channelName;
271  // on failure: do it the slow way
272  ARMARX_DEBUG << deactivateSpam(1) << "failure when seting datafield "
273  << "using slow fallback";
274  for (const auto& value : valueMap)
275  {
276  const std::string& datafieldName = value.first;
     // Datafields that do not exist yet take the first branch, existing ones the else branch.
277  if (!existsDataField(channelName, datafieldName))
278  {
279  try
280  {
282  channelName, datafieldName, *VariantPtr::dynamicCast(value.second), "");
283  }
285  {
287  channelName, datafieldName, VariantPtr::dynamicCast(value.second));
288  }
289  }
290  else
291  {
293  channelName, datafieldName, VariantPtr::dynamicCast(value.second));
294  }
295  }
296  }
     // Runs after both the bulk path and the per-entry path.
297  updateChannel(channelName);
298 }
299 
300 void
301 Observer::offerConditionCheck(std::string checkName, ConditionCheck* conditionCheck)
302 {
303  std::unique_lock lock(impl->checksMutex);
304 
305  if (impl->availableChecks.find(checkName) != impl->availableChecks.end())
306  {
307  throw InvalidCheckException(checkName);
308  }
309 
310  std::pair<std::string, ConditionCheckPtr> entry;
311  entry.first = checkName;
312  entry.second = conditionCheck;
313 
314  impl->availableChecks.insert(entry);
315 }
316 
// Removes channel `channelName` from the channel registry and erases its
// history. Returns without doing anything when the channel is not registered.
// NOTE: the lines that produce `id` for removeDatafield() (330-331) are not
// visible in this listing.
317 void
318 Observer::removeChannel(std::string channelName)
319 {
320  {
321  std::unique_lock lock(channelsMutex);
322 
323  auto iter = impl->channelRegistry.find(channelName);
324 
325  if (iter == impl->channelRegistry.end())
326  {
327  return;
328  }
329 
332  removeDatafield(id);
333  impl->channelRegistry.erase(iter);
334  ARMARX_INFO << "Removed channel " << channelName;
335  }
336 
     // channelsMutex has been released at this point; the history is erased under its own mutex.
337  {
338  std::unique_lock lock(impl->historyMutex);
339  impl->channelHistory.erase(channelName);
340  }
341 }
342 
// Removes the datafield identified by `id`: first drops any filter bookkeeping
// for it (under filterMutex), then erases it from its channel (under
// channelsMutex). NOTE: the signature line (344) is not visible in this listing;
// the name is taken from the call in removeChannel().
343 void
345 {
346  {
347  std::unique_lock lock(impl->filterMutex);
348  // Remove filter
     // NOTE(review): idptr is dereferenced on the next line without a null
     // check on the dynamicCast result.
349  DataFieldIdentifierPtr idptr = DataFieldIdentifierPtr::dynamicCast(id);
350  std::string idStr = idptr->getIdentifierStr();
351  auto itFilter = impl->filteredToOriginal.find(idStr);
352 
353  if (itFilter != impl->filteredToOriginal.end())
354  {
355  DatafieldRefPtr refPtr = DatafieldRefPtr::dynamicCast(itFilter->second->original);
356  auto range = impl->orignalToFiltered.equal_range(
357  refPtr->getDataFieldIdentifier()->getIdentifierStr());
358 
     // Erase the one entry under the original's key whose filtered identifier
     // matches; the loop stops right after the erase.
359  for (auto it = range.first; it != range.second; it++)
360  if (it->second->filtered->getDataFieldIdentifier()->getIdentifierStr() == idStr)
361  {
362  impl->orignalToFiltered.erase(it);
363  break;
364  }
365 
366  impl->filteredToOriginal.erase(itFilter);
367  }
368  }
369 
370  std::unique_lock lock(channelsMutex);
371 
372  auto itChannel = impl->channelRegistry.find(id->channelName);
373 
     // An unknown channel is not an error here; there is simply nothing to erase.
374  if (itChannel == impl->channelRegistry.end())
375  {
376  return;
377  }
378 
379  itChannel->second.dataFields.erase(id->datafieldName);
380 }
381 
382 // *******************************************************
383 // utility methods for sensordatalistener
384 // *******************************************************
// Feeds `value` into every filter registered for the datafield
// "<observer name>.<channelName>.<datafieldName>" and returns the names of the
// filtered datafields that were found. Filter results are collected under
// filterMutex and processed in a second loop after the lock is released.
// NOTE: lines 413 and 422 are not visible in this listing.
385 std::set<std::string>
386 Observer::updateDatafieldFilter(const std::string& channelName,
387  const std::string& datafieldName,
388  const VariantBasePtr& value)
389 {
390  std::vector<Impl::FilterQueueData> filterData;
391  std::set<std::string> foundFilterFields;
392  {
393  std::unique_lock lock(impl->filterMutex);
394  const std::string id = getName() + "." + channelName + "." + datafieldName;
395  // DatafieldRefPtr ref = new DatafieldRef(this, channelName, datafieldName);
396  auto range = impl->orignalToFiltered.equal_range(id);
397 
398  //IceUtil::Time start = IceUtil::Time::now();
399  //bool found = false;
     // Use the value's own timestamp when it is a TimedVariant, otherwise the current time.
400  TimedVariantPtr origTV = TimedVariantPtr::dynamicCast(value);
401  auto t = origTV ? origTV->getTime() : TimeUtil::GetTime();
402  long tLong = t.toMicroSeconds();
403 
404  for (auto it = range.first; it != range.second; it++)
405  {
406  it->second->filter->update(tLong, value);
407 
408  foundFilterFields.insert(it->second->filtered->datafieldName);
409 
     // The filter output is stamped with the same time `t` as the input.
410  TimedVariantPtr tv =
411  new TimedVariant(VariantPtr::dynamicCast(it->second->filter->getValue()), t);
412  filterData.emplace_back(
414  it->second->filtered->channelRef->channelName,
415  it->second->filtered->datafieldName});
416  //found = true;
417  }
418  }
419 
     // Runs without filterMutex held.
420  for (auto& elem : filterData)
421  {
423  elem.channelName, elem.datafieldName, VariantPtr::dynamicCast(elem.value));
424  }
425 
426  /*if(found)
427  {
428  IceUtil::Time end = IceUtil::Time::now();
429  IceUtil::Time duration = end - start;
430  ARMARX_IMPORTANT << deactivateSpam(0.1f) << channelName << ":" << datafieldName << ": all filters calc microseconds: " << duration.toMicroSeconds();
431  }*/
432 
433  return foundFilterFields;
434 }
435 
436 void
437 Observer::scheduleDatafieldFilterUpdate(const std::string& channelName,
438  const std::string& datafieldName,
439  const VariantBasePtr& value)
440 {
441  if (impl->orignalToFiltered.size() == 0)
442  {
443  return; // no filters installed anyway ...nothing todo
444  }
445  const std::string id = getName() + "." + channelName + "." + datafieldName;
446  {
447  std::unique_lock lock(impl->filterMutex);
448  if (impl->orignalToFiltered.count(id) == 0)
449  {
450  return; // no filter for this datafield installed ...nothing todo
451  }
452  }
453  std::unique_lock lock(impl->filterQueueMutex);
454  impl->filterQueue[id] = {value, channelName, datafieldName};
455  impl->idleCondition.notify_all();
456 }
457 
// Loop body of the filter update task: swaps out the pending filter queue, runs
// updateDatafieldFilter() for every queued entry, then calls updateChannel() for
// each channel that had filtered datafields. Blocks on idleCondition while the
// queue is empty. NOTE: the declaration of `queue` (line 463) is not visible in
// this listing.
458 void
459 Observer::updateFilters()
460 {
461  while (!impl->filterUpdateTask->isStopped())
462  {
464  {
     // Take ownership of everything queued so far so the lock is held only briefly.
465  std::unique_lock lock(impl->filterQueueMutex);
466  queue.swap(impl->filterQueue);
467  }
     // channel name -> names of filtered datafields updated in this pass
468  std::unordered_map<std::string, std::set<std::string>> channels;
469  for (const Impl::FilterUpdateQueue::value_type& elem : queue)
470  {
471  auto foundFilterFields = updateDatafieldFilter(
472  elem.second.channelName, elem.second.datafieldName, elem.second.value);
473  channels[elem.second.channelName].insert(foundFilterFields.begin(),
474  foundFilterFields.end());
475  }
476  for (const auto& channel : channels)
477  {
478  if (channel.second.size() > 0)
479  {
480  updateChannel(channel.first, channel.second);
481  }
482  }
     // NOTE(review): wait() is called without a predicate, so a notification
     // sent before this thread starts waiting (e.g. the one in
     // onExitComponent()) is not re-checked here - confirm shutdown cannot hang.
483  std::unique_lock lock(impl->filterQueueMutex);
484  if (impl->filterQueue.size() == 0)
485  {
486  impl->idleCondition.wait(lock);
487  }
488  }
489 }
490 
491 void
492 Observer::setDataFieldFlatCopy(const DataFieldRegistry::iterator& dataFieldIter,
493  const VariantPtr& value)
494 {
495  ARMARX_CHECK_EXPRESSION(value) << "Datafieldvariant is NULL!";
496  TimedVariantPtr tval = TimedVariantPtr::dynamicCast(value);
497  if (tval)
498  {
499  dataFieldIter->second.value = std::move(tval);
500  }
501  else
502  {
503  dataFieldIter->second.value = new TimedVariant(value, TimeUtil::GetTime());
504  }
505 }
506 
// Overwrites the value of an existing datafield with `value` and, when
// `triggerFilterUpdate` is set, schedules a filter update with the stored value.
// Throws InvalidDataFieldException when the channel has no such datafield.
// NOTE: the statement handling an unknown channel (line 520) is not visible in
// this listing.
507 void
508 Observer::setDataField(const std::string& channelName,
509  const std::string& datafieldName,
510  const Variant& value,
511  bool triggerFilterUpdate)
512 {
513  VariantBasePtr valuePtr;
514  {
515  std::unique_lock lock(channelsMutex);
516  auto itChannel = impl->channelRegistry.find(channelName);
517 
518  if (itChannel == impl->channelRegistry.end())
519  {
521  }
522 
523  DataFieldRegistry::iterator itDF = itChannel->second.dataFields.find(datafieldName);
524 
525  if (itDF == itChannel->second.dataFields.end())
526  {
527  throw exceptions::user::InvalidDataFieldException(channelName, datafieldName);
528  }
529 
     // A TimedVariant is stored as a clone; any other variant is wrapped in a
     // new TimedVariant stamped with the current time.
530  if (dynamic_cast<const TimedVariant*>(&value))
531  {
532  itDF->second.value = value.clone();
533  }
534  else
535  {
536  itDF->second.value = new TimedVariant(value, TimeUtil::GetTime());
537  }
538  valuePtr = itDF->second.value;
539  }
     // The filter update is scheduled after channelsMutex has been released.
540  if (triggerFilterUpdate)
541  {
542  scheduleDatafieldFilterUpdate(channelName, datafieldName, valuePtr);
543  }
544  //*dataFieldValue = value;
545 }
546 
// Looks up the datafield by channel and datafield name under channelsMutex and,
// when `triggerFilterUpdate` is set, schedules a filter update with `value`.
// Throws InvalidDataFieldException when the channel has no such datafield.
// NOTE: lines 560 (unknown-channel handling) and 570 (the statement that stores
// the value) are not visible in this listing.
547 void
548 Observer::setDataFieldFlatCopy(const std::string& channelName,
549  const std::string& datafieldName,
550  const VariantPtr& value,
551  bool triggerFilterUpdate)
552 {
553  {
554  std::unique_lock lock(channelsMutex);
555 
556  auto itChannel = impl->channelRegistry.find(channelName);
557 
558  if (itChannel == impl->channelRegistry.end())
559  {
561  }
562 
563  DataFieldRegistry::iterator itDF = itChannel->second.dataFields.find(datafieldName);
564 
565  if (itDF == itChannel->second.dataFields.end())
566  {
567  throw exceptions::user::InvalidDataFieldException(channelName, datafieldName);
568  }
569 
571  }
572 
     // The filter update is scheduled after channelsMutex has been released.
573  if (triggerFilterUpdate)
574  {
575  scheduleDatafieldFilterUpdate(channelName, datafieldName, value);
576  }
577 }
578 
// Re-stamps datafields of `channelName` with per-datafield timestamps.
// `datafieldValues` maps datafield name -> timestamp, which is passed to
// IceUtil::Time::microSeconds(). Throws InvalidDataFieldException for a name the
// channel does not contain.
// NOTE: the first signature line (580) and the unknown-channel handling (590)
// are not visible in this listing.
579 void
581  const std::string& channelName,
582  const std::unordered_map<std::string, Ice::Long>& datafieldValues)
583 {
584  std::unique_lock lock(channelsMutex);
585 
586  auto itChannel = impl->channelRegistry.find(channelName);
587 
588  if (itChannel == impl->channelRegistry.end())
589  {
591  }
592 
593  for (const auto& elem : datafieldValues)
594  {
595  DataFieldRegistry::iterator itDF = itChannel->second.dataFields.find(elem.first);
596 
597  if (itDF == itChannel->second.dataFields.end())
598  {
599  throw exceptions::user::InvalidDataFieldException(channelName, elem.first);
600  }
601 
602 
     // Re-stamp when the stored value is not a TimedVariant, or when it is
     // initialized and older than the given timestamp.
603  TimedVariantPtr oldValue = TimedVariantPtr::dynamicCast(itDF->second.value);
604  if (!oldValue || (elem.second > oldValue->getTimestamp() && oldValue->getInitialized()))
605  {
606  itDF->second.value = new TimedVariant(VariantPtr::dynamicCast(itDF->second.value),
607  IceUtil::Time::microSeconds(elem.second));
608  }
609  }
610 }
611 
// Re-stamps every datafield of `channelName` with `timestamp`, which is passed
// to IceUtil::Time::microSeconds(). Runs under channelsMutex.
// NOTE: the unknown-channel handling (line 621) is not visible in this listing.
612 void
613 Observer::updateDatafieldTimestamps(const std::string& channelName, Ice::Long timestamp)
614 {
615  std::unique_lock lock(channelsMutex);
616 
617  auto itChannel = impl->channelRegistry.find(channelName);
618 
619  if (itChannel == impl->channelRegistry.end())
620  {
622  }
623 
624  for (auto& elem : itChannel->second.dataFields)
625  {
626 
     // Re-stamp when the stored value is not a TimedVariant, or when it is
     // initialized and older than `timestamp`.
627  TimedVariantPtr oldValue = TimedVariantPtr::dynamicCast(elem.second.value);
628  if (!oldValue || (timestamp > oldValue->getTimestamp() && oldValue->getInitialized()))
629  {
630  elem.second.value = new TimedVariant(VariantPtr::dynamicCast(elem.second.value),
631  IceUtil::Time::microSeconds(timestamp));
632  }
633  }
634 }
635 
// Creates the channel when it does not exist yet (name, description,
// initialized == false) and sets all given datafields on it. Datafields are NOT
// created: InvalidDataFieldException is thrown for the first name in
// `datafieldValues` that the channel does not contain.
// NOTE: the first signature line (637) is not visible in this listing; the name
// is taken from the call in offerOrUpdateDataFieldsFlatCopy().
636 void
638  const std::string& description,
639  const StringVariantBaseMap& datafieldValues,
640  bool triggerFilterUpdate)
641 {
642  {
643  std::lock_guard lock(channelsMutex);
644  auto itChannel = impl->channelRegistry.find(channelName);
645  if (itChannel == impl->channelRegistry.end())
646  {
647  //add
648  auto& channel = impl->channelRegistry[channelName];
649  channel.name = channelName;
650  channel.description = description;
651  channel.initialized = false;
652 
653  itChannel = impl->channelRegistry.find(channelName);
654  }
655 
656  //use map merge algorithm since it is faster (n+m instead n*log(m))
     // Walks both containers in lock-step, comparing keys.
     // Assumes both are iterated in the same ascending key order - TODO confirm
     // the container types.
657  auto targ = itChannel->second.dataFields.begin();
658  auto vals = datafieldValues.begin();
659  while (targ != itChannel->second.dataFields.end() && vals != datafieldValues.end())
660  {
661  if (vals->first > targ->first)
662  {
     // Registry datafield has no new value in this call; skip it.
663  ++targ;
664  }
665  else if (vals->first == targ->first)
666  {
667  setDataFieldFlatCopy(targ, VariantPtr::dynamicCast(vals->second));
668  ++vals;
669  ++targ;
670  }
671  else
672  {
     // vals->first sorts before targ->first, so the channel has no such datafield.
673  throw exceptions::user::InvalidDataFieldException(channelName, vals->first);
674  }
675  }
     // Values left over after the registry is exhausted have no matching datafield.
676  if (vals != datafieldValues.end())
677  {
678  throw exceptions::user::InvalidDataFieldException(channelName, vals->first);
679  }
680  }
     // Filter updates are scheduled after channelsMutex has been released.
681  if (triggerFilterUpdate)
682  {
683  for (const auto& elem : datafieldValues)
684  {
685  scheduleDatafieldFilterUpdate(channelName, elem.first, elem.second);
686  }
687  }
688 }
689 
// Sets several datafields of `channelName` from an unordered map of
// datafield name -> value, then optionally schedules filter updates for each.
// Throws InvalidDataFieldException for a name the channel does not contain;
// datafields processed before the failing one remain updated.
// NOTE: the first signature line (691) and the unknown-channel handling (703)
// are not visible in this listing.
690 void
692  const std::string& channelName,
693  const std::unordered_map<::std::string, ::armarx::VariantBasePtr>& datafieldValues,
694  bool triggerFilterUpdate)
695 {
696  {
697  std::unique_lock lock(channelsMutex);
698 
699  auto itChannel = impl->channelRegistry.find(channelName);
700 
701  if (itChannel == impl->channelRegistry.end())
702  {
704  }
705 
706  for (const auto& elem : datafieldValues)
707  {
708  DataFieldRegistry::iterator itDF = itChannel->second.dataFields.find(elem.first);
709 
710  if (itDF == itChannel->second.dataFields.end())
711  {
712  throw exceptions::user::InvalidDataFieldException(channelName, elem.first);
713  }
714 
715  setDataFieldFlatCopy(itDF, VariantPtr::dynamicCast(elem.second));
716  }
717  }
     // Filter updates are scheduled after channelsMutex has been released.
718  if (triggerFilterUpdate)
719  {
720  for (const auto& elem : datafieldValues)
721  {
722  scheduleDatafieldFilterUpdate(channelName, elem.first, elem.second);
723  }
724  }
725 }
726 
// Sets several datafields of `channelName` from a StringVariantBaseMap, then
// optionally schedules filter updates for each. Throws
// InvalidDataFieldException for a name the channel does not contain;
// datafields processed before the failing one remain updated.
// NOTE: the unknown-channel handling (line 739) is not visible in this listing.
727 void
728 Observer::setDataFieldsFlatCopy(const std::string& channelName,
729  const StringVariantBaseMap& datafieldValues,
730  bool triggerFilterUpdate)
731 {
732  {
733  std::unique_lock lock(channelsMutex);
734 
735  auto itChannel = impl->channelRegistry.find(channelName);
736 
737  if (itChannel == impl->channelRegistry.end())
738  {
740  }
741 
742  for (const auto& elem : datafieldValues)
743  {
744  DataFieldRegistry::iterator itDF = itChannel->second.dataFields.find(elem.first);
745 
746  if (itDF == itChannel->second.dataFields.end())
747  {
748  throw exceptions::user::InvalidDataFieldException(channelName, elem.first);
749  }
     // Checks the value pointer before it is cast and stored; the message names the datafield.
750  ARMARX_CHECK_EXPRESSION(elem.second) << "Datafieldname: " << elem.first;
751  setDataFieldFlatCopy(itDF, VariantPtr::dynamicCast(elem.second));
752  }
753  }
754 
     // Filter updates are scheduled after channelsMutex has been released.
755  if (triggerFilterUpdate)
756  {
757  for (const auto& elem : datafieldValues)
758  {
759  scheduleDatafieldFilterUpdate(channelName, elem.first, elem.second);
760  }
761  }
762 }
763 
// Computes the time in milliseconds since the previous call for `channelName`
// and remembers the current time for the next call.
// NOTE: the two calls that receive the delta and the catch clause between them
// (lines 772, 777, 779) are not visible in this listing.
764 void
765 Observer::updateRefreshRateChannel(const std::string& channelName)
766 {
     // operator[] creates a default-constructed timestamp entry on first use of a channel name.
767  auto& oldUpdateTime = impl->channelUpdateTimestamps[channelName];
768  auto now = IceUtil::Time::now();
769 
770  try
771  {
773  channelName,
774  new Variant((now - oldUpdateTime).toMilliSecondsDouble()),
775  false);
776  }
778  {
780  channelName,
781  0.0,
782  "Update delta of channel '" + channelName + "'");
783  }
784  oldUpdateTime = now;
785 }
786 
// Queues `channelName` for processing by channelUpdateFunction(): merges
// `updatedDatafields` into the pending set for that channel and notifies
// idleChannelCondition.
// NOTE: the handling of a non-existing channel (line 793) is not visible in
// this listing.
787 void
788 Observer::updateChannel(const std::string& channelName,
789  const std::set<std::string>& updatedDatafields)
790 {
791  if (!existsChannel(channelName))
792  {
794  }
795  std::unique_lock lock(impl->channelQueueMutex);
     // operator[] creates the queue entry even when updatedDatafields is empty.
796  std::set<std::string>& dfs = impl->channelQueue[channelName];
797  dfs.insert(updatedDatafields.begin(), updatedDatafields.end());
798  impl->idleChannelCondition.notify_all();
799 }
800 
801 void
802 Observer::channelUpdateFunction()
803 {
804 
805  while (!impl->channelUpdateTask->isStopped())
806  {
807  std::map<std::string, std::set<std::string>> queue;
808  {
809  std::unique_lock lock(impl->channelQueueMutex);
810  queue.swap(impl->channelQueue);
811  }
812  for (const auto& elem : queue)
813  {
814  doChannelUpdate(elem.first, elem.second);
815  }
816  std::unique_lock lock(impl->channelQueueMutex);
817  if (impl->channelQueue.size() == 0)
818  {
819  impl->idleChannelCondition.wait(lock);
820  }
821  }
822 }
823 
824 void
825 Observer::addToChannelHistory(const std::pair<IceUtil::Time, ChannelRegistryEntry>& historyEntry,
826  const std::string& channelName)
827 {
828  if (impl->maxHistorySize == 0)
829  {
830  return;
831  }
832  std::unique_lock lockHistory(impl->historyMutex);
833  auto historyIt = impl->channelHistory.find(channelName);
834  if (historyIt == impl->channelHistory.end())
835  {
836  impl->channelHistory[channelName].set_capacity(impl->maxHistorySize);
837  historyIt = impl->channelHistory.find(channelName);
838  }
839  auto now = TimeUtil::GetTime();
840  if (historyIt->second.empty() ||
841  now > historyIt->second.rbegin()->first +
842  IceUtil::Time::secondsDouble(1.0 / impl->maxHistoryRecordFrequency))
843  {
844  historyIt->second.push_back(historyEntry);
845  }
846 }
847 
// Processes one queued channel update: recomputes the channel's `initialized`
// flag, appends a snapshot to the channel history and evaluates the channel's
// condition checks on a copy of the channel entry taken under channelsMutex.
// All exceptions are swallowed by the outer catch.
// NOTE: lines 865 (unknown-channel handling) and 927 are not visible in this
// listing.
848 void
849 Observer::doChannelUpdate(const std::string& channelName,
850  const std::set<std::string>& updatedDatafields)
851 {
852 
853 
854  try
855  {
856  ChannelRegistryEntry entry;
857  std::pair<IceUtil::Time, ChannelRegistryEntry> historyEntry;
858  {
859  std::unique_lock lock_channels(channelsMutex);
860  // check if channels exists
861  auto iterChannel = impl->channelRegistry.find(channelName);
862 
863  if (iterChannel == impl->channelRegistry.end())
864  {
866  }
867 
868 
869  if (impl->logChannelUpdateRate)
870  {
871  updateRefreshRateChannel(channelName);
872  }
873 
874  // update initialized state
875  const DataFieldRegistry& dataFields = iterChannel->second.dataFields;
876  DataFieldRegistry::const_iterator iterDataFields = dataFields.begin();
877  bool channelInitialized = true;
878 
     // The channel counts as initialized only when every datafield value is initialized.
879  while (iterDataFields != dataFields.end())
880  {
881  channelInitialized &= iterDataFields->second.value->getInitialized();
882  iterDataFields++;
883  }
884 
885  iterChannel->second.initialized = channelInitialized;
     // The channel entry is copied only when it has checks to evaluate; otherwise
     // `entry` stays default-constructed and the check loop below does nothing.
886  if (iterChannel->second.conditionChecks.size() > 0)
887  {
888  entry = iterChannel->second;
889  }
890  historyEntry = std::make_pair(TimeUtil::GetTime(), iterChannel->second);
891  }
     // From here on channelsMutex is released; work continues on the copies.
892  addToChannelHistory(historyEntry, channelName);
893  // evaluate checks
894  ConditionCheckRegistry::iterator iterChecks = entry.conditionChecks.begin();
895 
896  while (iterChecks != entry.conditionChecks.end())
897  {
898  bool found = false;
899  if (updatedDatafields.size() > 0)
900  {
901  // check if this check belongs to an updated datafield.
     // NOTE(review): this loop never looks at *iterChecks; `found` becomes
     // true when ANY datafield of the channel is in updatedDatafields -
     // confirm this matches the comment above.
902  for (const auto& elem : entry.dataFields)
903  {
904  const DataFieldRegistryEntry& datafieldEntry = elem.second;
905  if (updatedDatafields.count(datafieldEntry.identifier->datafieldName))
906  {
907  found = true;
908  break;
909  }
910  }
911  }
912  else
913  {
     // An empty set of updated datafields means: evaluate every check.
914  found = true;
915  }
916 
917  try
918  {
919  if (found)
920  {
921  evaluateCheck(ConditionCheckPtr::dynamicCast(iterChecks->second), entry);
922  }
923  }
924  catch (...)
925  {
     // A failing check is logged and the loop continues with the next check.
926  ARMARX_ERROR << "Evaluating condition for channel " << channelName << " failed!";
928  }
929 
930  iterChecks++;
931  }
932  }
933  catch (...)
934  {
935  // ARMARX_ERROR << "Upating channel " << channelName << " failed!";
936  // handleExceptions();
937  }
938 }
939 
940 // *******************************************************
941 // Component hooks
942 // *******************************************************
943 void
944 Observer::onInitComponent()
945 {
946  impl->maxHistorySize = getProperty<int>("MaxHistorySize");
947  impl->maxHistoryRecordFrequency = getProperty<float>("MaxHistoryRecordFrequency");
948 
949  {
950  std::unique_lock lock(impl->idMutex);
951  impl->currentId = 0;
952  }
953 
954  onInitObserver();
955  impl->filterUpdateTask =
956  new RunningTask<Observer>(this, &Observer::updateFilters, "Filter update task");
957  impl->filterUpdateTask->start();
958  impl->channelUpdateTask =
959  new RunningTask<Observer>(this, &Observer::channelUpdateFunction, "Channel update task");
960  impl->channelUpdateTask->start();
961 }
962 
// Component connect hook: reads the "CreateUpdateFrequenciesChannel" property,
// offers the refresh-delta meta channel when enabled and missing, clears the
// channel history, refreshes the observer proxy stored in all datafield
// references and creates the periodic meta task.
// NOTE: lines 970 and 975 are not visible in this listing.
963 void
964 Observer::onConnectComponent()
965 {
966  impl->logChannelUpdateRate = getProperty<bool>("CreateUpdateFrequenciesChannel").getValue();
967  if (impl->logChannelUpdateRate && !existsChannel(LAST_REFRESH_DELTA_CHANNEL))
968  {
969  offerChannel(
971  "Metachannel with the last channel update deltas of all channels in milliseconds");
972  }
973  impl->channelHistory.clear();
974  // subclass init
976  // offerDataFieldWithDefault(channelName, LAST_REFRESH_DELTA_CHANNEL, 0.0, );
977  // impl->channelUpdateTimestamps[channelName] = IceUtil::Time::now();
978 
979  std::unique_lock lock(channelsMutex);
980  auto proxy = ObserverInterfacePrx::checkedCast(getProxy());
981  // update the proxy in all existing datafield refs
982  for (auto& channel : impl->channelRegistry)
983  {
984  ChannelRegistryEntry& c = channel.second;
985  for (auto& df : c.dataFields)
986  {
987  DataFieldRegistryEntry& d = df.second;
988  d.identifier->channelRef->observerProxy = proxy;
989  }
990  }
     // The meta task is always created but started only when update-rate logging is enabled.
991  impl->metaTask = new PeriodicTask<Observer>(this, &Observer::metaUpdateTask, 50);
992  if (impl->logChannelUpdateRate)
993  {
994  impl->metaTask->start();
995  }
996 }
997 
998 void
999 Observer::postOnConnectComponent()
1000 {
1001  ARMARX_INFO << "starting worker thread";
1002  impl->stopWorker = false;
1003  ARMARX_CHECK_EXPRESSION(!impl->worker.joinable());
1004  impl->worker = std::thread{[&] { runWorker(); }};
1005 }
1006 
1007 void
1008 Observer::preOnDisconnectComponent()
1009 {
1010  ARMARX_INFO << "stopping worker thread";
1011  impl->stopWorker = true;
1012  if (impl->worker.joinable())
1013  {
1014  impl->worker.join();
1015  }
1016 }
1017 
1018 void
1019 Observer::onExitComponent()
1020 {
1021  onExitObserver();
1022  if (impl->metaTask)
1023  {
1024  impl->metaTask->stop();
1025  }
1026  if (impl->filterUpdateTask)
1027  {
1028  impl->filterUpdateTask->stop(false);
1029  }
1030  impl->idleCondition.notify_all();
1031  if (impl->filterUpdateTask)
1032  {
1033  impl->filterUpdateTask->stop(true);
1034  }
1035  if (impl->channelUpdateTask)
1036  {
1037  impl->channelUpdateTask->stop(false);
1038  }
1039  impl->idleChannelCondition.notify_all();
1040  if (impl->channelUpdateTask)
1041  {
1042  impl->channelUpdateTask->stop(true);
1043  }
1044  impl->channelHistory.clear();
1045 }
1046 
1048 Observer::createPropertyDefinitions()
1049 {
1051 }
1052 
1053 void
1055 {
1057 }
1058 
1059 // *******************************************************
1060 // private methods
1061 // *******************************************************
// Returns the current value of impl->currentId and post-increments the counter
// while holding idMutex.
// NOTE: the signature line (1063) is not visible in this listing; the name is
// taken from the call in registerCheck().
1062 int
1064 {
1065  std::unique_lock lock(impl->idMutex);
1066 
1067  return impl->currentId++;
1068 }
1069 
// Creates a condition check instance for `configuration` from the prototype
// registered under configuration.checkName.
// Throws InvalidConditionException when no check with that name is available.
// Reads impl->availableChecks and impl->channelRegistry without locking them
// itself.
// NOTE: the return-type line (1070) is not visible in this listing.
1071 Observer::createCheck(const CheckConfiguration& configuration) const
1072 {
1073  std::string checkName = configuration.checkName;
1074 
1075  // create check from elementary condition
1076  StringConditionCheckMap::const_iterator iterChecks = impl->availableChecks.find(checkName);
1077 
1078  if (iterChecks == impl->availableChecks.end())
1079  {
1080  std::string reason =
1081  "Invalid condition check \"" + checkName + "\" for observer \"" + getName() + "\".";
1082  throw InvalidConditionException(reason.c_str());
1083  }
1084 
     // The registered prototype must be non-null before an instance is created from it.
1085  ARMARX_CHECK_EXPRESSION(iterChecks->second);
1086  ConditionCheckPtr check = ConditionCheckPtr::dynamicCast(iterChecks->second)
1087  ->createInstance(configuration, impl->channelRegistry);
1088 
1089  return check;
1090 }
1091 
1092 CheckIdentifier
1093 Observer::registerCheck(const ConditionCheckPtr& check)
1094 {
1095  ARMARX_CHECK_EXPRESSION(check);
1096  ARMARX_CHECK_EXPRESSION(check->configuration.dataFieldIdentifier);
1097  // create identifier
1098  int id = generateId();
1099  CheckIdentifier identifier;
1100  identifier.uniqueId = id;
1101  identifier.channelName = check->configuration.dataFieldIdentifier->channelName;
1102  identifier.observerName = getName();
1103 
1104  // add to conditions list
1105  std::pair<int, ConditionCheckPtr> entry;
1106  entry.first = id;
1107  entry.second = check;
1108  impl->channelRegistry[check->configuration.dataFieldIdentifier->channelName]
1109  .conditionChecks.insert(entry);
1110 
1111  return identifier;
1112 }
1113 
1114 void
1115 Observer::evaluateCheck(const ConditionCheckPtr& check, const ChannelRegistryEntry& channel) const
1116 {
1117  check->evaluateCondition(channel.dataFields);
1118 }
1119 
1120 // ////////////////////////////////////////////////////////////////////////// //
// Worker thread loop: until impl->stopWorker is set, swaps out the queued
// worker jobs, runs them one after another while holding channelsMutex and logs
// per-job-name statistics (count, average age, average run time).
// NOTE: the signature (line 1122) and lines 1125, 1133, 1143 and 1182 are not
// visible in this listing, so the statements introducing the brace blocks at
// 1126 and 1134 and the lambda-like block at 1183 cannot be seen.
1121 void
1123 {
1124  ARMARX_INFO << "observer worker->start";
1126  {
1127  ARMARX_INFO << "observer worker->stop";
1128  };
     // Reused across iterations; cleared at the end of every pass.
1129  std::deque<Impl::WorkerUpdate> toProcess;
1130  while (!impl->stopWorker)
1131  {
1132  preWorkerJobs();
1134  {
1135  postWorkerJobs();
1136  };
1137  {
     // Take over all queued jobs; toProcess is empty here, so the shared queue
     // ends up empty after the swap.
1138  std::lock_guard g{impl->workerUpdatesMutex};
1139  std::swap(impl->workerUpdates, toProcess);
1140  }
1141  if (toProcess.empty())
1142  {
1144  << "no worker jobs (message only posted every 10 seconds";
     // Nothing to do: sleep for 1 ms and poll again.
1145  std::this_thread::sleep_for(std::chrono::milliseconds{1});
1146  continue;
1147  }
1148 
1149  float minage = std::numeric_limits<float>::infinity();
1150  float maxage = 0;
1151 
     // Per-job-name accumulators: summed age, summed run time and job count.
1152  struct Counters
1153  {
1154  float age = 0;
1155  float time = 0;
1156  std::size_t n = 0;
1157  };
1158 
1159  std::map<std::string, Counters> counters;
1160  {
     // All jobs of this pass run while channelsMutex is held.
1161  std::lock_guard lock(channelsMutex);
1162  for (auto& f : toProcess)
1163  {
1164  ARMARX_TRACE;
1165  float age = f.ageInMs();
1166  Counters& counter = counters[f.name];
1167  counter.age += age;
1168  ++counter.n;
1169  TimepointT start = Now();
1170  f.f();
1171  counter.time += TimeDeltaInMs(start);
1172  minage = std::min(minage, age);
1173  maxage = std::max(maxage, age);
1174  }
1175  }
1176  if (maxage <= 0)
1177  {
1178  minage = 0;
1179  }
1180  ARMARX_DEBUG << "observer worker got " << toProcess.size() << "\tjobs ages min/max "
1181  << minage << "\t" << maxage << "\n"
1183  {
1184  for (const auto& [key, c] : counters)
1185  {
1186  out << " " << key << "\t# " << c.n << "\tavg age " << (c.age / c.n)
1187  << "\tavg time " << (c.time / c.n) << "\n";
1188  }
1189  };
1190  toProcess.clear();
1191  }
1192 }
1193 
1194 void
1195 Observer::addWorkerJob(const std::string& name, std::function<void(void)>&& f) const
1196 {
1197  std::lock_guard g{impl->workerUpdatesMutex};
1198  impl->workerUpdates.emplace_back(name, std::move(f));
1199 }
1200 
1201 void
1202 Observer::addWorkerJob(const std::string& name, std::function<void(void)>&& f)
1203 {
1204  std::lock_guard g{impl->workerUpdatesMutex};
1205  impl->workerUpdates.emplace_back(name, std::move(f));
1206 }
1207 
// Empty function body.
// NOTE(review): the line naming this function (listing line 1209) is missing;
// presumably Observer::preWorkerJobs(), which runWorker calls at the start of
// every loop iteration -- confirm against the original file.
1208 void
1210 {
1211 }
1212 
// Empty function body.
// NOTE(review): the line naming this function (listing line 1214) is missing;
// presumably Observer::postWorkerJobs(), which runWorker invokes once per loop
// iteration -- confirm against the original file.
1213 void
1215 {
1216 }
1217 
1220 {
1221  return ClockT::now();
1222 }
1223 
// Returns the time elapsed between t0 and Now() in milliseconds: the difference
// is taken in nanoseconds and scaled by 1e-6 as a float.
// NOTE(review): the signature line (listing line 1225) is missing; presumably
// Observer::TimeDeltaInMs taking a TimepointT named t0 -- confirm.
1224 float
1226 {
1227  return std::chrono::duration_cast<std::chrono::nanoseconds>(Now() - t0).count() * 1e-6f;
1228 }
1229 
1230 // ////////////////////////////////////////////////////////////////////////// //
1231 
// Returns the observer's name as reported by getName().
// NOTE(review): the signature line (listing line 1233) is missing; presumably
// Observer::getObserverName() const, as referenced by getObserverName_async -- confirm.
1232 std::string
1234 {
1235  return getName();
1236 }
1237 
1238 void
1239 Observer::getObserverName_async(const AMD_ObserverInterface_getObserverNamePtr& amd,
1240  const Ice::Current&) const
1241 {
1242  addWorkerJob("Observer::getObserverName", amd, &Observer::getObserverName);
1243 }
1244 
1245 // //////////////////
1246 CheckIdentifier
1247 Observer::installCheck(const CheckConfiguration& configuration)
1248 {
1249  // create condition check
1250  ConditionCheckPtr check;
1251  {
1252  std::unique_lock lock_checks(impl->checksMutex);
1253  std::unique_lock lock_channels(channelsMutex);
1254  check = createCheck(configuration);
1255  }
1256  ARMARX_CHECK_EXPRESSION(check);
1257  // insert into registry
1258  CheckIdentifier identifier;
1259  {
1260  std::unique_lock lock_conditions(channelsMutex);
1261  identifier = registerCheck(check);
1262  }
1263 
1264  ARMARX_DEBUG << "Installed check " << identifier.uniqueId << flush;
1265 
1266  // perform initial check
1267  {
1268  std::unique_lock lock_channels(channelsMutex);
1269  ARMARX_CHECK_EXPRESSION(configuration.dataFieldIdentifier);
1270  // retrieve channel
1271  ChannelRegistryEntry channel =
1272  impl->channelRegistry[configuration.dataFieldIdentifier->channelName];
1273 
1274  // evaluate
1275  evaluateCheck(check, channel);
1276  }
1277 
1278  return identifier;
1279 }
1280 
1281 void
1282 Observer::installCheck_async(const AMD_ObserverInterface_installCheckPtr& amd,
1283  const CheckConfiguration& configuration,
1284  const Ice::Current&)
1285 {
1286  addWorkerJob("Observer::installCheck", amd, &Observer::installCheck, configuration);
1287 }
1288 
1289 // //////////////////
1290 void
1291 Observer::removeCheck(const CheckIdentifier& id)
1292 {
1293  std::unique_lock lock(channelsMutex);
1294 
1295  // find channel
1296  auto iter = impl->channelRegistry.find(id.channelName);
1297 
1298  if (iter == impl->channelRegistry.end())
1299  {
1300  return;
1301  }
1302 
1303  // find condition
1304  ConditionCheckRegistry::iterator iterCheck = iter->second.conditionChecks.find(id.uniqueId);
1305 
1306  // remove condition
1307  if (iterCheck != iter->second.conditionChecks.end())
1308  {
1309  iter->second.conditionChecks.erase(iterCheck);
1310  }
1311 
1312  ARMARX_DEBUG << "Removed check " << id.uniqueId << flush;
1313 }
1314 
1315 void
1316 Observer::removeCheck_async(const AMD_ObserverInterface_removeCheckPtr& amd,
1317  const CheckIdentifier& id,
1318  const Ice::Current&)
1319 {
1320  addWorkerJob("Observer::removeCheck", amd, &Observer::removeCheck, id);
1321 }
1322 
1323 // //////////////////
1324 TimedVariantBasePtr
1325 Observer::getDataField(const DataFieldIdentifierBasePtr& identifier, const Ice::Current& c) const
1326 {
1327  //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1328  return getDatafieldByName(identifier->channelName, identifier->datafieldName);
1329 }
1330 
1331 void
1332 Observer::getDataField_async(const AMD_ObserverInterface_getDataFieldPtr& amd,
1333  const DataFieldIdentifierBasePtr& identifier,
1334  const Ice::Current& c) const
1335 {
1336  addWorkerJob("Observer::getDataField", amd, &Observer::getDataField, identifier, c);
1337 }
1338 
1339 // //////////////////
1340 TimedVariantBasePtr
1341 Observer::getDatafieldByName(const std::string& channelName, const std::string& datafieldName) const
1342 {
1343  //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1344  std::unique_lock lock(channelsMutex);
1345 
1346  auto it = impl->channelRegistry.find(channelName);
1347  if (it == impl->channelRegistry.end())
1348  {
1349  throw exceptions::user::InvalidChannelException(channelName);
1350  }
1351 
1352  auto itDF = it->second.dataFields.find(datafieldName);
1353  if (itDF == it->second.dataFields.end())
1354  {
1355  throw exceptions::user::InvalidDataFieldException(channelName, datafieldName);
1356  }
1357 
1358  // VariantPtr var = VariantPtr::dynamicCast(impl->channelRegistry[identifier->channelName].dataFields[identifier->datafieldName].value);
1359  TimedVariantPtr tv = TimedVariantPtr::dynamicCast(itDF->second.value);
1360  if (!tv)
1361  {
1362  ARMARX_IMPORTANT << "could not cast timed variant: " << itDF->second.value->ice_id();
1363  }
1364  return tv;
1365 }
1366 
1367 void
1368 Observer::getDatafieldByName_async(const AMD_ObserverInterface_getDatafieldByNamePtr& amd,
1369  const std::string& channelName,
1370  const std::string& datafieldName,
1371  const Ice::Current&) const
1372 {
1373  addWorkerJob("Observer::getDatafieldByName",
1374  amd,
1376  channelName,
1377  datafieldName);
1378 }
1379 
1380 // //////////////////
1381 DatafieldRefBasePtr
1383 {
1384  //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1385  std::unique_lock lock(channelsMutex);
1386  auto it = impl->channelRegistry.find(identifier->channelName);
1387  if (it == impl->channelRegistry.end())
1388  {
1389  throw exceptions::user::InvalidChannelException(identifier->channelName);
1390  }
1391 
1392  auto itDF = it->second.dataFields.find(identifier->datafieldName);
1393  if (itDF == it->second.dataFields.end())
1394  {
1395  throw exceptions::user::InvalidDataFieldException(identifier->channelName,
1396  identifier->datafieldName);
1397  }
1398  // itDF->second.identifier->channelRef->observerProxy = ObserverInterfacePrx::uncheckedCast(getProxy());
1399  return DatafieldRefPtr::dynamicCast(itDF->second.identifier);
1400 }
1401 
1402 void
1403 Observer::getDataFieldRef_async(const AMD_ObserverInterface_getDataFieldRefPtr& amd,
1404  const DataFieldIdentifierBasePtr& identifier,
1405  const Ice::Current&) const
1406 {
1407  addWorkerJob("Observer::getDataFieldRef", amd, &Observer::getDataFieldRef, identifier);
1408 }
1409 
1410 // //////////////////
1411 DatafieldRefBasePtr
1412 Observer::getDatafieldRefByName(const std::string& channelName,
1413  const std::string& datafieldName) const
1414 {
1415  return getDataFieldRef(new DataFieldIdentifier(getName(), channelName, datafieldName));
1416 }
1417 
1418 void
1419 Observer::getDatafieldRefByName_async(const AMD_ObserverInterface_getDatafieldRefByNamePtr& amd,
1420  const std::string& channelName,
1421  const std::string& datafieldName,
1422  const Ice::Current&) const
1423 {
1424  addWorkerJob("Observer::getDatafieldRefByName",
1425  amd,
1427  channelName,
1428  datafieldName);
1429 }
1430 
1431 // //////////////////
1432 TimedVariantBaseList
1434 {
1435  TimedVariantBaseList result;
1436 
1437  DataFieldIdentifierBaseList::const_iterator iter = identifiers.begin();
1438 
1439  std::unique_lock lock(channelsMutex);
1440  while (iter != identifiers.end())
1441  {
1442  result.push_back(getDataField(*iter, c));
1443  iter++;
1444  }
1445 
1446  return result;
1447 }
1448 
1449 void
1450 Observer::getDataFields_async(const AMD_ObserverInterface_getDataFieldsPtr& amd,
1452  const Ice::Current& c)
1453 {
1454  addWorkerJob("Observer::getDataFields", amd, &Observer::getDataFields, identifiers, c);
1455 }
1456 
1457 // //////////////////
1458 StringTimedVariantBaseMap
1459 Observer::getDatafieldsOfChannel(const std::string& channelName) const
1460 {
1461  //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1462  StringTimedVariantBaseMap result;
1463  std::unique_lock lock(channelsMutex);
1464  auto it = impl->channelRegistry.find(channelName);
1465  if (it == impl->channelRegistry.end())
1466  {
1467  throw exceptions::user::InvalidChannelException(channelName);
1468  }
1469  DataFieldRegistry fields = it->second.dataFields;
1470  for (auto& field : fields)
1471  {
1472  result[field.first] = TimedVariantBasePtr::dynamicCast(field.second.value);
1473  }
1474  return result;
1475 }
1476 
1477 void
1478 Observer::getDatafieldsOfChannel_async(const AMD_ObserverInterface_getDatafieldsOfChannelPtr& amd,
1479  const std::string& channelName,
1480  const Ice::Current&) const
1481 {
1482  addWorkerJob(
1483  "Observer::getDatafieldsOfChannel", amd, &Observer::getDatafieldsOfChannel, channelName);
1484 }
1485 
1486 // //////////////////
1487 ChannelRegistryEntry
1488 Observer::getChannel(const std::string& channelName) const
1489 {
1490  //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1491  std::unique_lock lock(channelsMutex);
1492  auto it = impl->channelRegistry.find(channelName);
1493  if (it == impl->channelRegistry.end())
1494  {
1495  throw exceptions::user::InvalidChannelException(channelName);
1496  }
1497  return it->second;
1498 }
1499 
1500 void
1501 Observer::getChannel_async(const AMD_ObserverInterface_getChannelPtr& amd,
1502  const std::string& channelName,
1503  const Ice::Current&) const
1504 {
1505  addWorkerJob("Observer::getChannel", amd, &Observer::getChannel, channelName);
1506 }
1507 
1508 // //////////////////
1509 ChannelRegistry
1510 Observer::getAvailableChannels(bool includeMetaChannels)
1511 {
1512  //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1513  std::unique_lock lock(channelsMutex);
1514 
1515  ChannelRegistry result(impl->channelRegistry.begin(), impl->channelRegistry.end());
1516  if (!includeMetaChannels)
1517  {
1518  result.erase(LAST_REFRESH_DELTA_CHANNEL);
1519  }
1520  return result;
1521 }
1522 
1523 void
1524 Observer::getAvailableChannels_async(const AMD_ObserverInterface_getAvailableChannelsPtr& amd,
1525  bool includeMetaChannels,
1526  const Ice::Current&)
1527 {
1528  addWorkerJob("Observer::getAvailableChannels",
1529  amd,
1531  includeMetaChannels);
1532 }
1533 
1534 // //////////////////
// Returns a copy of impl->availableChecks, taken while holding checksMutex.
// NOTE(review): the signature line (listing line 1536) is missing; presumably
// Observer::getAvailableChecks(), as referenced by getAvailableChecks_async -- confirm.
1535 StringConditionCheckMap
1537 {
1538  std::unique_lock lock(impl->checksMutex);
1539 
1540  return impl->availableChecks;
1541 }
1542 
1543 void
1544 Observer::getAvailableChecks_async(const AMD_ObserverInterface_getAvailableChecksPtr& amd,
1545  const Ice::Current&)
1546 {
1547  addWorkerJob("Observer::getAvailableChecks", amd, &Observer::getAvailableChecks);
1548 }
1549 
1550 // //////////////////
1551 bool
1552 Observer::existsChannel(const std::string& channelName) const
1553 {
1554  //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1555  std::unique_lock lock(channelsMutex);
1556  return impl->channelRegistry.find(channelName) != impl->channelRegistry.end();
1557 }
1558 
1559 void
1560 Observer::existsChannel_async(const AMD_ObserverInterface_existsChannelPtr& amd,
1561  const std::string& channelName,
1562  const Ice::Current&) const
1563 {
1564  addWorkerJob("Observer::existsChannel", amd, &Observer::existsChannel, channelName);
1565 }
1566 
1567 // //////////////////
1568 bool
1569 Observer::existsDataField(const std::string& channelName, const std::string& datafieldName) const
1570 {
1571  //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1572  std::unique_lock lock(channelsMutex);
1573  auto itChannel = impl->channelRegistry.find(channelName);
1574 
1575  if (itChannel == impl->channelRegistry.end())
1576  {
1577  return false;
1578  }
1579 
1580  return itChannel->second.dataFields.find(datafieldName) != itChannel->second.dataFields.end();
1581 }
1582 
1583 void
1584 Observer::existsDataField_async(const AMD_ObserverInterface_existsDataFieldPtr& amd,
1585  const std::string& channelName,
1586  const std::string& datafieldName,
1587  const Ice::Current&) const
1588 {
1589  addWorkerJob(
1590  "Observer::existsDataField", amd, &Observer::existsDataField, channelName, datafieldName);
1591 }
1592 
1593 // //////////////////
1594 DatafieldRefBasePtr
1595 Observer::createFilteredDatafield(const DatafieldFilterBasePtr& filter,
1596  const DatafieldRefBasePtr& datafieldRef)
1597 {
1598  //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1599  // if( auto it = impl->orignalToFiltered.find(datafieldRef) != impl->orignalToFiltered.end())
1600  // {
1601  // return it->second.filtered;
1602  // }
1603  ARMARX_CHECK_EXPRESSION(datafieldRef);
1604 
1605 
1606  std::string filteredName = datafieldRef->datafieldName + "_" + filter->ice_id();
1607 
1608 
1609  int i = 1;
1610 
1611  while (existsDataField(datafieldRef->channelRef->channelName, filteredName))
1612  {
1613  // ARMARX_IMPORTANT << "Checking if datafield " << filteredName << " exists";
1614  filteredName =
1615  datafieldRef->datafieldName + "_" + filter->ice_id() + "_" + ValueToString(i);
1616  i++;
1617  }
1618 
1619  return createNamedFilteredDatafield(filteredName, filter, datafieldRef);
1620 }
1621 
1622 void
1623 Observer::createFilteredDatafield_async(const AMD_ObserverInterface_createFilteredDatafieldPtr& amd,
1624  const DatafieldFilterBasePtr& filter,
1625  const DatafieldRefBasePtr& datafieldRef,
1626  const Ice::Current&)
1627 {
1628  addWorkerJob("Observer::createFilteredDatafield",
1629  amd,
1631  filter,
1632  datafieldRef);
1633 }
1634 
1635 // //////////////////
// Installs `filter` on the datafield referenced by `datafieldRef` and offers the
// filtered value as a new datafield named `filterDatafieldName` in the same channel.
// An existing datafield of that name is first removed via removeFilteredDatafield.
// Throws exceptions::user::UnsupportedTypeException when the filter does not
// support the datafield's type. Returns the reference to the new filtered datafield.
// NOTE(review): the listing skips source lines 1643 and 1681 (see notes below).
1636 DatafieldRefBasePtr
1637 Observer::createNamedFilteredDatafield(const std::string& filterDatafieldName,
1638  const DatafieldFilterBasePtr& filter,
1639  const DatafieldRefBasePtr& datafieldRef)
1640 {
1641  //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1642  DatafieldRefPtr ref = DatafieldRefPtr::dynamicCast(datafieldRef);
// NOTE(review): listing line 1643 is missing here (possibly a check on `ref`) -- confirm.
1644  ARMARX_CHECK_EXPRESSION(filter);
1645  if (!filter->checkTypeSupport(ref->getDataField()->getType()))
1646  {
// Log the types the filter does support before rejecting the request.
1647  auto types = filter->getSupportedTypes();
1648  std::string suppTypes = "supported types";
1649 
1650  for (auto t : types)
1651  {
1652  suppTypes += Variant::typeToString(t) + ", ";
1653  }
1654 
1655  ARMARX_WARNING << suppTypes;
1656  throw exceptions::user::UnsupportedTypeException(ref->getDataField()->getType());
1657  }
1658 
1659  // if filter with that name exists -> remove it
1660  if (existsDataField(datafieldRef->channelRef->channelName, filterDatafieldName))
1661  {
1662  DatafieldRefPtr filteredRef = new DatafieldRef(ref->getChannelRef(), filterDatafieldName);
1663  removeFilteredDatafield(filteredRef);
1664  }
1665 
1666  // calculate initial value
// Feed the current value into the filter once so that getValue() below has data.
1667  auto var = ref->getDataField();
1668  filter->update(var->getTime().toMicroSeconds(), var);
1669 
1670  // create datafield for new filter
1671  offerDataFieldWithDefault(datafieldRef->channelRef->channelName,
1672  filterDatafieldName,
1673  *VariantPtr::dynamicCast(filter->getValue()),
1674  "Filtered value of " +
1675  ref->getDataFieldIdentifier()->getIdentifierStr());
1676 
1677  //create and store the refs for the filters
1678  ChannelRefPtr channel = ChannelRefPtr::dynamicCast(ref->channelRef);
1679  channel->refetchChannel();
1680  DatafieldRefPtr filteredRef = new DatafieldRef(ref->getChannelRef(), filterDatafieldName);
// NOTE(review): listing line 1681, which declares `data`, is missing; presumably
// it creates a new Impl::FilterData object -- confirm.
1682  data->filter = filter;
1683  data->original = ref;
1684  data->filtered = filteredRef;
// Register the record in both lookup directions (original -> filtered and
// filtered -> original), keyed by the identifier strings.
1685  std::unique_lock lock(impl->filterMutex);
1686  impl->orignalToFiltered.insert(
1687  std::make_pair(ref->getDataFieldIdentifier()->getIdentifierStr(), data));
1688  impl->filteredToOriginal[filteredRef->getDataFieldIdentifier()->getIdentifierStr()] = data;
1689  return filteredRef;
1690 }
1691 
1692 void
1694  const AMD_ObserverInterface_createNamedFilteredDatafieldPtr& amd,
1695  const std::string& filterDatafieldName,
1696  const DatafieldFilterBasePtr& filter,
1697  const DatafieldRefBasePtr& datafieldRef,
1698  const Ice::Current&)
1699 {
1700  addWorkerJob("Observer::createNamedFilteredDatafield",
1701  amd,
1703  filterDatafieldName,
1704  filter,
1705  datafieldRef);
1706 }
1707 
1708 // //////////////////
1709 void
1710 Observer::removeFilteredDatafield(const DatafieldRefBasePtr& datafieldRef)
1711 {
1712  //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1713  bool remove = true;
1714  DatafieldRefPtr ref;
1715  {
1716  std::unique_lock lock(impl->filterMutex);
1717  ref = DatafieldRefPtr::dynamicCast(datafieldRef);
1718  const std::string idStr = ref->getDataFieldIdentifier()->getIdentifierStr();
1719  auto it = impl->filteredToOriginal.find(idStr);
1720  remove = (it != impl->filteredToOriginal.end());
1721  }
1722 
1723  if (remove && ref)
1724  {
1725  removeDatafield(ref->getDataFieldIdentifier());
1726  }
1727 }
1728 
1729 void
1730 Observer::removeFilteredDatafield_async(const AMD_ObserverInterface_removeFilteredDatafieldPtr& amd,
1731  const DatafieldRefBasePtr& datafieldRef,
1732  const Ice::Current&)
1733 {
1734  addWorkerJob(
1735  "Observer::removeFilteredDatafield", amd, &Observer::removeFilteredDatafield, datafieldRef);
1736 }
1737 
1738 // //////////////////
1739 ChannelHistory
1740 Observer::getChannelHistory(const std::string& channelName,
1741  Ice::Float timestepMs,
1742  const Ice::Current& c) const
1743 {
1744  //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1745  return getPartialChannelHistory(
1746  channelName, 0, std::numeric_limits<long>::max(), timestepMs, c);
1747 }
1748 
1749 void
1750 Observer::getChannelHistory_async(const AMD_ObserverInterface_getChannelHistoryPtr& amd,
1751  const std::string& channelName,
1752  Ice::Float timestepMs,
1753  const Ice::Current& c) const
1754 {
1755  addWorkerJob("Observer::getChannelHistory",
1756  amd,
1758  channelName,
1759  timestepMs,
1760  c);
1761 }
1762 
1763 // //////////////////
1764 ChannelHistory
1765 Observer::getPartialChannelHistory(const std::string& channelName,
1766  Ice::Long startTimestamp,
1767  Ice::Long endTimestamp,
1768  Ice::Float timestepMs,
1769  const Ice::Current& c) const
1770 {
1771  //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1772  ARMARX_IMPORTANT << "Waiting for mutex";
1773  std::unique_lock lock_channels(impl->historyMutex);
1774  ARMARX_IMPORTANT << "GOT mutex";
1775  ChannelHistory result;
1776  auto historyIt = impl->channelHistory.find(channelName);
1777  if (historyIt == impl->channelHistory.end())
1778  {
1779  return result;
1780  }
1781  ARMARX_IMPORTANT << "found channel";
1782 
1783  auto lastInsertIt = result.begin();
1784  Ice::Long lastUsedTimestep = 0;
1785  float timestepUs = timestepMs * 1000;
1786  for (auto& entry : historyIt->second)
1787  {
1788  long timestamp = entry.first.toMicroSeconds();
1789  if (timestamp > endTimestamp)
1790  {
1791  break;
1792  }
1793 
1794  if (timestamp > startTimestamp && timestamp - lastUsedTimestep > timestepUs)
1795  {
1796  lastInsertIt = result.emplace_hint(result.end(),
1797  std::make_pair(timestamp, entry.second.dataFields));
1798  lastUsedTimestep = timestamp;
1799  }
1800  }
1801  return result;
1802 }
1803 
1804 void
1806  const AMD_ObserverInterface_getPartialChannelHistoryPtr& amd,
1807  const std::string& channelName,
1808  Ice::Long startTimestamp,
1809  Ice::Long endTimestamp,
1810  Ice::Float timestepMs,
1811  const Ice::Current& c) const
1812 {
1813  addWorkerJob("Observer::getPartialChannelHistory",
1814  amd,
1816  channelName,
1817  startTimestamp,
1818  endTimestamp,
1819  timestepMs,
1820  c);
1821 }
1822 
1823 // //////////////////
1824 TimedVariantBaseList
1825 Observer::getDatafieldHistory(const std::string& channelName,
1826  const std::string& datafieldName,
1827  Ice::Float timestepMs,
1828  const Ice::Current& c) const
1829 {
1830  //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1832  channelName, datafieldName, 0, std::numeric_limits<long>::max(), timestepMs, c);
1833 }
1834 
1835 void
1836 Observer::getDatafieldHistory_async(const AMD_ObserverInterface_getDatafieldHistoryPtr& amd,
1837  const std::string& channelName,
1838  const std::string& datafieldName,
1839  Ice::Float timestepMs,
1840  const Ice::Current& c) const
1841 {
1842  addWorkerJob("Observer::getDatafieldHistory",
1843  amd,
1845  channelName,
1846  datafieldName,
1847  timestepMs,
1848  c);
1849 }
1850 
1851 // //////////////////
1852 TimedVariantBaseList
1853 Observer::getPartialDatafieldHistory(const std::string& channelName,
1854  const std::string& datafieldName,
1855  Ice::Long startTimestamp,
1856  Ice::Long endTimestamp,
1857  Ice::Float timestepMs,
1858  const Ice::Current& c) const
1859 {
1860  //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1861  std::unique_lock lock_channels(impl->historyMutex);
1862  TimedVariantBaseList result;
1863  auto historyIt = impl->channelHistory.find(channelName);
1864  if (historyIt == impl->channelHistory.end())
1865  {
1866  return result;
1867  }
1868  if (!existsDataField(channelName, datafieldName))
1869  {
1870  return result;
1871  }
1872 
1873  Ice::Long lastUsedTimestep = 0;
1874  float timestepUs = timestepMs * 1000;
1875  for (auto& entry : historyIt->second)
1876  {
1877  long timestamp = entry.first.toMicroSeconds();
1878  if (timestamp > endTimestamp)
1879  {
1880  break;
1881  }
1882 
1883  if (timestamp > startTimestamp && timestamp - lastUsedTimestep > timestepUs)
1884  {
1885  auto datafieldIt = entry.second.dataFields.find(datafieldName);
1886  if (datafieldIt == entry.second.dataFields.end())
1887  {
1888  continue;
1889  }
1890  else
1891  {
1892  TimedVariantPtr tvar = TimedVariantPtr::dynamicCast(datafieldIt->second.value);
1893  if (tvar)
1894  {
1895  result.emplace_back(tvar);
1896  lastUsedTimestep = timestamp;
1897  }
1898  }
1899  }
1900  }
1901  return result;
1902 }
1903 
1904 void
1906  const AMD_ObserverInterface_getPartialDatafieldHistoryPtr& amd,
1907  const std::string& channelName,
1908  const std::string& datafieldName,
1909  Ice::Long startTimestamp,
1910  Ice::Long endTimestamp,
1911  Ice::Float timestepMs,
1912  const Ice::Current& c) const
1913 {
1914  addWorkerJob("Observer::getPartialDatafieldHistory",
1915  amd,
1917  channelName,
1918  datafieldName,
1919  startTimestamp,
1920  endTimestamp,
1921  timestepMs,
1922  c);
1923 }
1924 
1925 // ////////////////////////////////////////////////////////////////////////// //
1926 Observer::Impl::WorkerUpdate::WorkerUpdate(const std::string& name, std::function<void(void)> f) :
1927  name{name}, time{Now()}, f{std::move(f)}
1928 {
1929 }
1930 
// Returns the time in milliseconds since the `time` member was set
// (TimeDeltaInMs(time)).
// NOTE(review): the signature line (listing line 1932) is missing; presumably
// Observer::Impl::WorkerUpdate::ageInMs(), which runWorker calls on each job -- confirm.
1931 float
1933 {
1934  return TimeDeltaInMs(time);
1935 }
armarx::Observer::getPartialDatafieldHistory
TimedVariantBaseList getPartialDatafieldHistory(const std::string &channelName, const std::string &datafieldName, Ice::Long startTimestamp, Ice::Long endTimestamp, Ice::Float timestepMs, const Ice::Current &c) const
Definition: Observer.cpp:1853
armarx::Observer::updateChannel
void updateChannel(const std::string &channelName, const std::set< std::string > &updatedDatafields=std::set< std::string >())
Update all conditions for a channel.
Definition: Observer.cpp:788
armarx::Observer::Impl::FilterData
Definition: Observer.cpp:70
armarx::Observer::getChannel
ChannelRegistryEntry getChannel(const std::string &channelName) const
Retrieve information on all sensory data channels available from the observer.
Definition: Observer.cpp:1488
armarx::Variant
The Variant class is described here: Variants.
Definition: Variant.h:223
armarx::Observer::removeFilteredDatafield
void removeFilteredDatafield(const DatafieldRefBasePtr &datafieldRef)
Removes a previously installed filter.
Definition: Observer.cpp:1710
armarx::Observer::Impl::FilterData::filtered
DatafieldRefPtr filtered
Definition: Observer.cpp:74
armarx::Observer::Impl::filteredToOriginal
std::map< std::string, FilterDataPtr > filteredToOriginal
Definition: Observer.cpp:80
armarx::Observer::Impl
Definition: Observer.cpp:47
armarx::VariantType::Float
const VariantTypeId Float
Definition: Variant.h:919
armarx::Observer::Observer
Observer()
Definition: Observer.cpp:119
armarx::Observer::getDatafieldsOfChannel
StringTimedVariantBaseMap getDatafieldsOfChannel(const std::string &channelName) const
Definition: Observer.cpp:1459
armarx::StringVariantBaseMap
std::map< std::string, VariantBasePtr > StringVariantBaseMap
Definition: ManagedIceObject.h:110
ARMARX_IMPORTANT
#define ARMARX_IMPORTANT
Definition: Logging.h:190
armarx::Observer::onInitObserver
virtual void onInitObserver()=0
Framework hook.
armarx::Observer::Impl::WorkerUpdate::name
std::string name
Definition: Observer.cpp:109
armarx::Observer::Impl::availableChecks
StringConditionCheckMap availableChecks
Definition: Observer.cpp:54
OnScopeExit.h
armarx::Observer::Impl::stopWorker
std::atomic_bool stopWorker
Definition: Observer.cpp:116
armarx::Observer::~Observer
~Observer()
Definition: Observer.cpp:123
armarx::Observer::Impl::WorkerUpdate::f
std::function< void(void)> f
Definition: Observer.cpp:111
InvalidChannelException.h
armarx::Observer::Impl::channelRegistry
ConditionCheck::ChannelRegistry channelRegistry
Definition: Observer.cpp:58
armarx::Observer::getDataField
TimedVariantBasePtr getDataField(const DataFieldIdentifierBasePtr &identifier, const Ice::Current &c=Ice::emptyCurrent) const
Retrieve data field from observer.
Definition: Observer.cpp:1325
armarx::Observer::removeChannel
void removeChannel(std::string channelName)
Remove a channel.
Definition: Observer.cpp:318
armarx::Observer::getPartialChannelHistory_async
void getPartialChannelHistory_async(const AMD_ObserverInterface_getPartialChannelHistoryPtr &amd, const std::string &channelName, Ice::Long startTimestamp, Ice::Long endTimestamp, Ice::Float timestepMs, const Ice::Current &c) const override
Definition: Observer.cpp:1805
armarx::Observer::getDatafieldRefByName
DatafieldRefBasePtr getDatafieldRefByName(const std::string &channelName, const std::string &datafieldName) const
Definition: Observer.cpp:1412
armarx::Observer::Impl::WorkerUpdate::WorkerUpdate
WorkerUpdate(const std::string &name, std::function< void(void)> f)
Definition: Observer.cpp:1926
armarx::Observer::offerOrUpdateDataField
bool offerOrUpdateDataField(std::string channelName, std::string datafieldName, const Variant &value, const std::string &description)
Definition: Observer.cpp:242
armarx::ManagedIceObject::getState
int getState() const
Retrieve current state of the ManagedIceObject.
Definition: ManagedIceObject.cpp:769
armarx::Observer::onConnectObserver
virtual void onConnectObserver()=0
Framework hook.
armarx::exceptions::user::DatafieldExistsAlreadyException
Definition: InvalidDatafieldException.h:61
armarx::Observer::Impl::filterMutex
std::recursive_mutex filterMutex
Definition: Observer.cpp:81
armarx::Observer::getDataField_async
void getDataField_async(const AMD_ObserverInterface_getDataFieldPtr &amd, const DataFieldIdentifierBasePtr &identifier, const Ice::Current &c) const override
Definition: Observer.cpp:1332
armarx::Observer::Impl::historyMutex
std::recursive_mutex historyMutex
Definition: Observer.cpp:63
c
constexpr T c
Definition: UnscentedKalmanFilterTest.cpp:46
armarx::Observer::getChannelHistory_async
void getChannelHistory_async(const AMD_ObserverInterface_getChannelHistoryPtr &amd, const std::string &channelName, Ice::Float timestepMs, const Ice::Current &c) const override
Definition: Observer.cpp:1750
ARMARX_ON_SCOPE_EXIT
#define ARMARX_ON_SCOPE_EXIT
Executes given code when the enclosing scope is left.
Definition: OnScopeExit.h:120
armarx::Observer::getPartialDatafieldHistory_async
void getPartialDatafieldHistory_async(const AMD_ObserverInterface_getPartialDatafieldHistoryPtr &amd, const std::string &channelName, const std::string &datafieldName, Ice::Long startTimestamp, Ice::Long endTimestamp, Ice::Float timestepMs, const Ice::Current &c) const override
Definition: Observer.cpp:1905
armarx::Observer::Impl::idleChannelCondition
std::condition_variable idleChannelCondition
Definition: Observer.cpp:95
armarx::Observer::getAvailableChecks_async
void getAvailableChecks_async(const AMD_ObserverInterface_getAvailableChecksPtr &amd, const Ice::Current &) override
Definition: Observer.cpp:1544
armarx::Observer::Impl::FilterUpdateQueue
std::unordered_map< std::string, FilterQueueData > FilterUpdateQueue
Definition: Observer.cpp:97
armarx::Observer::Impl::currentId
int currentId
Definition: Observer.cpp:67
armarx::Observer::getDatafieldHistory_async
void getDatafieldHistory_async(const AMD_ObserverInterface_getDatafieldHistoryPtr &amd, const std::string &channelName, const std::string &datafieldName, Ice::Float timestepMs, const Ice::Current &c) const override
Definition: Observer.cpp:1836
armarx::Observer::getDataFields_async
void getDataFields_async(const AMD_ObserverInterface_getDataFieldsPtr &amd, const DataFieldIdentifierBaseList &identifiers, const Ice::Current &c) override
Definition: Observer.cpp:1450
Observer.h
armarx::Observer::setDataFieldsFlatCopy
void setDataFieldsFlatCopy(const std::string &channelName, const StringVariantBaseMap &datafieldValues, bool triggerFilterUpdate=true)
Definition: Observer.cpp:728
armarx::Observer::existsChannel
bool existsChannel(const std::string &channelName) const
Definition: Observer.cpp:1552
armarx::Variant::getTypeName
std::string getTypeName(const Ice::Current &c=Ice::emptyCurrent) const override
Return the Variant's internal type.
Definition: Variant.cpp:685
armarx::Observer::removeCheck
void removeCheck(const CheckIdentifier &id)
Removes a condition check from the observer.
Definition: Observer.cpp:1291
armarx::Observer::Impl::FilterQueueData::datafieldName
std::string datafieldName
Definition: Observer.cpp:89
armarx::RunningTask
Definition: ArmarXMultipleObjectsScheduler.h:36
armarx::Observer::Impl::channelUpdateTask
RunningTask< Observer >::pointer_type channelUpdateTask
Definition: Observer.cpp:92
armarx::Observer::Impl::channelUpdateTimestamps
std::map< std::string, IceUtil::Time > channelUpdateTimestamps
Definition: Observer.cpp:82
armarx::Observer::Impl::WorkerUpdate::time
TimepointT time
Definition: Observer.cpp:110
armarx::Observer::Now
static TimepointT Now()
Definition: Observer.cpp:1219
armarx::Observer::getDataFieldRef_async
void getDataFieldRef_async(const AMD_ObserverInterface_getDataFieldRefPtr &amd, const DataFieldIdentifierBasePtr &identifier, const Ice::Current &) const override
Definition: Observer.cpp:1403
armarx::Observer::getDatafieldByName_async
void getDatafieldByName_async(const AMD_ObserverInterface_getDatafieldByNamePtr &amd, const std::string &channelName, const std::string &datafieldName, const Ice::Current &) const override
Definition: Observer.cpp:1368
armarx::Observer::updateDatafieldFilter
std::set< std::string > updateDatafieldFilter(const std::string &channelName, const std::string &datafieldName, const VariantBasePtr &value)
Definition: Observer.cpp:386
IceInternal::Handle< DatafieldRef >
armarx::Observer::updateDatafieldTimestamps
void updateDatafieldTimestamps(const std::string &channelName, const std::unordered_map< std::string, Ice::Long > &datafieldValues)
Definition: Observer.cpp:580
InvalidTypeException.h
armarx::Observer::Impl::FilterData::original
DatafieldRefPtr original
Definition: Observer.cpp:73
armarx::armem::client::util::swap
void swap(SubscriptionHandle &first, SubscriptionHandle &second)
Definition: SubscriptionHandle.cpp:66
DatafieldRef.h
ObserverObjectFactories.h
armarx::Observer::Impl::worker
std::thread worker
Definition: Observer.cpp:115
armarx::Observer::Impl::checksMutex
std::mutex checksMutex
Definition: Observer.cpp:55
ARMARX_TRACE
#define ARMARX_TRACE
Definition: trace.h:77
cxxopts::value
std::shared_ptr< Value > value()
Definition: cxxopts.hpp:855
armarx::Observer::Impl::FilterQueueData
Definition: Observer.cpp:85
armarx::Observer::runWorker
void runWorker()
Definition: Observer.cpp:1122
armarx::exceptions::local
Definition: DynamicLibraryException.h:31
armarx::exceptions::local::InvalidCheckException
Definition: InvalidCheckException.h:33
armarx::Observer::Impl::channelQueueMutex
std::mutex channelQueueMutex
Definition: Observer.cpp:94
armarx::Observer::scheduleDatafieldFilterUpdate
void scheduleDatafieldFilterUpdate(const std::string &channelName, const std::string &datafieldName, const VariantBasePtr &value)
Definition: Observer.cpp:437
armarx::Observer::Impl::metaTask
PeriodicTask< Observer >::pointer_type metaTask
Definition: Observer.cpp:49
armarx::Observer::Impl::maxHistoryRecordFrequency
float maxHistoryRecordFrequency
Definition: Observer.cpp:65
armarx::Observer::getDatafieldRefByName_async
void getDatafieldRefByName_async(const AMD_ObserverInterface_getDatafieldRefByNamePtr &amd, const std::string &channelName, const std::string &datafieldName, const Ice::Current &) const override
Definition: Observer.cpp:1419
armarx::Observer::Impl::filterUpdateTask
RunningTask< Observer >::pointer_type filterUpdateTask
Definition: Observer.cpp:83
armarx::Observer::TimepointT
typename ClockT::time_point TimepointT
Definition: Observer.h:593
armarx::ValueToString
std::string ValueToString(const T &value)
Definition: StringHelpers.h:61
armarx::Observer::getDataFieldRef
DatafieldRefBasePtr getDataFieldRef(const DataFieldIdentifierBasePtr &identifier) const
Definition: Observer.cpp:1382
armarx::flush
const LogSender::manipulator flush
Definition: LogSender.h:251
armarx::Observer::setDataFieldFlatCopy
void setDataFieldFlatCopy(const std::string &channelName, const std::string &datafieldName, const VariantPtr &value, bool triggerFilterUpdate=true)
Definition: Observer.cpp:548
data
uint8_t data[1]
Definition: EtherCATFrame.h:68
ARMARX_DEBUG
#define ARMARX_DEBUG
Definition: Logging.h:184
armarx::Observer::getDataFields
TimedVariantBaseList getDataFields(const DataFieldIdentifierBaseList &identifiers, const Ice::Current &c)
Retrieve a list of data fields from the observer.
Definition: Observer.cpp:1433
armarx::Observer::existsChannel_async
void existsChannel_async(const AMD_ObserverInterface_existsChannelPtr &amd, const std::string &channelName, const Ice::Current &) const override
Definition: Observer.cpp:1560
armarx::Observer::Impl::WorkerUpdate::ageInMs
float ageInMs() const
Definition: Observer.cpp:1932
armarx::Observer::existsDataField_async
void existsDataField_async(const AMD_ObserverInterface_existsDataFieldPtr &amd, const std::string &channelName, const std::string &datafieldName, const Ice::Current &) const override
Definition: Observer.cpp:1584
InvalidDatafieldException.h
armarx::Observer::Impl::FilterQueueData::value
VariantBasePtr value
Definition: Observer.cpp:87
armarx::Observer::offerOrUpdateDataFieldsFlatCopy
void offerOrUpdateDataFieldsFlatCopy(const std::string &channelName, const StringVariantBaseMap &valueMap)
Definition: Observer.cpp:260
armarx::Observer::metaUpdateTask
void metaUpdateTask()
Definition: Observer.cpp:1054
armarx::Observer::createFilteredDatafield_async
void createFilteredDatafield_async(const AMD_ObserverInterface_createFilteredDatafieldPtr &amd, const DatafieldFilterBasePtr &filter, const DatafieldRefBasePtr &datafieldRef, const Ice::Current &) override
Definition: Observer.cpp:1623
armarx::Observer::TimeDeltaInMs
static float TimeDeltaInMs(TimepointT t0)
Definition: Observer.cpp:1225
armarx::Variant::typeToString
static std::string typeToString(VariantTypeId typeId)
Return the name of the registered type typeId.
Definition: Variant.cpp:848
armarx::VariantType::Long
const VariantTypeId Long
Definition: Variant.h:918
armarx::exceptions::user::UnsupportedTypeException
Definition: InvalidTypeException.h:76
armarx::Observer::Impl::FilterQueueData::channelName
std::string channelName
Definition: Observer.cpp:88
armarx::Observer::Impl::maxHistorySize
int maxHistorySize
Definition: Observer.cpp:64
armarx::Observer::setDataField
void setDataField(const std::string &channelName, const std::string &datafieldName, const Variant &value, bool triggerFilterUpdate=true)
Set the datafield named datafieldName in the channel channelName.
Definition: Observer.cpp:508
armarx::Observer::removeDatafield
void removeDatafield(DataFieldIdentifierBasePtr id)
Definition: Observer.cpp:344
ArmarXObjectScheduler.h
max
T max(T t1, T t2)
Definition: gdiam.h:51
armarx::Observer::Impl::idleCondition
std::condition_variable idleCondition
Definition: Observer.cpp:100
ARMARX_ERROR
#define ARMARX_ERROR
Definition: Logging.h:196
armarx::Observer::installCheck_async
void installCheck_async(const AMD_ObserverInterface_installCheckPtr &amd, const CheckConfiguration &configuration, const Ice::Current &) override
Definition: Observer.cpp:1282
armarx::VariantTypeId
Ice::Int VariantTypeId
Definition: Variant.h:43
InvalidDataFieldException.h
armarx::Observer::installCheck
CheckIdentifier installCheck(const CheckConfiguration &configuration)
Installs a condition check with the observer.
Definition: Observer.cpp:1247
armarx::Observer::impl
std::unique_ptr< Impl > impl
Definition: Observer.h:602
armarx::ConditionCheck
Definition: ConditionCheck.h:54
armarx::Observer::Impl::WorkerUpdate
Definition: Observer.cpp:104
armarx::TimeUtil::GetTime
static IceUtil::Time GetTime(TimeMode timeMode=TimeMode::VirtualTime)
Get the current time.
Definition: TimeUtil.cpp:42
armarx::VariantType::DatafieldRef
const VariantTypeId DatafieldRef
Definition: DatafieldRef.h:197
ExpressionException.h
InvalidCheckException.h
armarx::ConditionCheck::ChannelRegistry
armarx::ChannelRegistry ChannelRegistry
Creates a new ConditionCheck instance.
Definition: ConditionCheck.h:82
armarx::Component::getConfigIdentifier
std::string getConfigIdentifier()
Retrieve config identifier for this component as set in constructor.
Definition: Component.cpp:79
armarx::Observer::getDatafieldsOfChannel_async
void getDatafieldsOfChannel_async(const AMD_ObserverInterface_getDatafieldsOfChannelPtr &amd, const std::string &channelName, const Ice::Current &) const override
Definition: Observer.cpp:1478
armarx::Observer::channelsMutex
std::recursive_mutex channelsMutex
Definition: Observer.h:600
armarx::Observer::getAvailableChannels
ChannelRegistry getAvailableChannels(bool includeMetaChannels)
Retrieve information on all sensory data channels available from the observer.
Definition: Observer.cpp:1510
armarx::Observer::getChannel_async
void getChannel_async(const AMD_ObserverInterface_getChannelPtr &amd, const std::string &channelName, const Ice::Current &) const override
Definition: Observer.cpp:1501
armarx::Observer::getDatafieldHistory
TimedVariantBaseList getDatafieldHistory(const std::string &channelName, const std::string &datafieldName, Ice::Float timestepMs, const Ice::Current &c) const
Definition: Observer.cpp:1825
armarx::Observer::getObserverName_async
void getObserverName_async(const AMD_ObserverInterface_getObserverNamePtr &amd, const Ice::Current &) const override
Definition: Observer.cpp:1239
armarx::Observer::getAvailableChecks
StringConditionCheckMap getAvailableChecks()
Retrieve list of available condition checks.
Definition: Observer.cpp:1536
ARMARX_CHECK_EXPRESSION
#define ARMARX_CHECK_EXPRESSION(expression)
This macro evaluates the expression and, if it turns out to be false, throws an ExpressionException...
Definition: ExpressionException.h:73
armarx::Observer::createNamedFilteredDatafield
DatafieldRefBasePtr createNamedFilteredDatafield(const std::string &filterDatafieldName, const DatafieldFilterBasePtr &filter, const DatafieldRefBasePtr &datafieldRef)
Definition: Observer.cpp:1637
armarx::Observer::Impl::orignalToFiltered
std::multimap< std::string, FilterDataPtr > orignalToFiltered
Definition: Observer.cpp:79
armarx::Observer::generateId
int generateId()
Definition: Observer.cpp:1063
armarx::Observer::Impl::filterQueue
FilterUpdateQueue filterQueue
Definition: Observer.cpp:98
ARMARX_INFO
#define ARMARX_INFO
Definition: Logging.h:181
armarx::Observer::removeFilteredDatafield_async
void removeFilteredDatafield_async(const AMD_ObserverInterface_removeFilteredDatafieldPtr &amd, const DatafieldRefBasePtr &datafieldRef, const Ice::Current &) override
Definition: Observer.cpp:1730
armarx::Observer::removeCheck_async
void removeCheck_async(const AMD_ObserverInterface_removeCheckPtr &amd, const CheckIdentifier &id, const Ice::Current &) override
Definition: Observer.cpp:1316
armarx::Observer::existsDataField
bool existsDataField(const std::string &channelName, const std::string &datafieldName) const
Definition: Observer.cpp:1569
armarx::Observer::getPartialChannelHistory
ChannelHistory getPartialChannelHistory(const std::string &channelName, Ice::Long startTimestamp, Ice::Long endTimestamp, Ice::Float timestepMs, const Ice::Current &c) const
Definition: Observer.cpp:1765
IceUtil::Handle
Definition: forward_declarations.h:30
armarx::Observer::Impl::channelHistory
ChannelRegistryHistory channelHistory
Definition: Observer.cpp:62
armarx::Observer::getObserverName
std::string getObserverName() const
Definition: Observer.cpp:1233
armarx::Observer::Impl::FilterData::filter
DatafieldFilterBasePtr filter
Definition: Observer.cpp:72
armarx::Observer::Impl::workerUpdatesMutex
std::recursive_mutex workerUpdatesMutex
Definition: Observer.cpp:102
armarx::Observer::Impl::idMutex
std::mutex idMutex
Definition: Observer.cpp:68
armarx::Observer::getAvailableChannels_async
void getAvailableChannels_async(const AMD_ObserverInterface_getAvailableChannelsPtr &amd, bool includeMetaChannels, const Ice::Current &) override
Definition: Observer.cpp:1524
armarx::exceptions::user::InvalidDataFieldException
Definition: InvalidDatafieldException.h:30
armarx::Observer::maybeOfferChannelAndSetDataFieldsFlatCopy
void maybeOfferChannelAndSetDataFieldsFlatCopy(const std::string &channelName, const std::string &description, const StringVariantBaseMap &datafieldValues, bool triggerFilterUpdate=true)
Definition: Observer.cpp:637
armarx::Logging::deactivateSpam
SpamFilterDataPtr deactivateSpam(float deactivationDurationSec=10.0f, const std::string &identifier="", bool deactivate=true) const
Disables the logging for the current line for the given number of seconds.
Definition: Logging.cpp:99
armarx::Observer::onExitObserver
virtual void onExitObserver()
Framework hook.
Definition: Observer.h:463
armarx::Observer::offerConditionCheck
void offerConditionCheck(std::string checkName, ConditionCheck *conditionCheck)
Offer a condition check.
Definition: Observer.cpp:301
armarx::ManagedIceObject::getName
std::string getName() const
Retrieve name of object.
Definition: ManagedIceObject.cpp:108
armarx::PeriodicTask
Definition: ArmarXManager.h:70
armarx::Observer::createNamedFilteredDatafield_async
void createNamedFilteredDatafield_async(const AMD_ObserverInterface_createNamedFilteredDatafieldPtr &amd, const std::string &filterDatafieldName, const DatafieldFilterBasePtr &filter, const DatafieldRefBasePtr &datafieldRef, const Ice::Current &) override
Definition: Observer.cpp:1693
min
T min(T t1, T t2)
Definition: gdiam.h:44
armarx::Observer::getDatafieldByName
TimedVariantBasePtr getDatafieldByName(const std::string &channelName, const std::string &datafieldName) const
Definition: Observer.cpp:1341
armarx::Observer::offerDataFieldWithDefault
void offerDataFieldWithDefault(std::string channelName, std::string datafieldName, const Variant &defaultValue, std::string description)
Offer a datafield with default value.
Definition: Observer.cpp:160
armarx::handleExceptions
void handleExceptions()
Definition: Exception.cpp:157
armarx::exceptions::user::InvalidChannelException
Definition: InvalidDatafieldException.h:46
ARMARX_WARNING
#define ARMARX_WARNING
Definition: Logging.h:193
armarx::DataFieldIdentifierBaseList
::std::vector<::armarx::DataFieldIdentifierBasePtr > DataFieldIdentifierBaseList
Definition: StatechartContextInterface.h:43
armarx::Observer::addWorkerJob
void addWorkerJob(const std::string &name, std::function< void(void)> &&f) const
Definition: Observer.cpp:1195
armarx::Observer::createFilteredDatafield
DatafieldRefBasePtr createFilteredDatafield(const DatafieldFilterBasePtr &filter, const DatafieldRefBasePtr &datafieldRef)
This function creates a new datafield with new filter on the given datafield.
Definition: Observer.cpp:1595
armarx::PropertyDefinitionsPtr
IceUtil::Handle< class PropertyDefinitionContainer > PropertyDefinitionsPtr
PropertyDefinitions smart pointer type.
Definition: forward_declarations.h:35
armarx::ManagedIceObject::getProxy
Ice::ObjectPrx getProxy(long timeoutMs=0, bool waitForScheduler=true) const
Returns the proxy of this object (optionally it waits for the proxy)
Definition: ManagedIceObject.cpp:407
armarx::Observer::offerDataField
void offerDataField(std::string channelName, std::string datafieldName, VariantTypeId type, std::string description)
Offer a datafield without default value.
Definition: Observer.cpp:201
armarx::Observer::getChannelHistory
ChannelHistory getChannelHistory(const std::string &channelName, Ice::Float timestepMs, const Ice::Current &c) const
Definition: Observer.cpp:1740
armarx::Observer::Impl::channelQueue
std::map< std::string, std::set< std::string > > channelQueue
Definition: Observer.cpp:93
LAST_REFRESH_DELTA_CHANNEL
const std::string LAST_REFRESH_DELTA_CHANNEL
Definition: Observer.cpp:45
armarx::ObserverPropertyDefinitions
Definition: Observer.h:49
armarx
This file offers overloads of toIce() and fromIce() functions for STL container types.
Definition: ArmarXTimeserver.cpp:27
identifiers
const std::list< std::string > identifiers
Definition: linux_networkload.cpp:22
armarx::Observer::Impl::workerUpdates
std::deque< WorkerUpdate > workerUpdates
Definition: Observer.cpp:114
armarx::TimedVariant
Definition: TimedVariant.h:39
armarx::Observer::Impl::ChannelRegistryHistory
std::unordered_map< std::string, boost::circular_buffer< std::pair< IceUtil::Time, ChannelRegistryEntry > >> ChannelRegistryHistory
Definition: Observer.cpp:61
armarx::DataFieldIdentifier
DataFieldIdentifier provides the basis to identify data fields within a distributed ArmarX scenario.
Definition: DataFieldIdentifier.h:48
armarx::Observer::postWorkerJobs
virtual void postWorkerJobs()
Definition: Observer.cpp:1214
armarx::Observer::Impl::filterQueueMutex
std::mutex filterQueueMutex
Definition: Observer.cpp:99
armarx::Observer::preWorkerJobs
virtual void preWorkerJobs()
Definition: Observer.cpp:1209
armarx::Observer::offerChannel
void offerChannel(std::string channelName, std::string description)
Offer a channel.
Definition: Observer.cpp:131
ARMARX_STREAM_PRINTER
#define ARMARX_STREAM_PRINTER
use this macro to write output code that is executed when printed and thus not executed if the debug ...
Definition: Logging.h:310