Source code for armarx_vision.image_provider
import logging
from abc import ABC
import time
import warnings
import numpy as np
from armarx_core.ice_manager import register_object
from armarx_core.ice_manager import get_topic
from visionx import ImageProcessorInterfacePrx
from visionx import ImageProviderInterface
from visionx import ImageFormatInfo
from visionx import ImageType
from armarx import MetaInfoSizeBase
logger = logging.getLogger(__name__)
[docs]class ImageProvider(ImageProviderInterface, ABC):
""" """
def __init__(
self, name: str, num_images: int = 2, width: int = 640, height: int = 480
):
super().__init__()
self.name = name
self.image_format = self._get_image_format(width, height)
self.data_dimensions = (
num_images,
height,
width,
self.image_format.bytesPerPixel,
)
self.images = np.zeros(self.data_dimensions, dtype=np.uint8)
image_size = self.image_format.bytesPerPixel * width * height
self.info = MetaInfoSizeBase(image_size, image_size)
self.image_topic = None
self.proxy = None
# self.register()
[docs] def register(self):
warnings.warn("Replaced with on_connect", DeprecationWarning)
self.on_connect()
[docs] def on_connect(self):
"""
Register the image provider.
"""
logger.debug("registering image processor %s", self.name)
self.proxy = register_object(self, self.name)
self.image_topic = get_topic(
ImageProcessorInterfacePrx, f"{self.name}.ImageListener"
)
[docs] def update_image(self, images, time_provided=0):
warnings.warn("Replaced with update_images", DeprecationWarning)
self.update_images(images, time_provided)
[docs] def update_images(self, images: np.ndarray, time_provided: int = 0):
"""
Publish a new image
:param images: the images to publish
:param time_provided: time stamp of the images. If zero the current time will be used
"""
self.images = images
self.info.timeProvided = time_provided or int(time.time() * 1000.0 * 1000.0)
if self.image_topic:
self.image_topic.reportImageAvailable(self.name)
else:
logger.warning("not registered. call register() method")
def _get_image_format(self, width, height):
image_format = ImageFormatInfo()
image_format.bytesPerPixel = 3
image_format.dimension.width = width
image_format.dimension.height = height
image_format.type = ImageType.eRgb
return image_format
[docs] def getImages(self, current=None):
logger.debug("getImages()")
return memoryview(self.images)
[docs] def getNumberImages(self, current=None):
return self.data_dimensions[0]
[docs] def hasSharedMemorySupport(self, current=None):
return False
[docs] def shutdown(self, current=None):
current.adapter.getCommunicator().shutdown()