Source code for armarx_core.ice_manager

import os
import time

import logging
from typing import Any
from typing import TypeVar

from functools import lru_cache

import Ice
from Ice import NotRegisteredException

from IceGrid import RegistryPrx
from IceGrid import ObjectExistsException

from IceStorm import TopicManagerPrx
from IceStorm import NoSuchTopic
from IceStorm import AlreadySubscribed

from .config import get_ice_config_files
from .name_helper import get_ice_default_name

logger = logging.getLogger(__name__)


T = TypeVar("T")


[docs]def register_object( ice_object: Ice.Object, ice_object_name: str = None ) -> Ice.ObjectPrx: """ Register a local ice object under the given name :param ice_object: Local ice object instance :param ice_object_name: Name with which the object should be registered :return: Proxy to this object """ if not isinstance(ice_object, Ice.Object): logger.error("ice object is not an Ice.Object") raise ValueError("ice_object is not an Ice.Object") if not ice_object_name: ice_object_name = ice_object.__class__.__name__ logger.debug("Using class name %s to register the object", ice_object_name) adapter = freezer().communicator.createObjectAdapterWithEndpoints( ice_object_name, "tcp" ) ice_object_id = freezer().communicator.stringToIdentity(ice_object_name) adapter.add(ice_object, ice_object_id) adapter.activate() proxy = adapter.createProxy(ice_object_id) admin = get_admin() try: logger.info("adding new object %s", ice_object_name) admin.addObjectWithType(proxy, proxy.ice_id()) except ObjectExistsException: logger.info("updating new object %s", ice_object_name) admin.updateObject(proxy) return proxy
[docs]def get_topic(cls: T, topic_name: str = None) -> T: """ Retrieve a topic proxy casted to the first parameter :param cls: Type of the topic :param topic_name: Name of the topic :type topic_name: str :return: a casted topic proxy """ topic_name = topic_name or get_ice_default_name(cls) topic_manager = TopicManagerPrx.checkedCast( freezer().communicator.stringToProxy("IceStorm/TopicManager") ) topic = None try: topic = topic_manager.retrieve(topic_name) except NoSuchTopic: topic = topic_manager.create(topic_name) logger.info("Publishing to topic %s", topic_name) pub = topic.getPublisher().ice_oneway() return cls.uncheckedCast(pub)
[docs]def using_topic(proxy, topic_name: str = None): """ .. seealso:: :func:`register_object` :param proxy: the instance where the topic event should be called :param topic_name: the name of the topic to connect to :type topic_name: str """ topic_manager = TopicManagerPrx.checkedCast( freezer().communicator.stringToProxy("IceStorm/TopicManager") ) topic = None topic_name = topic_name or get_ice_default_name(proxy.__class__) try: topic = topic_manager.retrieve(topic_name) except NoSuchTopic: topic = topic_manager.create(topic_name) try: topic.subscribeAndGetPublisher(None, proxy) except AlreadySubscribed: topic.unsubscribe(proxy) topic.subscribeAndGetPublisher(None, proxy) logger.info("Subscribing to topic %s", topic_name) return topic
[docs]def wait_for_dependencies(proxy_names, timeout: int = 0): """ waits for a dependency list :param proxy_names: the proxy names to wait for :type proxy_names: list of str :returns: True if all dependencies are resolved :rtype: bool """ start_time = time.time() while not freezer().communicator.isShutdown(): if timeout and (start_time + timeout) < time.time(): logging.exception("Timeout while waiting for proxies %s", proxy_names) return False dependencies_resolved = True for proxy_name in proxy_names: try: proxy = freezer().communicator.stringToProxy(proxy_name) proxy.ice_ping() except NotRegisteredException: dependencies_resolved = False if dependencies_resolved: return True else: time.sleep(0.1)
[docs]def wait_for_proxy(cls, proxy_name: str = None, timeout: int = 0): """ waits for a proxy. :param cls: the class definition of an ArmarXComponent :param proxy_name: name of the proxy :param timeout: timeout in seconds to wait for the proxy. Zero means to wait forever :returns: the retrieved proxy :rtype: an instance of cls """ proxy = None proxy_name = proxy_name or get_ice_default_name(cls) start_time = time.time() while not freezer().communicator.isShutdown() and proxy is None: try: proxy = freezer().communicator.stringToProxy(proxy_name) proxy_cast = cls.checkedCast(proxy) return proxy_cast except NotRegisteredException: proxy = None if timeout and (start_time + timeout) < time.time(): logging.exception("Timeout while waiting for proxy %s", proxy_name) return None else: logger.debug("Waiting for proxy %s", proxy_name) time.sleep(0.1)
[docs]def get_proxy(cls: T, proxy_name: str = None) -> T: """ Connects to a proxy. :param cls: the class definition of an ArmarXComponent :param proxy_name: name of the proxy :type proxy_name: str :returns: the retrieved proxy :rtype: an instance of cls :raises: Ice::NotRegisteredException if the proxy is not available """ proxy_name = proxy_name or get_ice_default_name(cls) try: proxy = freezer().communicator.stringToProxy(proxy_name) return cls.checkedCast(proxy) except NotRegisteredException: logging.exception("Proxy %s does not exist", proxy_name)
def get_admin(): return freezer().registry.createAdminSession("user", "password").getAdmin() def is_connected(ice_node_name: str) -> bool: return get_admin().pingNode(ice_node_name)
[docs]def is_alive() -> bool: """ checks if shutdown has been invoked on the communicator. :returns: true if ice grid registry is alive """ return not freezer().communicator.isShutdown()
[docs]def wait_for_shutdown(): """ sleeps until the ice communicator receives a shutdown signal or the program receives a keyboard interrupt """ try: freezer().communicator.waitForShutdown() except KeyboardInterrupt: pass
[docs]@lru_cache(maxsize=1) def freezer(): """ """ return Freezer()
def test_connection(): if not is_connected("NodeMain"): logger.error("Ice is not running.") raise Exception("Ice is not running.") class Freezer: registry: RegistryPrx = None communicator = None def __init__(self): ice_config_files = get_ice_config_files() ice_communicator = Ice.initialize( ["--Ice.Config={}".format(",".join(ice_config_files))] ) ice_registry_proxy = ice_communicator.stringToProxy("IceGrid/Registry") ice_registry = RegistryPrx.checkedCast(ice_registry_proxy) self.communicator = ice_communicator self.registry = ice_registry import atexit atexit.register(self.__del__) @property def admin(self): return self.registry.createAdminSession("user", "password").getAdmin() def __enter__(self): return self.communicator def __exit__(self, exc_type, exc_val, exc_tb): pass def shutdown(self): if not self.communicator: return try: self.communicator.destroy() except Exception as ex: logger.error("Error while shutting down ice communicator") finally: self.communicator = None def __del__(self): self.shutdown()