ArmarXObjectScheduler.cpp
Go to the documentation of this file.
1/*
2 * This file is part of ArmarX.
3 *
4 * Copyright (C) 2011-2016, High Performance Humanoid Technologies (H2T), Karlsruhe Institute of Technology (KIT), all rights reserved.
5 *
6 * ArmarX is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 as
8 * published by the Free Software Foundation.
9 *
10 * ArmarX is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 *
18 * @package ArmarXCore::core
19 * @author Kai Welke (kai dot welke at kit dot edu)
20 * @date 2012
21 * @copyright http://www.gnu.org/licenses/gpl-2.0.txt
22 * GNU General Public License
23 */
24
26
27#include <algorithm> // for min
28#include <cstddef> // for NULL
29#include <map> // for _Rb_tree_iterator, etc
30#include <memory>
31#include <ostream> // for operator<<, etc
32#include <utility> // for pair
33#include <vector> // for vector, vector<>::iterator
34
35#include <Ice/BuiltinSequences.h> // for StringSeq
36#include <Ice/ObjectAdapter.h> // for ObjectAdapterPtr
37#include <IceGrid/Admin.h> // for AdminPrx, Admin
38#include <IceGrid/Exception.h> // for DeploymentException, etc
39#include <IceStorm/IceStorm.h> // for TopicPrx
40#include <IceUtil/Time.h> // for Time
41
42#include <SimoxUtility/algorithm/string/string_tools.h>
43
44#include "ArmarXCore/core/ArmarXFwd.h" // for ArmarXManagerPtr, etc
46#include "ArmarXCore/core/logging/LogSender.h" // for LogSender, flush
47#include "ArmarXCore/core/logging/Logging.h" // for ARMARX_VERBOSE, etc
48#include "ArmarXCore/interface/core/ManagedIceObjectDefinitions.h"
49#include "ArmarXCore/interface/core/ManagedIceObjectDependencyBase.h"
50#include "ArmarXCore/interface/core/ThreadingIceBase.h" // for upCast
51#include <ArmarXCore/core/ArmarXManager.h> // for ArmarXManagerPtr, etc
52#include <ArmarXCore/core/IceManager.h> // for IceManager, ObjectHandles
53#include <ArmarXCore/core/ManagedIceObject.h> // for ManagedIceObject
55#include <ArmarXCore/core/exceptions/Exception.h> // for handleExceptions, etc
57
58#include "IceGridAdmin.h" // for IceGridAdmin
60
61#define WAITMESSAGEINTERVAL (long)5000
62
63namespace armarx
64{
65 // *******************************************************
66 // construction
67 // *******************************************************
68
69
71 const IceManagerPtr& iceManager,
72 const ManagedIceObjectPtr& object,
73 Ice::ObjectAdapterPtr objectAdapterToAddTo,
74 bool startSchedulingObject) :
75 armarXManager(armarXManager),
76 iceManager(iceManager),
77 managedObject(object),
78 terminateRequested(false),
79 objectedInitialized(false),
80 tryReconnect(true),
81 objectAdapterToAddTo(objectAdapterToAddTo)
82 {
83 setTag("ObjectScheduler");
84 // set object scheduler and armarxManager in managedIceObject
85 object->impl->objectScheduler = this;
86 object->impl->armarXManager = armarXManager;
87
88 interruptCondition = std::make_shared<std::condition_variable>();
89 interruptConditionVariable = std::make_shared<bool>();
90 if (startSchedulingObject)
91 // start management thread
92 {
94 }
95 }
96
98 {
100
101 if (scheduleObjectTask)
102 {
103 scheduleObjectTask->stop(true);
104 }
105
106 // ARMARX_VERBOSE << "~ArmarXObjectScheduler()" ;
107 }
108
109 void
111 {
112 if (!scheduleObjectTask)
113 {
114 scheduleObjectTask = new RunningTask<ArmarXObjectScheduler>(
115 this,
116 &ArmarXObjectScheduler::scheduleObject,
117 managedObject->getName() + "ArmarXObjectScheduler");
118 }
119
120 if (!scheduleObjectTask->isRunning())
121 {
122 scheduleObjectTask->start();
123 }
124 else
125 {
126 ARMARX_INFO << managedObject->getName() << " already scheduled";
127 }
128 }
129
130 // *******************************************************
131 // termination handling
132 // *******************************************************
133 void
135 {
136 terminateRequested = true;
137
138 if (managedObject)
139 {
140 managedObject->impl->stateCondition.notify_all();
141 }
142
144 {
145 std::scoped_lock lock(interruptMutex);
146 *interruptConditionVariable = true;
147 }
148 interruptCondition->notify_all();
149
150 if (scheduleObjectTask)
151 {
152 scheduleObjectTask->stop(false);
153 }
154 }
155
156 void
158 {
159 while (scheduleObjectTask && !scheduleObjectTask->waitForFinished(5000))
160 {
161 ARMARX_INFO << managedObject->getName()
162 << " is blocking the removal - continuing to wait.";
163 }
164 std::unique_lock lock(managedObject->impl->objectStateMutex);
165
166 while (managedObject->impl->objectState != eManagedIceObjectExited)
167 {
168 managedObject->impl->stateCondition.wait_for(lock, std::chrono::milliseconds(100));
169 ARMARX_INFO << deactivateSpam(5) << managedObject->getName()
170 << " is blocking the removal - continuing to wait.";
171 }
172 }
173
174 void
175 ArmarXObjectScheduler::waitForInterrupt()
176 {
177 std::unique_lock lock(interruptMutex);
178
179 if (terminateRequested || (scheduleObjectTask && !scheduleObjectTask->isRunning()))
180 {
181 return;
182 }
183
184 *interruptConditionVariable = false;
185
186 while (!*interruptConditionVariable)
187 {
188 interruptCondition->wait(lock);
189 }
190 }
191
192 bool
193 ArmarXObjectScheduler::waitForObjectState(ManagedIceObjectState stateToWaitFor,
194 const long timeoutMs) const
195 {
196 if (!managedObject)
197 {
198 return false;
199 }
200
201 if (timeoutMs == -1)
202 {
203 while (!terminateRequested)
204 {
205 std::unique_lock lock(managedObject->impl->objectStateMutex);
206
207 if (managedObject->impl->objectState != stateToWaitFor)
208 {
209 managedObject->impl->stateCondition.wait_for(
210 lock, std::chrono::milliseconds(WAITMESSAGEINTERVAL));
211 }
212 else
213 {
214 return true;
215 }
216
217 ARMARX_VERBOSE /*<< deactivateSpam(4)*/ << "Waiting for '"
218 << managedObject->getName()
219 << "' to reach state "
221 stateToWaitFor);
222 }
223 }
224 else
225 {
226 IceUtil::Time startTime = IceUtil::Time::now();
227 IceUtil::Time waitTime = startTime;
228 long waitTimeLeft = timeoutMs;
229
230 while (waitTimeLeft > 0 && !terminateRequested)
231 {
232 waitTime = IceUtil::Time::now() - startTime;
233 waitTimeLeft = timeoutMs - waitTime.toMilliSeconds();
234
235 if (waitTime.toMilliSeconds() > 2000)
236 {
237 ARMARX_VERBOSE << "Waiting for " << waitTimeLeft << "ms for '"
238 << managedObject->getName() << "' to reach state "
240 }
241
242 std::unique_lock lock(managedObject->impl->objectStateMutex);
243
244 if (managedObject->impl->objectState != stateToWaitFor)
245 {
246 managedObject->impl->stateCondition.wait_for(
247 lock,
248 std::chrono::milliseconds(std::min(WAITMESSAGEINTERVAL, waitTimeLeft)));
249 }
250 else
251 {
252 return true;
253 }
254 }
255 }
256
257 return false;
258 }
259
260 bool
261 ArmarXObjectScheduler::waitForObjectStateMinimum(ManagedIceObjectState minimumStateToWaitFor,
262 const long timeoutMs) const
263 {
264 if (!managedObject)
265 {
266 return false;
267 }
268
269 if (timeoutMs == -1)
270 {
271
272 while (!terminateRequested)
273 {
274 std::unique_lock lock(managedObject->impl->objectStateMutex);
275
276 if (managedObject->impl->objectState < minimumStateToWaitFor)
277 {
278 bool timeout = managedObject->impl->stateCondition.wait_for(
279 lock, std::chrono::milliseconds(WAITMESSAGEINTERVAL)) ==
280 std::cv_status::timeout;
281 if (timeout)
282 {
284 << /*deactivateSpam(4) <<*/ "Waiting for '" << managedObject->getName()
285 << "' to reach minimum state "
286 << ManagedIceObject::GetObjectStateAsString(minimumStateToWaitFor)
287 << "\nWaiting for: " << ARMARX_STREAM_PRINTER
288 {
289 if (managedObject->getUnresolvedDependencies().empty())
290 {
291 out << "nothing";
292 }
293 else
294 {
295 out << simox::alg::join(managedObject->getUnresolvedDependencies(),
296 ", ");
297 }
298 };
299 }
300 }
301 else
302 {
303 return true;
304 }
305 }
306 }
307 else
308 {
309
310 IceUtil::Time startTime = IceUtil::Time::now();
311 IceUtil::Time waitTime = startTime;
312 long waitTimeLeft = timeoutMs;
313
314 while (waitTimeLeft > 0 && !terminateRequested)
315 {
316 waitTime = IceUtil::Time::now() - startTime;
317 waitTimeLeft = timeoutMs - waitTime.toMilliSeconds();
318
319 if (waitTime.toMilliSeconds() > 2000)
320 {
322 << "Waiting for " << waitTimeLeft << "ms for '" << managedObject->getName()
323 << "' to reach minimum state "
324 << ManagedIceObject::GetObjectStateAsString(minimumStateToWaitFor);
325 }
326
327 std::unique_lock lock(managedObject->impl->objectStateMutex);
328
329 if (managedObject->impl->objectState < minimumStateToWaitFor)
330 {
331 managedObject->impl->stateCondition.wait_for(
332 lock,
333 std::chrono::milliseconds(std::min(WAITMESSAGEINTERVAL, waitTimeLeft)));
334 }
335 else
336 {
337 return true;
338 }
339 }
340 }
341
342 return false;
343 }
344
345 bool
347 {
348 return (scheduleObjectTask && scheduleObjectTask->isFinished()) ||
349 getObjectState() == eManagedIceObjectExited;
350 }
351
352 bool
354 {
355 return terminateRequested;
356 }
357
360 {
361 return managedObject;
362 }
363
364 ManagedIceObjectState
366 {
367 return managedObject ? (ManagedIceObjectState)managedObject->getState()
368 : eManagedIceObjectExited;
369 }
370
371 // *******************************************************
372 // dependency resolution
373 // *******************************************************
374 void
376 {
377 IceUtil::Time startTime = IceUtil::Time::now();
378
379 bool dependenciesResolved = false;
380
381 while (!dependenciesResolved && !terminateRequested)
382 {
383 dependenciesResolved = checkDependenciesResolvement();
384
385
386 if (timeoutMs != -1 && (IceUtil::Time::now() - startTime).toMilliSeconds() >= timeoutMs)
387 {
388 throw LocalException("Could not resolve dependencies in ") << timeoutMs << " ms";
389 }
390
391 if (!dependenciesResolved) // only wait when dependencies are not resolved yet
392 {
393 std::unique_lock lock(dependencyWaitMutex);
394 dependencyWaitConditionVariable = false;
395 bool timeout = false;
396
397 while (!dependencyWaitConditionVariable && !timeout)
398 {
399 timeout =
400 (dependencyWaitCondition.wait_for(lock, std::chrono::milliseconds(1000)) ==
401 std::cv_status::timeout);
402 }
403 }
404 }
405
406 if (!terminateRequested)
407 {
408 ARMARX_VERBOSE << "All " << managedObject->getName() << " dependencies resolved";
409 }
410 }
411
412 bool
414 {
415 bool dependenciesResolved = true;
416 bool stateChanged = false;
417 std::string unresolvedNames;
418
419 // retrieve dependencies
420 DependencyMap dependencies = managedObject->getConnectivity().dependencies;
421 DependencyMap::iterator iter = dependencies.begin();
422
423 while (iter != dependencies.end())
424 {
426 ManagedIceObjectDependencyPtr::dynamicCast(iter->second);
427
428 // check dependency
429 dependency->check();
430
431 // check whether dependency has been resolves
432 if (!dependency->getResolved())
433 {
434 dependenciesResolved = false;
435 unresolvedNames += "\t" + dependency->getName() + "\n";
436 }
437
438 // check if dependecy state has changed
439 if (dependency->getStateChanged())
440 {
441 stateChanged = true;
442 }
443
444 iter++;
445 }
446
447 // output list of objects we still need
448 if (stateChanged)
449 {
450 if (unresolvedNames.length() > 0)
451 {
452 ARMARX_INFO << "ManagedIceObject '" << managedObject->getName()
453 << "' still waiting for: \n " << unresolvedNames;
454 }
455 else
456 {
457 ARMARX_INFO << "All dependencies of '" << managedObject->getName() << "' resolved!";
458 }
459 }
460 return dependenciesResolved;
461 }
462
463 void
464 ArmarXObjectScheduler::setInteruptConditionVariable(
465 std::shared_ptr<std::condition_variable> interruptCondition,
466 std::shared_ptr<bool> interruptConditionVariable)
467 {
468 this->interruptCondition = interruptCondition;
469 this->interruptConditionVariable = interruptConditionVariable;
470 }
471
472 void
474 {
475 // ARMARX_DEBUG << managedObject->getName() << " scheduler was woken up";
476 {
477 std::scoped_lock lock(dependencyWaitMutex);
478 dependencyWaitConditionVariable = true;
479 }
480 dependencyWaitCondition.notify_all();
481 }
482
483 bool
485 {
486 bool dependencyLost = false;
487 // retrieve dependencies
488 DependencyMap dependencies = managedObject->getConnectivity().dependencies;
489 DependencyMap::iterator iter = dependencies.begin();
490
491 while (iter != dependencies.end())
492 {
494 ManagedIceObjectDependencyPtr::dynamicCast(iter->second);
495
496 // check dependency
497 dependency->check();
498
499 // check whether dependency has been resolved
500 if (!dependency->getResolved())
501 {
502 dependencyLost = true;
503 iceManager->removeProxyFromCache(dependency->getName(), dependency->getType());
504 }
505
506 iter++;
507 }
508
509 return !dependencyLost;
510 }
511
512 bool
513 ArmarXObjectScheduler::dependsOn(const std::string& objectName)
514 {
515 DependencyMap dependencies = managedObject->getConnectivity().dependencies;
516 DependencyMap::iterator iter = dependencies.begin();
517
518 while (iter != dependencies.end())
519 {
521 ManagedIceObjectDependencyPtr::dynamicCast(iter->second);
522
523 if (dependency->getName() == objectName)
524 {
525 return true;
526 }
527
528 iter++;
529 }
530
531 return false;
532 }
533
534 void
536 {
537 tryReconnect = reconnect;
538 {
539 std::scoped_lock lock(interruptMutex);
540 *interruptConditionVariable = true;
541 }
542
543 interruptCondition->notify_all();
544 }
545
546 // *******************************************************
547 // main scheduling thread
548 // *******************************************************
549 void
550 ArmarXObjectScheduler::scheduleObject()
551 {
552 // first let Managed ice objects initialize
553 if (!objectedInitialized)
554 {
555 initObject();
556 }
557
558 while (!terminateRequested && (scheduleObjectTask && !scheduleObjectTask->isStopped()))
559 {
560 // try to resolve dependencies
562
563
564 // register component with ice
565 if (!terminateRequested)
566 {
567 startObject();
568 }
569
570 // wait for disconnect or shutdown
571 waitForInterrupt();
572
573 // checkDependencyStatusTask->stop();
574
575 disconnectObject();
576
577 if (!tryReconnect)
578 {
579 break;
580 }
581 }
582
583 scheduleObjectTask->waitForStop();
584
585 // exit managed object
586 if (terminateRequested)
587 {
588 exitObject();
589 }
590 }
591
592 // *******************************************************
593 // ManagedIceObject phases
594 // *******************************************************
595 void
596 ArmarXObjectScheduler::initObject()
597 {
598 try
599 {
600 objectedInitialized = true;
601 managedObject->init(iceManager);
602 }
603 catch (...) // dispatch and handle exception
604 {
605 managedObject->setObjectState(eManagedIceObjectInitializationFailed);
607 terminate();
608 }
609 }
610
611 void
612 ArmarXObjectScheduler::startObject()
613 {
614 // register to iceManager
615 ObjectHandles objectHandles = iceManager->registerObject(
616 managedObject, managedObject->getName(), objectAdapterToAddTo);
617
618 // call hook
619 try
620 {
621 managedObject->start(objectHandles.first,
622 objectAdapterToAddTo ? objectAdapterToAddTo
623 : objectHandles.second);
624 }
625 catch (...) // dispatch and handle exception
626 {
627 managedObject->setObjectState(eManagedIceObjectStartingFailed);
629 }
630
631
632 // offer topics
633 Ice::StringSeq offeredTopics = managedObject->getConnectivity().offeredTopics;
634 Ice::StringSeq::iterator iterOT = offeredTopics.begin();
635
636 while (iterOT != offeredTopics.end())
637 {
638 iceManager->getTopic<IceStorm::TopicPrx>(*iterOT);
639 iterOT++;
640 }
641
642 // subscribe to topics
643 Ice::StringSeq usedTopics = managedObject->getConnectivity().usedTopics;
644 Ice::StringSeq::iterator iterUT = usedTopics.begin();
645
646 while (iterUT != usedTopics.end())
647 {
648 iceManager->subscribeTopic(
649 objectHandles.first, *iterUT, managedObject->impl->orderedTopicPublishing[*iterUT]);
650 iterUT++;
651 }
652
653 // retrieve (precache) proxies
654 DependencyMap dependencies = managedObject->getConnectivity().dependencies;
655 // TODO: precaching cannot work with current iceManager since it only provides
656 // a template getProxy which uses the typeid for caching
657
658
659 // register to admin
660 // from now on object is pingable throuhg registry and can be found by
661 // waiting ManagedIceObjects
662 IceGrid::AdminPrx admin = iceManager->getIceGridSession()->getAdmin();
663
664 try
665 {
666 admin->addObject(objectHandles.first);
667 }
668 catch (const IceGrid::ObjectExistsException& e)
669 {
670 admin->updateObject(objectHandles.first);
671 }
672 catch (const IceGrid::DeploymentException& e)
673 {
674 ARMARX_ERROR << "*** IceGrid::Admin >> adding " << managedObject->getName()
675 << " raised a DeploymentException(" << e.reason << ")" << flush;
676 }
677
678 ARMARX_VERBOSE << "Object '" << managedObject->getName() << "' started";
679 }
680
681 void
682 ArmarXObjectScheduler::disconnectObject()
683 {
684 ARMARX_INFO << "disconnecting object " << managedObject->getName();
685
686 try
687 {
688 managedObject->disconnect();
689 }
690
691 catch (...) // dispatch and handle exception
692 {
694 }
695
696 try
697 {
698 if (iceManager && managedObject)
699 {
700 iceManager->removeObject(managedObject->getName());
701 }
702
703 // IceGrid::AdminPrx admin = iceManager->getIceGridSession()->getAdmin();
704 // if(getObject()->getObjectAdapter())
705 // getObject()->getObjectAdapter()->deactivate();
706 // if(managedObject->getProxy())
707 // admin->removeObject(managedObject->getProxy()->ice_getIdentity());
708 }
709 catch (IceGrid::ObjectNotRegisteredException& notRegisteredException)
710 {
711 // // removing an unregistered object
712 // //!!!
713 // ARMARX_WARNING << "removing "
714 // << getObject()->getName()
715 // << " object failed due to ObjectNotRegisteredException"
716 // << flush;
717 }
718 // unsubscribe from topics
719 Ice::StringSeq usedTopics = managedObject->getConnectivity().usedTopics;
720 Ice::StringSeq::iterator iterUT = usedTopics.begin();
721
722 try
723 {
724 while (iterUT != usedTopics.end())
725 {
726 iceManager->unsubscribeTopic(managedObject->getProxy(), *iterUT);
727 iterUT++;
728 }
729 }
730 catch (...)
731 {
732 }
733 }
734
735 void
736 ArmarXObjectScheduler::exitObject()
737 {
738 // ARMARX_INFO << "Exiting object " << managedObject->getName();
739
740 managedObject->exit();
741
742
743 iceManager = nullptr;
744 armarXManager = nullptr;
745 }
746} // namespace armarx
#define WAITMESSAGEINTERVAL
#define ARMARX_STREAM_PRINTER
use this macro to write output code that is executed when printed and thus not executed if the debug ...
Definition Logging.h:310
ManagedIceObjectState getObjectState() const
bool dependsOn(const std::string &objectName)
void waitForDependencies(int timeoutMs=-1)
waits until all depenencies are resolved.
void terminate()
Terminates the ManagedIceObject.
ArmarXObjectScheduler(const ArmarXManagerPtr &armarXManager, const IceManagerPtr &iceManager, const armarx::ManagedIceObjectPtr &object, Ice::ObjectAdapterPtr objectAdapterToAddTo, bool startSchedulingObject=true)
Constructs an ArmarXObjectScheduler.
const armarx::ManagedIceObjectPtr & getObject() const
Retrieve pointer to scheduled ManagedIceObject.
bool isTerminated() const
Check whether the Scheduler is terminated.
bool waitForObjectState(ManagedIceObjectState stateToWaitFor, const long timeoutMs=-1) const
waitForObjectStart waits (thread sleeps) until the object reached a specific state.
void waitForTermination()
Waits until scheduler has been terminated.
bool waitForObjectStateMinimum(ManagedIceObjectState minimumStateToWaitFor, const long timeoutMs=-1) const
waitForObjectStart waits (thread sleeps) until the object reached a specific state (or higher/later).
SpamFilterDataPtr deactivateSpam(float deactivationDurationSec=10.0f, const std::string &identifier="", bool deactivate=true) const
disables the logging for the current line for the given amount of seconds.
Definition Logging.cpp:99
void setTag(const LogTag &tag)
Definition Logging.cpp:54
static std::string GetObjectStateAsString(int state)
#define ARMARX_INFO
The normal logging level.
Definition Logging.h:181
#define ARMARX_IMPORTANT
The logging level for always important information, but expected behaviour (in contrast to ARMARX_WAR...
Definition Logging.h:190
#define ARMARX_ERROR
The logging level for unexpected behaviour, that must be fixed.
Definition Logging.h:196
#define ARMARX_VERBOSE
The logging level for verbose information.
Definition Logging.h:187
::IceInternal::ProxyHandle<::IceProxy::IceStorm::Topic > TopicPrx
Definition IceManager.h:70
::IceInternal::Handle<::Ice::ObjectAdapter > ObjectAdapterPtr
Definition IceManager.h:52
This file offers overloads of toIce() and fromIce() functions for STL container types.
IceUtil::Handle< ManagedIceObjectDependency > ManagedIceObjectDependencyPtr
IceUtil::Handle< ArmarXManager > ArmarXManagerPtr
std::pair< Ice::ObjectPrx, Ice::ObjectAdapterPtr > ObjectHandles
Object handles pair which contains the object proxy and its adapter.
Definition IceManager.h:97
void handleExceptions()
const LogSender::manipulator flush
Definition LogSender.h:251
IceUtil::Handle< IceManager > IceManagerPtr
IceManager smart pointer.
Definition ArmarXFwd.h:39
IceInternal::Handle< ManagedIceObject > ManagedIceObjectPtr
Definition ArmarXFwd.h:42