From e8b28527f4adedf533877754231753659775ec89 Mon Sep 17 00:00:00 2001 From: Teddy Budiono Hermawan Date: Mon, 28 Aug 2023 16:37:20 +0800 Subject: [PATCH] Support prefetch limit per consumer. Deprecate Global OptionPrefetchLimit. --- mq/consumer.go | 17 ++++++++++++++--- mq/mq.go | 6 ++++-- mq/options.go | 7 +++++++ 3 files changed, 25 insertions(+), 5 deletions(-) diff --git a/mq/consumer.go b/mq/consumer.go index aba03c1..e8cda24 100644 --- a/mq/consumer.go +++ b/mq/consumer.go @@ -47,7 +47,7 @@ func (c *consumer) Start(ctx context.Context) error { c.stopChan = make(chan struct{}) var err error - c.messages, err = c.messageChannel() + c.messages, err = c.messageChannel(c.options.Workers) if err != nil { return fmt.Errorf("get message channel: %v", err) } @@ -141,8 +141,19 @@ func (c *consumer) process(queueName string, body []byte) error { return err } -func (c *consumer) messageChannel() (<-chan amqp.Delivery, error) { - messageChannel, err := c.client.amqpChan.Consume( +// messageChannel will create a new dedicated channel for this consumer to use +func (c *consumer) messageChannel(prefetchCount int) (<-chan amqp.Delivery, error) { + mqChan, err := c.client.conn.Channel() + if err != nil { + return nil, fmt.Errorf("MQ issue. queue: %s, err: %w", string(c.queue.Name()), err) + } + + err = mqChan.Qos(prefetchCount, 0, true) + if err != nil { + return nil, fmt.Errorf("MQ issue. queue: %s, err: %w", string(c.queue.Name()), err) + } + + messageChannel, err := mqChan.Consume( string(c.queue.Name()), "", false, diff --git a/mq/mq.go b/mq/mq.go index 472aa91..c16fc42 100755 --- a/mq/mq.go +++ b/mq/mq.go @@ -24,8 +24,10 @@ const ( ) type Client struct { - url string - conn *amqp.Connection + url string + conn *amqp.Connection + + // This channel should only be used for management related operations, like declaring queues & exchanges amqpChan *amqp.Channel connClients []ConnectionClient diff --git a/mq/options.go b/mq/options.go index 761c49a..8dee7fe 100644 --- a/mq/options.go +++ b/mq/options.go @@ -27,6 +27,13 @@ func DefaultConsumerOptions(workers int) *ConsumerOptions { } } +// Deprecated: We should not put prefetch limit at channel level. We need to set limit at consumer level +// This option no longer works to limit QoS globally. +// +// From rabbitMQ doc https://www.rabbitmq.com/consumer-prefetch.html +// Unfortunately the channel is not the ideal scope for this - since a single channel may consume from multiple queues, +// the channel and the queue(s) need to coordinate with each other for every message sent to ensure they don't go over +// the limit. This is slow on a single machine, and very slow when consuming across a cluster. func OptionPrefetchLimit(limit int) Option { return func(m *Client) error { err := m.amqpChan.Qos(