26 #include <Ice/Current.h>
27 #include <Ice/DispatchInterceptor.h>
28 #include <Ice/Initialize.h>
29 #include <Ice/LocalException.h>
30 #include <Ice/Object.h>
31 #include <Ice/ObjectAdapter.h>
32 #include <Ice/ProxyHandle.h>
33 #include <IceGrid/Admin.h>
34 #include <IceUtil/Thread.h>
35 #include <IceUtil/Time.h>
36 #include <IceUtil/UUID.h>
37 #include <IceStorm/IceStorm.h>
39 #include <IceGrid/Registry.h>
43 #include <ArmarXCore/interface/core/UserException.h>
53 class DeploymentException;
54 class ObjectNotRegisteredException;
62 impl->communicator = communicator;
64 impl->topicSuffix = topicSuffix;
65 impl->forceShutdown =
false;
84 return _servant->ice_dispatch(request);
89 ARMARX_WARNING <<
deactivateSpam(30, request.getCurrent().operation + request.getCurrent().id.name) <<
"Calling interface function '" << request.getCurrent().operation <<
"' of object '" << request.getCurrent().id.name <<
"' failed:\n" <<
GetHandledExceptionString();
100 const std::string& objectName,
103 std::unique_lock lock(impl->objectRegistryMutex);
107 throw Ice::AlreadyRegisteredException(__FILE__, __LINE__, object->ice_id(), objectName);
112 objectEntry->id = Ice::stringToIdentity(objectName);
116 objectEntry->adapter = adapterToAddTo;
117 objectEntry->ownAdapter =
false;
121 objectEntry->adapter =
123 ->createObjectAdapterWithEndpoints(objectName,
"tcp");
124 objectEntry->adapter->activate();
125 objectEntry->ownAdapter =
true;
130 objectEntry->adapter->add(interceptor, objectEntry->id);
131 objectEntry->proxy = objectEntry->adapter->createProxy(objectEntry->id);
141 return ObjectHandles(objectEntry->proxy, objectEntry->adapter);
149 std::unique_lock lock(impl->objectRegistryMutex);
151 auto objectIt = impl->objectRegistry.find(objectName);
153 if (objectIt != impl->objectRegistry.end())
160 for (
auto&
topic : objectEntry->usedTopics)
167 if (objectEntry->ownAdapter)
171 else if (!adapter->isDeactivated())
173 adapter->remove(objectEntry->id);
179 ARMARX_VERBOSE <<
"removing object from ice: " << objectName <<
" with id: " << objectIt->second->id.name;
180 admin->removeObject(objectIt->second->id);
182 catch (IceGrid::ObjectNotRegisteredException& notRegisteredException)
192 catch (IceGrid::DeploymentException& deploymentException)
197 <<
" object failed due to DeploymentException"
200 catch (Ice::ObjectAdapterDeactivatedException& e)
202 ARMARX_INFO <<
"ObjectAdapterDeactivatedException for " << objectIt->second->id.name;
205 impl->objectRegistry.erase(objectIt);
212 throw armarx::UserException(
message);
217 std::string proxyString = name;
219 if (!endpoints.empty())
221 proxyString += std::string(
":") + endpoints;
224 std::string proxyTypedId =
228 return (impl->checkedProxies.erase(proxyTypedId) > 0);
239 for (
auto& proxyEntry : impl->checkedProxies)
241 if (proxyEntry.second == proxy)
243 impl->checkedProxies.erase(proxyEntry.first);
253 std::unique_lock lock(impl->topicManagerMutex);
255 if (!impl->topicManagerProxy)
260 return impl->topicManagerProxy;
265 Ice::ObjectPrx obj = communicator->stringToProxy(
"IceStorm/TopicManager");
267 return IceStorm::TopicManagerPrx::checkedCast(obj);
272 const std::string& topicName,
273 bool orderedPublishing)
280 if (orderedPublishing)
282 qos[
"reliability"] =
"ordered";
287 topic->subscribeAndGetPublisher(qos, orderedPublishing ? subscriberProxy : subscriberProxy->ice_oneway());
289 catch (IceStorm::AlreadySubscribed& e)
294 topic->subscribeAndGetPublisher(qos, orderedPublishing ? subscriberProxy : subscriberProxy->ice_oneway());
296 catch (IceStorm::AlreadySubscribed& e)
300 <<
" already subscribed"
305 ARMARX_INFO <<
"Subscribed to topic " << topicName;
306 std::unique_lock lock(impl->topicSubscriptionMutex);
308 impl->subscriptions.push_back(std::make_pair(topicName, subscriberProxy));
314 auto prx =
registerObject(subscriber, topicName +
"Listener" + IceUtil::generateUUID());
319 const std::string& topicName)
323 topic->unsubscribe(subscriberProxy);
324 ARMARX_INFO <<
"Unsubscribed from topic " << topicName;
325 std::unique_lock lock(impl->topicSubscriptionMutex);
327 std::vector<std::pair<std::string, Ice::ObjectPrx> >::iterator toDelete = impl->subscriptions.end();
328 std::vector<std::pair<std::string, Ice::ObjectPrx> >::iterator it;
330 for (it = impl->subscriptions.begin(); it != impl->subscriptions.end(); ++it)
332 if (it->first == topicName && it->second == subscriberProxy)
339 if (toDelete != impl->subscriptions.end())
341 impl->subscriptions.erase(toDelete);
349 std::unique_lock lock(impl->topicRetrievalMutex);
359 catch (
const IceStorm::NoSuchTopic&)
367 catch (
const IceStorm::TopicExists&)
// Returns the cached proxy stored under proxyTypedId.
// The lookup is performed while holding impl->proxyCacheMutex.
// Uses checkedProxies.at(): a key that is not in the cache is not inserted; how a miss is
// reported depends on the container (std::out_of_range if checkedProxies is a standard
// associative container - TODO confirm against the Impl declaration).
377 Ice::ObjectPrx IceManager::implGetCheckedProxy(std::string
const& proxyTypedId)
379 std::unique_lock lock(impl->proxyCacheMutex);
380 return impl->checkedProxies.at(proxyTypedId);
// Stores proxy in the proxy cache under the key proxyTypedId.
// The assignment is performed while holding impl->proxyCacheMutex; an entry that already
// exists under the same key is replaced by the assignment.
383 void IceManager::implSetCheckedProxy(std::string
const& proxyTypedId,
const Ice::ObjectPrx& proxy)
385 std::unique_lock lock(impl->proxyCacheMutex);
386 impl->checkedProxies[proxyTypedId] = proxy;
411 if (impl->iceGridAdmin)
413 impl->iceGridAdmin->stop();
421 return impl->topicSuffix;
424 Ice::ObjectPrx IceManager::communicator_stringToProxy(
const std::string& proxyString)
// Fetches the publisher proxy of a topic and re-types it for sending.
// topicName: name of the topic (how `topic` is obtained from it is not visible in this excerpt).
// useUDP:    presumably selects between the datagram and the oneway variant below - the
//            branching lines are not visible here, TODO confirm.
429 Ice::ObjectPrx IceManager::__getTopic(
const std::string& topicName,
bool useUDP)
// Start from the topic's publisher proxy.
433 auto prx = topic->getPublisher();
// Datagram variant of the publisher proxy.
436 prx = prx->ice_datagram();
// Oneway variant of the publisher proxy.
440 prx = prx->ice_oneway();
// Releases what this manager registered: topic subscriptions, registered objects and
// IceGrid admin observers. Lines between the visible statements (scopes, error handling)
// are not part of this excerpt.
446 void IceManager::cleanUp()
454 std::vector<std::pair<std::string, Ice::ObjectPrx> >::iterator it;
// Step 1: unsubscribe every recorded (topic name, subscriber proxy) pair, then forget them.
// Done while holding impl->topicSubscriptionMutex.
458 std::unique_lock lock(impl->topicSubscriptionMutex);
460 for (it = impl->subscriptions.begin(); it != impl->subscriptions.end(); ++it)
462 retrieveTopic(it->first)->unsubscribe(it->second);
465 impl->subscriptions.clear();
// Step 2: for every entry of the object registry, deactivate its adapter and remove the
// object's identity via `admin`. Done while holding impl->objectRegistryMutex.
469 std::unique_lock lock(impl->objectRegistryMutex);
471 auto objListIt = impl->objectRegistry.begin();
473 for (; objListIt != impl->objectRegistry.end(); ++objListIt)
477 objListIt->second->adapter->deactivate();
479 admin->removeObject(objListIt->second->id);
// Step 3: only if an IceGridAdmin instance was ever created, detach its observers.
487 if (impl->iceGridAdmin)
489 impl->iceGridAdmin->removeObservers();
496 const std::string& registrantName,
497 const std::string& dependencyObjectName)
499 std::unique_lock lock(impl->objectRegistryMutex);
501 ObjectEntryPtr objectEntry = getOrCreateObjectEntry(registrantName);
503 objectEntry->dependencies.push_back
505 new DependencyObjectEntry
507 dependencyObjectName,
// Walks the object registry and, for every entry that is not yet active but already has a
// proxy, checks its still-unresolved dependencies by pinging them.
// NOTE(review): no lock on impl->objectRegistryMutex is visible in this function although
// other accessors of impl->objectRegistry take it - confirm that callers hold it.
515 void IceManager::resolveObjectDependencies()
517 auto objectIt = impl->objectRegistry.begin();
519 for (; objectIt != impl->objectRegistry.end(); ++objectIt)
// Collects one "\t<name>\n" line per dependency that is still missing (used for logging below).
521 std::string missingObjects;
522 ObjectEntryPtr objectEntry = objectIt->second;
524 if (!objectEntry->active && objectEntry->proxy)
// Start optimistic; reset to false further down when a dependency is found missing.
526 objectEntry->dependenciesResolved =
true;
527 DependencyList::iterator depIt = objectEntry->dependencies.begin();
529 for (; depIt != objectEntry->dependencies.end(); ++depIt)
531 DependencyObjectEntryPtr dependencyEntry = *depIt;
// Dependencies already marked resolved are not pinged again.
533 if (!dependencyEntry->resolved)
// Ping through a proxy with timeout value 2000 (milliseconds per Ice's ice_timeout - TODO confirm).
537 dependencyEntry->proxy->ice_timeout(2000)->ice_ping();
542 << dependencyEntry->name
// Presumably reached only when the ping succeeded (the surrounding try/catch is not visible).
545 dependencyEntry->resolved =
true;
547 objectEntry->updated =
true;
// Presumably the failure path of the ping: the entry stays unresolved and the dependency
// name is recorded as missing (TODO confirm against the hidden lines).
551 objectEntry->dependenciesResolved =
false;
553 missingObjects +=
"\t" + dependencyEntry->name +
"\n";
// Sleep for 10 ms.
556 IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(10));
// `updated` gates the status output below and is cleared again afterwards.
561 if (objectEntry->updated)
563 if (missingObjects.length() > 0)
567 <<
" still waiting for:\n"
572 objectEntry->updated =
false;
574 if (objectEntry->dependenciesResolved)
578 <<
" dependencies resolved"
587 const std::string& registrantName,
588 const std::string& topicName)
590 std::unique_lock lock(impl->objectRegistryMutex);
592 ObjectEntryPtr objectEntry = getOrCreateObjectEntry(registrantName);
594 objectEntry->usedTopics.push_back(topicName);
599 const std::string& registrantName,
600 const std::string& topicName)
602 std::unique_lock lock(impl->objectRegistryMutex);
604 ObjectEntryPtr objectEntry = getOrCreateObjectEntry(registrantName);
606 objectEntry->offeredTopics.push_back(topicName);
// Iterates over all entries of `topics`. The loop body is not visible in this excerpt;
// presumably it subscribes `subscriber` to each topic, passing orderedPublishing along -
// TODO confirm.
610 void IceManager::subscribeTopics(Ice::ObjectPrx subscriber,
const TopicList& topics,
bool orderedPublishing)
612 TopicList::const_iterator it = topics.begin();
614 for (; it != topics.end(); ++it)
// Iterates over all entries of `topics`. The loop body is not visible in this excerpt;
// presumably it retrieves each named topic - TODO confirm.
622 void IceManager::retrieveTopics(
const TopicList& topics)
624 TopicList::const_iterator it = topics.begin();
626 for (; it != topics.end(); ++it)
635 this->impl->name = name;
641 return impl->communicator;
647 std::unique_lock lock(impl->iceGridAdminMutex);
649 if (!impl->iceGridAdmin)
651 impl->iceGridAdmin = IceGridAdmin::Create(
getCommunicator(), impl->name);
654 return impl->iceGridAdmin;
// Returns the registry entry for objectName, creating it when needed.
// No lock is taken in the visible lines of this function; the callers visible in this file
// acquire impl->objectRegistryMutex before calling it.
665 IceManager::getOrCreateObjectEntry(
const std::string& objectName)
667 auto objIt = impl->objectRegistry.find(objectName);
// A new entry is created when the name is unknown, or when the stored entry has an empty name.
669 if (objIt == impl->objectRegistry.end() || objIt->second->name.empty())
671 ObjectEntryPtr objectEntry =
new ObjectEntry();
672 objectEntry->name = objectName;
// Stores the fresh entry under objectName, replacing any previous entry with that key.
674 impl->objectRegistry[objectName] = objectEntry;
// Existing, named entry: hand it back unchanged.
679 return objIt->second;
688 Ice::ObjectPrx prx = getProxy<Ice::ObjectPrx>(objectName);
689 prx->ice_timeout(500)->ice_ping();