Skip to content

Commit

Permalink
When creating a tango device, the signal type is now required in type…
Browse files Browse the repository at this point in the history
… annotations unless it is a SignalX. Annotations may also include Devices or DeviceVectors. The kwargs for these objects should be passed to init for the device as a dict called . Signals which are not explicitly annotated are automatically filled in as attributes when the device is connected. Their signal read/write character and datatype is inferred from the tango server.
  • Loading branch information
burkeds committed Sep 12, 2024
1 parent d18607e commit f66d403
Show file tree
Hide file tree
Showing 8 changed files with 229 additions and 180 deletions.
4 changes: 2 additions & 2 deletions src/ophyd_async/tango/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
get_tango_trl,
get_trl_descriptor,
infer_python_type,
infer_signal_frontend,
infer_signal_character,
make_backend,
tango_signal_auto,
tango_signal_r,
Expand All @@ -32,7 +32,7 @@
get_trl_descriptor,
get_tango_trl,
infer_python_type,
infer_signal_frontend,
infer_signal_character,
make_backend,
AttributeProxy,
CommandProxy,
Expand Down
164 changes: 125 additions & 39 deletions src/ophyd_async/tango/base_devices/_base_device.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,32 @@
from __future__ import annotations

from typing import Dict, Optional, Tuple, Union, get_type_hints
from typing import (
Dict,
Optional,
Tuple,
Type,
TypeVar,
Union,
get_args,
get_origin,
get_type_hints,
)

from ophyd_async.core import (
DEFAULT_TIMEOUT,
Device,
DeviceVector,
Signal,
)
from ophyd_async.tango.signal import (
infer_python_type,
infer_signal_frontend,
make_backend,
tango_signal_auto,
)
from tango import DeviceProxy as SyncDeviceProxy
from tango.asyncio import DeviceProxy as AsyncDeviceProxy

T = TypeVar("T")


class TangoDevice(Device):
"""
Expand All @@ -35,6 +47,7 @@ class TangoDevice(Device):
proxy: Optional[Union[AsyncDeviceProxy, SyncDeviceProxy]] = None
_polling: Tuple[bool, float, float, float] = (False, 0.1, None, 0.1)
_signal_polling: Dict[str, Tuple[bool, float, float, float]] = {}
_poll_only_annotated_signals: bool = True

def __init__(
self,
Expand All @@ -47,8 +60,7 @@ def __init__(

self.trl = trl if trl else ""
self.proxy = device_proxy

self.create_children_from_annotations()
tango_create_children_from_annotations(self)
super().__init__(name=name)

async def connect(
Expand All @@ -70,46 +82,29 @@ async def closure():

await closure()
self.register_signals()
await fill_proxy_entries(self)

# set_name should be called again to propagate the new signal names
self.set_name(self.name)

# Set the polling configuration
if self._polling[0]:
for child in self.children():
if issubclass(type(child[1]), Signal):
child[1]._backend.set_polling(*self._polling) # noqa: SLF001
child[1]._backend.allow_events(False) # noqa: SLF001
if self._signal_polling:
for signal_name, polling in self._signal_polling.items():
if hasattr(self, signal_name):
attr = getattr(self, signal_name)
attr._backend.set_polling(*polling) # noqa: SLF001
attr._backend.allow_events(False) # noqa: SLF001

await super().connect(mock=mock, timeout=timeout)

# Users can override this method to register new signals
def register_signals(self):
annots = get_type_hints(self.__class__)
for name, obj_type in annots.items():
if hasattr(self, name):
signal = getattr(self, name)
if issubclass(type(signal), Signal):
tango_name = name.lstrip("_")
read_trl = f"{self.trl}/{tango_name}"
datatype = infer_python_type(read_trl)
backend = make_backend(
datatype=datatype,
read_trl=read_trl,
write_trl=read_trl,
device_proxy=self.proxy,
)
if self._polling[0]:
backend.allow_events(False)
backend.set_polling(*self._polling)
if name in self._signal_polling:
backend.allow_events(False)
backend.set_polling(*self._signal_polling[name])
signal._backend = backend # noqa: SLF001

def create_children_from_annotations(self):
annots = get_type_hints(self.__class__)
for attr_name, obj_type in annots.items():
if isinstance(obj_type, type):
if obj_type is Signal:
tango_name = attr_name.lstrip("_")
trl = f"{self.trl}/{tango_name}"
setattr(
self, attr_name, infer_signal_frontend(trl=trl, name=attr_name)
)
elif issubclass(obj_type, Signal):
setattr(self, attr_name, obj_type(name=attr_name))
pass


def tango_polling(
Expand Down Expand Up @@ -170,3 +165,94 @@ def decorator(cls):
return cls

return decorator


def tango_create_children_from_annotations(
device: TangoDevice,
included_optional_fields: Tuple[str, ...] = (),
device_vectors: Optional[Dict[str, int]] = None,
):
"""Initialize blocks at __init__ of `device`."""
for name, device_type in get_type_hints(type(device)).items():
if name in ("_name", "parent"):
continue

device_type, is_optional = _strip_union(device_type)
if is_optional and name not in included_optional_fields:
continue

is_device_vector, device_type = _strip_device_vector(device_type)
if is_device_vector:
kwargs = "_" + name + "_kwargs"
kwargs = getattr(device, kwargs, {})
prefix = kwargs["prefix"]
count = kwargs["count"]
n_device_vector = DeviceVector(
{i: device_type(f"{prefix}{i}") for i in range(1, count + 1)}
)
setattr(device, name, n_device_vector)

else:
origin = get_origin(device_type)
origin = origin if origin else device_type

if issubclass(origin, Signal):
datatype = None
tango_name = name.lstrip("_")
read_trl = f"{device.trl}/{tango_name}"
type_args = get_args(device_type)
if type_args:
datatype = type_args[0]
backend = make_backend(
datatype=datatype,
read_trl=read_trl,
write_trl=read_trl,
device_proxy=device.proxy,
)
setattr(device, name, origin(name=name, backend=backend))

elif issubclass(origin, Device) or isinstance(origin, Device):
kwargs = "_" + name + "_kwargs"
kwargs = getattr(device, kwargs, "")
setattr(device, name, origin(**kwargs))


async def fill_proxy_entries(device: TangoDevice):
proxy_trl = device.trl
children = [name.lstrip("_") for name, _ in device.children()]
proxy_attributes = list(device.proxy.get_attribute_list())
proxy_commands = list(device.proxy.get_command_list())
combined = proxy_attributes + proxy_commands

for name in combined:
if name not in children:
full_trl = f"{proxy_trl}/{name}"
try:
auto_signal = await tango_signal_auto(
trl=full_trl, device_proxy=device.proxy
)
setattr(device, name, auto_signal)
except RuntimeError as e:
if "Commands with different in and out dtypes" in str(e):
print(
f"Skipping {name}. Commands with different in and out dtypes"
f" are not supported."
)
continue
raise e


def _strip_union(field: Union[Union[T], T]) -> Tuple[T, bool]:
if get_origin(field) is Union:
args = get_args(field)
is_optional = type(None) in args
for arg in args:
if arg is not type(None):
return arg, is_optional
return field, False


def _strip_device_vector(field: Union[Type[Device]]) -> Tuple[bool, Type[Device]]:
if get_origin(field) is DeviceVector:
return True, get_args(field)[0]
return False, field
11 changes: 5 additions & 6 deletions src/ophyd_async/tango/demo/_counter.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
AsyncStatus,
ConfigSignal,
HintedSignal,
Signal,
SignalR,
SignalRW,
SignalX,
)
from ophyd_async.tango import TangoReadable, tango_polling
Expand All @@ -23,15 +24,13 @@ class TangoCounterConfig:
class TangoCounter(TangoReadable):
# Enter the name and type of the signals you want to use
# If type is None or Signal, the type will be inferred from the Tango device
counts: Signal
sample_time: Signal
state: Signal
reset: Signal
counts: SignalR[int]
sample_time: SignalRW[float]
start: SignalX

def __init__(self, trl: str, name=""):
super().__init__(trl, name=name)
self.add_readables([self.counts], HintedSignal.uncached)
self.add_readables([self.counts], HintedSignal)
self.add_readables([self.sample_time], ConfigSignal)

@AsyncStatus.wrap
Expand Down
16 changes: 6 additions & 10 deletions src/ophyd_async/tango/demo/_mover.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
CalculateTimeout,
ConfigSignal,
HintedSignal,
SignalR,
SignalRW,
SignalX,
WatchableAsyncStatus,
Expand All @@ -32,9 +31,8 @@ class TangoMoverConfig:
class TangoMover(TangoReadable, Movable, Stoppable):
# Enter the name and type of the signals you want to use
# If type is None or Signal, the type will be inferred from the Tango device
position: SignalRW
velocity: SignalRW
state: SignalR
position: SignalRW[float]
velocity: SignalRW[float]
_stop: SignalX

def __init__(self, trl: str, name=""):
Expand All @@ -57,7 +55,7 @@ async def set(self, value: float, timeout: CalculatableTimeout = CalculateTimeou
await self.position.set(value, wait=False, timeout=timeout)

# Wait for the motor to stop
move_status = AsyncStatus(self._wait_for_idle())
move_status = self.wait_for_idle()

try:
async for current_position in observe_value(
Expand All @@ -75,11 +73,8 @@ async def set(self, value: float, timeout: CalculatableTimeout = CalculateTimeou
if not self._set_success:
raise RuntimeError("Motor was stopped")

async def _wait_for_idle(self):
if self.state._backend.support_events is False: # noqa: SLF001
if self.state._backend._polling[0] is False: # noqa: SLF001
raise RuntimeError("State does not support events or polling")

@AsyncStatus.wrap
async def wait_for_idle(self):
event = asyncio.Event()

def _wait(value: dict[str, Reading]):
Expand All @@ -88,6 +83,7 @@ def _wait(value: dict[str, Reading]):

self.state.subscribe(_wait)
await event.wait()
self.state.clear_sub(_wait)

def stop(self, success: bool = True) -> AsyncStatus:
self._set_success = success
Expand Down
4 changes: 2 additions & 2 deletions src/ophyd_async/tango/signal/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from ._signal import (
infer_python_type,
infer_signal_frontend,
infer_signal_character,
make_backend,
tango_signal_auto,
tango_signal_r,
Expand Down Expand Up @@ -29,7 +29,7 @@
"get_trl_descriptor",
"get_tango_trl",
"infer_python_type",
"infer_signal_frontend",
"infer_signal_character",
"make_backend",
"tango_signal_r",
"tango_signal_rw",
Expand Down
Loading

0 comments on commit f66d403

Please sign in to comment.