Skip to content

Commit

Permalink
feature: add max_messages_in_flight_limit option
Browse files Browse the repository at this point in the history
Signed-off-by: Valery Piashchynski <[email protected]>
  • Loading branch information
rustatian committed Nov 10, 2024
1 parent c222308 commit be04361
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 13 deletions.
9 changes: 9 additions & 0 deletions sqsjobs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const (
messageGroupID string = "message_group_id"
waitTime string = "wait_time"
skipQueueDeclaration string = "skip_queue_declaration"
maxMsgsInFlightLimit string = "max_messages_in_flight_limit"
maxVisibilityTimeout int32 = 43200
maxWaitTime int32 = 20
)
Expand All @@ -32,6 +33,10 @@ type Config struct {

// pipeline

// The maximum number of messages which can be in-flight at once. Use this to prevent workers from overloading.
// Defaults to prefetch
MaxMsgInFlightLimit int32 `mapstructure:"max_messages_in_flight_limit"`

// get queue url, do not declare
SkipQueueDeclaration bool `mapstructure:"skip_queue_declaration"`

Expand Down Expand Up @@ -139,6 +144,10 @@ func (c *Config) InitDefault() {
c.Prefetch = 10
}

if c.MaxMsgInFlightLimit == 0 {
c.MaxMsgInFlightLimit = c.Prefetch
}

if c.WaitTimeSeconds < 0 {
// 0 - ignored by AWS
c.WaitTimeSeconds = 0
Expand Down
19 changes: 15 additions & 4 deletions sqsjobs/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, pipe jobs.Pip
prefetch: conf.Prefetch,
pauseCh: make(chan struct{}, 1),
// new in 2.12.1
msgInFlightLimit: ptr(int32(100)),
msgInFlightLimit: &conf.MaxMsgInFlightLimit,
msgInFlight: ptr(int64(0)),
}

Expand Down Expand Up @@ -199,6 +199,16 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap.
return nil, errors.E(op, err)
}

wt := pipe.Int(waitTime, 0)
if wt < 0 {
wt = 0
} else if wt > int(maxWaitTime) {
wt = int(maxWaitTime)
}

pref := int32(pipe.Int(prefetch, 1)) //nolint:gosec
msgInFl := int32(pipe.Int(maxMsgsInFlightLimit, int(pref))) //nolint:gosec

// initialize job Driver
jb := &Driver{
tracer: tracer,
Expand All @@ -214,11 +224,12 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipe jobs.Pipeline, log *zap.
visibilityTimeout: int32(pipe.Int(visibility, 0)), //nolint:gosec
errorVisibilityTimeout: int32(pipe.Int(errorVisibilityTimeout, 0)), //nolint:gosec
retainFailedJobs: pipe.Bool(retainFailedJobs, false),
waitTime: int32(pipe.Int(waitTime, 0)), //nolint:gosec
prefetch: int32(pipe.Int(prefetch, 0)), //nolint:gosec
waitTime: int32(wt), //nolint:gosec
prefetch: pref,
pauseCh: make(chan struct{}, 1),
// new in 2.12.1
msgInFlightLimit: ptr(int32(100)),
// default - prefetch
msgInFlightLimit: ptr(msgInFl), //nolin:gosec
msgInFlight: ptr(int64(0)),
}

Expand Down
17 changes: 9 additions & 8 deletions tests/configs/.rr-sqs-init_fifo-prefetch.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
version: '3'
version: "3"

rpc:
listen: tcp://127.0.0.1:6232

server:
command: "php php_test_files/jobs/jobs_ok_sleep1.php"

sqs: { }
sqs: {}

logs:
level: debug
Expand All @@ -23,10 +23,11 @@ jobs:
driver: sqs
config:
prefetch: 10
message_group_id: 'RR'
max_messages_in_flight_limit: 10
message_group_id: "RR"
queue: sqs-init-prefetch-1.fifo # must end with .fifo!
attributes:
FifoQueue: 'true'
FifoQueue: "true"
ReceiveMessageWaitTimeSeconds: 5
VisibilityTimeout: 10 # release jobs again after 10s; before the worker has processed all jobs
tags:
Expand All @@ -36,13 +37,13 @@ jobs:
driver: sqs
config:
prefetch: 10
max_messages_in_flight_limit: 10
queue: sqs-init-prefetch-2.fifo
message_group_id: 'RR'
message_group_id: "RR"
attributes:
FifoQueue: 'true'
FifoQueue: "true"
ReceiveMessageWaitTimeSeconds: 5
tags:
test: "tag"

consume: [ "test-1", "test-2" ]

consume: ["test-1", "test-2"]
2 changes: 1 addition & 1 deletion tests/jobs_sqs_fifo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ func TestSQSPrefetch(t *testing.T) {
}()

time.Sleep(time.Second * 3)
for i := 0; i < 120; i++ {
for i := 0; i < 15; i++ {
go func() {
t.Run("PushPipelineFifo", helpers.PushToPipe("test-1", false, "127.0.0.1:6232"))
t.Run("PushPipelineFifo", helpers.PushToPipe("test-2", false, "127.0.0.1:6232"))
Expand Down

0 comments on commit be04361

Please sign in to comment.