38 #include <Ice/BuiltinSequences.h>
39 #include <Ice/Communicator.h>
40 #include <Ice/Identity.h>
41 #include <Ice/Initialize.h>
42 #include <Ice/LocalException.h>
43 #include <Ice/NativePropertiesAdmin.h>
44 #include <Ice/ObjectAdapter.h>
45 #include <Ice/ObjectF.h>
46 #include <Ice/Properties.h>
47 #include <Ice/Metrics.h>
48 #include <Ice/PropertiesF.h>
49 #include <Ice/Proxy.h>
50 #include <Ice/ProxyF.h>
51 #include <IceGrid/Admin.h>
52 #include <IceGrid/Registry.h>
53 #include <IceUtil/Handle.h>
54 #include <IceStorm/IceStorm.h>
76 #include "ArmarXCore/interface/core/BasicTypes.h"
77 #include "ArmarXCore/interface/core/Log.h"
78 #include "ArmarXCore/interface/core/ManagedIceObjectDefinitions.h"
79 #include "ArmarXCore/interface/core/ThreadingIceBase.h"
92 #define MANAGEROBJNAME applicationName + "Manager"
93 #define OBJOBSNAME std::string("ArmarXObjectObserver_") + applicationName
99 applicationName(applicationName),
100 managerState(eCreated)
103 std::stringstream defaultLocator;
104 defaultLocator <<
"--Ice.Default.Locator=" << locatorName <<
":tcp -p " << port <<
" -h " << host;
105 args.push_back(defaultLocator.str());
109 init(applicationName, communicator);
113 applicationName(applicationName),
114 managerState(eCreated)
117 init(applicationName, communicator);
134 const std::string armarxHint =
"'\nDid you start armarx?\n\nTo start armarx: armarx start\n"
135 "To kill a hanging armarx: armarx killIce" ;
138 if (communicator->getProperties()->getProperty(
"Ice.Default.Locator").empty())
142 std::cerr <<
"Required Ice property 'Ice.Default.Locator' not set! "
143 <<
"It has to has a value similar to 'IceGrid/Locator:tcp -p 4061 -h localhost'";
147 auto locatorProp = communicator->getProperties()->getProperty(
"Ice.Default.Locator");
148 auto pos = locatorProp.find_first_of(
':');
149 std::string locatorId = locatorProp.substr(0, pos) ;
150 auto proxy = communicator->stringToProxy(locatorId);
151 IceGrid::LocatorPrx::checkedCast(proxy);
157 std::cerr <<
"Could not contact default locator at '"
158 << communicator->getProperties()->getProperty(
"Ice.Default.Locator")
166 std::string registryId =
"IceGrid/Registry";
167 auto proxy = communicator->stringToProxy(registryId);
168 IceGrid::RegistryPrx::checkedCast(proxy);
174 std::cerr <<
"Could not contact IceGrid registry at '"
175 << communicator->getProperties()->getProperty(
"IceGrid.Registry.Client.Endpoints")
185 catch (
const Ice::NoEndpointException& e)
187 std::cout <<
"Caught exception: \n" << e.what() << std::endl;
191 std::cerr <<
"Could not contact TopicManager at '"
193 <<
"IceStorm/TopicManager"
240 addObject(
object, addWithOwnAdapter, objectName, useOwnScheduleThread);
247 throw LocalException(
"Cannot add NULL object");
250 auto cptr = ComponentPtr::dynamicCast(
object);
251 if (cptr && !cptr->createdByComponentCreate)
253 throw LocalException(
"Components need to be created by Component::create");
257 if (!objectName.empty())
259 if (!object->getName().empty())
261 ARMARX_INFO <<
"Adding object with custom name: " << objectName;
263 object->setName(objectName);
265 if (object->getName().empty())
267 object->setName(object->getDefaultName());
269 if (object->getName().empty())
271 throw LocalException(
"Object name must not be empty");
273 auto lock = acquireManagedObjectsMutex();
279 bool reachable =
false;
283 reachable =
getIceManager()->isObjectReachable(object->getName());
292 throw Ice::AlreadyRegisteredException(__FILE__, __LINE__, object->ice_id(), object->getName());
298 auto pair = managedObjects.insert(std::make_pair(object->getName(), std::move(objectScheduler)));
301 ARMARX_WARNING <<
"Insert of object scheduler in managedObjects failed since there is already a scheduler for name " <<
object->getName();
303 if (!useOwnScheduleThread)
305 std::scoped_lock lock(schedulerListMutex);
306 singleThreadedSchedulers.at(rand() % singleThreadedSchedulers.size())->addObjectScheduler(pair.first->second);
324 manager->addObject(
object, objectName, addWithOwnAdapter, useOwnScheduleThread);
339 auto lock = acquireManagedObjectsMutex();
344 scheduler = managedObjects.find(objectName)->second;
350 removeObject(scheduler,
true);
362 auto removal = [objectName,
this]()
366 auto lock = acquireManagedObjectsMutex();
371 scheduler = managedObjects.find(objectName)->second;
375 this->removeObject(scheduler,
false);
384 std::thread {removal} .detach();
403 managerStateMutex.lock();
405 if (managerState == eShutdown)
407 managerStateMutex.unlock();
412 std::unique_lock lock(shutdownMutex);
413 managerStateMutex.unlock();
414 shutdownCondition.wait(lock);
424 std::scoped_lock lock(managerStateMutex);
427 if (managerState >= eShutdownInProgress)
432 managerState = eShutdownInProgress;
435 managedObjectsMutex.lock();
441 cleanupSchedulersTask->stop();
443 if (checkDependencyStatusTask)
445 checkDependencyStatusTask->stop();
448 sharedRemoteHandleState.reset();
449 remoteReferenceCountControlBlockManager.reset();
451 disconnectAllObjects();
457 removeAllObjects(
true);
460 iceManager->getIceGridSession()->getAdmin()->removeObject(
Ice::Identity {MANAGEROBJNAME,
""});
464 singleThreadedSchedulers.clear();
469 iceManager->shutdown();
472 iceManager->waitForShutdown();
478 iceManager->destroy();
481 objObserver =
nullptr;
485 std::scoped_lock lock(managerStateMutex);
489 std::unique_lock lock(shutdownMutex);
491 shutdownCondition.notify_all();
494 managerState = eShutdown;
497 managedObjectsMutex.unlock();
499 catch (std::exception& e)
501 ARMARX_INFO <<
"shutdown failed with exception!\n" << e.what() << std::endl;
504 ARMARX_INFO <<
"Shutdown of ArmarXManager finished!" << std::endl;
509 std::thread {[
this, timeoutMs]{
510 std::this_thread::sleep_for(std::chrono::milliseconds{timeoutMs});
518 std::scoped_lock lock(managerStateMutex);
519 return (managerState == eShutdown);
534 return iceManager->getCommunicator();
539 std::vector<ManagedIceObjectPtr> objects;
541 auto lock = acquireManagedObjectsMutex();
547 ObjectSchedulerMap::iterator iter = managedObjects.begin();
549 while (iter != managedObjects.end())
551 objects.push_back(iter->second->getObject());
567 char hostname[HOST_NAME_MAX];
568 gethostname(hostname, HOST_NAME_MAX);
569 return std::string(hostname);
579 mice::MiceObjectConnectivity miceCon = mice::MiceObjectConnectivity();
581 miceCon.subscribedTopics = con.usedTopics;
582 miceCon.publishedTopics = con.offeredTopics;
583 miceCon.usedObjects = Ice::StringSeq();
585 for(
const auto &entry : con.dependencies) {
586 miceCon.usedObjects.push_back(entry.first);
595 Ice::ObjectPrx adminObj =
getIceManager()->getCommunicator()->getAdmin();
596 IceMX::MetricsAdminPrx metrAdmin = IceMX::MetricsAdminPrx::checkedCast(adminObj,
"Metrics");
607 auto lock = acquireManagedObjectsMutex();
610 return eManagedIceObjectExiting;
613 ObjectSchedulerMap::iterator iter = managedObjects.find(objectName);
615 if (iter == managedObjects.end())
617 return eManagedIceObjectExited;
620 ManagedIceObjectState state = (ManagedIceObjectState)iter->second->getObject()->getState();
627 auto lock = acquireManagedObjectsMutex();
630 return ManagedIceObjectConnectivity();
633 ObjectSchedulerMap::iterator iter = managedObjects.find(objectName);
635 if (iter == managedObjects.end())
637 return ManagedIceObjectConnectivity();
640 ManagedIceObjectConnectivity con = iter->second->getObject()->getConnectivity();
649 StringStringDictionary propertyMap;
650 ObjectSchedulerMap::iterator iter = managedObjects.find(objectName);
652 if (iter == managedObjects.end())
664 auto result =
component->getPropertyDefinitions()->getPropertyValues(
component->getPropertyDefinitions()->getPrefix());
670 ObjectPropertyInfos propertyMap;
671 ObjectSchedulerMap::iterator iter = managedObjects.find(objectName);
673 if (iter == managedObjects.end())
686 for (
auto prop :
component->getPropertyDefinitions()->getPropertyValues())
688 propertyMap[
component->getPropertyDefinitions()->getPrefix() + prop.first] = {
component->getPropertyDefinitions()->getDefinitionBase(prop.first)->isConstant(),
697 ObjectPropertyInfos propertyMap;
714 Ice::ObjectPrx adminObj =
getIceManager()->getCommunicator()->getAdmin();
715 Ice::PropertiesAdminPrx propAdmin = Ice::PropertiesAdminPrx::checkedCast(adminObj,
"Properties");
739 component->updateIceProperties(properties);
747 Ice::StringSeq objectNames;
749 auto lock = acquireManagedObjectsMutex();
755 ObjectSchedulerMap::iterator iter = managedObjects.begin();
757 while (iter != managedObjects.end())
759 objectNames.push_back(iter->first);
768 public Ice::PropertiesAdminUpdateCallback
772 application(application)
776 void updated(
const Ice::PropertyDict& changes)
override
781 application->updateIceProperties(changes);
801 communicator, applicationName,
802 appInstance ? appInstance->getProperty<std::string>(
"TopicSuffix").getValue() :
"");
807 throw Ice::ConnectFailedException(__FILE__, __LINE__);
810 this->installProcessFacet();
814 iceManager->getTopic<
LogPrx>(
"Log"));
822 auto icegrid = iceManager->getIceGridSession();
826 Ice::ObjectPrx oPrx = icegrid->registerObjectWithNewAdapter(objObserver, OBJOBSNAME, observerAdapter);
827 IceGrid::ObjectObserverPrx objObsPrx = IceGrid::ObjectObserverPrx::checkedCast(oPrx);
828 icegrid->setObjectObserver(objObsPrx);
831 icegrid->registerObjectWithNewAdapter(
this, MANAGEROBJNAME, armarxManagerAdapter);
836 cleanupSchedulersTask =
new PeriodicTask<ArmarXManager>(
this, &ArmarXManager::cleanupSchedulers, 500,
false,
"ArmarXManager::cleanupSchedulers");
837 cleanupSchedulersTask->start();
839 checkDependencyStatusTask =
new PeriodicTask<ArmarXManager>(
this, &ArmarXManager::checkDependencies, 1000,
false,
"ArmarXManager::DependenciesChecker");
840 checkDependencyStatusTask->start();
845 std::scoped_lock lock(managerStateMutex);
846 managerState = eRunning;
859 if (appInstance && appInstance->getProperty<
bool>(
"UseTimeServer").getValue())
871 sharedRemoteHandleState.reset(
875 appInstance->getProperty<
unsigned int>(
"RemoteHandlesDeletionTimeout").getValue() :
882 void ArmarXManager::cleanupSchedulers()
884 std::scoped_lock lock(terminatingObjectsMutex);
886 ObjectSchedulerList::iterator iter = terminatingObjects.begin();
888 while (iter != terminatingObjects.end())
892 "Checking termination state of " << sched->getObject()->getName() <<
895 if ((*iter)->isTerminated())
897 ARMARX_VERBOSE <<
"Delayed Removal of ManagedIceObject " << (*iter)->getObject()->getName() <<
" finished";
898 iter = terminatingObjects.erase(iter);
907 void ArmarXManager::disconnectDependees(
const std::string&
object)
911 std::vector<std::string> dependees = getDependendees(
object);
913 auto lock = acquireManagedObjectsMutex();
919 for (
const auto& dependee : dependees)
921 ArmarXManager::ObjectSchedulerMap::iterator it = managedObjects.find(dependee);
922 ARMARX_INFO <<
deactivateSpam(10, dependee +
object) <<
"'" << dependee <<
"' disconnected because of '" <<
object <<
"'";
924 if (it != managedObjects.end())
926 it->second->disconnected(
true);
937 void ArmarXManager::disconnectAllObjects()
939 auto lock = acquireManagedObjectsMutex();
944 ObjectSchedulerMap::iterator iter = managedObjects.begin();
946 for (; iter != managedObjects.end(); iter++)
948 iter->second->disconnected(
false);
953 std::vector<std::string> ArmarXManager::getDependendees(
const std::string& removedObject)
955 std::vector<std::string> result;
957 auto lock = acquireManagedObjectsMutex();
966 ObjectSchedulerMap::const_iterator it = managedObjects.begin();
968 for (; it != managedObjects.end(); it++)
972 if (scheduler->dependsOn(removedObject))
974 result.push_back(it->first);
986 void ArmarXManager::wakeupWaitingSchedulers()
988 auto lock = acquireManagedObjectsMutex();
997 ObjectSchedulerMap::const_iterator it = managedObjects.begin();
999 for (; it != managedObjects.end(); it++)
1002 scheduler->wakeupDependencyCheck();
1012 void ArmarXManager::removeAllObjects(
bool blocking)
1014 ObjectSchedulerMap tempMap;
1016 std::scoped_lock lock(managedObjectsMutex);
1017 tempMap = managedObjects;
1020 for (
auto& it : tempMap)
1022 removeObject(it.second,
false);
1027 for (
auto objectScheduler : terminatingObjects)
1029 objectScheduler->waitForTermination();
1033 terminatingObjects.clear();
1041 if (!objectScheduler)
1046 const std::string objName = objectScheduler->getObject()->getName();
1050 iceManager->removeObject(objName);
1052 std::scoped_lock lock2(managedObjectsMutex);
1053 managedObjects.erase(objectScheduler->getObject()->getName());
1056 objectScheduler->terminate();
1060 objectScheduler->waitForTermination();
1061 ARMARX_VERBOSE <<
"Blocking removal of ManagedIceObject " << objName <<
" finished";
1065 ARMARX_VERBOSE <<
"Inserting ManagedIceObject into delayed removal list: " << objName;
1066 std::scoped_lock lockTerm(terminatingObjectsMutex);
1067 terminatingObjects.push_back(objectScheduler);
1112 std::scoped_lock lock(managedObjectsMutex);
1113 auto it = managedObjects.find(objectName);
1114 if (it != managedObjects.end())
1124 void ArmarXManager::checkDependencies()
1128 auto lock = acquireManagedObjectsMutex();
1136 ObjectSchedulerMap::const_iterator it = managedObjects.begin();
1138 for (; it != managedObjects.end(); it++)
1142 if (scheduler->getObjectState() == eManagedIceObjectStarted && !scheduler->checkDependenciesStatus())
1144 scheduler->disconnected(
true);
1158 std::scoped_lock lock(managerStateMutex);
1160 if (managerState >= eShutdownInProgress)
1171 void ArmarXManager::installProcessFacet()
1177 if (applicationCommunicator->findAdminFacet(
"Process"))
1179 applicationCommunicator->removeAdminFacet(
"Process");
1182 Ice::ProcessPtr applicationProcessFacet =
new ApplicationProcessFacet(*
this);
1183 applicationCommunicator->addAdminFacet(applicationProcessFacet,
"Process");
1187 Ice::ObjectPtr obj = applicationCommunicator->findAdminFacet(
"Properties");
1188 Ice::NativePropertiesAdminPtr admin = Ice::NativePropertiesAdminPtr::dynamicCast(obj);
1191 admin->addUpdateCallback(propertyChangeCallback);
1195 ARMARX_WARNING <<
"Could not get properties admin - online property changing will not work";
1199 applicationCommunicator->getAdmin();
1211 for (
const auto& id2factory : preregistration->getFactories())
1213 if (!
ic->getValueFactoryManager()->find(id2factory.first))
1216 ic->getValueFactoryManager()->add(id2factory.second, id2factory.first);
1230 return armarxManagerAdapter;
1235 return sharedRemoteHandleState;
1240 std::scoped_lock lock(schedulerListMutex);
1241 for (
int i = 0; i < increaseBy; ++i)
1250 static std::map<std::string, DynamicLibraryPtr> loadedLibs;
1251 static std::mutex libsMutex;
1252 std::lock_guard<std::mutex> guard {libsMutex};
1253 if (loadedLibs.count(path))
1268 if (lib->isLibraryLoaded())
1271 loadedLibs[path] = lib;
1275 ARMARX_ERROR <<
"Could not load lib " + path +
": " + lib->getErrorMessage();
1324 std::scoped_lock lock(managedObjectsMutex);
1325 StringStringDictionary propertyMap;
1326 ObjectSchedulerMap::iterator iter = managedObjects.find(objectName);
1328 if (iter == managedObjects.end())
1333 return iter->second->getObject()->getMetaInfoMap();