Commit

Merge branch 'main' into merge

burkeds committed Sep 30, 2024
2 parents 8f121dc + 55f3552 commit 0e90e93
Showing 28 changed files with 681 additions and 428 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
@@ -112,6 +112,7 @@ markers = [
"adsim: require the ADsim IOC to be running",
]
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"

[tool.coverage.run]
data_file = "/tmp/ophyd_async.coverage"
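
The new asyncio_default_fixture_loop_scope option pins async pytest fixtures to a function-scoped event loop, matching asyncio_mode = "auto" above and avoiding the pytest-asyncio warning about an unset default. A minimal illustrative test (not from this repo) of what the setting guarantees:

import asyncio

import pytest


@pytest.fixture
async def loop_id():
    # With a function-scoped fixture loop, this runs in the same loop as the test.
    return id(asyncio.get_running_loop())


async def test_fixture_shares_the_test_loop(loop_id):
    assert loop_id == id(asyncio.get_running_loop())
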
10 changes: 7 additions & 3 deletions src/ophyd_async/core/_detector.py
@@ -194,7 +194,7 @@ def __init__(
self._fly_status: WatchableAsyncStatus | None = None
self._fly_start: float
self._iterations_completed: int = 0
self._intial_frame: int
self._initial_frame: int
self._last_frame: int
super().__init__(name)

@@ -208,7 +208,7 @@ def writer(self) -> DetectorWriter:

@AsyncStatus.wrap
async def stage(self) -> None:
# Disarm the detector, stop filewriting.
# Disarm the detector, stop file writing.
await self._check_config_sigs()
await asyncio.gather(self.writer.close(), self.controller.disarm())
self._trigger_info = None
@@ -314,7 +314,10 @@ async def prepare(self, value: TriggerInfo) -> None:
async def kickoff(self):
assert self._trigger_info, "Prepare must be called before kickoff!"
if self._iterations_completed >= self._trigger_info.iteration:
raise Exception(f"Kickoff called more than {self._trigger_info.iteration}")
raise Exception(
f"Kickoff called more than the configured number of "
f"{self._trigger_info.iteration} iteration(s)!"
)
self._iterations_completed += 1

@WatchableAsyncStatus.wrap
@@ -340,6 +343,7 @@ async def complete(self):
if index >= self._trigger_info.number:
break
if self._iterations_completed == self._trigger_info.iteration:
self._iterations_completed = 0
await self.controller.wait_for_idle()

async def describe_collect(self) -> dict[str, DataKey]:
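
A minimal sketch (not part of this commit) of the fly-scan sequence these hunks tighten up; `det` stands for a prepared StandardDetector and `info` for a TriggerInfo whose `number` and `iteration` fields are the ones referenced above:

async def fly(det, info):
    await det.prepare(info)
    for _ in range(info.iteration):
        # Each kickoff counts one iteration; calling it again before the final
        # complete() now raises the clearer "more than the configured number of
        # iteration(s)" error instead of the truncated message.
        await det.kickoff()
        # complete() now resets the iteration counter once all iterations are done.
        await det.complete()
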
19 changes: 2 additions & 17 deletions src/ophyd_async/core/_flyer.py
@@ -1,14 +1,11 @@
from abc import ABC, abstractmethod
from collections.abc import Sequence
from typing import Generic

from bluesky.protocols import Flyable, Preparable, Reading, Stageable
from event_model import DataKey
from bluesky.protocols import Flyable, Preparable, Stageable

from ._device import Device
from ._signal import SignalR
from ._status import AsyncStatus
from ._utils import T, merge_gathered_dicts
from ._utils import T


class TriggerLogic(ABC, Generic[T]):
@@ -39,11 +36,9 @@ class StandardFlyer(
def __init__(
self,
trigger_logic: TriggerLogic[T],
configuration_signals: Sequence[SignalR] = (),
name: str = "",
):
self._trigger_logic = trigger_logic
self._configuration_signals = tuple(configuration_signals)
super().__init__(name=name)

@property
@@ -73,13 +68,3 @@ async def kickoff(self) -> None:
@AsyncStatus.wrap
async def complete(self) -> None:
await self._trigger_logic.complete()

async def describe_configuration(self) -> dict[str, DataKey]:
return await merge_gathered_dicts(
[sig.describe() for sig in self._configuration_signals]
)

async def read_configuration(self) -> dict[str, Reading]:
return await merge_gathered_dicts(
[sig.read() for sig in self._configuration_signals]
)
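
StandardFlyer no longer accepts or reads back configuration signals, so construction reduces to the trigger logic alone. A hedged sketch of the new call shape, with `my_trigger_logic` standing in for any TriggerLogic[T] implementation and the import path assumed:

from ophyd_async.core import StandardFlyer  # import path assumed

# Configuration signals now live on the detectors themselves, not the flyer.
flyer = StandardFlyer(trigger_logic=my_trigger_logic, name="flyer")
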
3 changes: 3 additions & 0 deletions src/ophyd_async/core/_hdf_dataset.py
@@ -20,6 +20,8 @@ class HDFDataset:
dtype_numpy: str = ""
multiplier: int = 1
swmr: bool = False
# Represents explicit chunk size written to disk.
chunk_shape: tuple[int, ...] = ()


SLICE_NAME = "AD_HDF5_SWMR_SLICE"
@@ -66,6 +68,7 @@ def __init__(
"dataset": ds.dataset,
"swmr": ds.swmr,
"multiplier": ds.multiplier,
"chunk_shape": ds.chunk_shape,
},
uid=None,
validate=True,
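
A hedged sketch of the new field in use (values and the import path are illustrative); `chunk_shape` travels into the StreamResource parameters alongside `dataset`, `swmr` and `multiplier` as shown above:

from ophyd_async.core import HDFDataset  # import path assumed

ds = HDFDataset(
    "det1",                        # data key (illustrative)
    "/entry/data/data",            # dataset path inside the HDF5 file (illustrative)
    shape=(1216, 1936),
    dtype_numpy="<u2",
    multiplier=1,
    chunk_shape=(1, 1216, 1936),   # explicit chunk size written to disk
)
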
119 changes: 101 additions & 18 deletions src/ophyd_async/core/_table.py
@@ -1,46 +1,125 @@
from typing import TypeVar
from enum import Enum
from typing import TypeVar, get_args, get_origin

import numpy as np
from pydantic import BaseModel, ConfigDict, model_validator

TableSubclass = TypeVar("TableSubclass", bound="Table")


def _concat(value1, value2):
if isinstance(value1, np.ndarray):
return np.concatenate((value1, value2))
else:
return value1 + value2


class Table(BaseModel):
"""An abstraction of a Table of str to numpy array."""

model_config = ConfigDict(validate_assignment=True, strict=False)

@staticmethod
def row(cls: type[TableSubclass], **kwargs) -> TableSubclass: # type: ignore
arrayified_kwargs = {
field_name: np.concatenate(
(
(default_arr := field_value.default_factory()), # type: ignore
np.array([kwargs[field_name]], dtype=default_arr.dtype),
arrayified_kwargs = {}
for field_name, field_value in cls.model_fields.items():
value = kwargs.pop(field_name)
if field_value.default_factory is None:
raise ValueError(
"`Table` models should have default factories for their "
"mutable empty columns."
)
default_array = field_value.default_factory()
if isinstance(default_array, np.ndarray):
arrayified_kwargs[field_name] = np.array(
[value], dtype=default_array.dtype
)
elif issubclass(type(value), Enum) and isinstance(value, str):
arrayified_kwargs[field_name] = [value]
else:
raise TypeError(
"Row column should be numpy arrays or sequence of string `Enum`."
)
if kwargs:
raise TypeError(
f"Unexpected keyword arguments {kwargs.keys()} for {cls.__name__}."
)
for field_name, field_value in cls.model_fields.items()
}
return cls(**arrayified_kwargs)

def __add__(self, right: TableSubclass) -> TableSubclass:
"""Concatenate the arrays in field values."""

assert type(right) is type(self), (
f"{right} is not a `Table`, or is not the same "
f"type of `Table` as {self}."
)
if type(right) is not type(self):
raise RuntimeError(
f"{right} is not a `Table`, or is not the same "
f"type of `Table` as {self}."
)

return type(right)(
**{
field_name: np.concatenate(
(getattr(self, field_name), getattr(right, field_name))
field_name: _concat(
getattr(self, field_name), getattr(right, field_name)
)
for field_name in self.model_fields
}
)

def numpy_dtype(self) -> np.dtype:
dtype = []
for field_name, field_value in self.model_fields.items():
if np.ndarray in (
get_origin(field_value.annotation),
field_value.annotation,
):
dtype.append((field_name, getattr(self, field_name).dtype))
else:
enum_type = get_args(field_value.annotation)[0]
assert issubclass(enum_type, Enum)
enum_values = [element.value for element in enum_type]
max_length_in_enum = max(len(value) for value in enum_values)
dtype.append((field_name, np.dtype(f"<U{max_length_in_enum}")))

return np.dtype(dtype)

def numpy_table(self):
# It would be nice to be able to use np.transpose for this,
# but it defaults to the largest dtype for everything.
dtype = self.numpy_dtype()
transposed_list = [
np.array(tuple(row), dtype=dtype)
for row in zip(*self.numpy_columns(), strict=False)
]
transposed = np.array(transposed_list, dtype=dtype)
return transposed

def numpy_columns(self) -> list[np.ndarray]:
"""Columns in the table can be lists of string enums or numpy arrays.
This method returns the columns, converting the string enums to numpy arrays.
"""

columns = []
for field_name, field_value in self.model_fields.items():
if np.ndarray in (
get_origin(field_value.annotation),
field_value.annotation,
):
columns.append(getattr(self, field_name))
else:
enum_type = get_args(field_value.annotation)[0]
assert issubclass(enum_type, Enum)
enum_values = [element.value for element in enum_type]
max_length_in_enum = max(len(value) for value in enum_values)
dtype = np.dtype(f"<U{max_length_in_enum}")

columns.append(
np.array(
[enum.value for enum in getattr(self, field_name)], dtype=dtype
)
)

return columns

@model_validator(mode="after")
def validate_arrays(self) -> "Table":
first_length = len(next(iter(self))[1])
@@ -49,11 +128,15 @@ def validate_arrays(self) -> "Table":
), "Rows should all be of equal size."

if not all(
np.issubdtype(
self.model_fields[field_name].default_factory().dtype, # type: ignore
field_value.dtype,
# Checks if the values are numpy subtypes if the array is a numpy array,
# or if the value is a string enum.
np.issubdtype(getattr(self, field_name).dtype, default_array.dtype)
if isinstance(
default_array := self.model_fields[field_name].default_factory(), # type: ignore
np.ndarray,
)
for field_name, field_value in self
else issubclass(get_args(field_value.annotation)[0], Enum)
for field_name, field_value in self.model_fields.items()
):
raise ValueError(
f"Cannot construct a `{type(self).__name__}`, "
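
A hedged sketch of what the rewritten Table supports (class, enum, and column names are illustrative; the column annotations copy the SeqTable pattern further down in this commit, and the Table import path is assumed): subclasses may now mix 1-D numpy columns with lists of string Enums, and row()/__add__ handle both.

from enum import Enum
from typing import Annotated, Sequence

import numpy as np
from pydantic import Field
from pydantic_numpy.helper.annotation import NpArrayPydanticAnnotation

from ophyd_async.core import Table  # import path assumed


class Mode(str, Enum):
    FAST = "FAST"
    SLOW = "SLOW"


class MyTable(Table):
    position: Annotated[
        np.ndarray[tuple[int], np.dtype[np.int32]],
        NpArrayPydanticAnnotation.factory(
            data_type=np.int32, dimensions=1, strict_data_typing=False
        ),
        Field(default_factory=lambda: np.array([], dtype=np.int32)),
    ]
    mode: Annotated[Sequence[Mode], Field(default_factory=list)]

    @classmethod
    def row(cls, *, position: int, mode: Mode) -> "MyTable":
        # Mirrors SeqTable.row below: forward to the base helper with cls first.
        return Table.row(cls, position=position, mode=mode)


# Single-row tables concatenate column-wise with `+`.
table = MyTable.row(position=1, mode=Mode.FAST) + MyTable.row(position=2, mode=Mode.SLOW)
# numpy_dtype()/numpy_table() give a structured view, with enum columns widened
# to fixed-length unicode strings.
structured = table.numpy_table()
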
2 changes: 2 additions & 0 deletions src/ophyd_async/epics/adcore/_core_io.py
@@ -135,4 +135,6 @@ def __init__(self, prefix: str, name="") -> None:
self.array_size0 = epics_signal_r(int, prefix + "ArraySize0")
self.array_size1 = epics_signal_r(int, prefix + "ArraySize1")
self.create_directory = epics_signal_rw(int, prefix + "CreateDirectory")
self.num_frames_chunks = epics_signal_r(int, prefix + "NumFramesChunks_RBV")
self.chunk_size_auto = epics_signal_rw_rbv(bool, prefix + "ChunkSizeAuto")
super().__init__(prefix, name)
12 changes: 11 additions & 1 deletion src/ophyd_async/epics/adcore/_hdf_writer.py
@@ -50,12 +50,15 @@ def __init__(

async def open(self, multiplier: int = 1) -> dict[str, DataKey]:
self._file = None
info = self._path_provider(device_name=self.hdf.name)
info = self._path_provider(device_name=self._name_provider())

# Set the directory creation depth first, since dir creation callback happens
# when directory path PV is processed.
await self.hdf.create_directory.set(info.create_dir_depth)

# Make sure we are using chunk auto-sizing
await asyncio.gather(self.hdf.chunk_size_auto.set(True))

await asyncio.gather(
self.hdf.num_extra_dims.set(0),
self.hdf.lazy_open.set(True),
@@ -84,6 +87,9 @@ async def open(self, multiplier: int = 1) -> dict[str, DataKey]:
self._multiplier = multiplier
outer_shape = (multiplier,) if multiplier > 1 else ()

# Determine number of frames that will be saved per HDF chunk
frames_per_chunk = await self.hdf.num_frames_chunks.get_value()

# Add the main data
self._datasets = [
HDFDataset(
@@ -92,6 +98,7 @@ async def open(self, multiplier: int = 1) -> dict[str, DataKey]:
shape=detector_shape,
dtype_numpy=np_dtype,
multiplier=multiplier,
chunk_shape=(frames_per_chunk, *detector_shape),
)
]
# And all the scalar datasets
@@ -118,6 +125,9 @@ async def open(self, multiplier: int = 1) -> dict[str, DataKey]:
(),
np_datatype,
multiplier,
# NDAttributes appear to always be configured with
# this chunk size
chunk_shape=(16384,),
)
)

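
Taken together, these hunks turn on the HDF plugin's automatic chunk sizing, read back how many frames it will pack per chunk, and record that in each dataset's chunk_shape. A hedged sketch of the StreamResource parameters a downstream consumer would then see (dataset paths and numbers are illustrative, for a 1216x1936 detector with the plugin reporting 4 frames per chunk):

main_dataset_params = {
    "dataset": "/entry/data/data",
    "swmr": False,
    "multiplier": 1,
    "chunk_shape": (4, 1216, 1936),   # (num_frames_chunks, *detector_shape)
}
nd_attribute_params = {
    "dataset": "/entry/instrument/NDAttributes/Temperature",
    "swmr": False,
    "multiplier": 1,
    "chunk_shape": (16384,),          # NDAttributes appear to use this fixed chunk size
}
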
2 changes: 1 addition & 1 deletion src/ophyd_async/epics/adsimdetector/_sim.py
@@ -30,6 +30,6 @@ def __init__(
lambda: self.name,
adcore.ADBaseDatasetDescriber(self.drv),
),
config_sigs=config_sigs,
config_sigs=(self.drv.acquire_period, self.drv.acquire_time, *config_sigs),
name=name,
)
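
With this change the sim detector always reports acquire_time and acquire_period as configuration signals, in addition to any extras passed in. A hedged sketch of the effect (detector construction omitted; variable name assumed):

# det: an adsimdetector detector instance (construction omitted)
cfg = await det.read_configuration()
# cfg now always contains readings for det.drv.acquire_time and
# det.drv.acquire_period, plus anything passed via config_sigs.
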
40 changes: 3 additions & 37 deletions src/ophyd_async/fastcs/panda/_table.py
@@ -4,7 +4,7 @@

import numpy as np
import numpy.typing as npt
from pydantic import Field, field_validator, model_validator
from pydantic import Field, model_validator
from pydantic_numpy.helper.annotation import NpArrayPydanticAnnotation
from typing_extensions import TypedDict

@@ -51,13 +51,7 @@ class SeqTrigger(str, Enum):
),
Field(default_factory=lambda: np.array([], dtype=np.bool_)),
]
TriggerStr = Annotated[
np.ndarray[tuple[int], np.dtype[np.unicode_]],
NpArrayPydanticAnnotation.factory(
data_type=np.unicode_, dimensions=1, strict_data_typing=False
),
Field(default_factory=lambda: np.array([], dtype=np.dtype("<U32"))),
]
TriggerStr = Annotated[Sequence[SeqTrigger], Field(default_factory=list)]


class SeqTable(Table):
@@ -101,35 +95,7 @@ def row( # type: ignore
oute2: bool = False,
outf2: bool = False,
) -> "SeqTable":
if isinstance(trigger, SeqTrigger):
trigger = trigger.value
return super().row(**locals())

@field_validator("trigger", mode="before")
@classmethod
def trigger_to_np_array(cls, trigger_column):
"""
The user can provide a list of SeqTrigger enum elements instead of a numpy str.
"""
if isinstance(trigger_column, Sequence) and all(
isinstance(trigger, SeqTrigger) for trigger in trigger_column
):
trigger_column = np.array(
[trigger.value for trigger in trigger_column], dtype=np.dtype("<U32")
)
elif isinstance(trigger_column, Sequence) or isinstance(
trigger_column, np.ndarray
):
for trigger in trigger_column:
SeqTrigger(
trigger
) # To check all the given strings are actually `SeqTrigger`s
else:
raise ValueError(
"Expected a numpy array or a sequence of `SeqTrigger`, got "
f"{type(trigger_column)}."
)
return trigger_column
return Table.row(**locals())

@model_validator(mode="after")
def validate_max_length(self) -> "SeqTable":
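
With TriggerStr now simply a sequence of SeqTrigger, rows are built directly from the enum and validated by the generic Table machinery above, replacing the removed field validator. A hedged sketch (import path assumed; unspecified columns keep their defaults):

from ophyd_async.fastcs.panda import SeqTable, SeqTrigger  # import path assumed

table = SeqTable.row(trigger=SeqTrigger.BITA_1, repeats=2) + SeqTable.row(
    trigger=SeqTrigger.IMMEDIATE, time2=100
)
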
6 changes: 5 additions & 1 deletion src/ophyd_async/fastcs/panda/_writer.py
@@ -105,7 +105,11 @@ async def _update_datasets(self) -> None:

capture_table = await self.panda_data_block.datasets.get_value()
self._datasets = [
HDFDataset(dataset_name, "/" + dataset_name, [1], multiplier=1)
# TODO: Update chunk size to read signal once available in IOC
# Currently PandA IOC sets chunk size to 1024 points per chunk
HDFDataset(
dataset_name, "/" + dataset_name, [1], multiplier=1, chunk_shape=(1024,)
)
for dataset_name in capture_table["name"]
]
