32 #include <Ice/BuiltinSequences.h>
33 #include <Ice/ObjectAdapter.h>
34 #include <IceGrid/Admin.h>
35 #include <IceGrid/Exception.h>
36 #include <IceStorm/IceStorm.h>
37 #include <IceUtil/Time.h>
39 #include <SimoxUtility/algorithm/string/string_tools.h>
53 #include "ArmarXCore/interface/core/ManagedIceObjectDefinitions.h"
54 #include "ArmarXCore/interface/core/ManagedIceObjectDependencyBase.h"
55 #include "ArmarXCore/interface/core/ThreadingIceBase.h"
// Timeout in milliseconds (5000 ms) that the state-waiting loops in this file
// pass to stateCondition.wait_for() via std::chrono::milliseconds(...).
60 #define WAITMESSAGEINTERVAL (long)5000
// Constructor member-initializer list and body (the signature line is not part
// of this excerpt). Stores the manager handles, the managed object and the
// object adapter, and clears the terminate / initialized flags.
70 : armarXManager(armarXManager),
71 iceManager(iceManager),
72 managedObject(object),
73 terminateRequested(false),
74 objectedInitialized(false),
76 objectAdapterToAddTo(objectAdapterToAddTo)
// Give the managed object's impl back-references to this scheduler and to the manager.
80 object->impl->objectScheduler =
this;
81 object->impl->armarXManager = armarXManager;
// Create a default interrupt condition and flag (make_shared<bool>() yields false);
// setInteruptConditionVariable() can replace both later.
83 interruptCondition = std::make_shared<std::condition_variable>();
84 interruptConditionVariable = std::make_shared<bool>();
// NOTE(review): the body of this branch is not visible here; presumably it starts scheduling immediately. Confirm.
85 if (startSchedulingObject)
// Fragment of a member function whose signature is not visible in this excerpt
// (NOTE(review): presumably the destructor; confirm).
// Calls wakeupDependencyCheck() and then stops the scheduling task if one exists.
97 wakeupDependencyCheck();
99 if (scheduleObjectTask)
// NOTE(review): the `true` argument presumably makes stop() wait for the task; confirm against the task API.
101 scheduleObjectTask->stop(
true);
// Fragment (enclosing signature not visible): starts the scheduling task
// unless it is already running.
// NOTE(review): the body of this first branch is not visible; presumably it creates the task. Confirm.
109 if (!scheduleObjectTask)
114 if (!scheduleObjectTask->isRunning())
116 scheduleObjectTask->start();
// NOTE(review): presumably the else-branch of the isRunning() check (the `else` line is not visible).
120 ARMARX_INFO << managedObject->getName() <<
" already scheduled";
// Fragment (enclosing signature not visible): requests termination and wakes
// the threads that may be waiting on this scheduler.
129 terminateRequested =
true;
// Wake threads blocked on the object's state condition; the waiting loops in
// this file re-check terminateRequested after each wake-up.
133 managedObject->impl->stateCondition.notify_all();
// Set the interrupt flag while holding interruptMutex (the mutex
// waitForInterrupt() waits with), then notify all interrupt waiters.
138 std::scoped_lock lock(interruptMutex);
139 *interruptConditionVariable =
true;
141 interruptCondition->notify_all();
// Finally ask the scheduling task, if any, to stop.
// NOTE(review): `false` presumably means "do not wait for the task to finish"; confirm.
143 if (scheduleObjectTask)
145 scheduleObjectTask->stop(
false);
// Fragment (enclosing signature not visible): blocks until the managed object's
// state is eManagedIceObjectExited.
// First loop: repeats while a scheduling task exists and waitForFinished(5000)
// returns false, logging a message on each pass.
151 while (scheduleObjectTask && !scheduleObjectTask->waitForFinished(5000))
153 ARMARX_INFO << managedObject->getName() <<
" is blocking the removal - continuing to wait.";
155 std::unique_lock lock(managedObject->impl->objectStateMutex);
// Second loop: re-check the object state at least every 100 ms until it is "exited".
157 while (managedObject->impl->objectState != eManagedIceObjectExited)
159 managedObject->impl->stateCondition.wait_for(lock, std::chrono::milliseconds(100));
// Blocks the calling thread on the interrupt condition until the interrupt
// flag has been set to true by another thread.
164 void ArmarXObjectScheduler::waitForInterrupt()
166 std::unique_lock lock(interruptMutex);
// Guard for "termination requested" or "task exists but is not running".
// NOTE(review): the branch body is not visible; presumably it returns without waiting. Confirm.
168 if (terminateRequested || (scheduleObjectTask && !scheduleObjectTask->isRunning()))
// Reset the flag, then wait until it becomes true; looping on the flag also
// handles spurious wake-ups of the condition variable.
173 *interruptConditionVariable =
false;
175 while (!*interruptConditionVariable)
177 interruptCondition->wait(lock);
// Fragment (enclosing signature not visible): loops until termination is
// requested, waiting on the object's state condition while objectState differs
// from stateToWaitFor. How the loop exits once the state matches is not
// visible in this excerpt.
191 while (!terminateRequested)
193 std::unique_lock lock(managedObject->impl->objectStateMutex);
195 if (managedObject->impl->objectState != stateToWaitFor)
// Wait at most WAITMESSAGEINTERVAL ms per pass so terminateRequested is re-checked.
197 managedObject->impl->stateCondition.wait_for(lock, std::chrono::milliseconds(
WAITMESSAGEINTERVAL));
// Fragment (enclosing signature not visible): timeout-bounded wait for
// objectState to equal stateToWaitFor. Loops while time is left and no
// termination was requested.
211 long waitTimeLeft = timeoutMs;
213 while (waitTimeLeft > 0 && !terminateRequested)
// Recompute the elapsed time and the remaining budget on every pass
// (startTime is set on a line not visible here).
215 waitTime = IceUtil::Time::now() - startTime;
216 waitTimeLeft = timeoutMs - waitTime.toMilliSeconds();
// NOTE(review): branch body not visible; presumably a progress message once more than 2000 ms have elapsed. Confirm.
218 if (waitTime.toMilliSeconds() > 2000)
223 std::unique_lock lock(managedObject->impl->objectStateMutex);
// NOTE(review): the wait performed inside this branch is not visible in this excerpt.
225 if (managedObject->impl->objectState != stateToWaitFor)
// Fragment (enclosing signature not visible): loops until termination is
// requested, waiting while objectState is below minimumStateToWaitFor and
// logging what the object is still waiting for.
249 while (!terminateRequested)
251 std::unique_lock lock(managedObject->impl->objectStateMutex);
253 if (managedObject->impl->objectState < minimumStateToWaitFor)
// `timeout` is true when wait_for() returned because WAITMESSAGEINTERVAL ms
// elapsed rather than because of a notification.
255 bool timeout = managedObject->impl->stateCondition.wait_for(lock, std::chrono::milliseconds(
WAITMESSAGEINTERVAL)) == std::cv_status::timeout;
// NOTE(review): the condition guarding this message and the rest of the
// stream expression are not visible here; presumably it is logged on timeout.
258 ARMARX_IMPORTANT <<
"Waiting for '" << managedObject->getName() <<
"' to reach minimum state "
262 if (managedObject->getUnresolvedDependencies().empty())
// Append the still-unresolved dependency names, comma-separated.
268 out << simox::alg::join(managedObject->getUnresolvedDependencies(),
", ");
// Fragment (enclosing signature not visible): timeout-bounded wait for
// objectState to reach at least minimumStateToWaitFor. Loops while time is
// left and no termination was requested.
284 long waitTimeLeft = timeoutMs;
286 while (waitTimeLeft > 0 && !terminateRequested)
// Recompute the elapsed time and the remaining budget on every pass
// (startTime is set on a line not visible here).
288 waitTime = IceUtil::Time::now() - startTime;
289 waitTimeLeft = timeoutMs - waitTime.toMilliSeconds();
// NOTE(review): branch body not visible; presumably a progress message once more than 2000 ms have elapsed. Confirm.
291 if (waitTime.toMilliSeconds() > 2000)
296 std::unique_lock lock(managedObject->impl->objectStateMutex);
// NOTE(review): the wait performed inside this branch is not visible in this excerpt.
298 if (managedObject->impl->objectState < minimumStateToWaitFor)
// Fragment (enclosing signature not visible): yields true when a scheduling
// task exists and reports isFinished(); the right-hand side of the `||` is on
// a line that is not visible in this excerpt.
314 return (scheduleObjectTask && scheduleObjectTask->isFinished()) ||
// Fragment (enclosing signature not visible): returns the terminateRequested flag.
320 return terminateRequested;
// Fragment (enclosing signature not visible): returns the managed object held by this scheduler.
325 return managedObject;
// Fragment (enclosing signature not visible): returns the managed object's
// state cast to ManagedIceObjectState, or eManagedIceObjectExited when no
// managed object is set.
329 return managedObject ? (ManagedIceObjectState)managedObject->getState() : eManagedIceObjectExited;
// Fragment (enclosing signature not visible): loops until the dependencies are
// resolved or termination is requested; throws LocalException when a timeout
// is configured and exceeded. The statement that updates dependenciesResolved
// inside the loop is not visible in this excerpt.
341 bool dependenciesResolved =
false;
343 while (!dependenciesResolved && !terminateRequested)
// timeoutMs == -1 disables the timeout check.
349 if (timeoutMs != -1 && (IceUtil::Time::now() - startTime).toMilliSeconds() >= timeoutMs)
351 throw LocalException(
"Could not resolve dependencies in ") << timeoutMs <<
" ms";
354 if (!dependenciesResolved)
// Sleep until another thread sets dependencyWaitConditionVariable and
// notifies dependencyWaitCondition, or until one 1000 ms wait times out;
// then go around the outer loop again.
356 std::unique_lock lock(dependencyWaitMutex);
357 dependencyWaitConditionVariable =
false;
358 bool timeout =
false;
360 while (! dependencyWaitConditionVariable && !timeout)
362 timeout = (dependencyWaitCondition.wait_for(lock, std::chrono::milliseconds(1000)) == std::cv_status::timeout);
// Log success only if the loop did not end because of a termination request
// (the beginning of the log statement is not visible here).
369 if (!terminateRequested)
372 << managedObject->getName()
373 <<
" dependencies resolved";
// Fragment (enclosing signature not visible): walks the managed object's
// dependencies, collects the names of unresolved ones, logs the outcome and
// returns whether every dependency is resolved.
379 bool dependenciesResolved =
true;
380 bool stateChanged =
false;
381 std::string unresolvedNames;
// Iterate over a local DependencyMap obtained from the object's connectivity.
384 DependencyMap dependencies = managedObject->getConnectivity().dependencies;
385 DependencyMap::iterator iter = dependencies.begin();
387 while (iter != dependencies.end())
// Any unresolved dependency makes the overall result false; its name is
// appended as a tab-indented line for the log message below.
395 if (!dependency->getResolved())
397 dependenciesResolved =
false;
398 unresolvedNames +=
"\t" + dependency->getName() +
"\n";
// NOTE(review): branch body not visible; presumably it sets stateChanged. Confirm.
402 if (dependency->getStateChanged())
// Report either the outstanding dependencies or that all are resolved.
// NOTE(review): any outer condition guarding these messages is not visible here.
413 if (unresolvedNames.length() > 0)
415 ARMARX_INFO <<
"ManagedIceObject '" << managedObject->getName() <<
"' still waiting for: \n " << unresolvedNames;
419 ARMARX_INFO <<
"All dependencies of '" << managedObject->getName() <<
"' resolved!";
422 return dependenciesResolved;
// Replaces the scheduler's interrupt condition variable and its associated
// flag with the shared instances supplied by the caller.
// NOTE(review): no lock is taken in the visible lines; confirm that callers
// only invoke this while no thread is inside waitForInterrupt().
425 void ArmarXObjectScheduler::setInteruptConditionVariable(std::shared_ptr<std::condition_variable> interruptCondition, std::shared_ptr<bool> interruptConditionVariable)
427 this->interruptCondition = interruptCondition;
428 this->interruptConditionVariable = interruptConditionVariable;
// Fragment (enclosing signature not visible): sets
// dependencyWaitConditionVariable to true while holding dependencyWaitMutex,
// then notifies every thread waiting on dependencyWaitCondition.
435 std::scoped_lock lock(dependencyWaitMutex);
436 dependencyWaitConditionVariable =
true;
438 dependencyWaitCondition.notify_all();
// Fragment (enclosing signature not visible): returns true only if no
// dependency of the managed object is currently unresolved.
443 bool dependencyLost =
false;
445 DependencyMap dependencies = managedObject->getConnectivity().dependencies;
446 DependencyMap::iterator iter = dependencies.begin();
448 while (iter != dependencies.end())
// For every unresolved dependency, record the loss and remove its proxy
// (keyed by name and type) from the IceManager's proxy cache.
456 if (!dependency->getResolved())
458 dependencyLost =
true;
459 iceManager->removeProxyFromCache(dependency->getName(), dependency->getType());
465 return !dependencyLost;
// Fragment (enclosing signature not visible): scans the managed object's
// dependencies for one whose name equals objectName. What happens on a match
// (and the return statements) is not visible in this excerpt.
474 DependencyMap dependencies = managedObject->getConnectivity().dependencies;
475 DependencyMap::iterator iter = dependencies.begin();
477 while (iter != dependencies.end())
481 if (dependency->getName() == objectName)
// Fragment (enclosing signature not visible): stores `reconnect` in
// tryReconnect, then sets the interrupt flag under interruptMutex and notifies
// all threads waiting on the interrupt condition.
494 tryReconnect = reconnect;
496 std::scoped_lock lock(interruptMutex);
497 *interruptConditionVariable =
true;
500 interruptCondition->notify_all();
// Scheduling routine for the managed object. Only the control-flow skeleton is
// visible in this excerpt; the bodies of the branches and of the loop are not.
506 void ArmarXObjectScheduler::scheduleObject()
// NOTE(review): branch body not visible; presumably it initializes the object
// once (initObject() is what sets objectedInitialized). Confirm.
509 if (!objectedInitialized)
// Main loop: runs while no termination is requested and the scheduling task
// exists and is not stopped.
514 while (!terminateRequested && (scheduleObjectTask && !scheduleObjectTask->isStopped()))
521 if (!terminateRequested)
// Wait on the scheduling task's stop notification.
// NOTE(review): the surrounding context of this call is not visible here.
539 scheduleObjectTask->waitForStop();
// NOTE(review): branch body not visible; presumably it runs the exit handling
// when termination was requested. Confirm.
542 if (terminateRequested)
// Initializes the managed object with the scheduler's IceManager.
551 void ArmarXObjectScheduler::initObject()
// The flag is set before init() is called, so it stays true even if init() fails.
555 objectedInitialized =
true;
556 managedObject->init(iceManager);
// NOTE(review): presumably inside an exception handler whose lines are not
// visible; it marks the object as failed to initialize.
560 managedObject->setObjectState(eManagedIceObjectInitializationFailed);
// Starts the managed object: obtains its object handles, starts it, handles
// its offered and used topics, and registers its proxy with the IceGrid admin
// interface. Several lines of the original body are not visible in this excerpt.
566 void ArmarXObjectScheduler::startObject()
// Arguments of a call whose first line is not visible.
// NOTE(review): presumably the registration call that yields objectHandles. Confirm.
570 managedObject, managedObject->getName(),
571 objectAdapterToAddTo);
// Start the object with its proxy (objectHandles.first); use the explicitly
// configured adapter if one was given, otherwise objectHandles.second.
576 managedObject->start(objectHandles.first, objectAdapterToAddTo ? objectAdapterToAddTo : objectHandles.second);
// NOTE(review): presumably inside an exception handler whose lines are not
// visible; it marks the object as failed to start.
580 managedObject->setObjectState(eManagedIceObjectStartingFailed);
// Walk the offered topics (the loop body is not visible in this excerpt).
586 Ice::StringSeq offeredTopics = managedObject->getConnectivity().offeredTopics;
587 Ice::StringSeq::iterator iterOT = offeredTopics.begin();
589 while (iterOT != offeredTopics.end())
// Subscribe the object's proxy to every used topic, passing the per-topic
// orderedTopicPublishing entry stored in the object's impl.
596 Ice::StringSeq usedTopics = managedObject->getConnectivity().usedTopics;
597 Ice::StringSeq::iterator iterUT = usedTopics.begin();
599 while (iterUT != usedTopics.end())
601 iceManager->subscribeTopic(objectHandles.first, *iterUT, managedObject->impl->orderedTopicPublishing[*iterUT]);
// NOTE(review): how `dependencies` is used afterwards is not visible here.
606 DependencyMap dependencies = managedObject->getConnectivity().dependencies;
// Register the proxy with IceGrid; if an object with this identity already
// exists, update the existing registration instead.
614 IceGrid::AdminPrx admin = iceManager->getIceGridSession()->getAdmin();
618 admin->addObject(objectHandles.first);
620 catch (
const IceGrid::ObjectExistsException& e)
622 admin->updateObject(objectHandles.first);
// A DeploymentException is reported through a log message (the beginning and
// end of that statement are not visible here).
624 catch (
const IceGrid::DeploymentException& e)
627 << managedObject->getName()
628 <<
" raised a DeploymentException("
634 ARMARX_VERBOSE <<
"Object '" << managedObject->getName() <<
"' started";
// Disconnects the managed object: calls its disconnect(), removes it from the
// IceManager and unsubscribes it from its used topics. Several lines of the
// original body (try blocks, handler bodies) are not visible in this excerpt.
637 void ArmarXObjectScheduler::disconnectObject()
639 ARMARX_INFO <<
"disconnecting object " << managedObject->getName();
643 managedObject->disconnect();
// Only remove the object when both the IceManager and the object are set.
653 if (iceManager && managedObject)
655 iceManager->removeObject(managedObject->getName());
// NOTE(review): the handler body is not visible; presumably an object that was
// never registered is tolerated here. Confirm.
664 catch (IceGrid::ObjectNotRegisteredException& notRegisteredException)
// Unsubscribe the object's proxy from every topic it uses.
675 Ice::StringSeq usedTopics = managedObject->getConnectivity().usedTopics;
676 Ice::StringSeq::iterator iterUT = usedTopics.begin();
680 while (iterUT != usedTopics.end())
682 iceManager->unsubscribeTopic(managedObject->getProxy(), *iterUT);
// Calls exit() on the managed object and then clears this scheduler's
// references to the IceManager and the ArmarXManager.
693 void ArmarXObjectScheduler::exitObject()
697 managedObject->exit();
// After this point iceManager and armarXManager are null; code that runs
// later must not dereference them unchecked.
700 iceManager =
nullptr;
701 armarXManager =
nullptr;