diff --git a/README.md b/README.md index 560a2f5..c44013b 100644 --- a/README.md +++ b/README.md @@ -38,51 +38,51 @@ used for both connectors. ```go func mapper(event couchbase.Event) []message.KafkaMessage { -// return nil if you wish to discard the event -return []message.KafkaMessage{ -{ -Headers: nil, -Key: event.Key, -Value: event.Value, -}, -} + // return nil if you wish to discard the event + return []message.KafkaMessage{ + { + Headers: nil, + Key: event.Key, + Value: event.Value, + }, + } } func main() { -c, err := dcpkafka.NewConnector(&config.Connector{ -Dcp: dcpConfig.Dcp{ -Hosts: []string{"localhost:8091"}, -Username: "user", -Password: "password", -BucketName: "dcp-test", -Dcp: dcpConfig.ExternalDcp{ -Group: dcpConfig.DCPGroup{ -Name: "groupName", -Membership: dcpConfig.DCPGroupMembership{ -RebalanceDelay: 3 * time.Second, -}, -}, -}, -Metadata: dcpConfig.Metadata{ -Config: map[string]string{ -"bucket": "checkpoint-bucket-name", -"scope": "_default", -"collection": "_default", -}, -Type: "couchbase", -}, -Debug: true}, -Kafka: config.Kafka{ -CollectionTopicMapping: map[string]string{"_default": "topic"}, -Brokers: []string{"localhost:9092"}, -}, -}, mapper) -if err != nil { -panic(err) -} - -defer c.Close() -c.Start() + c, err := dcpkafka.NewConnector(&config.Connector{ + Dcp: dcpConfig.Dcp{ + Hosts: []string{"localhost:8091"}, + Username: "user", + Password: "password", + BucketName: "dcp-test", + Dcp: dcpConfig.ExternalDcp{ + Group: dcpConfig.DCPGroup{ + Name: "groupName", + Membership: dcpConfig.DCPGroupMembership{ + RebalanceDelay: 3 * time.Second, + }, + }, + }, + Metadata: dcpConfig.Metadata{ + Config: map[string]string{ + "bucket": "checkpoint-bucket-name", + "scope": "_default", + "collection": "_default", + }, + Type: "couchbase", + }, + Debug: true}, + Kafka: config.Kafka{ + CollectionTopicMapping: map[string]string{"_default": "topic"}, + Brokers: []string{"localhost:9092"}, + }, + }, mapper) + if err != nil { + panic(err) + } + + defer c.Close() + c.Start() } ``` @@ -120,8 +120,6 @@ Check out on [go-dcp](https://github.com/Trendyol/go-dcp#configuration) | `kafka.metadataTopics` | []string | no | | Topic names for the metadata cached by segmentio, define topics here that the connector may produce. In large Kafka clusters, this will reduce memory usage. For more detail please check [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport.MetadataTopics). | | `kafka.clientID` | string | no | | Unique identifier that the transport communicates to the brokers when it sends requests. For more detail please check [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport.ClientID). | | `kafka.allowAutoTopicCreation` | bool | no | false | Create topic if missing. For more detail please check [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Writer.AllowAutoTopicCreation). | -| `kafka.rejectionLog.Topic` | string | no | | Rejection topic name. | -| `kafka.rejectionLog.IncludeValue` | boolean | no | false | Includes rejection log source info. `false` is default. | ### Kafka Metadata Configuration(Use it if you want to store the checkpoint data in Kafka) @@ -139,7 +137,7 @@ Check out on [go-dcp](https://github.com/Trendyol/go-dcp#configuration) | cbgo_kafka_connector_batch_produce_latency_ms_current | Time to produce messages in the batch. | N/A | Gauge | You can also use all DCP-related metrics explained [here](https://github.com/Trendyol/go-dcp#exposed-metrics). -All DCP-related metrics are automatically injected. It means you don't need to do anything. +All DCP-related metrics are automatically injected. It means you don't need to do anything. ## Breaking Changes