From 8f8c2c3dcc8ae8121f64031db2809adddb61a55c Mon Sep 17 00:00:00 2001 From: Devin Burke Date: Tue, 8 Oct 2024 09:05:56 +0200 Subject: [PATCH] Adding support for the Tango control system (https://www.tango-controls.org/). This relies on PyTango (https://pypi.org/project/pytango/) ophyd-async devices to asynchronous PyTango DeviceProxy objects. The control strategy relies on a shared resource called `proxy` found in the new TangoDevice class. By passing this proxy to the TangoSignalBackend of its signals, a proxy to attributes or commands of the Tango device can be established. 1. New TangoDevice and TangoReadable device classes. 2. Automated inference of the existence unannotated signals 3. Monitoring via Tango events with optional polling of attributes. 4. Tango sensitive signals are constructed by attaching a TangoSignalBackend to a Signal object or may be built using new tango_signal_* constructor methods. 5. Signal objects with a Tango backend and Tango devices should behave the same as those with EPICS or other backends. 1. As of this commit, typed commands are not supported in Ophyd-Async so Tango command signals with a type other than None are automatically built as SignalRW as a workaround. 2. Tango commands with different input/output types are not supported. 3. Pipes are not supported. 1. Extension of Device and StandardReadable to support shared resources such as the DeviceProxy. 2. Extension of the Tango backend to support typed commands. 3. Extension of the Tango backend to support pipes. Contact: Devin Burke Research software scientist Deutsches Elektronen-Synchrotron (DESY) devin.burke@desy.de --- .../tango/base_devices/_base_device.py | 68 +++++++++---------- 1 file changed, 33 insertions(+), 35 deletions(-) diff --git a/src/ophyd_async/tango/base_devices/_base_device.py b/src/ophyd_async/tango/base_devices/_base_device.py index 4db9d27991..9d01539263 100644 --- a/src/ophyd_async/tango/base_devices/_base_device.py +++ b/src/ophyd_async/tango/base_devices/_base_device.py @@ -2,7 +2,6 @@ from typing import ( TypeVar, - Union, get_args, get_origin, get_type_hints, @@ -11,7 +10,6 @@ from ophyd_async.core import ( DEFAULT_TIMEOUT, Device, - DeviceVector, Signal, ) from ophyd_async.tango.signal import ( @@ -160,28 +158,28 @@ def tango_create_children_from_annotations( if name in ("_name", "parent"): continue - device_type, is_optional = _strip_union(device_type) - if is_optional and name not in included_optional_fields: - continue - - is_device_vector, device_type = _strip_device_vector(device_type) - if is_device_vector: - n_device_vector = DeviceVector() - setattr(device, name, n_device_vector) + # device_type, is_optional = _strip_union(device_type) + # if is_optional and name not in included_optional_fields: + # continue + # + # is_device_vector, device_type = _strip_device_vector(device_type) + # if is_device_vector: + # n_device_vector = DeviceVector() + # setattr(device, name, n_device_vector) - else: - origin = get_origin(device_type) - origin = origin if origin else device_type + # else: + origin = get_origin(device_type) + origin = origin if origin else device_type - if issubclass(origin, Signal): - type_args = get_args(device_type) - datatype = type_args[0] if type_args else None - backend = make_backend(datatype=datatype, device_proxy=device.proxy) - setattr(device, name, origin(name=name, backend=backend)) + if issubclass(origin, Signal): + type_args = get_args(device_type) + datatype = type_args[0] if type_args else None + backend = make_backend(datatype=datatype, device_proxy=device.proxy) + setattr(device, name, origin(name=name, backend=backend)) - elif issubclass(origin, Device) or isinstance(origin, Device): - assert callable(origin), f"{origin} is not callable." - setattr(device, name, origin()) + elif issubclass(origin, Device) or isinstance(origin, Device): + assert callable(origin), f"{origin} is not callable." + setattr(device, name, origin()) async def _fill_proxy_entries(device: TangoDevice): @@ -211,17 +209,17 @@ async def _fill_proxy_entries(device: TangoDevice): raise e -def _strip_union(field: T | T) -> tuple[T, bool]: - if get_origin(field) is Union: - args = get_args(field) - is_optional = type(None) in args - for arg in args: - if arg is not type(None): - return arg, is_optional - return field, False - - -def _strip_device_vector(field: type[Device]) -> tuple[bool, type[Device]]: - if get_origin(field) is DeviceVector: - return True, get_args(field)[0] - return False, field +# def _strip_union(field: T | T) -> tuple[T, bool]: +# if get_origin(field) is Union: +# args = get_args(field) +# is_optional = type(None) in args +# for arg in args: +# if arg is not type(None): +# return arg, is_optional +# return field, False +# +# +# def _strip_device_vector(field: type[Device]) -> tuple[bool, type[Device]]: +# if get_origin(field) is DeviceVector: +# return True, get_args(field)[0] +# return False, field