Skip to content

Commit

Permalink
fix: move prefetch limit in the messages loop
Browse files Browse the repository at this point in the history
Signed-off-by: Valery Piashchynski <[email protected]>
  • Loading branch information
rustatian committed Nov 8, 2024
1 parent 44d3662 commit 24b58b3
Showing 1 changed file with 5 additions and 6 deletions.
11 changes: 5 additions & 6 deletions sqsjobs/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,6 @@ func (c *Driver) listen(ctx context.Context) { //nolint:gocognit
c.log.Debug("sqs listener was stopped")
return
default:
// lock when we hit the limit
for atomic.LoadInt64(c.msgInFlight) >= int64(atomic.LoadInt32(c.msgInFlightLimit)) {
c.log.Debug("prefetch limit was reached, waiting for the jobs to be processed", zap.Int64("current", atomic.LoadInt64(c.msgInFlight)), zap.Int32("limit", atomic.LoadInt32(c.msgInFlightLimit)))
c.cond.Wait()
}

message, err := c.client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
QueueUrl: c.queueURL,
MaxNumberOfMessages: c.prefetch,
Expand Down Expand Up @@ -93,6 +87,11 @@ func (c *Driver) listen(ctx context.Context) { //nolint:gocognit

for i := 0; i < len(message.Messages); i++ {
c.cond.L.Lock()
// lock when we hit the limit
for atomic.LoadInt64(c.msgInFlight) >= int64(atomic.LoadInt32(c.msgInFlightLimit)) {
c.log.Debug("prefetch limit was reached, waiting for the jobs to be processed", zap.Int64("current", atomic.LoadInt64(c.msgInFlight)), zap.Int32("limit", atomic.LoadInt32(c.msgInFlightLimit)))
c.cond.Wait()
}

m := message.Messages[i]
c.log.Debug("receive message", zap.Stringp("ID", m.MessageId))
Expand Down

0 comments on commit 24b58b3

Please sign in to comment.