diff --git a/README.md b/README.md index f08eab8..52cb8c9 100644 --- a/README.md +++ b/README.md @@ -255,6 +255,7 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap | `retryConfiguration.topic` | Retry/Exception topic names | | | `retryConfiguration.brokers` | Retry topic brokers urls | | | `retryConfiguration.maxRetry` | Maximum retry value for attempting to retry a message | 3 | +| `retryConfiguration.concurrency` | Number of goroutines used at listeners | 1 | | `retryConfiguration.tls.rootCAPath` | [see doc](https://pkg.go.dev/crypto/tls#Config.RootCAs) | "" | | `retryConfiguration.tls.intermediateCAPath` | Same with rootCA, if you want to specify two rootca you can use it with rootCAPath | "" | | `retryConfiguration.sasl.authType` | `SCRAM` or `PLAIN` | | diff --git a/consumer_config.go b/consumer_config.go index 7a18da1..63218f6 100644 --- a/consumer_config.go +++ b/consumer_config.go @@ -125,7 +125,7 @@ func (cfg *ConsumerConfig) newCronsumerConfig() *kcronsumer.Config { Duration: cfg.RetryConfiguration.WorkDuration, MaxRetry: cfg.RetryConfiguration.MaxRetry, VerifyTopicOnStartup: cfg.RetryConfiguration.VerifyTopicOnStartup, - Concurrency: cfg.Concurrency, + Concurrency: cfg.RetryConfiguration.Concurrency, MinBytes: cfg.Reader.MinBytes, MaxBytes: cfg.Reader.MaxBytes, MaxWait: cfg.Reader.MaxWait, @@ -224,6 +224,7 @@ type RetryConfiguration struct { MaxRetry int WorkDuration time.Duration SkipMessageByHeaderFn SkipMessageByHeaderFn + Concurrency int } type BatchConfiguration struct { @@ -284,6 +285,10 @@ func (cfg *ConsumerConfig) setDefaults() { cfg.Concurrency = 1 } + if cfg.RetryConfiguration.Concurrency == 0 { + cfg.RetryConfiguration.Concurrency = cfg.Concurrency + } + if cfg.CommitInterval == 0 { cfg.CommitInterval = time.Second // Kafka-go library default value is 0, we need to also change this. diff --git a/consumer_config_test.go b/consumer_config_test.go index ebf9878..f434e71 100644 --- a/consumer_config_test.go +++ b/consumer_config_test.go @@ -35,6 +35,9 @@ func TestConsumerConfig_validate(t *testing.T) { if cfg.BatchConfiguration != nil { t.Fatalf("Batch configuration not specified so it must stay as nil") } + if cfg.RetryConfiguration.Concurrency != 1 { + t.Fatal("Retry Configuration Concurrency must equal to 1") + } }) t.Run("Set_Defaults_For_BatchConfiguration", func(t *testing.T) { // Given