From 8c1ab31bf1d048f0d96ae852688b8b151129929f Mon Sep 17 00:00:00 2001 From: Mat Munn Date: Sat, 23 Feb 2019 18:49:31 +1100 Subject: [PATCH 1/4] Allow sending connection options --- README.rst | 6 ++++++ channels_rabbitmq/connection.py | 5 ++++- channels_rabbitmq/core.py | 5 ++++- 3 files changed, 14 insertions(+), 2 deletions(-) diff --git a/README.rst b/README.rst index 6f84446..493119b 100644 --- a/README.rst +++ b/README.rst @@ -75,6 +75,12 @@ already has ``local_capacity - 1`` messages in memory. Higher settings accelerate throughput a little bit; lower settings help adhere to ``local_capacity`` more rigorously. +``connection_options`` +~~~~~~~~~~~~~~~~~~ + +A dictionary of values that will be passed as kwargs when aioamqp attempts +to connect to the RabbitMQ server. + Design decisions ---------------- diff --git a/channels_rabbitmq/connection.py b/channels_rabbitmq/connection.py index 3acd933..7eb758c 100644 --- a/channels_rabbitmq/connection.py +++ b/channels_rabbitmq/connection.py @@ -335,6 +335,7 @@ def __init__( prefetch_count=10, expiry=60, group_expiry=86400, + connection_options={}, ): self.loop = loop self.host = host @@ -344,6 +345,7 @@ def __init__( self.expiry = expiry self.group_expiry = group_expiry self.queue_name = queue_name + self.connection_options = connection_options # incoming_messages: await `get()` on any channel-name queue to receive # the next message. If the `get()` is canceled, that's probably because @@ -435,7 +437,8 @@ async def _connect_and_run(self): self._is_connected = False logger.info("Channels connecting to RabbitMQ at %s", self.host) - transport, protocol = await aioamqp.from_url(self.host) + transport, protocol = await aioamqp.from_url(self.host, + **self.connection_options) logger.debug("Connected; setting up") channel = await protocol.channel() diff --git a/channels_rabbitmq/core.py b/channels_rabbitmq/core.py index f5330c0..5567daf 100644 --- a/channels_rabbitmq/core.py +++ b/channels_rabbitmq/core.py @@ -50,13 +50,15 @@ def __init__( prefetch_count=10, expiry=60, group_expiry=86400, + connection_options={}, ): self.host = host self.local_capacity = local_capacity self.remote_capacity = remote_capacity self.prefetch_count = prefetch_count self.expiry = expiry - self.group_expiry = 86400 + self.group_expiry = group_expiry + self.connection_options = connection_options # In inefficient client code (e.g., async_to_sync()), there may be # several send() or receive() calls within different event loops -- @@ -88,6 +90,7 @@ def _create_connection(self, loop): prefetch_count=self.prefetch_count, expiry=self.expiry, group_expiry=self.group_expiry, + connection_options=self.connection_options, ) self._connections[loop] = connection # assume lock is held From 3b16b4ed87f0d19eee7dd8c02a3ed5de66bc846a Mon Sep 17 00:00:00 2001 From: Mat Munn Date: Sat, 23 Feb 2019 19:38:56 +1100 Subject: [PATCH 2/4] Adjusted formatting --- channels_rabbitmq/connection.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/channels_rabbitmq/connection.py b/channels_rabbitmq/connection.py index 7eb758c..6d129fa 100644 --- a/channels_rabbitmq/connection.py +++ b/channels_rabbitmq/connection.py @@ -437,8 +437,9 @@ async def _connect_and_run(self): self._is_connected = False logger.info("Channels connecting to RabbitMQ at %s", self.host) - transport, protocol = await aioamqp.from_url(self.host, - **self.connection_options) + transport, protocol = await aioamqp.from_url( + self.host, **self.connection_options + ) logger.debug("Connected; setting up") channel = await protocol.channel() From e4cbc606bd1a50ec63e326bb7046d0e938c4d26d Mon Sep 17 00:00:00 2001 From: Administrator Date: Thu, 14 Mar 2019 13:57:03 -0400 Subject: [PATCH 3/4] added exchange_options --- README.rst | 6 +++++ channels_rabbitmq/connection.py | 41 ++++++++++++++++++++++++++++----- channels_rabbitmq/core.py | 8 +++++++ 3 files changed, 49 insertions(+), 6 deletions(-) diff --git a/README.rst b/README.rst index 493119b..8ca657c 100644 --- a/README.rst +++ b/README.rst @@ -81,6 +81,12 @@ accelerate throughput a little bit; lower settings help adhere to A dictionary of values that will be passed as kwargs when aioamqp attempts to connect to the RabbitMQ server. +``exchange_options`` +~~~~~~~~~~~~~~~~~~ + +A dictionary of values that will be merged with the default exchange options. +exchange_options: { name: 'groups', type: 'direct', routing_prefix: 'groups' } + Design decisions ---------------- diff --git a/channels_rabbitmq/connection.py b/channels_rabbitmq/connection.py index 6d129fa..81d623f 100644 --- a/channels_rabbitmq/connection.py +++ b/channels_rabbitmq/connection.py @@ -12,10 +12,22 @@ logger = logging.getLogger(__name__) +from django.conf import settings +def setting(name, default=None): + return getattr(settings, name, default) -GroupsExchange = "groups" ReconnectDelay = 1.0 # seconds +def routing_group(group, prefix=None): + """ + RabbitMQ routing keys should be in the format of . + This allows for wildcards suchas "groups.*" for queues connected to + topic exchanges + """ + if prefix is not None: + return prefix + '.' + group + else: + return group def serialize(body): """ @@ -336,6 +348,7 @@ def __init__( expiry=60, group_expiry=86400, connection_options={}, + exchange_options={} ): self.loop = loop self.host = host @@ -346,6 +359,12 @@ def __init__( self.group_expiry = group_expiry self.queue_name = queue_name self.connection_options = connection_options + self.exchange_options = { + 'name': 'groups', + 'type': 'direct', + 'routing_prefix': 'groups', + **exchange_options, + } # incoming_messages: await `get()` on any channel-name queue to receive # the next message. If the `get()` is canceled, that's probably because @@ -450,7 +469,7 @@ async def _connect_and_run(self): # Declare "groups" exchange. It may persist; spurious declarations # (such as on reconnect) are harmless. - await channel.exchange_declare(GroupsExchange, "direct") + await channel.exchange_declare(self.exchange_options["name"], self.exchange_options["type"]) # Queue up the handling of messages. # @@ -488,7 +507,9 @@ async def _connect_and_run(self): await gather_without_leaking( [ channel.queue_bind( - self.queue_name, GroupsExchange, routing_key=group + self.queue_name, + self.exchange_options["name"], + routing_key=routing_group(group, self.exchange_options["routing_prefix"]) ) for group in groups ] @@ -640,7 +661,9 @@ async def group_add(self, channel, group, asgi_channel): # This group is new to our connection-level queue. Make a # connection-level binding. await channel.queue_bind( - self.queue_name, GroupsExchange, routing_key=group + self.queue_name, + self.exchange_options["name"], + routing_key=routing_group(group, self.exchange_options["routing_prefix"]) ) async def group_discard(self, group, asgi_channel): @@ -662,7 +685,9 @@ async def group_discard(self, group, asgi_channel): logger.debug("Unbinding queue %s from group %s", self.queue_name, group) # Disconnect, if we're connected. await self._channel.queue_unbind( - self.queue_name, GroupsExchange, routing_key=group + self.queue_name, + self.exchange_options["name"], + routing_key=routing_group(group, self.exchange_options["routing_prefix"]) ) @stall_until_connected_or_closed @@ -673,7 +698,11 @@ async def group_send(self, channel, group, message): logger.debug("group_send %r to %s", message, group) try: - await channel.publish(message, GroupsExchange, routing_key=group) + await channel.publish( + message, + self.exchange_options["name"], + routing_key=routing_group(group, self.exchange_options["routing_prefix"]) + ) except PublishFailed: # The Channels protocol has no way of reporting this error. # Just silently delete the message. diff --git a/channels_rabbitmq/core.py b/channels_rabbitmq/core.py index 5567daf..48f8b85 100644 --- a/channels_rabbitmq/core.py +++ b/channels_rabbitmq/core.py @@ -51,6 +51,7 @@ def __init__( expiry=60, group_expiry=86400, connection_options={}, + exchange_options={} ): self.host = host self.local_capacity = local_capacity @@ -59,6 +60,12 @@ def __init__( self.expiry = expiry self.group_expiry = group_expiry self.connection_options = connection_options + self.exchange_options = { + 'name': 'groups', + 'type': 'direct', + 'routing_prefix': 'groups', + **exchange_options, + } # In inefficient client code (e.g., async_to_sync()), there may be # several send() or receive() calls within different event loops -- @@ -91,6 +98,7 @@ def _create_connection(self, loop): expiry=self.expiry, group_expiry=self.group_expiry, connection_options=self.connection_options, + exchange_options=self.exchange_options, ) self._connections[loop] = connection # assume lock is held From 9a394567c9ecbd178aedf0ff2de62ba3da36ab5a Mon Sep 17 00:00:00 2001 From: Administrator Date: Thu, 14 Mar 2019 14:15:17 -0400 Subject: [PATCH 4/4] removed unused settings function --- channels_rabbitmq/connection.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/channels_rabbitmq/connection.py b/channels_rabbitmq/connection.py index 81d623f..c2f90e8 100644 --- a/channels_rabbitmq/connection.py +++ b/channels_rabbitmq/connection.py @@ -12,10 +12,6 @@ logger = logging.getLogger(__name__) -from django.conf import settings -def setting(name, default=None): - return getattr(settings, name, default) - ReconnectDelay = 1.0 # seconds def routing_group(group, prefix=None):