Skip to content

Commit

Permalink
Merge pull request #58 from Trendyol/feature/client-id-config
Browse files Browse the repository at this point in the history
Add clientID kafka config
  • Loading branch information
emreodabas authored Jul 18, 2023
2 parents 56b7977 + 64294a9 commit 98505e1
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 1 deletion.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ Check out on [go-dcp](https://github.com/Trendyol/go-dcp#configuration)
| `kafka.scramPassword` | string | no | *not set | Define scram password. |
| `kafka.metadataTTL` | time.Duration | no | 60s | TTL for the metadata cached by segmentio, increase it to reduce network requests. For more detail please check [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport.MetadataTTL). |
| `kafka.metadataTopics` | []string | no | | Topic names for the metadata cached by segmentio, define topics here that the connector may produce. In large Kafka clusters, this will reduce memory usage. For more detail please check [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport.MetadataTopics). |
| `kafka.clientID` | string | no | | Unique identifier that the transport communicates to the brokers when it sends requests. For more detail please check [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport.ClientID). |

### Kafka Metadata Configuration(Use it if you want to store the checkpoint data in Kafka)

Expand Down
3 changes: 2 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,16 @@ type Kafka struct {
ScramUsername string `yaml:"scramUsername"`
ScramPassword string `yaml:"scramPassword"`
RootCAPath string `yaml:"rootCAPath"`
ClientID string `yaml:"clientID"`
Brokers []string `yaml:"brokers"`
MetadataTopics []string `yaml:"metadataTopics"`
ProducerBatchTickerDuration time.Duration `yaml:"producerBatchTickerDuration"`
ProducerBatchBytes int `yaml:"producerBatchBytes"`
ReadTimeout time.Duration `yaml:"readTimeout"`
WriteTimeout time.Duration `yaml:"writeTimeout"`
RequiredAcks int `yaml:"requiredAcks"`
ProducerBatchSize int `yaml:"producerBatchSize"`
MetadataTTL time.Duration `yaml:"metadataTTL"`
ProducerBatchTickerDuration time.Duration `yaml:"producerBatchTickerDuration"`
Compression int8 `yaml:"compression"`
SecureConnection bool `yaml:"secureConnection"`
}
Expand Down
1 change: 1 addition & 0 deletions kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ func NewClient(config *config.Connector, logger logger.Logger, errorLogger logge
newClient.transport = &kafka.Transport{
MetadataTTL: config.Kafka.MetadataTTL,
MetadataTopics: config.Kafka.MetadataTopics,
ClientID: config.Kafka.ClientID,
}

if config.Kafka.SecureConnection {
Expand Down

0 comments on commit 98505e1

Please sign in to comment.