39 #include <boost/circular_buffer.hpp>
41 #include <condition_variable>
51 bool logChannelUpdateRate =
false;
60 using ChannelRegistryHistory = std::unordered_map<std::string, boost::circular_buffer<std::pair<IceUtil::Time, ChannelRegistryEntry> > >;
102 WorkerUpdate(
const std::string& name, std::function<
void(
void)> f);
103 float ageInMs()
const;
107 std::function<void(
void)>
f;
128 if (
getState() < eManagedIceObjectInitialized)
130 throw LocalException() <<
"offerChannel() must not be called before the Observer is initalized (i.e. not in onInitObserver(), use onConnectObserver()";
135 if (
impl->channelRegistry.find(channelName) !=
impl->channelRegistry.end())
140 ChannelRegistryEntry channel;
141 channel.name = channelName;
142 channel.description = description;
143 channel.initialized =
false;
145 std::pair<std::string, ChannelRegistryEntry> entry;
146 entry.first = channelName;
147 entry.second = channel;
149 impl->channelRegistry.insert(entry);
154 if (
getState() < eManagedIceObjectInitialized)
156 throw LocalException() <<
"offerDataFieldWithDefault() must not be called before the Observer is initalized (i.e. not in onInitObserver(), use onConnectObserver()";
161 auto channelIt =
impl->channelRegistry.find(channelName);
163 if (channelIt ==
impl->channelRegistry.end())
168 if (channelIt->second.dataFields.find(datafieldName) != channelIt->second.dataFields.end())
173 DataFieldRegistryEntry dataField;
174 dataField.identifier =
new DatafieldRef(
this, channelName, datafieldName,
false);
175 dataField.description = description;
178 *dataField.value = defaultValue;
180 std::pair<std::string, DataFieldRegistryEntry> entry;
181 entry.first = datafieldName;
182 entry.second = dataField;
184 channelIt->second.dataFields.insert(entry);
189 if (
getState() < eManagedIceObjectInitialized)
191 throw LocalException() <<
"offerDataField() must not be called before the Observer is initalized (i.e. not in onInitObserver(), use onConnectObserver()";
196 auto channelIt =
impl->channelRegistry.find(channelName);
198 if (channelIt ==
impl->channelRegistry.end())
203 if (channelIt->second.dataFields.find(datafieldName) != channelIt->second.dataFields.end())
208 DataFieldRegistryEntry dataField;
209 dataField.identifier =
new DatafieldRef(
this, channelName, datafieldName,
false);
210 dataField.description = description;
213 dataField.value->setType(type);
215 std::pair<std::string, DataFieldRegistryEntry> entry;
216 entry.first = datafieldName;
217 entry.second = dataField;
219 impl->channelRegistry[channelName].dataFields.insert(entry);
248 <<
"failure when seting datafield "
249 <<
"using slow fallback";
250 for (
const auto&
value : valueMap)
252 const std::string& datafieldName =
value.first;
275 std::unique_lock lock(
impl->checksMutex);
277 if (
impl->availableChecks.find(checkName) !=
impl->availableChecks.end())
282 std::pair<std::string, ConditionCheckPtr> entry;
283 entry.first = checkName;
284 entry.second = conditionCheck;
286 impl->availableChecks.insert(entry);
294 auto iter =
impl->channelRegistry.find(channelName);
296 if (iter ==
impl->channelRegistry.end())
303 impl->channelRegistry.erase(iter);
308 std::unique_lock lock(
impl->historyMutex);
309 impl->channelHistory.erase(channelName);
317 std::unique_lock lock(
impl->filterMutex);
320 std::string idStr = idptr->getIdentifierStr();
321 auto itFilter =
impl->filteredToOriginal.find(idStr);
323 if (itFilter !=
impl->filteredToOriginal.end())
325 DatafieldRefPtr refPtr = DatafieldRefPtr::dynamicCast(itFilter->second->original);
326 auto range =
impl->orignalToFiltered.equal_range(refPtr->getDataFieldIdentifier()->getIdentifierStr());
328 for (
auto it = range.first; it != range.second; it++)
329 if (it->second->filtered->getDataFieldIdentifier()->getIdentifierStr()
332 impl->orignalToFiltered.erase(it);
336 impl->filteredToOriginal.erase(itFilter);
342 auto itChannel =
impl->channelRegistry.find(id->channelName);
344 if (itChannel ==
impl->channelRegistry.end())
349 itChannel->second.dataFields.erase(id->datafieldName);
359 std::vector<Impl::FilterQueueData> filterData;
360 std::set<std::string> foundFilterFields;
362 std::unique_lock lock(
impl->filterMutex);
363 const std::string
id =
getName() +
"." + channelName +
"." + datafieldName;
365 auto range =
impl->orignalToFiltered.equal_range(
id);
371 long tLong = t.toMicroSeconds();
373 for (
auto it = range.first; it != range.second; it++)
375 it->second->filter->update(tLong,
value);
377 foundFilterFields.insert(it->second->filtered->datafieldName);
380 filterData.emplace_back(
Impl::FilterQueueData {tv, it->second->filtered->channelRef->channelName, it->second->filtered->datafieldName});
385 for (
auto& elem : filterData)
387 setDataFieldFlatCopy(elem.channelName, elem.datafieldName, VariantPtr::dynamicCast(elem.value));
397 return foundFilterFields;
402 if (
impl->orignalToFiltered.size() == 0)
406 const std::string
id =
getName() +
"." + channelName +
"." + datafieldName;
408 std::unique_lock lock(
impl->filterMutex);
409 if (
impl->orignalToFiltered.count(
id) == 0)
414 std::unique_lock lock(
impl->filterQueueMutex);
415 impl->filterQueue[id] = {
value, channelName, datafieldName};
416 impl->idleCondition.notify_all();
419 void Observer::updateFilters()
421 while (!
impl->filterUpdateTask->isStopped())
425 std::unique_lock lock(
impl->filterQueueMutex);
426 queue.swap(
impl->filterQueue);
428 std::unordered_map<std::string, std::set<std::string> > channels;
429 for (
const Impl::FilterUpdateQueue::value_type& elem : queue)
432 elem.second.datafieldName,
434 channels[elem.second.channelName].insert(foundFilterFields.begin(), foundFilterFields.end());
436 for (
const auto& channel : channels)
438 if (channel.second.size() > 0)
443 std::unique_lock lock(
impl->filterQueueMutex);
444 if (
impl->filterQueue.size() == 0)
446 impl->idleCondition.wait(lock);
457 dataFieldIter->second.value = std::move(tval);
470 auto itChannel =
impl->channelRegistry.find(channelName);
472 if (itChannel ==
impl->channelRegistry.end())
477 DataFieldRegistry::iterator itDF = itChannel->second.dataFields.find(datafieldName);
479 if (itDF == itChannel->second.dataFields.end())
486 itDF->second.value =
value.clone();
492 valuePtr = itDF->second.value;
494 if (triggerFilterUpdate)
506 auto itChannel =
impl->channelRegistry.find(channelName);
508 if (itChannel ==
impl->channelRegistry.end())
513 DataFieldRegistry::iterator itDF = itChannel->second.dataFields.find(datafieldName);
515 if (itDF == itChannel->second.dataFields.end())
523 if (triggerFilterUpdate)
533 auto itChannel =
impl->channelRegistry.find(channelName);
535 if (itChannel ==
impl->channelRegistry.end())
540 for (
const auto& elem : datafieldValues)
542 DataFieldRegistry::iterator itDF = itChannel->second.dataFields.find(elem.first);
544 if (itDF == itChannel->second.dataFields.end())
550 TimedVariantPtr oldValue = TimedVariantPtr::dynamicCast(itDF->second.value);
551 if (!oldValue || (elem.second > oldValue->getTimestamp() && oldValue->getInitialized()))
553 itDF->second.value =
new TimedVariant(VariantPtr::dynamicCast(itDF->second.value), IceUtil::Time::microSeconds(elem.second));
562 auto itChannel =
impl->channelRegistry.find(channelName);
564 if (itChannel ==
impl->channelRegistry.end())
569 for (
auto& elem : itChannel->second.dataFields)
572 TimedVariantPtr oldValue = TimedVariantPtr::dynamicCast(elem.second.value);
573 if (!oldValue || (timestamp > oldValue->getTimestamp() && oldValue->getInitialized()))
575 elem.second.value =
new TimedVariant(VariantPtr::dynamicCast(elem.second.value), IceUtil::Time::microSeconds(timestamp));
584 auto itChannel =
impl->channelRegistry.find(channelName);
585 if (itChannel ==
impl->channelRegistry.end())
588 auto& channel =
impl->channelRegistry[channelName];
589 channel.name = channelName;
590 channel.description = description;
591 channel.initialized =
false;
593 itChannel =
impl->channelRegistry.find(channelName);
597 auto targ = itChannel->second.dataFields.begin();
598 auto vals = datafieldValues.begin();
599 while (targ != itChannel->second.dataFields.end() &&
600 vals != datafieldValues.end())
602 if (vals->first > targ->first)
606 else if (vals->first == targ->first)
617 if (vals != datafieldValues.end())
622 if (triggerFilterUpdate)
624 for (
const auto& elem : datafieldValues)
632 void Observer::setDataFieldsFlatCopy(
const std::string& channelName,
const std::unordered_map< ::std::string, ::armarx::VariantBasePtr>& datafieldValues,
bool triggerFilterUpdate)
637 auto itChannel =
impl->channelRegistry.find(channelName);
639 if (itChannel ==
impl->channelRegistry.end())
644 for (
const auto& elem : datafieldValues)
646 DataFieldRegistry::iterator itDF = itChannel->second.dataFields.find(elem.first);
648 if (itDF == itChannel->second.dataFields.end())
656 if (triggerFilterUpdate)
658 for (
const auto& elem : datafieldValues)
670 auto itChannel =
impl->channelRegistry.find(channelName);
672 if (itChannel ==
impl->channelRegistry.end())
677 for (
const auto& elem : datafieldValues)
679 DataFieldRegistry::iterator itDF = itChannel->second.dataFields.find(elem.first);
681 if (itDF == itChannel->second.dataFields.end())
690 if (triggerFilterUpdate)
692 for (
const auto& elem : datafieldValues)
699 void Observer::updateRefreshRateChannel(
const std::string& channelName)
701 auto& oldUpdateTime =
impl->channelUpdateTimestamps[channelName];
702 auto now = IceUtil::Time::now();
721 std::unique_lock lock(
impl->channelQueueMutex);
722 std::set<std::string>& dfs =
impl->channelQueue[channelName];
723 dfs.insert(updatedDatafields.begin(), updatedDatafields.end());
724 impl->idleChannelCondition.notify_all();
727 void Observer::channelUpdateFunction()
730 while (!
impl->channelUpdateTask->isStopped())
732 std::map<std::string, std::set<std::string> > queue;
734 std::unique_lock lock(
impl->channelQueueMutex);
735 queue.swap(
impl->channelQueue);
737 for (
const auto& elem : queue)
739 doChannelUpdate(elem.first, elem.second);
741 std::unique_lock lock(
impl->channelQueueMutex);
742 if (
impl->channelQueue.size() == 0)
744 impl->idleChannelCondition.wait(lock);
748 void Observer::addToChannelHistory(
const std::pair<IceUtil::Time, ChannelRegistryEntry>& historyEntry,
const std::string& channelName)
750 if (
impl->maxHistorySize == 0)
754 std::unique_lock lockHistory(
impl->historyMutex);
755 auto historyIt =
impl->channelHistory.find(channelName);
756 if (historyIt ==
impl->channelHistory.end())
758 impl->channelHistory[channelName].set_capacity(
impl->maxHistorySize);
759 historyIt =
impl->channelHistory.find(channelName);
762 if (historyIt->second.empty() || now > historyIt->second.rbegin()->first + IceUtil::Time::secondsDouble(1.0 /
impl->maxHistoryRecordFrequency))
764 historyIt->second.push_back(historyEntry);
768 void Observer::doChannelUpdate(
const std::string& channelName,
const std::set<std::string>& updatedDatafields)
774 ChannelRegistryEntry entry;
775 std::pair<IceUtil::Time, ChannelRegistryEntry> historyEntry;
779 auto iterChannel =
impl->channelRegistry.find(channelName);
781 if (iterChannel ==
impl->channelRegistry.end())
788 if (
impl->logChannelUpdateRate)
790 updateRefreshRateChannel(channelName);
794 const DataFieldRegistry& dataFields = iterChannel->second.dataFields;
795 DataFieldRegistry::const_iterator iterDataFields = dataFields.begin();
796 bool channelInitialized =
true;
798 while (iterDataFields != dataFields.end())
800 channelInitialized &= iterDataFields->second.value->getInitialized();
804 iterChannel->second.initialized = channelInitialized;
805 if (iterChannel->second.conditionChecks.size() > 0)
807 entry = iterChannel->second;
812 addToChannelHistory(historyEntry, channelName);
814 ConditionCheckRegistry::iterator iterChecks = entry.conditionChecks.begin();
816 while (iterChecks != entry.conditionChecks.end())
819 if (updatedDatafields.size() > 0)
822 for (
const auto& elem : entry.dataFields)
824 const DataFieldRegistryEntry& datafieldEntry = elem.second;
825 if (updatedDatafields.count(datafieldEntry.identifier->datafieldName))
841 evaluateCheck(ConditionCheckPtr::dynamicCast(iterChecks->second), entry);
846 ARMARX_ERROR <<
"Evaluating condition for channel " << channelName <<
" failed!";
863 void Observer::onInitComponent()
865 impl->maxHistorySize = getProperty<int>(
"MaxHistorySize");
866 impl->maxHistoryRecordFrequency = getProperty<float>(
"MaxHistoryRecordFrequency");
869 std::unique_lock lock(
impl->idMutex);
875 impl->filterUpdateTask->start();
877 impl->channelUpdateTask->start();
880 void Observer::onConnectComponent()
882 impl->logChannelUpdateRate = getProperty<bool>(
"CreateUpdateFrequenciesChannel").getValue();
887 impl->channelHistory.clear();
894 auto proxy = ObserverInterfacePrx::checkedCast(
getProxy());
896 for (
auto& channel :
impl->channelRegistry)
898 ChannelRegistryEntry&
c = channel.second;
899 for (
auto& df :
c.dataFields)
901 DataFieldRegistryEntry& d = df.second;
902 d.identifier->channelRef->observerProxy = proxy;
906 if (
impl->logChannelUpdateRate)
908 impl->metaTask->start();
911 void Observer::postOnConnectComponent()
914 impl->stopWorker =
false;
919 void Observer::preOnDisconnectComponent()
922 impl->stopWorker =
true;
923 if (
impl->worker.joinable())
929 void Observer::onExitComponent()
934 impl->metaTask->stop();
936 if (
impl->filterUpdateTask)
938 impl->filterUpdateTask->stop(
false);
940 impl->idleCondition.notify_all();
941 if (
impl->filterUpdateTask)
943 impl->filterUpdateTask->stop(
true);
945 if (
impl->channelUpdateTask)
947 impl->channelUpdateTask->stop(
false);
949 impl->idleChannelCondition.notify_all();
950 if (
impl->channelUpdateTask)
952 impl->channelUpdateTask->stop(
true);
954 impl->channelHistory.clear();
976 std::unique_lock lock(
impl->idMutex);
978 return impl->currentId++;
982 ConditionCheckPtr Observer::createCheck(
const CheckConfiguration& configuration)
const
984 std::string checkName = configuration.checkName;
987 StringConditionCheckMap::const_iterator iterChecks =
impl->availableChecks.find(checkName);
989 if (iterChecks ==
impl->availableChecks.end())
991 std::string reason =
"Invalid condition check \"" + checkName +
"\" for observer \"" +
getName() +
"\".";
992 throw InvalidConditionException(reason.c_str());
996 ConditionCheckPtr check = ConditionCheckPtr::dynamicCast(iterChecks->second)->createInstance(configuration,
impl->channelRegistry);
1007 CheckIdentifier identifier;
1008 identifier.uniqueId = id;
1009 identifier.channelName = check->configuration.dataFieldIdentifier->channelName;
1010 identifier.observerName =
getName();
1013 std::pair<int, ConditionCheckPtr> entry;
1015 entry.second = check;
1016 impl->channelRegistry[check->configuration.dataFieldIdentifier->channelName].conditionChecks.insert(entry);
1021 void Observer::evaluateCheck(
const ConditionCheckPtr& check,
const ChannelRegistryEntry& channel)
const
1023 check->evaluateCondition(channel.dataFields);
1034 std::deque<Impl::WorkerUpdate> toProcess;
1035 while (!
impl->stopWorker)
1043 std::lock_guard g{
impl->workerUpdatesMutex};
1046 if (toProcess.empty())
1049 std::this_thread::sleep_for(std::chrono::milliseconds{1});
1053 float minage = std::numeric_limits<float>::infinity();
1062 std::map<std::string, Counters> counters;
1065 for (
auto& f : toProcess)
1068 float age = f.ageInMs();
1069 Counters& counter = counters[f.name];
1083 ARMARX_DEBUG <<
"observer worker got " << toProcess.size()
1084 <<
"\tjobs ages min/max " << minage <<
"\t" << maxage <<
"\n"
1087 for (
const auto& [key,
c] : counters)
1091 <<
"\tavg age " << (
c.age /
c.n)
1092 <<
"\tavg time " << (
c.time /
c.n) <<
"\n";
1101 std::lock_guard g{
impl->workerUpdatesMutex};
1102 impl->workerUpdates.emplace_back(name, std::move(f));
1106 std::lock_guard g{
impl->workerUpdatesMutex};
1107 impl->workerUpdates.emplace_back(name, std::move(f));
1116 return ClockT::now();
1121 return std::chrono::duration_cast<std::chrono::nanoseconds>(
Now() - t0).count() * 1e-6f;
1130 const AMD_ObserverInterface_getObserverNamePtr& amd,
1131 const Ice::Current&)
const
1138 const CheckConfiguration& configuration)
1143 std::unique_lock lock_checks(
impl->checksMutex);
1145 check = createCheck(configuration);
1149 CheckIdentifier identifier;
1152 identifier = registerCheck(check);
1162 ChannelRegistryEntry channel =
impl->channelRegistry[configuration.dataFieldIdentifier->channelName];
1165 evaluateCheck(check, channel);
1172 const AMD_ObserverInterface_installCheckPtr& amd,
1173 const CheckConfiguration& configuration,
1174 const Ice::Current&)
1186 auto iter =
impl->channelRegistry.find(
id.channelName);
1188 if (iter ==
impl->channelRegistry.end())
1194 ConditionCheckRegistry::iterator iterCheck = iter->second.conditionChecks.find(
id.uniqueId);
1197 if (iterCheck != iter->second.conditionChecks.end())
1199 iter->second.conditionChecks.erase(iterCheck);
1205 const AMD_ObserverInterface_removeCheckPtr& amd,
1206 const CheckIdentifier&
id,
1207 const Ice::Current&)
1216 const Ice::Current&
c)
const
1222 const AMD_ObserverInterface_getDataFieldPtr& amd,
1224 const Ice::Current&
c)
const
1233 const std::string& channelName,
1234 const std::string& datafieldName)
const
1239 auto it =
impl->channelRegistry.find(channelName);
1240 if (it ==
impl->channelRegistry.end())
1245 auto itDF = it->second.dataFields.find(datafieldName);
1246 if (itDF == it->second.dataFields.end())
1252 TimedVariantPtr tv = TimedVariantPtr::dynamicCast(itDF->second.value);
1255 ARMARX_IMPORTANT <<
"could not cast timed variant: " << itDF->second.value->ice_id();
1260 const AMD_ObserverInterface_getDatafieldByNamePtr& amd,
1261 const std::string& channelName,
1262 const std::string& datafieldName,
1263 const Ice::Current&)
const
1276 auto it =
impl->channelRegistry.find(identifier->channelName);
1277 if (it ==
impl->channelRegistry.end())
1282 auto itDF = it->second.dataFields.find(identifier->datafieldName);
1283 if (itDF == it->second.dataFields.end())
1288 return DatafieldRefPtr::dynamicCast(itDF->second.identifier);
1291 const AMD_ObserverInterface_getDataFieldRefPtr& amd,
1293 const Ice::Current&)
const
1301 const std::string& channelName,
1302 const std::string& datafieldName)
const
1307 const AMD_ObserverInterface_getDatafieldRefByNamePtr& amd,
1308 const std::string& channelName,
1309 const std::string& datafieldName,
1310 const Ice::Current&)
const
1320 const Ice::Current&
c)
1322 TimedVariantBaseList result;
1324 DataFieldIdentifierBaseList::const_iterator iter =
identifiers.begin();
1336 const AMD_ObserverInterface_getDataFieldsPtr& amd,
1338 const Ice::Current&
c)
1347 const std::string& channelName)
const
1350 StringTimedVariantBaseMap result;
1352 auto it =
impl->channelRegistry.find(channelName);
1353 if (it ==
impl->channelRegistry.end())
1357 DataFieldRegistry fields = it->second.dataFields;
1358 for (
auto& field : fields)
1360 result[field.first] = TimedVariantBasePtr::dynamicCast(field.second.value);
1365 const AMD_ObserverInterface_getDatafieldsOfChannelPtr& amd,
1366 const std::string& channelName,
1367 const Ice::Current&)
const
1375 const std::string& channelName)
const
1379 auto it =
impl->channelRegistry.find(channelName);
1380 if (it ==
impl->channelRegistry.end())
1387 const AMD_ObserverInterface_getChannelPtr& amd,
1388 const std::string& channelName,
1389 const Ice::Current&)
const
1397 bool includeMetaChannels)
1402 ChannelRegistry result(
impl->channelRegistry.begin(),
impl->channelRegistry.end());
1403 if (!includeMetaChannels)
1410 const AMD_ObserverInterface_getAvailableChannelsPtr& amd,
1411 bool includeMetaChannels,
1412 const Ice::Current&)
1416 includeMetaChannels);
1421 std::unique_lock lock(
impl->checksMutex);
1423 return impl->availableChecks;
1426 const AMD_ObserverInterface_getAvailableChecksPtr& amd,
1427 const Ice::Current&)
1434 const std::string& channelName)
const
1438 return impl->channelRegistry.find(channelName) !=
impl->channelRegistry.end();
1441 const AMD_ObserverInterface_existsChannelPtr& amd,
1442 const std::string& channelName,
1443 const Ice::Current&)
const
1451 const std::string& channelName,
1452 const std::string& datafieldName)
const
1456 auto itChannel =
impl->channelRegistry.find(channelName);
1458 if (itChannel ==
impl->channelRegistry.end())
1463 return itChannel->second.dataFields.find(datafieldName) != itChannel->second.dataFields.end();
1466 const AMD_ObserverInterface_existsDataFieldPtr& amd,
1467 const std::string& channelName,
1468 const std::string& datafieldName,
1469 const Ice::Current&)
const
1478 const DatafieldFilterBasePtr& filter,
1479 const DatafieldRefBasePtr& datafieldRef)
1489 std::string filteredName = datafieldRef->datafieldName +
"_" + filter->ice_id();
1495 while (
existsDataField(datafieldRef->channelRef->channelName, filteredName))
1498 filteredName = datafieldRef->datafieldName +
"_" + filter->ice_id() +
"_" +
ValueToString(i);
1505 const AMD_ObserverInterface_createFilteredDatafieldPtr& amd,
1506 const DatafieldFilterBasePtr& filter,
1507 const DatafieldRefBasePtr& datafieldRef,
1508 const Ice::Current&)
1517 const std::string& filterDatafieldName,
1518 const DatafieldFilterBasePtr& filter,
1519 const DatafieldRefBasePtr& datafieldRef)
1525 if (!filter->checkTypeSupport(ref->getDataField()->getType()))
1527 auto types = filter->getSupportedTypes();
1528 std::string suppTypes =
"supported types";
1530 for (
auto t : types)
1540 if (
existsDataField(datafieldRef->channelRef->channelName, filterDatafieldName))
1548 auto var = ref->getDataField();
1549 filter->update(var->getTime().toMicroSeconds(), var);
1552 offerDataFieldWithDefault(datafieldRef->channelRef->channelName, filterDatafieldName, *VariantPtr::dynamicCast(filter->getValue()),
"Filtered value of " + ref->getDataFieldIdentifier()->getIdentifierStr());
1555 ChannelRefPtr channel = ChannelRefPtr::dynamicCast(ref->channelRef);
1556 channel->refetchChannel();
1559 data->filter = filter;
1560 data->original = ref;
1561 data->filtered = filteredRef;
1562 std::unique_lock lock(
impl->filterMutex);
1563 impl->orignalToFiltered.insert(std::make_pair(ref->getDataFieldIdentifier()->getIdentifierStr(),
data));
1564 impl->filteredToOriginal[filteredRef->getDataFieldIdentifier()->getIdentifierStr()] =
data;
1568 const AMD_ObserverInterface_createNamedFilteredDatafieldPtr& amd,
1569 const std::string& filterDatafieldName,
1570 const DatafieldFilterBasePtr& filter,
1571 const DatafieldRefBasePtr& datafieldRef,
1572 const Ice::Current&)
1574 addWorkerJob(
"Observer::createNamedFilteredDatafield", amd,
1576 filterDatafieldName,
1582 const DatafieldRefBasePtr& datafieldRef)
1588 std::unique_lock lock(
impl->filterMutex);
1589 ref = DatafieldRefPtr::dynamicCast(datafieldRef);
1590 const std::string idStr = ref->getDataFieldIdentifier()->getIdentifierStr();
1591 auto it =
impl->filteredToOriginal.find(idStr);
1592 remove = (it !=
impl->filteredToOriginal.end());
1601 const AMD_ObserverInterface_removeFilteredDatafieldPtr& amd,
1602 const DatafieldRefBasePtr& datafieldRef,
1603 const Ice::Current&)
1611 const std::string& channelName,
1613 const Ice::Current&
c)
const
1619 const AMD_ObserverInterface_getChannelHistoryPtr& amd,
1620 const std::string& channelName,
1622 const Ice::Current&
c)
const
1632 const std::string& channelName,
1636 const Ice::Current&
c)
const
1640 std::unique_lock lock_channels(
impl->historyMutex);
1642 ChannelHistory result;
1643 auto historyIt =
impl->channelHistory.find(channelName);
1644 if (historyIt ==
impl->channelHistory.end())
1650 auto lastInsertIt = result.begin();
1652 float timestepUs = timestepMs * 1000;
1653 for (
auto& entry : historyIt->second)
1655 long timestamp = entry.first.toMicroSeconds();
1656 if (timestamp > endTimestamp)
1661 if (timestamp > startTimestamp && timestamp - lastUsedTimestep > timestepUs)
1663 lastInsertIt = result.emplace_hint(result.end(), std::make_pair(timestamp, entry.second.dataFields));
1664 lastUsedTimestep = timestamp;
1670 const AMD_ObserverInterface_getPartialChannelHistoryPtr& amd,
1671 const std::string& channelName,
1675 const Ice::Current&
c)
const
1677 addWorkerJob(
"Observer::getPartialChannelHistory", amd,
1687 const std::string& channelName,
1688 const std::string& datafieldName,
1690 const Ice::Current&
c)
const
1696 const AMD_ObserverInterface_getDatafieldHistoryPtr& amd,
1697 const std::string& channelName,
1698 const std::string& datafieldName,
1700 const Ice::Current&
c)
const
1711 const std::string& channelName,
1712 const std::string& datafieldName,
1716 const Ice::Current&
c)
const
1719 std::unique_lock lock_channels(
impl->historyMutex);
1720 TimedVariantBaseList result;
1721 auto historyIt =
impl->channelHistory.find(channelName);
1722 if (historyIt ==
impl->channelHistory.end())
1732 float timestepUs = timestepMs * 1000;
1733 for (
auto& entry : historyIt->second)
1735 long timestamp = entry.first.toMicroSeconds();
1736 if (timestamp > endTimestamp)
1741 if (timestamp > startTimestamp && timestamp - lastUsedTimestep > timestepUs)
1743 auto datafieldIt = entry.second.dataFields.find(datafieldName);
1744 if (datafieldIt == entry.second.dataFields.end())
1750 TimedVariantPtr tvar = TimedVariantPtr::dynamicCast(datafieldIt->second.value);
1753 result.emplace_back(tvar);
1754 lastUsedTimestep = timestamp;
1762 const AMD_ObserverInterface_getPartialDatafieldHistoryPtr& amd,
1763 const std::string& channelName,
1764 const std::string& datafieldName,
1768 const Ice::Current&
c)
const
1770 addWorkerJob(
"Observer::getPartialDatafieldHistory", amd,
1781 std::function<
void(
void)> f)
1790 return TimeDeltaInMs(time);