Skip to content

Commit

Permalink
Adding support for the Tango control system (https://www.tango-contro…
Browse files Browse the repository at this point in the history
…ls.org/). This relies on PyTango (https://pypi.org/project/pytango/) ophyd-async devices to asynchronous PyTango DeviceProxy objects. The control strategy relies on a shared resource called `proxy` found in the new TangoDevice class. By passing this proxy to the TangoSignalBackend of its signals, a proxy to attributes or commands of the Tango device can be established.

    1. New TangoDevice and TangoReadable device classes.
    2. Automated inference of the existence unannotated signals
    3. Monitoring via Tango events with optional polling of attributes.
    4. Tango sensitive signals are constructed by attaching a TangoSignalBackend to a Signal object or may be built using new tango_signal_* constructor methods.
    5. Signal objects with a Tango backend and Tango devices should behave the same as those with EPICS or other backends.

    1. As of this commit, typed commands are not supported in Ophyd-Async so Tango command signals with a type other than None are automatically built as SignalRW as a workaround.
    2. Tango commands with different input/output types are not supported.
    3. Pipes are not supported.

    1. Extension of Device and StandardReadable to support shared resources such as the DeviceProxy.
    2. Extension of the Tango backend to support typed commands.
    3. Extension of the Tango backend to support pipes.

Contact:
Devin Burke
Research software scientist
Deutsches Elektronen-Synchrotron (DESY)
[email protected]
  • Loading branch information
burkeds committed Oct 8, 2024
1 parent e289ddd commit 8f8c2c3
Showing 1 changed file with 33 additions and 35 deletions.
68 changes: 33 additions & 35 deletions src/ophyd_async/tango/base_devices/_base_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from typing import (
TypeVar,
Union,
get_args,
get_origin,
get_type_hints,
Expand All @@ -11,7 +10,6 @@
from ophyd_async.core import (
DEFAULT_TIMEOUT,
Device,
DeviceVector,
Signal,
)
from ophyd_async.tango.signal import (
Expand Down Expand Up @@ -160,28 +158,28 @@ def tango_create_children_from_annotations(
if name in ("_name", "parent"):
continue

device_type, is_optional = _strip_union(device_type)
if is_optional and name not in included_optional_fields:
continue

is_device_vector, device_type = _strip_device_vector(device_type)
if is_device_vector:
n_device_vector = DeviceVector()
setattr(device, name, n_device_vector)
# device_type, is_optional = _strip_union(device_type)
# if is_optional and name not in included_optional_fields:
# continue
#
# is_device_vector, device_type = _strip_device_vector(device_type)
# if is_device_vector:
# n_device_vector = DeviceVector()
# setattr(device, name, n_device_vector)

else:
origin = get_origin(device_type)
origin = origin if origin else device_type
# else:
origin = get_origin(device_type)
origin = origin if origin else device_type

if issubclass(origin, Signal):
type_args = get_args(device_type)
datatype = type_args[0] if type_args else None
backend = make_backend(datatype=datatype, device_proxy=device.proxy)
setattr(device, name, origin(name=name, backend=backend))
if issubclass(origin, Signal):
type_args = get_args(device_type)
datatype = type_args[0] if type_args else None
backend = make_backend(datatype=datatype, device_proxy=device.proxy)
setattr(device, name, origin(name=name, backend=backend))

elif issubclass(origin, Device) or isinstance(origin, Device):
assert callable(origin), f"{origin} is not callable."
setattr(device, name, origin())
elif issubclass(origin, Device) or isinstance(origin, Device):
assert callable(origin), f"{origin} is not callable."
setattr(device, name, origin())


async def _fill_proxy_entries(device: TangoDevice):
Expand Down Expand Up @@ -211,17 +209,17 @@ async def _fill_proxy_entries(device: TangoDevice):
raise e


def _strip_union(field: T | T) -> tuple[T, bool]:
if get_origin(field) is Union:
args = get_args(field)
is_optional = type(None) in args
for arg in args:
if arg is not type(None):
return arg, is_optional
return field, False


def _strip_device_vector(field: type[Device]) -> tuple[bool, type[Device]]:
if get_origin(field) is DeviceVector:
return True, get_args(field)[0]
return False, field
# def _strip_union(field: T | T) -> tuple[T, bool]:
# if get_origin(field) is Union:
# args = get_args(field)
# is_optional = type(None) in args
# for arg in args:
# if arg is not type(None):
# return arg, is_optional
# return field, False
#
#
# def _strip_device_vector(field: type[Device]) -> tuple[bool, type[Device]]:
# if get_origin(field) is DeviceVector:
# return True, get_args(field)[0]
# return False, field

0 comments on commit 8f8c2c3

Please sign in to comment.