From 64294a9aa46e6f0d387ad88ea349541c8fdaad7a Mon Sep 17 00:00:00 2001 From: mhmtszr Date: Mon, 17 Jul 2023 22:41:10 +0300 Subject: [PATCH] Add clientID kafka config --- README.md | 1 + config/config.go | 3 ++- kafka/client.go | 1 + 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 501361c..4acc1b1 100644 --- a/README.md +++ b/README.md @@ -114,6 +114,7 @@ Check out on [go-dcp](https://github.com/Trendyol/go-dcp#configuration) | `kafka.scramPassword` | string | no | *not set | Define scram password. | | `kafka.metadataTTL` | time.Duration | no | 60s | TTL for the metadata cached by segmentio, increase it to reduce network requests. For more detail please check [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport.MetadataTTL). | | `kafka.metadataTopics` | []string | no | | Topic names for the metadata cached by segmentio, define topics here that the connector may produce. In large Kafka clusters, this will reduce memory usage. For more detail please check [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport.MetadataTopics). | +| `kafka.clientID` | string | no | | Unique identifier that the transport communicates to the brokers when it sends requests. For more detail please check [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport.ClientID). | ### Kafka Metadata Configuration(Use it if you want to store the checkpoint data in Kafka) diff --git a/config/config.go b/config/config.go index e88dce2..b295e56 100644 --- a/config/config.go +++ b/config/config.go @@ -12,15 +12,16 @@ type Kafka struct { ScramUsername string `yaml:"scramUsername"` ScramPassword string `yaml:"scramPassword"` RootCAPath string `yaml:"rootCAPath"` + ClientID string `yaml:"clientID"` Brokers []string `yaml:"brokers"` MetadataTopics []string `yaml:"metadataTopics"` - ProducerBatchTickerDuration time.Duration `yaml:"producerBatchTickerDuration"` ProducerBatchBytes int `yaml:"producerBatchBytes"` ReadTimeout time.Duration `yaml:"readTimeout"` WriteTimeout time.Duration `yaml:"writeTimeout"` RequiredAcks int `yaml:"requiredAcks"` ProducerBatchSize int `yaml:"producerBatchSize"` MetadataTTL time.Duration `yaml:"metadataTTL"` + ProducerBatchTickerDuration time.Duration `yaml:"producerBatchTickerDuration"` Compression int8 `yaml:"compression"` SecureConnection bool `yaml:"secureConnection"` } diff --git a/kafka/client.go b/kafka/client.go index 50d6fa1..b9aa6df 100644 --- a/kafka/client.go +++ b/kafka/client.go @@ -269,6 +269,7 @@ func NewClient(config *config.Connector, logger logger.Logger, errorLogger logge newClient.transport = &kafka.Transport{ MetadataTTL: config.Kafka.MetadataTTL, MetadataTopics: config.Kafka.MetadataTopics, + ClientID: config.Kafka.ClientID, } if config.Kafka.SecureConnection {