diff --git a/shared/lib-hoppy/hoppy/base_channel_opener.py b/shared/lib-hoppy/hoppy/base_channel_opener.py new file mode 100644 index 0000000000..6cc26e73fb --- /dev/null +++ b/shared/lib-hoppy/hoppy/base_channel_opener.py @@ -0,0 +1,127 @@ +import logging +import time +from abc import ABC, abstractmethod + +from hoppy.config import RABBITMQ_CONFIG +from pika.adapters.asyncio_connection import AsyncioConnection +from pika import ConnectionParameters, PlainCredentials + + +class BaseChannelOpener(ABC): + def __init__(self, config: [dict | None] = None): + if config is None: + config = {} + self.config = {**RABBITMQ_CONFIG, **config} + self.connection_parameters = self._create_connection_parameters() + + self._connection = None + self._channel = None + self._max_reconnect_delay = self.config.get('max_reconnect_delay', 30) + self._reconnect_delay = self.config.get('initial_reconnect_delay', 0) + self._custom_loop = None + + def _create_connection_parameters(self) -> ConnectionParameters: + credentials = PlainCredentials(self.config["username"], self.config["password"]) + return ConnectionParameters( + host=self.config['host'], + port=self.config['port'], + virtual_host=self.config['virtual_host'], + credentials=credentials) + + def _initialize_connection_session(self): + """The following attributes are used per connection session. When a reconnect happens, they should be reset.""" + self._reconnect_delay = self.config.get('initial_reconnect_delay', 0) + + def connect(self, loop=None): + """ + Creates the asyncio connection to RabbitMQ + + Parameters + ---------- + loop = None | asyncio.AbstractEventLoop | nbio_interface.AbstractIOServices + Defaults to asyncio.get_event_loop() + """ + + self._custom_loop = loop + + self._info('connectingToRabbitMq', config=self.config) + self._connection = AsyncioConnection( + parameters=self.connection_parameters, + on_open_callback=self._on_connection_open, + on_open_error_callback=self._on_connection_open_error, + on_close_callback=self._on_connection_closed, + custom_ioloop=loop) + return self._connection + + def _on_connection_open(self, connection): + self._debug('openedConnection') + self._connection = connection + self._initialize_connection_session() + self._open_channel() + + def _on_connection_open_error(self, _unused_connection, err): + self._error('failedToOpenConnection', err) + self._reconnect() + + def _close_connection(self): + if self._connection is not None: + self._debug('closingConnection', + closing=self._connection.is_closing, + closed=self._connection.is_closed) + if not self._connection.is_closing and not self._connection.is_closed: + self._connection.close() + + @abstractmethod + def _on_connection_closed(self, _unused_connection, reason): + pass + + def _reconnect(self): + self.stop() + reconnect_delay = self._get_reconnect_delay() + self._warning('reconnecting', reconnect_delay_seconds=reconnect_delay) + time.sleep(reconnect_delay) + self.connect(self._custom_loop) + + def _get_reconnect_delay(self): + self._reconnect_delay += 1 + if self._reconnect_delay > self._max_reconnect_delay: + self._reconnect_delay = self._max_reconnect_delay + return self._reconnect_delay + + def _open_channel(self): + self._debug('openingChannel') + self._connection.channel(on_open_callback=self._on_channel_open) + + def _on_channel_closed(self, channel, reason): + self._warning('closedChannel', channel=channel, reason=reason) + self._close_connection() + + def _close_channel(self): + if self._channel is not None: + self._debug('closingChannel', channel=self._channel) + self._channel.close() + + def _on_channel_open(self, channel): + self._debug('openedChannel', channel=channel) + self._channel = channel + self._channel.add_on_close_callback(self._on_channel_closed) + + @staticmethod + def __kwarg_str(**kwargs): + return ' '.join(f'{k}={v}' for k, v in kwargs.items()) + + def _debug(self, event, **kwargs): + msg = f'event={event} client_type={self._client_type.name} {self.__kwarg_str(**kwargs)}' + logging.debug(msg) + + def _info(self, event, **kwargs): + msg = f'event={event} client_type={self._client_type.name} {self.__kwarg_str(**kwargs)}' + logging.info(msg) + + def _warning(self, event, **kwargs): + msg = f'event={event} client_type={self._client_type.name} {self.__kwarg_str(**kwargs)}' + logging.warning(msg) + + def _error(self, event, error=None, **kwargs): + msg = f'event={event} client_type={self._client_type.name} err={error!r} {self.__kwarg_str(**kwargs)}' + logging.error(msg) diff --git a/shared/lib-hoppy/hoppy/base_exchange_declarer.py b/shared/lib-hoppy/hoppy/base_exchange_declarer.py new file mode 100644 index 0000000000..478c94b9aa --- /dev/null +++ b/shared/lib-hoppy/hoppy/base_exchange_declarer.py @@ -0,0 +1,47 @@ +from hoppy.base_channel_opener import BaseChannelOpener +from hoppy.hoppy_properties import ExchangeProperties + + +class BaseExchangeDeclarer(BaseChannelOpener): + + def __init__(self, config: [dict | None] = None, exchange_properties: ExchangeProperties = ExchangeProperties()): + super().__init__(config) + + self._set_exchange_properties(exchange_properties) + + def _set_exchange_properties(self, exchange_properties: ExchangeProperties): + self.exchange_name = exchange_properties.name + self.exchange_type = exchange_properties.type + self.passive_declare_exchange = exchange_properties.passive_declare + self.durable_exchange = exchange_properties.durable + self.auto_delete_exchange = exchange_properties.auto_delete + + def _open_channel(self): + self._debug('openingChannel') + self._connection.channel(on_open_callback=self._on_channel_open) + + def _on_channel_open(self, channel): + super()._on_channel_open(channel) + self._setup_exchange() + + def _setup_exchange(self): + self._debug('declaringExchange', + exchange=self.exchange_name, + type=self.exchange_type, + passive_declare=self.passive_declare_exchange, + durable=self.durable_exchange, + auto_delete=self.auto_delete_exchange) + self._channel.exchange_declare(exchange=self.exchange_name, + exchange_type=self.exchange_type, + passive=self.passive_declare_exchange, + durable=self.durable_exchange, + auto_delete=self.auto_delete_exchange, + callback=self._on_exchange_declare_ok) + + def _on_exchange_declare_ok(self, _unused_frame): + self._debug('declaredExchange', + exchange=self.exchange_name, + type=self.exchange_type, + passive_declare=self.passive_declare_exchange, + durable=self.durable_exchange, + auto_delete=self.auto_delete_exchange) diff --git a/shared/lib-hoppy/hoppy/base_queue_client.py b/shared/lib-hoppy/hoppy/base_queue_client.py index c45f99cdd4..de4067258c 100644 --- a/shared/lib-hoppy/hoppy/base_queue_client.py +++ b/shared/lib-hoppy/hoppy/base_queue_client.py @@ -1,17 +1,14 @@ -import logging -import time -from abc import ABC, abstractmethod +from abc import abstractmethod from enum import Enum -from hoppy.config import RABBITMQ_CONFIG +from hoppy.base_queue_declarer import BaseQueueDeclarer from hoppy.hoppy_properties import ExchangeProperties, QueueProperties from pika import ConnectionParameters, PlainCredentials -from pika.adapters.asyncio_connection import AsyncioConnection -ClientType = Enum('ClientType', ['CONSUMER', 'PUBLISHER']) +ClientType = Enum('ClientType', ['CONSUMER', 'PUBLISHER', 'CHANNEL_ONLY']) -class BaseQueueClient(ABC): +class BaseQueueClient(BaseQueueDeclarer): """ Creates the base of an asynchronous client for connecting to an exchange and queue. This class does not implement publishing or consuming from the queues, but merely handles creating the connection, declaring the exchange, @@ -44,70 +41,24 @@ def __init__(self, :param routing_key: str = '' the routing key used to route messages to the queue """ + super().__init__(config, exchange_properties, queue_properties, routing_key) self._client_type = _client_type - if config is None: - config = {} - self.config = {**RABBITMQ_CONFIG, **config} - self.connection_parameters = self._create_connection_parameters() - self.exchange_name = exchange_properties.name - self.exchange_type = exchange_properties.type - self.passive_declare_exchange = exchange_properties.passive_declare - self.durable_exchange = exchange_properties.durable - self.auto_delete_exchange = exchange_properties.auto_delete - - self.queue_name = queue_properties.name - self.passive_declare_queue = queue_properties.passive_declare - self.durable_queue = queue_properties.durable - self.auto_delete_queue = queue_properties.auto_delete - self.exclusive_queue = queue_properties.exclusive - - self.routing_key = routing_key - - self._custom_loop = None - self._connection = None - self._channel = None - self._max_reconnect_delay = self.config.get('max_reconnect_delay', 30) - self._reconnect_delay = self.config.get('initial_reconnect_delay', 0) self._is_ready = False self._stopping = False self._stopped = False - def _create_connection_parameters(self) -> ConnectionParameters: - credentials = PlainCredentials(self.config["username"], self.config["password"]) - return ConnectionParameters( - host=self.config['host'], - port=self.config['port'], - credentials=credentials) - def _initialize_connection_session(self): """The following attributes are used per connection session. When a reconnect happens, they should be reset.""" - self._reconnect_delay = self.config.get('initial_reconnect_delay', 0) + super()._initialize_connection_session() self._is_ready = False self._stopping = False self._stopped = False - def connect(self, loop=None): - """ - Creates the asyncio connection to RabbitMQ - - Parameters - ---------- - loop = None | asyncio.AbstractEventLoop | nbio_interface.AbstractIOServices - Defaults to asyncio.get_event_loop() - """ - - self._custom_loop = loop - - self._info('connectingToRabbitMq', config=self.config) - self._connection = AsyncioConnection( - parameters=self.connection_parameters, - on_open_callback=self._on_connection_open, - on_open_error_callback=self._on_connection_open_error, - on_close_callback=self._on_connection_closed, - custom_ioloop=loop) - return self._connection + @property + def is_ready(self) -> bool: + return self._is_ready @property def is_ready(self) -> bool: @@ -140,16 +91,6 @@ def stop(self): def is_stopped(self) -> bool: return self._stopped - def _on_connection_open(self, connection): - self._debug('openedConnection') - self._connection = connection - self._initialize_connection_session() - self._open_channel() - - def _on_connection_open_error(self, _unused_connection, err): - self._error('failedToOpenConnection', err) - self._reconnect() - def _on_connection_closed(self, _unused_connection, reason): self._channel = None if self._stopping: @@ -163,122 +104,10 @@ def _on_connection_closed(self, _unused_connection, reason): self._warning('connectionClosedUnexpectedly', reason=reason) self._reconnect() - def _close_connection(self): - if self._connection is not None: - self._debug('closingConnection', - closing=self._connection.is_closing, - closed=self._connection.is_closed) - if not self._connection.is_closing and not self._connection.is_closed: - self._connection.close() - - def _reconnect(self): - self.stop() - reconnect_delay = self._get_reconnect_delay() - self._warning('reconnecting', reconnect_delay_seconds=reconnect_delay) - time.sleep(reconnect_delay) - self.connect(self._custom_loop) - - def _get_reconnect_delay(self): - self._reconnect_delay += 1 - if self._reconnect_delay > self._max_reconnect_delay: - self._reconnect_delay = self._max_reconnect_delay - return self._reconnect_delay - - def _open_channel(self): - self._debug('openingChannel') - self._connection.channel(on_open_callback=self._on_channel_open) - - def _on_channel_open(self, channel): - self._debug('openedChannel', channel=channel) - self._channel = channel - self._channel.add_on_close_callback(self._on_channel_closed) - self._setup_exchange() - - def _on_channel_closed(self, channel, reason): - self._warning('closedChannel', channel=channel, reason=reason) - self._close_connection() - - def _close_channel(self): - if self._channel is not None: - self._debug('closingChannel', channel=self._channel) - self._channel.close() - - def _setup_exchange(self): - self._debug('declaringExchange', - exchange=self.exchange_name, - type=self.exchange_type, - passive_declare=self.passive_declare_exchange, - durable=self.durable_exchange, - auto_delete=self.auto_delete_exchange) - self._channel.exchange_declare(exchange=self.exchange_name, - exchange_type=self.exchange_type, - passive=self.passive_declare_exchange, - durable=self.durable_exchange, - auto_delete=self.auto_delete_exchange, - callback=self._on_exchange_declare_ok) - def _on_exchange_declare_ok(self, _unused_frame): - self._debug('declaredExchange', - exchange=self.exchange_name, - type=self.exchange_type, - passive_declare=self.passive_declare_exchange, - durable=self.durable_exchange, - auto_delete=self.auto_delete_exchange) + super()._on_exchange_declare_ok(_unused_frame) self._setup_queue() - def _setup_queue(self): - self._debug('declaringQueue', - queue=self.queue_name, - passive_declare=self.passive_declare_queue, - durable=self.durable_queue, - exclusive=self.exclusive_queue, - auto_delete=self.auto_delete_exchange) - self._channel.queue_declare(queue=self.queue_name, - passive=self.passive_declare_queue, - durable=self.durable_queue, - exclusive=self.exclusive_queue, - auto_delete=self.auto_delete_queue, - callback=self._on_queue_declare_ok) - - def _on_queue_declare_ok(self, _unused_frame): - self._debug('declaredQueue', - queue=self.queue_name, - passive_declare=self.passive_declare_queue, - durable=self.durable_queue, - exclusive=self.exclusive_queue, - auto_delete=self.auto_delete_exchange) - self._debug('bindingQueue', - queue=self.queue_name, - exchange=self.exchange_name, - routing_key=self.routing_key) - self._channel.queue_bind(self.queue_name, - self.exchange_name, - routing_key=self.routing_key, - callback=self._on_bind_ok) - def _on_bind_ok(self, _unused_frame): - self._debug('boundQueue', - queue=self.queue_name, - exchange=self.exchange_name, - routing_key=self.routing_key) + super()._on_bind_ok(_unused_frame) self._ready() - - @staticmethod - def __kwarg_str(**kwargs): - return ' '.join(f'{k}={v}' for k, v in kwargs.items()) - - def _debug(self, event, **kwargs): - msg = f'event={event} client_type={self._client_type.name} {self.__kwarg_str(**kwargs)}' - logging.debug(msg) - - def _info(self, event, **kwargs): - msg = f'event={event} client_type={self._client_type.name} {self.__kwarg_str(**kwargs)}' - logging.info(msg) - - def _warning(self, event, **kwargs): - msg = f'event={event} client_type={self._client_type.name} {self.__kwarg_str(**kwargs)}' - logging.warning(msg) - - def _error(self, event, error=None, **kwargs): - msg = f'event={event} client_type={self._client_type.name} err={error!r} {self.__kwarg_str(**kwargs)}' - logging.error(msg) diff --git a/shared/lib-hoppy/hoppy/base_queue_declarer.py b/shared/lib-hoppy/hoppy/base_queue_declarer.py new file mode 100644 index 0000000000..9986016736 --- /dev/null +++ b/shared/lib-hoppy/hoppy/base_queue_declarer.py @@ -0,0 +1,61 @@ +from hoppy.base_exchange_declarer import BaseExchangeDeclarer +from hoppy.hoppy_properties import ExchangeProperties, QueueProperties + + +class BaseQueueDeclarer(BaseExchangeDeclarer): + def __init__(self, + config: [dict | None] = None, + exchange_properties: ExchangeProperties = ExchangeProperties(), + queue_properties: QueueProperties = QueueProperties(), + routing_key: str = ''): + super().__init__(config, exchange_properties) + + self._set_queue_properties(queue_properties) + self.routing_key = routing_key + + def _set_queue_properties(self, queue_properties: QueueProperties): + self.queue_name = queue_properties.name + self.passive_declare_queue = queue_properties.passive_declare + self.durable_queue = queue_properties.durable + self.auto_delete_queue = queue_properties.auto_delete + self.exclusive_queue = queue_properties.exclusive + + def _on_exchange_declare_ok(self, _unused_frame): + super()._on_exchange_declare_ok(_unused_frame) + self._setup_queue() + + def _setup_queue(self): + self._debug('declaringQueue', + queue=self.queue_name, + passive_declare=self.passive_declare_queue, + durable=self.durable_queue, + exclusive=self.exclusive_queue, + auto_delete=self.auto_delete_exchange) + self._channel.queue_declare(queue=self.queue_name, + passive=self.passive_declare_queue, + durable=self.durable_queue, + exclusive=self.exclusive_queue, + auto_delete=self.auto_delete_queue, + callback=self._on_queue_declare_ok) + + def _on_queue_declare_ok(self, _unused_frame): + self._debug('declaredQueue', + queue=self.queue_name, + passive_declare=self.passive_declare_queue, + durable=self.durable_queue, + exclusive=self.exclusive_queue, + auto_delete=self.auto_delete_exchange) + self._debug('bindingQueue', + queue=self.queue_name, + exchange=self.exchange_name, + routing_key=self.routing_key) + self._channel.queue_bind(self.queue_name, + self.exchange_name, + routing_key=self.routing_key, + callback=self._on_bind_ok) + + def _on_bind_ok(self, _unused_frame): + self._debug('boundQueue', + queue=self.queue_name, + exchange=self.exchange_name, + routing_key=self.routing_key) diff --git a/shared/lib-hoppy/hoppy/channel.py b/shared/lib-hoppy/hoppy/channel.py new file mode 100644 index 0000000000..af39efa930 --- /dev/null +++ b/shared/lib-hoppy/hoppy/channel.py @@ -0,0 +1,71 @@ +import asyncio +from hoppy.base_exchange_declarer import BaseExchangeDeclarer +from hoppy.base_channel_opener import BaseChannelOpener +from hoppy.base_queue_client import BaseQueueClient, ClientType +from hoppy.hoppy_properties import ExchangeProperties, QueueProperties + + +class Channel(BaseQueueClient): + """Creates a channel that can be used to declare exchanges and queues.""" + + def __init__(self, + config: [dict | None] = None, + exchange_properties: ExchangeProperties = ExchangeProperties(), + queue_properties: QueueProperties = QueueProperties(), + routing_key: str = ''): + """ + Creates this class + + :param config: dict | None = None + collection of key value pairs used to create the RabbitMQ connection parameters (see pika.ConnectionParameters) + this config is merged with the default RABBITMQ_CONFIG + :param exchange_properties: ExchangeProperties + properties dictating how the exchange is declared + :param queue_properties: QueueProperties + properties dictating how the queue is declared + :param routing_key: str = '' + the routing key used to route messages to the queue + """ + + super().__init__(ClientType.CHANNEL_ONLY, config, exchange_properties, queue_properties, routing_key) + + async def connect(self): + super().connect() + while not self.is_ready: + await asyncio.sleep(0) + + def _ready(self): + """Executed when the channel is open. + Overrides super class abstract method.""" + + self._is_ready = True + + def _shut_down(self): + """Called when the client is requested to stop. + Overrides super class abstract method""" + + self._close_channel() + self._close_connection() + + def _on_channel_open(self, channel): + BaseChannelOpener._on_channel_open(self, channel) + self._ready() + + def exchange_declare(self, exchange_properties: ExchangeProperties = None): + if exchange_properties: + self._set_exchange_properties(exchange_properties) + + self._setup_exchange() + + def _on_exchange_declare_ok(self, _unused_frame): + BaseExchangeDeclarer._on_exchange_declare_ok(self, _unused_frame) + + def queue_declare(self, queue_properties: QueueProperties = None, exchange_name: str = None, routing_key: str = None): + if queue_properties: + self._set_queue_properties(queue_properties) + if exchange_name: + self.exchange_name = exchange_name + if routing_key: + self.routing_key = routing_key + + self._setup_queue() diff --git a/shared/lib-hoppy/hoppy/config.py b/shared/lib-hoppy/hoppy/config.py index 8d44748e16..1b4877146b 100644 --- a/shared/lib-hoppy/hoppy/config.py +++ b/shared/lib-hoppy/hoppy/config.py @@ -2,8 +2,8 @@ RABBITMQ_CONFIG = { "host": os.environ.get("RABBITMQ_PLACEHOLDERS_HOST") or "localhost", - "username": os.environ.get("RABBITMQ_USERNAME") or "guest", - "password": os.environ.get("RABBITMQ_PASSWORD") or "guest", + "username": os.environ.get("RABBITMQ_USERNAME") or "user", + "password": os.environ.get("RABBITMQ_PASSWORD") or "bitnami", "port": int(os.environ.get("RABBITMQ_PORT") or 5672), "retry_limit": int(os.environ.get("RABBITMQ_RETRY_LIMIT") or 3), "timeout": int(os.environ.get("RABBITMQ_TIMEOUT") or 60 * 60 * 3), # 3 hours diff --git a/shared/lib-hoppy/integration/channel_test.py b/shared/lib-hoppy/integration/channel_test.py new file mode 100644 index 0000000000..59131a642c --- /dev/null +++ b/shared/lib-hoppy/integration/channel_test.py @@ -0,0 +1,53 @@ +import pytest +from unittest.mock import MagicMock +from hoppy.channel import Channel +from hoppy.hoppy_properties import ExchangeProperties, QueueProperties + + +@pytest.fixture +def channel(): + return Channel() + + +@pytest.mark.asyncio +async def test_connect(channel): + await channel.connect() + assert channel.is_ready + + +def test_ready(channel): + channel._ready() + assert channel._is_ready + + +def test_shut_down(channel, mocker): + mock_close_channel = mocker.patch.object(channel, '_close_channel') + mock_close_connection = mocker.patch.object(channel, '_close_connection') + channel._shut_down() + mock_close_channel.assert_called_once() + mock_close_connection.assert_called_once() + + +def test_on_channel_open(channel, mocker): + mock_channel = MagicMock() + mock_ready = mocker.patch.object(channel, '_ready') + channel._on_channel_open(mock_channel) + mock_ready.assert_called_once() + + +def test_exchange_declare(channel, mocker): + mock_exchange_properties = MagicMock(spec=ExchangeProperties) + mock_set_exchange_properties = mocker.patch.object(channel, '_set_exchange_properties') + mock_setup_exchange = mocker.patch.object(channel, '_setup_exchange') + channel.exchange_declare(mock_exchange_properties) + mock_set_exchange_properties.assert_called_once_with(mock_exchange_properties) + mock_setup_exchange.assert_called_once() + + +def test_queue_declare(channel, mocker): + mock_queue_properties = MagicMock(spec=QueueProperties) + mock_set_queue_properties = mocker.patch.object(channel, '_set_queue_properties') + mock_setup_queue = mocker.patch.object(channel, '_setup_queue') + channel.queue_declare(mock_queue_properties, 'exchange_name', 'routing_key') + mock_set_queue_properties.assert_called_once_with(mock_queue_properties) + mock_setup_queue.assert_called_once() diff --git a/shared/lib-hoppy/test/event_loop_test.py b/shared/lib-hoppy/integration/event_loop_test.py similarity index 100% rename from shared/lib-hoppy/test/event_loop_test.py rename to shared/lib-hoppy/integration/event_loop_test.py diff --git a/shared/lib-hoppy/pyproject.toml b/shared/lib-hoppy/pyproject.toml index 90f87f430b..d1bf745c7e 100644 --- a/shared/lib-hoppy/pyproject.toml +++ b/shared/lib-hoppy/pyproject.toml @@ -11,10 +11,14 @@ dependencies = [ "pytest", "pytest-mock", "pytest-asyncio", + "pytest-env", "pytest-timeout" ] dynamic = ["version"] +[tool.setuptools] +packages = ["hoppy"] + [tool.setuptools.dynamic] version = { attr = "hoppy.__version__" } @@ -23,7 +27,12 @@ addopts = [ "--import-mode=importlib", "-rfesp" ] +env = [ + "RABBITMQ_USERNAME=user", + "RABBITMQ_PASSWORD=bitnami" +] testpaths = [ - "tests" + "test", + "integration" # depends on rabbitmq-service ] -timeout=5 \ No newline at end of file +timeout=5