There seems to be a change in the way RobustQueue.consume() works in connection pooling scenarios in version 9.4.1 (the latest at the time of writing). The RobustChannel and the declared RobustQueue appear to be restored after a broker restart, but the consumers registered on the declared queue are not.
Here is an example to reproduce:
publisher.py
import asyncio

import aio_pika
from aio_pika.abc import AbstractRobustConnection
from aio_pika.pool import Pool


async def get_connection() -> AbstractRobustConnection:
    return await aio_pika.connect_robust("<amqp_url>")


async def main() -> None:
    connection_pool: Pool = Pool(get_connection, max_size=2)

    async def get_channel() -> aio_pika.Channel:
        async with connection_pool.acquire() as connection:
            return await connection.channel()

    channel_pool: Pool = Pool(get_channel, max_size=10)
    queue_name = "pool_queue2"

    async def publish(message: str) -> None:
        async with channel_pool.acquire() as channel:  # type: aio_pika.Channel
            await channel.default_exchange.publish(
                aio_pika.Message(message.encode()),
                queue_name,
            )

    async with connection_pool, channel_pool:
        while True:
            user_input = input("Enter 'y' to send a message or any other key to exit: ")
            if user_input.strip().lower() == 'y':
                await publish("Hello, World!")
            else:
                break


if __name__ == "__main__":
    asyncio.run(main())
consumer.py
import asyncio

import aio_pika
from aio_pika.abc import AbstractRobustConnection
from aio_pika.pool import Pool


async def get_connection() -> AbstractRobustConnection:
    return await aio_pika.connect_robust("<amqp_url>")


async def process_message(
    message: aio_pika.abc.AbstractIncomingMessage,
) -> None:
    async with message.process():
        print(message.body)
        await asyncio.sleep(1)


async def main() -> None:
    connection_pool: Pool = Pool(get_connection, max_size=2)

    async def get_channel() -> aio_pika.Channel:
        async with connection_pool.acquire() as connection:
            return await connection.channel()

    channel_pool: Pool = Pool(get_channel, max_size=10)
    queue_name = "pool_queue2"

    async def consume() -> None:
        async with channel_pool.acquire() as channel:  # type: aio_pika.Channel
            await channel.set_qos(10)
            queue = await channel.declare_queue(
                queue_name, durable=True, auto_delete=False,
            )
            await queue.consume(process_message)

    async with connection_pool, channel_pool:
        await consume()
        # Add an infinite loop to keep the program running until manually stopped
        while True:
            await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(main())
In the above example, consumption is not restored after a broker restart, but with an older version, e.g. 9.0.7, it seems to work fine.
Additional notes:
1. Using the RobustQueueIterator seems to solve the issue in both versions (9.0.7 and latest). Replace await queue.consume(process_message) with:
    async with queue.iterator() as queue_iter:
        async for message in queue_iter:  # type: aio_pika.abc.AbstractIncomingMessage
            await process_message(message)
   However, I am not sure whether to switch to this approach, since the docs do not explain the difference between using RobustQueueIterator and simply using RobustQueue.consume() to handle incoming messages.
2. I am using this version of RabbitMQ: rabbitmq:3.8-management-alpine
You don't hold a strong reference to queue, so it gets garbage-collected, and by the time the queues are due to be restored via .restore() it is no longer in the relevant weak set.
QueueIterator keeps a strong reference to its queue, which is why the iterator approach keeps working.
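To illustrate the mechanism described above, here is a minimal sketch that uses only the standard library. FakeChannel and FakeQueue are hypothetical stand-ins invented for this example, not aio_pika's actual classes. The only assumption carried over from the explanation is that the channel tracks its queues in a weak set and restores whatever is still alive in it.

```python
import gc
import weakref


class FakeQueue:
    """Hypothetical stand-in for a queue that remembers its consumers."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.consumers: list[str] = []
        self.restored = 0

    def consume(self, callback_name: str) -> None:
        self.consumers.append(callback_name)

    def restore(self) -> None:
        # Stands in for re-registering consumers after a reconnect.
        self.restored += 1


class FakeChannel:
    """Hypothetical stand-in for a channel that tracks its queues weakly."""

    def __init__(self) -> None:
        self._queues: "weakref.WeakSet[FakeQueue]" = weakref.WeakSet()

    def declare_queue(self, name: str) -> FakeQueue:
        queue = FakeQueue(name)
        self._queues.add(queue)  # weak reference only
        return queue

    def restore(self) -> list[str]:
        # Only queues that are still alive can be restored.
        restored = []
        for queue in list(self._queues):
            queue.restore()
            restored.append(queue.name)
        return restored


def start_consumer_and_drop(channel: FakeChannel) -> None:
    queue = channel.declare_queue("dropped")
    queue.consume("process_message")
    # `queue` goes out of scope here; nothing else references it.


def start_consumer_and_keep(channel: FakeChannel) -> FakeQueue:
    queue = channel.declare_queue("kept")
    queue.consume("process_message")
    return queue  # the caller now holds the strong reference


channel = FakeChannel()
start_consumer_and_drop(channel)
kept_queue = start_consumer_and_keep(channel)
gc.collect()  # make collection explicit on interpreters without refcounting
restored_names = channel.restore()
print(restored_names)
```

Only the queue that the caller kept a reference to is restored. Applied to consumer.py, this suggests returning the queue from consume() and storing it in a variable in main() for as long as the consumer should run. That is a suggestion based on the explanation above, not something verified against a particular aio_pika release.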
Edit: 663807c is the first commit after the 9.0.7 release.