From 541cca754e67b762aa3e982f58352648876ed79b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?A=2ESamet=20=C4=B0leri?= Date: Wed, 30 Oct 2024 14:05:01 +0300 Subject: [PATCH] feat: cronsumer internal queue capacity, producer batch size and timeout expose (#147) * feat: able to use kafka cronsumer queue capacity field * feat: able to inject cronsumer producer batch size and timeout * docs: add new cronsumer exposed field to the readme * feat: kafka cronsumer v1.1.5 dump --- README.md | 13 ++++++++----- consumer_config.go | 10 ++++++++-- go.mod | 2 +- go.sum | 4 ++++ test/integration/go.mod | 2 +- test/integration/go.sum | 2 ++ 6 files changed, 24 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 52cb8c9..4ff2434 100644 --- a/README.md +++ b/README.md @@ -241,17 +241,20 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap | `metricPrefix` | MetricPrefix is used for prometheus fq name prefix. If not provided, default metric prefix value is `kafka_konsumer`. Currently, there are two exposed prometheus metrics. `processed_messages_total` and `unprocessed_messages_total` So, if default metric prefix used, metrics names are `kafka_konsumer_processed_messages_total_current` and `kafka_konsumer_unprocessed_messages_total_current`. | kafka_konsumer | | `dial.Timeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Dialer) | no timeout | | `dial.KeepAlive` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Dialer) | not enabled | -| `transport.DialTimeout ` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@#Transport) | 5s | -| `transport.IdleTimeout ` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@#Transport) | 30s | -| `transport.MetadataTTL ` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@#Transport) | 6s | -| `transport.MetadataTopics ` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@#Transport) | all topics in cluster | +| `transport.DialTimeout ` | [see doc]((https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.47#Transport) | 5s | +| `transport.IdleTimeout ` | [see doc]((https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.47#Transport) | 30s | +| `transport.MetadataTTL ` | [see doc]((https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.47#Transport) | 6s | +| `transport.MetadataTopics ` | [see doc]((https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.47#Transport) | all topics in cluster | | `distributedTracingEnabled` | indicates open telemetry support on/off for consume and produce operations. | false | | `distributedTracingConfiguration.TracerProvider` | [see doc](https://opentelemetry.io/docs/specs/otel/trace/api/) | otel.GetTracerProvider() | | `distributedTracingConfiguration.Propagator` | [see doc](https://opentelemetry.io/docs/specs/otel/trace/api/) | otel.GetTextMapPropagator() | -| `retryConfiguration.clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@#Transport) | | +| `retryConfiguration.clientId` | [see doc]((https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.47#Transport) | | | `retryConfiguration.startTimeCron` | Cron expression when retry consumer ([kafka-cronsumer](https://github.com/Trendyol/kafka-cronsumer#configurations)) starts to work at | | | `retryConfiguration.metricPrefix` | MetricPrefix is used for prometheus fq name prefix. If not provided, default metric prefix value is kafka_cronsumer. Currently, there are two exposed prometheus metrics. retried_messages_total_current and discarded_messages_total_current. So, if default metric prefix used, metrics names are kafka_cronsumer_retried_messages_total_current and kafka_cronsumer_discarded_messages_total_current | kafka_cronsumer | | `retryConfiguration.workDuration` | Work duration exception consumer actively consuming messages | | +| `retryConfiguration.queueCapacity` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.47#ReaderConfig.QueueCapacity) | 100 | +| `retryConfiguration.producerBatchSize` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.47#WriterConfig.BatchSize) | 100 | +| `retryConfiguration.producerBatchTimeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.47#WriterConfig.BatchTimeout) | 100 | | `retryConfiguration.topic` | Retry/Exception topic names | | | `retryConfiguration.brokers` | Retry topic brokers urls | | | `retryConfiguration.maxRetry` | Maximum retry value for attempting to retry a message | 3 | diff --git a/consumer_config.go b/consumer_config.go index 63218f6..c4ac5ea 100644 --- a/consumer_config.go +++ b/consumer_config.go @@ -126,6 +126,7 @@ func (cfg *ConsumerConfig) newCronsumerConfig() *kcronsumer.Config { MaxRetry: cfg.RetryConfiguration.MaxRetry, VerifyTopicOnStartup: cfg.RetryConfiguration.VerifyTopicOnStartup, Concurrency: cfg.RetryConfiguration.Concurrency, + QueueCapacity: cfg.RetryConfiguration.QueueCapacity, MinBytes: cfg.Reader.MinBytes, MaxBytes: cfg.Reader.MaxBytes, MaxWait: cfg.Reader.MaxWait, @@ -137,8 +138,10 @@ func (cfg *ConsumerConfig) newCronsumerConfig() *kcronsumer.Config { RetentionTime: cfg.Reader.RetentionTime, }, Producer: kcronsumer.ProducerConfig{ - Balancer: cfg.RetryConfiguration.Balancer, - Brokers: cfg.RetryConfiguration.Brokers, + Balancer: cfg.RetryConfiguration.Balancer, + Brokers: cfg.RetryConfiguration.Brokers, + BatchSize: cfg.RetryConfiguration.ProducerBatchSize, + BatchTimeout: cfg.RetryConfiguration.ProducerBatchTimeout, }, LogLevel: lcronsumer.Level(cfg.RetryConfiguration.LogLevel), } @@ -225,6 +228,9 @@ type RetryConfiguration struct { WorkDuration time.Duration SkipMessageByHeaderFn SkipMessageByHeaderFn Concurrency int + QueueCapacity int + ProducerBatchSize int + ProducerBatchTimeout time.Duration } type BatchConfiguration struct { diff --git a/go.mod b/go.mod index 41376da..e1db224 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/Trendyol/kafka-konsumer/v2 go 1.19 require ( - github.com/Trendyol/kafka-cronsumer v1.5.4 + github.com/Trendyol/kafka-cronsumer v1.5.5 github.com/Trendyol/otel-kafka-konsumer v0.0.7 github.com/ansrivas/fiberprometheus/v2 v2.6.1 github.com/gofiber/fiber/v2 v2.52.1 diff --git a/go.sum b/go.sum index 0f0df02..3907c74 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,9 @@ github.com/Trendyol/kafka-cronsumer v1.5.4 h1:r2iyWJ8E+rd5IoRGv/XZ2bKMknxfLh2eApPhMiJodR4= github.com/Trendyol/kafka-cronsumer v1.5.4/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= +github.com/Trendyol/kafka-cronsumer v1.5.5-0.20241024185446-b44371b7a3ec h1:82NtbRoJBnH7Qq/9VIMyRbRi6jTc9yytqrpR3KsH5Ks= +github.com/Trendyol/kafka-cronsumer v1.5.5-0.20241024185446-b44371b7a3ec/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= +github.com/Trendyol/kafka-cronsumer v1.5.5 h1:R+tZd/A//0NDQbmBwkuMolncOXIeKlqRt1bvnmU6Fn8= +github.com/Trendyol/kafka-cronsumer v1.5.5/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= github.com/Trendyol/otel-kafka-konsumer v0.0.7 h1:sT1TE2rgfsdrJWrXKz5j6dPkKJsvP+Tv0Dea4ORqJ+4= github.com/Trendyol/otel-kafka-konsumer v0.0.7/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= diff --git a/test/integration/go.mod b/test/integration/go.mod index a36c13a..92f1db1 100644 --- a/test/integration/go.mod +++ b/test/integration/go.mod @@ -10,7 +10,7 @@ require ( ) require ( - github.com/Trendyol/kafka-cronsumer v1.5.4 // indirect + github.com/Trendyol/kafka-cronsumer v1.5.5 // indirect github.com/Trendyol/otel-kafka-konsumer v0.0.7 // indirect github.com/andybalholm/brotli v1.0.5 // indirect github.com/ansrivas/fiberprometheus/v2 v2.6.1 // indirect diff --git a/test/integration/go.sum b/test/integration/go.sum index febb32c..c25f52b 100644 --- a/test/integration/go.sum +++ b/test/integration/go.sum @@ -9,6 +9,8 @@ github.com/Trendyol/kafka-cronsumer v1.5.3/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNu github.com/Trendyol/kafka-cronsumer v1.5.4-0.20240808131305-10cf27589160/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= github.com/Trendyol/kafka-cronsumer v1.5.4-0.20240827135347-7ed5187ea81d/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= github.com/Trendyol/kafka-cronsumer v1.5.4/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= +github.com/Trendyol/kafka-cronsumer v1.5.5-0.20241024185446-b44371b7a3ec/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= +github.com/Trendyol/kafka-cronsumer v1.5.5/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= github.com/Trendyol/otel-kafka-konsumer v0.0.7 h1:sT1TE2rgfsdrJWrXKz5j6dPkKJsvP+Tv0Dea4ORqJ+4= github.com/Trendyol/otel-kafka-konsumer v0.0.7/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=