From e6de10d3dc2693012ee9387acd64e47ac1f28614 Mon Sep 17 00:00:00 2001 From: zerlok Date: Sat, 30 Nov 2024 19:24:02 +0100 Subject: [PATCH] fix Server consumer & handler registration when Broker is not connected --- pyproject.toml | 7 +- src/brokrpc/broker.py | 13 +++- src/brokrpc/rpc/client.py | 2 +- src/brokrpc/rpc/server.py | 20 ++--- src/brokrpc/serializer/json.py | 11 ++- tests/conftest.py | 63 ++++++++++++++-- tests/unit/rpc/__init__.py | 0 tests/unit/rpc/test_server.py | 129 +++++++++++++++++++++++++++++++++ tests/unit/test_broker.py | 97 +++++++++++++++++-------- 9 files changed, 290 insertions(+), 52 deletions(-) create mode 100644 tests/unit/rpc/__init__.py create mode 100644 tests/unit/rpc/test_server.py diff --git a/pyproject.toml b/pyproject.toml index f2b4729..74ae393 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "BrokRPC" -version = "0.2.2" +version = "0.2.3" description = "framework for gRPC like server-client communication over message brokers" authors = ["zerlok "] readme = "README.md" @@ -95,6 +95,11 @@ strict = true module = ["google.rpc.*"] ignore_missing_imports = true +# NOTE: allow return `typing.Any` in test fixtures (e.g. mock objects created with `create_autospec`) +[[tool.mypy.overrides]] +module = ["tests.*"] +warn_return_any = false + [tool.pytest.ini_options] pythonpath = [ diff --git a/src/brokrpc/broker.py b/src/brokrpc/broker.py index 992f020..a925782 100644 --- a/src/brokrpc/broker.py +++ b/src/brokrpc/broker.py @@ -2,6 +2,7 @@ import asyncio import typing as t +import warnings if t.TYPE_CHECKING: from concurrent.futures import Executor @@ -64,6 +65,10 @@ def __init__( self.__opened = asyncio.Event() self.__driver: BrokerDriver | None = None + def __del__(self) -> None: + if self.is_connected: + warnings.warn("broker was not disconnected properly", RuntimeWarning, stacklevel=1) + def __str__(self) -> str: return to_str_obj(self, is_connected=self.is_connected, driver=self.__driver) @@ -140,7 +145,7 @@ def publisher[T]( *, serializer: t.Callable[[T], BinaryMessage] | Serializer[T, BinaryMessage] | None = None, ) -> t.AsyncContextManager[BinaryPublisher] | t.AsyncContextManager[Publisher[T, PublisherResult]]: - return self.builder_publisher().add_serializer(serializer).build(options) + return self.build_publisher().add_serializer(serializer).build(options) @t.overload def consumer[T]( @@ -212,7 +217,7 @@ def consumer[T]( .build(t.cast(t.Any, consumer), options, executor=executor) ) - def builder_publisher(self) -> PublisherBuilder[BinaryMessage, PublisherResult]: + def build_publisher(self) -> PublisherBuilder[BinaryMessage, PublisherResult]: return ( PublisherBuilder(self.__provide_publisher, ident) .set_exchange(self.__default_exchange) @@ -239,7 +244,9 @@ def __provide_publisher(self, options: PublisherOptions | None) -> t.AsyncContex return self.__get_driver().provide_publisher(options) def __bind_consumer( - self, consumer: BinaryConsumer, options: BindingOptions + self, + consumer: BinaryConsumer, + options: BindingOptions, ) -> t.AsyncContextManager[BoundConsumer]: return self.__get_driver().bind_consumer(consumer, options) diff --git a/src/brokrpc/rpc/client.py b/src/brokrpc/rpc/client.py index aaa5b31..7cb9624 100644 --- a/src/brokrpc/rpc/client.py +++ b/src/brokrpc/rpc/client.py @@ -35,7 +35,7 @@ def build_message(body: U) -> Message[U]: ) return ( - self.__broker.builder_publisher() + self.__broker.build_publisher() .add_serializer(serializer) .add_serializer(build_message) .build(PublisherOptions(**asdict(exchange)) if exchange is not None else None) diff --git a/src/brokrpc/rpc/server.py b/src/brokrpc/rpc/server.py index 9ce1999..7549a9f 100644 --- a/src/brokrpc/rpc/server.py +++ b/src/brokrpc/rpc/server.py @@ -61,7 +61,7 @@ def __init__( self.__lock = asyncio.Lock() self.__cm_stack = AsyncExitStack() self.__state = State.IDLE - self.__consumers: list[t.AsyncContextManager[BoundConsumer]] = [] + self.__consumer_cms: list[t.Callable[[], t.AsyncContextManager[BoundConsumer]]] = [] self.__bound_consumers: list[BoundConsumer] = [] def __str__(self) -> str: @@ -90,16 +90,17 @@ def register_consumer[U]( binding = self.__get_binding_options(exchange, routing_key, queue) - cm: t.AsyncContextManager[BoundConsumer] + cm: t.Callable[[], t.AsyncContextManager[BoundConsumer]] match func: case consumer if isinstance(consumer, Consumer): - cm = self.__broker.consumer(consumer, binding, serializer=serializer) + cm = partial(self.__broker.consumer, consumer, binding, serializer=serializer) case async_func if inspect.iscoroutinefunction(async_func): - cm = self.__broker.consumer(async_func, binding, serializer=serializer) + cm = partial(self.__broker.consumer, async_func, binding, serializer=serializer) case sync_func if callable(sync_func): - cm = self.__broker.consumer( + cm = partial( + self.__broker.consumer, # TODO: avoid cast t.cast(t.Callable[[U], ConsumerResult], sync_func), binding, @@ -112,7 +113,7 @@ def register_consumer[U]( details = "invalid func type" raise TypeError(details, func) - self.__consumers.append(cm) + self.__consumer_cms.append(cm) def register_unary_unary_handler[U, V]( self, @@ -148,8 +149,9 @@ def register_unary_unary_handler[U, V]( details = "invalid func type" raise TypeError(details, func) - self.__consumers.append( - self.__bind_handler( + self.__consumer_cms.append( + partial( + self.__bind_handler, factory=factory, binding=self.__get_binding_options(exchange, routing_key, queue), ) @@ -165,7 +167,7 @@ async def start(self) -> None: self.__state = State.STARTUP try: bound_consumers = await asyncio.gather( - *(self.__cm_stack.enter_async_context(handler_cm) for handler_cm in self.__consumers) + *(self.__cm_stack.enter_async_context(cm()) for cm in self.__consumer_cms) ) except Exception: diff --git a/src/brokrpc/serializer/json.py b/src/brokrpc/serializer/json.py index 4195f72..c07380c 100644 --- a/src/brokrpc/serializer/json.py +++ b/src/brokrpc/serializer/json.py @@ -16,8 +16,17 @@ def __init__( encoder: JSONEncoder | None = None, decoder: JSONDecoder | None = None, encoding: str | None = None, + default: t.Callable[[object], object] | None = None, ) -> None: - self.__encoder = encoder if encoder is not None else JSONEncoder(indent=None, separators=(",", ":")) + self.__encoder = ( + encoder + if encoder is not None + else JSONEncoder( + indent=None, + separators=(",", ":"), + default=default, + ) + ) self.__decoder = decoder if decoder is not None else JSONDecoder() self.__encoding = encoding if encoding is not None else "utf-8" diff --git a/tests/conftest.py b/tests/conftest.py index 178c557..c3a3ac7 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,20 +1,33 @@ import typing as t from contextlib import nullcontext from datetime import timedelta +from unittest.mock import create_autospec import pytest # FIXME: find a way to import Parser # NOTE: fixes strange error `Module "_pytest.config" does not explicitly export attribute "Parser"`. from _pytest.config import Config, Parser # type: ignore[attr-defined] +from _pytest.fixtures import SubRequest from yarl import URL -from brokrpc.abc import BrokerDriver +from brokrpc.abc import Consumer, Serializer from brokrpc.broker import Broker -from brokrpc.options import BrokerOptions +from brokrpc.message import BinaryMessage +from brokrpc.model import ConsumerResult +from brokrpc.options import BindingOptions, BrokerOptions from brokrpc.retry import ConstantDelay, DelayRetryStrategy, ExponentialDelay, MultiplierDelay +from brokrpc.rpc.abc import UnaryUnaryHandler from tests.stub.driver import StubBrokerDriver, StubConsumer +BROKER_IS_CONNECTED: t.Final = pytest.mark.parametrize( + "broker_connected", [pytest.param(True, id="broker is connected")] +) +BROKER_IS_NOT_CONNECTED: t.Final = pytest.mark.parametrize( + "broker_connected", [pytest.param(False, id="broker is not connected")] +) +WARNINGS_AS_ERRORS: t.Final = pytest.mark.WARNINGS_AS_ERRORS("error") + @pytest.hookimpl(trylast=True) def pytest_addoption(parser: Parser) -> None: @@ -98,21 +111,55 @@ def parse_retry_delay_strategy(config: Config) -> DelayRetryStrategy | None: @pytest.fixture -def broker_driver() -> StubBrokerDriver: +def stub_broker_driver() -> StubBrokerDriver: return StubBrokerDriver() @pytest.fixture -def broker(broker_driver: BrokerDriver) -> Broker: - return Broker(nullcontext(broker_driver)) +async def stub_broker(*, stub_broker_driver: StubBrokerDriver, broker_connected: bool) -> t.AsyncIterator[Broker]: + broker = Broker(nullcontext(stub_broker_driver)) + try: + if broker_connected: + await broker.connect() + assert broker.is_connected is broker_connected -@pytest.fixture -async def connected_broker(broker: Broker) -> t.AsyncIterator[Broker]: - async with broker: yield broker + finally: + await broker.disconnect() + + +@pytest.fixture +def broker_connected() -> bool: + return False + @pytest.fixture def stub_consumer() -> StubConsumer: return StubConsumer() + + +@pytest.fixture +def mock_consumer() -> Consumer[object, ConsumerResult]: + return create_autospec(Consumer) + + +@pytest.fixture +def mock_unary_unary_handler() -> UnaryUnaryHandler[object, object]: + return create_autospec(UnaryUnaryHandler) + + +@pytest.fixture +def stub_routing_key(request: SubRequest) -> str: + return request.node.name + + +@pytest.fixture +def stub_binding_options(stub_routing_key: str) -> BindingOptions: + return BindingOptions(binding_keys=(stub_routing_key,)) + + +@pytest.fixture +def mock_serializer() -> Serializer[object, BinaryMessage]: + return create_autospec(Serializer) diff --git a/tests/unit/rpc/__init__.py b/tests/unit/rpc/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/rpc/test_server.py b/tests/unit/rpc/test_server.py new file mode 100644 index 0000000..4ea980a --- /dev/null +++ b/tests/unit/rpc/test_server.py @@ -0,0 +1,129 @@ +import typing as t + +import pytest + +from brokrpc.abc import Consumer, Serializer +from brokrpc.broker import Broker +from brokrpc.message import BinaryMessage +from brokrpc.model import ConsumerResult +from brokrpc.rpc.abc import HandlerSerializer, UnaryUnaryHandler +from brokrpc.rpc.model import Request +from brokrpc.rpc.server import Server, ServerNotInConfigurableStateError, ServerOptions +from tests.conftest import BROKER_IS_CONNECTED, WARNINGS_AS_ERRORS + +SERVER_IS_CONFIGURABLE = pytest.mark.parametrize( + ("broker_connected", "server_running"), + [ + pytest.param(False, False, id="broker not connected & server is not running"), + pytest.param(True, False, id="broker is connected, but server is not running yet"), + ], +) +SERVER_IS_NOT_CONFIGURABLE = pytest.mark.parametrize( + ("broker_connected", "server_running"), + [ + pytest.param(True, True, id="broker is connected & server is running"), + ], +) + + +@BROKER_IS_CONNECTED +async def test_warning_on_del_of_running_server(stub_broker: Broker) -> None: + server = Server(stub_broker) + + await server.start() + + with pytest.warns(RuntimeWarning, match="server was not stopped properly"): + del server + + +@BROKER_IS_CONNECTED +@WARNINGS_AS_ERRORS +async def test_no_warning_on_del_of_stopped_server(stub_broker: Broker) -> None: + server = Server(stub_broker) + + await server.start() + await server.stop() + + del server + + +@SERVER_IS_CONFIGURABLE +def test_register_consumer_ok[U]( + *, + server: Server, + mock_consumer: Consumer[U, ConsumerResult], + stub_routing_key: str, + mock_serializer: Serializer[U, BinaryMessage], +) -> None: + server.register_consumer(mock_consumer, stub_routing_key, mock_serializer) + + +@SERVER_IS_NOT_CONFIGURABLE +def test_register_consumer_error[U]( + *, + server: Server, + mock_consumer: Consumer[U, ConsumerResult], + stub_routing_key: str, + mock_serializer: Serializer[U, BinaryMessage], +) -> None: + with pytest.raises(ServerNotInConfigurableStateError): + server.register_consumer(mock_consumer, stub_routing_key, mock_serializer) + + +@SERVER_IS_CONFIGURABLE +def test_register_unary_unary_ok[U, V]( + *, + server: Server, + mock_unary_unary_handler: UnaryUnaryHandler[Request[U], V], + stub_routing_key: str, + mock_serializer: HandlerSerializer[U, V], +) -> None: + server.register_unary_unary_handler( + func=mock_unary_unary_handler, + routing_key=stub_routing_key, + serializer=mock_serializer, + ) + + +@SERVER_IS_NOT_CONFIGURABLE +def test_register_unary_unary_error[U, V]( + *, + server: Server, + mock_unary_unary_handler: UnaryUnaryHandler[Request[U], V], + stub_routing_key: str, + mock_serializer: HandlerSerializer[U, V], +) -> None: + with pytest.raises(ServerNotInConfigurableStateError): + server.register_unary_unary_handler( + func=mock_unary_unary_handler, + routing_key=stub_routing_key, + serializer=mock_serializer, + ) + + +@pytest.fixture +async def server( + *, + stub_broker: Broker, + server_options: ServerOptions | None, + server_running: bool, +) -> t.AsyncIterator[Server]: + server = Server(stub_broker, server_options) + try: + if server_running: + await server.start() + + yield server + + finally: + await server.stop() + + +@pytest.fixture +def server_options() -> ServerOptions | None: + return None + + +@pytest.fixture +def server_running() -> bool: + return False diff --git a/tests/unit/test_broker.py b/tests/unit/test_broker.py index 5d870db..639f22f 100644 --- a/tests/unit/test_broker.py +++ b/tests/unit/test_broker.py @@ -1,55 +1,94 @@ +from contextlib import nullcontext + import pytest from brokrpc.abc import BrokerDriver from brokrpc.broker import Broker, BrokerIsNotConnectedError from brokrpc.options import BindingOptions +from tests.conftest import BROKER_IS_CONNECTED, BROKER_IS_NOT_CONNECTED, WARNINGS_AS_ERRORS from tests.stub.driver import StubConsumer -def test_not_connected_broker_to_str_contains_appropriate_info( - not_connected_broker: Broker, -) -> None: - assert "is_connected=False" in str(not_connected_broker) - assert "driver=None" in str(not_connected_broker) - assert repr(not_connected_broker) +async def test_warning_on_del_of_connected_broker(stub_broker_driver: BrokerDriver) -> None: + broker = Broker(nullcontext(stub_broker_driver)) + await broker.connect() -def test_connected_broker_to_str_contains_appropriate_info( - broker_driver: BrokerDriver, - connected_broker: Broker, -) -> None: - assert "is_connected=True" in str(connected_broker) - assert "driver" in str(connected_broker) - assert "driver=None" not in str(connected_broker) - assert repr(connected_broker) + with pytest.warns(RuntimeWarning, match="broker was not disconnected properly"): + del broker -async def test_broker_connect_sets_is_connected( - not_connected_broker: Broker, -) -> None: - await not_connected_broker.connect() +@WARNINGS_AS_ERRORS +async def test_no_warning_on_del_of_disconnected_broker(stub_broker_driver: BrokerDriver) -> None: + broker = Broker(nullcontext(stub_broker_driver)) - assert not_connected_broker.is_connected + await broker.connect() + await broker.disconnect() + del broker -async def test_not_connected_broker_cant_provide_publisher( - not_connected_broker: Broker, -) -> None: + +@BROKER_IS_NOT_CONNECTED +def test_not_connected_broker_to_str_contains_appropriate_info(stub_broker: Broker) -> None: + assert "is_connected=False" in str(stub_broker) + assert "driver=None" in str(stub_broker) + assert repr(stub_broker) + + +@BROKER_IS_CONNECTED +def test_connected_broker_to_str_contains_appropriate_info(stub_broker: Broker) -> None: + assert "is_connected=True" in str(stub_broker) + assert "driver" in str(stub_broker) + assert "driver=None" not in str(stub_broker) + assert repr(stub_broker) + + +@BROKER_IS_NOT_CONNECTED +async def test_broker_connect_sets_is_connected(stub_broker: Broker) -> None: + await stub_broker.connect() + + assert stub_broker.is_connected + + +@BROKER_IS_CONNECTED +async def test_connected_broker_can_provide_publisher(stub_broker: Broker) -> None: + async with stub_broker.publisher(): + pass + + +@BROKER_IS_NOT_CONNECTED +async def test_not_connected_broker_cant_provide_publisher(stub_broker: Broker) -> None: with pytest.raises(BrokerIsNotConnectedError): - async with not_connected_broker.publisher(): + async with stub_broker.publisher(): pass +@BROKER_IS_CONNECTED +def test_connected_broker_can_get_publisher_builder(stub_broker: Broker) -> None: + stub_broker.build_publisher() + + +@BROKER_IS_CONNECTED +async def test_connected_broker_can_register_consumer( + stub_broker: Broker, + stub_consumer: StubConsumer, + stub_binding_options: BindingOptions, +) -> None: + async with stub_broker.consumer(stub_consumer, stub_binding_options): + pass + + +@BROKER_IS_NOT_CONNECTED async def test_not_connected_broker_cant_register_consumer( - not_connected_broker: Broker, + stub_broker: Broker, stub_consumer: StubConsumer, + stub_binding_options: BindingOptions, ) -> None: with pytest.raises(BrokerIsNotConnectedError): - async with not_connected_broker.consumer(stub_consumer, BindingOptions(binding_keys=("test",))): + async with stub_broker.consumer(stub_consumer, stub_binding_options): pass -@pytest.fixture -def not_connected_broker(broker: Broker) -> Broker: - assert not broker.is_connected - return broker +@BROKER_IS_CONNECTED +def test_connected_broker_can_get_consumer_builder(stub_broker: Broker) -> None: + stub_broker.build_consumer()