Skip to content

Commit

Permalink
Motec settings api (#115)
Browse files Browse the repository at this point in the history
* use new list of tuple return type of scan_for_cameras

* add motec_settings_interface

* add mjpeg camera tests

* add parameters to mjpeg camera

* require control_port to not be None

* implement async interface (with cooldowns)

* mark tests as asyncio

* remove usage of nicegui background task
this is necessary for usage without nicegui (for example in tests) since background_task depends on the nicegui core loop

* assume that getter and setter functions can be async
(and make them async in mjpeg_camera)

* ensure fixed event loop

* catch errors resulting from race conditions with a disconnect

* add docstrings

* remove unused import

* fix async errors in rtsp camera

* code review

* check if result is awaitable

* simplify start_capture_task

* tiny fix

* remove deprecated event_loop fixture

* fix missing value in call to setter

* fix rtsp_camera test

* make setter return type None or Awaitable[None]

* fix missing rename to polling_interval

* fix capture_task creation

* give ip to camera directly

* improve RtspDevice logging

* use fixtures as arguments

* cleanup

---------

Co-authored-by: Falko Schindler <falko@zauberzeug.com>
  • Loading branch information
NiklasNeugebauer and falkoschindler authored May 24, 2024
1 parent 73acff7 commit 3190cb0
Show file tree
Hide file tree
Showing 9 changed files with 336 additions and 38 deletions.
51 changes: 33 additions & 18 deletions rosys/vision/camera/configurable_camera.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import Any, Callable, Optional, overload
from typing import Any, Awaitable, Callable, Optional, overload

from .camera import Camera

Expand Down Expand Up @@ -31,23 +31,22 @@ class ConfigurableCamera(Camera):
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self._parameters: dict[str, Parameter] = {}
self._pending_operations: int = 0

@overload
def _register_parameter(self, name: str, getter: Callable[[], Any], setter: Callable[[Any], None], default_value: Any) -> None:
def _register_parameter(self, name: str, getter: Callable[[], Any], setter: Callable[[Any], None | Awaitable[None]], default_value: Any) -> None:
...

@overload
def _register_parameter(self, name: str, getter: Callable[[], Any], setter: Callable[[Any], None], default_value: Any,
def _register_parameter(self, name: str, getter: Callable[[], Any], setter: Callable[[Any], None | Awaitable[None]], default_value: Any,
min_value: Any, max_value: Any, step: Any) -> None:
...

def _register_parameter(self, name: str, getter: Callable[[], Any], setter: Callable[[Any], None], default_value: Any,
def _register_parameter(self, name: str, getter: Callable[[], Any], setter: Callable[[Any], None | Awaitable[None]], default_value: Any,
min_value: Any = None, max_value: Any = None, step: Any = None) -> None:
info = ParameterInfo(name=name, min=min_value, max=max_value, step=step)
self._parameters[name] = Parameter(info=info, getter=getter, setter=setter, value=default_value)

def _apply_parameters(self, new_values: dict[str, Any], force_set: bool = False) -> None:
async def _apply_parameters(self, new_values: dict[str, Any], force_set: bool = False) -> None:
if not self.is_connected:
return
for name, value in new_values.items():
Expand All @@ -57,23 +56,39 @@ def _apply_parameters(self, new_values: dict[str, Any], force_set: bool = False)
continue
if not force_set and value == self._parameters[name].value:
continue
self._parameters[name].setter(value)
self._update_parameter_cache()

def _apply_all_parameters(self) -> None:
self._apply_parameters(self.parameters, force_set=True)
try:
result = self._parameters[name].setter(value)
if isinstance(result, Awaitable):
await result
except Exception as e:
if not self.is_connected:
return
raise e

def _update_parameter_cache(self) -> None:
await self._update_parameter_cache()

async def _apply_all_parameters(self) -> None:
await self._apply_parameters(self.parameters, force_set=True)

async def _update_parameter_cache(self) -> None:
if not self.is_connected:
return
for param in self._parameters.values():
val = param.getter()
if val is None and self.IGNORE_NONE_VALUES:
continue
param.value = val

def set_parameters(self, new_values: dict[str, Any]) -> None:
self._apply_parameters(new_values)
try:
val = param.getter()
if isinstance(val, Awaitable):
val = await val
if val is None and self.IGNORE_NONE_VALUES:
continue
param.value = val
except Exception as e:
if not self.is_connected:
return
raise e

async def set_parameters(self, new_values: dict[str, Any]) -> None:
await self._apply_parameters(new_values)

@property
def parameters(self) -> dict[str, Any]:
Expand Down
39 changes: 35 additions & 4 deletions rosys/vision/mjpeg_camera/mjpeg_camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,30 @@
from typing_extensions import Self

from ... import rosys
from ..camera import TransformableCamera
from ..camera import ConfigurableCamera, TransformableCamera
from ..image import Image
from ..image_processing import get_image_size_from_bytes, process_jpeg_image
from ..image_rotation import ImageRotation
from .mjpeg_device import MjpegDevice


class MjpegCamera(TransformableCamera):
class MjpegCamera(TransformableCamera, ConfigurableCamera):

def __init__(self,
*,
id: str, # pylint: disable=redefined-builtin
name: str | None = None,
connect_after_init: bool = True,
streaming: bool = True,
image_grab_interval: float = 0.1,
polling_interval: float = 0.1,
base_path_overwrite: str | None = None,
username: str | None = None,
password: str | None = None,
ip: str | None = None,
**kwargs: Any,
) -> None:
super().__init__(id=id, name=name, connect_after_init=connect_after_init, streaming=streaming,
image_grab_interval=image_grab_interval, base_path_overwrite=base_path_overwrite, **kwargs)
polling_interval=polling_interval, base_path_overwrite=base_path_overwrite, **kwargs)
self.log = logging.getLogger(f'rosys.vision.mjpeg_camera.{self.id}')
self.username = username
self.password = password
Expand All @@ -42,6 +42,9 @@ def __init__(self,
self.mac = parts[0]
self.device: Optional[MjpegDevice] = None

self._register_parameter('fps', self._get_fps, self._set_fps, default_value=10)
self._register_parameter('resolution', self._get_resolution, self._set_resolution, default_value=(640, 480))

def to_dict(self) -> dict:
return super().to_dict() | {
'username': self.username,
Expand Down Expand Up @@ -94,3 +97,31 @@ async def capture_image(self) -> None:
return

self._add_image(Image(camera_id=self.id, data=image, time=rosys.time(), size=final_image_resolution))

async def _set_fps(self, fps: int) -> None:
if self.device is None:
raise ValueError('Device is not connected')
assert self.device.settings_interface is not None

await self.device.settings_interface.set_fps(fps)

async def _get_fps(self) -> int:
if self.device is None:
raise ValueError('Device is not connected')
assert self.device.settings_interface is not None

return await self.device.settings_interface.get_fps()

async def _set_resolution(self, resolution: tuple[int, int]) -> None:
if self.device is None:
raise ValueError('Device is not connected')
assert self.device.settings_interface is not None

await self.device.settings_interface.set_stream_resolution(*resolution)

async def _get_resolution(self) -> tuple[int, int]:
if self.device is None:
raise ValueError('Device is not connected')
assert self.device.settings_interface is not None

return await self.device.settings_interface.get_stream_resolution()
24 changes: 18 additions & 6 deletions rosys/vision/mjpeg_camera/mjpeg_device.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
import asyncio
import logging
from asyncio import Task
from typing import AsyncGenerator, Optional

import httpx
from nicegui import background_tasks

from ...rosys import on_startup
from ..image_processing import remove_exif
from .vendors import mac_to_url
from .motec_settings_interface import MotecSettingsInterface
from .vendors import VendorType, mac_to_url, mac_to_vendor


class MjpegDevice:

def __init__(self, mac: str, ip: str, *,
index: Optional[int] = None, username: Optional[str] = None, password: Optional[str] = None) -> None:
index: Optional[int] = None,
username: Optional[str] = None,
password: Optional[str] = None,
control_port: int = 8885) -> None:
self.mac = mac
self.ip = ip
self.capture_task: Optional[Task] = None
Expand All @@ -23,10 +28,17 @@ def __init__(self, mac: str, ip: str, *,
if url is None:
raise ValueError(f'could not determine URL for {mac}')
self.url = url

if mac_to_vendor(mac) == VendorType.MOTEC:
self.settings_interface = MotecSettingsInterface(ip, port=control_port)

self.start_capture_task()

def start_capture_task(self):
self.capture_task = background_tasks.create(self.run_capture_task(), name=f'capture {self.mac}')
def start_capture_task(self) -> None:
def create_capture_task() -> None:
loop = asyncio.get_event_loop()
self.capture_task = loop.create_task(self.run_capture_task())
on_startup(create_capture_task)

async def restart_capture(self) -> None:
self.shutdown()
Expand Down Expand Up @@ -93,7 +105,7 @@ async def stream() -> AsyncGenerator[bytearray, None]:

async for image in stream():
self._image_buffer = image

self.log.warning('Capture task stopped')
self.capture_task = None

def capture(self) -> Optional[bytes]:
Expand Down
143 changes: 143 additions & 0 deletions rosys/vision/mjpeg_camera/motec_settings_interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import asyncio
import logging
import struct

from typing_extensions import Self


class AsyncTcpClient:
"""Async TCP client wrapper for sending and receiving messages to/from a server."""

def __init__(self, ip: str, port: int) -> None:
self.ip = ip
self.port = port
self.reader: asyncio.StreamReader | None = None
self.writer: asyncio.StreamWriter | None = None

async def __aenter__(self) -> Self:
self.reader, self.writer = await asyncio.open_connection(self.ip, self.port)
return self

async def __aexit__(self, *_) -> None:
assert self.writer is not None
self.writer.close()
await self.writer.wait_closed()

async def read(self, n_bytes: int, timeout_s: float = 3) -> bytes:
"""Read n_bytes from the server with a timeout.
:param n_bytes: Number of bytes to read
:param timeout_s: Timeout in seconds
:return: Bytes read from the server
:raises TimeoutError: If no response is received within the timeout
:raises RuntimeError: If the response length is not equal to n_bytes
"""
assert self.reader is not None
data = await asyncio.wait_for(self.reader.read(n_bytes), timeout_s)
if not data:
raise TimeoutError('No response received within timeout')
if not len(data) == n_bytes:
raise RuntimeError(f'Received response with unexpected length {len(data)}')

return data

async def write(self, message: bytes) -> None:
"""Write a message to the server.
:param message: Message to send
"""
assert self.writer is not None
self.writer.write(message)


class MotecSettingsInterface:

def __init__(self, ip: str, port: int) -> None:
self.ip = ip
self.port = port

self.log = logging.getLogger('MotecSettingsInterface')

self.cooldown = 0.2
self.cooldown_time = 0.0

self.event_loop = asyncio.get_event_loop()

def _construct_message(self, command_id: int, value_id: int, values: list[int]) -> bytes:
n_value_bytes = len(values)
if n_value_bytes > 6:
raise ValueError('Too many values provided')

v = [0] * 6
for i, value in enumerate(values):
v[i] = value

return struct.pack('B' * 12, 48, 2, command_id, n_value_bytes + 1, value_id, 0, *v)

async def _get_value(self, value_id: int, timeout_s: float = 3) -> bytearray:
message = self._construct_message(70, value_id, [])
self.log.debug('Sending message [get]: %s', message)

# send message
async with AsyncTcpClient(ip=self.ip, port=self.port) as client:
await client.write(message)

n_receive_bytes = 12

# receive response asynchronously with timeout
data = await client.read(n_receive_bytes, timeout_s)

values = struct.unpack('B' * n_receive_bytes, data)[6:]
self.log.debug('Received message [get]: %s', values)

self.cooldown_time = self.event_loop.time() + self.cooldown

return bytearray(values)

async def _set_value(self, value_id: int, values: list[int] | int) -> None:
if isinstance(values, int):
values = [values]

message = self._construct_message(38, value_id, values)
self.log.debug('Sending message [set]: %s', message)
# send message
async with AsyncTcpClient(ip=self.ip, port=self.port) as client:
await client.write(message)

self.cooldown_time = asyncio.get_event_loop().time() + self.cooldown

async def get_fps(self) -> int:
return (await self._get_value(178))[0]

async def set_fps(self, fps: int) -> None:
await self._set_value(178, fps)

async def get_stream_compression(self) -> int:
return (await self._get_value(177))[0]

async def set_stream_compression(self, level: int) -> None:
if not 1 <= level <= 4:
raise ValueError('Compression level must be between 1 and 4')

await self._set_value(177, level)

async def get_stream_resolution(self) -> tuple[int, int]:
wmsb, wlsb, hmsb, hlsb = (await self._get_value(179))[:4]
return (wmsb << 8) + wlsb, (hmsb << 8) + hlsb

async def set_stream_resolution(self, width: int, height: int) -> None:
wmsb = width >> 8
wlsb = width & 0xFF
hmsb = height >> 8
hlsb = height & 0xFF
await self._set_value(179, [wmsb, wlsb, hmsb, hlsb])

async def get_stream_port(self) -> int:
pmsb, plsb = (await self._get_value(200))[:2]
return (pmsb << 8) + plsb

async def set_stream_port(self, port: int) -> None:
pmsb = port >> 8
plsb = port & 0xFF
await self._set_value(200, [pmsb, plsb])
6 changes: 3 additions & 3 deletions rosys/vision/rtsp_camera/rtsp_camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ async def connect(self) -> None:

self.device = RtspDevice(mac=self.id, ip=self.ip, jovision_profile=self.jovision_profile)

self._apply_all_parameters()
await self._apply_all_parameters()

async def disconnect(self) -> None:
if not self.is_connected:
Expand Down Expand Up @@ -129,8 +129,8 @@ def get_jovision_profile(self) -> Optional[int]:
return None
return self.jovision_profile

def _apply_parameters(self, new_values: dict[str, Any], force_set: bool = False) -> None:
super()._apply_parameters(new_values, force_set)
async def _apply_parameters(self, new_values: dict[str, Any], force_set: bool = False) -> None:
await super()._apply_parameters(new_values, force_set)
if self.is_connected:
assert self.device is not None
self.device.restart_gstreamer()
4 changes: 2 additions & 2 deletions rosys/vision/rtsp_camera/rtsp_camera_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ def restore(self, data: dict[str, dict]) -> None:
camera.NEW_IMAGE.register(self.NEW_IMAGE.emit)

@staticmethod
async def scan_for_cameras(network_interface: Optional[str] = None) -> list[str]:
return [mac for mac, _ in await find_known_cameras(network_interface=network_interface)]
async def scan_for_cameras(network_interface: Optional[str] = None) -> list[tuple[str, str]]:
return await find_known_cameras(network_interface=network_interface)

async def update_device_list(self) -> None:
for mac, ip in await find_known_cameras(network_interface=self.network_interface):
Expand Down
Loading

0 comments on commit 3190cb0

Please sign in to comment.