Skip to content

Commit

Permalink
feat: add size, is_disposed
Browse files Browse the repository at this point in the history
  • Loading branch information
phi-friday committed Feb 25, 2024
1 parent df49fd0 commit 6378631
Showing 1 changed file with 20 additions and 5 deletions.
25 changes: 20 additions & 5 deletions src/async_wrapper/pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ class Synchronization(TypedDict, total=False):
class Disposable(Protocol[InputT, OutputT]):
"""Defines the interface for a disposable resource."""

@property
def is_disposed(self) -> bool:
"""Check if disposed"""
... # pragma: no cover

async def next(self, value: InputT) -> OutputT:
"""
Processes the next input value and produces an output value.
Expand Down Expand Up @@ -75,6 +80,11 @@ def prepare_callback(self, subscribable: Subscribable) -> Any:
class Subscribable(Disposable[InputT, OutputT], Protocol[InputT, OutputT]):
"""subscribable & disposable"""

@property
def size(self) -> int:
"""listener size"""
... # pragma: no cover

def subscribe(
self,
disposable: Disposable[OutputT, Any] | Callable[[OutputT], Awaitable[Any]],
Expand All @@ -89,7 +99,7 @@ def subscribe(
dispose: Whether to dispose the disposable when the pipe is disposed.
"""

def unsubscribe(self, dispoable: Disposable[Any, Any]) -> None:
def unsubscribe(self, disposable: Disposable[Any, Any]) -> None:
"""
Unsubscribes a disposable
Expand All @@ -115,8 +125,8 @@ def __init__(
self._journals = deque()

@property
@override
def is_disposed(self) -> bool:
"""is disposed"""
return self._is_disposed

@override
Expand Down Expand Up @@ -176,10 +186,15 @@ def __init__(
self._dispose_lock = anyio.Lock()

@property
@override
def is_disposed(self) -> bool:
"""is disposed"""
return self._is_disposed

@property
@override
def size(self) -> int:
return len(self._listeners)

@override
async def next(self, value: InputT) -> OutputT:
if self._is_disposed:
Expand Down Expand Up @@ -227,8 +242,8 @@ def subscribe(
disposable.prepare_callback(self)

@override
def unsubscribe(self, dispoable: Disposable[Any, Any]) -> None:
self._listeners.pop(dispoable, None)
def unsubscribe(self, disposable: Disposable[Any, Any]) -> None:
self._listeners.pop(disposable, None)


def create_disposable(
Expand Down

0 comments on commit 6378631

Please sign in to comment.