Skip to content

Commit

Permalink
Declare exchanges/queues independent from client (fix #2152) (#2185)
Browse files Browse the repository at this point in the history
* Prevent hoppy from closing caller-provided ioloop (fix #2151)

* Use async def for async_hoppy_client.stop (fix #2150)

* Use async def for async_hoppy_client.start (fix #2149)

* Declare exchanges/queues independent from client (fix #2152)

* Fix missing imports

* Address review feedback

* Update pyproject.toml

* Fix codeql warning
  • Loading branch information
raudabaugh authored Nov 14, 2023
1 parent 7cb89a4 commit 574f9ae
Show file tree
Hide file tree
Showing 9 changed files with 383 additions and 186 deletions.
127 changes: 127 additions & 0 deletions shared/lib-hoppy/hoppy/base_channel_opener.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import logging
import time
from abc import ABC, abstractmethod

from hoppy.config import RABBITMQ_CONFIG
from pika.adapters.asyncio_connection import AsyncioConnection
from pika import ConnectionParameters, PlainCredentials


class BaseChannelOpener(ABC):
def __init__(self, config: [dict | None] = None):
if config is None:
config = {}
self.config = {**RABBITMQ_CONFIG, **config}
self.connection_parameters = self._create_connection_parameters()

self._connection = None
self._channel = None
self._max_reconnect_delay = self.config.get('max_reconnect_delay', 30)
self._reconnect_delay = self.config.get('initial_reconnect_delay', 0)
self._custom_loop = None

def _create_connection_parameters(self) -> ConnectionParameters:
credentials = PlainCredentials(self.config["username"], self.config["password"])
return ConnectionParameters(
host=self.config['host'],
port=self.config['port'],
virtual_host=self.config['virtual_host'],
credentials=credentials)

def _initialize_connection_session(self):
"""The following attributes are used per connection session. When a reconnect happens, they should be reset."""
self._reconnect_delay = self.config.get('initial_reconnect_delay', 0)

def connect(self, loop=None):
"""
Creates the asyncio connection to RabbitMQ
Parameters
----------
loop = None | asyncio.AbstractEventLoop | nbio_interface.AbstractIOServices
Defaults to asyncio.get_event_loop()
"""

self._custom_loop = loop

self._info('connectingToRabbitMq', config=self.config)
self._connection = AsyncioConnection(
parameters=self.connection_parameters,
on_open_callback=self._on_connection_open,
on_open_error_callback=self._on_connection_open_error,
on_close_callback=self._on_connection_closed,
custom_ioloop=loop)
return self._connection

def _on_connection_open(self, connection):
self._debug('openedConnection')
self._connection = connection
self._initialize_connection_session()
self._open_channel()

def _on_connection_open_error(self, _unused_connection, err):
self._error('failedToOpenConnection', err)
self._reconnect()

def _close_connection(self):
if self._connection is not None:
self._debug('closingConnection',
closing=self._connection.is_closing,
closed=self._connection.is_closed)
if not self._connection.is_closing and not self._connection.is_closed:
self._connection.close()

@abstractmethod
def _on_connection_closed(self, _unused_connection, reason):
pass

def _reconnect(self):
self.stop()
reconnect_delay = self._get_reconnect_delay()
self._warning('reconnecting', reconnect_delay_seconds=reconnect_delay)
time.sleep(reconnect_delay)
self.connect(self._custom_loop)

def _get_reconnect_delay(self):
self._reconnect_delay += 1
if self._reconnect_delay > self._max_reconnect_delay:
self._reconnect_delay = self._max_reconnect_delay
return self._reconnect_delay

def _open_channel(self):
self._debug('openingChannel')
self._connection.channel(on_open_callback=self._on_channel_open)

def _on_channel_closed(self, channel, reason):
self._warning('closedChannel', channel=channel, reason=reason)
self._close_connection()

def _close_channel(self):
if self._channel is not None:
self._debug('closingChannel', channel=self._channel)
self._channel.close()

def _on_channel_open(self, channel):
self._debug('openedChannel', channel=channel)
self._channel = channel
self._channel.add_on_close_callback(self._on_channel_closed)

@staticmethod
def __kwarg_str(**kwargs):
return ' '.join(f'{k}={v}' for k, v in kwargs.items())

def _debug(self, event, **kwargs):
msg = f'event={event} client_type={self._client_type.name} {self.__kwarg_str(**kwargs)}'
logging.debug(msg)

def _info(self, event, **kwargs):
msg = f'event={event} client_type={self._client_type.name} {self.__kwarg_str(**kwargs)}'
logging.info(msg)

def _warning(self, event, **kwargs):
msg = f'event={event} client_type={self._client_type.name} {self.__kwarg_str(**kwargs)}'
logging.warning(msg)

def _error(self, event, error=None, **kwargs):
msg = f'event={event} client_type={self._client_type.name} err={error!r} {self.__kwarg_str(**kwargs)}'
logging.error(msg)
47 changes: 47 additions & 0 deletions shared/lib-hoppy/hoppy/base_exchange_declarer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from hoppy.base_channel_opener import BaseChannelOpener
from hoppy.hoppy_properties import ExchangeProperties


class BaseExchangeDeclarer(BaseChannelOpener):

def __init__(self, config: [dict | None] = None, exchange_properties: ExchangeProperties = ExchangeProperties()):
super().__init__(config)

self._set_exchange_properties(exchange_properties)

def _set_exchange_properties(self, exchange_properties: ExchangeProperties):
self.exchange_name = exchange_properties.name
self.exchange_type = exchange_properties.type
self.passive_declare_exchange = exchange_properties.passive_declare
self.durable_exchange = exchange_properties.durable
self.auto_delete_exchange = exchange_properties.auto_delete

def _open_channel(self):
self._debug('openingChannel')
self._connection.channel(on_open_callback=self._on_channel_open)

def _on_channel_open(self, channel):
super()._on_channel_open(channel)
self._setup_exchange()

def _setup_exchange(self):
self._debug('declaringExchange',
exchange=self.exchange_name,
type=self.exchange_type,
passive_declare=self.passive_declare_exchange,
durable=self.durable_exchange,
auto_delete=self.auto_delete_exchange)
self._channel.exchange_declare(exchange=self.exchange_name,
exchange_type=self.exchange_type,
passive=self.passive_declare_exchange,
durable=self.durable_exchange,
auto_delete=self.auto_delete_exchange,
callback=self._on_exchange_declare_ok)

def _on_exchange_declare_ok(self, _unused_frame):
self._debug('declaredExchange',
exchange=self.exchange_name,
type=self.exchange_type,
passive_declare=self.passive_declare_exchange,
durable=self.durable_exchange,
auto_delete=self.auto_delete_exchange)
Loading

0 comments on commit 574f9ae

Please sign in to comment.