28 #include <Ice/Current.h>
29 #include <Ice/DispatchInterceptor.h>
30 #include <Ice/Initialize.h>
31 #include <Ice/LocalException.h>
32 #include <Ice/Object.h>
33 #include <Ice/ObjectAdapter.h>
34 #include <Ice/ProxyHandle.h>
35 #include <IceGrid/Admin.h>
36 #include <IceGrid/Registry.h>
37 #include <IceStorm/IceStorm.h>
38 #include <IceUtil/Thread.h>
39 #include <IceUtil/Time.h>
40 #include <IceUtil/UUID.h>
46 #include <ArmarXCore/interface/core/UserException.h>
// Forward declarations of two exception classes. Catch clauses further down
// in this file name them as IceGrid::DeploymentException and
// IceGrid::ObjectNotRegisteredException, so these presumably sit inside
// namespace IceGrid (the enclosing namespace line is not visible here).
53 class DeploymentException;
54 class ObjectNotRegisteredException;
// Constructor fragment (the trailing ':' starts a member-initializer list;
// the constructor name and its leading parameters are not visible here).
// Visible effects: stores the communicator and the topic suffix in the impl
// object and initializes impl->forceShutdown to false.
// NOTE(review): topicSuffix is declared `const std::string` by value; a const
// reference would avoid a copy. Check the header declaration before changing.
61 const std::string topicSuffix) :
64 impl->communicator = communicator;
66 impl->topicSuffix = topicSuffix;
67 impl->forceShutdown =
false;
// Dispatch-wrapper fragment: the request is forwarded to the wrapped servant
// through ice_dispatch() and its result is returned.
// The remaining visible lines belong to a streamed message that names the
// operation and the identity name of the target object and ends with
// "' failed:\n". The concatenation operation + id.name is the last argument
// of a call whose name is not visible here.
// Presumably this message is emitted from a catch handler around the dispatch;
// the try/catch lines are not visible, so confirm against the full file.
88 return _servant->ice_dispatch(request);
94 request.getCurrent().operation +
95 request.getCurrent().id.name)
96 <<
"Calling interface function '" << request.getCurrent().operation
97 <<
"' of object '" << request.getCurrent().id.name <<
"' failed:\n"
// Fragment that registers a servant under objectName (presumably
// IceManager::registerObject; the line carrying the function name is not
// visible here).
// Visible steps, in order:
//   - take impl->objectRegistryMutex for the rest of the scope;
//   - throw Ice::AlreadyRegisteredException carrying the servant's ice_id()
//     and objectName (the guarding condition is not visible);
//   - derive the Ice identity of the entry from objectName;
//   - EITHER use the caller-supplied adapter (adapterToAddTo) and record
//     ownAdapter = false, OR create an adapter named objectName with the
//     endpoint string "tcp", activate it and record ownAdapter = true
//     (the condition choosing between the two is not visible);
//   - add `interceptor` to the adapter under the entry's identity and store
//     the proxy the adapter creates for that identity;
//   - return the proxy and the adapter bundled as ObjectHandles.
108 const std::string& objectName,
111 std::unique_lock lock(impl->objectRegistryMutex);
115 throw Ice::AlreadyRegisteredException(__FILE__, __LINE__, object->ice_id(), objectName);
120 objectEntry->id = Ice::stringToIdentity(objectName);
124 objectEntry->adapter = adapterToAddTo;
125 objectEntry->ownAdapter =
false;
129 objectEntry->adapter =
130 getCommunicator()->createObjectAdapterWithEndpoints(objectName,
"tcp");
131 objectEntry->adapter->activate();
132 objectEntry->ownAdapter =
true;
137 objectEntry->adapter->add(interceptor, objectEntry->id);
138 objectEntry->proxy = objectEntry->adapter->createProxy(objectEntry->id);
143 return ObjectHandles(objectEntry->proxy, objectEntry->adapter);
// Fragment that removes a registered object by name (presumably
// IceManager::removeObject; the signature is not visible here).
// Visible steps:
//   - take impl->objectRegistryMutex and look objectName up in the registry;
//   - when the entry exists: loop over its usedTopics (loop body not visible);
//     branch on ownAdapter (that branch's body is not visible); otherwise,
//     if the adapter is not deactivated, remove the identity from it;
//   - log the identity name and ask `admin` to remove the object;
//   - handle IceGrid::ObjectNotRegisteredException,
//     IceGrid::DeploymentException and
//     Ice::ObjectAdapterDeactivatedException (the latter two log a message);
//   - erase the registry entry.
// The final throw of armarx::UserException(message) is presumably the
// "object not found" path; its guarding condition is not visible.
// NOTE(review): the handlers catch by non-const reference. Catching by
// const reference is the usual convention unless the handler modifies the
// exception (handler bodies are only partly visible; confirm).
151 std::unique_lock lock(impl->objectRegistryMutex);
153 auto objectIt = impl->objectRegistry.find(objectName);
155 if (objectIt != impl->objectRegistry.end())
162 for (
auto&
topic : objectEntry->usedTopics)
169 if (objectEntry->ownAdapter)
173 else if (!adapter->isDeactivated())
175 adapter->remove(objectEntry->id);
182 <<
" with id: " << objectIt->second->id.name;
183 admin->removeObject(objectIt->second->id);
185 catch (IceGrid::ObjectNotRegisteredException& notRegisteredException)
194 catch (IceGrid::DeploymentException& deploymentException)
198 <<
" object failed due to DeploymentException" <<
flush;
200 catch (Ice::ObjectAdapterDeactivatedException& e)
202 ARMARX_INFO <<
"ObjectAdapterDeactivatedException for "
203 << objectIt->second->id.name;
206 impl->objectRegistry.erase(objectIt);
214 throw armarx::UserException(
message);
// Fragment that drops one cached proxy identified by name, type and optional
// endpoints (presumably IceManager::removeProxyFromCache; the function name
// line is not visible here).
// The cache key is built as  name[":" + endpoints] + ":" + typeName ; the
// endpoints part is only appended when `endpoints` is non-empty.
// Returns true when an entry with that key was erased, false otherwise.
// NOTE(review): no lock on impl->proxyCacheMutex is visible in this fragment,
// although implGetCheckedProxy/implSetCheckedProxy in this file lock it before
// touching checkedProxies. Confirm whether a lock is taken on a line not
// shown.
219 const std::string& typeName,
220 const std::string& endpoints)
222 std::string proxyString = name;
224 if (!endpoints.empty())
226 proxyString += std::string(
":") + endpoints;
229 std::string proxyTypedId = proxyString + std::string(
":") + typeName;
230 return (impl->checkedProxies.erase(proxyTypedId) > 0);
// Fragment that scans the checked-proxy cache for an entry whose stored proxy
// equals `proxy` and erases that entry by its key.
// NOTE(review): erasing from the container inside a range-for invalidates the
// iterator of the erased element; this is only safe if the loop is left
// (return/break) directly after the erase. The following line is not visible
// here, so confirm against the full file.
240 for (
auto& proxyEntry : impl->checkedProxies)
242 if (proxyEntry.second == proxy)
244 impl->checkedProxies.erase(proxyEntry.first);
// Accessor fragment for the cached topic manager proxy: takes
// impl->topicManagerMutex, checks whether impl->topicManagerProxy is unset
// (the body of that branch is not visible; presumably it fills the cache),
// then returns the cached proxy.
254 std::unique_lock lock(impl->topicManagerMutex);
256 if (!impl->topicManagerProxy)
261 return impl->topicManagerProxy;
// Fragment that resolves the proxy string "IceStorm/TopicManager" through
// `communicator` and returns it narrowed to IceStorm::TopicManagerPrx via
// checkedCast (which, per the Ice documentation, verifies the type with the
// remote object).
267 Ice::ObjectPrx obj = communicator->stringToProxy(
"IceStorm/TopicManager");
269 return IceStorm::TopicManagerPrx::checkedCast(obj);
// Fragment that subscribes subscriberProxy to the topic named topicName
// (presumably IceManager::subscribeTopic; the function name line is not
// visible here).
// Visible behaviour:
//   - with orderedPublishing, the QoS entry "reliability" is set to "ordered"
//     and the proxy is passed unchanged; otherwise its oneway variant
//     (ice_oneway()) is passed;
//   - subscribeAndGetPublisher() is attempted, and IceStorm::AlreadySubscribed
//     is caught; the same call is then made a second time, again with a
//     handler for AlreadySubscribed (what happens between the two attempts
//     and inside the handlers is not visible; presumably an unsubscribe);
//   - after logging, the (topicName, subscriberProxy) pair is appended to
//     impl->subscriptions under impl->topicSubscriptionMutex.
274 const std::string& topicName,
275 bool orderedPublishing)
282 if (orderedPublishing)
284 qos[
"reliability"] =
"ordered";
289 topic->subscribeAndGetPublisher(
290 qos, orderedPublishing ? subscriberProxy : subscriberProxy->ice_oneway());
292 catch (IceStorm::AlreadySubscribed& e)
297 topic->subscribeAndGetPublisher(
298 qos, orderedPublishing ? subscriberProxy : subscriberProxy->ice_oneway());
300 catch (IceStorm::AlreadySubscribed& e)
306 ARMARX_INFO <<
"Subscribed to topic " << topicName;
307 std::unique_lock lock(impl->topicSubscriptionMutex);
309 impl->subscriptions.push_back(std::make_pair(topicName, subscriberProxy));
// Fragment that first registers the `subscriber` servant as an object. The
// object name is topicName + "Listener" + a freshly generated UUID, which
// keeps the name unique per call. The handles returned by registerObject are
// kept in `prx`; how they are used afterwards is not visible here
// (presumably passed on to the topic subscription; confirm).
315 const std::string& topicName,
316 bool orderedPublishing)
318 auto prx =
registerObject(subscriber, topicName +
"Listener" + IceUtil::generateUUID());
// Fragment that unsubscribes subscriberProxy from a topic and removes the
// matching bookkeeping entry (presumably IceManager::unsubscribeTopic; the
// signature is not visible here).
// After the remote unsubscribe and the log line, impl->subscriptions is
// searched under impl->topicSubscriptionMutex for the pair
// (topicName, subscriberProxy). `toDelete` starts as end(), so the final
// erase only runs when a match was recorded. The assignment of `toDelete`
// inside the loop is on lines not visible here.
327 topic->unsubscribe(subscriberProxy);
328 ARMARX_INFO <<
"Unsubscribed from topic " << topicName;
329 std::unique_lock lock(impl->topicSubscriptionMutex);
331 std::vector<std::pair<std::string, Ice::ObjectPrx>>::iterator toDelete =
332 impl->subscriptions.end();
333 std::vector<std::pair<std::string, Ice::ObjectPrx>>::iterator it;
335 for (it = impl->subscriptions.begin(); it != impl->subscriptions.end(); ++it)
337 if (it->first == topicName && it->second == subscriberProxy)
344 if (toDelete != impl->subscriptions.end())
346 impl->subscriptions.erase(toDelete);
// IceManager::retrieveTopic(name): runs under impl->topicRetrievalMutex and
// has handlers for IceStorm::NoSuchTopic and IceStorm::TopicExists.
// The try bodies and handler bodies are not visible here; presumably the
// topic is retrieved, created when missing, and retrieved again when a
// concurrent creation wins. Confirm against the full file.
352 IceManager::retrieveTopic(
const std::string& name)
355 std::unique_lock lock(impl->topicRetrievalMutex);
365 catch (
const IceStorm::NoSuchTopic&)
373 catch (
const IceStorm::TopicExists&)
// IceManager::implGetCheckedProxy(proxyTypedId): returns the cached proxy
// stored under proxyTypedId, holding impl->proxyCacheMutex for the lookup.
// The lookup uses at(); if checkedProxies is a standard associative
// container this throws std::out_of_range for an unknown key (the container
// type is declared outside this view; confirm).
384 IceManager::implGetCheckedProxy(std::string
const& proxyTypedId)
386 std::unique_lock lock(impl->proxyCacheMutex);
387 return impl->checkedProxies.at(proxyTypedId);
// IceManager::implSetCheckedProxy(proxyTypedId, proxy): stores `proxy` in the
// checked-proxy cache under proxyTypedId while holding impl->proxyCacheMutex.
// operator[] is used, so an existing entry for the same key is overwritten.
391 IceManager::implSetCheckedProxy(std::string
const& proxyTypedId,
const Ice::ObjectPrx& proxy)
393 std::unique_lock lock(impl->proxyCacheMutex);
394 impl->checkedProxies[proxyTypedId] = proxy;
// Fragment (presumably part of a shutdown routine; the enclosing function is
// not visible here): stops the IceGrid admin helper, but only when one has
// been created (impl->iceGridAdmin is set).
420 if (impl->iceGridAdmin)
422 impl->iceGridAdmin->stop();
// Getter fragment: returns the topic suffix stored in the impl object.
431 return impl->topicSuffix;
// IceManager::communicator_stringToProxy(proxyString): only the signature is
// visible here. Presumably it forwards proxyString to the communicator's
// stringToProxy(); TODO confirm against the full file.
435 IceManager::communicator_stringToProxy(
const std::string& proxyString)
// IceManager::__getTopic(topicName, useUDP): obtains the publisher proxy of
// `topic` and converts it either to a datagram proxy (ice_datagram()) or to a
// oneway proxy (ice_oneway()). The condition choosing between the two is not
// visible here; presumably it is the useUDP flag (confirm).
// NOTE(review): identifiers containing a double underscore are reserved for
// the implementation in C++. Renaming would change the interface, so this is
// only flagged here.
441 IceManager::__getTopic(
const std::string& topicName,
bool useUDP)
445 auto prx = topic->getPublisher();
448 prx = prx->ice_datagram();
452 prx = prx->ice_oneway();
// IceManager::cleanUp(): tears down what this manager has registered.
// Visible steps:
//   1. under impl->topicSubscriptionMutex, unsubscribe every recorded
//      (topic name, proxy) pair from its topic and clear the list;
//   2. under impl->objectRegistryMutex, walk the object registry, deactivate
//      each entry's adapter and ask `admin` to remove the object's identity;
//   3. if an IceGrid admin helper exists, remove its observers.
// Lines between these statements (scopes, try/catch, conditions) are not
// visible here.
// NOTE(review): step 1 calls retrieveTopic(), which locks
// impl->topicRetrievalMutex, and makes remote calls while
// topicSubscriptionMutex is held. Confirm that no other path takes these
// mutexes in the opposite order.
458 IceManager::cleanUp()
466 std::vector<std::pair<std::string, Ice::ObjectPrx>>::iterator it;
470 std::unique_lock lock(impl->topicSubscriptionMutex);
472 for (it = impl->subscriptions.begin(); it != impl->subscriptions.end(); ++it)
474 retrieveTopic(it->first)->unsubscribe(it->second);
477 impl->subscriptions.clear();
481 std::unique_lock lock(impl->objectRegistryMutex);
483 auto objListIt = impl->objectRegistry.begin();
485 for (; objListIt != impl->objectRegistry.end(); ++objListIt)
489 objListIt->second->adapter->deactivate();
491 admin->removeObject(objListIt->second->id);
499 if (impl->iceGridAdmin)
501 impl->iceGridAdmin->removeObservers();
// Fragment that records a dependency of the object `registrantName` on the
// object `dependencyObjectName` (the function name line is not visible here).
// Under impl->objectRegistryMutex the registrant's entry is fetched or
// created, and a new DependencyObjectEntry is appended to its dependency
// list. That entry holds the dependency's name together with a proxy created
// from that name by the communicator.
// NOTE(review): the entry is created with a raw `new`; presumably the list's
// element type (DependencyObjectEntryPtr) is a reference-counting handle that
// takes ownership. Confirm against its declaration.
508 const std::string& dependencyObjectName)
510 std::unique_lock lock(impl->objectRegistryMutex);
512 ObjectEntryPtr objectEntry = getOrCreateObjectEntry(registrantName);
514 objectEntry->dependencies.push_back(
new DependencyObjectEntry(
515 dependencyObjectName,
getCommunicator()->stringToProxy(dependencyObjectName)));
// IceManager::resolveObjectDependencies(): one pass over the object registry
// that tries to reach the dependencies of every entry which is not yet
// active but already has a proxy.
// Visible per-entry logic:
//   - dependenciesResolved is optimistically set to true;
//   - each dependency not yet marked resolved is pinged through a proxy with
//     ice_timeout(2000); after the ping a "found" message is logged, the
//     dependency is marked resolved and the entry is marked updated;
//   - on the other path (presumably the handler for a failed ping, which is
//     not visible here), dependenciesResolved is reset to false, the
//     dependency name is appended to `missingObjects` and the thread sleeps
//     for 10 ms;
//   - if the entry was updated, still-missing names are logged, the updated
//     flag is cleared, and a message is logged when everything is resolved.
// NOTE(review): no lock on impl->objectRegistryMutex is visible in this
// fragment, although other fragments in this file lock it before touching
// impl->objectRegistry. Confirm whether callers hold it.
520 IceManager::resolveObjectDependencies()
522 auto objectIt = impl->objectRegistry.begin();
524 for (; objectIt != impl->objectRegistry.end(); ++objectIt)
526 std::string missingObjects;
527 ObjectEntryPtr objectEntry = objectIt->second;
529 if (!objectEntry->active && objectEntry->proxy)
531 objectEntry->dependenciesResolved =
true;
532 DependencyList::iterator depIt = objectEntry->dependencies.begin();
534 for (; depIt != objectEntry->dependencies.end(); ++depIt)
536 DependencyObjectEntryPtr dependencyEntry = *depIt;
538 if (!dependencyEntry->resolved)
542 dependencyEntry->proxy->ice_timeout(2000)->ice_ping();
544 ARMARX_INFO << objectEntry->name <<
" found " << dependencyEntry->name
547 dependencyEntry->resolved =
true;
549 objectEntry->updated =
true;
553 objectEntry->dependenciesResolved =
false;
555 missingObjects +=
"\t" + dependencyEntry->name +
"\n";
558 IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(10));
563 if (objectEntry->updated)
565 if (missingObjects.length() > 0)
567 ARMARX_INFO << objectEntry->name <<
" still waiting for:\n"
568 << missingObjects <<
flush;
571 objectEntry->updated =
false;
573 if (objectEntry->dependenciesResolved)
575 ARMARX_INFO <<
" all " << objectEntry->name <<
" dependencies resolved"
// Fragment (function name line not visible here): under
// impl->objectRegistryMutex, fetches or creates the registry entry of
// `registrantName` and appends topicName to that entry's usedTopics list.
584 const std::string& topicName)
586 std::unique_lock lock(impl->objectRegistryMutex);
588 ObjectEntryPtr objectEntry = getOrCreateObjectEntry(registrantName);
590 objectEntry->usedTopics.push_back(topicName);
// Fragment (function name line not visible here): under
// impl->objectRegistryMutex, fetches or creates the registry entry of
// `registrantName` and appends topicName to that entry's offeredTopics list.
595 const std::string& topicName)
597 std::unique_lock lock(impl->objectRegistryMutex);
599 ObjectEntryPtr objectEntry = getOrCreateObjectEntry(registrantName);
601 objectEntry->offeredTopics.push_back(topicName);
// IceManager::subscribeTopics(subscriber, topics, orderedPublishing): walks
// the given topic list front to back. The loop body is not visible here;
// presumably each topic is subscribed for `subscriber` (confirm).
605 IceManager::subscribeTopics(Ice::ObjectPrx subscriber,
606 const TopicList& topics,
607 bool orderedPublishing)
609 TopicList::const_iterator it = topics.begin();
611 for (; it != topics.end(); ++it)
// IceManager::retrieveTopics(topics): walks the given topic list front to
// back. The loop body is not visible here; presumably each topic is
// retrieved by name (confirm).
618 IceManager::retrieveTopics(
const TopicList& topics)
620 TopicList::const_iterator it = topics.begin();
622 for (; it != topics.end(); ++it)
// Setter fragment: stores `name` in the impl object. `this->` is spelled out
// because the parameter has the same name as the impl member being set.
631 this->impl->name = name;
// Getter fragment: returns the communicator stored in the impl object.
637 return impl->communicator;
// Accessor fragment for the IceGrid admin helper (function name not visible
// here): under impl->iceGridAdminMutex the helper is created on first use
// via IceGridAdmin::Create(communicator, impl->name), then returned.
643 std::unique_lock lock(impl->iceGridAdminMutex);
645 if (!impl->iceGridAdmin)
647 impl->iceGridAdmin = IceGridAdmin::Create(
getCommunicator(), impl->name);
650 return impl->iceGridAdmin;
// IceManager::getOrCreateObjectEntry(objectName): looks objectName up in the
// object registry. When there is no entry, or the stored entry has an empty
// name, a fresh ObjectEntry carrying objectName is created and stored under
// that key. Otherwise the existing entry is returned.
// No lock is taken in the visible lines; the callers visible in this file
// acquire impl->objectRegistryMutex before calling.
// NOTE(review): the final `return objIt->second` would dereference end() if
// it were reached after the creation branch. This is only safe if that
// branch returns the new entry on a line not visible here; confirm.
660 IceManager::getOrCreateObjectEntry(
const std::string& objectName)
662 auto objIt = impl->objectRegistry.find(objectName);
664 if (objIt == impl->objectRegistry.end() || objIt->second->name.empty())
666 ObjectEntryPtr objectEntry =
new ObjectEntry();
667 objectEntry->name = objectName;
669 impl->objectRegistry[objectName] = objectEntry;
674 return objIt->second;
// Fragment that checks whether the object named objectName answers: obtains
// an untyped proxy for it and pings it through a proxy with ice_timeout(500)
// (milliseconds, per the Ice proxy documentation). How a failed ping is
// reported is on lines not visible here (presumably an exception handler).
682 Ice::ObjectPrx prx = getProxy<Ice::ObjectPrx>(objectName);
683 prx->ice_timeout(500)->ice_ping();