ArmarXManager.cpp
Go to the documentation of this file.
1/*
2 * This file is part of ArmarX.
3 *
4 * Copyright (C) 2011-2016, High Performance Humanoid Technologies (H2T), Karlsruhe Institute of Technology (KIT), all rights reserved.
5 *
6 * ArmarX is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 as
8 * published by the Free Software Foundation.
9 *
10 * ArmarX is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 *
18 * @package ArmarXCore::core
19 * @author Kai Welke (kai dot welke at kit dot edu)
20 * @date 2012
21 * @copyright http://www.gnu.org/licenses/gpl-2.0.txt
22 * GNU General Public License
23 */
24
25#include "ArmarXManager.h" // for ArmarXManager, etc
26
27#include <limits.h> // for hostname max length
28
29#include <chrono> // for seconds
30#include <cstddef> // for size_t, NULL
31#include <iostream> // for operator<<, basic_ostream, etc
32#include <map> // for _Rb_tree_iterator, etc
33#include <ratio> // for ratio
34#include <sstream> // for basic_stringbuf<>::int_type, etc
35#include <string> // for string, allocator, etc
36#include <thread> // for thread, sleep_for
37#include <utility> // for pair, make_pair, move
38#include <vector> // for vector, vector<>::iterator
39
40#include <unistd.h> // for hostname
41
42#include <Ice/BuiltinSequences.h> // for StringSeq
43#include <Ice/Communicator.h> // for CommunicatorPtr, etc
44#include <Ice/Identity.h> // for Identity
45#include <Ice/Initialize.h> // for initialize
46#include <Ice/LocalException.h> // for AlreadyRegisteredException, etc
47#include <Ice/Metrics.h> // for Metrics
48#include <Ice/NativePropertiesAdmin.h>
49#include <Ice/ObjectAdapter.h> // for ObjectAdapterPtr
50#include <Ice/ObjectF.h> // for upCast
51#include <Ice/Properties.h> // for Properties
52#include <Ice/PropertiesF.h> // for upCast
53#include <Ice/Proxy.h> // for Object
54#include <Ice/ProxyF.h> // for ObjectPrx
55#include <IceGrid/Admin.h> // for Admin, ObjectObserverPrx, upCast
56#include <IceGrid/Registry.h> // for RegistryPrx, upCast
57#include <IceStorm/IceStorm.h> // for TopicPrx, TopicManagerPrx, etc
58#include <IceUtil/Handle.h> // for HandleBase, Handle
59
63#include "ArmarXCore/core/exceptions/Exception.h" // for LocalException, etc
64#include "ArmarXCore/core/logging/LogSender.h" // for LogSender
65#include "ArmarXCore/core/logging/Logging.h" // for ARMARX_VERBOSE, etc
66#include "ArmarXCore/core/services/tasks/ThreadList.h" // for ThreadList
67#include "ArmarXCore/interface/core/BasicTypes.h"
68#include "ArmarXCore/interface/core/Log.h" // for LogPrx, MessageType, etc
69#include "ArmarXCore/interface/core/ManagedIceObjectDefinitions.h"
70#include "ArmarXCore/interface/core/ThreadingIceBase.h" // for upCast
72#include <ArmarXCore/core/Component.h> // for ComponentPtr, Component
73#include <ArmarXCore/core/IceManager.h> // for IceManager
74#include <ArmarXCore/core/ManagedIceObject.h> // for ManagedIceObject
79#include <ArmarXCore/core/system/ArmarXDataPath.h> // for ArmarXDataPath
82#include <ArmarXCore/core/time/LocalTimeServer.h> // for LocalTimeServer
83#include <ArmarXCore/core/time/TimeUtil.h> // for TimeUtil
85
87#include "IceGridAdmin.h" // for IceGridAdmin
88
89namespace Ice
90{
91 struct Current;
92} // namespace Ice
93
94namespace armarx
95{
// Names under which this manager and its object observer are registered.
// Both macros expand to an expression that reads a variable called
// `applicationName` from the scope in which they are used. The expansion is
// parenthesized so that it behaves as a single operand wherever it is placed
// (e.g. `MANAGEROBJNAME.size()` applies to the whole concatenation).
#define MANAGEROBJNAME (applicationName + "Manager")
#define OBJOBSNAME (std::string("ArmarXObjectObserver_") + applicationName)
98
99 // *******************************************************
100 // construction
101 // *******************************************************
102 ArmarXManager::ArmarXManager(std::string applicationName,
103 int port,
104 std::string host,
105 std::string locatorName,
106 Ice::StringSeq args) :
107 applicationName(applicationName), managerState(eCreated)
108 {
109 // initialize communicator
110 std::stringstream defaultLocator;
111 defaultLocator << "--Ice.Default.Locator=" << locatorName << ":tcp -p " << port << " -h "
112 << host;
113 args.push_back(defaultLocator.str());
114 Ice::CommunicatorPtr communicator = Ice::initialize(args);
115
116 // init members
117 init(applicationName, communicator);
118 }
119
120 ArmarXManager::ArmarXManager(std::string applicationName,
121 const Ice::CommunicatorPtr& communicator) :
122 applicationName(applicationName), managerState(eCreated)
123 {
124 // init members
125 init(applicationName, communicator);
126 }
127
133
134 bool
136 {
137 return CheckIceConnection(iceManager->getCommunicator(), printHint);
138 }
139
140 bool
141 ArmarXManager::CheckIceConnection(const Ice::CommunicatorPtr& communicator, bool printHint)
142 {
143 const std::string armarxHint = "'\nDid you start armarx?\n\nTo start armarx: armarx start\n"
144 "To kill a hanging armarx: armarx killIce";
145 try
146 {
147 if (communicator->getProperties()->getProperty("Ice.Default.Locator").empty())
148 {
149 if (printHint)
150 {
151 std::cerr << "Required Ice property 'Ice.Default.Locator' not set! "
152 << "It has to has a value similar to 'IceGrid/Locator:tcp -p 4061 -h "
153 "localhost'";
154 }
155 return false;
156 }
157 auto locatorProp = communicator->getProperties()->getProperty("Ice.Default.Locator");
158 auto pos = locatorProp.find_first_of(':');
159 std::string locatorId = locatorProp.substr(0, pos);
160 auto proxy = communicator->stringToProxy(locatorId);
161 IceGrid::LocatorPrx::checkedCast(proxy);
162 }
163 catch (...)
164 {
165 if (printHint)
166 {
167 std::cerr << "Could not contact default locator at '"
168 << communicator->getProperties()->getProperty("Ice.Default.Locator")
169 << armarxHint;
170 }
171 return false;
172 }
173
174 try
175 {
176 std::string registryId = "IceGrid/Registry";
177 auto proxy = communicator->stringToProxy(registryId);
178 IceGrid::RegistryPrx::checkedCast(proxy);
179 }
180 catch (...)
181 {
182 if (printHint)
183 {
184 std::cerr << "Could not contact IceGrid registry at '"
185 << communicator->getProperties()->getProperty(
186 "IceGrid.Registry.Client.Endpoints")
187 << armarxHint;
188 }
189 return false;
190 }
191
192 try
193 {
194 IceManager::GetTopicManager(communicator);
195 }
196 catch (const Ice::NoEndpointException& e)
197 {
198 std::cout << "Caught exception: \n" << e.what() << std::endl;
199
200 if (printHint)
201 {
202 std::cerr
203 << "Could not contact TopicManager at '"
204 // This string is actually used in IceManager::GetTopicManager()
205 << "IceStorm/TopicManager"
206 // << communicator->getProperties()->getProperty("IceStormAdmin.TopicManager.Default")
207 << armarxHint;
208 }
209 return false;
210 }
211
212 return true;
213 }
214
215 // *******************************************************
216 // ArmarXManager property setters
217 // *******************************************************
218 void
223
224 void
226 {
227 ThreadList::getApplicationThreadList()->enableProfiler(enable);
228 }
229
230 void
235
236 void
237 ArmarXManager::setDataPaths(std::string dataPaths)
238 {
240 }
241
242 // *******************************************************
243 // adding / removing ManagedIceObjects
244 // *******************************************************
245
246
247 void
249 bool addWithOwnAdapter,
250 const std::string& objectName,
251 bool useOwnScheduleThread)
252 {
253 addObject(object,
254 addWithOwnAdapter ? Ice::ObjectAdapterPtr() : getAdapter(),
255 objectName,
256 useOwnScheduleThread);
257 }
258
259 void
261 const std::string& objectName,
262 bool addWithOwnAdapter,
263 bool useOwnScheduleThread)
264 {
265 addObject(object, addWithOwnAdapter, objectName, useOwnScheduleThread);
266 }
267
268 void
270 Ice::ObjectAdapterPtr objectAdapterToAddTo,
271 const std::string& objectName,
272 bool useOwnScheduleThread)
273 {
274 if (!object)
275 {
276 throw LocalException("Cannot add NULL object");
277 }
278 {
279 auto cptr = ComponentPtr::dynamicCast(object);
280 if (cptr && !cptr->createdByComponentCreate)
281 {
282 throw LocalException("Components need to be created by Component::create");
283 }
284 }
285
286 if (!objectName.empty())
287 {
288 if (!object->getName().empty())
289 {
290 ARMARX_INFO << "Adding object with custom name: " << objectName;
291 }
292 object->setName(objectName);
293 }
294 if (object->getName().empty())
295 {
296 object->setName(object->getDefaultName());
297 }
298 if (object->getName().empty())
299 {
300 throw LocalException("Object name must not be empty");
301 }
302 auto lock = acquireManagedObjectsMutex();
303 if (!lock)
304 {
305 return;
306 }
307
308 bool reachable = false;
309
310 try
311 {
312 reachable = getIceManager()->isObjectReachable(object->getName());
313 }
314 catch (...)
315 {
316 throw;
317 }
318
319 if (reachable)
320 {
321 throw Ice::AlreadyRegisteredException(
322 __FILE__, __LINE__, object->ice_id(), object->getName());
323 }
324
325 try
326 {
328 this, iceManager, object, objectAdapterToAddTo, useOwnScheduleThread);
329 auto pair = managedObjects.insert(
330 std::make_pair(object->getName(), std::move(objectScheduler)));
331 if (!pair.second)
332 {
333 ARMARX_WARNING << "Insert of object scheduler in managedObjects failed since there "
334 "is already a scheduler for name "
335 << object->getName();
336 }
337 if (!useOwnScheduleThread)
338 {
339 std::scoped_lock lock(schedulerListMutex);
340 singleThreadedSchedulers.at(rand() % singleThreadedSchedulers.size())
341 ->addObjectScheduler(pair.first->second);
342 }
343 }
344 catch (...)
345 {
346 throw;
347 }
348 }
349
350 void
352 const std::string& objectName,
353 bool addWithOwnAdapter,
354 bool useOwnScheduleThread)
355 {
356 ArmarXManagerPtr manager(this);
357 std::thread{
358 [=]
359 {
360 try
361 {
362 manager->addObject(object, objectName, addWithOwnAdapter, useOwnScheduleThread);
363 }
364 catch (...)
365 {
367 }
368 }
369
370 }.detach();
371 }
372
373 void
374 ArmarXManager::removeObjectBlocking(const std::string& objectName)
375 {
376 ArmarXObjectSchedulerPtr scheduler;
377 {
378 auto lock = acquireManagedObjectsMutex();
379 if (!lock)
380 {
381 return;
382 }
383 scheduler = managedObjects.find(objectName)->second;
384 }
385 try
386 {
387 if (scheduler)
388 {
389 removeObject(scheduler, true);
390 }
391 }
392 catch (...)
393 {
394 throw;
395 }
396 }
397
398 void
399 ArmarXManager::removeObjectNonBlocking(const std::string& objectName)
400 {
401 auto removal = [objectName, this]()
402 {
403 ArmarXObjectSchedulerPtr scheduler;
404 {
405 auto lock = acquireManagedObjectsMutex();
406 if (!lock)
407 {
408 return;
409 }
410 scheduler = managedObjects.find(objectName)->second;
411 }
412 try
413 {
414 this->removeObject(scheduler, false);
415 }
416 catch (...)
417 {
418 throw;
419 }
420 };
421 std::thread{removal}.detach();
422 }
423
424 void
426 {
427 removeObjectBlocking(object->getName());
428 }
429
430 void
435
436 // *******************************************************
437 // shutdown handling
438 // *******************************************************
439 void
441 {
442 // assure no state changed
443 managerStateMutex.lock();
444
445 if (managerState == eShutdown)
446 {
447 managerStateMutex.unlock();
448 return;
449 }
450
451 // wait for shutdown
452 std::unique_lock lock(shutdownMutex);
453 managerStateMutex.unlock();
454 shutdownCondition.wait(lock);
455 ARMARX_VERBOSE << "Waiting for shutdown finished";
456 }
457
458 void
460 {
461 try
462 {
463 // check state
464 {
465 std::scoped_lock lock(managerStateMutex);
466
467 // do not shutdown if shutdown is in progress
468 if (managerState >= eShutdownInProgress)
469 {
470 return;
471 }
472
473 managerState = eShutdownInProgress;
474
475 // locking managedObjects must be done before state mutex is released
476 managedObjectsMutex.lock();
477 }
478
479 ARMARX_VERBOSE << "Shutting down ArmarXManager" << std::endl;
480
481 // stop cleanup task. All objects will be removed manually in shutdown
482 cleanupSchedulersTask->stop();
483
484 if (checkDependencyStatusTask)
485 {
486 checkDependencyStatusTask->stop();
487 }
488
489 sharedRemoteHandleState.reset();
490 remoteReferenceCountControlBlockManager.reset();
491
492 disconnectAllObjects();
493
494 // shutdown log sender
496
497 // remove all managed objects
498 removeAllObjects(true);
499 try
500 {
501 iceManager->getIceGridSession()->getAdmin()->removeObject(
502 Ice::Identity{MANAGEROBJNAME, ""});
503 }
504 catch (...)
505 {
506 }
507
508 singleThreadedSchedulers.clear();
509
510 try
511 {
512 // deactivate all object adapters (and do internal iceManager shutdown)
513 iceManager->shutdown();
514
515 // wait until all adapters have been deactivated
516 iceManager->waitForShutdown();
517 }
518 catch (...)
519 {
520 }
521 // destroy manager and communicator
522 iceManager->destroy();
523
524 // set to NULL to avoid cycle in pointers ArmarXManager <-> ArmarXObjectObserver
525 objObserver = nullptr;
526
527 // set state to shutdown and notify waitForShutdown
528 {
529 std::scoped_lock lock(managerStateMutex);
530
531 // inform waiting threads of shutdown
532 {
533 std::unique_lock lock(shutdownMutex);
534 ARMARX_VERBOSE << "notifying shutdown waiters" << std::endl;
535 shutdownCondition.notify_all();
536 }
537
538 managerState = eShutdown;
539 }
540
541 managedObjectsMutex.unlock();
542 }
543 catch (std::exception& e)
544 {
545 ARMARX_INFO << "shutdown failed with exception!\n" << e.what() << std::endl;
546 }
547
548 ARMARX_INFO << "Shutdown of ArmarXManager finished!" << std::endl;
549 }
550
551 void
552 ArmarXManager::asyncShutdown(std::size_t timeoutMs)
553 {
554 std::thread{[this, timeoutMs]
555 {
556 std::this_thread::sleep_for(std::chrono::milliseconds{timeoutMs});
557 shutdown();
558 }}
559 .detach();
560 }
561
562 bool
564 {
565 std::scoped_lock lock(managerStateMutex);
566 return (managerState == eShutdown);
567 }
568
569 // *******************************************************
570 // getters
571 // *******************************************************
572
573 const IceManagerPtr&
575 {
576 return iceManager;
577 }
578
581 {
582 return iceManager->getCommunicator();
583 }
584
585 std::vector<ManagedIceObjectPtr>
587 {
588 std::vector<ManagedIceObjectPtr> objects;
589
590 auto lock = acquireManagedObjectsMutex();
591 if (!lock)
592 {
593 return objects;
594 }
595
596 ObjectSchedulerMap::iterator iter = managedObjects.begin();
597
598 while (iter != managedObjects.end())
599 {
600 objects.push_back(iter->second->getObject());
601 iter++;
602 }
603
604
605 return objects;
606 }
607
608 // *******************************************************
609 // Slice MiceManagerInsightProvider implementation
610 // *******************************************************
611
612 std::string
613 ArmarXManager::getHostname(const Ice::Current&)
614 {
615 char hostname[HOST_NAME_MAX];
616 gethostname(hostname, HOST_NAME_MAX);
617 return std::string(hostname);
618 }
619
620 Ice::StringSeq
621 ArmarXManager::getObjectNames(const Ice::Current& c)
622 {
623 return getManagedObjectNames();
624 }
625
626 mice::MiceObjectConnectivity
627 ArmarXManager::getMiceObjectConnectivity(const std::string& objectName, const Ice::Current& c)
628 {
629 mice::MiceObjectConnectivity miceCon = mice::MiceObjectConnectivity();
630 ManagedIceObjectConnectivity con = getObjectConnectivity(objectName);
631 miceCon.subscribedTopics = con.usedTopics;
632 miceCon.publishedTopics = con.offeredTopics;
633 miceCon.usedObjects = Ice::StringSeq();
634
635 for (const auto& entry : con.dependencies)
636 {
637 miceCon.usedObjects.push_back(entry.first);
638 }
639
640 return miceCon;
641 }
642
643 IceMX::MetricsAdminPrx
645 {
646 Ice::ObjectPrx adminObj = getIceManager()->getCommunicator()->getAdmin();
647 IceMX::MetricsAdminPrx metrAdmin = IceMX::MetricsAdminPrx::checkedCast(adminObj, "Metrics");
648 return metrAdmin;
649 }
650
651 // *******************************************************
652 // Slice ArmarXManagerInterface implementation
653 // *******************************************************
654 ManagedIceObjectState
655 ArmarXManager::getObjectState(const std::string& objectName, const Ice::Current& c)
656 {
657 auto lock = acquireManagedObjectsMutex();
658 if (!lock)
659 {
660 return eManagedIceObjectExiting;
661 }
662
663 ObjectSchedulerMap::iterator iter = managedObjects.find(objectName);
664
665 if (iter == managedObjects.end())
666 {
667 return eManagedIceObjectExited;
668 }
669
670 ManagedIceObjectState state = (ManagedIceObjectState)iter->second->getObject()->getState();
671
672 return state;
673 }
674
675 ManagedIceObjectConnectivity
676 ArmarXManager::getObjectConnectivity(const std::string& objectName, const Ice::Current& c)
677 {
678 auto lock = acquireManagedObjectsMutex();
679 if (!lock)
680 {
681 return ManagedIceObjectConnectivity();
682 }
683
684 ObjectSchedulerMap::iterator iter = managedObjects.find(objectName);
685
686 if (iter == managedObjects.end())
687 {
688 return ManagedIceObjectConnectivity();
689 }
690
691 ManagedIceObjectConnectivity con = iter->second->getObject()->getConnectivity();
692
693
694 return con;
695 }
696
697 StringStringDictionary
698 ArmarXManager::getObjectProperties(const ::std::string& objectName, const ::Ice::Current&)
699 {
700 StringStringDictionary propertyMap;
701 ObjectSchedulerMap::iterator iter = managedObjects.find(objectName);
702
703 if (iter == managedObjects.end())
704 {
705 return propertyMap;
706 }
707
708 ComponentPtr component = ComponentPtr::dynamicCast(iter->second->getObject());
709
710 if (!component)
711 {
712 return propertyMap;
713 }
714 ARMARX_CHECK_EXPRESSION(component->getPropertyDefinitions());
715 auto result = component->getPropertyDefinitions()->getPropertyValues(
716 component->getPropertyDefinitions()->getPrefix());
717 return result;
718 }
719
720 ObjectPropertyInfos
721 ArmarXManager::getObjectPropertyInfos(const ::std::string& objectName, const ::Ice::Current&)
722 {
723 ObjectPropertyInfos propertyMap;
724 ObjectSchedulerMap::iterator iter = managedObjects.find(objectName);
725
726 if (iter == managedObjects.end())
727 {
728 return propertyMap;
729 }
730
731 ComponentPtr component = ComponentPtr::dynamicCast(iter->second->getObject());
732
733 if (!component)
734 {
735 return propertyMap;
736 }
737
738 ARMARX_CHECK_EXPRESSION(component->getPropertyDefinitions());
739 for (auto prop : component->getPropertyDefinitions()->getPropertyValues())
740 {
741 propertyMap[component->getPropertyDefinitions()->getPrefix() + prop.first] = {
742 component->getPropertyDefinitions()->getDefinitionBase(prop.first)->isConstant(),
743 prop.second};
744 }
745 return propertyMap;
746 }
747
748 ObjectPropertyInfos
750 {
751 ObjectPropertyInfos propertyMap;
752 if (!Application::getInstance() || !Application::getInstance()->getPropertyDefinitions())
753 {
754 return propertyMap;
755 }
756 for (auto prop : Application::getInstance()->getPropertyDefinitions()->getPropertyValues())
757 {
758 propertyMap[Application::getInstance()->getPropertyDefinitions()->getPrefix() +
759 prop.first] = {Application::getInstance()
760 ->getPropertyDefinitions()
761 ->getDefinitionBase(prop.first)
762 ->isConstant(),
763 prop.second};
764 }
765 return propertyMap;
766 }
767
768 Ice::PropertiesAdminPrx
770 {
771 Ice::ObjectPrx adminObj = getIceManager()->getCommunicator()->getAdmin();
772 Ice::PropertiesAdminPrx propAdmin =
773 Ice::PropertiesAdminPrx::checkedCast(adminObj, "Properties");
774 return propAdmin;
775 }
776
777 void
779 {
780 for (ManagedIceObjectPtr& managedObject : getManagedObjects())
781 {
782 ComponentPtr component = ComponentPtr::dynamicCast(managedObject);
783 if (component)
784 {
785 component->setIceProperties(properties);
786 }
787 }
788 }
789
790 void
791 ArmarXManager::updateComponentIceProperties(const Ice::PropertyDict& properties)
792 {
793 for (ManagedIceObjectPtr& managedObject : getManagedObjects())
794 {
795 ComponentPtr component = ComponentPtr::dynamicCast(managedObject);
796 if (component)
797 {
798 component->updateIceProperties(properties);
799 }
800 }
801 }
802
803 Ice::StringSeq
805 {
806 Ice::StringSeq objectNames;
807
808 auto lock = acquireManagedObjectsMutex();
809 if (!lock)
810 {
811 return objectNames;
812 }
813
814 ObjectSchedulerMap::iterator iter = managedObjects.begin();
815
816 while (iter != managedObjects.end())
817 {
818 objectNames.push_back(iter->first);
819 iter++;
820 }
821
822
823 return objectNames;
824 }
825
826 class IcePropertyChangeCallback : public Ice::PropertiesAdminUpdateCallback
827 {
828 public:
829 IcePropertyChangeCallback(armarx::ApplicationPtr application) : application(application)
830 {
831 }
832
833 void
834 updated(const Ice::PropertyDict& changes) override
835 {
836 // ARMARX_INFO << "Properties were updated: " << changes;
837 if (application)
838 {
839 application->updateIceProperties(changes);
840 }
841 }
842
843 private:
844 armarx::ApplicationPtr application;
845 };
846
848
849 // *******************************************************
850 // private methods
851 // *******************************************************
852 void
853 ArmarXManager::init(std::string applicationName, const Ice::CommunicatorPtr& communicator)
854 {
856
858
859
860 // create ice manager
861 iceManager = new IceManager(
862 communicator,
863 applicationName,
864 appInstance ? appInstance->getProperty<std::string>("TopicSuffix").getValue() : "");
865
866
867 if (!checkIceConnection())
868 {
869 throw Ice::ConnectFailedException(__FILE__, __LINE__);
870 }
871
872 this->installProcessFacet();
873
874 // init logging
875 LogSender::setProxy(applicationName, iceManager->getTopic<LogPrx>("Log"));
876
877 setTag("ArmarXManager");
878
880
881
882 // make sure icegrid session exists before continuing
883 auto icegrid = iceManager->getIceGridSession();
884 // register obj observer for onDisconnect
885 objObserver = new ArmarXObjectObserver(this);
886 Ice::ObjectAdapterPtr observerAdapter;
887 Ice::ObjectPrx oPrx =
888 icegrid->registerObjectWithNewAdapter(objObserver, OBJOBSNAME, observerAdapter);
889 IceGrid::ObjectObserverPrx objObsPrx = IceGrid::ObjectObserverPrx::checkedCast(oPrx);
890 icegrid->setObjectObserver(objObsPrx);
891
892 // register manager to ice
893 icegrid->registerObjectWithNewAdapter(this, MANAGEROBJNAME, armarxManagerAdapter);
894
895
896 // create periodic task for starter cleanup
897 cleanupSchedulersTask = new PeriodicTask<ArmarXManager>(this,
898 &ArmarXManager::cleanupSchedulers,
899 500,
900 false,
901 "ArmarXManager::cleanupSchedulers");
902 cleanupSchedulersTask->start();
903
904 checkDependencyStatusTask =
906 &ArmarXManager::checkDependencies,
907 1000,
908 false,
909 "ArmarXManager::DependenciesChecker");
910 checkDependencyStatusTask->start();
911
912
913 // set manager state to running
914 {
915 std::scoped_lock lock(managerStateMutex);
916 managerState = eRunning;
917 }
918
919 // create ThreadList for this process
920 ThreadList::getApplicationThreadList()->setApplicationThreadListName(applicationName +
921 "ThreadList");
922
923 try
924 {
926 }
927 catch (...)
928 {
929 }
930
931 // create a local TimeServer for this process
932 if (appInstance && appInstance->getProperty<bool>("UseTimeServer").getValue())
933 {
934 ARMARX_VERBOSE << "Using time from global time server.";
935 LocalTimeServer::getApplicationTimeServer()->setApplicationTimeServerName(
936 applicationName + "LocalTimeServer");
939 }
940 else
941 {
942 ARMARX_VERBOSE << "Using time from local system clock.";
943 }
944
945 sharedRemoteHandleState.reset(new SharedRemoteHandleState{
946 appInstance
947 ? appInstance->getProperty<unsigned int>("RemoteHandlesDeletionTimeout").getValue()
949 remoteReferenceCountControlBlockManager.reset(
950 new RemoteReferenceCountControlBlockManager{IceUtil::Time::milliSeconds(100)});
951 }
952
953 void
954 ArmarXManager::cleanupSchedulers()
955 {
956 std::scoped_lock lock(terminatingObjectsMutex);
957
958 ObjectSchedulerList::iterator iter = terminatingObjects.begin();
959
960 while (iter != terminatingObjects.end())
961 {
962 const ArmarXObjectSchedulerPtr& sched = *iter;
963 ARMARX_VERBOSE << deactivateSpam(1, sched->getObject()->getName())
964 << "Checking termination state of " << sched->getObject()->getName()
965 << ": "
966 << ManagedIceObject::GetObjectStateAsString(sched->getObjectState());
967 // remove terminated starters
968 if ((*iter)->isTerminated())
969 {
970 ARMARX_VERBOSE << "Delayed Removal of ManagedIceObject "
971 << (*iter)->getObject()->getName() << " finished";
972 iter = terminatingObjects.erase(iter);
973 }
974 else
975 {
976 iter++;
977 }
978 }
979 }
980
981 void
982 ArmarXManager::disconnectDependees(const std::string& object)
983 {
984 try
985 {
986 std::vector<std::string> dependees = getDependendees(object);
987
988 auto lock = acquireManagedObjectsMutex();
989 if (!lock)
990 {
991 return;
992 }
993
994 for (const auto& dependee : dependees)
995 {
996 ArmarXManager::ObjectSchedulerMap::iterator it = managedObjects.find(dependee);
997 ARMARX_INFO << deactivateSpam(10, dependee + object) << "'" << dependee
998 << "' disconnected because of '" << object << "'";
999
1000 if (it != managedObjects.end())
1001 {
1002 it->second->disconnected(true);
1003 }
1004 }
1005 }
1006 catch (...)
1007 {
1008 throw;
1009 }
1010 }
1011
1012 void
1013 ArmarXManager::disconnectAllObjects()
1014 {
1015 auto lock = acquireManagedObjectsMutex();
1016 if (!lock)
1017 {
1018 return;
1019 }
1020 ObjectSchedulerMap::iterator iter = managedObjects.begin();
1021
1022 for (; iter != managedObjects.end(); iter++)
1023 {
1024 iter->second->disconnected(false);
1025 }
1026 }
1027
1028 std::vector<std::string>
1029 ArmarXManager::getDependendees(const std::string& removedObject)
1030 {
1031 std::vector<std::string> result;
1032
1033 auto lock = acquireManagedObjectsMutex();
1034 if (!lock)
1035 {
1036 return result;
1037 }
1038
1039 try
1040 {
1041
1042 ObjectSchedulerMap::const_iterator it = managedObjects.begin();
1043
1044 for (; it != managedObjects.end(); it++)
1045 {
1046 ArmarXObjectSchedulerPtr scheduler = it->second;
1047
1048 if (scheduler->dependsOn(removedObject))
1049 {
1050 result.push_back(it->first);
1051 }
1052 }
1053 }
1054 catch (...)
1055 {
1056 throw;
1057 }
1058
1059 return result;
1060 }
1061
1062 void
1063 ArmarXManager::wakeupWaitingSchedulers()
1064 {
1065 auto lock = acquireManagedObjectsMutex();
1066 if (!lock)
1067 {
1068 return;
1069 }
1070
1071 try
1072 {
1073
1074 ObjectSchedulerMap::const_iterator it = managedObjects.begin();
1075
1076 for (; it != managedObjects.end(); it++)
1077 {
1078 ArmarXObjectSchedulerPtr scheduler = it->second;
1079 scheduler->wakeupDependencyCheck();
1080 }
1081 }
1082 catch (...)
1083 {
1084 throw;
1085 }
1086 }
1087
1088 void
1089 ArmarXManager::removeAllObjects(bool blocking)
1090 {
1091 ObjectSchedulerMap tempMap;
1092 {
1093 std::scoped_lock lock(managedObjectsMutex);
1094 tempMap = managedObjects;
1095 }
1096
1097 for (auto& it : tempMap)
1098 {
1099 removeObject(it.second, false);
1100 }
1101
1102 if (blocking)
1103 {
1104 for (auto objectScheduler : terminatingObjects)
1105 {
1106 objectScheduler->waitForTermination();
1107 }
1108
1109 terminatingObjects.clear();
1110 }
1111 }
1112
1113 bool
1114 ArmarXManager::removeObject(const ArmarXObjectSchedulerPtr& objectScheduler, bool blocking)
1115 {
1116 if (!objectScheduler)
1117 {
1118 return true;
1119 }
1120
1121 const std::string objName = objectScheduler->getObject()->getName();
1122
1123 try
1124 {
1125 iceManager->removeObject(objName);
1126 {
1127 std::scoped_lock lock2(managedObjectsMutex);
1128 managedObjects.erase(objectScheduler->getObject()->getName());
1129 }
1130 // terminate
1131 objectScheduler->terminate();
1132
1133 if (blocking)
1134 {
1135 objectScheduler->waitForTermination();
1136 ARMARX_VERBOSE << "Blocking removal of ManagedIceObject " << objName << " finished";
1137 }
1138 else // only move to terminating list if delayed removal
1139 {
1140 ARMARX_VERBOSE << "Inserting ManagedIceObject into delayed removal list: "
1141 << objName;
1142 std::scoped_lock lockTerm(terminatingObjectsMutex);
1143 terminatingObjects.push_back(objectScheduler);
1144 }
1145
1146 return true;
1147 }
1148 catch (...)
1149 {
1150 ARMARX_ERROR << "Removing of object '" << objName << "' failed with an exception!\n"
1152 }
1153
1154 return false;
1155 }
1156
1157 //bool ArmarXManager::removeObject(const ObjectSchedulerMap::iterator& iter, bool blocking)
1158 //{
1159 // if (iter != managedObjects.end())
1160 // {
1161 // const std::string objName = iter->second->getObject()->getName();
1162
1163 // try
1164 // {
1165
1166 // ARMARX_VERBOSE << "Removing ManagedIceObject " << objName << " - blocking = " << blocking;
1167
1168
1169 // ArmarXObjectSchedulerPtr objectScheduler = iter->second;
1170
1171
1172 // // managedObjects.erase(iter);
1173 // return removeObject(objectScheduler, blocking);
1174 // }
1175 // catch (...)
1176 // {
1177 // ARMARX_ERROR << "Removing of object '" << objName << "' failed with an exception!";
1178 // handleExceptions();
1179 // }
1180 // }
1181
1182 // return false;
1183 //}
1184
1186 ArmarXManager::findObjectScheduler(const std::string& objectName) const
1187 {
1188 std::scoped_lock lock(managedObjectsMutex);
1189 auto it = managedObjects.find(objectName);
1190 if (it != managedObjects.end())
1191 {
1192 return it->second;
1193 }
1194 else
1195 {
1196 return ArmarXObjectSchedulerPtr();
1197 }
1198 }
1199
1200 void
1201 ArmarXManager::checkDependencies()
1202 {
1203
1204
1205 auto lock = acquireManagedObjectsMutex();
1206 if (!lock)
1207 {
1208 return;
1209 }
1210
1211 try
1212 {
1213 ObjectSchedulerMap::const_iterator it = managedObjects.begin();
1214
1215 for (; it != managedObjects.end(); it++)
1216 {
1217 const ArmarXObjectSchedulerPtr& scheduler = it->second;
1218
1219 if (scheduler->getObjectState() == eManagedIceObjectStarted &&
1220 !scheduler->checkDependenciesStatus())
1221 {
1222 scheduler->disconnected(true);
1223 }
1224 }
1225 }
1226 catch (...)
1227 {
1228 throw;
1229 }
1230 }
1231
1232 auto
1233 ArmarXManager::acquireManagedObjectsMutex() -> ScopedRecursiveLockPtr
1234 {
1235 // assure no state change until lock of managedObjectsMutex
1236 std::scoped_lock lock(managerStateMutex);
1237
1238 if (managerState >= eShutdownInProgress)
1239 {
1240 return ScopedRecursiveLockPtr();
1241 }
1242
1243 // lock access to managed objects
1245 new std::scoped_lock<std::recursive_mutex>(managedObjectsMutex));
1246
1247 return lock2;
1248 }
1249
1250 void
1251 ArmarXManager::installProcessFacet()
1252 {
1254 Ice::CommunicatorPtr applicationCommunicator = getCommunicator();
1255 ARMARX_CHECK_EXPRESSION(applicationCommunicator);
1256 // remove default Ice::Process facet
1257 if (applicationCommunicator->findAdminFacet("Process"))
1258 {
1259 applicationCommunicator->removeAdminFacet("Process");
1260 }
1261 // create and register new Ice::Process facet
1262 Ice::ProcessPtr applicationProcessFacet = new ApplicationProcessFacet(*this);
1263 applicationCommunicator->addAdminFacet(applicationProcessFacet, "Process");
1264
1265 IcePropertyChangeCallbackPtr propertyChangeCallback =
1266 new IcePropertyChangeCallback(appInstance);
1267
1268 Ice::ObjectPtr obj = applicationCommunicator->findAdminFacet("Properties");
1269 Ice::NativePropertiesAdminPtr admin = Ice::NativePropertiesAdminPtr::dynamicCast(obj);
1270 if (admin)
1271 {
1272 admin->addUpdateCallback(propertyChangeCallback);
1273 }
1274 else
1275 {
1277 << "Could not get properties admin - online property changing will not work";
1278 }
1279
1280 // activate new Ice::Process facet and PropertyChangeCallback
1281 applicationCommunicator->getAdmin();
1282 }
1283
1285
1286 void
1288 {
1289 std::unique_lock lock(FactoryCollectionBase_RegistrationListMutex());
1290
1291 for (const FactoryCollectionBasePtr& preregistration :
1293 {
1294 //ARMARX_INFO_S << "looking for " << preregistration->getFactories();
1295 for (const auto& id2factory : preregistration->getFactories())
1296 {
1297 if (!ic->getValueFactoryManager()->find(id2factory.first))
1298 {
1299 //ARMARX_IMPORTANT_S << "adding object factory for " << id2factory.first;
1300 ic->getValueFactoryManager()->add(id2factory.second, id2factory.first);
1301 }
1302 }
1303 }
1304 }
1305
1306 void
1311
1314 {
1315 return armarxManagerAdapter;
1316 }
1317
1318 const std::shared_ptr<SharedRemoteHandleState>&
1320 {
1321 return sharedRemoteHandleState;
1322 }
1323
1324 void
1326 {
1327 std::scoped_lock lock(schedulerListMutex);
1328 for (int i = 0; i < increaseBy; ++i)
1329 {
1330 singleThreadedSchedulers.push_back(new ArmarXMultipleObjectsScheduler());
1331 }
1332 }
1333
1334 bool
1335 LoadLibFromAbsolutePath(const std::string& path)
1336 {
1337 //these variables should be shared between ArmarXManager -> use static variables
1338 static std::map<std::string, DynamicLibraryPtr> loadedLibs;
1339 static std::mutex libsMutex;
1340 std::lock_guard<std::mutex> guard{libsMutex};
1341 if (loadedLibs.count(path))
1342 {
1343 return true;
1344 }
1346 try
1347 {
1348 lib->load(path);
1349 }
1350 catch (...)
1351 {
1353 return false;
1354 }
1355
1356 if (lib->isLibraryLoaded())
1357 {
1358 ARMARX_INFO << "Loaded library " << path;
1359 loadedLibs[path] = lib;
1360 }
1361 else
1362 {
1363 ARMARX_ERROR << "Could not load lib " + path + ": " + lib->getErrorMessage();
1364 return false;
1365 }
1366 return true;
1367 }
1368
1369 // bool ArmarXManager::loadLibFromPath(const std::string& path)
1370 // {
1371 // std::string absPath;
1372 // if (ArmarXDataPath::getAbsolutePath(path, absPath))
1373 // {
1374 // return LoadLibFromAbsolutePath(absPath);
1375 // }
1376 // ARMARX_ERROR << "Could not find library " + path;
1377 // return false;
1378 // }
1379 // bool ArmarXManager::loadLibFromPackage(const std::string& package, const std::string& libname)
1380 // {
1381 // CMakePackageFinder finder(package);
1382 // if (!finder.packageFound())
1383 // {
1384 // ARMARX_ERROR << "Could not find package '" << package << "'";
1385 // return false;
1386 // }
1387 //
1388 // for (auto libDirPath : Split(finder.getLibraryPaths(), ";"))
1389 // {
1390 // std::filesystem::path fullPath = libDirPath;
1391 // fullPath /= "lib" + libname + "." + DynamicLibrary::GetSharedLibraryFileExtension();
1392 // if (!std::filesystem::exists(fullPath))
1393 // {
1394 // fullPath = libDirPath;
1395 // fullPath /= libname;
1396 // if (!std::filesystem::exists(fullPath))
1397 // {
1398 // continue;
1399 // }
1400 // }
1401 // if (LoadLibFromAbsolutePath(fullPath.string()))
1402 // {
1403 // return true;
1404 // }
1405 // }
1406 // ARMARX_ERROR << "Could not find library " << libname << " in package " << package;
1407 // return false;
1408 // }
1409
1410
1412 armarx::ArmarXManager::getMetaInfo(const std::string& objectName, const Ice::Current&)
1413 {
1414 std::scoped_lock lock(managedObjectsMutex);
1415 StringStringDictionary propertyMap;
1416 ObjectSchedulerMap::iterator iter = managedObjects.find(objectName);
1417
1418 if (iter == managedObjects.end())
1419 {
1420 return StringVariantBaseMap();
1421 }
1422
1423 return iter->second->getObject()->getMetaInfoMap();
1424 }
1425} // namespace armarx
#define OBJOBSNAME
#define MANAGEROBJNAME
constexpr T c
static ApplicationPtr getInstance()
Retrieve shared pointer to the application object.
static void initDataPaths(const std::string &dataPathList)
const Ice::ObjectAdapterPtr & getAdapter() const
Ice::StringSeq getObjectNames(const Ice::Current &c=Ice::emptyCurrent) override
Retrieve the names of all ManagedIceObject.
void setGlobalMinimumLoggingLevel(MessageTypeT minimumLoggingLevel)
Set minimum logging level to output in log stream.
void enableProfiling(bool enable)
Enable or disable profiling of CPU Usage.
mice::MiceObjectConnectivity getMiceObjectConnectivity(const std::string &objectName, const Ice::Current &c=Ice::emptyCurrent) override
Retrieve connectivity of a ManagedIceObject.
const std::shared_ptr< SharedRemoteHandleState > & getSharedRemoteHandleState() const
ArmarXManager(std::string applicationName, int port=4061, std::string host="localhost", std::string locatorName="IceGrid/Locator", Ice::StringSeq args=Ice::StringSeq())
ArmarXManager constructor.
Ice::PropertiesAdminPrx getPropertiesAdmin(const Ice::Current &=Ice::emptyCurrent) override
void updateComponentIceProperties(const Ice::PropertyDict &properties)
const IceManagerPtr & getIceManager() const
Retrieve the instance of the icemanager.
friend class PeriodicTask< ArmarXManager >
ManagedIceObjectConnectivity getObjectConnectivity(const std::string &objectName, const Ice::Current &c=Ice::emptyCurrent) override
Retrieve connectivity of a ManagedIceObject.
IceMX::MetricsAdminPrx getMetricsAdmin(const Ice::Current &=Ice::emptyCurrent) override
void waitForShutdown()
Wait for shutdown.
virtual void addObjectAsync(const ManagedIceObjectPtr &object, const std::string &objectName, bool addWithOwnAdapter=true, bool useOwnScheduleThread=true)
void addObject(const ManagedIceObjectPtr &object, bool addWithOwnAdapter=true, const std::string &objectName="", bool useOwnScheduleThread=true) override
Add a ManagedIceObject to the manager.
::armarx::StringStringDictionary getObjectProperties(const ::std::string &objectName, const ::Ice::Current &=Ice::emptyCurrent) override
getObjectProperties is used to retrieve the properties of an object
const Ice::CommunicatorPtr & getCommunicator() const
static bool CheckIceConnection(const Ice::CommunicatorPtr &communicator, bool printHint)
void shutdown()
Shuts down the ArmarXManager.
void increaseSchedulers(int increaseBy)
Increases the number of single-threaded schedulers.
std::string getHostname(const Ice::Current &c=Ice::emptyCurrent) override
Gets the hostname of the host running the manager.
static void RegisterKnownObjectFactoriesWithIce(const Ice::CommunicatorPtr &ic)
Registers all object factories that are known with Ice.
void removeObjectBlocking(const ManagedIceObjectPtr &object) override
Removes an object from the manager.
void setDataPaths(std::string dataPaths)
Set data paths used to search for datafiles.
void asyncShutdown(std::size_t timeoutMs=0)
Calls shutdown() after a timeout.
StringVariantBaseMap getMetaInfo(const std::string &, const Ice::Current &) override
std::vector< ManagedIceObjectPtr > getManagedObjects() override
Retrieve pointers to all ManagedIceObject.
bool checkIceConnection(bool printHint=true) const
bool isShutdown()
Whether ArmarXManager shutdown has been finished.
void enableLogging(bool enable)
Enable or disable logging.
Ice::StringSeq getManagedObjectNames(const Ice::Current &c=Ice::emptyCurrent) override
Retrieve the names of all ManagedIceObject.
ManagedIceObjectState getObjectState(const std::string &objectName, const Ice::Current &c=Ice::emptyCurrent) override
Retrieve state of a ManagedIceObject.
ObjectPropertyInfos getObjectPropertyInfos(const ::std::string &objectName, const ::Ice::Current &) override
void removeObjectNonBlocking(const ManagedIceObjectPtr &object) override
Removes an object from the manager.
ObjectPropertyInfos getApplicationPropertyInfos(const ::Ice::Current &) override
void registerKnownObjectFactoriesWithIce()
Non-static convenience version of ArmarXManager::RegisterKnownObjectFactoriesWithIce()
void setComponentIceProperties(const Ice::PropertiesPtr &properties)
Calls Component::setIceProperties() on all components assigned to this ArmarXManager instance.
friend class ArmarXObjectObserver
Takes care of the lifecycle management of ManagedIceObjects.
The DynamicLibrary class provides a mechanism to load libraries at runtime.
static std::vector< FactoryCollectionBasePtr > & PreregistrationList()
The IceManager class provides simplified access to commonly used Ice features.
Definition IceManager.h:106
static IceStorm::TopicManagerPrx GetTopicManager(Ice::CommunicatorPtr communicator)
IcePropertyChangeCallback(armarx::ApplicationPtr application)
void updated(const Ice::PropertyDict &changes) override
static LocalTimeServerPtr getApplicationTimeServer()
Get the applications LocalTimeServer instance.
static void SetGlobalMinimumLoggingLevel(MessageTypeT level)
With setGlobalMinimumLoggingLevel the minimum verbosity-level of log-messages can be set for the whol...
static void SetSendLoggingActivated(bool activated=true)
static void setProxy(const std::string &componentName, LogPrx logProxy)
static void SetLoggingActivated(bool activated=true, bool showMessage=true)
setLoggingActivated() is used to activate or disable the logging facilities in the whole application
SpamFilterDataPtr deactivateSpam(float deactivationDurationSec=10.0f, const std::string &identifier="", bool deactivate=true) const
Disables the logging for the current line for the given number of seconds.
Definition Logging.cpp:99
MessageTypeT minimumLoggingLevel
Definition Logging.h:279
void setTag(const LogTag &tag)
Definition Logging.cpp:54
static std::string GetObjectStateAsString(int state)
static ThreadListPtr getApplicationThreadList()
getApplicationThreadList retrieves the ThreadList, that contains all TimerTasks and PeriodicTasks in ...
static void SetTimeServer(LocalTimeServerPtr ts)
Definition TimeUtil.cpp:106
#define ARMARX_CHECK_EXPRESSION(expression)
This macro evaluates the expression and if it turns out to be false it will throw an ExpressionExcept...
#define ARMARX_INFO
The normal logging level.
Definition Logging.h:181
#define ARMARX_ERROR
The logging level for unexpected behaviour that must be fixed.
Definition Logging.h:196
#define ARMARX_WARNING
The logging level for unexpected behaviour, but not a serious problem.
Definition Logging.h:193
#define ARMARX_VERBOSE
The logging level for verbose information.
Definition Logging.h:187
std::shared_ptr< ScopedRecursiveLock > ScopedRecursiveLockPtr
::IceInternal::Handle<::Ice::Properties > PropertiesPtr
::IceInternal::Handle<::Ice::Communicator > CommunicatorPtr
Definition IceManager.h:49
::IceInternal::Handle<::Ice::ObjectAdapter > ObjectAdapterPtr
Definition IceManager.h:52
This file offers overloads of toIce() and fromIce() functions for STL container types.
::IceInternal::ProxyHandle<::IceProxy::armarx::Log > LogPrx
Definition LogSender.h:58
IceUtil::Handle< ArmarXManager > ArmarXManagerPtr
std::mutex & FactoryCollectionBase_RegistrationListMutex()
std::map< std::string, VariantBasePtr > StringVariantBaseMap
IceUtil::Handle< ArmarXObjectScheduler > ArmarXObjectSchedulerPtr
Definition ArmarXFwd.h:33
std::string GetHandledExceptionString()
void handleExceptions()
bool LoadLibFromAbsolutePath(const std::string &path)
IceUtil::Handle< Application > ApplicationPtr
Definition Application.h:93
IceUtil::Handle< FactoryCollectionBase > FactoryCollectionBasePtr
IceUtil::Handle< IcePropertyChangeCallback > IcePropertyChangeCallbackPtr
IceUtil::Handle< IceManager > IceManagerPtr
IceManager smart pointer.
Definition ArmarXFwd.h:39
MessageTypeT
Definition LogSender.h:46
IceInternal::Handle< Component > ComponentPtr
Component smart pointer type.
Definition ArmarXFwd.h:45
IceInternal::Handle< ManagedIceObject > ManagedIceObjectPtr
Definition ArmarXFwd.h:42
std::shared_ptr< DynamicLibrary > DynamicLibraryPtr
This holds the shared state of all RemoteHandleControlBlocks for one armarx manager.
static const unsigned int DEFAULT_DELETION_DELAY
The amount of time (in ms) required to pass after a RemoteHandleControlBlock's usecount has reached ze...