Skip to content

Commit

Permalink
add option to set prefetch per consumer (#234)
Browse files Browse the repository at this point in the history
  • Loading branch information
DeimonDB authored Aug 30, 2023
1 parent de9ae91 commit d4dc257
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 3 deletions.
14 changes: 11 additions & 3 deletions mq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (c *consumer) Start(ctx context.Context) error {
c.stopChan = make(chan struct{})

var err error
c.messages, err = c.messageChannel(c.options.Workers)
c.messages, err = c.messageChannel()
if err != nil {
return fmt.Errorf("get message channel: %v", err)
}
Expand Down Expand Up @@ -142,13 +142,13 @@ func (c *consumer) process(queueName string, body []byte) error {
}

// messageChannel will create a new dedicated channel for this consumer to use
func (c *consumer) messageChannel(prefetchCount int) (<-chan amqp.Delivery, error) {
func (c *consumer) messageChannel() (<-chan amqp.Delivery, error) {
mqChan, err := c.client.conn.Channel()
if err != nil {
return nil, fmt.Errorf("MQ issue. queue: %s, err: %w", string(c.queue.Name()), err)
}

err = mqChan.Qos(prefetchCount, 0, true)
err = mqChan.Qos(c.getSanitizedPrefetchCount(), 0, true)
if err != nil {
return nil, fmt.Errorf("MQ issue. queue: %s, err: %w", string(c.queue.Name()), err)
}
Expand All @@ -169,6 +169,14 @@ func (c *consumer) messageChannel(prefetchCount int) (<-chan amqp.Delivery, erro
return messageChannel, nil
}

func (c *consumer) getSanitizedPrefetchCount() int {
if c.options.Prefetch < c.options.Workers {
return c.options.Workers
}

return c.options.Prefetch
}

func (c *consumer) getRemainingRetries(delivery amqp.Delivery) int32 {
remainingRetriesRaw, exists := delivery.Headers[headerRemainingRetries]
if !exists {
Expand Down
2 changes: 2 additions & 0 deletions mq/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

type ConsumerOptions struct {
Workers int
Prefetch int
RetryOnError bool
RetryDelay time.Duration
PerformanceMetric metrics.PerformanceMetric
Expand All @@ -20,6 +21,7 @@ type ConsumerOptions struct {
func DefaultConsumerOptions(workers int) *ConsumerOptions {
return &ConsumerOptions{
Workers: workers,
Prefetch: 2 * workers,
RetryOnError: true,
RetryDelay: time.Second,
MaxRetries: -1,
Expand Down

0 comments on commit d4dc257

Please sign in to comment.