35#include <Ice/BuiltinSequences.h>
36#include <Ice/ObjectAdapter.h>
37#include <IceGrid/Admin.h>
38#include <IceGrid/Exception.h>
39#include <IceStorm/IceStorm.h>
40#include <IceUtil/Time.h>
42#include <SimoxUtility/algorithm/string/string_tools.h>
48#include "ArmarXCore/interface/core/ManagedIceObjectDefinitions.h"
49#include "ArmarXCore/interface/core/ManagedIceObjectDependencyBase.h"
50#include "ArmarXCore/interface/core/ThreadingIceBase.h"
61#define WAITMESSAGEINTERVAL (long)5000
74 bool startSchedulingObject) :
75 armarXManager(armarXManager),
76 iceManager(iceManager),
77 managedObject(object),
78 terminateRequested(false),
79 objectedInitialized(false),
81 objectAdapterToAddTo(objectAdapterToAddTo)
85 object->impl->objectScheduler =
this;
86 object->impl->armarXManager = armarXManager;
88 interruptCondition = std::make_shared<std::condition_variable>();
89 interruptConditionVariable = std::make_shared<bool>();
90 if (startSchedulingObject)
101 if (scheduleObjectTask)
103 scheduleObjectTask->stop(
true);
112 if (!scheduleObjectTask)
116 &ArmarXObjectScheduler::scheduleObject,
117 managedObject->getName() +
"ArmarXObjectScheduler");
120 if (!scheduleObjectTask->isRunning())
122 scheduleObjectTask->start();
126 ARMARX_INFO << managedObject->getName() <<
" already scheduled";
136 terminateRequested =
true;
140 managedObject->impl->stateCondition.notify_all();
145 std::scoped_lock lock(interruptMutex);
146 *interruptConditionVariable =
true;
148 interruptCondition->notify_all();
150 if (scheduleObjectTask)
152 scheduleObjectTask->stop(
false);
159 while (scheduleObjectTask && !scheduleObjectTask->waitForFinished(5000))
162 <<
" is blocking the removal - continuing to wait.";
164 std::unique_lock lock(managedObject->impl->objectStateMutex);
166 while (managedObject->impl->objectState != eManagedIceObjectExited)
168 managedObject->impl->stateCondition.wait_for(lock, std::chrono::milliseconds(100));
170 <<
" is blocking the removal - continuing to wait.";
175 ArmarXObjectScheduler::waitForInterrupt()
177 std::unique_lock lock(interruptMutex);
179 if (terminateRequested || (scheduleObjectTask && !scheduleObjectTask->isRunning()))
184 *interruptConditionVariable =
false;
186 while (!*interruptConditionVariable)
188 interruptCondition->wait(lock);
194 const long timeoutMs)
const
203 while (!terminateRequested)
205 std::unique_lock lock(managedObject->impl->objectStateMutex);
207 if (managedObject->impl->objectState != stateToWaitFor)
209 managedObject->impl->stateCondition.wait_for(
218 << managedObject->getName()
219 <<
"' to reach state "
226 IceUtil::Time startTime = IceUtil::Time::now();
227 IceUtil::Time waitTime = startTime;
228 long waitTimeLeft = timeoutMs;
230 while (waitTimeLeft > 0 && !terminateRequested)
232 waitTime = IceUtil::Time::now() - startTime;
233 waitTimeLeft = timeoutMs - waitTime.toMilliSeconds();
235 if (waitTime.toMilliSeconds() > 2000)
238 << managedObject->getName() <<
"' to reach state "
242 std::unique_lock lock(managedObject->impl->objectStateMutex);
244 if (managedObject->impl->objectState != stateToWaitFor)
246 managedObject->impl->stateCondition.wait_for(
262 const long timeoutMs)
const
272 while (!terminateRequested)
274 std::unique_lock lock(managedObject->impl->objectStateMutex);
276 if (managedObject->impl->objectState < minimumStateToWaitFor)
278 bool timeout = managedObject->impl->stateCondition.wait_for(
280 std::cv_status::timeout;
284 <<
"Waiting for '" << managedObject->getName()
285 <<
"' to reach minimum state "
289 if (managedObject->getUnresolvedDependencies().empty())
295 out << simox::alg::join(managedObject->getUnresolvedDependencies(),
310 IceUtil::Time startTime = IceUtil::Time::now();
311 IceUtil::Time waitTime = startTime;
312 long waitTimeLeft = timeoutMs;
314 while (waitTimeLeft > 0 && !terminateRequested)
316 waitTime = IceUtil::Time::now() - startTime;
317 waitTimeLeft = timeoutMs - waitTime.toMilliSeconds();
319 if (waitTime.toMilliSeconds() > 2000)
322 <<
"Waiting for " << waitTimeLeft <<
"ms for '" << managedObject->getName()
323 <<
"' to reach minimum state "
327 std::unique_lock lock(managedObject->impl->objectStateMutex);
329 if (managedObject->impl->objectState < minimumStateToWaitFor)
331 managedObject->impl->stateCondition.wait_for(
348 return (scheduleObjectTask && scheduleObjectTask->isFinished()) ||
355 return terminateRequested;
361 return managedObject;
364 ManagedIceObjectState
367 return managedObject ? (ManagedIceObjectState)managedObject->getState()
368 : eManagedIceObjectExited;
377 IceUtil::Time startTime = IceUtil::Time::now();
379 bool dependenciesResolved =
false;
381 while (!dependenciesResolved && !terminateRequested)
386 if (timeoutMs != -1 && (IceUtil::Time::now() - startTime).toMilliSeconds() >= timeoutMs)
388 throw LocalException(
"Could not resolve dependencies in ") << timeoutMs <<
" ms";
391 if (!dependenciesResolved)
393 std::unique_lock lock(dependencyWaitMutex);
394 dependencyWaitConditionVariable =
false;
395 bool timeout =
false;
397 while (!dependencyWaitConditionVariable && !timeout)
400 (dependencyWaitCondition.wait_for(lock, std::chrono::milliseconds(1000)) ==
401 std::cv_status::timeout);
406 if (!terminateRequested)
408 ARMARX_VERBOSE <<
"All " << managedObject->getName() <<
" dependencies resolved";
415 bool dependenciesResolved =
true;
416 bool stateChanged =
false;
417 std::string unresolvedNames;
420 DependencyMap dependencies = managedObject->getConnectivity().dependencies;
421 DependencyMap::iterator iter = dependencies.begin();
423 while (iter != dependencies.end())
426 ManagedIceObjectDependencyPtr::dynamicCast(iter->second);
432 if (!dependency->getResolved())
434 dependenciesResolved =
false;
435 unresolvedNames +=
"\t" + dependency->getName() +
"\n";
439 if (dependency->getStateChanged())
450 if (unresolvedNames.length() > 0)
452 ARMARX_INFO <<
"ManagedIceObject '" << managedObject->getName()
453 <<
"' still waiting for: \n " << unresolvedNames;
457 ARMARX_INFO <<
"All dependencies of '" << managedObject->getName() <<
"' resolved!";
460 return dependenciesResolved;
464 ArmarXObjectScheduler::setInteruptConditionVariable(
465 std::shared_ptr<std::condition_variable> interruptCondition,
466 std::shared_ptr<bool> interruptConditionVariable)
468 this->interruptCondition = interruptCondition;
469 this->interruptConditionVariable = interruptConditionVariable;
477 std::scoped_lock lock(dependencyWaitMutex);
478 dependencyWaitConditionVariable =
true;
480 dependencyWaitCondition.notify_all();
486 bool dependencyLost =
false;
488 DependencyMap dependencies = managedObject->getConnectivity().dependencies;
489 DependencyMap::iterator iter = dependencies.begin();
491 while (iter != dependencies.end())
494 ManagedIceObjectDependencyPtr::dynamicCast(iter->second);
500 if (!dependency->getResolved())
502 dependencyLost =
true;
503 iceManager->removeProxyFromCache(dependency->getName(), dependency->getType());
509 return !dependencyLost;
515 DependencyMap dependencies = managedObject->getConnectivity().dependencies;
516 DependencyMap::iterator iter = dependencies.begin();
518 while (iter != dependencies.end())
521 ManagedIceObjectDependencyPtr::dynamicCast(iter->second);
523 if (dependency->getName() == objectName)
537 tryReconnect = reconnect;
539 std::scoped_lock lock(interruptMutex);
540 *interruptConditionVariable =
true;
543 interruptCondition->notify_all();
550 ArmarXObjectScheduler::scheduleObject()
553 if (!objectedInitialized)
558 while (!terminateRequested && (scheduleObjectTask && !scheduleObjectTask->isStopped()))
565 if (!terminateRequested)
583 scheduleObjectTask->waitForStop();
586 if (terminateRequested)
596 ArmarXObjectScheduler::initObject()
600 objectedInitialized =
true;
601 managedObject->init(iceManager);
605 managedObject->setObjectState(eManagedIceObjectInitializationFailed);
612 ArmarXObjectScheduler::startObject()
616 managedObject, managedObject->getName(), objectAdapterToAddTo);
621 managedObject->start(objectHandles.first,
622 objectAdapterToAddTo ? objectAdapterToAddTo
623 : objectHandles.second);
627 managedObject->setObjectState(eManagedIceObjectStartingFailed);
633 Ice::StringSeq offeredTopics = managedObject->getConnectivity().offeredTopics;
634 Ice::StringSeq::iterator iterOT = offeredTopics.begin();
636 while (iterOT != offeredTopics.end())
643 Ice::StringSeq usedTopics = managedObject->getConnectivity().usedTopics;
644 Ice::StringSeq::iterator iterUT = usedTopics.begin();
646 while (iterUT != usedTopics.end())
648 iceManager->subscribeTopic(
649 objectHandles.first, *iterUT, managedObject->impl->orderedTopicPublishing[*iterUT]);
654 DependencyMap dependencies = managedObject->getConnectivity().dependencies;
662 IceGrid::AdminPrx admin = iceManager->getIceGridSession()->getAdmin();
666 admin->addObject(objectHandles.first);
668 catch (
const IceGrid::ObjectExistsException& e)
670 admin->updateObject(objectHandles.first);
672 catch (
const IceGrid::DeploymentException& e)
674 ARMARX_ERROR <<
"*** IceGrid::Admin >> adding " << managedObject->getName()
675 <<
" raised a DeploymentException(" << e.reason <<
")" <<
flush;
678 ARMARX_VERBOSE <<
"Object '" << managedObject->getName() <<
"' started";
682 ArmarXObjectScheduler::disconnectObject()
684 ARMARX_INFO <<
"disconnecting object " << managedObject->getName();
688 managedObject->disconnect();
698 if (iceManager && managedObject)
700 iceManager->removeObject(managedObject->getName());
709 catch (IceGrid::ObjectNotRegisteredException& notRegisteredException)
719 Ice::StringSeq usedTopics = managedObject->getConnectivity().usedTopics;
720 Ice::StringSeq::iterator iterUT = usedTopics.begin();
724 while (iterUT != usedTopics.end())
726 iceManager->unsubscribeTopic(managedObject->getProxy(), *iterUT);
736 ArmarXObjectScheduler::exitObject()
740 managedObject->exit();
743 iceManager =
nullptr;
744 armarXManager =
nullptr;
#define WAITMESSAGEINTERVAL
#define ARMARX_STREAM_PRINTER
use this macro to write output code that is executed when printed and thus not executed if the debug ...
void disconnected(bool reconnect)
void wakeupDependencyCheck()
ManagedIceObjectState getObjectState() const
bool dependsOn(const std::string &objectName)
void waitForDependencies(int timeoutMs=-1)
waits until all depenencies are resolved.
void terminate()
Terminates the ManagedIceObject.
ArmarXObjectScheduler(const ArmarXManagerPtr &armarXManager, const IceManagerPtr &iceManager, const armarx::ManagedIceObjectPtr &object, Ice::ObjectAdapterPtr objectAdapterToAddTo, bool startSchedulingObject=true)
Constructs an ArmarXObjectScheduler.
const armarx::ManagedIceObjectPtr & getObject() const
Retrieve pointer to scheduled ManagedIceObject.
bool isTerminationRequested() const
bool checkDependenciesStatus() const
bool isTerminated() const
Check whether the Scheduler is terminated.
bool checkDependenciesResolvement()
bool waitForObjectState(ManagedIceObjectState stateToWaitFor, const long timeoutMs=-1) const
waitForObjectStart waits (thread sleeps) until the object reached a specific state.
~ArmarXObjectScheduler() override
void waitForTermination()
Waits until scheduler has been terminated.
bool waitForObjectStateMinimum(ManagedIceObjectState minimumStateToWaitFor, const long timeoutMs=-1) const
waitForObjectStart waits (thread sleeps) until the object reached a specific state (or higher/later).
SpamFilterDataPtr deactivateSpam(float deactivationDurationSec=10.0f, const std::string &identifier="", bool deactivate=true) const
disables the logging for the current line for the given amount of seconds.
void setTag(const LogTag &tag)
static std::string GetObjectStateAsString(int state)
#define ARMARX_INFO
The normal logging level.
#define ARMARX_IMPORTANT
The logging level for always important information, but expected behaviour (in contrast to ARMARX_WAR...
#define ARMARX_ERROR
The logging level for unexpected behaviour, that must be fixed.
#define ARMARX_VERBOSE
The logging level for verbose information.
::IceInternal::ProxyHandle<::IceProxy::IceStorm::Topic > TopicPrx
::IceInternal::Handle<::Ice::ObjectAdapter > ObjectAdapterPtr
This file offers overloads of toIce() and fromIce() functions for STL container types.
IceUtil::Handle< ManagedIceObjectDependency > ManagedIceObjectDependencyPtr
IceUtil::Handle< ArmarXManager > ArmarXManagerPtr
std::pair< Ice::ObjectPrx, Ice::ObjectAdapterPtr > ObjectHandles
Object handles pair which contains the object proxy and its adapter.
const LogSender::manipulator flush
IceUtil::Handle< IceManager > IceManagerPtr
IceManager smart pointer.
IceInternal::Handle< ManagedIceObject > ManagedIceObjectPtr