Skip to content

Commit

Permalink
Added a fixtures that checks if tasks are still running in tests (blu…
Browse files Browse the repository at this point in the history
…esky#538)

added autorun fixtures that check if tasks are still running and fail on the test level

also corrected some tests with incorrect event-loop usage
  • Loading branch information
evalott100 authored Sep 17, 2024
1 parent 5b67581 commit 989beeb
Show file tree
Hide file tree
Showing 10 changed files with 196 additions and 95 deletions.
2 changes: 1 addition & 1 deletion src/ophyd_async/core/_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ async def _check_config_sigs(self):
@AsyncStatus.wrap
async def unstage(self) -> None:
# Stop data writing.
await self.writer.close()
await asyncio.gather(self.writer.close(), self.controller.disarm())

async def read_configuration(self) -> Dict[str, Reading]:
return await merge_gathered_dicts(sig.read() for sig in self._config_sigs)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import asyncio
from typing import Optional

from pydantic import Field

from ophyd_async.core import DetectorControl, PathProvider
from ophyd_async.core._detector import TriggerInfo

Expand All @@ -14,7 +12,7 @@ def __init__(
self,
pattern_generator: PatternGenerator,
path_provider: PathProvider,
exposure: float = Field(default=0.1),
exposure: Optional[float] = 0.1,
) -> None:
self.pattern_generator: PatternGenerator = pattern_generator
self.pattern_generator.set_exposure(exposure)
Expand Down Expand Up @@ -46,13 +44,13 @@ async def wait_for_idle(self):
await self.task

async def disarm(self):
if self.task:
if self.task and not self.task.done():
self.task.cancel()
try:
await self.task
except asyncio.CancelledError:
pass
self.task = None
self.task = None

def get_deadtime(self, exposure: float | None) -> float:
return 0.001
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ async def collect_stream_docs(
def close(self) -> None:
if self._handle_for_h5_file:
self._handle_for_h5_file.close()
print("file closed")
self._handle_for_h5_file = None

async def observe_indices_written(
Expand Down
72 changes: 70 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import os
import pprint
import subprocess
import sys
import time
Expand All @@ -8,6 +9,7 @@

import pytest
from bluesky.run_engine import RunEngine, TransitionError
from pytest import FixtureRequest

from ophyd_async.core import (
DetectorTrigger,
Expand Down Expand Up @@ -58,21 +60,87 @@ def configure_epics_environment():
os.environ["EPICS_PVA_AUTO_ADDR_LIST"] = "NO"


_ALLOWED_PYTEST_TASKS = {"async_finalizer", "async_setup", "async_teardown"}


def _error_and_kill_pending_tasks(
loop: asyncio.AbstractEventLoop, test_name: str, test_passed: bool
) -> set[asyncio.Task]:
"""Cancels pending tasks in the event loop for a test. Raises an exception if
the test hasn't already.
Args:
loop: The event loop to check for pending tasks.
test_name: The name of the test.
test_passed: Indicates whether the test passed.
Returns:
set[asyncio.Task]: The set of unfinished tasks that were cancelled.
Raises:
RuntimeError: If there are unfinished tasks and the test didn't fail.
"""
unfinished_tasks = {
task
for task in asyncio.all_tasks(loop)
if task.get_coro().__name__ not in _ALLOWED_PYTEST_TASKS and not task.done()
}
for task in unfinished_tasks:
task.cancel()

# We only raise an exception here if the test didn't fail anyway.
# If it did then it makes sense that there's some tasks we need to cancel,
# but an exception will already have been raised.
if unfinished_tasks and test_passed:
raise RuntimeError(
f"Not all tasks closed during test {test_name}:\n"
f"{pprint.pformat(unfinished_tasks, width=88)}"
)

return unfinished_tasks


@pytest.fixture(autouse=True, scope="function")
def fail_test_on_unclosed_tasks(request: FixtureRequest):
"""
Used on every test to ensure failure if there are pending tasks
by the end of the test.
"""

fail_count = request.session.testsfailed
loop = asyncio.get_event_loop()
loop.set_debug(True)

request.addfinalizer(
lambda: _error_and_kill_pending_tasks(
loop, request.node.name, request.session.testsfailed == fail_count
)
)


@pytest.fixture(scope="function")
def RE(request):
def RE(request: FixtureRequest):
loop = asyncio.new_event_loop()
loop.set_debug(True)
RE = RunEngine({}, call_returns_result=True, loop=loop)
fail_count = request.session.testsfailed

def clean_event_loop():
if RE.state not in ("idle", "panicked"):
try:
RE.halt()
except TransitionError:
pass

loop.call_soon_threadsafe(loop.stop)
RE._th.join()
loop.close()

try:
_error_and_kill_pending_tasks(
loop, request.node.name, request.session.testsfailed == fail_count
)
finally:
loop.close()

request.addfinalizer(clean_event_loop)
return RE
Expand Down
15 changes: 12 additions & 3 deletions tests/core/test_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,18 @@ async def test_status_propogates_traceback_under_RE(RE) -> None:


async def test_async_status_exception_timeout():
st = AsyncStatus(asyncio.sleep(0.1))
with pytest.raises(Exception):
st.exception(timeout=1.0)
try:
st = AsyncStatus(asyncio.sleep(0.1))
with pytest.raises(
ValueError,
match=(
"cannot honour any timeout other than 0 in an asynchronous function"
),
):
st.exception(timeout=1.0)
finally:
if not st.done:
st.task.cancel()


@pytest.fixture
Expand Down
44 changes: 25 additions & 19 deletions tests/epics/adcore/test_single_trigger.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,48 @@
import bluesky.plan_stubs as bps
import bluesky.plans as bp
import pytest
from bluesky import RunEngine

from ophyd_async.core import DeviceCollector, set_mock_value
import ophyd_async.plan_stubs as ops
from ophyd_async.epics import adcore


@pytest.fixture
async def single_trigger_det():
async with DeviceCollector(mock=True):
stats = adcore.NDPluginStatsIO("PREFIX:STATS")
det = adcore.SingleTriggerDetector(
drv=adcore.ADBaseIO("PREFIX:DRV"),
stats=stats,
read_uncached=[stats.unique_id],
)
async def single_trigger_det_with_stats():
stats = adcore.NDPluginStatsIO("PREFIX:STATS", name="stats")
det = adcore.SingleTriggerDetector(
drv=adcore.ADBaseIO("PREFIX:DRV"),
stats=stats,
read_uncached=[stats.unique_id],
name="det",
)

assert det.name == "det"
assert stats.name == "det-stats"
# Set non-default values to check they are set back
# These are using set_mock_value to simulate the backend IOC being setup
# in a particular way, rather than values being set by the Ophyd signals
set_mock_value(det.drv.acquire_time, 0.5)
set_mock_value(det.drv.array_counter, 1)
set_mock_value(det.drv.image_mode, adcore.ImageMode.continuous)
set_mock_value(stats.unique_id, 3)
yield det
yield det, stats


async def test_single_trigger_det(
single_trigger_det: adcore.SingleTriggerDetector, RE: RunEngine
single_trigger_det_with_stats: adcore.SingleTriggerDetector, RE: RunEngine
):
single_trigger_det, stats = single_trigger_det_with_stats
names = []
docs = []
RE.subscribe(lambda name, _: names.append(name))
RE.subscribe(lambda _, doc: docs.append(doc))

RE(bp.count([single_trigger_det]))
def plan():
yield from ops.ensure_connected(single_trigger_det, mock=True)
yield from bps.abs_set(single_trigger_det.drv.acquire_time, 0.5)
yield from bps.abs_set(single_trigger_det.drv.array_counter, 1)
yield from bps.abs_set(
single_trigger_det.drv.image_mode, adcore.ImageMode.continuous
)
# set_mock_value(stats.unique_id, 3)
yield from bp.count([single_trigger_det])

RE(plan())

drv = single_trigger_det.drv
assert 1 == await drv.acquire.get_value()
Expand All @@ -47,4 +53,4 @@ async def test_single_trigger_det(
_, descriptor, event, _ = docs
assert descriptor["configuration"]["det"]["data"]["det-drv-acquire_time"] == 0.5
assert event["data"]["det-drv-array_counter"] == 1
assert event["data"]["det-stats-unique_id"] == 3
assert event["data"]["det-stats-unique_id"] == 0
Loading

0 comments on commit 989beeb

Please sign in to comment.