From 80df80c4a94e78f7cb18340ae2457f4d9d4af4b6 Mon Sep 17 00:00:00 2001 From: Mehmet Sezer Date: Fri, 7 Jul 2023 15:44:52 +0300 Subject: [PATCH] Add topic per dcp message metadata field (#56) * Add topic per dcp message metadata field * feat: bump go-dcp-client v0.0.62 --- README.md | 2 +- connector.go | 21 +++++++++++++++------ go.mod | 2 +- go.sum | 4 ++-- kafka/message/message.go | 1 + 5 files changed, 20 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 7145681..cf06d5a 100644 --- a/README.md +++ b/README.md @@ -99,7 +99,7 @@ Check out on [go-dcp-client](https://github.com/Trendyol/go-dcp-client#configura | Variable | Type | Required | Default | Description | |-------------------------------------|-------------------|----------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `kafka.collectionTopicMapping` | map[string]string | yes | | Defines which Couchbase collection events will be sent to which topic | +| `kafka.collectionTopicMapping` | map[string]string | yes | | Defines which Couchbase collection events will be sent to which topic,:warning: **If topic information is entered in the mapper, it will OVERWRITE this config**. | | `kafka.brokers` | []string | yes | | Broker ip and port information | | `kafka.producerBatchSize` | integer | no | 2000 | Maximum message count for batch, if exceed flush will be triggered. | | `kafka.producerBatchBytes` | integer | no | 10485760 | Maximum size(byte) for batch, if exceed flush will be triggered. | diff --git a/connector.go b/connector.go index 1c926d3..126af7e 100644 --- a/connector.go +++ b/connector.go @@ -2,6 +2,7 @@ package gokafkaconnectcouchbase import ( "errors" + "fmt" "os" "github.com/Trendyol/go-dcp-client" @@ -62,11 +63,7 @@ func (c *connector) produce(ctx *models.ListenerContext) { default: return } - topic := c.config.Kafka.CollectionTopicMapping[e.CollectionName] - if topic == "" { - c.errorLogger.Printf("unexpected collection | %s", e.CollectionName) - return - } + kafkaMessages := c.mapper(e) if len(kafkaMessages) == 0 { @@ -77,7 +74,7 @@ func (c *connector) produce(ctx *models.ListenerContext) { messages := make([]sKafka.Message, 0, len(kafkaMessages)) for _, message := range kafkaMessages { messages = append(messages, sKafka.Message{ - Topic: topic, + Topic: c.getTopicName(e.CollectionName, message.Topic), Key: message.Key, Value: message.Value, Headers: message.Headers, @@ -86,6 +83,18 @@ func (c *connector) produce(ctx *models.ListenerContext) { c.producer.Produce(ctx, e.EventTime, messages) } +func (c *connector) getTopicName(collectionName string, messageTopic *string) string { + if messageTopic != nil { + return *messageTopic + } + + topic := c.config.Kafka.CollectionTopicMapping[collectionName] + if topic == "" { + panic(fmt.Sprintf("there is no topic mapping for collection: %s on your configuration", collectionName)) + } + return topic +} + func NewConnector(cfg any, mapper Mapper) (Connector, error) { c, err := newConfig(cfg) if err != nil { diff --git a/go.mod b/go.mod index 4a17287..c9a7234 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/Trendyol/go-kafka-connect-couchbase go 1.19 require ( - github.com/Trendyol/go-dcp-client v0.0.61 + github.com/Trendyol/go-dcp-client v0.0.62 github.com/json-iterator/go v1.1.12 github.com/prometheus/client_golang v1.15.1 github.com/segmentio/kafka-go v0.4.39 diff --git a/go.sum b/go.sum index 7073745..6b9b466 100644 --- a/go.sum +++ b/go.sum @@ -36,8 +36,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VMrpA= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= -github.com/Trendyol/go-dcp-client v0.0.61 h1:v1nPJIgwVJEZuykkPgghpgGDjfbqqN0rzsoCuYaqKF0= -github.com/Trendyol/go-dcp-client v0.0.61/go.mod h1:V/s7c30X1y1ZAlxnbQFeBJd1J0U8KMir3Jr9XAcpWho= +github.com/Trendyol/go-dcp-client v0.0.62 h1:52PLkgVqjQ88LrI8gE5LnlqY1dMVtkj1NXevfoeP8Ok= +github.com/Trendyol/go-dcp-client v0.0.62/go.mod h1:V/s7c30X1y1ZAlxnbQFeBJd1J0U8KMir3Jr9XAcpWho= github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow= github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= diff --git a/kafka/message/message.go b/kafka/message/message.go index 8476230..63087a3 100644 --- a/kafka/message/message.go +++ b/kafka/message/message.go @@ -6,4 +6,5 @@ type KafkaMessage struct { Headers []kafka.Header Key []byte Value []byte + Topic *string }