diff --git a/sqsjobs/listener.go b/sqsjobs/listener.go index 46c2a3c..d8b09d4 100644 --- a/sqsjobs/listener.go +++ b/sqsjobs/listener.go @@ -31,12 +31,6 @@ func (c *Driver) listen(ctx context.Context) { //nolint:gocognit c.log.Debug("sqs listener was stopped") return default: - // lock when we hit the limit - for atomic.LoadInt64(c.msgInFlight) >= int64(atomic.LoadInt32(c.msgInFlightLimit)) { - c.log.Debug("prefetch limit was reached, waiting for the jobs to be processed", zap.Int64("current", atomic.LoadInt64(c.msgInFlight)), zap.Int32("limit", atomic.LoadInt32(c.msgInFlightLimit))) - c.cond.Wait() - } - message, err := c.client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{ QueueUrl: c.queueURL, MaxNumberOfMessages: c.prefetch, @@ -93,6 +87,11 @@ func (c *Driver) listen(ctx context.Context) { //nolint:gocognit for i := 0; i < len(message.Messages); i++ { c.cond.L.Lock() + // lock when we hit the limit + for atomic.LoadInt64(c.msgInFlight) >= int64(atomic.LoadInt32(c.msgInFlightLimit)) { + c.log.Debug("prefetch limit was reached, waiting for the jobs to be processed", zap.Int64("current", atomic.LoadInt64(c.msgInFlight)), zap.Int32("limit", atomic.LoadInt32(c.msgInFlightLimit))) + c.cond.Wait() + } m := message.Messages[i] c.log.Debug("receive message", zap.Stringp("ID", m.MessageId))