Skip to content

Commit

Permalink
Add ConsumerOptions (#29)
Browse files Browse the repository at this point in the history
* Add ConsumerOptions

* Update mq.go

* Update mq.go

* Update mq.go

* Add InitDefaultConsumerOptions
  • Loading branch information
vikmeup authored Jan 13, 2021
1 parent 7e676f1 commit b74318d
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 35 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ require (
github.com/kr/pretty v0.1.0 // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/shopspring/decimal v1.2.0
github.com/sirupsen/logrus v1.7.0 // indirect
github.com/streadway/amqp v1.0.0 // indirect
github.com/stretchr/testify v1.6.1
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,13 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ=
github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/sirupsen/logrus v1.7.0 h1:ShrD1U9pZB12TX0cVy0DtePoCH97K8EtX+mg7ZARUtM=
github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo=
github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
Expand All @@ -26,6 +31,7 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 h1:ogLJMz+qpzav7lGMh10LMvAkM/fAoGlaiiHYiFYdm80=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down
48 changes: 13 additions & 35 deletions network/mq/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,13 @@ import (
)

var (
prefetchCount int
amqpChan *amqp.Channel
conn *amqp.Connection

DefaultConsumerOptions = ConsumerOptions{
Workers: 1,
RetryOnError: false,
RetryDelay: 0,
}
amqpChan *amqp.Channel
conn *amqp.Connection
)

const ()

type Consumer interface {
Callback(msg amqp.Delivery) error
}

type ConsumerOptions struct {
Workers int
RetryOnError bool
RetryDelay time.Duration
}

type (
Queue string
Exchange string
Expand Down Expand Up @@ -135,32 +119,26 @@ func (q Queue) GetMessageChannel() MessageChannel {
return messageChannel
}

func worker(messages <-chan amqp.Delivery, consumer Consumer) {
func worker(messages <-chan amqp.Delivery, consumer Consumer, options ConsumerOptions) {
for msg := range messages {
err := consumer.Callback(msg)
if err != nil {
log.Error(err)
}
//if options.RetryOnError {
// if err := message.Nack(false, true); err != nil {
// log.Error(err)
// }
// time.Sleep(options.RetryDelay)
//} else {
// if err := message.Ack(false); err != nil {
// log.Error(err)
// }
//}
if err := msg.Ack(false); err != nil {
log.Error(err)
if err != nil && options.RetryOnError {
if err := msg.Nack(true, true); err != nil {
log.Error(err)
}
time.Sleep(options.RetryDelay)
} else {
if err := msg.Ack(false); err != nil {
log.Error(err)
}
}
}
}

func (q Queue) RunConsumer(consumer Consumer, options ConsumerOptions, ctx context.Context) {
messages := make(chan amqp.Delivery)
for w := 1; w <= options.Workers; w++ {
go worker(messages, consumer)
go worker(messages, consumer, options)
}
messageChannel := q.GetMessageChannel()
for {
Expand Down
19 changes: 19 additions & 0 deletions network/mq/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package mq

import (
"time"
)

type ConsumerOptions struct {
Workers int
RetryOnError bool
RetryDelay time.Duration
}

func InitDefaultConsumerOptions(workers int) ConsumerOptions {
return ConsumerOptions{
Workers: workers,
RetryOnError: true,
RetryDelay: time.Second * 1,
}
}

0 comments on commit b74318d

Please sign in to comment.