Skip to content

Commit

Permalink
Add topic per dcp message metadata field (#56)
Browse files Browse the repository at this point in the history
* Add topic per dcp message metadata field

* feat: bump go-dcp-client v0.0.62
  • Loading branch information
mhmtszr authored Jul 7, 2023
1 parent 3fc23de commit 80df80c
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 10 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ Check out on [go-dcp-client](https://github.com/Trendyol/go-dcp-client#configura

| Variable | Type | Required | Default | Description |
|-------------------------------------|-------------------|----------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `kafka.collectionTopicMapping` | map[string]string | yes | | Defines which Couchbase collection events will be sent to which topic |
| `kafka.collectionTopicMapping` | map[string]string | yes | | Defines which Couchbase collection events will be sent to which topic,:warning: **If topic information is entered in the mapper, it will OVERWRITE this config**. |
| `kafka.brokers` | []string | yes | | Broker ip and port information |
| `kafka.producerBatchSize` | integer | no | 2000 | Maximum message count for batch, if exceed flush will be triggered. |
| `kafka.producerBatchBytes` | integer | no | 10485760 | Maximum size(byte) for batch, if exceed flush will be triggered. |
Expand Down
21 changes: 15 additions & 6 deletions connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gokafkaconnectcouchbase

import (
"errors"
"fmt"
"os"

"github.com/Trendyol/go-dcp-client"
Expand Down Expand Up @@ -62,11 +63,7 @@ func (c *connector) produce(ctx *models.ListenerContext) {
default:
return
}
topic := c.config.Kafka.CollectionTopicMapping[e.CollectionName]
if topic == "" {
c.errorLogger.Printf("unexpected collection | %s", e.CollectionName)
return
}

kafkaMessages := c.mapper(e)

if len(kafkaMessages) == 0 {
Expand All @@ -77,7 +74,7 @@ func (c *connector) produce(ctx *models.ListenerContext) {
messages := make([]sKafka.Message, 0, len(kafkaMessages))
for _, message := range kafkaMessages {
messages = append(messages, sKafka.Message{
Topic: topic,
Topic: c.getTopicName(e.CollectionName, message.Topic),
Key: message.Key,
Value: message.Value,
Headers: message.Headers,
Expand All @@ -86,6 +83,18 @@ func (c *connector) produce(ctx *models.ListenerContext) {
c.producer.Produce(ctx, e.EventTime, messages)
}

func (c *connector) getTopicName(collectionName string, messageTopic *string) string {
if messageTopic != nil {
return *messageTopic
}

topic := c.config.Kafka.CollectionTopicMapping[collectionName]
if topic == "" {
panic(fmt.Sprintf("there is no topic mapping for collection: %s on your configuration", collectionName))
}
return topic
}

func NewConnector(cfg any, mapper Mapper) (Connector, error) {
c, err := newConfig(cfg)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/Trendyol/go-kafka-connect-couchbase
go 1.19

require (
github.com/Trendyol/go-dcp-client v0.0.61
github.com/Trendyol/go-dcp-client v0.0.62
github.com/json-iterator/go v1.1.12
github.com/prometheus/client_golang v1.15.1
github.com/segmentio/kafka-go v0.4.39
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VMrpA=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/Trendyol/go-dcp-client v0.0.61 h1:v1nPJIgwVJEZuykkPgghpgGDjfbqqN0rzsoCuYaqKF0=
github.com/Trendyol/go-dcp-client v0.0.61/go.mod h1:V/s7c30X1y1ZAlxnbQFeBJd1J0U8KMir3Jr9XAcpWho=
github.com/Trendyol/go-dcp-client v0.0.62 h1:52PLkgVqjQ88LrI8gE5LnlqY1dMVtkj1NXevfoeP8Ok=
github.com/Trendyol/go-dcp-client v0.0.62/go.mod h1:V/s7c30X1y1ZAlxnbQFeBJd1J0U8KMir3Jr9XAcpWho=
github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
Expand Down
1 change: 1 addition & 0 deletions kafka/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ type KafkaMessage struct {
Headers []kafka.Header
Key []byte
Value []byte
Topic *string
}

0 comments on commit 80df80c

Please sign in to comment.