diff --git a/tests/test_pipe.py b/tests/test_pipe.py index acb2b91..c672fea 100644 --- a/tests/test_pipe.py +++ b/tests/test_pipe.py @@ -1,9 +1,10 @@ from __future__ import annotations import inspect +from collections import deque +from contextlib import suppress from functools import partial -from itertools import product -from typing import Any, Callable +from typing import Any, Awaitable, Callable import anyio import pytest @@ -11,7 +12,14 @@ from .base import Timer from async_wrapper.exception import AlreadyDisposedError -from async_wrapper.pipe import Disposable, Pipe, SimpleDisposable, create_disposable +from async_wrapper.pipe import ( + Disposable, + DisposableWithCallback, + Pipe, + SimpleDisposable, + Subscribable, + create_disposable, +) pytestmark = pytest.mark.anyio @@ -26,6 +34,10 @@ def __init__(self, dispose: Callable[[], Any] | None = None) -> None: self.disposed = False self._dispose = dispose + @property + def is_disposed(self) -> bool: + return self.disposed + async def next(self, value: Any) -> Any: await anyio.sleep(0) self.value = value @@ -41,14 +53,71 @@ async def dispose(self) -> Any: await value -async def as_tuple(value: ValueT) -> tuple[ValueT]: - await anyio.sleep(0) - return (value,) +class CustomDisposableWithCallback(CustomDisposable): + def __init__(self, dispose: Callable[[], Any] | None = None) -> None: + super().__init__(dispose) + self._subscribables: deque[Subscribable] = deque() + + async def dispose(self) -> Any: + await super().dispose() + for subscribable in self._subscribables: + subscribable.unsubscribe(self) + + def prepare_callback(self, subscribable: Subscribable) -> Any: + self._subscribables.append(subscribable) + + +class CustomSubscribable(CustomDisposable): + def __init__(self, dispose: Callable[[], Any] | None = None) -> None: + super().__init__(dispose) + self._listeners: dict[Disposable[Any, Any], bool] = {} + @property + def size(self) -> int: + return len(self._listeners) -async def as_str(value: Any) -> str: + async def next(self, value: Any) -> Any: + result = await super().next(value) + for listener in self._listeners: + await listener.next(result) + + async def dispose(self) -> Any: + with suppress(TypeError): + await super().dispose() + for listener, do_dispose in self._listeners.items(): + if not do_dispose: + continue + await listener.dispose() + + def subscribe( + self, + disposable: Disposable[Any, Any] | Callable[[Any], Awaitable[Any]], + *, + dispose: bool = True, + ) -> Any: + if not isinstance(disposable, Disposable): + disposable = create_disposable(disposable) + self._listeners[disposable] = dispose + if isinstance(disposable, DisposableWithCallback): + disposable.prepare_callback(self) + + def unsubscribe(self, disposable: Disposable[Any, Any]) -> None: + self._listeners.pop(disposable, None) + + +@pytest.fixture( + params=[ + pytest.param(CustomSubscribable, id="custom-subscribable"), + pytest.param(Pipe, id="pipe"), + ] +) +def subscribable_type(request) -> type[Subscribable]: + return request.param + + +async def as_tuple(value: ValueT) -> tuple[ValueT]: await anyio.sleep(0) - return str(value) + return (value,) async def return_self(value: ValueT) -> ValueT: @@ -89,28 +158,35 @@ async def hit(value: Any) -> None: # noqa: ARG001 check_hit() -@pytest.mark.parametrize( - ("x", "func_and_type"), product(range(1, 4), ((as_tuple, tuple), (as_str, str))) -) -async def test_subscribe(x: int, func_and_type: tuple[Any, Any]): - pipe: Pipe[int, Any] = Pipe(func_and_type[0]) +def test_custom_disposable(): + disposable = CustomDisposable() + assert isinstance(disposable, Disposable) + + +def test_custom_disposable_with_callback(): + disposable = CustomDisposableWithCallback() + assert isinstance(disposable, DisposableWithCallback) + + +def test_custom_subscribable(): + disposable = CustomSubscribable() + assert isinstance(disposable, Subscribable) + + +@pytest.mark.parametrize("x", range(1, 4)) +async def test_subscribe(x: int, subscribable_type: type[Subscribable]): + pipe: Subscribable[int, Any] = subscribable_type(as_tuple) getter, setter = use_value() pipe.subscribe(setter) await pipe.next(x) result = await getter() - - assert isinstance(result, func_and_type[1]) - - if func_and_type[1] is tuple: - assert result[0] == x - elif func_and_type[1] is str: - assert result == str(x) + assert result is not None @pytest.mark.parametrize("x", range(1, 4)) -async def test_subscribe_interface(x: int): - pipe: Pipe[int, int] = Pipe(return_self) +async def test_subscribe_interface(x: int, subscribable_type: type[Subscribable]): + pipe: Subscribable[int, int] = subscribable_type(return_self) disposable = CustomDisposable() pipe.subscribe(disposable) @@ -122,7 +198,7 @@ async def test_subscribe_interface(x: int): @pytest.mark.parametrize("x", range(1, 4)) -async def test_subscribe_many(x: int): +async def test_subscribe_many(x: int, subscribable_type: type[Subscribable]): size = 10 check: list[Any] = [False] * size @@ -131,19 +207,22 @@ async def hit(value: Any, index: int) -> None: await anyio.sleep(0) check[index] = value - pipe = Pipe(as_str) + pipe: Subscribable[int, tuple] = subscribable_type(as_tuple) for index in range(size): pipe.subscribe(partial(hit, index=index)) await pipe.next(x) - assert check == [str(x)] * size + if subscribable_type is CustomSubscribable: + assert check == [x] * size + else: + assert check == [(x,)] * size @pytest.mark.parametrize("x", range(1, 4)) -async def test_subscribe_chain(x: int): - pipe1: Pipe[int, int] = Pipe(return_self) - pipe2: Pipe[int, tuple[int]] = Pipe(as_tuple) - pipe3: Pipe[Any, str] = Pipe(as_str) +async def test_subscribe_chain(x: int, subscribable_type: type[Subscribable]): + pipe1: Subscribable[int, int] = subscribable_type(return_self) + pipe2: Subscribable[int, tuple[int]] = subscribable_type(as_tuple) + pipe3: Subscribable[Any, tuple] = subscribable_type(as_tuple) getter, setter = use_value() pipe1.subscribe(pipe2) @@ -153,12 +232,42 @@ async def test_subscribe_chain(x: int): await pipe1.next(x) result = await getter() - assert isinstance(result, str) - assert result == str((x,)) + if subscribable_type is CustomSubscribable: + assert isinstance(result, int) + assert result == x + else: + assert isinstance(result, tuple) + assert result == ((x,),) + + +async def test_unsubscribe(subscribable_type: type[Subscribable]): + pipe: Subscribable[Any, Any] = subscribable_type(return_self) + getter, setter = use_value() + disposable = create_disposable(setter) + pipe.subscribe(disposable) + + await pipe.next(0) + result = await getter() + assert result == 0 + + pipe.unsubscribe(disposable) + await pipe.next(1) + result = await getter() + assert result != 1 + +async def test_prepare_callback(subscribable_type: type[Subscribable]): + pipe: Subscribable[Any, Any] = subscribable_type(return_self) + disposable = CustomDisposableWithCallback() -async def test_empty_dispose(): - pipe: Pipe[Any, Any] = Pipe(return_self) + pipe.subscribe(disposable) + assert pipe.size == 1 + await disposable.dispose() + assert pipe.size == 0 + + +async def test_empty_dispose(subscribable_type: type[Subscribable]): + pipe: Subscribable[Any, Any] = subscribable_type(return_self) disposable = CustomDisposable() pipe.subscribe(disposable) @@ -167,7 +276,7 @@ async def test_empty_dispose(): assert disposable.disposed is True -async def test_dispose(): +async def test_dispose(subscribable_type: type[Subscribable]): flag: bool = False async def hit() -> None: @@ -175,7 +284,12 @@ async def hit() -> None: await anyio.sleep(0) flag = True - pipe: Pipe[Any, Any] = Pipe(return_self, dispose=hit) + pipe: Subscribable[Any, Any] + if subscribable_type is Pipe: + pipe = subscribable_type(return_self, dispose=hit) + else: + pipe = subscribable_type(dispose=hit) + disposable = CustomDisposable() pipe.subscribe(disposable) @@ -186,7 +300,7 @@ async def hit() -> None: assert flag is True -async def test_dispose_many(): +async def test_dispose_many(subscribable_type: type[Subscribable]): size = 10 check: list[Any] = [False] * size @@ -195,7 +309,7 @@ async def hit(index: int) -> None: await anyio.sleep(0) check[index] = True - pipe: Pipe[Any, Any] = Pipe(return_self) + pipe: Subscribable[Any, Any] = subscribable_type(return_self) for index in range(size): disposable = CustomDisposable(dispose=partial(hit, index=index)) pipe.subscribe(disposable) @@ -205,23 +319,25 @@ async def hit(index: int) -> None: assert all(x is True for x in check) -async def test_dispose_chain(): - pipe: Pipe[Any, Any] = Pipe(return_self) - disposable1 = Pipe(return_self) +async def test_dispose_chain(subscribable_type: type[Subscribable]): + pipe: Subscribable[Any, Any] = subscribable_type(return_self) + disposable1 = subscribable_type(return_self) disposable2 = CustomDisposable() pipe.subscribe(disposable1) disposable1.subscribe(disposable2) - assert disposable1.is_disposed is False + if isinstance(disposable1, Pipe): + assert disposable1.is_disposed is False assert disposable2.disposed is False await pipe.dispose() - assert disposable1.is_disposed is True + if isinstance(disposable1, Pipe): + assert disposable1.is_disposed is True assert disposable2.disposed is True -async def test_dispose_only_once(): +async def test_pipe_dispose_only_once(): count = 0 async def hit() -> None: @@ -236,7 +352,7 @@ async def hit() -> None: assert count == 1 -async def test_do_not_dispose(): +async def test_do_not_dispose(subscribable_type: type[Subscribable]): flag: bool = False async def hit() -> None: @@ -244,7 +360,7 @@ async def hit() -> None: await anyio.sleep(0) flag = True - pipe = Pipe(return_self) + pipe: Subscribable[int, int] = subscribable_type(return_self) disposable = CustomDisposable(dispose=hit) pipe.subscribe(disposable, dispose=False) @@ -253,7 +369,7 @@ async def hit() -> None: assert disposable.disposed is False -async def test_semaphore(): +async def test_pipe_semaphore(): size = 3 check: list[Any] = [False] * size @@ -264,7 +380,7 @@ async def hit(value: Any, index: int) -> None: sema_value = 2 sema = anyio.Semaphore(sema_value) - pipe = Pipe(as_str, context={"semaphore": sema}) + pipe: Pipe[int, tuple] = Pipe(as_tuple, context={"semaphore": sema}) for index in range(size): pipe.subscribe(partial(hit, index=index)) @@ -275,7 +391,7 @@ async def hit(value: Any, index: int) -> None: assert EPSILON * q < timer.term < EPSILON * q + EPSILON -async def test_limit(): +async def test_pipe_limit(): size = 3 check: list[Any] = [False] * size @@ -286,7 +402,7 @@ async def hit(value: Any, index: int) -> None: limit_value = 2 limit = anyio.CapacityLimiter(limit_value) - pipe = Pipe(as_str, context={"limiter": limit}) + pipe: Pipe[int, tuple] = Pipe(as_tuple, context={"limiter": limit}) for index in range(size): pipe.subscribe(partial(hit, index=index)) @@ -297,7 +413,7 @@ async def hit(value: Any, index: int) -> None: assert EPSILON * q < timer.term < EPSILON * q + EPSILON -async def test_lock(): +async def test_pipe_lock(): size = 3 check: list[Any] = [False] * size @@ -307,7 +423,7 @@ async def hit(value: Any, index: int) -> None: check[index] = value lock = anyio.Lock() - pipe = Pipe(as_str, context={"lock": lock}) + pipe: Pipe[int, tuple] = Pipe(as_tuple, context={"lock": lock}) for index in range(size): pipe.subscribe(partial(hit, index=index)) @@ -317,7 +433,7 @@ async def hit(value: Any, index: int) -> None: assert EPSILON * size < timer.term < EPSILON * size + EPSILON -async def test_next_after_disposed(): +async def test_pipe_next_after_disposed(): flag: bool = False async def hit(value: Any) -> None: # noqa: ARG001 @@ -333,7 +449,7 @@ async def hit(value: Any) -> None: # noqa: ARG001 await pipe.next(1) -async def test_subscribe_after_disposed(): +async def test_pipe_subscribe_after_disposed(): pipe = Pipe(return_self) await pipe.dispose() _, setter = use_value()