27 #include <condition_variable>
29 #include <boost/circular_buffer.hpp>
50 bool logChannelUpdateRate =
false;
60 std::unordered_map<std::string,
61 boost::circular_buffer<std::pair<IceUtil::Time, ChannelRegistryEntry>>>;
106 WorkerUpdate(
const std::string& name, std::function<
void(
void)> f);
107 float ageInMs()
const;
111 std::function<void(
void)>
f;
133 if (
getState() < eManagedIceObjectInitialized)
135 throw LocalException()
136 <<
"offerChannel() must not be called before the Observer is initalized (i.e. not in "
137 "onInitObserver(), use onConnectObserver()";
142 if (
impl->channelRegistry.find(channelName) !=
impl->channelRegistry.end())
147 ChannelRegistryEntry channel;
148 channel.name = channelName;
149 channel.description = description;
150 channel.initialized =
false;
152 std::pair<std::string, ChannelRegistryEntry> entry;
153 entry.first = channelName;
154 entry.second = channel;
156 impl->channelRegistry.insert(entry);
161 std::string datafieldName,
163 std::string description)
165 if (
getState() < eManagedIceObjectInitialized)
167 throw LocalException()
168 <<
"offerDataFieldWithDefault() must not be called before the Observer is initalized "
169 "(i.e. not in onInitObserver(), use onConnectObserver()";
174 auto channelIt =
impl->channelRegistry.find(channelName);
176 if (channelIt ==
impl->channelRegistry.end())
181 if (channelIt->second.dataFields.find(datafieldName) != channelIt->second.dataFields.end())
186 DataFieldRegistryEntry dataField;
187 dataField.identifier =
new DatafieldRef(
this, channelName, datafieldName,
false);
188 dataField.description = description;
191 *dataField.value = defaultValue;
193 std::pair<std::string, DataFieldRegistryEntry> entry;
194 entry.first = datafieldName;
195 entry.second = dataField;
197 channelIt->second.dataFields.insert(entry);
202 std::string datafieldName,
204 std::string description)
206 if (
getState() < eManagedIceObjectInitialized)
208 throw LocalException()
209 <<
"offerDataField() must not be called before the Observer is initalized (i.e. not in "
210 "onInitObserver(), use onConnectObserver()";
215 auto channelIt =
impl->channelRegistry.find(channelName);
217 if (channelIt ==
impl->channelRegistry.end())
222 if (channelIt->second.dataFields.find(datafieldName) != channelIt->second.dataFields.end())
227 DataFieldRegistryEntry dataField;
228 dataField.identifier =
new DatafieldRef(
this, channelName, datafieldName,
false);
229 dataField.description = description;
232 dataField.value->setType(type);
234 std::pair<std::string, DataFieldRegistryEntry> entry;
235 entry.first = datafieldName;
236 entry.second = dataField;
238 impl->channelRegistry[channelName].dataFields.insert(entry);
243 std::string datafieldName,
245 const std::string& description)
273 <<
"using slow fallback";
274 for (
const auto&
value : valueMap)
276 const std::string& datafieldName =
value.first;
282 channelName, datafieldName, *VariantPtr::dynamicCast(
value.second),
"");
287 channelName, datafieldName, VariantPtr::dynamicCast(
value.second));
293 channelName, datafieldName, VariantPtr::dynamicCast(
value.second));
303 std::unique_lock lock(
impl->checksMutex);
305 if (
impl->availableChecks.find(checkName) !=
impl->availableChecks.end())
310 std::pair<std::string, ConditionCheckPtr> entry;
311 entry.first = checkName;
312 entry.second = conditionCheck;
314 impl->availableChecks.insert(entry);
323 auto iter =
impl->channelRegistry.find(channelName);
325 if (iter ==
impl->channelRegistry.end())
333 impl->channelRegistry.erase(iter);
338 std::unique_lock lock(
impl->historyMutex);
339 impl->channelHistory.erase(channelName);
347 std::unique_lock lock(
impl->filterMutex);
350 std::string idStr = idptr->getIdentifierStr();
351 auto itFilter =
impl->filteredToOriginal.find(idStr);
353 if (itFilter !=
impl->filteredToOriginal.end())
355 DatafieldRefPtr refPtr = DatafieldRefPtr::dynamicCast(itFilter->second->original);
356 auto range =
impl->orignalToFiltered.equal_range(
357 refPtr->getDataFieldIdentifier()->getIdentifierStr());
359 for (
auto it = range.first; it != range.second; it++)
360 if (it->second->filtered->getDataFieldIdentifier()->getIdentifierStr() == idStr)
362 impl->orignalToFiltered.erase(it);
366 impl->filteredToOriginal.erase(itFilter);
372 auto itChannel =
impl->channelRegistry.find(id->channelName);
374 if (itChannel ==
impl->channelRegistry.end())
379 itChannel->second.dataFields.erase(id->datafieldName);
385 std::set<std::string>
387 const std::string& datafieldName,
390 std::vector<Impl::FilterQueueData> filterData;
391 std::set<std::string> foundFilterFields;
393 std::unique_lock lock(
impl->filterMutex);
394 const std::string
id =
getName() +
"." + channelName +
"." + datafieldName;
396 auto range =
impl->orignalToFiltered.equal_range(
id);
402 long tLong = t.toMicroSeconds();
404 for (
auto it = range.first; it != range.second; it++)
406 it->second->filter->update(tLong,
value);
408 foundFilterFields.insert(it->second->filtered->datafieldName);
411 new TimedVariant(VariantPtr::dynamicCast(it->second->filter->getValue()), t);
412 filterData.emplace_back(
414 it->second->filtered->channelRef->channelName,
415 it->second->filtered->datafieldName});
420 for (
auto& elem : filterData)
423 elem.channelName, elem.datafieldName, VariantPtr::dynamicCast(elem.value));
433 return foundFilterFields;
438 const std::string& datafieldName,
441 if (
impl->orignalToFiltered.size() == 0)
445 const std::string
id =
getName() +
"." + channelName +
"." + datafieldName;
447 std::unique_lock lock(
impl->filterMutex);
448 if (
impl->orignalToFiltered.count(
id) == 0)
453 std::unique_lock lock(
impl->filterQueueMutex);
454 impl->filterQueue[id] = {
value, channelName, datafieldName};
455 impl->idleCondition.notify_all();
459 Observer::updateFilters()
461 while (!
impl->filterUpdateTask->isStopped())
465 std::unique_lock lock(
impl->filterQueueMutex);
466 queue.swap(
impl->filterQueue);
468 std::unordered_map<std::string, std::set<std::string>> channels;
469 for (
const Impl::FilterUpdateQueue::value_type& elem : queue)
472 elem.second.channelName, elem.second.datafieldName, elem.second.value);
473 channels[elem.second.channelName].insert(foundFilterFields.begin(),
474 foundFilterFields.end());
476 for (
const auto& channel : channels)
478 if (channel.second.size() > 0)
483 std::unique_lock lock(
impl->filterQueueMutex);
484 if (
impl->filterQueue.size() == 0)
486 impl->idleCondition.wait(lock);
499 dataFieldIter->second.value = std::move(tval);
509 const std::string& datafieldName,
511 bool triggerFilterUpdate)
516 auto itChannel =
impl->channelRegistry.find(channelName);
518 if (itChannel ==
impl->channelRegistry.end())
523 DataFieldRegistry::iterator itDF = itChannel->second.dataFields.find(datafieldName);
525 if (itDF == itChannel->second.dataFields.end())
532 itDF->second.value =
value.clone();
538 valuePtr = itDF->second.value;
540 if (triggerFilterUpdate)
549 const std::string& datafieldName,
551 bool triggerFilterUpdate)
556 auto itChannel =
impl->channelRegistry.find(channelName);
558 if (itChannel ==
impl->channelRegistry.end())
563 DataFieldRegistry::iterator itDF = itChannel->second.dataFields.find(datafieldName);
565 if (itDF == itChannel->second.dataFields.end())
573 if (triggerFilterUpdate)
581 const std::string& channelName,
582 const std::unordered_map<std::string, Ice::Long>& datafieldValues)
586 auto itChannel =
impl->channelRegistry.find(channelName);
588 if (itChannel ==
impl->channelRegistry.end())
593 for (
const auto& elem : datafieldValues)
595 DataFieldRegistry::iterator itDF = itChannel->second.dataFields.find(elem.first);
597 if (itDF == itChannel->second.dataFields.end())
603 TimedVariantPtr oldValue = TimedVariantPtr::dynamicCast(itDF->second.value);
604 if (!oldValue || (elem.second > oldValue->getTimestamp() && oldValue->getInitialized()))
606 itDF->second.value =
new TimedVariant(VariantPtr::dynamicCast(itDF->second.value),
607 IceUtil::Time::microSeconds(elem.second));
617 auto itChannel =
impl->channelRegistry.find(channelName);
619 if (itChannel ==
impl->channelRegistry.end())
624 for (
auto& elem : itChannel->second.dataFields)
627 TimedVariantPtr oldValue = TimedVariantPtr::dynamicCast(elem.second.value);
628 if (!oldValue || (timestamp > oldValue->getTimestamp() && oldValue->getInitialized()))
630 elem.second.value =
new TimedVariant(VariantPtr::dynamicCast(elem.second.value),
631 IceUtil::Time::microSeconds(timestamp));
638 const std::string& description,
640 bool triggerFilterUpdate)
644 auto itChannel =
impl->channelRegistry.find(channelName);
645 if (itChannel ==
impl->channelRegistry.end())
648 auto& channel =
impl->channelRegistry[channelName];
649 channel.name = channelName;
650 channel.description = description;
651 channel.initialized =
false;
653 itChannel =
impl->channelRegistry.find(channelName);
657 auto targ = itChannel->second.dataFields.begin();
658 auto vals = datafieldValues.begin();
659 while (targ != itChannel->second.dataFields.end() && vals != datafieldValues.end())
661 if (vals->first > targ->first)
665 else if (vals->first == targ->first)
676 if (vals != datafieldValues.end())
681 if (triggerFilterUpdate)
683 for (
const auto& elem : datafieldValues)
692 const std::string& channelName,
693 const std::unordered_map<::std::string, ::armarx::VariantBasePtr>& datafieldValues,
694 bool triggerFilterUpdate)
699 auto itChannel =
impl->channelRegistry.find(channelName);
701 if (itChannel ==
impl->channelRegistry.end())
706 for (
const auto& elem : datafieldValues)
708 DataFieldRegistry::iterator itDF = itChannel->second.dataFields.find(elem.first);
710 if (itDF == itChannel->second.dataFields.end())
718 if (triggerFilterUpdate)
720 for (
const auto& elem : datafieldValues)
730 bool triggerFilterUpdate)
735 auto itChannel =
impl->channelRegistry.find(channelName);
737 if (itChannel ==
impl->channelRegistry.end())
742 for (
const auto& elem : datafieldValues)
744 DataFieldRegistry::iterator itDF = itChannel->second.dataFields.find(elem.first);
746 if (itDF == itChannel->second.dataFields.end())
755 if (triggerFilterUpdate)
757 for (
const auto& elem : datafieldValues)
765 Observer::updateRefreshRateChannel(
const std::string& channelName)
767 auto& oldUpdateTime =
impl->channelUpdateTimestamps[channelName];
768 auto now = IceUtil::Time::now();
774 new Variant((now - oldUpdateTime).toMilliSecondsDouble()),
782 "Update delta of channel '" + channelName +
"'");
789 const std::set<std::string>& updatedDatafields)
795 std::unique_lock lock(
impl->channelQueueMutex);
796 std::set<std::string>& dfs =
impl->channelQueue[channelName];
797 dfs.insert(updatedDatafields.begin(), updatedDatafields.end());
798 impl->idleChannelCondition.notify_all();
802 Observer::channelUpdateFunction()
805 while (!
impl->channelUpdateTask->isStopped())
807 std::map<std::string, std::set<std::string>> queue;
809 std::unique_lock lock(
impl->channelQueueMutex);
810 queue.swap(
impl->channelQueue);
812 for (
const auto& elem : queue)
814 doChannelUpdate(elem.first, elem.second);
816 std::unique_lock lock(
impl->channelQueueMutex);
817 if (
impl->channelQueue.size() == 0)
819 impl->idleChannelCondition.wait(lock);
825 Observer::addToChannelHistory(
const std::pair<IceUtil::Time, ChannelRegistryEntry>& historyEntry,
826 const std::string& channelName)
828 if (
impl->maxHistorySize == 0)
832 std::unique_lock lockHistory(
impl->historyMutex);
833 auto historyIt =
impl->channelHistory.find(channelName);
834 if (historyIt ==
impl->channelHistory.end())
836 impl->channelHistory[channelName].set_capacity(
impl->maxHistorySize);
837 historyIt =
impl->channelHistory.find(channelName);
840 if (historyIt->second.empty() ||
841 now > historyIt->second.rbegin()->first +
842 IceUtil::Time::secondsDouble(1.0 /
impl->maxHistoryRecordFrequency))
844 historyIt->second.push_back(historyEntry);
849 Observer::doChannelUpdate(
const std::string& channelName,
850 const std::set<std::string>& updatedDatafields)
856 ChannelRegistryEntry entry;
857 std::pair<IceUtil::Time, ChannelRegistryEntry> historyEntry;
861 auto iterChannel =
impl->channelRegistry.find(channelName);
863 if (iterChannel ==
impl->channelRegistry.end())
869 if (
impl->logChannelUpdateRate)
871 updateRefreshRateChannel(channelName);
875 const DataFieldRegistry& dataFields = iterChannel->second.dataFields;
876 DataFieldRegistry::const_iterator iterDataFields = dataFields.begin();
877 bool channelInitialized =
true;
879 while (iterDataFields != dataFields.end())
881 channelInitialized &= iterDataFields->second.value->getInitialized();
885 iterChannel->second.initialized = channelInitialized;
886 if (iterChannel->second.conditionChecks.size() > 0)
888 entry = iterChannel->second;
892 addToChannelHistory(historyEntry, channelName);
894 ConditionCheckRegistry::iterator iterChecks = entry.conditionChecks.begin();
896 while (iterChecks != entry.conditionChecks.end())
899 if (updatedDatafields.size() > 0)
902 for (
const auto& elem : entry.dataFields)
904 const DataFieldRegistryEntry& datafieldEntry = elem.second;
905 if (updatedDatafields.count(datafieldEntry.identifier->datafieldName))
921 evaluateCheck(ConditionCheckPtr::dynamicCast(iterChecks->second), entry);
926 ARMARX_ERROR <<
"Evaluating condition for channel " << channelName <<
" failed!";
944 Observer::onInitComponent()
946 impl->maxHistorySize = getProperty<int>(
"MaxHistorySize");
947 impl->maxHistoryRecordFrequency = getProperty<float>(
"MaxHistoryRecordFrequency");
950 std::unique_lock lock(
impl->idMutex);
955 impl->filterUpdateTask =
957 impl->filterUpdateTask->start();
958 impl->channelUpdateTask =
960 impl->channelUpdateTask->start();
964 Observer::onConnectComponent()
966 impl->logChannelUpdateRate = getProperty<bool>(
"CreateUpdateFrequenciesChannel").getValue();
971 "Metachannel with the last channel update deltas of all channels in milliseconds");
973 impl->channelHistory.clear();
980 auto proxy = ObserverInterfacePrx::checkedCast(
getProxy());
982 for (
auto& channel :
impl->channelRegistry)
984 ChannelRegistryEntry&
c = channel.second;
985 for (
auto& df :
c.dataFields)
987 DataFieldRegistryEntry& d = df.second;
988 d.identifier->channelRef->observerProxy = proxy;
992 if (
impl->logChannelUpdateRate)
994 impl->metaTask->start();
999 Observer::postOnConnectComponent()
1002 impl->stopWorker =
false;
1008 Observer::preOnDisconnectComponent()
1011 impl->stopWorker =
true;
1012 if (
impl->worker.joinable())
1014 impl->worker.join();
1019 Observer::onExitComponent()
1024 impl->metaTask->stop();
1026 if (
impl->filterUpdateTask)
1028 impl->filterUpdateTask->stop(
false);
1030 impl->idleCondition.notify_all();
1031 if (
impl->filterUpdateTask)
1033 impl->filterUpdateTask->stop(
true);
1035 if (
impl->channelUpdateTask)
1037 impl->channelUpdateTask->stop(
false);
1039 impl->idleChannelCondition.notify_all();
1040 if (
impl->channelUpdateTask)
1042 impl->channelUpdateTask->stop(
true);
1044 impl->channelHistory.clear();
1048 Observer::createPropertyDefinitions()
1065 std::unique_lock lock(
impl->idMutex);
1067 return impl->currentId++;
1071 Observer::createCheck(
const CheckConfiguration& configuration)
const
1073 std::string checkName = configuration.checkName;
1076 StringConditionCheckMap::const_iterator iterChecks =
impl->availableChecks.find(checkName);
1078 if (iterChecks ==
impl->availableChecks.end())
1080 std::string reason =
1081 "Invalid condition check \"" + checkName +
"\" for observer \"" +
getName() +
"\".";
1082 throw InvalidConditionException(reason.c_str());
1087 ->createInstance(configuration,
impl->channelRegistry);
1099 CheckIdentifier identifier;
1100 identifier.uniqueId = id;
1101 identifier.channelName = check->configuration.dataFieldIdentifier->channelName;
1102 identifier.observerName =
getName();
1105 std::pair<int, ConditionCheckPtr> entry;
1107 entry.second = check;
1108 impl->channelRegistry[check->configuration.dataFieldIdentifier->channelName]
1109 .conditionChecks.insert(entry);
1115 Observer::evaluateCheck(
const ConditionCheckPtr& check,
const ChannelRegistryEntry& channel)
const
1117 check->evaluateCondition(channel.dataFields);
1129 std::deque<Impl::WorkerUpdate> toProcess;
1130 while (!
impl->stopWorker)
1138 std::lock_guard g{
impl->workerUpdatesMutex};
1141 if (toProcess.empty())
1144 <<
"no worker jobs (message only posted every 10 seconds";
1145 std::this_thread::sleep_for(std::chrono::milliseconds{1});
1149 float minage = std::numeric_limits<float>::infinity();
1159 std::map<std::string, Counters> counters;
1162 for (
auto& f : toProcess)
1165 float age = f.ageInMs();
1166 Counters& counter = counters[f.name];
1180 ARMARX_DEBUG <<
"observer worker got " << toProcess.size() <<
"\tjobs ages min/max "
1181 << minage <<
"\t" << maxage <<
"\n"
1184 for (
const auto& [key,
c] : counters)
1186 out <<
" " << key <<
"\t# " <<
c.n <<
"\tavg age " << (
c.age /
c.n)
1187 <<
"\tavg time " << (
c.time /
c.n) <<
"\n";
1197 std::lock_guard g{
impl->workerUpdatesMutex};
1198 impl->workerUpdates.emplace_back(name, std::move(f));
1204 std::lock_guard g{
impl->workerUpdatesMutex};
1205 impl->workerUpdates.emplace_back(name, std::move(f));
1221 return ClockT::now();
1227 return std::chrono::duration_cast<std::chrono::nanoseconds>(
Now() - t0).count() * 1e-6f;
1240 const Ice::Current&)
const
1252 std::unique_lock lock_checks(
impl->checksMutex);
1254 check = createCheck(configuration);
1258 CheckIdentifier identifier;
1261 identifier = registerCheck(check);
1271 ChannelRegistryEntry channel =
1272 impl->channelRegistry[configuration.dataFieldIdentifier->channelName];
1275 evaluateCheck(check, channel);
1283 const CheckConfiguration& configuration,
1284 const Ice::Current&)
1296 auto iter =
impl->channelRegistry.find(
id.channelName);
1298 if (iter ==
impl->channelRegistry.end())
1304 ConditionCheckRegistry::iterator iterCheck = iter->second.conditionChecks.find(
id.uniqueId);
1307 if (iterCheck != iter->second.conditionChecks.end())
1309 iter->second.conditionChecks.erase(iterCheck);
1317 const CheckIdentifier&
id,
1318 const Ice::Current&)
1334 const Ice::Current&
c)
const
1346 auto it =
impl->channelRegistry.find(channelName);
1347 if (it ==
impl->channelRegistry.end())
1352 auto itDF = it->second.dataFields.find(datafieldName);
1353 if (itDF == it->second.dataFields.end())
1359 TimedVariantPtr tv = TimedVariantPtr::dynamicCast(itDF->second.value);
1362 ARMARX_IMPORTANT <<
"could not cast timed variant: " << itDF->second.value->ice_id();
1369 const std::string& channelName,
1370 const std::string& datafieldName,
1371 const Ice::Current&)
const
1386 auto it =
impl->channelRegistry.find(identifier->channelName);
1387 if (it ==
impl->channelRegistry.end())
1392 auto itDF = it->second.dataFields.find(identifier->datafieldName);
1393 if (itDF == it->second.dataFields.end())
1396 identifier->datafieldName);
1399 return DatafieldRefPtr::dynamicCast(itDF->second.identifier);
1405 const Ice::Current&)
const
1413 const std::string& datafieldName)
const
1420 const std::string& channelName,
1421 const std::string& datafieldName,
1422 const Ice::Current&)
const
1432 TimedVariantBaseList
1435 TimedVariantBaseList result;
1437 DataFieldIdentifierBaseList::const_iterator iter =
identifiers.begin();
1452 const Ice::Current&
c)
1458 StringTimedVariantBaseMap
1462 StringTimedVariantBaseMap result;
1464 auto it =
impl->channelRegistry.find(channelName);
1465 if (it ==
impl->channelRegistry.end())
1469 DataFieldRegistry fields = it->second.dataFields;
1470 for (
auto& field : fields)
1472 result[field.first] = TimedVariantBasePtr::dynamicCast(field.second.value);
1479 const std::string& channelName,
1480 const Ice::Current&)
const
1487 ChannelRegistryEntry
1492 auto it =
impl->channelRegistry.find(channelName);
1493 if (it ==
impl->channelRegistry.end())
1502 const std::string& channelName,
1503 const Ice::Current&)
const
1515 ChannelRegistry result(
impl->channelRegistry.begin(),
impl->channelRegistry.end());
1516 if (!includeMetaChannels)
1525 bool includeMetaChannels,
1526 const Ice::Current&)
1531 includeMetaChannels);
1535 StringConditionCheckMap
1538 std::unique_lock lock(
impl->checksMutex);
1540 return impl->availableChecks;
1545 const Ice::Current&)
1556 return impl->channelRegistry.find(channelName) !=
impl->channelRegistry.end();
1561 const std::string& channelName,
1562 const Ice::Current&)
const
1573 auto itChannel =
impl->channelRegistry.find(channelName);
1575 if (itChannel ==
impl->channelRegistry.end())
1580 return itChannel->second.dataFields.find(datafieldName) != itChannel->second.dataFields.end();
1585 const std::string& channelName,
1586 const std::string& datafieldName,
1587 const Ice::Current&)
const
1596 const DatafieldRefBasePtr& datafieldRef)
1606 std::string filteredName = datafieldRef->datafieldName +
"_" + filter->ice_id();
1611 while (
existsDataField(datafieldRef->channelRef->channelName, filteredName))
1615 datafieldRef->datafieldName +
"_" + filter->ice_id() +
"_" +
ValueToString(i);
1624 const DatafieldFilterBasePtr& filter,
1625 const DatafieldRefBasePtr& datafieldRef,
1626 const Ice::Current&)
1638 const DatafieldFilterBasePtr& filter,
1639 const DatafieldRefBasePtr& datafieldRef)
1645 if (!filter->checkTypeSupport(ref->getDataField()->getType()))
1647 auto types = filter->getSupportedTypes();
1648 std::string suppTypes =
"supported types";
1650 for (
auto t : types)
1660 if (
existsDataField(datafieldRef->channelRef->channelName, filterDatafieldName))
1667 auto var = ref->getDataField();
1668 filter->update(var->getTime().toMicroSeconds(), var);
1672 filterDatafieldName,
1673 *VariantPtr::dynamicCast(filter->getValue()),
1674 "Filtered value of " +
1675 ref->getDataFieldIdentifier()->getIdentifierStr());
1678 ChannelRefPtr channel = ChannelRefPtr::dynamicCast(ref->channelRef);
1679 channel->refetchChannel();
1682 data->filter = filter;
1683 data->original = ref;
1684 data->filtered = filteredRef;
1685 std::unique_lock lock(
impl->filterMutex);
1686 impl->orignalToFiltered.insert(
1687 std::make_pair(ref->getDataFieldIdentifier()->getIdentifierStr(),
data));
1688 impl->filteredToOriginal[filteredRef->getDataFieldIdentifier()->getIdentifierStr()] =
data;
1694 const AMD_ObserverInterface_createNamedFilteredDatafieldPtr& amd,
1695 const std::string& filterDatafieldName,
1696 const DatafieldFilterBasePtr& filter,
1697 const DatafieldRefBasePtr& datafieldRef,
1698 const Ice::Current&)
1703 filterDatafieldName,
1716 std::unique_lock lock(
impl->filterMutex);
1717 ref = DatafieldRefPtr::dynamicCast(datafieldRef);
1718 const std::string idStr = ref->getDataFieldIdentifier()->getIdentifierStr();
1719 auto it =
impl->filteredToOriginal.find(idStr);
1720 remove = (it !=
impl->filteredToOriginal.end());
1731 const DatafieldRefBasePtr& datafieldRef,
1732 const Ice::Current&)
1742 const Ice::Current&
c)
const
1751 const std::string& channelName,
1753 const Ice::Current&
c)
const
1769 const Ice::Current&
c)
const
1773 std::unique_lock lock_channels(
impl->historyMutex);
1775 ChannelHistory result;
1776 auto historyIt =
impl->channelHistory.find(channelName);
1777 if (historyIt ==
impl->channelHistory.end())
1783 auto lastInsertIt = result.begin();
1785 float timestepUs = timestepMs * 1000;
1786 for (
auto& entry : historyIt->second)
1788 long timestamp = entry.first.toMicroSeconds();
1789 if (timestamp > endTimestamp)
1794 if (timestamp > startTimestamp && timestamp - lastUsedTimestep > timestepUs)
1796 lastInsertIt = result.emplace_hint(result.end(),
1797 std::make_pair(timestamp, entry.second.dataFields));
1798 lastUsedTimestep = timestamp;
1806 const AMD_ObserverInterface_getPartialChannelHistoryPtr& amd,
1807 const std::string& channelName,
1811 const Ice::Current&
c)
const
1824 TimedVariantBaseList
1826 const std::string& datafieldName,
1828 const Ice::Current&
c)
const
1837 const std::string& channelName,
1838 const std::string& datafieldName,
1840 const Ice::Current&
c)
const
1852 TimedVariantBaseList
1854 const std::string& datafieldName,
1858 const Ice::Current&
c)
const
1861 std::unique_lock lock_channels(
impl->historyMutex);
1862 TimedVariantBaseList result;
1863 auto historyIt =
impl->channelHistory.find(channelName);
1864 if (historyIt ==
impl->channelHistory.end())
1874 float timestepUs = timestepMs * 1000;
1875 for (
auto& entry : historyIt->second)
1877 long timestamp = entry.first.toMicroSeconds();
1878 if (timestamp > endTimestamp)
1883 if (timestamp > startTimestamp && timestamp - lastUsedTimestep > timestepUs)
1885 auto datafieldIt = entry.second.dataFields.find(datafieldName);
1886 if (datafieldIt == entry.second.dataFields.end())
1892 TimedVariantPtr tvar = TimedVariantPtr::dynamicCast(datafieldIt->second.value);
1895 result.emplace_back(tvar);
1896 lastUsedTimestep = timestamp;
1906 const AMD_ObserverInterface_getPartialDatafieldHistoryPtr& amd,
1907 const std::string& channelName,
1908 const std::string& datafieldName,
1912 const Ice::Current&
c)
const
1927 name{
name}, time{Now()}, f{std::move(f)}
1934 return TimeDeltaInMs(time);