From e899d476092865e3f433f18439d281fa95c39caf Mon Sep 17 00:00:00 2001 From: Devin Burke Date: Tue, 22 Oct 2024 14:31:25 +0200 Subject: [PATCH] First version of a DerivedSignalBackend. Does not yet auto-update if the component signal changes --- src/ophyd_async/core/__init__.py | 2 + src/ophyd_async/core/_signal_backend.py | 109 +++++++++++++++++++++++- 2 files changed, 110 insertions(+), 1 deletion(-) diff --git a/src/ophyd_async/core/__init__.py b/src/ophyd_async/core/__init__.py index ddd3ab1a80..e883fa3404 100644 --- a/src/ophyd_async/core/__init__.py +++ b/src/ophyd_async/core/__init__.py @@ -62,6 +62,7 @@ wait_for_value, ) from ._signal_backend import ( + DerivedSignalBackend, RuntimeSubsetEnum, SignalBackend, SubsetEnum, @@ -167,4 +168,5 @@ "is_pydantic_model", "wait_for_connection", "completed_status", + "DerivedSignalBackend", ] diff --git a/src/ophyd_async/core/_signal_backend.py b/src/ophyd_async/core/_signal_backend.py index 035936f32c..bbb7a7e70d 100644 --- a/src/ophyd_async/core/_signal_backend.py +++ b/src/ophyd_async/core/_signal_backend.py @@ -1,4 +1,7 @@ +import asyncio +import time from abc import abstractmethod +from collections.abc import Callable from typing import ( TYPE_CHECKING, Any, @@ -7,7 +10,8 @@ Literal, ) -from bluesky.protocols import Reading +import numpy as np +from bluesky.protocols import HasName, Movable, Readable, Reading from event_model import DataKey from ._utils import DEFAULT_TIMEOUT, ReadingValueCallback, T @@ -95,3 +99,106 @@ def __init__(self): SubsetEnum = Literal else: SubsetEnum = RuntimeSubsetEnum + +ReadingValueCallback = Callable[[Reading, T], None] + +# Type which is readable and has name +READABLE_DEVICE = Readable | HasName +MOVABLE_DEVICE = Movable | HasName + + +class DerivedSignalBackend(SignalBackend[T]): + def __init__( + self, datatype: T | None = None, **kwargs: READABLE_DEVICE | MOVABLE_DEVICE + ): + self.component_signals: list[str] = [] + self.datatype: T | None = datatype + self._value: T | None = None + self._setpoint: T | None = None + self._callback: ReadingValueCallback | None = None + + for attr, attr_type in self.__annotations__.items(): + signal_type = attr_type.__origin__ + data_type = attr_type.__args__[0] + if attr in kwargs: + component_signal = kwargs.pop(attr) + component_type = type(component_signal) + component_datatype = component_signal._backend.datatype # noqa: SLF001 + if not issubclass(component_type, signal_type): + raise ValueError(f"Attribute {attr} must be of type {signal_type}") + if not issubclass(component_datatype, data_type): + raise ValueError( + f"Attribute {attr} must be of datatype {data_type}" + ) + setattr(self, attr, component_signal) + self.component_signals.append(component_signal.name) + + for keyword, value in kwargs.items(): + setattr(self, keyword, value) + + @abstractmethod + async def when_read(self) -> T: + """Read a value derived from the underlying hardware""" + + @abstractmethod + async def when_write( + self, value: T, wait: bool = True, timeout: float = DEFAULT_TIMEOUT + ) -> None: + """Write to underlying hardware based on a value""" + + @classmethod + def datatype_allowed(cls, dtype: Any) -> bool: + return True + + def source(self, name: str) -> str: + src = f"{name} derived from signals: {self.component_signals}" + return src + + async def connect(self, timeout: float = DEFAULT_TIMEOUT): + pass + + async def put(self, value: T | None, wait=True, timeout=None): + if not issubclass(type(value), self.datatype): + raise ValueError(f"Value {value} is not of type {self.datatype}") + self._setpoint = value + await self.when_write(value=value, wait=wait, timeout=timeout) + await asyncio.sleep(0) + + async def get_datakey(self, source: str) -> DataKey: + value = await self.get_value() + shape = np.shape(value) + precision = getattr(self, "precision", 2) + units = getattr(self, "units", "") + return DataKey( + source=source, + dtype=self.datatype, + shape=list(shape), + precision=precision, + units=units, + ) + + async def get_reading(self) -> Reading: + self._value = await self.when_read() + reading = Reading(value=self._value, timestamp=time.time()) + if self._callback: + self._callback(reading, self._value) + return reading + + async def get_value(self) -> T: + self._value = await self.when_read() + reading = Reading(value=self._value, timestamp=time.time()) + if self._callback: + self._callback(reading, self._value) + return self._value + + async def get_setpoint(self) -> T: + return self._setpoint + + def set_callback(self, callback: ReadingValueCallback[T] | None) -> None: + # Check if the callback is callable and has the right signature + if callback is not None: + if not isinstance(callback, ReadingValueCallback[T]): + raise TypeError( + "Callback must be a callable with signature (Reading, T) -> None" + ) + self._callback = callback