Skip to content

Commit

Permalink
fix Server consumer & handler registration when Broker is not connected
Browse files Browse the repository at this point in the history
  • Loading branch information
zerlok committed Nov 30, 2024
1 parent b61e3d3 commit e6de10d
Show file tree
Hide file tree
Showing 9 changed files with 290 additions and 52 deletions.
7 changes: 6 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "BrokRPC"
version = "0.2.2"
version = "0.2.3"
description = "framework for gRPC like server-client communication over message brokers"
authors = ["zerlok <[email protected]>"]
readme = "README.md"
Expand Down Expand Up @@ -95,6 +95,11 @@ strict = true
module = ["google.rpc.*"]
ignore_missing_imports = true

# NOTE: allow return `typing.Any` in test fixtures (e.g. mock objects created with `create_autospec`)
[[tool.mypy.overrides]]
module = ["tests.*"]
warn_return_any = false


[tool.pytest.ini_options]
pythonpath = [
Expand Down
13 changes: 10 additions & 3 deletions src/brokrpc/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import asyncio
import typing as t
import warnings

if t.TYPE_CHECKING:
from concurrent.futures import Executor
Expand Down Expand Up @@ -64,6 +65,10 @@ def __init__(
self.__opened = asyncio.Event()
self.__driver: BrokerDriver | None = None

def __del__(self) -> None:
if self.is_connected:
warnings.warn("broker was not disconnected properly", RuntimeWarning, stacklevel=1)

def __str__(self) -> str:
return to_str_obj(self, is_connected=self.is_connected, driver=self.__driver)

Expand Down Expand Up @@ -140,7 +145,7 @@ def publisher[T](
*,
serializer: t.Callable[[T], BinaryMessage] | Serializer[T, BinaryMessage] | None = None,
) -> t.AsyncContextManager[BinaryPublisher] | t.AsyncContextManager[Publisher[T, PublisherResult]]:
return self.builder_publisher().add_serializer(serializer).build(options)
return self.build_publisher().add_serializer(serializer).build(options)

@t.overload
def consumer[T](
Expand Down Expand Up @@ -212,7 +217,7 @@ def consumer[T](
.build(t.cast(t.Any, consumer), options, executor=executor)
)

def builder_publisher(self) -> PublisherBuilder[BinaryMessage, PublisherResult]:
def build_publisher(self) -> PublisherBuilder[BinaryMessage, PublisherResult]:
return (
PublisherBuilder(self.__provide_publisher, ident)
.set_exchange(self.__default_exchange)
Expand All @@ -239,7 +244,9 @@ def __provide_publisher(self, options: PublisherOptions | None) -> t.AsyncContex
return self.__get_driver().provide_publisher(options)

def __bind_consumer(
self, consumer: BinaryConsumer, options: BindingOptions
self,
consumer: BinaryConsumer,
options: BindingOptions,
) -> t.AsyncContextManager[BoundConsumer]:
return self.__get_driver().bind_consumer(consumer, options)

Expand Down
2 changes: 1 addition & 1 deletion src/brokrpc/rpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def build_message(body: U) -> Message[U]:
)

return (
self.__broker.builder_publisher()
self.__broker.build_publisher()
.add_serializer(serializer)
.add_serializer(build_message)
.build(PublisherOptions(**asdict(exchange)) if exchange is not None else None)
Expand Down
20 changes: 11 additions & 9 deletions src/brokrpc/rpc/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def __init__(
self.__lock = asyncio.Lock()
self.__cm_stack = AsyncExitStack()
self.__state = State.IDLE
self.__consumers: list[t.AsyncContextManager[BoundConsumer]] = []
self.__consumer_cms: list[t.Callable[[], t.AsyncContextManager[BoundConsumer]]] = []
self.__bound_consumers: list[BoundConsumer] = []

def __str__(self) -> str:
Expand Down Expand Up @@ -90,16 +90,17 @@ def register_consumer[U](

binding = self.__get_binding_options(exchange, routing_key, queue)

cm: t.AsyncContextManager[BoundConsumer]
cm: t.Callable[[], t.AsyncContextManager[BoundConsumer]]
match func:
case consumer if isinstance(consumer, Consumer):
cm = self.__broker.consumer(consumer, binding, serializer=serializer)
cm = partial(self.__broker.consumer, consumer, binding, serializer=serializer)

case async_func if inspect.iscoroutinefunction(async_func):
cm = self.__broker.consumer(async_func, binding, serializer=serializer)
cm = partial(self.__broker.consumer, async_func, binding, serializer=serializer)

case sync_func if callable(sync_func):
cm = self.__broker.consumer(
cm = partial(
self.__broker.consumer,
# TODO: avoid cast
t.cast(t.Callable[[U], ConsumerResult], sync_func),
binding,
Expand All @@ -112,7 +113,7 @@ def register_consumer[U](
details = "invalid func type"
raise TypeError(details, func)

self.__consumers.append(cm)
self.__consumer_cms.append(cm)

def register_unary_unary_handler[U, V](
self,
Expand Down Expand Up @@ -148,8 +149,9 @@ def register_unary_unary_handler[U, V](
details = "invalid func type"
raise TypeError(details, func)

self.__consumers.append(
self.__bind_handler(
self.__consumer_cms.append(
partial(
self.__bind_handler,
factory=factory,
binding=self.__get_binding_options(exchange, routing_key, queue),
)
Expand All @@ -165,7 +167,7 @@ async def start(self) -> None:
self.__state = State.STARTUP
try:
bound_consumers = await asyncio.gather(
*(self.__cm_stack.enter_async_context(handler_cm) for handler_cm in self.__consumers)
*(self.__cm_stack.enter_async_context(cm()) for cm in self.__consumer_cms)
)

except Exception:
Expand Down
11 changes: 10 additions & 1 deletion src/brokrpc/serializer/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,17 @@ def __init__(
encoder: JSONEncoder | None = None,
decoder: JSONDecoder | None = None,
encoding: str | None = None,
default: t.Callable[[object], object] | None = None,
) -> None:
self.__encoder = encoder if encoder is not None else JSONEncoder(indent=None, separators=(",", ":"))
self.__encoder = (
encoder
if encoder is not None
else JSONEncoder(
indent=None,
separators=(",", ":"),
default=default,
)
)
self.__decoder = decoder if decoder is not None else JSONDecoder()
self.__encoding = encoding if encoding is not None else "utf-8"

Expand Down
63 changes: 55 additions & 8 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,33 @@
import typing as t
from contextlib import nullcontext
from datetime import timedelta
from unittest.mock import create_autospec

import pytest

# FIXME: find a way to import Parser
# NOTE: fixes strange error `Module "_pytest.config" does not explicitly export attribute "Parser"`.
from _pytest.config import Config, Parser # type: ignore[attr-defined]
from _pytest.fixtures import SubRequest
from yarl import URL

from brokrpc.abc import BrokerDriver
from brokrpc.abc import Consumer, Serializer
from brokrpc.broker import Broker
from brokrpc.options import BrokerOptions
from brokrpc.message import BinaryMessage
from brokrpc.model import ConsumerResult
from brokrpc.options import BindingOptions, BrokerOptions
from brokrpc.retry import ConstantDelay, DelayRetryStrategy, ExponentialDelay, MultiplierDelay
from brokrpc.rpc.abc import UnaryUnaryHandler
from tests.stub.driver import StubBrokerDriver, StubConsumer

BROKER_IS_CONNECTED: t.Final = pytest.mark.parametrize(
"broker_connected", [pytest.param(True, id="broker is connected")]
)
BROKER_IS_NOT_CONNECTED: t.Final = pytest.mark.parametrize(
"broker_connected", [pytest.param(False, id="broker is not connected")]
)
WARNINGS_AS_ERRORS: t.Final = pytest.mark.WARNINGS_AS_ERRORS("error")


@pytest.hookimpl(trylast=True)
def pytest_addoption(parser: Parser) -> None:
Expand Down Expand Up @@ -98,21 +111,55 @@ def parse_retry_delay_strategy(config: Config) -> DelayRetryStrategy | None:


@pytest.fixture
def broker_driver() -> StubBrokerDriver:
def stub_broker_driver() -> StubBrokerDriver:
return StubBrokerDriver()


@pytest.fixture
def broker(broker_driver: BrokerDriver) -> Broker:
return Broker(nullcontext(broker_driver))
async def stub_broker(*, stub_broker_driver: StubBrokerDriver, broker_connected: bool) -> t.AsyncIterator[Broker]:
broker = Broker(nullcontext(stub_broker_driver))
try:
if broker_connected:
await broker.connect()

assert broker.is_connected is broker_connected

@pytest.fixture
async def connected_broker(broker: Broker) -> t.AsyncIterator[Broker]:
async with broker:
yield broker

finally:
await broker.disconnect()


@pytest.fixture
def broker_connected() -> bool:
return False


@pytest.fixture
def stub_consumer() -> StubConsumer:
return StubConsumer()


@pytest.fixture
def mock_consumer() -> Consumer[object, ConsumerResult]:
return create_autospec(Consumer)


@pytest.fixture
def mock_unary_unary_handler() -> UnaryUnaryHandler[object, object]:
return create_autospec(UnaryUnaryHandler)


@pytest.fixture
def stub_routing_key(request: SubRequest) -> str:
return request.node.name


@pytest.fixture
def stub_binding_options(stub_routing_key: str) -> BindingOptions:
return BindingOptions(binding_keys=(stub_routing_key,))


@pytest.fixture
def mock_serializer() -> Serializer[object, BinaryMessage]:
return create_autospec(Serializer)
Empty file added tests/unit/rpc/__init__.py
Empty file.
129 changes: 129 additions & 0 deletions tests/unit/rpc/test_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import typing as t

import pytest

from brokrpc.abc import Consumer, Serializer
from brokrpc.broker import Broker
from brokrpc.message import BinaryMessage
from brokrpc.model import ConsumerResult
from brokrpc.rpc.abc import HandlerSerializer, UnaryUnaryHandler
from brokrpc.rpc.model import Request
from brokrpc.rpc.server import Server, ServerNotInConfigurableStateError, ServerOptions
from tests.conftest import BROKER_IS_CONNECTED, WARNINGS_AS_ERRORS

SERVER_IS_CONFIGURABLE = pytest.mark.parametrize(
("broker_connected", "server_running"),
[
pytest.param(False, False, id="broker not connected & server is not running"),
pytest.param(True, False, id="broker is connected, but server is not running yet"),
],
)
SERVER_IS_NOT_CONFIGURABLE = pytest.mark.parametrize(
("broker_connected", "server_running"),
[
pytest.param(True, True, id="broker is connected & server is running"),
],
)


@BROKER_IS_CONNECTED
async def test_warning_on_del_of_running_server(stub_broker: Broker) -> None:
server = Server(stub_broker)

await server.start()

with pytest.warns(RuntimeWarning, match="server was not stopped properly"):
del server


@BROKER_IS_CONNECTED
@WARNINGS_AS_ERRORS
async def test_no_warning_on_del_of_stopped_server(stub_broker: Broker) -> None:
server = Server(stub_broker)

await server.start()
await server.stop()

del server


@SERVER_IS_CONFIGURABLE
def test_register_consumer_ok[U](
*,
server: Server,
mock_consumer: Consumer[U, ConsumerResult],
stub_routing_key: str,
mock_serializer: Serializer[U, BinaryMessage],
) -> None:
server.register_consumer(mock_consumer, stub_routing_key, mock_serializer)


@SERVER_IS_NOT_CONFIGURABLE
def test_register_consumer_error[U](
*,
server: Server,
mock_consumer: Consumer[U, ConsumerResult],
stub_routing_key: str,
mock_serializer: Serializer[U, BinaryMessage],
) -> None:
with pytest.raises(ServerNotInConfigurableStateError):
server.register_consumer(mock_consumer, stub_routing_key, mock_serializer)


@SERVER_IS_CONFIGURABLE
def test_register_unary_unary_ok[U, V](
*,
server: Server,
mock_unary_unary_handler: UnaryUnaryHandler[Request[U], V],
stub_routing_key: str,
mock_serializer: HandlerSerializer[U, V],
) -> None:
server.register_unary_unary_handler(
func=mock_unary_unary_handler,
routing_key=stub_routing_key,
serializer=mock_serializer,
)


@SERVER_IS_NOT_CONFIGURABLE
def test_register_unary_unary_error[U, V](
*,
server: Server,
mock_unary_unary_handler: UnaryUnaryHandler[Request[U], V],
stub_routing_key: str,
mock_serializer: HandlerSerializer[U, V],
) -> None:
with pytest.raises(ServerNotInConfigurableStateError):
server.register_unary_unary_handler(
func=mock_unary_unary_handler,
routing_key=stub_routing_key,
serializer=mock_serializer,
)


@pytest.fixture
async def server(
*,
stub_broker: Broker,
server_options: ServerOptions | None,
server_running: bool,
) -> t.AsyncIterator[Server]:
server = Server(stub_broker, server_options)
try:
if server_running:
await server.start()

yield server

finally:
await server.stop()


@pytest.fixture
def server_options() -> ServerOptions | None:
return None


@pytest.fixture
def server_running() -> bool:
return False
Loading

0 comments on commit e6de10d

Please sign in to comment.