Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add exchange_options #3

Open
wants to merge 5 commits into
base: Add-connection-options
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,18 @@ already has ``local_capacity - 1`` messages in memory. Higher settings
accelerate throughput a little bit; lower settings help adhere to
``local_capacity`` more rigorously.

``connection_options``
~~~~~~~~~~~~~~~~~~

A dictionary of values that will be passed as kwargs when aioamqp attempts
to connect to the RabbitMQ server.

``exchange_options``
~~~~~~~~~~~~~~~~~~

A dictionary of values that will be merged with the default exchange options.
exchange_options: { name: 'groups', type: 'direct', routing_prefix: 'groups' }

Design decisions
----------------

Expand Down
45 changes: 37 additions & 8 deletions channels_rabbitmq/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,18 @@

logger = logging.getLogger(__name__)


GroupsExchange = "groups"
ReconnectDelay = 1.0 # seconds

def routing_group(group, prefix=None):
"""
RabbitMQ routing keys should be in the format of <A>.<B>
This allows for wildcards suchas "groups.*" for queues connected to
topic exchanges
"""
if prefix is not None:
return prefix + '.' + group
else:
return group

def serialize(body):
"""
Expand Down Expand Up @@ -335,6 +343,8 @@ def __init__(
prefetch_count=10,
expiry=60,
group_expiry=86400,
connection_options={},
exchange_options={}
):
self.loop = loop
self.host = host
Expand All @@ -344,6 +354,13 @@ def __init__(
self.expiry = expiry
self.group_expiry = group_expiry
self.queue_name = queue_name
self.connection_options = connection_options
self.exchange_options = {
'name': 'groups',
'type': 'direct',
'routing_prefix': 'groups',
**exchange_options,
}

# incoming_messages: await `get()` on any channel-name queue to receive
# the next message. If the `get()` is canceled, that's probably because
Expand Down Expand Up @@ -435,7 +452,9 @@ async def _connect_and_run(self):
self._is_connected = False

logger.info("Channels connecting to RabbitMQ at %s", self.host)
transport, protocol = await aioamqp.from_url(self.host)
transport, protocol = await aioamqp.from_url(
self.host, **self.connection_options
)

logger.debug("Connected; setting up")
channel = await protocol.channel()
Expand All @@ -446,7 +465,7 @@ async def _connect_and_run(self):

# Declare "groups" exchange. It may persist; spurious declarations
# (such as on reconnect) are harmless.
await channel.exchange_declare(GroupsExchange, "direct")
await channel.exchange_declare(self.exchange_options["name"], self.exchange_options["type"])

# Queue up the handling of messages.
#
Expand Down Expand Up @@ -484,7 +503,9 @@ async def _connect_and_run(self):
await gather_without_leaking(
[
channel.queue_bind(
self.queue_name, GroupsExchange, routing_key=group
self.queue_name,
self.exchange_options["name"],
routing_key=routing_group(group, self.exchange_options["routing_prefix"])
)
for group in groups
]
Expand Down Expand Up @@ -636,7 +657,9 @@ async def group_add(self, channel, group, asgi_channel):
# This group is new to our connection-level queue. Make a
# connection-level binding.
await channel.queue_bind(
self.queue_name, GroupsExchange, routing_key=group
self.queue_name,
self.exchange_options["name"],
routing_key=routing_group(group, self.exchange_options["routing_prefix"])
)

async def group_discard(self, group, asgi_channel):
Expand All @@ -658,7 +681,9 @@ async def group_discard(self, group, asgi_channel):
logger.debug("Unbinding queue %s from group %s", self.queue_name, group)
# Disconnect, if we're connected.
await self._channel.queue_unbind(
self.queue_name, GroupsExchange, routing_key=group
self.queue_name,
self.exchange_options["name"],
routing_key=routing_group(group, self.exchange_options["routing_prefix"])
)

@stall_until_connected_or_closed
Expand All @@ -669,7 +694,11 @@ async def group_send(self, channel, group, message):
logger.debug("group_send %r to %s", message, group)

try:
await channel.publish(message, GroupsExchange, routing_key=group)
await channel.publish(
message,
self.exchange_options["name"],
routing_key=routing_group(group, self.exchange_options["routing_prefix"])
)
except PublishFailed:
# The Channels protocol has no way of reporting this error.
# Just silently delete the message.
Expand Down
13 changes: 12 additions & 1 deletion channels_rabbitmq/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,22 @@ def __init__(
prefetch_count=10,
expiry=60,
group_expiry=86400,
connection_options={},
exchange_options={}
):
self.host = host
self.local_capacity = local_capacity
self.remote_capacity = remote_capacity
self.prefetch_count = prefetch_count
self.expiry = expiry
self.group_expiry = 86400
self.group_expiry = group_expiry
self.connection_options = connection_options
self.exchange_options = {
'name': 'groups',
'type': 'direct',
'routing_prefix': 'groups',
**exchange_options,
}

# In inefficient client code (e.g., async_to_sync()), there may be
# several send() or receive() calls within different event loops --
Expand Down Expand Up @@ -88,6 +97,8 @@ def _create_connection(self, loop):
prefetch_count=self.prefetch_count,
expiry=self.expiry,
group_expiry=self.group_expiry,
connection_options=self.connection_options,
exchange_options=self.exchange_options,
)
self._connections[loop] = connection # assume lock is held

Expand Down