27#include <condition_variable>
29#include <boost/circular_buffer.hpp>
60 std::unordered_map<std::string,
61 boost::circular_buffer<std::pair<IceUtil::Time, ChannelRegistryEntry>>>;
111 std::function<void(
void)>
f;
133 if (
getState() < eManagedIceObjectInitialized)
135 throw LocalException()
136 <<
"offerChannel() must not be called before the Observer is initalized (i.e. not in "
137 "onInitObserver(), use onConnectObserver()";
142 if (
impl->channelRegistry.find(channelName) !=
impl->channelRegistry.end())
147 ChannelRegistryEntry channel;
148 channel.name = channelName;
149 channel.description = description;
150 channel.initialized =
false;
152 std::pair<std::string, ChannelRegistryEntry> entry;
153 entry.first = channelName;
154 entry.second = channel;
156 impl->channelRegistry.insert(entry);
161 std::string datafieldName,
163 std::string description)
165 if (
getState() < eManagedIceObjectInitialized)
167 throw LocalException()
168 <<
"offerDataFieldWithDefault() must not be called before the Observer is initalized "
169 "(i.e. not in onInitObserver(), use onConnectObserver()";
174 auto channelIt =
impl->channelRegistry.find(channelName);
176 if (channelIt ==
impl->channelRegistry.end())
181 if (channelIt->second.dataFields.find(datafieldName) != channelIt->second.dataFields.end())
186 DataFieldRegistryEntry dataField;
187 dataField.identifier =
new DatafieldRef(
this, channelName, datafieldName,
false);
188 dataField.description = description;
191 *dataField.value = defaultValue;
193 std::pair<std::string, DataFieldRegistryEntry> entry;
194 entry.first = datafieldName;
195 entry.second = dataField;
197 channelIt->second.dataFields.insert(entry);
202 std::string datafieldName,
204 std::string description)
206 if (
getState() < eManagedIceObjectInitialized)
208 throw LocalException()
209 <<
"offerDataField() must not be called before the Observer is initalized (i.e. not in "
210 "onInitObserver(), use onConnectObserver()";
215 auto channelIt =
impl->channelRegistry.find(channelName);
217 if (channelIt ==
impl->channelRegistry.end())
222 if (channelIt->second.dataFields.find(datafieldName) != channelIt->second.dataFields.end())
227 DataFieldRegistryEntry dataField;
228 dataField.identifier =
new DatafieldRef(
this, channelName, datafieldName,
false);
229 dataField.description = description;
232 dataField.value->setType(type);
234 std::pair<std::string, DataFieldRegistryEntry> entry;
235 entry.first = datafieldName;
236 entry.second = dataField;
238 impl->channelRegistry[channelName].dataFields.insert(entry);
243 std::string datafieldName,
245 const std::string& description)
273 <<
"using slow fallback";
274 for (
const auto& value : valueMap)
276 const std::string& datafieldName = value.first;
282 channelName, datafieldName, *VariantPtr::dynamicCast(value.second),
"");
287 channelName, datafieldName, VariantPtr::dynamicCast(value.second));
293 channelName, datafieldName, VariantPtr::dynamicCast(value.second));
303 std::unique_lock lock(
impl->checksMutex);
305 if (
impl->availableChecks.find(checkName) !=
impl->availableChecks.end())
310 std::pair<std::string, ConditionCheckPtr> entry;
311 entry.first = checkName;
312 entry.second = conditionCheck;
314 impl->availableChecks.insert(entry);
323 auto iter =
impl->channelRegistry.find(channelName);
325 if (iter ==
impl->channelRegistry.end())
333 impl->channelRegistry.erase(iter);
338 std::unique_lock lock(
impl->historyMutex);
339 impl->channelHistory.erase(channelName);
347 std::unique_lock lock(
impl->filterMutex);
350 std::string idStr = idptr->getIdentifierStr();
351 auto itFilter =
impl->filteredToOriginal.find(idStr);
353 if (itFilter !=
impl->filteredToOriginal.end())
355 DatafieldRefPtr refPtr = DatafieldRefPtr::dynamicCast(itFilter->second->original);
356 auto range =
impl->orignalToFiltered.equal_range(
357 refPtr->getDataFieldIdentifier()->getIdentifierStr());
359 for (
auto it = range.first; it != range.second; it++)
360 if (it->second->filtered->getDataFieldIdentifier()->getIdentifierStr() == idStr)
362 impl->orignalToFiltered.erase(it);
366 impl->filteredToOriginal.erase(itFilter);
372 auto itChannel =
impl->channelRegistry.find(id->channelName);
374 if (itChannel ==
impl->channelRegistry.end())
379 itChannel->second.dataFields.erase(id->datafieldName);
387 const std::string& datafieldName,
390 std::vector<Impl::FilterQueueData> filterData;
391 std::set<std::string> foundFilterFields;
393 std::unique_lock lock(
impl->filterMutex);
394 const std::string
id =
getName() +
"." + channelName +
"." + datafieldName;
396 auto range =
impl->orignalToFiltered.equal_range(
id);
402 long tLong = t.toMicroSeconds();
404 for (
auto it = range.first; it != range.second; it++)
406 it->second->filter->update(tLong, value);
408 foundFilterFields.insert(it->second->filtered->datafieldName);
411 new TimedVariant(VariantPtr::dynamicCast(it->second->filter->getValue()), t);
412 filterData.emplace_back(
414 it->second->filtered->channelRef->channelName,
415 it->second->filtered->datafieldName});
420 for (
auto& elem : filterData)
423 elem.channelName, elem.datafieldName, VariantPtr::dynamicCast(elem.value));
433 return foundFilterFields;
438 const std::string& datafieldName,
441 if (
impl->orignalToFiltered.size() == 0)
445 const std::string
id =
getName() +
"." + channelName +
"." + datafieldName;
447 std::unique_lock lock(
impl->filterMutex);
448 if (
impl->orignalToFiltered.count(
id) == 0)
453 std::unique_lock lock(
impl->filterQueueMutex);
454 impl->filterQueue[id] = {value, channelName, datafieldName};
455 impl->idleCondition.notify_all();
459Observer::updateFilters()
461 while (!
impl->filterUpdateTask->isStopped())
465 std::unique_lock lock(
impl->filterQueueMutex);
466 queue.swap(
impl->filterQueue);
468 std::unordered_map<std::string, std::set<std::string>>
channels;
469 for (
const Impl::FilterUpdateQueue::value_type& elem : queue)
472 elem.second.channelName, elem.second.datafieldName, elem.second.value);
473 channels[elem.second.channelName].insert(foundFilterFields.begin(),
474 foundFilterFields.end());
476 for (
const auto& channel : channels)
478 if (channel.second.size() > 0)
483 std::unique_lock lock(
impl->filterQueueMutex);
484 if (
impl->filterQueue.size() == 0)
486 impl->idleCondition.wait(lock);
499 dataFieldIter->second.value = std::move(tval);
509 const std::string& datafieldName,
511 bool triggerFilterUpdate)
516 auto itChannel =
impl->channelRegistry.find(channelName);
518 if (itChannel ==
impl->channelRegistry.end())
523 DataFieldRegistry::iterator itDF = itChannel->second.dataFields.find(datafieldName);
525 if (itDF == itChannel->second.dataFields.end())
532 itDF->second.value = value.clone();
538 valuePtr = itDF->second.value;
540 if (triggerFilterUpdate)
549 const std::string& datafieldName,
551 bool triggerFilterUpdate)
556 auto itChannel =
impl->channelRegistry.find(channelName);
558 if (itChannel ==
impl->channelRegistry.end())
563 DataFieldRegistry::iterator itDF = itChannel->second.dataFields.find(datafieldName);
565 if (itDF == itChannel->second.dataFields.end())
573 if (triggerFilterUpdate)
581 const std::string& channelName,
582 const std::unordered_map<std::string, Ice::Long>& datafieldValues)
586 auto itChannel =
impl->channelRegistry.find(channelName);
588 if (itChannel ==
impl->channelRegistry.end())
593 for (
const auto& elem : datafieldValues)
595 DataFieldRegistry::iterator itDF = itChannel->second.dataFields.find(elem.first);
597 if (itDF == itChannel->second.dataFields.end())
603 TimedVariantPtr oldValue = TimedVariantPtr::dynamicCast(itDF->second.value);
604 if (!oldValue || (elem.second > oldValue->getTimestamp() && oldValue->getInitialized()))
606 itDF->second.value =
new TimedVariant(VariantPtr::dynamicCast(itDF->second.value),
607 IceUtil::Time::microSeconds(elem.second));
617 auto itChannel =
impl->channelRegistry.find(channelName);
619 if (itChannel ==
impl->channelRegistry.end())
624 for (
auto& elem : itChannel->second.dataFields)
627 TimedVariantPtr oldValue = TimedVariantPtr::dynamicCast(elem.second.value);
628 if (!oldValue || (
timestamp > oldValue->getTimestamp() && oldValue->getInitialized()))
630 elem.second.value =
new TimedVariant(VariantPtr::dynamicCast(elem.second.value),
638 const std::string& description,
640 bool triggerFilterUpdate)
644 auto itChannel =
impl->channelRegistry.find(channelName);
645 if (itChannel ==
impl->channelRegistry.end())
648 auto& channel =
impl->channelRegistry[channelName];
649 channel.name = channelName;
650 channel.description = description;
651 channel.initialized =
false;
653 itChannel =
impl->channelRegistry.find(channelName);
657 auto targ = itChannel->second.dataFields.begin();
658 auto vals = datafieldValues.begin();
659 while (targ != itChannel->second.dataFields.end() && vals != datafieldValues.end())
661 if (vals->first > targ->first)
665 else if (vals->first == targ->first)
676 if (vals != datafieldValues.end())
681 if (triggerFilterUpdate)
683 for (
const auto& elem : datafieldValues)
692 const std::string& channelName,
693 const std::unordered_map<::std::string, ::armarx::VariantBasePtr>& datafieldValues,
694 bool triggerFilterUpdate)
699 auto itChannel =
impl->channelRegistry.find(channelName);
701 if (itChannel ==
impl->channelRegistry.end())
706 for (
const auto& elem : datafieldValues)
708 DataFieldRegistry::iterator itDF = itChannel->second.dataFields.find(elem.first);
710 if (itDF == itChannel->second.dataFields.end())
718 if (triggerFilterUpdate)
720 for (
const auto& elem : datafieldValues)
730 bool triggerFilterUpdate)
735 auto itChannel =
impl->channelRegistry.find(channelName);
737 if (itChannel ==
impl->channelRegistry.end())
742 for (
const auto& elem : datafieldValues)
744 DataFieldRegistry::iterator itDF = itChannel->second.dataFields.find(elem.first);
746 if (itDF == itChannel->second.dataFields.end())
755 if (triggerFilterUpdate)
757 for (
const auto& elem : datafieldValues)
765Observer::updateRefreshRateChannel(
const std::string& channelName)
767 auto& oldUpdateTime =
impl->channelUpdateTimestamps[channelName];
768 auto now = IceUtil::Time::now();
774 new Variant((now - oldUpdateTime).toMilliSecondsDouble()),
782 "Update delta of channel '" + channelName +
"'");
789 const std::set<std::string>& updatedDatafields)
795 std::unique_lock lock(
impl->channelQueueMutex);
796 std::set<std::string>& dfs =
impl->channelQueue[channelName];
797 dfs.insert(updatedDatafields.begin(), updatedDatafields.end());
798 impl->idleChannelCondition.notify_all();
802Observer::channelUpdateFunction()
805 while (!
impl->channelUpdateTask->isStopped())
807 std::map<std::string, std::set<std::string>> queue;
809 std::unique_lock lock(
impl->channelQueueMutex);
810 queue.swap(
impl->channelQueue);
812 for (
const auto& elem : queue)
814 doChannelUpdate(elem.first, elem.second);
816 std::unique_lock lock(
impl->channelQueueMutex);
817 if (
impl->channelQueue.size() == 0)
819 impl->idleChannelCondition.wait(lock);
825Observer::addToChannelHistory(
const std::pair<IceUtil::Time, ChannelRegistryEntry>& historyEntry,
826 const std::string& channelName)
828 if (
impl->maxHistorySize == 0)
832 std::unique_lock lockHistory(
impl->historyMutex);
833 auto historyIt =
impl->channelHistory.find(channelName);
834 if (historyIt ==
impl->channelHistory.end())
836 impl->channelHistory[channelName].set_capacity(
impl->maxHistorySize);
837 historyIt =
impl->channelHistory.find(channelName);
840 if (historyIt->second.empty() ||
841 now > historyIt->second.rbegin()->first +
842 IceUtil::Time::secondsDouble(1.0 /
impl->maxHistoryRecordFrequency))
844 historyIt->second.push_back(historyEntry);
849Observer::doChannelUpdate(
const std::string& channelName,
850 const std::set<std::string>& updatedDatafields)
856 ChannelRegistryEntry entry;
857 std::pair<IceUtil::Time, ChannelRegistryEntry> historyEntry;
861 auto iterChannel =
impl->channelRegistry.find(channelName);
863 if (iterChannel ==
impl->channelRegistry.end())
865 throw exceptions::user::InvalidChannelException(channelName);
869 if (
impl->logChannelUpdateRate)
871 updateRefreshRateChannel(channelName);
875 const DataFieldRegistry& dataFields = iterChannel->second.dataFields;
876 DataFieldRegistry::const_iterator iterDataFields = dataFields.begin();
877 bool channelInitialized =
true;
879 while (iterDataFields != dataFields.end())
881 channelInitialized &= iterDataFields->second.value->getInitialized();
885 iterChannel->second.initialized = channelInitialized;
886 if (iterChannel->second.conditionChecks.size() > 0)
888 entry = iterChannel->second;
892 addToChannelHistory(historyEntry, channelName);
894 ConditionCheckRegistry::iterator iterChecks = entry.conditionChecks.begin();
896 while (iterChecks != entry.conditionChecks.end())
899 if (updatedDatafields.size() > 0)
902 for (
const auto& elem : entry.dataFields)
904 const DataFieldRegistryEntry& datafieldEntry = elem.second;
905 if (updatedDatafields.count(datafieldEntry.identifier->datafieldName))
921 evaluateCheck(ConditionCheckPtr::dynamicCast(iterChecks->second), entry);
926 ARMARX_ERROR <<
"Evaluating condition for channel " << channelName <<
" failed!";
944Observer::onInitComponent()
950 std::unique_lock lock(
impl->idMutex);
955 impl->filterUpdateTask =
956 new RunningTask<Observer>(
this, &Observer::updateFilters,
"Filter update task");
957 impl->filterUpdateTask->start();
958 impl->channelUpdateTask =
959 new RunningTask<Observer>(
this, &Observer::channelUpdateFunction,
"Channel update task");
960 impl->channelUpdateTask->start();
964Observer::onConnectComponent()
971 "Metachannel with the last channel update deltas of all channels in milliseconds");
973 impl->channelHistory.clear();
980 auto proxy = ObserverInterfacePrx::checkedCast(
getProxy());
982 for (
auto& channel :
impl->channelRegistry)
984 ChannelRegistryEntry&
c = channel.second;
985 for (
auto& df :
c.dataFields)
987 DataFieldRegistryEntry& d = df.second;
988 d.identifier->channelRef->observerProxy = proxy;
992 if (
impl->logChannelUpdateRate)
994 impl->metaTask->start();
999Observer::postOnConnectComponent()
1002 impl->stopWorker =
false;
1008Observer::preOnDisconnectComponent()
1011 impl->stopWorker =
true;
1012 if (
impl->worker.joinable())
1014 impl->worker.join();
1019Observer::onExitComponent()
1024 impl->metaTask->stop();
1026 if (
impl->filterUpdateTask)
1028 impl->filterUpdateTask->stop(
false);
1030 impl->idleCondition.notify_all();
1031 if (
impl->filterUpdateTask)
1033 impl->filterUpdateTask->stop(
true);
1035 if (
impl->channelUpdateTask)
1037 impl->channelUpdateTask->stop(
false);
1039 impl->idleChannelCondition.notify_all();
1040 if (
impl->channelUpdateTask)
1042 impl->channelUpdateTask->stop(
true);
1044 impl->channelHistory.clear();
1048Observer::createPropertyDefinitions()
1065 std::unique_lock lock(
impl->idMutex);
1067 return impl->currentId++;
1071Observer::createCheck(
const CheckConfiguration& configuration)
const
1073 std::string checkName = configuration.checkName;
1076 StringConditionCheckMap::const_iterator iterChecks =
impl->availableChecks.find(checkName);
1078 if (iterChecks ==
impl->availableChecks.end())
1080 std::string reason =
1081 "Invalid condition check \"" + checkName +
"\" for observer \"" +
getName() +
"\".";
1082 throw InvalidConditionException(reason.c_str());
1087 ->createInstance(configuration,
impl->channelRegistry);
1099 CheckIdentifier identifier;
1100 identifier.uniqueId = id;
1101 identifier.channelName = check->configuration.dataFieldIdentifier->channelName;
1102 identifier.observerName =
getName();
1105 std::pair<int, ConditionCheckPtr> entry;
1107 entry.second = check;
1108 impl->channelRegistry[check->configuration.dataFieldIdentifier->channelName]
1109 .conditionChecks.insert(entry);
1115Observer::evaluateCheck(
const ConditionCheckPtr& check,
const ChannelRegistryEntry& channel)
const
1117 check->evaluateCondition(channel.dataFields);
1129 std::deque<Impl::WorkerUpdate> toProcess;
1130 while (!
impl->stopWorker)
1138 std::lock_guard g{
impl->workerUpdatesMutex};
1139 std::swap(
impl->workerUpdates, toProcess);
1141 if (toProcess.empty())
1144 <<
"no worker jobs (message only posted every 10 seconds";
1145 std::this_thread::sleep_for(std::chrono::milliseconds{1});
1149 float minage = std::numeric_limits<float>::infinity();
1159 std::map<std::string, Counters> counters;
1162 for (
auto& f : toProcess)
1165 float age = f.ageInMs();
1166 Counters& counter = counters[f.name];
1172 minage = std::min(minage, age);
1173 maxage = std::max(maxage, age);
1180 ARMARX_DEBUG <<
"observer worker got " << toProcess.size() <<
"\tjobs ages min/max "
1181 << minage <<
"\t" << maxage <<
"\n"
1184 for (
const auto& [key,
c] : counters)
1186 out <<
" " << key <<
"\t# " <<
c.n <<
"\tavg age " << (
c.age /
c.n)
1187 <<
"\tavg time " << (
c.time /
c.n) <<
"\n";
1197 std::lock_guard g{
impl->workerUpdatesMutex};
1198 impl->workerUpdates.emplace_back(name, std::move(f));
1204 std::lock_guard g{
impl->workerUpdatesMutex};
1205 impl->workerUpdates.emplace_back(name, std::move(f));
1221 return ClockT::now();
1227 return std::chrono::duration_cast<std::chrono::nanoseconds>(
Now() - t0).count() * 1e-6f;
1240 const Ice::Current&)
const
1252 std::unique_lock lock_checks(
impl->checksMutex);
1254 check = createCheck(configuration);
1258 CheckIdentifier identifier;
1261 identifier = registerCheck(check);
1271 ChannelRegistryEntry channel =
1272 impl->channelRegistry[configuration.dataFieldIdentifier->channelName];
1275 evaluateCheck(check, channel);
1283 const CheckConfiguration& configuration,
1284 const Ice::Current&)
1296 auto iter =
impl->channelRegistry.find(
id.channelName);
1298 if (iter ==
impl->channelRegistry.end())
1304 ConditionCheckRegistry::iterator iterCheck = iter->second.conditionChecks.find(
id.uniqueId);
1307 if (iterCheck != iter->second.conditionChecks.end())
1309 iter->second.conditionChecks.erase(iterCheck);
1317 const CheckIdentifier&
id,
1318 const Ice::Current&)
1334 const Ice::Current&
c)
const
1346 auto it =
impl->channelRegistry.find(channelName);
1347 if (it ==
impl->channelRegistry.end())
1352 auto itDF = it->second.dataFields.find(datafieldName);
1353 if (itDF == it->second.dataFields.end())
1359 TimedVariantPtr tv = TimedVariantPtr::dynamicCast(itDF->second.value);
1362 ARMARX_IMPORTANT <<
"could not cast timed variant: " << itDF->second.value->ice_id();
1369 const std::string& channelName,
1370 const std::string& datafieldName,
1371 const Ice::Current&)
const
1386 auto it =
impl->channelRegistry.find(identifier->channelName);
1387 if (it ==
impl->channelRegistry.end())
1392 auto itDF = it->second.dataFields.find(identifier->datafieldName);
1393 if (itDF == it->second.dataFields.end())
1396 identifier->datafieldName);
1399 return DatafieldRefPtr::dynamicCast(itDF->second.identifier);
1405 const Ice::Current&)
const
1413 const std::string& datafieldName)
const
1420 const std::string& channelName,
1421 const std::string& datafieldName,
1422 const Ice::Current&)
const
1435 TimedVariantBaseList result;
1437 DataFieldIdentifierBaseList::const_iterator iter =
identifiers.begin();
1452 const Ice::Current&
c)
1458StringTimedVariantBaseMap
1462 StringTimedVariantBaseMap result;
1464 auto it =
impl->channelRegistry.find(channelName);
1465 if (it ==
impl->channelRegistry.end())
1469 DataFieldRegistry fields = it->second.dataFields;
1470 for (
auto& field : fields)
1472 result[field.first] = TimedVariantBasePtr::dynamicCast(field.second.value);
1479 const std::string& channelName,
1480 const Ice::Current&)
const
1492 auto it =
impl->channelRegistry.find(channelName);
1493 if (it ==
impl->channelRegistry.end())
1502 const std::string& channelName,
1503 const Ice::Current&)
const
1515 ChannelRegistry result(
impl->channelRegistry.begin(),
impl->channelRegistry.end());
1516 if (!includeMetaChannels)
1525 bool includeMetaChannels,
1526 const Ice::Current&)
1531 includeMetaChannels);
1535StringConditionCheckMap
1538 std::unique_lock lock(
impl->checksMutex);
1540 return impl->availableChecks;
1545 const Ice::Current&)
1556 return impl->channelRegistry.find(channelName) !=
impl->channelRegistry.end();
1561 const std::string& channelName,
1562 const Ice::Current&)
const
1573 auto itChannel =
impl->channelRegistry.find(channelName);
1575 if (itChannel ==
impl->channelRegistry.end())
1580 return itChannel->second.dataFields.find(datafieldName) != itChannel->second.dataFields.end();
1585 const std::string& channelName,
1586 const std::string& datafieldName,
1587 const Ice::Current&)
const
1596 const DatafieldRefBasePtr& datafieldRef)
1606 std::string filteredName = datafieldRef->datafieldName +
"_" + filter->ice_id();
1611 while (
existsDataField(datafieldRef->channelRef->channelName, filteredName))
1615 datafieldRef->datafieldName +
"_" + filter->ice_id() +
"_" +
ValueToString(i);
1624 const DatafieldFilterBasePtr& filter,
1625 const DatafieldRefBasePtr& datafieldRef,
1626 const Ice::Current&)
1638 const DatafieldFilterBasePtr& filter,
1639 const DatafieldRefBasePtr& datafieldRef)
1645 if (!filter->checkTypeSupport(ref->getDataField()->getType()))
1647 auto types = filter->getSupportedTypes();
1648 std::string suppTypes =
"supported types";
1650 for (
auto t : types)
1660 if (
existsDataField(datafieldRef->channelRef->channelName, filterDatafieldName))
1667 auto var = ref->getDataField();
1668 filter->update(var->getTime().toMicroSeconds(), var);
1672 filterDatafieldName,
1673 *VariantPtr::dynamicCast(filter->getValue()),
1674 "Filtered value of " +
1675 ref->getDataFieldIdentifier()->getIdentifierStr());
1678 ChannelRefPtr channel = ChannelRefPtr::dynamicCast(ref->channelRef);
1679 channel->refetchChannel();
1682 data->filter = filter;
1683 data->original = ref;
1684 data->filtered = filteredRef;
1685 std::unique_lock lock(
impl->filterMutex);
1686 impl->orignalToFiltered.insert(
1687 std::make_pair(ref->getDataFieldIdentifier()->getIdentifierStr(),
data));
1688 impl->filteredToOriginal[filteredRef->getDataFieldIdentifier()->getIdentifierStr()] =
data;
1694 const AMD_ObserverInterface_createNamedFilteredDatafieldPtr& amd,
1695 const std::string& filterDatafieldName,
1696 const DatafieldFilterBasePtr& filter,
1697 const DatafieldRefBasePtr& datafieldRef,
1698 const Ice::Current&)
1703 filterDatafieldName,
1716 std::unique_lock lock(
impl->filterMutex);
1717 ref = DatafieldRefPtr::dynamicCast(datafieldRef);
1718 const std::string idStr = ref->getDataFieldIdentifier()->getIdentifierStr();
1719 auto it =
impl->filteredToOriginal.find(idStr);
1720 remove = (it !=
impl->filteredToOriginal.end());
1731 const DatafieldRefBasePtr& datafieldRef,
1732 const Ice::Current&)
1741 Ice::Float timestepMs,
1742 const Ice::Current&
c)
const
1746 channelName, 0, std::numeric_limits<long>::max(), timestepMs,
c);
1751 const std::string& channelName,
1752 Ice::Float timestepMs,
1753 const Ice::Current&
c)
const
1766 Ice::Long startTimestamp,
1767 Ice::Long endTimestamp,
1768 Ice::Float timestepMs,
1769 const Ice::Current&
c)
const
1773 std::unique_lock lock_channels(
impl->historyMutex);
1775 ChannelHistory result;
1776 auto historyIt =
impl->channelHistory.find(channelName);
1777 if (historyIt ==
impl->channelHistory.end())
1783 auto lastInsertIt = result.begin();
1784 Ice::Long lastUsedTimestep = 0;
1785 float timestepUs = timestepMs * 1000;
1786 for (
auto& entry : historyIt->second)
1788 long timestamp = entry.first.toMicroSeconds();
1796 lastInsertIt = result.emplace_hint(result.end(),
1797 std::make_pair(
timestamp, entry.second.dataFields));
1806 const AMD_ObserverInterface_getPartialChannelHistoryPtr& amd,
1807 const std::string& channelName,
1808 Ice::Long startTimestamp,
1809 Ice::Long endTimestamp,
1810 Ice::Float timestepMs,
1811 const Ice::Current&
c)
const
1826 const std::string& datafieldName,
1827 Ice::Float timestepMs,
1828 const Ice::Current&
c)
const
1832 channelName, datafieldName, 0, std::numeric_limits<long>::max(), timestepMs,
c);
1837 const std::string& channelName,
1838 const std::string& datafieldName,
1839 Ice::Float timestepMs,
1840 const Ice::Current&
c)
const
1854 const std::string& datafieldName,
1855 Ice::Long startTimestamp,
1856 Ice::Long endTimestamp,
1857 Ice::Float timestepMs,
1858 const Ice::Current&
c)
const
1861 std::unique_lock lock_channels(
impl->historyMutex);
1862 TimedVariantBaseList result;
1863 auto historyIt =
impl->channelHistory.find(channelName);
1864 if (historyIt ==
impl->channelHistory.end())
1873 Ice::Long lastUsedTimestep = 0;
1874 float timestepUs = timestepMs * 1000;
1875 for (
auto& entry : historyIt->second)
1877 long timestamp = entry.first.toMicroSeconds();
1885 auto datafieldIt = entry.second.dataFields.find(datafieldName);
1886 if (datafieldIt == entry.second.dataFields.end())
1892 TimedVariantPtr tvar = TimedVariantPtr::dynamicCast(datafieldIt->second.value);
1895 result.emplace_back(tvar);
1906 const AMD_ObserverInterface_getPartialDatafieldHistoryPtr& amd,
1907 const std::string& channelName,
1908 const std::string& datafieldName,
1909 Ice::Long startTimestamp,
1910 Ice::Long endTimestamp,
1911 Ice::Float timestepMs,
1912 const Ice::Current&
c)
const
#define ARMARX_STREAM_PRINTER
use this macro to write output code that is executed when printed and thus not executed if the debug ...
const std::string LAST_REFRESH_DELTA_CHANNEL
std::string getConfigIdentifier()
Retrieve config identifier for this component as set in constructor.
Property< PropertyType > getProperty(const std::string &name)
A ConditionCheck implements a check on the sensor data stream of a Sensor-Actor Unit.
armarx::ChannelRegistry ChannelRegistry
Creates a new ConditionCheck instance.
DataFieldIdentifier provides the basis for identifying data fields within a distributed ArmarX scenario.
The DatafieldRef class is similar to the ChannelRef, but points to a specific Datafield instead of to...
SpamFilterDataPtr deactivateSpam(float deactivationDurationSec=10.0f, const std::string &identifier="", bool deactivate=true) const
Disables logging for the current line for the given number of seconds.
std::string getName() const
Retrieve name of object.
Ice::ObjectPrx getProxy(long timeoutMs=0, bool waitForScheduler=true) const
Returns the proxy of this object (optionally it waits for the proxy)
int getState() const
Retrieve current state of the ManagedIceObject.
void removeFilteredDatafield(const DatafieldRefBasePtr &datafieldRef)
Removes a previously installed filter.
DatafieldRefBasePtr createFilteredDatafield(const DatafieldFilterBasePtr &filter, const DatafieldRefBasePtr &datafieldRef)
This function creates a new datafield with a new filter on the given datafield.
std::string getObserverName() const
void getAvailableChecks_async(const AMD_ObserverInterface_getAvailableChecksPtr &amd, const Ice::Current &) override
bool existsChannel(const std::string &channelName) const
void offerChannel(std::string channelName, std::string description)
Offer a channel.
void removeCheck_async(const AMD_ObserverInterface_removeCheckPtr &amd, const CheckIdentifier &id, const Ice::Current &) override
void existsChannel_async(const AMD_ObserverInterface_existsChannelPtr &amd, const std::string &channelName, const Ice::Current &) const override
void getDatafieldHistory_async(const AMD_ObserverInterface_getDatafieldHistoryPtr &amd, const std::string &channelName, const std::string &datafieldName, Ice::Float timestepMs, const Ice::Current &c) const override
virtual void postWorkerJobs()
void getPartialDatafieldHistory_async(const AMD_ObserverInterface_getPartialDatafieldHistoryPtr &amd, const std::string &channelName, const std::string &datafieldName, Ice::Long startTimestamp, Ice::Long endTimestamp, Ice::Float timestepMs, const Ice::Current &c) const override
void removeDatafield(DataFieldIdentifierBasePtr id)
void scheduleDatafieldFilterUpdate(const std::string &channelName, const std::string &datafieldName, const VariantBasePtr &value)
typename ClockT::time_point TimepointT
void getDataFieldRef_async(const AMD_ObserverInterface_getDataFieldRefPtr &amd, const DataFieldIdentifierBasePtr &identifier, const Ice::Current &) const override
StringConditionCheckMap getAvailableChecks()
Retrieve list of available condition checks.
void removeChannel(std::string channelName)
Remove a channel.
void removeCheck(const CheckIdentifier &id)
Removes a condition check from the observer.
void getChannelHistory_async(const AMD_ObserverInterface_getChannelHistoryPtr &amd, const std::string &channelName, Ice::Float timestepMs, const Ice::Current &c) const override
void getAvailableChannels_async(const AMD_ObserverInterface_getAvailableChannelsPtr &amd, bool includeMetaChannels, const Ice::Current &) override
void getDataFields_async(const AMD_ObserverInterface_getDataFieldsPtr &amd, const DataFieldIdentifierBaseList &identifiers, const Ice::Current &c) override
void offerDataField(std::string channelName, std::string datafieldName, VariantTypeId type, std::string description)
Offer a datafield without default value.
void getDatafieldByName_async(const AMD_ObserverInterface_getDatafieldByNamePtr &amd, const std::string &channelName, const std::string &datafieldName, const Ice::Current &) const override
void removeFilteredDatafield_async(const AMD_ObserverInterface_removeFilteredDatafieldPtr &amd, const DatafieldRefBasePtr &datafieldRef, const Ice::Current &) override
void getChannel_async(const AMD_ObserverInterface_getChannelPtr &amd, const std::string &channelName, const Ice::Current &) const override
void offerConditionCheck(std::string checkName, ConditionCheck *conditionCheck)
Offer a condition check.
ChannelRegistryEntry getChannel(const std::string &channelName) const
Retrieve information on the sensory data channel with the given name from the observer.
void getDataField_async(const AMD_ObserverInterface_getDataFieldPtr &amd, const DataFieldIdentifierBasePtr &identifier, const Ice::Current &c) const override
void createNamedFilteredDatafield_async(const AMD_ObserverInterface_createNamedFilteredDatafieldPtr &amd, const std::string &filterDatafieldName, const DatafieldFilterBasePtr &filter, const DatafieldRefBasePtr &datafieldRef, const Ice::Current &) override
void getObserverName_async(const AMD_ObserverInterface_getObserverNamePtr &amd, const Ice::Current &) const override
virtual void onInitObserver()=0
Framework hook.
void updateChannel(const std::string &channelName, const std::set< std::string > &updatedDatafields=std::set< std::string >())
Update all conditions for a channel.
TimedVariantBasePtr getDataField(const DataFieldIdentifierBasePtr &identifier, const Ice::Current &c=Ice::emptyCurrent) const
Retrieve data field from observer.
void addWorkerJob(const std::string &name, std::function< void(void)> &&f) const
void setDataFieldsFlatCopy(const std::string &channelName, const StringVariantBaseMap &datafieldValues, bool triggerFilterUpdate=true)
TimedVariantBasePtr getDatafieldByName(const std::string &channelName, const std::string &datafieldName) const
ChannelHistory getChannelHistory(const std::string &channelName, Ice::Float timestepMs, const Ice::Current &c) const
std::unique_ptr< Impl > impl
void maybeOfferChannelAndSetDataFieldsFlatCopy(const std::string &channelName, const std::string &description, const StringVariantBaseMap &datafieldValues, bool triggerFilterUpdate=true)
TimedVariantBaseList getDataFields(const DataFieldIdentifierBaseList &identifiers, const Ice::Current &c)
Retrieve list of data field from observer.
void offerDataFieldWithDefault(std::string channelName, std::string datafieldName, const Variant &defaultValue, std::string description)
Offer a datafield with default value.
void existsDataField_async(const AMD_ObserverInterface_existsDataFieldPtr &amd, const std::string &channelName, const std::string &datafieldName, const Ice::Current &) const override
virtual void preWorkerJobs()
void updateDatafieldTimestamps(const std::string &channelName, const std::unordered_map< std::string, Ice::Long > &datafieldValues)
static float TimeDeltaInMs(TimepointT t0)
ChannelHistory getPartialChannelHistory(const std::string &channelName, Ice::Long startTimestamp, Ice::Long endTimestamp, Ice::Float timestepMs, const Ice::Current &c) const
virtual void onConnectObserver()=0
Framework hook.
void getDatafieldRefByName_async(const AMD_ObserverInterface_getDatafieldRefByNamePtr &amd, const std::string &channelName, const std::string &datafieldName, const Ice::Current &) const override
bool existsDataField(const std::string &channelName, const std::string &datafieldName) const
TimedVariantBaseList getDatafieldHistory(const std::string &channelName, const std::string &datafieldName, Ice::Float timestepMs, const Ice::Current &c) const
void getDatafieldsOfChannel_async(const AMD_ObserverInterface_getDatafieldsOfChannelPtr &amd, const std::string &channelName, const Ice::Current &) const override
TimedVariantBaseList getPartialDatafieldHistory(const std::string &channelName, const std::string &datafieldName, Ice::Long startTimestamp, Ice::Long endTimestamp, Ice::Float timestepMs, const Ice::Current &c) const
std::recursive_mutex channelsMutex
DatafieldRefBasePtr getDatafieldRefByName(const std::string &channelName, const std::string &datafieldName) const
void createFilteredDatafield_async(const AMD_ObserverInterface_createFilteredDatafieldPtr &amd, const DatafieldFilterBasePtr &filter, const DatafieldRefBasePtr &datafieldRef, const Ice::Current &) override
ChannelRegistry getAvailableChannels(bool includeMetaChannels)
Retrieve information on all sensory data channels available from the observer.
void setDataField(const std::string &channelName, const std::string &datafieldName, const Variant &value, bool triggerFilterUpdate=true)
Set the datafield named datafieldName in the channel channelName.
void setDataFieldFlatCopy(const std::string &channelName, const std::string &datafieldName, const VariantPtr &value, bool triggerFilterUpdate=true)
DatafieldRefBasePtr createNamedFilteredDatafield(const std::string &filterDatafieldName, const DatafieldFilterBasePtr &filter, const DatafieldRefBasePtr &datafieldRef)
CheckIdentifier installCheck(const CheckConfiguration &configuration)
Installs a condition check with the observer.
DatafieldRefBasePtr getDataFieldRef(const DataFieldIdentifierBasePtr &identifier) const
void getPartialChannelHistory_async(const AMD_ObserverInterface_getPartialChannelHistoryPtr &amd, const std::string &channelName, Ice::Long startTimestamp, Ice::Long endTimestamp, Ice::Float timestepMs, const Ice::Current &c) const override
virtual void onExitObserver()
Framework hook.
StringTimedVariantBaseMap getDatafieldsOfChannel(const std::string &channelName) const
void offerOrUpdateDataFieldsFlatCopy(const std::string &channelName, const StringVariantBaseMap &valueMap)
bool offerOrUpdateDataField(std::string channelName, std::string datafieldName, const Variant &value, const std::string &description)
void installCheck_async(const AMD_ObserverInterface_installCheckPtr &amd, const CheckConfiguration &configuration, const Ice::Current &) override
std::set< std::string > updateDatafieldFilter(const std::string &channelName, const std::string &datafieldName, const VariantBasePtr &value)
IceUtil::Handle< PeriodicTask< T > > pointer_type
Shared pointer type for convenience.
IceUtil::Handle< RunningTask< T > > pointer_type
Shared pointer type for convenience.
static IceUtil::Time GetTime(TimeMode timeMode=TimeMode::VirtualTime)
Get the current time.
The Variant class is described here: Variants.
std::string getTypeName(const Ice::Current &c=Ice::emptyCurrent) const override
Return the Variant's internal type.
static std::string typeToString(VariantTypeId typeId)
Return the name of the registered type typeId.
#define ARMARX_CHECK_EXPRESSION(expression)
This macro evaluates the expression and if it turns out to be false it will throw an ExpressionExcept...
#define ARMARX_INFO
The normal logging level.
#define ARMARX_IMPORTANT
The logging level for always important information, but expected behaviour (in contrast to ARMARX_WAR...
#define ARMARX_ERROR
The logging level for unexpected behaviour that must be fixed.
#define ARMARX_DEBUG
The logging level for output that is only interesting while debugging.
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
#define ARMARX_ON_SCOPE_EXIT
Executes given code when the enclosing scope is left.
const std::list< std::string > identifiers
This file offers overloads of toIce() and fromIce() functions for STL container types.
std::map< std::string, VariantBasePtr > StringVariantBaseMap
IceInternal::Handle< DataFieldIdentifierBase > DataFieldIdentifierBasePtr
IceInternal::Handle< TimedVariant > TimedVariantPtr
IceInternal::Handle< Variant > VariantPtr
IceInternal::Handle< ChannelRef > ChannelRefPtr
::std::vector<::armarx::DataFieldIdentifierBasePtr > DataFieldIdentifierBaseList
IceInternal::Handle< ConditionCheck > ConditionCheckPtr
IceInternal::Handle< DatafieldRef > DatafieldRefPtr
IceUtil::Handle< class PropertyDefinitionContainer > PropertyDefinitionsPtr
PropertyDefinitions smart pointer type.
const LogSender::manipulator flush
::IceInternal::Handle<::armarx::VariantBase > VariantBasePtr
IceInternal::Handle< DataFieldIdentifier > DataFieldIdentifierPtr
Typedef of DataFieldIdentifierPtr as IceInternal::Handle<DataFieldIdentifier> for convenience.
std::string ValueToString(const T &value)
DatafieldFilterBasePtr filter
std::string datafieldName
WorkerUpdate(const std::string &name, std::function< void(void)> f)
std::function< void(void)> f
PeriodicTask< Observer >::pointer_type metaTask
RunningTask< Observer >::pointer_type channelUpdateTask
FilterUpdateQueue filterQueue
std::condition_variable idleCondition
std::multimap< std::string, FilterDataPtr > orignalToFiltered
ChannelRegistryHistory channelHistory
std::recursive_mutex historyMutex
std::deque< WorkerUpdate > workerUpdates
std::recursive_mutex filterMutex
std::map< std::string, IceUtil::Time > channelUpdateTimestamps
std::unordered_map< std::string, FilterQueueData > FilterUpdateQueue
RunningTask< Observer >::pointer_type filterUpdateTask
ConditionCheck::ChannelRegistry channelRegistry
StringConditionCheckMap availableChecks
std::atomic_bool stopWorker
std::map< std::string, FilterDataPtr > filteredToOriginal
std::map< std::string, std::set< std::string > > channelQueue
bool logChannelUpdateRate
std::unordered_map< std::string, boost::circular_buffer< std::pair< IceUtil::Time, ChannelRegistryEntry > > > ChannelRegistryHistory
std::recursive_mutex workerUpdatesMutex
float maxHistoryRecordFrequency
std::mutex filterQueueMutex
std::condition_variable idleChannelCondition
std::mutex channelQueueMutex
IceUtil::Handle< FilterData > FilterDataPtr