Skip to content

Commit

Permalink
First version of a DerivedSignalBackend. Does not yet auto-update if …
Browse files Browse the repository at this point in the history
…the component signal changes
  • Loading branch information
burkeds committed Oct 22, 2024
1 parent 3e74761 commit e899d47
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 1 deletion.
2 changes: 2 additions & 0 deletions src/ophyd_async/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
wait_for_value,
)
from ._signal_backend import (
DerivedSignalBackend,
RuntimeSubsetEnum,
SignalBackend,
SubsetEnum,
Expand Down Expand Up @@ -167,4 +168,5 @@
"is_pydantic_model",
"wait_for_connection",
"completed_status",
"DerivedSignalBackend",
]
109 changes: 108 additions & 1 deletion src/ophyd_async/core/_signal_backend.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import asyncio
import time
from abc import abstractmethod
from collections.abc import Callable
from typing import (
TYPE_CHECKING,
Any,
Expand All @@ -7,7 +10,8 @@
Literal,
)

from bluesky.protocols import Reading
import numpy as np
from bluesky.protocols import HasName, Movable, Readable, Reading
from event_model import DataKey

from ._utils import DEFAULT_TIMEOUT, ReadingValueCallback, T
Expand Down Expand Up @@ -95,3 +99,106 @@ def __init__(self):
SubsetEnum = Literal
else:
SubsetEnum = RuntimeSubsetEnum

ReadingValueCallback = Callable[[Reading, T], None]

# Type which is readable and has name
READABLE_DEVICE = Readable | HasName
MOVABLE_DEVICE = Movable | HasName


class DerivedSignalBackend(SignalBackend[T]):
def __init__(
self, datatype: T | None = None, **kwargs: READABLE_DEVICE | MOVABLE_DEVICE
):
self.component_signals: list[str] = []
self.datatype: T | None = datatype
self._value: T | None = None
self._setpoint: T | None = None
self._callback: ReadingValueCallback | None = None

for attr, attr_type in self.__annotations__.items():
signal_type = attr_type.__origin__
data_type = attr_type.__args__[0]
if attr in kwargs:
component_signal = kwargs.pop(attr)
component_type = type(component_signal)
component_datatype = component_signal._backend.datatype # noqa: SLF001
if not issubclass(component_type, signal_type):
raise ValueError(f"Attribute {attr} must be of type {signal_type}")
if not issubclass(component_datatype, data_type):
raise ValueError(
f"Attribute {attr} must be of datatype {data_type}"
)
setattr(self, attr, component_signal)
self.component_signals.append(component_signal.name)

for keyword, value in kwargs.items():
setattr(self, keyword, value)

@abstractmethod
async def when_read(self) -> T:
"""Read a value derived from the underlying hardware"""

@abstractmethod
async def when_write(
self, value: T, wait: bool = True, timeout: float = DEFAULT_TIMEOUT
) -> None:
"""Write to underlying hardware based on a value"""

@classmethod
def datatype_allowed(cls, dtype: Any) -> bool:
return True

def source(self, name: str) -> str:
src = f"{name} derived from signals: {self.component_signals}"
return src

async def connect(self, timeout: float = DEFAULT_TIMEOUT):
pass

async def put(self, value: T | None, wait=True, timeout=None):
if not issubclass(type(value), self.datatype):
raise ValueError(f"Value {value} is not of type {self.datatype}")
self._setpoint = value
await self.when_write(value=value, wait=wait, timeout=timeout)
await asyncio.sleep(0)

async def get_datakey(self, source: str) -> DataKey:
value = await self.get_value()
shape = np.shape(value)
precision = getattr(self, "precision", 2)
units = getattr(self, "units", "")
return DataKey(
source=source,
dtype=self.datatype,
shape=list(shape),
precision=precision,
units=units,
)

async def get_reading(self) -> Reading:
self._value = await self.when_read()
reading = Reading(value=self._value, timestamp=time.time())
if self._callback:
self._callback(reading, self._value)
return reading

async def get_value(self) -> T:
self._value = await self.when_read()
reading = Reading(value=self._value, timestamp=time.time())
if self._callback:
self._callback(reading, self._value)
return self._value

async def get_setpoint(self) -> T:
return self._setpoint

def set_callback(self, callback: ReadingValueCallback[T] | None) -> None:
# Check if the callback is callable and has the right signature
if callback is not None:
if not isinstance(callback, ReadingValueCallback[T]):
raise TypeError(
"Callback must be a callable with signature (Reading, T) -> None"
)
self._callback = callback

0 comments on commit e899d47

Please sign in to comment.