From d4dc257165f9ba88de7cf116f40b17e16bad6279 Mon Sep 17 00:00:00 2001 From: Teddy Budiono Hermawan Date: Wed, 30 Aug 2023 11:42:44 +0800 Subject: [PATCH] add option to set prefetch per consumer (#234) --- mq/consumer.go | 14 +++++++++++--- mq/options.go | 2 ++ 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/mq/consumer.go b/mq/consumer.go index e8cda24..f280cce 100644 --- a/mq/consumer.go +++ b/mq/consumer.go @@ -47,7 +47,7 @@ func (c *consumer) Start(ctx context.Context) error { c.stopChan = make(chan struct{}) var err error - c.messages, err = c.messageChannel(c.options.Workers) + c.messages, err = c.messageChannel() if err != nil { return fmt.Errorf("get message channel: %v", err) } @@ -142,13 +142,13 @@ func (c *consumer) process(queueName string, body []byte) error { } // messageChannel will create a new dedicated channel for this consumer to use -func (c *consumer) messageChannel(prefetchCount int) (<-chan amqp.Delivery, error) { +func (c *consumer) messageChannel() (<-chan amqp.Delivery, error) { mqChan, err := c.client.conn.Channel() if err != nil { return nil, fmt.Errorf("MQ issue. queue: %s, err: %w", string(c.queue.Name()), err) } - err = mqChan.Qos(prefetchCount, 0, true) + err = mqChan.Qos(c.getSanitizedPrefetchCount(), 0, true) if err != nil { return nil, fmt.Errorf("MQ issue. queue: %s, err: %w", string(c.queue.Name()), err) } @@ -169,6 +169,14 @@ func (c *consumer) messageChannel(prefetchCount int) (<-chan amqp.Delivery, erro return messageChannel, nil } +func (c *consumer) getSanitizedPrefetchCount() int { + if c.options.Prefetch < c.options.Workers { + return c.options.Workers + } + + return c.options.Prefetch +} + func (c *consumer) getRemainingRetries(delivery amqp.Delivery) int32 { remainingRetriesRaw, exists := delivery.Headers[headerRemainingRetries] if !exists { diff --git a/mq/options.go b/mq/options.go index 8dee7fe..fc8f1e7 100644 --- a/mq/options.go +++ b/mq/options.go @@ -8,6 +8,7 @@ import ( type ConsumerOptions struct { Workers int + Prefetch int RetryOnError bool RetryDelay time.Duration PerformanceMetric metrics.PerformanceMetric @@ -20,6 +21,7 @@ type ConsumerOptions struct { func DefaultConsumerOptions(workers int) *ConsumerOptions { return &ConsumerOptions{ Workers: workers, + Prefetch: 2 * workers, RetryOnError: true, RetryDelay: time.Second, MaxRetries: -1,