Observer.cpp
Go to the documentation of this file.
1 /*
2 * This file is part of ArmarX.
3 *
4 * Copyright (C) 2011-2016, High Performance Humanoid Technologies (H2T), Karlsruhe Institute of Technology (KIT), all rights reserved.
5 *
6 * ArmarX is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 as
8 * published by the Free Software Foundation.
9 *
10 * ArmarX is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 *
18 * @package ArmarX::Core
19 * @author Kai Welke (welke@kit.edu)
20 * @date 2011
21 * @copyright http://www.gnu.org/licenses/gpl-2.0.txt
22 * GNU General Public License
23 */
24 #include <algorithm>
25 
26 #include "Observer.h"
36 
38 
39 #include <boost/circular_buffer.hpp>
40 
41 #include <condition_variable>
42 
43 using namespace armarx;
44 using namespace armarx::exceptions::local;
45 
// Name of the meta channel that holds, per channel, the time delta (in
// milliseconds) between the last two channel updates.
46 const std::string LAST_REFRESH_DELTA_CHANNEL = "_LastRefreshDelta";
47 
49 {
51  bool logChannelUpdateRate = false;
52 
53 
54  // available checks and channels
55  StringConditionCheckMap availableChecks;
56  std::mutex checksMutex;
57 
58  // available channels and data fields
60  using ChannelRegistryHistory = std::unordered_map<std::string, boost::circular_buffer<std::pair<IceUtil::Time, ChannelRegistryEntry> > >;
62  mutable std::recursive_mutex historyMutex;
65 
66  int currentId;
67  std::mutex idMutex;
68 
69  struct FilterData : IceUtil::Shared
70  {
71  DatafieldFilterBasePtr filter;
74  };
76 
77  std::multimap<std::string, FilterDataPtr> orignalToFiltered;
78  std::map<std::string, FilterDataPtr> filteredToOriginal;
79  std::recursive_mutex filterMutex;
80  std::map<std::string, IceUtil::Time> channelUpdateTimestamps;
83  {
85  std::string channelName;
86  std::string datafieldName;
87  };
88 
90  std::map<std::string, std::set<std::string> > channelQueue;
91  std::mutex channelQueueMutex;
92  std::condition_variable idleChannelCondition;
93 
94  using FilterUpdateQueue = std::unordered_map<std::string, FilterQueueData>;
96  std::mutex filterQueueMutex;
97  std::condition_variable idleCondition;
98 
99  std::recursive_mutex workerUpdatesMutex;
101  {
102  WorkerUpdate(const std::string& name, std::function<void(void)> f);
103  float ageInMs() const;
104 
105  std::string name;
107  std::function<void(void)> f;
108  };
109  mutable std::deque<WorkerUpdate> workerUpdates;
110  std::thread worker;
111  std::atomic_bool stopWorker;
112 };
113 
115  : impl(new Impl)
116 {
117 }
118 
120 {
121 }
122 
123 // *******************************************************
124 // offering of channels, datafield, and checks
125 // *******************************************************
/**
 * Adds a new channel to the channel registry.
 *
 * The new entry is stored under \p channelName with `initialized == false`.
 * A LocalException is thrown when the object state is still below
 * eManagedIceObjectInitialized.
 *
 * @param channelName registry key and name of the new channel
 * @param description description stored in the channel entry
 */
126 void Observer::offerChannel(std::string channelName, std::string description)
127 {
128  if (getState() < eManagedIceObjectInitialized)
129  {
130  throw LocalException() << "offerChannel() must not be called before the Observer is initalized (i.e. not in onInitObserver(), use onConnectObserver()";
131  }
132 
133  std::unique_lock lock(channelsMutex);
134 
135  if (impl->channelRegistry.find(channelName) != impl->channelRegistry.end())
136  {
  // NOTE(review): the statement handling an already registered channel is
  // not visible in this listing (original line 137) - confirm against the file.
138  }
139 
140  ChannelRegistryEntry channel;
141  channel.name = channelName;
142  channel.description = description;
143  channel.initialized = false;
144 
145  std::pair<std::string, ChannelRegistryEntry> entry;
146  entry.first = channelName;
147  entry.second = channel;
148 
149  impl->channelRegistry.insert(entry);
150 }
151 
/**
 * Registers a datafield in an existing channel and initializes it with
 * \p defaultValue.
 *
 * The datafield's type name is taken from the default value.
 *
 * @param channelName   channel the datafield is added to
 * @param datafieldName name of the new datafield
 * @param defaultValue  value copied into the datafield's TimedVariant
 * @param description   description stored in the registry entry
 * @throws LocalException if the object state is below eManagedIceObjectInitialized
 * @throws exceptions::user::DatafieldExistsAlreadyException if the datafield is already registered
 */
152 void Observer::offerDataFieldWithDefault(std::string channelName, std::string datafieldName, const Variant& defaultValue, std::string description)
153 {
154  if (getState() < eManagedIceObjectInitialized)
155  {
156  throw LocalException() << "offerDataFieldWithDefault() must not be called before the Observer is initalized (i.e. not in onInitObserver(), use onConnectObserver()";
157  }
158 
159  std::unique_lock lock(channelsMutex);
160 
161  auto channelIt = impl->channelRegistry.find(channelName);
162 
163  if (channelIt == impl->channelRegistry.end())
164  {
  // NOTE(review): handling of an unknown channel is not visible in this
  // listing (original line 165) - confirm against the file.
166  }
167 
168  if (channelIt->second.dataFields.find(datafieldName) != channelIt->second.dataFields.end())
169  {
170  throw exceptions::user::DatafieldExistsAlreadyException(channelName, datafieldName);
171  }
172 
173  DataFieldRegistryEntry dataField;
174  dataField.identifier = new DatafieldRef(this, channelName, datafieldName, false);
175  dataField.description = description;
176  dataField.typeName = defaultValue.getTypeName();
177  dataField.value = new TimedVariant();
  // copy the default into the freshly created variant
178  *dataField.value = defaultValue;
179 
180  std::pair<std::string, DataFieldRegistryEntry> entry;
181  entry.first = datafieldName;
182  entry.second = dataField;
183 
184  channelIt->second.dataFields.insert(entry);
185 }
186 
/**
 * Registers a datafield of the given variant type in an existing channel.
 *
 * Only the type of the new TimedVariant is set; no value is assigned here.
 *
 * @param channelName   channel the datafield is added to
 * @param datafieldName name of the new datafield
 * @param type          variant type id of the datafield
 * @param description   description stored in the registry entry
 * @throws LocalException if the object state is below eManagedIceObjectInitialized
 * @throws exceptions::user::InvalidDataFieldException if the datafield is already registered
 */
187 void Observer::offerDataField(std::string channelName, std::string datafieldName, VariantTypeId type, std::string description)
188 {
189  if (getState() < eManagedIceObjectInitialized)
190  {
191  throw LocalException() << "offerDataField() must not be called before the Observer is initalized (i.e. not in onInitObserver(), use onConnectObserver()";
192  }
193 
194  std::unique_lock lock(channelsMutex);
195 
196  auto channelIt = impl->channelRegistry.find(channelName);
197 
198  if (channelIt == impl->channelRegistry.end())
199  {
  // NOTE(review): handling of an unknown channel is not visible in this
  // listing (original line 200) - confirm against the file.
201  }
202 
203  if (channelIt->second.dataFields.find(datafieldName) != channelIt->second.dataFields.end())
204  {
205  throw exceptions::user::InvalidDataFieldException(channelName, datafieldName);
206  }
207 
208  DataFieldRegistryEntry dataField;
209  dataField.identifier = new DatafieldRef(this, channelName, datafieldName, false);
210  dataField.description = description;
211  dataField.typeName = Variant::typeToString(type);
212  dataField.value = new TimedVariant();
213  dataField.value->setType(type);
214 
215  std::pair<std::string, DataFieldRegistryEntry> entry;
216  entry.first = datafieldName;
217  entry.second = dataField;
218 
  // looks the channel up a second time by name instead of reusing channelIt
219  impl->channelRegistry[channelName].dataFields.insert(entry);
220 }
221 
222 bool Observer::offerOrUpdateDataField(std::string channelName, std::string datafieldName, const Variant& value, const std::string& description)
223 {
224  if (!existsDataField(channelName, datafieldName))
225  {
226  offerDataFieldWithDefault(channelName, datafieldName, value, description);
227  return true;
228  }
229  else
230  {
231  setDataField(channelName, datafieldName, value);
232  return false;
233  }
234 }
235 
/**
 * Sets all datafields of \p valueMap in \p channelName, creating missing
 * datafields on demand, and finally queues a channel update.
 *
 * The fast path sets everything in one call; when that fails, each datafield
 * is offered or set individually.
 *
 * @param channelName channel to write to
 * @param valueMap    datafield name -> value
 */
236 void Observer::offerOrUpdateDataFieldsFlatCopy(const std::string& channelName, const StringVariantBaseMap& valueMap)
237 {
238  try
239  {
240  // fastest way is to just try to set the datafields
241  maybeOfferChannelAndSetDataFieldsFlatCopy(channelName, "", valueMap, true);
242  }
  // NOTE(review): the catch clause (original line 243) is not visible in this
  // listing, so the exception type handled here cannot be confirmed.
244  {
245  ARMARX_INFO << deactivateSpam() << "Creating datafields for channel " << channelName;
246  // on failure: do it the slow way
  // NOTE(review): the start of the following log statement (original line
  // 247) is not visible in this listing.
248  << "failure when seting datafield "
249  << "using slow fallback";
250  for (const auto& value : valueMap)
251  {
252  const std::string& datafieldName = value.first;
253  if (!existsDataField(channelName, datafieldName))
254  {
255  try
256  {
257  offerDataFieldWithDefault(channelName, datafieldName, *VariantPtr::dynamicCast(value.second), "");
258  }
  // NOTE(review): catch clause (original line 259) not visible; on failure of
  // the offer the value is set instead.
260  {
261  setDataFieldFlatCopy(channelName, datafieldName, VariantPtr::dynamicCast(value.second));
262  }
263  }
264  else
265  {
266  setDataFieldFlatCopy(channelName, datafieldName, VariantPtr::dynamicCast(value.second));
267  }
268  }
269  }
270  updateChannel(channelName);
271 }
272 
273 void Observer::offerConditionCheck(std::string checkName, ConditionCheck* conditionCheck)
274 {
275  std::unique_lock lock(impl->checksMutex);
276 
277  if (impl->availableChecks.find(checkName) != impl->availableChecks.end())
278  {
279  throw InvalidCheckException(checkName);
280  }
281 
282  std::pair<std::string, ConditionCheckPtr> entry;
283  entry.first = checkName;
284  entry.second = conditionCheck;
285 
286  impl->availableChecks.insert(entry);
287 }
288 
/**
 * Removes a channel from the channel registry and drops its history.
 *
 * Returns silently when the channel is not registered.
 *
 * @param channelName name of the channel to remove
 */
289 void Observer::removeChannel(std::string channelName)
290 {
291  {
292  std::unique_lock lock(channelsMutex);
293 
294  auto iter = impl->channelRegistry.find(channelName);
295 
296  if (iter == impl->channelRegistry.end())
297  {
298  return;
299  }
300 
  // NOTE(review): the line defining `id` (original line 301) is not visible
  // in this listing, so it is unclear which datafield(s) are removed here.
  // NOTE(review): removeDatafield is called while channelsMutex is held -
  // confirm that it can be re-locked by the same thread.
302  removeDatafield(id);
303  impl->channelRegistry.erase(iter);
304  ARMARX_INFO << "Removed channel " << channelName;
305  }
306 
307  {
308  std::unique_lock lock(impl->historyMutex);
309  impl->channelHistory.erase(channelName);
310  }
311 
312 }
313 
315 {
316  {
317  std::unique_lock lock(impl->filterMutex);
318  // Remove filter
319  DataFieldIdentifierPtr idptr = DataFieldIdentifierPtr::dynamicCast(id);
320  std::string idStr = idptr->getIdentifierStr();
321  auto itFilter = impl->filteredToOriginal.find(idStr);
322 
323  if (itFilter != impl->filteredToOriginal.end())
324  {
325  DatafieldRefPtr refPtr = DatafieldRefPtr::dynamicCast(itFilter->second->original);
326  auto range = impl->orignalToFiltered.equal_range(refPtr->getDataFieldIdentifier()->getIdentifierStr());
327 
328  for (auto it = range.first; it != range.second; it++)
329  if (it->second->filtered->getDataFieldIdentifier()->getIdentifierStr()
330  == idStr)
331  {
332  impl->orignalToFiltered.erase(it);
333  break;
334  }
335 
336  impl->filteredToOriginal.erase(itFilter);
337  }
338  }
339 
340  std::unique_lock lock(channelsMutex);
341 
342  auto itChannel = impl->channelRegistry.find(id->channelName);
343 
344  if (itChannel == impl->channelRegistry.end())
345  {
346  return;
347  }
348 
349  itChannel->second.dataFields.erase(id->datafieldName);
350 }
351 
352 
353 
354 // *******************************************************
355 // utility methods for sensordatalistener
356 // *******************************************************
357 std::set<std::string> Observer::updateDatafieldFilter(const std::string& channelName, const std::string& datafieldName, const VariantBasePtr& value)
358 {
359  std::vector<Impl::FilterQueueData> filterData;
360  std::set<std::string> foundFilterFields;
361  {
362  std::unique_lock lock(impl->filterMutex);
363  const std::string id = getName() + "." + channelName + "." + datafieldName;
364  // DatafieldRefPtr ref = new DatafieldRef(this, channelName, datafieldName);
365  auto range = impl->orignalToFiltered.equal_range(id);
366 
367  //IceUtil::Time start = IceUtil::Time::now();
368  //bool found = false;
369  TimedVariantPtr origTV = TimedVariantPtr::dynamicCast(value);
370  auto t = origTV ? origTV->getTime() : TimeUtil::GetTime();
371  long tLong = t.toMicroSeconds();
372 
373  for (auto it = range.first; it != range.second; it++)
374  {
375  it->second->filter->update(tLong, value);
376 
377  foundFilterFields.insert(it->second->filtered->datafieldName);
378 
379  TimedVariantPtr tv = new TimedVariant(VariantPtr::dynamicCast(it->second->filter->getValue()), t);
380  filterData.emplace_back(Impl::FilterQueueData {tv, it->second->filtered->channelRef->channelName, it->second->filtered->datafieldName});
381  //found = true;
382  }
383  }
384 
385  for (auto& elem : filterData)
386  {
387  setDataFieldFlatCopy(elem.channelName, elem.datafieldName, VariantPtr::dynamicCast(elem.value));
388  }
389 
390  /*if(found)
391  {
392  IceUtil::Time end = IceUtil::Time::now();
393  IceUtil::Time duration = end - start;
394  ARMARX_IMPORTANT << deactivateSpam(0.1f) << channelName << ":" << datafieldName << ": all filters calc microseconds: " << duration.toMicroSeconds();
395  }*/
396 
397  return foundFilterFields;
398 }
399 
400 void Observer::scheduleDatafieldFilterUpdate(const std::string& channelName, const std::string& datafieldName, const VariantBasePtr& value)
401 {
402  if (impl->orignalToFiltered.size() == 0)
403  {
404  return; // no filters installed anyway ...nothing todo
405  }
406  const std::string id = getName() + "." + channelName + "." + datafieldName;
407  {
408  std::unique_lock lock(impl->filterMutex);
409  if (impl->orignalToFiltered.count(id) == 0)
410  {
411  return; // no filter for this datafield installed ...nothing todo
412  }
413  }
414  std::unique_lock lock(impl->filterQueueMutex);
415  impl->filterQueue[id] = {value, channelName, datafieldName};
416  impl->idleCondition.notify_all();
417 }
418 
/**
 * Loop that processes queued filter updates until the filter update task
 * is stopped.
 *
 * Each round takes over the pending queue, runs the filters for every queued
 * datafield, queues a channel update for every channel whose filtered
 * datafields changed, and then waits on idleCondition while the queue is empty.
 */
419 void Observer::updateFilters()
420 {
421  while (!impl->filterUpdateTask->isStopped())
422  {
  // NOTE(review): the declaration of `queue` (original line 423) is not
  // visible in this listing.
424  {
425  std::unique_lock lock(impl->filterQueueMutex);
  // take over all pending entries so the mutex is not held while filtering
426  queue.swap(impl->filterQueue);
427  }
  // channel name -> names of filtered datafields that were updated
428  std::unordered_map<std::string, std::set<std::string> > channels;
429  for (const Impl::FilterUpdateQueue::value_type& elem : queue)
430  {
431  auto foundFilterFields = updateDatafieldFilter(elem.second.channelName,
432  elem.second.datafieldName,
433  elem.second.value);
434  channels[elem.second.channelName].insert(foundFilterFields.begin(), foundFilterFields.end());
435  }
436  for (const auto& channel : channels)
437  {
438  if (channel.second.size() > 0)
439  {
440  updateChannel(channel.first, channel.second);
441  }
442  }
443  std::unique_lock lock(impl->filterQueueMutex);
  // only sleep if nothing was queued in the meantime
444  if (impl->filterQueue.size() == 0)
445  {
446  impl->idleCondition.wait(lock);
447  }
448  }
449 }
450 
451 void Observer::setDataFieldFlatCopy(const DataFieldRegistry::iterator& dataFieldIter, const VariantPtr& value)
452 {
453  ARMARX_CHECK_EXPRESSION(value) << "Datafieldvariant is NULL!";
454  TimedVariantPtr tval = TimedVariantPtr::dynamicCast(value);
455  if (tval)
456  {
457  dataFieldIter->second.value = std::move(tval);
458  }
459  else
460  {
461  dataFieldIter->second.value = new TimedVariant(value, TimeUtil::GetTime());
462  }
463 }
464 
/**
 * Stores a copy of \p value in the given datafield.
 *
 * A TimedVariant is cloned; any other variant is wrapped into a new
 * TimedVariant stamped with the current time.
 *
 * @param channelName         channel of the datafield
 * @param datafieldName       datafield to update
 * @param value               new value
 * @param triggerFilterUpdate if true, a filter update is scheduled for the datafield
 * @throws exceptions::user::InvalidDataFieldException if the datafield does not exist
 */
465 void Observer::setDataField(const std::string& channelName, const std::string& datafieldName, const Variant& value, bool triggerFilterUpdate)
466 {
467  VariantBasePtr valuePtr;
468  {
469  std::unique_lock lock(channelsMutex);
470  auto itChannel = impl->channelRegistry.find(channelName);
471 
472  if (itChannel == impl->channelRegistry.end())
473  {
  // NOTE(review): handling of an unknown channel is not visible in this
  // listing (original line 474) - confirm against the file.
475  }
476 
477  DataFieldRegistry::iterator itDF = itChannel->second.dataFields.find(datafieldName);
478 
479  if (itDF == itChannel->second.dataFields.end())
480  {
481  throw exceptions::user::InvalidDataFieldException(channelName, datafieldName);
482  }
483 
484  if (dynamic_cast<const TimedVariant*>(&value))
485  {
486  itDF->second.value = value.clone();
487  }
488  else
489  {
490  itDF->second.value = new TimedVariant(value, TimeUtil::GetTime());
491  }
492  valuePtr = itDF->second.value;
493  }
  // the filter update is scheduled after channelsMutex has been released
494  if (triggerFilterUpdate)
495  {
496  scheduleDatafieldFilterUpdate(channelName, datafieldName, valuePtr);
497  }
498  //*dataFieldValue = value;
499 }
500 
/**
 * Name-based variant of setDataFieldFlatCopy: looks up the datafield and
 * optionally schedules a filter update for it.
 *
 * @param channelName         channel of the datafield
 * @param datafieldName       datafield to update
 * @param value               new value
 * @param triggerFilterUpdate if true, a filter update is scheduled for the datafield
 * @throws exceptions::user::InvalidDataFieldException if the datafield does not exist
 */
501 void Observer::setDataFieldFlatCopy(const std::string& channelName, const std::string& datafieldName, const VariantPtr& value, bool triggerFilterUpdate)
502 {
503  {
504  std::unique_lock lock(channelsMutex);
505 
506  auto itChannel = impl->channelRegistry.find(channelName);
507 
508  if (itChannel == impl->channelRegistry.end())
509  {
  // NOTE(review): handling of an unknown channel is not visible in this
  // listing (original line 510) - confirm against the file.
511  }
512 
513  DataFieldRegistry::iterator itDF = itChannel->second.dataFields.find(datafieldName);
514 
515  if (itDF == itChannel->second.dataFields.end())
516  {
517  throw exceptions::user::InvalidDataFieldException(channelName, datafieldName);
518  }
519 
  // NOTE(review): the statement that stores the value (original line 520) is
  // not visible in this listing; presumably it forwards to the iterator
  // overload - confirm against the file.
521  }
522 
523  if (triggerFilterUpdate)
524  {
525  scheduleDatafieldFilterUpdate(channelName, datafieldName, value);
526  }
527 }
528 
/**
 * Updates the timestamps of the listed datafields of a channel.
 *
 * A datafield is re-wrapped into a new TimedVariant with the given timestamp
 * if its current value is not a TimedVariant, or if the given timestamp is
 * newer than the stored one and the stored value is initialized.
 *
 * @param channelName     channel of the datafields
 * @param datafieldValues datafield name -> timestamp (interpreted as microseconds)
 * @throws exceptions::user::InvalidDataFieldException if a listed datafield does not exist
 */
529 void Observer::updateDatafieldTimestamps(const std::string& channelName, const std::unordered_map<std::string, Ice::Long>& datafieldValues)
530 {
531  std::unique_lock lock(channelsMutex);
532 
533  auto itChannel = impl->channelRegistry.find(channelName);
534 
535  if (itChannel == impl->channelRegistry.end())
536  {
  // NOTE(review): handling of an unknown channel is not visible in this
  // listing (original line 537) - confirm against the file.
538  }
539 
540  for (const auto& elem : datafieldValues)
541  {
542  DataFieldRegistry::iterator itDF = itChannel->second.dataFields.find(elem.first);
543 
544  if (itDF == itChannel->second.dataFields.end())
545  {
546  throw exceptions::user::InvalidDataFieldException(channelName, elem.first);
547  }
548 
549 
550  TimedVariantPtr oldValue = TimedVariantPtr::dynamicCast(itDF->second.value);
551  if (!oldValue || (elem.second > oldValue->getTimestamp() && oldValue->getInitialized()))
552  {
553  itDF->second.value = new TimedVariant(VariantPtr::dynamicCast(itDF->second.value), IceUtil::Time::microSeconds(elem.second));
554  }
555  }
556 }
557 
/**
 * Applies one timestamp to all datafields of a channel.
 *
 * A datafield is re-wrapped into a new TimedVariant with \p timestamp if its
 * current value is not a TimedVariant, or if \p timestamp is newer than the
 * stored one and the stored value is initialized.
 *
 * @param channelName channel whose datafields are updated
 * @param timestamp   new timestamp (interpreted as microseconds)
 */
558 void Observer::updateDatafieldTimestamps(const std::string& channelName, Ice::Long timestamp)
559 {
560  std::unique_lock lock(channelsMutex);
561 
562  auto itChannel = impl->channelRegistry.find(channelName);
563 
564  if (itChannel == impl->channelRegistry.end())
565  {
  // NOTE(review): handling of an unknown channel is not visible in this
  // listing (original line 566) - confirm against the file.
567  }
568 
569  for (auto& elem : itChannel->second.dataFields)
570  {
571 
572  TimedVariantPtr oldValue = TimedVariantPtr::dynamicCast(elem.second.value);
573  if (!oldValue || (timestamp > oldValue->getTimestamp() && oldValue->getInitialized()))
574  {
575  elem.second.value = new TimedVariant(VariantPtr::dynamicCast(elem.second.value), IceUtil::Time::microSeconds(timestamp));
576  }
577  }
578 }
579 
580 void Observer::maybeOfferChannelAndSetDataFieldsFlatCopy(const std::string& channelName, const std::string& description, const StringVariantBaseMap& datafieldValues, bool triggerFilterUpdate)
581 {
582  {
583  std::lock_guard lock(channelsMutex);
584  auto itChannel = impl->channelRegistry.find(channelName);
585  if (itChannel == impl->channelRegistry.end())
586  {
587  //add
588  auto& channel = impl->channelRegistry[channelName];
589  channel.name = channelName;
590  channel.description = description;
591  channel.initialized = false;
592 
593  itChannel = impl->channelRegistry.find(channelName);
594  }
595 
596  //use map merge algorithm since it is faster (n+m instead n*log(m))
597  auto targ = itChannel->second.dataFields.begin();
598  auto vals = datafieldValues.begin();
599  while (targ != itChannel->second.dataFields.end() &&
600  vals != datafieldValues.end())
601  {
602  if (vals->first > targ->first)
603  {
604  ++targ;
605  }
606  else if (vals->first == targ->first)
607  {
608  setDataFieldFlatCopy(targ, VariantPtr::dynamicCast(vals->second));
609  ++vals;
610  ++targ;
611  }
612  else
613  {
614  throw exceptions::user::InvalidDataFieldException(channelName, vals->first);
615  }
616  }
617  if (vals != datafieldValues.end())
618  {
619  throw exceptions::user::InvalidDataFieldException(channelName, vals->first);
620  }
621  }
622  if (triggerFilterUpdate)
623  {
624  for (const auto& elem : datafieldValues)
625  {
626  scheduleDatafieldFilterUpdate(channelName, elem.first, elem.second);
627  }
628  }
629 }
630 
631 
/**
 * Sets several datafields of one channel without copying the variants.
 *
 * @param channelName         channel of the datafields
 * @param datafieldValues     datafield name -> new value
 * @param triggerFilterUpdate if true, a filter update is scheduled for every given datafield
 * @throws exceptions::user::InvalidDataFieldException if a given datafield does not exist
 */
632 void Observer::setDataFieldsFlatCopy(const std::string& channelName, const std::unordered_map< ::std::string, ::armarx::VariantBasePtr>& datafieldValues, bool triggerFilterUpdate)
633 {
634  {
635  std::unique_lock lock(channelsMutex);
636 
637  auto itChannel = impl->channelRegistry.find(channelName);
638 
639  if (itChannel == impl->channelRegistry.end())
640  {
  // NOTE(review): handling of an unknown channel is not visible in this
  // listing (original line 641) - confirm against the file.
642  }
643 
644  for (const auto& elem : datafieldValues)
645  {
646  DataFieldRegistry::iterator itDF = itChannel->second.dataFields.find(elem.first);
647 
648  if (itDF == itChannel->second.dataFields.end())
649  {
650  throw exceptions::user::InvalidDataFieldException(channelName, elem.first);
651  }
652 
653  setDataFieldFlatCopy(itDF, VariantPtr::dynamicCast(elem.second));
654  }
655  }
  // filter updates are scheduled after channelsMutex has been released
656  if (triggerFilterUpdate)
657  {
658  for (const auto& elem : datafieldValues)
659  {
660  scheduleDatafieldFilterUpdate(channelName, elem.first, elem.second);
661  }
662  }
663 }
664 
/**
 * Sets several datafields of one channel without copying the variants
 * (StringVariantBaseMap overload).
 *
 * @param channelName         channel of the datafields
 * @param datafieldValues     datafield name -> new value; values must not be null
 * @param triggerFilterUpdate if true, a filter update is scheduled for every given datafield
 * @throws exceptions::user::InvalidDataFieldException if a given datafield does not exist
 */
665 void Observer::setDataFieldsFlatCopy(const std::string& channelName, const StringVariantBaseMap& datafieldValues, bool triggerFilterUpdate)
666 {
667  {
668  std::unique_lock lock(channelsMutex);
669 
670  auto itChannel = impl->channelRegistry.find(channelName);
671 
672  if (itChannel == impl->channelRegistry.end())
673  {
  // NOTE(review): handling of an unknown channel is not visible in this
  // listing (original line 674) - confirm against the file.
675  }
676 
677  for (const auto& elem : datafieldValues)
678  {
679  DataFieldRegistry::iterator itDF = itChannel->second.dataFields.find(elem.first);
680 
681  if (itDF == itChannel->second.dataFields.end())
682  {
683  throw exceptions::user::InvalidDataFieldException(channelName, elem.first);
684  }
685  ARMARX_CHECK_EXPRESSION(elem.second) << "Datafieldname: " << elem.first;
686  setDataFieldFlatCopy(itDF, VariantPtr::dynamicCast(elem.second));
687  }
688  }
689 
  // filter updates are scheduled after channelsMutex has been released
690  if (triggerFilterUpdate)
691  {
692  for (const auto& elem : datafieldValues)
693  {
694  scheduleDatafieldFilterUpdate(channelName, elem.first, elem.second);
695  }
696  }
697 }
698 
/**
 * Publishes the time since the previous update of \p channelName as a
 * datafield of the LAST_REFRESH_DELTA_CHANNEL meta channel and remembers the
 * current time for the next call.
 *
 * The datafield carries the channel's name and holds the delta in
 * milliseconds; if setting it fails, it is offered with a default of 0.0.
 *
 * @param channelName channel whose refresh delta is recorded
 */
699 void Observer::updateRefreshRateChannel(const std::string& channelName)
700 {
  // operator[] creates a default timestamp on the first call for a channel
701  auto& oldUpdateTime = impl->channelUpdateTimestamps[channelName];
702  auto now = IceUtil::Time::now();
703 
704  try
705  {
706  setDataFieldFlatCopy(LAST_REFRESH_DELTA_CHANNEL, channelName, new Variant((now - oldUpdateTime).toMilliSecondsDouble()), false);
707  }
  // NOTE(review): the catch clause (original line 708) is not visible in this
  // listing, so the exception type handled here cannot be confirmed.
709  {
710  offerDataFieldWithDefault(LAST_REFRESH_DELTA_CHANNEL, channelName, 0.0, "Update delta of channel '" + channelName + "'");
711  }
712  oldUpdateTime = now;
713 }
714 
/**
 * Queues a channel update and wakes up waiters on idleChannelCondition.
 *
 * The given datafield names are merged into the set already queued for
 * \p channelName.
 *
 * @param channelName       channel to update
 * @param updatedDatafields names of the datafields that changed
 */
715 void Observer::updateChannel(const std::string& channelName, const std::set<std::string>& updatedDatafields)
716 {
717  if (!existsChannel(channelName))
718  {
  // NOTE(review): handling of an unknown channel is not visible in this
  // listing (original line 719) - confirm against the file.
720  }
721  std::unique_lock lock(impl->channelQueueMutex);
  // operator[] creates the queue entry for the channel if it is not queued yet
722  std::set<std::string>& dfs = impl->channelQueue[channelName];
723  dfs.insert(updatedDatafields.begin(), updatedDatafields.end());
724  impl->idleChannelCondition.notify_all();
725 }
726 
727 void Observer::channelUpdateFunction()
728 {
729 
730  while (!impl->channelUpdateTask->isStopped())
731  {
732  std::map<std::string, std::set<std::string> > queue;
733  {
734  std::unique_lock lock(impl->channelQueueMutex);
735  queue.swap(impl->channelQueue);
736  }
737  for (const auto& elem : queue)
738  {
739  doChannelUpdate(elem.first, elem.second);
740  }
741  std::unique_lock lock(impl->channelQueueMutex);
742  if (impl->channelQueue.size() == 0)
743  {
744  impl->idleChannelCondition.wait(lock);
745  }
746  }
747 }
748 void Observer::addToChannelHistory(const std::pair<IceUtil::Time, ChannelRegistryEntry>& historyEntry, const std::string& channelName)
749 {
750  if (impl->maxHistorySize == 0)
751  {
752  return;
753  }
754  std::unique_lock lockHistory(impl->historyMutex);
755  auto historyIt = impl->channelHistory.find(channelName);
756  if (historyIt == impl->channelHistory.end())
757  {
758  impl->channelHistory[channelName].set_capacity(impl->maxHistorySize);
759  historyIt = impl->channelHistory.find(channelName);
760  }
761  auto now = TimeUtil::GetTime();
762  if (historyIt->second.empty() || now > historyIt->second.rbegin()->first + IceUtil::Time::secondsDouble(1.0 / impl->maxHistoryRecordFrequency))
763  {
764  historyIt->second.push_back(historyEntry);
765  }
766 }
767 
/**
 * Performs a channel update: refreshes the channel's initialized flag,
 * records a history snapshot and evaluates the channel's condition checks.
 *
 * The channel is marked initialized only if all of its datafield values are
 * initialized. Condition checks are evaluated on a copy of the channel entry
 * taken under channelsMutex. All exceptions are caught inside this function.
 *
 * @param channelName       channel to update
 * @param updatedDatafields names of the datafields that changed; an empty set
 *                          makes every check of the channel be evaluated
 */
768 void Observer::doChannelUpdate(const std::string& channelName, const std::set<std::string>& updatedDatafields)
769 {
770 
771 
772  try
773  {
774  ChannelRegistryEntry entry;
775  std::pair<IceUtil::Time, ChannelRegistryEntry> historyEntry;
776  {
777  std::unique_lock lock_channels(channelsMutex);
778  // check if channels exists
779  auto iterChannel = impl->channelRegistry.find(channelName);
780 
781  if (iterChannel == impl->channelRegistry.end())
782  {
  // NOTE(review): handling of an unknown channel is not visible in this
  // listing (original line 783) - confirm against the file.
784  }
785 
786 
787 
788  if (impl->logChannelUpdateRate)
789  {
  // NOTE(review): called while channelsMutex is held; updateRefreshRateChannel
  // locks channelsMutex again through setDataFieldFlatCopy - confirm that the
  // mutex can be re-locked by the same thread.
790  updateRefreshRateChannel(channelName);
791  }
792 
793  // update initialized state
794  const DataFieldRegistry& dataFields = iterChannel->second.dataFields;
795  DataFieldRegistry::const_iterator iterDataFields = dataFields.begin();
796  bool channelInitialized = true;
797 
798  while (iterDataFields != dataFields.end())
799  {
800  channelInitialized &= iterDataFields->second.value->getInitialized();
801  iterDataFields++;
802  }
803 
804  iterChannel->second.initialized = channelInitialized;
  // the entry is only copied when there are checks to evaluate; otherwise
  // `entry` stays default constructed and the check loop below does nothing
805  if (iterChannel->second.conditionChecks.size() > 0)
806  {
807  entry = iterChannel->second;
808  }
809  historyEntry = std::make_pair(TimeUtil::GetTime(), iterChannel->second);
810 
811  }
812  addToChannelHistory(historyEntry, channelName);
813  // evaluate checks
814  ConditionCheckRegistry::iterator iterChecks = entry.conditionChecks.begin();
815 
816  while (iterChecks != entry.conditionChecks.end())
817  {
818  bool found = false;
819  if (updatedDatafields.size() > 0)
820  {
821  // check if this check belongs to an updated datafield.
  // NOTE(review): the search below does not reference the current check, so
  // `found` has the same value for every check of the channel.
822  for (const auto& elem : entry.dataFields)
823  {
824  const DataFieldRegistryEntry& datafieldEntry = elem.second;
825  if (updatedDatafields.count(datafieldEntry.identifier->datafieldName))
826  {
827  found = true;
828  break;
829  }
830  }
831  }
832  else
833  {
834  found = true;
835  }
836 
837  try
838  {
839  if (found)
840  {
841  evaluateCheck(ConditionCheckPtr::dynamicCast(iterChecks->second), entry);
842  }
843  }
844  catch (...)
845  {
846  ARMARX_ERROR << "Evaluating condition for channel " << channelName << " failed!";
  // NOTE(review): a further statement (original line 847) is not visible in
  // this listing.
848  }
849 
850  iterChecks++;
851  }
852  }
853  catch (...)
854  {
  // any exception of the update is dropped here without logging
855  // ARMARX_ERROR << "Upating channel " << channelName << " failed!";
856  // handleExceptions();
857  }
858 }
859 
860 // *******************************************************
861 // Component hooks
862 // *******************************************************
863 void Observer::onInitComponent()
864 {
865  impl->maxHistorySize = getProperty<int>("MaxHistorySize");
866  impl->maxHistoryRecordFrequency = getProperty<float>("MaxHistoryRecordFrequency");
867 
868  {
869  std::unique_lock lock(impl->idMutex);
870  impl->currentId = 0;
871  }
872 
873  onInitObserver();
874  impl->filterUpdateTask = new RunningTask<Observer>(this, &Observer::updateFilters, "Filter update task");
875  impl->filterUpdateTask->start();
876  impl->channelUpdateTask = new RunningTask<Observer>(this, &Observer::channelUpdateFunction, "Channel update task");
877  impl->channelUpdateTask->start();
878 }
879 
/**
 * Component hook: offers the refresh-delta meta channel if enabled, clears
 * the channel history, updates the observer proxy stored in all datafield
 * references and creates the meta task.
 *
 * The meta task is only started when the "CreateUpdateFrequenciesChannel"
 * property is set.
 */
880 void Observer::onConnectComponent()
881 {
882  impl->logChannelUpdateRate = getProperty<bool>("CreateUpdateFrequenciesChannel").getValue();
883  if (impl->logChannelUpdateRate && !existsChannel(LAST_REFRESH_DELTA_CHANNEL))
884  {
885  offerChannel(LAST_REFRESH_DELTA_CHANNEL, "Metachannel with the last channel update deltas of all channels in milliseconds");
886  }
887  impl->channelHistory.clear();
888  // subclass init
  // NOTE(review): the statement following the comment above (original line
  // 889) is not visible in this listing.
890  // offerDataFieldWithDefault(channelName, LAST_REFRESH_DELTA_CHANNEL, 0.0, );
891  // impl->channelUpdateTimestamps[channelName] = IceUtil::Time::now();
892 
893  std::unique_lock lock(channelsMutex);
894  auto proxy = ObserverInterfacePrx::checkedCast(getProxy());
895  // update the proxy in all existing datafield refs
896  for (auto& channel : impl->channelRegistry)
897  {
898  ChannelRegistryEntry& c = channel.second;
899  for (auto& df : c.dataFields)
900  {
901  DataFieldRegistryEntry& d = df.second;
902  d.identifier->channelRef->observerProxy = proxy;
903  }
904  }
  // the task is always created but only started when update rates are logged
905  impl->metaTask = new PeriodicTask<Observer>(this, &Observer::metaUpdateTask, 50);
906  if (impl->logChannelUpdateRate)
907  {
908  impl->metaTask->start();
909  }
910 }
911 void Observer::postOnConnectComponent()
912 {
913  ARMARX_INFO << "starting worker thread";
914  impl->stopWorker = false;
915  ARMARX_CHECK_EXPRESSION(!impl->worker.joinable());
916  impl->worker = std::thread{[&]{runWorker();}};
917 }
918 
919 void Observer::preOnDisconnectComponent()
920 {
921  ARMARX_INFO << "stopping worker thread";
922  impl->stopWorker = true;
923  if (impl->worker.joinable())
924  {
925  impl->worker.join();
926  }
927 }
928 
929 void Observer::onExitComponent()
930 {
931  onExitObserver();
932  if (impl->metaTask)
933  {
934  impl->metaTask->stop();
935  }
936  if (impl->filterUpdateTask)
937  {
938  impl->filterUpdateTask->stop(false);
939  }
940  impl->idleCondition.notify_all();
941  if (impl->filterUpdateTask)
942  {
943  impl->filterUpdateTask->stop(true);
944  }
945  if (impl->channelUpdateTask)
946  {
947  impl->channelUpdateTask->stop(false);
948  }
949  impl->idleChannelCondition.notify_all();
950  if (impl->channelUpdateTask)
951  {
952  impl->channelUpdateTask->stop(true);
953  }
954  impl->channelHistory.clear();
955 
956 }
957 
/**
 * Creates the property definitions of the observer.
 *
 * NOTE(review): the body (original lines 960-961) is not visible in this
 * listing; the concrete definitions type returned cannot be confirmed here.
 */
958 PropertyDefinitionsPtr Observer::createPropertyDefinitions()
959 {
962 }
963 
965 {
967 }
968 
969 
970 
971 // *******************************************************
972 // private methods
973 // *******************************************************
975 {
976  std::unique_lock lock(impl->idMutex);
977 
978  return impl->currentId++;
979 }
980 
981 
982 ConditionCheckPtr Observer::createCheck(const CheckConfiguration& configuration) const
983 {
984  std::string checkName = configuration.checkName;
985 
986  // create check from elementary condition
987  StringConditionCheckMap::const_iterator iterChecks = impl->availableChecks.find(checkName);
988 
989  if (iterChecks == impl->availableChecks.end())
990  {
991  std::string reason = "Invalid condition check \"" + checkName + "\" for observer \"" + getName() + "\".";
992  throw InvalidConditionException(reason.c_str());
993  }
994 
995  ARMARX_CHECK_EXPRESSION(iterChecks->second);
996  ConditionCheckPtr check = ConditionCheckPtr::dynamicCast(iterChecks->second)->createInstance(configuration, impl->channelRegistry);
997 
998  return check;
999 }
1000 
1001 CheckIdentifier Observer::registerCheck(const ConditionCheckPtr& check)
1002 {
1003  ARMARX_CHECK_EXPRESSION(check);
1004  ARMARX_CHECK_EXPRESSION(check->configuration.dataFieldIdentifier);
1005  // create identifier
1006  int id = generateId();
1007  CheckIdentifier identifier;
1008  identifier.uniqueId = id;
1009  identifier.channelName = check->configuration.dataFieldIdentifier->channelName;
1010  identifier.observerName = getName();
1011 
1012  // add to conditions list
1013  std::pair<int, ConditionCheckPtr> entry;
1014  entry.first = id;
1015  entry.second = check;
1016  impl->channelRegistry[check->configuration.dataFieldIdentifier->channelName].conditionChecks.insert(entry);
1017 
1018  return identifier;
1019 }
1020 
1021 void Observer::evaluateCheck(const ConditionCheckPtr& check, const ChannelRegistryEntry& channel) const
1022 {
1023  check->evaluateCondition(channel.dataFields);
1024 }
1025 
1026 // ////////////////////////////////////////////////////////////////////////// //
// Worker loop. Per the cross-reference index this is `void Observer::runWorker()`;
// the signature line (listing line 1027) is absent from this listing.
// Until impl->stopWorker is set it repeatedly: swaps the pending job queue into a local
// deque, runs every job while holding channelsMutex, and logs per-job-name statistics.
1028 {
1029  ARMARX_INFO << "observer worker->start";
// NOTE(review): listing line 1030 is missing; presumably ARMARX_ON_SCOPE_EXIT, which would
// make the block below run when the function is left - confirm against the original file.
1031  {
1032  ARMARX_INFO << "observer worker->stop";
1033  };
// Declared outside the loop and cleared at the end of each iteration.
1034  std::deque<Impl::WorkerUpdate> toProcess;
1035  while (!impl->stopWorker)
1036  {
1037  preWorkerJobs();
// NOTE(review): listing line 1038 is missing; presumably another ARMARX_ON_SCOPE_EXIT so
// that postWorkerJobs() runs at the end of every iteration (including `continue`) - confirm.
1039  {
1040  postWorkerJobs();
1041  };
1042  {
// Take over all queued jobs at once; the queue mutex is held only for the swap.
1043  std::lock_guard g{impl->workerUpdatesMutex};
1044  std::swap(impl->workerUpdates, toProcess);
1045  }
1046  if (toProcess.empty())
1047  {
1048  ARMARX_DEBUG << deactivateSpam() << "no worker jobs (message only posted every 10 seconds";
// Idle: poll again after 1 ms.
1049  std::this_thread::sleep_for(std::chrono::milliseconds{1});
1050  continue;
1051  }
1052 
// Statistics over this batch: smallest/largest job age plus per-name sums.
1053  float minage = std::numeric_limits<float>::infinity();
1054  float maxage = 0;
1055  struct Counters
1056  {
1057  float age = 0;
1058  float time = 0;
1059  std::size_t n = 0;
1060  };
1061 
1062  std::map<std::string, Counters> counters;
1063  {
// All jobs of the batch run under channelsMutex.
1064  std::lock_guard lock(channelsMutex);
1065  for (auto& f : toProcess)
1066  {
1067  ARMARX_TRACE;
1068  float age = f.ageInMs();
1069  Counters& counter = counters[f.name];
1070  counter.age += age;
1071  ++counter.n;
1072  TimepointT start = Now();
1073  f.f();
1074  counter.time += TimeDeltaInMs(start);
1075  minage = std::min(minage, age);
1076  maxage = std::max(maxage, age);
1077  }
1078  }
// Report a minimum of 0 when no job had a positive age.
1079  if (maxage <= 0)
1080  {
1081  minage = 0;
1082  }
1083  ARMARX_DEBUG << "observer worker got " << toProcess.size()
1084  << "\tjobs ages min/max " << minage << "\t" << maxage << "\n"
// NOTE(review): listing line 1085 is missing; it presumably introduces the callable that
// receives `out` and is streamed into the log statement above - confirm.
1086  {
1087  for (const auto& [key, c] : counters)
1088  {
1089  out << " " << key
1090  << "\t# " << c.n
1091  << "\tavg age " << (c.age / c.n)
1092  << "\tavg time " << (c.time / c.n) << "\n";
1093  }
1094  };
1095  toProcess.clear();
1096  }
1097 }
1098 
1099 void Observer::addWorkerJob(const std::string& name, std::function<void(void)>&& f) const
1100 {
1101  std::lock_guard g{impl->workerUpdatesMutex};
1102  impl->workerUpdates.emplace_back(name, std::move(f));
1103 }
1104 void Observer::addWorkerJob(const std::string& name, std::function<void(void)>&& f)
1105 {
1106  std::lock_guard g{impl->workerUpdatesMutex};
1107  impl->workerUpdates.emplace_back(name, std::move(f));
1108 }
1109 
1111 
1113 
// Body of Observer::Now() (declared `static TimepointT Now()` per the cross-reference index;
// the signature line 1114 is absent from this listing). Returns the current ClockT time point.
1115 {
1116  return ClockT::now();
1117 }
1118 
// Body of Observer::TimeDeltaInMs(TimepointT t0) (declared `static float` per the
// cross-reference index; the signature line 1119 is absent from this listing).
// Returns the time elapsed since t0 in milliseconds (nanosecond count scaled by 1e-6).
1120 {
1121  return std::chrono::duration_cast<std::chrono::nanoseconds>(Now() - t0).count() * 1e-6f;
1122 }
1123 // ////////////////////////////////////////////////////////////////////////// //
1124 
1125 std::string Observer::getObserverName() const
1126 {
1127  return getName();
1128 }
// AMD variant (presumably Observer::getObserverName_async, judging by the AMD callback type;
// the signature line 1129 is absent from this listing). Passes the request to addWorkerJob
// under the job name "Observer::getObserverName" together with the AMD callback.
// NOTE(review): listing line 1134 (last argument) is missing - presumably the pointer to the
// synchronous member function; confirm against the original file.
1130  const AMD_ObserverInterface_getObserverNamePtr& amd,
1131  const Ice::Current&) const
1132 {
1133  addWorkerJob("Observer::getObserverName", amd,
1135 }
1136 // //////////////////
1137 CheckIdentifier Observer::installCheck(
1138  const CheckConfiguration& configuration)
1139 {
1140  // create condition check
1141  ConditionCheckPtr check;
1142  {
1143  std::unique_lock lock_checks(impl->checksMutex);
1144  std::unique_lock lock_channels(channelsMutex);
1145  check = createCheck(configuration);
1146  }
1147  ARMARX_CHECK_EXPRESSION(check);
1148  // insert into registry
1149  CheckIdentifier identifier;
1150  {
1151  std::unique_lock lock_conditions(channelsMutex);
1152  identifier = registerCheck(check);
1153  }
1154 
1155  ARMARX_DEBUG << "Installed check " << identifier.uniqueId << flush;
1156 
1157  // perform initial check
1158  {
1159  std::unique_lock lock_channels(channelsMutex);
1160  ARMARX_CHECK_EXPRESSION(configuration.dataFieldIdentifier);
1161  // retrieve channel
1162  ChannelRegistryEntry channel = impl->channelRegistry[configuration.dataFieldIdentifier->channelName];
1163 
1164  // evaluate
1165  evaluateCheck(check, channel);
1166  }
1167 
1168  return identifier;
1169 }
1170 
// AMD variant (presumably Observer::installCheck_async, judging by the AMD callback type;
// the signature line 1171 is absent from this listing). Passes the request to addWorkerJob
// under the job name "Observer::installCheck" with the AMD callback and the configuration.
// NOTE(review): listing line 1177 (argument after `amd,`) is missing - presumably the pointer
// to the synchronous member function; confirm against the original file.
1172  const AMD_ObserverInterface_installCheckPtr& amd,
1173  const CheckConfiguration& configuration,
1174  const Ice::Current&)
1175 {
1176  addWorkerJob("Observer::installCheck", amd,
1178  configuration);
1179 }
1180 // //////////////////
1181 void Observer::removeCheck(const CheckIdentifier& id)
1182 {
1183  std::unique_lock lock(channelsMutex);
1184 
1185  // find channel
1186  auto iter = impl->channelRegistry.find(id.channelName);
1187 
1188  if (iter == impl->channelRegistry.end())
1189  {
1190  return;
1191  }
1192 
1193  // find condition
1194  ConditionCheckRegistry::iterator iterCheck = iter->second.conditionChecks.find(id.uniqueId);
1195 
1196  // remove condition
1197  if (iterCheck != iter->second.conditionChecks.end())
1198  {
1199  iter->second.conditionChecks.erase(iterCheck);
1200  }
1201 
1202  ARMARX_DEBUG << "Removed check " << id.uniqueId << flush;
1203 }
// AMD variant (presumably Observer::removeCheck_async, judging by the AMD callback type;
// the signature line 1204 is absent from this listing). Passes the request to addWorkerJob
// under the job name "Observer::removeCheck" with the AMD callback and the check identifier.
// NOTE(review): listing line 1210 (argument after `amd,`) is missing - presumably the pointer
// to the synchronous member function; confirm against the original file.
1205  const AMD_ObserverInterface_removeCheckPtr& amd,
1206  const CheckIdentifier& id,
1207  const Ice::Current&)
1208 {
1209  addWorkerJob("Observer::removeCheck", amd,
1211  id);
1212 }
1213 // //////////////////
1214 TimedVariantBasePtr Observer::getDataField(
1215  const DataFieldIdentifierBasePtr& identifier,
1216  const Ice::Current& c) const
1217 {
1218  //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1219  return getDatafieldByName(identifier->channelName, identifier->datafieldName);
1220 }
// Observer::getDataField_async (name per the cross-reference index; the signature line 1221
// is absent from this listing). Passes the request to addWorkerJob under the job name
// "Observer::getDataField" with the AMD callback, the identifier and the Ice context.
// NOTE(review): listing line 1227 (argument after `amd,`) is missing - presumably the pointer
// to the synchronous member function; confirm against the original file.
1222  const AMD_ObserverInterface_getDataFieldPtr& amd,
1223  const DataFieldIdentifierBasePtr& identifier,
1224  const Ice::Current& c) const
1225 {
1226  addWorkerJob("Observer::getDataField", amd,
1228  identifier,
1229  c);
1230 }
1231 // //////////////////
1232 TimedVariantBasePtr Observer::getDatafieldByName(
1233  const std::string& channelName,
1234  const std::string& datafieldName) const
1235 {
1236  //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1237  std::unique_lock lock(channelsMutex);
1238 
1239  auto it = impl->channelRegistry.find(channelName);
1240  if (it == impl->channelRegistry.end())
1241  {
1242  throw exceptions::user::InvalidChannelException(channelName);
1243  }
1244 
1245  auto itDF = it->second.dataFields.find(datafieldName);
1246  if (itDF == it->second.dataFields.end())
1247  {
1248  throw exceptions::user::InvalidDataFieldException(channelName, datafieldName);
1249  }
1250 
1251  // VariantPtr var = VariantPtr::dynamicCast(impl->channelRegistry[identifier->channelName].dataFields[identifier->datafieldName].value);
1252  TimedVariantPtr tv = TimedVariantPtr::dynamicCast(itDF->second.value);
1253  if (!tv)
1254  {
1255  ARMARX_IMPORTANT << "could not cast timed variant: " << itDF->second.value->ice_id();
1256  }
1257  return tv;
1258 }
// Observer::getDatafieldByName_async (name per the cross-reference index; the signature line
// 1259 is absent from this listing). Passes the request to addWorkerJob under the job name
// "Observer::getDatafieldByName" with the AMD callback, channel name and data field name.
// NOTE(review): listing line 1266 (argument after `amd,`) is missing - presumably the pointer
// to the synchronous member function; confirm against the original file.
1260  const AMD_ObserverInterface_getDatafieldByNamePtr& amd,
1261  const std::string& channelName,
1262  const std::string& datafieldName,
1263  const Ice::Current&) const
1264 {
1265  addWorkerJob("Observer::getDatafieldByName", amd,
1267  channelName,
1268  datafieldName);
1269 }
1270 // //////////////////
1271 DatafieldRefBasePtr Observer::getDataFieldRef(
1272  const DataFieldIdentifierBasePtr& identifier) const
1273 {
1274  //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1275  std::unique_lock lock(channelsMutex);
1276  auto it = impl->channelRegistry.find(identifier->channelName);
1277  if (it == impl->channelRegistry.end())
1278  {
1279  throw exceptions::user::InvalidChannelException(identifier->channelName);
1280  }
1281 
1282  auto itDF = it->second.dataFields.find(identifier->datafieldName);
1283  if (itDF == it->second.dataFields.end())
1284  {
1285  throw exceptions::user::InvalidDataFieldException(identifier->channelName, identifier->datafieldName);
1286  }
1287  // itDF->second.identifier->channelRef->observerProxy = ObserverInterfacePrx::uncheckedCast(getProxy());
1288  return DatafieldRefPtr::dynamicCast(itDF->second.identifier);
1289 }
// Observer::getDataFieldRef_async (name per the cross-reference index; the signature line 1290
// is absent from this listing). Passes the request to addWorkerJob under the job name
// "Observer::getDataFieldRef" with the AMD callback and the identifier.
// NOTE(review): listing line 1296 (argument after `amd,`) is missing - presumably the pointer
// to the synchronous member function; confirm against the original file.
1291  const AMD_ObserverInterface_getDataFieldRefPtr& amd,
1292  const DataFieldIdentifierBasePtr& identifier,
1293  const Ice::Current&) const
1294 {
1295  addWorkerJob("Observer::getDataFieldRef", amd,
1297  identifier);
1298 }
1299 // //////////////////
// Observer::getDatafieldRefByName (returns DatafieldRefBasePtr per the cross-reference index;
// the signature line 1300 is absent from this listing). Builds a DataFieldIdentifier from this
// observer's name, the channel name and the data field name and delegates to getDataFieldRef().
1301  const std::string& channelName,
1302  const std::string& datafieldName) const
1303 {
1304  return getDataFieldRef(new DataFieldIdentifier(getName(), channelName, datafieldName));
1305 }
// Observer::getDatafieldRefByName_async (name per the cross-reference index; the signature
// line 1306 is absent from this listing). Passes the request to addWorkerJob under the job name
// "Observer::getDatafieldRefByName" with the AMD callback, channel name and data field name.
// NOTE(review): listing line 1313 (argument after `amd,`) is missing - presumably the pointer
// to the synchronous member function; confirm against the original file.
1307  const AMD_ObserverInterface_getDatafieldRefByNamePtr& amd,
1308  const std::string& channelName,
1309  const std::string& datafieldName,
1310  const Ice::Current&) const
1311 {
1312  addWorkerJob("Observer::getDatafieldRefByName", amd,
1314  channelName,
1315  datafieldName);
1316 }
1317 // //////////////////
1318 TimedVariantBaseList Observer::getDataFields(
1320  const Ice::Current& c)
1321 {
1322  TimedVariantBaseList result;
1323 
1324  DataFieldIdentifierBaseList::const_iterator iter = identifiers.begin();
1325 
1326  std::unique_lock lock(channelsMutex);
1327  while (iter != identifiers.end())
1328  {
1329  result.push_back(getDataField(*iter, c));
1330  iter++;
1331  }
1332 
1333  return result;
1334 }
// Observer::getDataFields_async (name per the cross-reference index; the signature line 1335
// is absent from this listing). Passes the request to addWorkerJob under the job name
// "Observer::getDataFields" with the AMD callback, the identifiers and the Ice context.
// NOTE(review): listing line 1337 is missing - per the index it declares
// `const DataFieldIdentifierBaseList& identifiers`. Listing line 1341 (argument after `amd,`)
// is missing too - presumably the pointer to the synchronous member function; confirm.
1336  const AMD_ObserverInterface_getDataFieldsPtr& amd,
1338  const Ice::Current& c)
1339 {
1340  addWorkerJob("Observer::getDataFields", amd,
1342  identifiers,
1343  c);
1344 }
1345 // //////////////////
1346 StringTimedVariantBaseMap Observer::getDatafieldsOfChannel(
1347  const std::string& channelName) const
1348 {
1349  //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1350  StringTimedVariantBaseMap result;
1351  std::unique_lock lock(channelsMutex);
1352  auto it = impl->channelRegistry.find(channelName);
1353  if (it == impl->channelRegistry.end())
1354  {
1355  throw exceptions::user::InvalidChannelException(channelName);
1356  }
1357  DataFieldRegistry fields = it->second.dataFields;
1358  for (auto& field : fields)
1359  {
1360  result[field.first] = TimedVariantBasePtr::dynamicCast(field.second.value);
1361  }
1362  return result;
1363 }
// AMD variant (presumably Observer::getDatafieldsOfChannel_async, judging by the AMD callback
// type; the signature line 1364 is absent from this listing). Passes the request to
// addWorkerJob under the job name "Observer::getDatafieldsOfChannel" with the AMD callback
// and the channel name.
// NOTE(review): listing line 1370 (argument after `amd,`) is missing - presumably the pointer
// to the synchronous member function; confirm against the original file.
1365  const AMD_ObserverInterface_getDatafieldsOfChannelPtr& amd,
1366  const std::string& channelName,
1367  const Ice::Current&) const
1368 {
1369  addWorkerJob("Observer::getDatafieldsOfChannel", amd,
1371  channelName);
1372 }
1373 // //////////////////
1374 ChannelRegistryEntry Observer::getChannel(
1375  const std::string& channelName) const
1376 {
1377  //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1378  std::unique_lock lock(channelsMutex);
1379  auto it = impl->channelRegistry.find(channelName);
1380  if (it == impl->channelRegistry.end())
1381  {
1382  throw exceptions::user::InvalidChannelException(channelName);
1383  }
1384  return it->second;
1385 }
// AMD variant (presumably Observer::getChannel_async, judging by the AMD callback type; the
// signature line 1386 is absent from this listing). Passes the request to addWorkerJob under
// the job name "Observer::getChannel" with the AMD callback and the channel name.
// NOTE(review): listing line 1392 (argument after `amd,`) is missing - presumably the pointer
// to the synchronous member function; confirm against the original file.
1387  const AMD_ObserverInterface_getChannelPtr& amd,
1388  const std::string& channelName,
1389  const Ice::Current&) const
1390 {
1391  addWorkerJob("Observer::getChannel", amd,
1393  channelName);
1394 }
1395 // //////////////////
// Presumably Observer::getAvailableChannels (the signature line 1396 is absent from this
// listing; the name is inferred from the matching "Observer::getAvailableChannels" job).
// Returns a copy of the channel registry taken under channelsMutex. Unless
// includeMetaChannels is true, the LAST_REFRESH_DELTA_CHANNEL entry is removed from the copy.
1397  bool includeMetaChannels)
1398 {
1399  //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1400  std::unique_lock lock(channelsMutex);
1401 
1402  ChannelRegistry result(impl->channelRegistry.begin(), impl->channelRegistry.end());
1403  if (!includeMetaChannels)
1404  {
1405  result.erase(LAST_REFRESH_DELTA_CHANNEL);
1406  }
1407  return result;
1408 }
// AMD variant (presumably Observer::getAvailableChannels_async, judging by the AMD callback
// type; the signature line 1409 is absent from this listing). Passes the request to
// addWorkerJob under the job name "Observer::getAvailableChannels" with the AMD callback
// and the includeMetaChannels flag.
// NOTE(review): listing line 1415 (argument after `amd,`) is missing - presumably the pointer
// to the synchronous member function; confirm against the original file.
1410  const AMD_ObserverInterface_getAvailableChannelsPtr& amd,
1411  bool includeMetaChannels,
1412  const Ice::Current&)
1413 {
1414  addWorkerJob("Observer::getAvailableChannels", amd,
1416  includeMetaChannels);
1417 }
1418 // //////////////////
1419 StringConditionCheckMap Observer::getAvailableChecks()
1420 {
1421  std::unique_lock lock(impl->checksMutex);
1422 
1423  return impl->availableChecks;
1424 }
// Observer::getAvailableChecks_async (name per the cross-reference index; the signature line
// 1425 is absent from this listing). Passes the request to addWorkerJob under the job name
// "Observer::getAvailableChecks" together with the AMD callback.
// NOTE(review): listing line 1430 (last argument) is missing - presumably the pointer to the
// synchronous member function; confirm against the original file.
1426  const AMD_ObserverInterface_getAvailableChecksPtr& amd,
1427  const Ice::Current&)
1428 {
1429  addWorkerJob("Observer::getAvailableChecks", amd,
1431 }
1432 // //////////////////
// Observer::existsChannel (returns bool per the cross-reference index; the signature line 1433
// is absent from this listing). Returns true when the channel registry contains an entry
// for channelName; the lookup is done under channelsMutex.
1434  const std::string& channelName) const
1435 {
1436  //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1437  std::unique_lock lock(channelsMutex);
1438  return impl->channelRegistry.find(channelName) != impl->channelRegistry.end();
1439 }
// Observer::existsChannel_async (name per the cross-reference index; the signature line 1440
// is absent from this listing). Passes the request to addWorkerJob under the job name
// "Observer::existsChannel" with the AMD callback and the channel name.
// NOTE(review): listing line 1446 (argument after `amd,`) is missing - presumably the pointer
// to the synchronous member function; confirm against the original file.
1441  const AMD_ObserverInterface_existsChannelPtr& amd,
1442  const std::string& channelName,
1443  const Ice::Current&) const
1444 {
1445  addWorkerJob("Observer::existsChannel", amd,
1447  channelName);
1448 }
1449 // //////////////////
// Presumably Observer::existsDataField (the signature line 1450 is absent from this listing;
// the name is inferred from the matching "Observer::existsDataField" job).
// Returns false when the channel is unknown, otherwise whether the channel has a data field
// named datafieldName. The lookup is done under channelsMutex.
1451  const std::string& channelName,
1452  const std::string& datafieldName) const
1453 {
1454  //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1455  std::unique_lock lock(channelsMutex);
1456  auto itChannel = impl->channelRegistry.find(channelName);
1457 
1458  if (itChannel == impl->channelRegistry.end())
1459  {
1460  return false;
1461  }
1462 
1463  return itChannel->second.dataFields.find(datafieldName) != itChannel->second.dataFields.end();
1464 }
// Observer::existsDataField_async (name per the cross-reference index; the signature line 1465
// is absent from this listing). Passes the request to addWorkerJob under the job name
// "Observer::existsDataField" with the AMD callback, channel name and data field name.
// NOTE(review): listing line 1472 (argument after `amd,`) is missing - presumably the pointer
// to the synchronous member function; confirm against the original file.
1466  const AMD_ObserverInterface_existsDataFieldPtr& amd,
1467  const std::string& channelName,
1468  const std::string& datafieldName,
1469  const Ice::Current&) const
1470 {
1471  addWorkerJob("Observer::existsDataField", amd,
1473  channelName,
1474  datafieldName);
1475 }
1476 // //////////////////
// Presumably Observer::createFilteredDatafield (the signature line 1477 is absent from this
// listing; the name is inferred from the matching "Observer::createFilteredDatafield" job).
// Derives a data field name "<datafield>_<filter ice id>" that is not yet used in the channel,
// appending "_1", "_2", ... while the name exists, then delegates to
// createNamedFilteredDatafield().
// NOTE(review): `filter` is dereferenced without a preceding check, unlike `datafieldRef`.
1478  const DatafieldFilterBasePtr& filter,
1479  const DatafieldRefBasePtr& datafieldRef)
1480 {
1481  //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1482  // if( auto it = impl->orignalToFiltered.find(datafieldRef) != impl->orignalToFiltered.end())
1483  // {
1484  // return it->second.filtered;
1485  // }
1486  ARMARX_CHECK_EXPRESSION(datafieldRef);
1487 
1488 
1489  std::string filteredName = datafieldRef->datafieldName + "_" + filter->ice_id();
1490 
1491 
1492 
1493  int i = 1;
1494 
// Probe candidate names until one is free in the channel.
1495  while (existsDataField(datafieldRef->channelRef->channelName, filteredName))
1496  {
1497  // ARMARX_IMPORTANT << "Checking if datafield " << filteredName << " exists";
1498  filteredName = datafieldRef->datafieldName + "_" + filter->ice_id() + "_" + ValueToString(i);
1499  i++;
1500  }
1501 
1502  return createNamedFilteredDatafield(filteredName, filter, datafieldRef);
1503 }
// Observer::createFilteredDatafield_async (name per the cross-reference index; the signature
// line 1504 is absent from this listing). Passes the request to addWorkerJob under the job name
// "Observer::createFilteredDatafield" with the AMD callback, the filter and the data field ref.
// NOTE(review): listing line 1511 (argument after `amd,`) is missing - presumably the pointer
// to the synchronous member function; confirm against the original file.
1505  const AMD_ObserverInterface_createFilteredDatafieldPtr& amd,
1506  const DatafieldFilterBasePtr& filter,
1507  const DatafieldRefBasePtr& datafieldRef,
1508  const Ice::Current&)
1509 {
1510  addWorkerJob("Observer::createFilteredDatafield", amd,
1512  filter,
1513  datafieldRef);
1514 }
1515 // //////////////////
// Presumably Observer::createNamedFilteredDatafield (the signature line 1516 is absent from
// this listing; the name is inferred from the matching "Observer::createNamedFilteredDatafield"
// job). Installs `filter` on the data field referenced by datafieldRef and offers the filtered
// value as a new data field named filterDatafieldName in the same channel. An existing data
// field of that name is handed to removeFilteredDatafield() first. Throws
// exceptions::user::UnsupportedTypeException when the filter does not support the field's type.
1517  const std::string& filterDatafieldName,
1518  const DatafieldFilterBasePtr& filter,
1519  const DatafieldRefBasePtr& datafieldRef)
1520 {
1521  //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1522  DatafieldRefPtr ref = DatafieldRefPtr::dynamicCast(datafieldRef);
// NOTE(review): listing line 1523 is missing; presumably a check of `ref` analogous to the
// check of `filter` below - confirm against the original file.
1524  ARMARX_CHECK_EXPRESSION(filter);
1525  if (!filter->checkTypeSupport(ref->getDataField()->getType()))
1526  {
// Log the types the filter supports before rejecting the request.
1527  auto types = filter->getSupportedTypes();
1528  std::string suppTypes = "supported types";
1529 
1530  for (auto t : types)
1531  {
1532  suppTypes += Variant::typeToString(t) + ", ";
1533  }
1534 
1535  ARMARX_WARNING << suppTypes;
1536  throw exceptions::user::UnsupportedTypeException(ref->getDataField()->getType());
1537  }
1538 
1539  // if filter with that name exists -> remove it
1540  if (existsDataField(datafieldRef->channelRef->channelName, filterDatafieldName))
1541  {
1542  DatafieldRefPtr filteredRef = new DatafieldRef(ref->getChannelRef(), filterDatafieldName);
1543  removeFilteredDatafield(filteredRef);
1544 
1545  }
1546 
1547  // calculate initial value
1548  auto var = ref->getDataField();
1549  filter->update(var->getTime().toMicroSeconds(), var);
1550 
1551  // create datafield for new filter
1552  offerDataFieldWithDefault(datafieldRef->channelRef->channelName, filterDatafieldName, *VariantPtr::dynamicCast(filter->getValue()), "Filtered value of " + ref->getDataFieldIdentifier()->getIdentifierStr());
1553 
1554  //create and store the refs for the filters
1555  ChannelRefPtr channel = ChannelRefPtr::dynamicCast(ref->channelRef);
1556  channel->refetchChannel();
1557  DatafieldRefPtr filteredRef = new DatafieldRef(ref->getChannelRef(), filterDatafieldName);
// NOTE(review): listing line 1558 is missing; presumably it declares `data` as a new
// Impl::FilterData instance - confirm against the original file.
1559  data->filter = filter;
1560  data->original = ref;
1561  data->filtered = filteredRef;
// Register the filter in both lookup maps (original -> filtered and filtered -> original).
1562  std::unique_lock lock(impl->filterMutex);
1563  impl->orignalToFiltered.insert(std::make_pair(ref->getDataFieldIdentifier()->getIdentifierStr(), data));
1564  impl->filteredToOriginal[filteredRef->getDataFieldIdentifier()->getIdentifierStr()] = data;
1565  return filteredRef;
1566 }
// AMD variant (presumably Observer::createNamedFilteredDatafield_async, judging by the AMD
// callback type; the signature line 1567 is absent from this listing). Passes the request to
// addWorkerJob under the job name "Observer::createNamedFilteredDatafield" with the AMD
// callback, the target field name, the filter and the data field ref.
// NOTE(review): listing line 1575 (argument after `amd,`) is missing - presumably the pointer
// to the synchronous member function; confirm against the original file.
1568  const AMD_ObserverInterface_createNamedFilteredDatafieldPtr& amd,
1569  const std::string& filterDatafieldName,
1570  const DatafieldFilterBasePtr& filter,
1571  const DatafieldRefBasePtr& datafieldRef,
1572  const Ice::Current&)
1573 {
1574  addWorkerJob("Observer::createNamedFilteredDatafield", amd,
1576  filterDatafieldName,
1577  filter,
1578  datafieldRef);
1579 }
1580 // //////////////////
1582  const DatafieldRefBasePtr& datafieldRef)
1583 {
1584  //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1585  bool remove = true;
1586  DatafieldRefPtr ref;
1587  {
1588  std::unique_lock lock(impl->filterMutex);
1589  ref = DatafieldRefPtr::dynamicCast(datafieldRef);
1590  const std::string idStr = ref->getDataFieldIdentifier()->getIdentifierStr();
1591  auto it = impl->filteredToOriginal.find(idStr);
1592  remove = (it != impl->filteredToOriginal.end());
1593  }
1594 
1595  if (remove && ref)
1596  {
1597  removeDatafield(ref->getDataFieldIdentifier());
1598  }
1599 }
// AMD variant (presumably Observer::removeFilteredDatafield_async, judging by the AMD callback
// type; the signature line 1600 is absent from this listing). Passes the request to
// addWorkerJob under the job name "Observer::removeFilteredDatafield" with the AMD callback
// and the data field ref.
// NOTE(review): listing line 1606 (argument after `amd,`) is missing - presumably the pointer
// to the synchronous member function; confirm against the original file.
1601  const AMD_ObserverInterface_removeFilteredDatafieldPtr& amd,
1602  const DatafieldRefBasePtr& datafieldRef,
1603  const Ice::Current&)
1604 {
1605  addWorkerJob("Observer::removeFilteredDatafield", amd,
1607  datafieldRef);
1608 }
1609 // //////////////////
// Presumably Observer::getChannelHistory (the signature line 1610 is absent from this listing;
// the name is inferred from the matching "Observer::getChannelHistory" job).
// Returns the channel history by delegating to getPartialChannelHistory() with a time window
// from 0 to the largest `long` value.
1611  const std::string& channelName,
1612  Ice::Float timestepMs,
1613  const Ice::Current& c) const
1614 {
1615  //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1616  return getPartialChannelHistory(channelName, 0, std::numeric_limits<long>::max(), timestepMs, c);
1617 }
// Observer::getChannelHistory_async (name per the cross-reference index; the signature line
// 1618 is absent from this listing). Passes the request to addWorkerJob under the job name
// "Observer::getChannelHistory" with the AMD callback, channel name, time step and Ice context.
// NOTE(review): listing line 1625 (argument after `amd,`) is missing - presumably the pointer
// to the synchronous member function; confirm against the original file.
1619  const AMD_ObserverInterface_getChannelHistoryPtr& amd,
1620  const std::string& channelName,
1621  Ice::Float timestepMs,
1622  const Ice::Current& c) const
1623 {
1624  addWorkerJob("Observer::getChannelHistory", amd,
1626  channelName,
1627  timestepMs,
1628  c);
1629 }
1630 // //////////////////
// Presumably Observer::getPartialChannelHistory (the signature line 1631 is absent from this
// listing; the name is inferred from the matching "Observer::getPartialChannelHistory" job).
// Collects the recorded history of a channel under impl->historyMutex. An entry is taken when
// its timestamp (microseconds) is greater than startTimestamp, not greater than endTimestamp,
// and more than timestepMs milliseconds after the previously taken entry. Returns an empty
// result when the channel has no history.
1632  const std::string& channelName,
1633  Ice::Long startTimestamp,
1634  Ice::Long endTimestamp,
1635  Ice::Float timestepMs,
1636  const Ice::Current& c) const
1637 {
1638  //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
// NOTE(review): the three ARMARX_IMPORTANT messages in this function look like leftover
// debugging output - confirm whether this log level is intended.
1639  ARMARX_IMPORTANT << "Waiting for mutex";
1640  std::unique_lock lock_channels(impl->historyMutex);
1641  ARMARX_IMPORTANT << "GOT mutex";
1642  ChannelHistory result;
1643  auto historyIt = impl->channelHistory.find(channelName);
1644  if (historyIt == impl->channelHistory.end())
1645  {
1646  return result;
1647  }
1648  ARMARX_IMPORTANT << "found channel";
1649 
// NOTE(review): lastInsertIt is assigned below but never read.
1650  auto lastInsertIt = result.begin();
1651  Ice::Long lastUsedTimestep = 0;
// Minimum spacing between two taken entries, converted from milliseconds to microseconds.
1652  float timestepUs = timestepMs * 1000;
1653  for (auto& entry : historyIt->second)
1654  {
1655  long timestamp = entry.first.toMicroSeconds();
// Stops at the first entry beyond the window; presumably the buffer is ordered by time.
1656  if (timestamp > endTimestamp)
1657  {
1658  break;
1659  }
1660 
1661  if (timestamp > startTimestamp && timestamp - lastUsedTimestep > timestepUs)
1662  {
1663  lastInsertIt = result.emplace_hint(result.end(), std::make_pair(timestamp, entry.second.dataFields));
1664  lastUsedTimestep = timestamp;
1665  }
1666  }
1667  return result;
1668 }
// Observer::getPartialChannelHistory_async (name per the cross-reference index; the signature
// line 1669 is absent from this listing). Passes the request to addWorkerJob under the job name
// "Observer::getPartialChannelHistory" with the AMD callback and all call arguments.
// NOTE(review): listing line 1678 (argument after `amd,`) is missing - presumably the pointer
// to the synchronous member function; confirm against the original file.
1670  const AMD_ObserverInterface_getPartialChannelHistoryPtr& amd,
1671  const std::string& channelName,
1672  Ice::Long startTimestamp,
1673  Ice::Long endTimestamp,
1674  Ice::Float timestepMs,
1675  const Ice::Current& c) const
1676 {
1677  addWorkerJob("Observer::getPartialChannelHistory", amd,
1679  channelName,
1680  startTimestamp,
1681  endTimestamp,
1682  timestepMs,
1683  c);
1684 }
1685 // //////////////////
1686 TimedVariantBaseList Observer::getDatafieldHistory(
1687  const std::string& channelName,
1688  const std::string& datafieldName,
1689  Ice::Float timestepMs,
1690  const Ice::Current& c) const
1691 {
1692  //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1693  return getPartialDatafieldHistory(channelName, datafieldName, 0, std::numeric_limits<long>::max(), timestepMs, c);
1694 }
// Observer::getDatafieldHistory_async (name per the cross-reference index; the signature line
// 1695 is absent from this listing). Passes the request to addWorkerJob under the job name
// "Observer::getDatafieldHistory" with the AMD callback and all call arguments.
// NOTE(review): listing line 1703 (argument after `amd,`) is missing - presumably the pointer
// to the synchronous member function; confirm against the original file.
1696  const AMD_ObserverInterface_getDatafieldHistoryPtr& amd,
1697  const std::string& channelName,
1698  const std::string& datafieldName,
1699  Ice::Float timestepMs,
1700  const Ice::Current& c) const
1701 {
1702  addWorkerJob("Observer::getDatafieldHistory", amd,
1704  channelName,
1705  datafieldName,
1706  timestepMs,
1707  c);
1708 }
1709 // //////////////////
// Observer::getPartialDatafieldHistory (returns TimedVariantBaseList per the cross-reference
// index; the signature line 1710 is absent from this listing).
// Collects the recorded values of one data field under impl->historyMutex. A value is taken
// when its entry's timestamp (microseconds) is greater than startTimestamp, not greater than
// endTimestamp, and more than timestepMs milliseconds after the previously taken value.
// Returns an empty list when the channel has no history or the data field does not exist.
1711  const std::string& channelName,
1712  const std::string& datafieldName,
1713  Ice::Long startTimestamp,
1714  Ice::Long endTimestamp,
1715  Ice::Float timestepMs,
1716  const Ice::Current& c) const
1717 {
1718  //ARMARX_DEBUG << BOOST_CURRENT_FUNCTION;
1719  std::unique_lock lock_channels(impl->historyMutex);
1720  TimedVariantBaseList result;
1721  auto historyIt = impl->channelHistory.find(channelName);
1722  if (historyIt == impl->channelHistory.end())
1723  {
1724  return result;
1725  }
1726  if (!existsDataField(channelName, datafieldName))
1727  {
1728  return result;
1729  }
1730 
1731  Ice::Long lastUsedTimestep = 0;
// Minimum spacing between two taken values, converted from milliseconds to microseconds.
1732  float timestepUs = timestepMs * 1000;
1733  for (auto& entry : historyIt->second)
1734  {
1735  long timestamp = entry.first.toMicroSeconds();
// Stops at the first entry beyond the window; presumably the buffer is ordered by time.
1736  if (timestamp > endTimestamp)
1737  {
1738  break;
1739  }
1740 
1741  if (timestamp > startTimestamp && timestamp - lastUsedTimestep > timestepUs)
1742  {
1743  auto datafieldIt = entry.second.dataFields.find(datafieldName);
1744  if (datafieldIt == entry.second.dataFields.end())
1745  {
1746  continue;
1747  }
1748  else
1749  {
// Values that are not TimedVariants are skipped and do not advance lastUsedTimestep.
1750  TimedVariantPtr tvar = TimedVariantPtr::dynamicCast(datafieldIt->second.value);
1751  if (tvar)
1752  {
1753  result.emplace_back(tvar);
1754  lastUsedTimestep = timestamp;
1755  }
1756  }
1757  }
1758  }
1759  return result;
1760 }
// Observer::getPartialDatafieldHistory_async (name per the cross-reference index; the signature
// line 1761 is absent from this listing). Passes the request to addWorkerJob under the job name
// "Observer::getPartialDatafieldHistory" with the AMD callback and all call arguments.
// NOTE(review): listing line 1771 (argument after `amd,`) is missing - presumably the pointer
// to the synchronous member function; confirm against the original file.
1762  const AMD_ObserverInterface_getPartialDatafieldHistoryPtr& amd,
1763  const std::string& channelName,
1764  const std::string& datafieldName,
1765  Ice::Long startTimestamp,
1766  Ice::Long endTimestamp,
1767  Ice::Float timestepMs,
1768  const Ice::Current& c) const
1769 {
1770  addWorkerJob("Observer::getPartialDatafieldHistory", amd,
1772  channelName,
1773  datafieldName,
1774  startTimestamp,
1775  endTimestamp,
1776  timestepMs,
1777  c);
1778 }
1779 // ////////////////////////////////////////////////////////////////////////// //
// Constructor of Observer::Impl::WorkerUpdate (per the cross-reference index its first
// parameter is `const std::string& name`; the signature line 1780 is absent from this listing).
// Stores the job name, records the current time via Now() as the job's creation time and
// takes over the callable.
1781  std::function<void(void)> f)
1782  : name{name}
1783  , time{Now()}
1784  , f{std::move(f)}
1785 {
1786 }
1787 
// Body of Observer::Impl::WorkerUpdate::ageInMs() const (returns float per the cross-reference
// index; the signature line 1788 is absent from this listing).
// Returns the time in milliseconds since the `time` member was set.
1789 {
1790  return TimeDeltaInMs(time);
1791 }
armarx::Observer::getPartialDatafieldHistory
TimedVariantBaseList getPartialDatafieldHistory(const std::string &channelName, const std::string &datafieldName, Ice::Long startTimestamp, Ice::Long endTimestamp, Ice::Float timestepMs, const Ice::Current &c) const
Definition: Observer.cpp:1710
armarx::Observer::updateChannel
void updateChannel(const std::string &channelName, const std::set< std::string > &updatedDatafields=std::set< std::string >())
Update all conditions for a channel.
Definition: Observer.cpp:715
armarx::Observer::Impl::FilterData
Definition: Observer.cpp:69
armarx::Observer::getChannel
ChannelRegistryEntry getChannel(const std::string &channelName) const
Retrieve information on all sensory data channels available from the observer.
Definition: Observer.cpp:1374
armarx::Variant
The Variant class is described here: Variants.
Definition: Variant.h:224
armarx::Observer::removeFilteredDatafield
void removeFilteredDatafield(const DatafieldRefBasePtr &datafieldRef)
Removes a previously installed filter.
Definition: Observer.cpp:1581
armarx::DataFieldIdentifierBaseList
::std::vector< ::armarx::DataFieldIdentifierBasePtr > DataFieldIdentifierBaseList
Definition: StatechartContextInterface.h:43
armarx::Observer::Impl::FilterData::filtered
DatafieldRefPtr filtered
Definition: Observer.cpp:73
armarx::Observer::Impl::filteredToOriginal
std::map< std::string, FilterDataPtr > filteredToOriginal
Definition: Observer.cpp:78
armarx::Observer::Impl
Definition: Observer.cpp:48
armarx::VariantType::Float
const VariantTypeId Float
Definition: Variant.h:918
armarx::Observer::Observer
Observer()
Definition: Observer.cpp:114
armarx::Observer::getDatafieldsOfChannel
StringTimedVariantBaseMap getDatafieldsOfChannel(const std::string &channelName) const
Definition: Observer.cpp:1346
armarx::StringVariantBaseMap
std::map< std::string, VariantBasePtr > StringVariantBaseMap
Definition: ManagedIceObject.h:111
ARMARX_IMPORTANT
#define ARMARX_IMPORTANT
Definition: Logging.h:183
armarx::Observer::onInitObserver
virtual void onInitObserver()=0
Framework hook.
armarx::Observer::Impl::WorkerUpdate::name
std::string name
Definition: Observer.cpp:105
armarx::Observer::Impl::availableChecks
StringConditionCheckMap availableChecks
Definition: Observer.cpp:55
OnScopeExit.h
armarx::Observer::Impl::stopWorker
std::atomic_bool stopWorker
Definition: Observer.cpp:111
armarx::Observer::~Observer
~Observer()
Definition: Observer.cpp:119
armarx::Observer::Impl::WorkerUpdate::f
std::function< void(void)> f
Definition: Observer.cpp:107
InvalidChannelException.h
armarx::Observer::Impl::channelRegistry
ConditionCheck::ChannelRegistry channelRegistry
Definition: Observer.cpp:59
armarx::Observer::getDataField
TimedVariantBasePtr getDataField(const DataFieldIdentifierBasePtr &identifier, const Ice::Current &c=Ice::emptyCurrent) const
Retrieve data field from observer.
Definition: Observer.cpp:1214
armarx::Observer::removeChannel
void removeChannel(std::string channelName)
Remove a channel.
Definition: Observer.cpp:289
armarx::Observer::getPartialChannelHistory_async
void getPartialChannelHistory_async(const AMD_ObserverInterface_getPartialChannelHistoryPtr &amd, const std::string &channelName, Ice::Long startTimestamp, Ice::Long endTimestamp, Ice::Float timestepMs, const Ice::Current &c) const override
Definition: Observer.cpp:1669
armarx::Observer::getDatafieldRefByName
DatafieldRefBasePtr getDatafieldRefByName(const std::string &channelName, const std::string &datafieldName) const
Definition: Observer.cpp:1300
armarx::Observer::Impl::WorkerUpdate::WorkerUpdate
WorkerUpdate(const std::string &name, std::function< void(void)> f)
Definition: Observer.cpp:1780
armarx::Observer::offerOrUpdateDataField
bool offerOrUpdateDataField(std::string channelName, std::string datafieldName, const Variant &value, const std::string &description)
Definition: Observer.cpp:222
armarx::ManagedIceObject::getState
int getState() const
Retrieve current state of the ManagedIceObject.
Definition: ManagedIceObject.cpp:725
armarx::Observer::onConnectObserver
virtual void onConnectObserver()=0
Framework hook.
armarx::exceptions::user::DatafieldExistsAlreadyException
Definition: InvalidDatafieldException.h:56
armarx::Observer::Impl::filterMutex
std::recursive_mutex filterMutex
Definition: Observer.cpp:79
armarx::Observer::getDataField_async
void getDataField_async(const AMD_ObserverInterface_getDataFieldPtr &amd, const DataFieldIdentifierBasePtr &identifier, const Ice::Current &c) const override
Definition: Observer.cpp:1221
armarx::Observer::Impl::historyMutex
std::recursive_mutex historyMutex
Definition: Observer.cpp:62
c
constexpr T c
Definition: UnscentedKalmanFilterTest.cpp:43
armarx::Observer::getChannelHistory_async
void getChannelHistory_async(const AMD_ObserverInterface_getChannelHistoryPtr &amd, const std::string &channelName, Ice::Float timestepMs, const Ice::Current &c) const override
Definition: Observer.cpp:1618
ARMARX_ON_SCOPE_EXIT
#define ARMARX_ON_SCOPE_EXIT
Executes given code when the enclosing scope is left.
Definition: OnScopeExit.h:112
armarx::Observer::getPartialDatafieldHistory_async
void getPartialDatafieldHistory_async(const AMD_ObserverInterface_getPartialDatafieldHistoryPtr &amd, const std::string &channelName, const std::string &datafieldName, Ice::Long startTimestamp, Ice::Long endTimestamp, Ice::Float timestepMs, const Ice::Current &c) const override
Definition: Observer.cpp:1761
armarx::Observer::Impl::idleChannelCondition
std::condition_variable idleChannelCondition
Definition: Observer.cpp:92
armarx::Observer::getAvailableChecks_async
void getAvailableChecks_async(const AMD_ObserverInterface_getAvailableChecksPtr &amd, const Ice::Current &) override
Definition: Observer.cpp:1425
armarx::Observer::Impl::FilterUpdateQueue
std::unordered_map< std::string, FilterQueueData > FilterUpdateQueue
Definition: Observer.cpp:94
armarx::Observer::Impl::currentId
int currentId
Definition: Observer.cpp:66
armarx::Observer::getDatafieldHistory_async
void getDatafieldHistory_async(const AMD_ObserverInterface_getDatafieldHistoryPtr &amd, const std::string &channelName, const std::string &datafieldName, Ice::Float timestepMs, const Ice::Current &c) const override
Definition: Observer.cpp:1695
armarx::Observer::getDataFields_async
void getDataFields_async(const AMD_ObserverInterface_getDataFieldsPtr &amd, const DataFieldIdentifierBaseList &identifiers, const Ice::Current &c) override
Definition: Observer.cpp:1335
Observer.h
armarx::Observer::setDataFieldsFlatCopy
void setDataFieldsFlatCopy(const std::string &channelName, const StringVariantBaseMap &datafieldValues, bool triggerFilterUpdate=true)
Definition: Observer.cpp:665
armarx::Observer::existsChannel
bool existsChannel(const std::string &channelName) const
Definition: Observer.cpp:1433
armarx::Variant::getTypeName
std::string getTypeName(const Ice::Current &c=Ice::emptyCurrent) const override
Return the Variant's internal type.
Definition: Variant.cpp:579
armarx::Observer::removeCheck
void removeCheck(const CheckIdentifier &id)
Removes a condition check from the observer.
Definition: Observer.cpp:1181
armarx::Observer::Impl::FilterQueueData::datafieldName
std::string datafieldName
Definition: Observer.cpp:86
armarx::RunningTask
Definition: ArmarXMultipleObjectsScheduler.h:35
armarx::Observer::Impl::channelUpdateTask
RunningTask< Observer >::pointer_type channelUpdateTask
Definition: Observer.cpp:89
armarx::Observer::Impl::channelUpdateTimestamps
std::map< std::string, IceUtil::Time > channelUpdateTimestamps
Definition: Observer.cpp:80
armarx::Observer::Impl::WorkerUpdate::time
TimepointT time
Definition: Observer.cpp:106
armarx::Observer::Now
static TimepointT Now()
Definition: Observer.cpp:1114
armarx::Observer::getDataFieldRef_async
void getDataFieldRef_async(const AMD_ObserverInterface_getDataFieldRefPtr &amd, const DataFieldIdentifierBasePtr &identifier, const Ice::Current &) const override
Definition: Observer.cpp:1290
armarx::Observer::getDatafieldByName_async
void getDatafieldByName_async(const AMD_ObserverInterface_getDatafieldByNamePtr &amd, const std::string &channelName, const std::string &datafieldName, const Ice::Current &) const override
Definition: Observer.cpp:1259
armarx::Observer::updateDatafieldFilter
std::set< std::string > updateDatafieldFilter(const std::string &channelName, const std::string &datafieldName, const VariantBasePtr &value)
Definition: Observer.cpp:357
IceInternal::Handle< DatafieldRef >
armarx::Observer::updateDatafieldTimestamps
void updateDatafieldTimestamps(const std::string &channelName, const std::unordered_map< std::string, Ice::Long > &datafieldValues)
Definition: Observer.cpp:529
InvalidTypeException.h
armarx::Observer::Impl::FilterData::original
DatafieldRefPtr original
Definition: Observer.cpp:72
armarx::armem::client::util::swap
void swap(SubscriptionHandle &first, SubscriptionHandle &second)
Definition: SubscriptionHandle.cpp:66
DatafieldRef.h
ObserverObjectFactories.h
armarx::Observer::Impl::worker
std::thread worker
Definition: Observer.cpp:110
armarx::Observer::Impl::checksMutex
std::mutex checksMutex
Definition: Observer.cpp:56
ARMARX_TRACE
#define ARMARX_TRACE
Definition: trace.h:69
cxxopts::value
std::shared_ptr< Value > value()
Definition: cxxopts.hpp:926
armarx::Observer::Impl::FilterQueueData
Definition: Observer.cpp:82
armarx::Observer::runWorker
void runWorker()
Definition: Observer.cpp:1027
armarx::exceptions::local
Definition: DynamicLibraryException.h:31
armarx::exceptions::local::InvalidCheckException
Definition: InvalidCheckException.h:32
armarx::Observer::Impl::channelQueueMutex
std::mutex channelQueueMutex
Definition: Observer.cpp:91
armarx::Observer::scheduleDatafieldFilterUpdate
void scheduleDatafieldFilterUpdate(const std::string &channelName, const std::string &datafieldName, const VariantBasePtr &value)
Definition: Observer.cpp:400
armarx::Observer::Impl::metaTask
PeriodicTask< Observer >::pointer_type metaTask
Definition: Observer.cpp:50
armarx::Observer::Impl::maxHistoryRecordFrequency
float maxHistoryRecordFrequency
Definition: Observer.cpp:64
armarx::Observer::getDatafieldRefByName_async
void getDatafieldRefByName_async(const AMD_ObserverInterface_getDatafieldRefByNamePtr &amd, const std::string &channelName, const std::string &datafieldName, const Ice::Current &) const override
Definition: Observer.cpp:1306
armarx::Observer::Impl::filterUpdateTask
RunningTask< Observer >::pointer_type filterUpdateTask
Definition: Observer.cpp:81
armarx::Observer::TimepointT
typename ClockT::time_point TimepointT
Definition: Observer.h:579
armarx::ValueToString
std::string ValueToString(const T &value)
Definition: StringHelpers.h:58
armarx::Observer::getDataFieldRef
DatafieldRefBasePtr getDataFieldRef(const DataFieldIdentifierBasePtr &identifier) const
Definition: Observer.cpp:1271
armarx::flush
const LogSender::manipulator flush
Definition: LogSender.h:251
armarx::Observer::setDataFieldFlatCopy
void setDataFieldFlatCopy(const std::string &channelName, const std::string &datafieldName, const VariantPtr &value, bool triggerFilterUpdate=true)
Definition: Observer.cpp:501
data
uint8_t data[1]
Definition: EtherCATFrame.h:68
ARMARX_DEBUG
#define ARMARX_DEBUG
Definition: Logging.h:177
armarx::Observer::getDataFields
TimedVariantBaseList getDataFields(const DataFieldIdentifierBaseList &identifiers, const Ice::Current &c)
Retrieve a list of data fields from the observer.
Definition: Observer.cpp:1318
armarx::Observer::existsChannel_async
void existsChannel_async(const AMD_ObserverInterface_existsChannelPtr &amd, const std::string &channelName, const Ice::Current &) const override
Definition: Observer.cpp:1440
armarx::Observer::Impl::WorkerUpdate::ageInMs
float ageInMs() const
Definition: Observer.cpp:1788
armarx::Observer::existsDataField_async
void existsDataField_async(const AMD_ObserverInterface_existsDataFieldPtr &amd, const std::string &channelName, const std::string &datafieldName, const Ice::Current &) const override
Definition: Observer.cpp:1465
InvalidDatafieldException.h
armarx::Observer::Impl::FilterQueueData::value
VariantBasePtr value
Definition: Observer.cpp:84
armarx::Observer::offerOrUpdateDataFieldsFlatCopy
void offerOrUpdateDataFieldsFlatCopy(const std::string &channelName, const StringVariantBaseMap &valueMap)
Definition: Observer.cpp:236
armarx::Observer::metaUpdateTask
void metaUpdateTask()
Definition: Observer.cpp:964
armarx::Observer::createFilteredDatafield_async
void createFilteredDatafield_async(const AMD_ObserverInterface_createFilteredDatafieldPtr &amd, const DatafieldFilterBasePtr &filter, const DatafieldRefBasePtr &datafieldRef, const Ice::Current &) override
Definition: Observer.cpp:1504
armarx::Observer::TimeDeltaInMs
static float TimeDeltaInMs(TimepointT t0)
Definition: Observer.cpp:1119
armarx::Variant::typeToString
static std::string typeToString(VariantTypeId typeId)
Return the name of the registered type typeId.
Definition: Variant.cpp:732
armarx::VariantType::Long
const VariantTypeId Long
Definition: Variant.h:917
armarx::exceptions::user::UnsupportedTypeException
Definition: InvalidTypeException.h:67
armarx::Observer::Impl::FilterQueueData::channelName
std::string channelName
Definition: Observer.cpp:85
armarx::Observer::Impl::maxHistorySize
int maxHistorySize
Definition: Observer.cpp:63
armarx::Observer::setDataField
void setDataField(const std::string &channelName, const std::string &datafieldName, const Variant &value, bool triggerFilterUpdate=true)
Set the datafield named datafieldName in the channel channelName.
Definition: Observer.cpp:465
armarx::Observer::removeDatafield
void removeDatafield(DataFieldIdentifierBasePtr id)
Definition: Observer.cpp:314
ArmarXObjectScheduler.h
max
T max(T t1, T t2)
Definition: gdiam.h:48
armarx::Observer::Impl::idleCondition
std::condition_variable idleCondition
Definition: Observer.cpp:97
ARMARX_ERROR
#define ARMARX_ERROR
Definition: Logging.h:189
armarx::Observer::installCheck_async
void installCheck_async(const AMD_ObserverInterface_installCheckPtr &amd, const CheckConfiguration &configuration, const Ice::Current &) override
Definition: Observer.cpp:1171
armarx::VariantTypeId
Ice::Int VariantTypeId
Definition: Variant.h:44
InvalidDataFieldException.h
armarx::Observer::installCheck
CheckIdentifier installCheck(const CheckConfiguration &configuration)
Installs a condition check with the observer.
Definition: Observer.cpp:1137
armarx::Observer::impl
std::unique_ptr< Impl > impl
Definition: Observer.h:588
armarx::ConditionCheck
Definition: ConditionCheck.h:56
armarx::Observer::Impl::WorkerUpdate
Definition: Observer.cpp:100
armarx::TimeUtil::GetTime
static IceUtil::Time GetTime(TimeMode timeMode=TimeMode::VirtualTime)
Get the current time.
Definition: TimeUtil.cpp:42
armarx::VariantType::DatafieldRef
const VariantTypeId DatafieldRef
Definition: DatafieldRef.h:169
ExpressionException.h
InvalidCheckException.h
armarx::ConditionCheck::ChannelRegistry
armarx::ChannelRegistry ChannelRegistry
Creates a new ConditionCheck instance.
Definition: ConditionCheck.h:83
armarx::Component::getConfigIdentifier
std::string getConfigIdentifier()
Retrieve config identifier for this component as set in constructor.
Definition: Component.cpp:74
armarx::Observer::getDatafieldsOfChannel_async
void getDatafieldsOfChannel_async(const AMD_ObserverInterface_getDatafieldsOfChannelPtr &amd, const std::string &channelName, const Ice::Current &) const override
Definition: Observer.cpp:1364
armarx::Observer::channelsMutex
std::recursive_mutex channelsMutex
Definition: Observer.h:586
armarx::Observer::getAvailableChannels
ChannelRegistry getAvailableChannels(bool includeMetaChannels)
Retrieve information on all sensory data channels available from the observer.
Definition: Observer.cpp:1396
armarx::Observer::getChannel_async
void getChannel_async(const AMD_ObserverInterface_getChannelPtr &amd, const std::string &channelName, const Ice::Current &) const override
Definition: Observer.cpp:1386
armarx::Observer::getDatafieldHistory
TimedVariantBaseList getDatafieldHistory(const std::string &channelName, const std::string &datafieldName, Ice::Float timestepMs, const Ice::Current &c) const
Definition: Observer.cpp:1686
armarx::Observer::getObserverName_async
void getObserverName_async(const AMD_ObserverInterface_getObserverNamePtr &amd, const Ice::Current &) const override
Definition: Observer.cpp:1129
armarx::Observer::getAvailableChecks
StringConditionCheckMap getAvailableChecks()
Retrieve list of available condition checks.
Definition: Observer.cpp:1419
ARMARX_CHECK_EXPRESSION
#define ARMARX_CHECK_EXPRESSION(expression)
This macro evaluates the expression and, if it turns out to be false, it will throw an ExpressionException.
Definition: ExpressionException.h:73
armarx::Observer::createNamedFilteredDatafield
DatafieldRefBasePtr createNamedFilteredDatafield(const std::string &filterDatafieldName, const DatafieldFilterBasePtr &filter, const DatafieldRefBasePtr &datafieldRef)
Definition: Observer.cpp:1516
armarx::Observer::Impl::orignalToFiltered
std::multimap< std::string, FilterDataPtr > orignalToFiltered
Definition: Observer.cpp:77
armarx::Observer::generateId
int generateId()
Definition: Observer.cpp:974
armarx::Observer::Impl::filterQueue
FilterUpdateQueue filterQueue
Definition: Observer.cpp:95
ARMARX_INFO
#define ARMARX_INFO
Definition: Logging.h:174
armarx::Observer::removeFilteredDatafield_async
void removeFilteredDatafield_async(const AMD_ObserverInterface_removeFilteredDatafieldPtr &amd, const DatafieldRefBasePtr &datafieldRef, const Ice::Current &) override
Definition: Observer.cpp:1600
armarx::Observer::removeCheck_async
void removeCheck_async(const AMD_ObserverInterface_removeCheckPtr &amd, const CheckIdentifier &id, const Ice::Current &) override
Definition: Observer.cpp:1204
armarx::Observer::existsDataField
bool existsDataField(const std::string &channelName, const std::string &datafieldName) const
Definition: Observer.cpp:1450
armarx::Observer::getPartialChannelHistory
ChannelHistory getPartialChannelHistory(const std::string &channelName, Ice::Long startTimestamp, Ice::Long endTimestamp, Ice::Float timestepMs, const Ice::Current &c) const
Definition: Observer.cpp:1631
IceUtil::Handle
Definition: forward_declarations.h:29
armarx::Observer::Impl::channelHistory
ChannelRegistryHistory channelHistory
Definition: Observer.cpp:61
armarx::Observer::getObserverName
std::string getObserverName() const
Definition: Observer.cpp:1125
armarx::Observer::Impl::FilterData::filter
DatafieldFilterBasePtr filter
Definition: Observer.cpp:71
armarx::Observer::Impl::workerUpdatesMutex
std::recursive_mutex workerUpdatesMutex
Definition: Observer.cpp:99
armarx::Observer::Impl::idMutex
std::mutex idMutex
Definition: Observer.cpp:67
armarx::Observer::getAvailableChannels_async
void getAvailableChannels_async(const AMD_ObserverInterface_getAvailableChannelsPtr &amd, bool includeMetaChannels, const Ice::Current &) override
Definition: Observer.cpp:1409
armarx::exceptions::user::InvalidDataFieldException
Definition: InvalidDatafieldException.h:30
armarx::Observer::maybeOfferChannelAndSetDataFieldsFlatCopy
void maybeOfferChannelAndSetDataFieldsFlatCopy(const std::string &channelName, const std::string &description, const StringVariantBaseMap &datafieldValues, bool triggerFilterUpdate=true)
Definition: Observer.cpp:580
armarx::Logging::deactivateSpam
SpamFilterDataPtr deactivateSpam(float deactivationDurationSec=10.0f, const std::string &identifier="", bool deactivate=true) const
Disables the logging for the current line for the given number of seconds.
Definition: Logging.cpp:92
armarx::Observer::onExitObserver
virtual void onExitObserver()
Framework hook.
Definition: Observer.h:464
armarx::Observer::offerConditionCheck
void offerConditionCheck(std::string checkName, ConditionCheck *conditionCheck)
Offer a condition check.
Definition: Observer.cpp:273
armarx::ManagedIceObject::getName
std::string getName() const
Retrieve name of object.
Definition: ManagedIceObject.cpp:107
armarx::PeriodicTask
Definition: ArmarXManager.h:70
armarx::Observer::createNamedFilteredDatafield_async
void createNamedFilteredDatafield_async(const AMD_ObserverInterface_createNamedFilteredDatafieldPtr &amd, const std::string &filterDatafieldName, const DatafieldFilterBasePtr &filter, const DatafieldRefBasePtr &datafieldRef, const Ice::Current &) override
Definition: Observer.cpp:1567
min
T min(T t1, T t2)
Definition: gdiam.h:42
armarx::Observer::getDatafieldByName
TimedVariantBasePtr getDatafieldByName(const std::string &channelName, const std::string &datafieldName) const
Definition: Observer.cpp:1232
armarx::Observer::offerDataFieldWithDefault
void offerDataFieldWithDefault(std::string channelName, std::string datafieldName, const Variant &defaultValue, std::string description)
Offer a datafield with default value.
Definition: Observer.cpp:152
armarx::handleExceptions
void handleExceptions()
Definition: Exception.cpp:141
armarx::exceptions::user::InvalidChannelException
Definition: InvalidDatafieldException.h:44
ARMARX_WARNING
#define ARMARX_WARNING
Definition: Logging.h:186
armarx::Observer::addWorkerJob
void addWorkerJob(const std::string &name, std::function< void(void)> &&f) const
Definition: Observer.cpp:1099
armarx::Observer::createFilteredDatafield
DatafieldRefBasePtr createFilteredDatafield(const DatafieldFilterBasePtr &filter, const DatafieldRefBasePtr &datafieldRef)
This function creates a new datafield with new filter on the given datafield.
Definition: Observer.cpp:1477
armarx::PropertyDefinitionsPtr
IceUtil::Handle< class PropertyDefinitionContainer > PropertyDefinitionsPtr
PropertyDefinitions smart pointer type.
Definition: forward_declarations.h:34
armarx::ManagedIceObject::getProxy
Ice::ObjectPrx getProxy(long timeoutMs=0, bool waitForScheduler=true) const
Returns the proxy of this object (optionally it waits for the proxy)
Definition: ManagedIceObject.cpp:393
armarx::Observer::offerDataField
void offerDataField(std::string channelName, std::string datafieldName, VariantTypeId type, std::string description)
Offer a datafield without default value.
Definition: Observer.cpp:187
armarx::Observer::getChannelHistory
ChannelHistory getChannelHistory(const std::string &channelName, Ice::Float timestepMs, const Ice::Current &c) const
Definition: Observer.cpp:1610
armarx::Observer::Impl::channelQueue
std::map< std::string, std::set< std::string > > channelQueue
Definition: Observer.cpp:90
LAST_REFRESH_DELTA_CHANNEL
const std::string LAST_REFRESH_DELTA_CHANNEL
Definition: Observer.cpp:46
armarx::ObserverPropertyDefinitions
Definition: Observer.h:50
armarx
This file offers overloads of toIce() and fromIce() functions for STL container types.
Definition: ArmarXTimeserver.cpp:28
identifiers
const std::list< std::string > identifiers
Definition: linux_networkload.cpp:21
armarx::Observer::Impl::workerUpdates
std::deque< WorkerUpdate > workerUpdates
Definition: Observer.cpp:109
armarx::TimedVariant
Definition: TimedVariant.h:40
armarx::DataFieldIdentifier
DataFieldIdentifier provides the basis for identifying data fields within a distributed ArmarX scenario.
Definition: DataFieldIdentifier.h:48
armarx::Observer::postWorkerJobs
virtual void postWorkerJobs()
Definition: Observer.cpp:1112
armarx::Observer::Impl::ChannelRegistryHistory
std::unordered_map< std::string, boost::circular_buffer< std::pair< IceUtil::Time, ChannelRegistryEntry > > > ChannelRegistryHistory
Definition: Observer.cpp:60
armarx::Observer::Impl::filterQueueMutex
std::mutex filterQueueMutex
Definition: Observer.cpp:96
armarx::Observer::preWorkerJobs
virtual void preWorkerJobs()
Definition: Observer.cpp:1110
armarx::Observer::offerChannel
void offerChannel(std::string channelName, std::string description)
Offer a channel.
Definition: Observer.cpp:126
ARMARX_STREAM_PRINTER
#define ARMARX_STREAM_PRINTER
use this macro to write output code that is executed when printed and thus not executed if the debug ...
Definition: Logging.h:304