Source code for armarx_vision.image_processor
import logging
import threading
import warnings
from abc import ABC
from abc import abstractmethod
from typing import Tuple
from typing import Any
from typing import Union
import numpy as np
from armarx_core.ice_manager import using_topic
from armarx_core.ice_manager import get_proxy
from armarx_core.ice_manager import register_object
from armarx_core.ice_manager import is_alive
from armarx_vision.image_provider import ImageProvider
from visionx import ImageProviderInterfacePrx
from visionx import ImageProcessorInterface
from armarx import MetaInfoSizeBase
from .shm_tools import path_to_shm
from .shm_tools import read_images_shm
logger = logging.getLogger(__name__)
[docs]class ImageProcessor(ImageProcessorInterface, ABC):
"""
An abstract class to process images
.. highlight:: python
.. code-block:: python
class TestImageProcessor(ImageProcessor):
def process_images(self, images, info):
info.timestamp = time.time()
return np.random.random(images.shape) * 128, info
"""
def __init__(self, provider_name: str, num_result_images: int = None):
super().__init__()
self.provider_name = provider_name
self.num_result_images = num_result_images
self._thread = threading.Thread(target=self._process)
self.image_available = False
self.cv = threading.Condition()
self.image_source = None
self.result_image_provider = None
self.shm_path = None
[docs] def reportImageAvailable(self, provider_name, current=None):
with self.cv:
self.image_available = True
self.cv.notify()
def _get_images_and_info(self):
if self.shm_path:
info = MetaInfoSizeBase() # self.image_source.getMetaInfo()
images = read_images_shm(self.shm_path, self.data_dimensions)
else:
image_buffer, info = self.image_source.getImagesAndMetaInfo()
images = np.frombuffer(image_buffer, dtype=np.uint8).reshape(
self.data_dimensions
)
return images, info
def _process(self):
while is_alive():
with self.cv:
self.cv.wait_for(lambda: self.image_available)
input_images, info = self._get_images_and_info()
if hasattr(self, "process_image") and callable(self.process_image):
warnings.warn(
"Replaced with process_image(images, info)", DeprecationWarning
)
result = self.process_image(input_images)
else:
result = self.process_images(input_images, info)
if not result:
logger.warning("Unable to get images")
return
elif isinstance(result, tuple):
result_images, info = result
else:
result_images = result
info.timeProvided = 0
self.result_image_provider.update_image(
result_images, info.timeProvided
)
[docs] @abstractmethod
def process_images(
self, images: np.ndarray, info: MetaInfoSizeBase
) -> Union[np.ndarray, Tuple[np.ndarray, MetaInfoSizeBase]]:
"""
This function is called everytime a new image is available.
Results are automatically published.
:param images: the new images
:param info: meta information about the image
:returns: Either the result images only or a tuple containing the result image and the info
"""
pass
[docs] def register(self):
warnings.warn("Replaced with on_connect", DeprecationWarning)
self.on_connect()
[docs] def shutdown(self):
warnings.warn("Replaced with on_disconnect", DeprecationWarning)
self.on_disconnect()
[docs] def on_disconnect(self):
self._image_listener_topic.unsubscribe(self._proxy)
[docs] def on_connect(self):
logger.debug("Registering image processor")
proxy = register_object(self, self.__class__.__name__)
self.image_source = get_proxy(ImageProviderInterfacePrx, self.provider_name)
self.shm_path = path_to_shm(self.provider_name)
number_of_images = self.image_source.getNumberImages()
image_format = self.image_source.getImageFormat()
d = image_format.dimension
if self.num_result_images is None:
self.num_result_images = self.image_source.getNumberImages()
self.result_image_provider = ImageProvider(
f"{self.__class__.__name__}Result",
self.num_result_images,
d.width,
d.height,
)
self.result_image_provider.register()
self.data_dimensions = (
number_of_images,
image_format.dimension.height,
image_format.dimension.width,
image_format.bytesPerPixel,
)
logger.debug("data dimensions %s", self.data_dimensions)
self._thread.start()
self._image_listener_topic = using_topic(
proxy, f"{self.provider_name}.ImageListener"
)