35 #include <Ice/BuiltinSequences.h>
36 #include <Ice/ObjectAdapter.h>
37 #include <IceGrid/Admin.h>
38 #include <IceGrid/Exception.h>
39 #include <IceStorm/IceStorm.h>
40 #include <IceUtil/Time.h>
42 #include <SimoxUtility/algorithm/string/string_tools.h>
48 #include "ArmarXCore/interface/core/ManagedIceObjectDefinitions.h"
49 #include "ArmarXCore/interface/core/ManagedIceObjectDependencyBase.h"
50 #include "ArmarXCore/interface/core/ThreadingIceBase.h"
61 #define WAITMESSAGEINTERVAL (long)5000
74 bool startSchedulingObject) :
75 armarXManager(armarXManager),
76 iceManager(iceManager),
77 managedObject(object),
78 terminateRequested(false),
79 objectedInitialized(false),
81 objectAdapterToAddTo(objectAdapterToAddTo)
85 object->impl->objectScheduler =
this;
86 object->impl->armarXManager = armarXManager;
88 interruptCondition = std::make_shared<std::condition_variable>();
89 interruptConditionVariable = std::make_shared<bool>();
90 if (startSchedulingObject)
99 wakeupDependencyCheck();
101 if (scheduleObjectTask)
103 scheduleObjectTask->stop(
true);
112 if (!scheduleObjectTask)
116 &ArmarXObjectScheduler::scheduleObject,
117 managedObject->getName() +
"ArmarXObjectScheduler");
120 if (!scheduleObjectTask->isRunning())
122 scheduleObjectTask->start();
126 ARMARX_INFO << managedObject->getName() <<
" already scheduled";
136 terminateRequested =
true;
140 managedObject->impl->stateCondition.notify_all();
145 std::scoped_lock lock(interruptMutex);
146 *interruptConditionVariable =
true;
148 interruptCondition->notify_all();
150 if (scheduleObjectTask)
152 scheduleObjectTask->stop(
false);
159 while (scheduleObjectTask && !scheduleObjectTask->waitForFinished(5000))
162 <<
" is blocking the removal - continuing to wait.";
164 std::unique_lock lock(managedObject->impl->objectStateMutex);
166 while (managedObject->impl->objectState != eManagedIceObjectExited)
168 managedObject->impl->stateCondition.wait_for(lock, std::chrono::milliseconds(100));
170 <<
" is blocking the removal - continuing to wait.";
175 ArmarXObjectScheduler::waitForInterrupt()
177 std::unique_lock lock(interruptMutex);
179 if (terminateRequested || (scheduleObjectTask && !scheduleObjectTask->isRunning()))
184 *interruptConditionVariable =
false;
186 while (!*interruptConditionVariable)
188 interruptCondition->wait(lock);
194 const long timeoutMs)
const
203 while (!terminateRequested)
205 std::unique_lock lock(managedObject->impl->objectStateMutex);
207 if (managedObject->impl->objectState != stateToWaitFor)
209 managedObject->impl->stateCondition.wait_for(
218 << managedObject->getName()
219 <<
"' to reach state "
228 long waitTimeLeft = timeoutMs;
230 while (waitTimeLeft > 0 && !terminateRequested)
232 waitTime = IceUtil::Time::now() - startTime;
233 waitTimeLeft = timeoutMs - waitTime.toMilliSeconds();
235 if (waitTime.toMilliSeconds() > 2000)
238 << managedObject->getName() <<
"' to reach state "
242 std::unique_lock lock(managedObject->impl->objectStateMutex);
244 if (managedObject->impl->objectState != stateToWaitFor)
246 managedObject->impl->stateCondition.wait_for(
262 const long timeoutMs)
const
272 while (!terminateRequested)
274 std::unique_lock lock(managedObject->impl->objectStateMutex);
276 if (managedObject->impl->objectState < minimumStateToWaitFor)
278 bool timeout = managedObject->impl->stateCondition.wait_for(
280 std::cv_status::timeout;
284 <<
"Waiting for '" << managedObject->getName()
285 <<
"' to reach minimum state "
289 if (managedObject->getUnresolvedDependencies().empty())
295 out << simox::alg::join(managedObject->getUnresolvedDependencies(),
312 long waitTimeLeft = timeoutMs;
314 while (waitTimeLeft > 0 && !terminateRequested)
316 waitTime = IceUtil::Time::now() - startTime;
317 waitTimeLeft = timeoutMs - waitTime.toMilliSeconds();
319 if (waitTime.toMilliSeconds() > 2000)
322 <<
"Waiting for " << waitTimeLeft <<
"ms for '" << managedObject->getName()
323 <<
"' to reach minimum state "
327 std::unique_lock lock(managedObject->impl->objectStateMutex);
329 if (managedObject->impl->objectState < minimumStateToWaitFor)
331 managedObject->impl->stateCondition.wait_for(
348 return (scheduleObjectTask && scheduleObjectTask->isFinished()) ||
355 return terminateRequested;
361 return managedObject;
364 ManagedIceObjectState
367 return managedObject ? (ManagedIceObjectState)managedObject->getState()
368 : eManagedIceObjectExited;
379 bool dependenciesResolved =
false;
381 while (!dependenciesResolved && !terminateRequested)
386 if (timeoutMs != -1 && (IceUtil::Time::now() - startTime).toMilliSeconds() >= timeoutMs)
388 throw LocalException(
"Could not resolve dependencies in ") << timeoutMs <<
" ms";
391 if (!dependenciesResolved)
393 std::unique_lock lock(dependencyWaitMutex);
394 dependencyWaitConditionVariable =
false;
395 bool timeout =
false;
397 while (!dependencyWaitConditionVariable && !timeout)
400 (dependencyWaitCondition.wait_for(lock, std::chrono::milliseconds(1000)) ==
401 std::cv_status::timeout);
406 if (!terminateRequested)
408 ARMARX_VERBOSE <<
"All " << managedObject->getName() <<
" dependencies resolved";
415 bool dependenciesResolved =
true;
416 bool stateChanged =
false;
417 std::string unresolvedNames;
420 DependencyMap dependencies = managedObject->getConnectivity().dependencies;
421 DependencyMap::iterator iter = dependencies.begin();
423 while (iter != dependencies.end())
426 ManagedIceObjectDependencyPtr::dynamicCast(iter->second);
432 if (!dependency->getResolved())
434 dependenciesResolved =
false;
435 unresolvedNames +=
"\t" + dependency->getName() +
"\n";
439 if (dependency->getStateChanged())
450 if (unresolvedNames.length() > 0)
452 ARMARX_INFO <<
"ManagedIceObject '" << managedObject->getName()
453 <<
"' still waiting for: \n " << unresolvedNames;
457 ARMARX_INFO <<
"All dependencies of '" << managedObject->getName() <<
"' resolved!";
460 return dependenciesResolved;
464 ArmarXObjectScheduler::setInteruptConditionVariable(
465 std::shared_ptr<std::condition_variable> interruptCondition,
466 std::shared_ptr<bool> interruptConditionVariable)
468 this->interruptCondition = interruptCondition;
469 this->interruptConditionVariable = interruptConditionVariable;
477 std::scoped_lock lock(dependencyWaitMutex);
478 dependencyWaitConditionVariable =
true;
480 dependencyWaitCondition.notify_all();
486 bool dependencyLost =
false;
488 DependencyMap dependencies = managedObject->getConnectivity().dependencies;
489 DependencyMap::iterator iter = dependencies.begin();
491 while (iter != dependencies.end())
494 ManagedIceObjectDependencyPtr::dynamicCast(iter->second);
500 if (!dependency->getResolved())
502 dependencyLost =
true;
503 iceManager->removeProxyFromCache(dependency->getName(), dependency->getType());
509 return !dependencyLost;
515 DependencyMap dependencies = managedObject->getConnectivity().dependencies;
516 DependencyMap::iterator iter = dependencies.begin();
518 while (iter != dependencies.end())
521 ManagedIceObjectDependencyPtr::dynamicCast(iter->second);
523 if (dependency->getName() == objectName)
537 tryReconnect = reconnect;
539 std::scoped_lock lock(interruptMutex);
540 *interruptConditionVariable =
true;
543 interruptCondition->notify_all();
550 ArmarXObjectScheduler::scheduleObject()
553 if (!objectedInitialized)
558 while (!terminateRequested && (scheduleObjectTask && !scheduleObjectTask->isStopped()))
565 if (!terminateRequested)
583 scheduleObjectTask->waitForStop();
586 if (terminateRequested)
596 ArmarXObjectScheduler::initObject()
600 objectedInitialized =
true;
601 managedObject->init(iceManager);
605 managedObject->setObjectState(eManagedIceObjectInitializationFailed);
612 ArmarXObjectScheduler::startObject()
616 managedObject, managedObject->getName(), objectAdapterToAddTo);
621 managedObject->start(objectHandles.first,
622 objectAdapterToAddTo ? objectAdapterToAddTo
623 : objectHandles.second);
627 managedObject->setObjectState(eManagedIceObjectStartingFailed);
633 Ice::StringSeq offeredTopics = managedObject->getConnectivity().offeredTopics;
634 Ice::StringSeq::iterator iterOT = offeredTopics.begin();
636 while (iterOT != offeredTopics.end())
643 Ice::StringSeq usedTopics = managedObject->getConnectivity().usedTopics;
644 Ice::StringSeq::iterator iterUT = usedTopics.begin();
646 while (iterUT != usedTopics.end())
648 iceManager->subscribeTopic(
649 objectHandles.first, *iterUT, managedObject->impl->orderedTopicPublishing[*iterUT]);
654 DependencyMap dependencies = managedObject->getConnectivity().dependencies;
662 IceGrid::AdminPrx admin = iceManager->getIceGridSession()->getAdmin();
666 admin->addObject(objectHandles.first);
668 catch (
const IceGrid::ObjectExistsException& e)
670 admin->updateObject(objectHandles.first);
672 catch (
const IceGrid::DeploymentException& e)
674 ARMARX_ERROR <<
"*** IceGrid::Admin >> adding " << managedObject->getName()
675 <<
" raised a DeploymentException(" << e.reason <<
")" <<
flush;
678 ARMARX_VERBOSE <<
"Object '" << managedObject->getName() <<
"' started";
682 ArmarXObjectScheduler::disconnectObject()
684 ARMARX_INFO <<
"disconnecting object " << managedObject->getName();
688 managedObject->disconnect();
698 if (iceManager && managedObject)
700 iceManager->removeObject(managedObject->getName());
709 catch (IceGrid::ObjectNotRegisteredException& notRegisteredException)
719 Ice::StringSeq usedTopics = managedObject->getConnectivity().usedTopics;
720 Ice::StringSeq::iterator iterUT = usedTopics.begin();
724 while (iterUT != usedTopics.end())
726 iceManager->unsubscribeTopic(managedObject->getProxy(), *iterUT);
736 ArmarXObjectScheduler::exitObject()
740 managedObject->exit();
743 iceManager =
nullptr;
744 armarXManager =
nullptr;