Skip to content

Commit

Permalink
feat: rename as go-dcp-kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
erayarslan committed Jul 12, 2023
1 parent 7dcdf18 commit 7cabe74
Show file tree
Hide file tree
Showing 20 changed files with 205 additions and 298 deletions.
4 changes: 2 additions & 2 deletions .goreleaser.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
project_name: go-kafka-connect-couchbase
project_name: go-dcp-kafka

release:
github:
name: go-kafka-connect-couchbase
name: go-dcp-kafka
owner: Trendyol

before:
Expand Down
4 changes: 2 additions & 2 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
## Contribution Guidelines

Thank you for your interest in go-kafka-connect-couchbase!
Thank you for your interest in go-dcp-kafka!

This project welcomes contributions and suggestions. Most contributions require you to signoff on your commits. Please
follow the instructions provided below.
Expand Down Expand Up @@ -36,7 +36,7 @@ If you find your issue already exists, make relevant comments and add your react
2. For bugs
Check it's not an environment issue. For example, if your configurations correct or network connections is alive.

### Contributing to go-kafka-connect-couchbase
### Contributing to go-dcp-kafka

Pull Requests
All contributions come through pull requests. To submit a proposed change, we recommend following this workflow:
Expand Down
101 changes: 50 additions & 51 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# Go Kafka Connect Couchbase
# Go Dcp Kafka

[![Go Reference](https://pkg.go.dev/badge/github.com/Trendyol/go-kafka-connect-couchbase.svg)](https://pkg.go.dev/github.com/Trendyol/go-kafka-connect-couchbase) [![Go Report Card](https://goreportcard.com/badge/github.com/Trendyol/go-kafka-connect-couchbase)](https://goreportcard.com/report/github.com/Trendyol/go-kafka-connect-couchbase)
[![Go Reference](https://pkg.go.dev/badge/github.com/Trendyol/go-dcp-kafka.svg)](https://pkg.go.dev/github.com/Trendyol/go-dcp-kafka) [![Go Report Card](https://goreportcard.com/badge/github.com/Trendyol/go-dcp-kafka)](https://goreportcard.com/report/github.com/Trendyol/go-dcp-kafka)

Go implementation of the [Kafka Connect Couchbase](https://github.com/couchbase/kafka-connect-couchbase).

**Go Kafka Connect Couchbase** streams documents from Couchbase Database Change Protocol (DCP) and publishes Kafka
**Go Dcp Kafka** streams documents from Couchbase Database Change Protocol (DCP) and publishes Kafka
events in near real-time.

## Features
Expand All @@ -18,7 +18,7 @@ events in near real-time.
* Metadata can be saved to **Couchbase or Kafka**.
* **Managing batch configurations** such as maximum batch size, batch bytes, batch ticker durations.
* **Scale up and down** by custom membership algorithms(Couchbase, KubernetesHa, Kubernetes StatefulSet or
Static, see [examples](https://github.com/Trendyol/go-dcp-client#examples)).
Static, see [examples](https://github.com/Trendyol/go-dcp#examples)).
* **Easily manageable configurations**.

## Benchmarks
Expand All @@ -29,7 +29,7 @@ used for both connectors.

| Package | Time to Process Events | Average CPU Usage(Core) | Average Memory Usage |
|:-------------------------------------|:----------------------:|:-----------------------:|:--------------------:|
| **Go Kafka Connect Couchbase**(1.19) | **12s** | **0.383** | **428MB**
| **Go Dcp Kafka**(1.19) | **12s** | **0.383** | **428MB**
| Java Kafka Connect Couchbase(JDK11) | 19s | 1.5 | 932MB

## Example
Expand All @@ -38,53 +38,52 @@ used for both connectors.

```go
func mapper(event couchbase.Event) []message.KafkaMessage {
// return nil if you wish to discard the event
return []message.KafkaMessage{
{
Headers: nil,
Key: event.Key,
Value: event.Value,
},
}
// return nil if you wish to discard the event
return []message.KafkaMessage{
{
Headers: nil,
Key: event.Key,
Value: event.Value,
},
}
}

func main() {
c, err := dcpkafka.NewConnector(&config.Connector{
Dcp: dcpClientConfig.Dcp{
Hosts: []string{"localhost:8091"},
Username: "user",
Password: "password",
BucketName: "dcp-test",
Dcp: dcpClientConfig.ExternalDcp{
Group: dcpClientConfig.DCPGroup{
Name: "groupName",
Membership: dcpClientConfig.DCPGroupMembership{
RebalanceDelay: 3 * time.Second,
},
},
},
Metadata: dcpClientConfig.Metadata{
Config: map[string]string{
"bucket": "checkpoint-bucket-name",
"scope": "_default",
"collection": "_default",
},
Type: "couchbase",
},
Debug: true},
Kafka: config.Kafka{
CollectionTopicMapping: map[string]string{"_default": "topic"},
Brokers: []string{"localhost:9092"},
},
}, mapper)
if err != nil {
panic(err)
}

defer c.Close()
c.Start()
c, err := dcpkafka.NewConnector(&config.Connector{
Dcp: dcpConfig.Dcp{
Hosts: []string{"localhost:8091"},
Username: "user",
Password: "password",
BucketName: "dcp-test",
Dcp: dcpConfig.ExternalDcp{
Group: dcpConfig.DCPGroup{
Name: "groupName",
Membership: dcpConfig.DCPGroupMembership{
RebalanceDelay: 3 * time.Second,
},
},
},
Metadata: dcpConfig.Metadata{
Config: map[string]string{
"bucket": "checkpoint-bucket-name",
"scope": "_default",
"collection": "_default",
},
Type: "couchbase",
},
Debug: true},
Kafka: config.Kafka{
CollectionTopicMapping: map[string]string{"_default": "topic"},
Brokers: []string{"localhost:9092"},
},
}, mapper)
if err != nil {
panic(err)
}

defer c.Close()
c.Start()
}

```

[File Config](example/simple/main.go)
Expand All @@ -93,7 +92,7 @@ func main() {

### Dcp Configuration

Check out on [go-dcp-client](https://github.com/Trendyol/go-dcp-client#configuration)
Check out on [go-dcp](https://github.com/Trendyol/go-dcp#configuration)

### Kafka Specific Configuration

Expand Down Expand Up @@ -129,11 +128,11 @@ Check out on [go-dcp-client](https://github.com/Trendyol/go-dcp-client#configura
| kafka_connector_latency_ms | Time to adding to the batch. | N/A | Gauge |
| kafka_connector_batch_produce_latency_ms | Time to produce messages in the batch. | N/A | Gauge |

For DCP related metrics see [also](https://github.com/Trendyol/go-dcp-client#exposed-metrics).
For DCP related metrics see [also](https://github.com/Trendyol/go-dcp#exposed-metrics).

## Contributing

Go Kafka Connect Couchbase is always open for direct contributions. For more information please check
Go Dcp Kafka is always open for direct contributions. For more information please check
our [Contribution Guideline document](./CONTRIBUTING.md).

## License
Expand Down
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package config
import (
"time"

"github.com/Trendyol/go-dcp-client/config"
"github.com/Trendyol/go-dcp/config"
)

type Kafka struct {
Expand Down
53 changes: 27 additions & 26 deletions connector.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
package gokafkaconnectcouchbase
package dcpkafka

import (
"errors"
"fmt"
"os"

"github.com/Trendyol/go-dcp-client"
dcpClientConfig "github.com/Trendyol/go-dcp-client/config"
"github.com/Trendyol/go-dcp-client/logger"
"github.com/Trendyol/go-dcp-client/models"
"github.com/Trendyol/go-kafka-connect-couchbase/config"
"github.com/Trendyol/go-kafka-connect-couchbase/couchbase"
"github.com/Trendyol/go-kafka-connect-couchbase/kafka"
"github.com/Trendyol/go-kafka-connect-couchbase/kafka/metadata"
"github.com/Trendyol/go-kafka-connect-couchbase/kafka/producer"
"github.com/Trendyol/go-kafka-connect-couchbase/metric"
"github.com/Trendyol/go-dcp"

"github.com/Trendyol/go-dcp-kafka/config"
"github.com/Trendyol/go-dcp-kafka/couchbase"
"github.com/Trendyol/go-dcp-kafka/kafka"
"github.com/Trendyol/go-dcp-kafka/kafka/metadata"
"github.com/Trendyol/go-dcp-kafka/kafka/producer"
"github.com/Trendyol/go-dcp-kafka/metric"
dcpConfig "github.com/Trendyol/go-dcp/config"
"github.com/Trendyol/go-dcp/logger"
"github.com/Trendyol/go-dcp/models"
sKafka "github.com/segmentio/kafka-go"
"gopkg.in/yaml.v3"
)
Expand All @@ -27,7 +28,7 @@ type Connector interface {
}

type connector struct {
dcp godcpclient.Dcp
dcp dcp.Dcp
mapper Mapper
producer producer.Producer
config *config.Connector
Expand Down Expand Up @@ -114,28 +115,28 @@ func NewConnector(cfg any, mapper Mapper) (Connector, error) {
return nil, err
}

dcp, err := createDcp(cfg, connector.produce, connector.logger, connector.errorLogger)
dcpClient, err := createDcp(cfg, connector.produce, connector.logger, connector.errorLogger)
if err != nil {
connector.errorLogger.Printf("dcp error: %v", err)
return nil, err
}

dcpConfig := dcp.GetConfig()
dcpConfig.Checkpoint.Type = "manual"
conf := dcpClient.GetConfig()
conf.Checkpoint.Type = "manual"

if dcpConfig.Metadata.Type == MetadataTypeKafka {
setKafkaMetadata(kafkaClient, dcpConfig, connector, dcp)
if conf.Metadata.Type == MetadataTypeKafka {
setKafkaMetadata(kafkaClient, conf, connector, dcpClient)
}

connector.dcp = dcp
connector.dcp = dcpClient

connector.producer, err = producer.NewProducer(kafkaClient, c, connector.logger, connector.errorLogger, dcp.Commit)
connector.producer, err = producer.NewProducer(kafkaClient, c, connector.logger, connector.errorLogger, dcpClient.Commit)
if err != nil {
connector.errorLogger.Printf("kafka error: %v", err)
return nil, err
}

initializeMetricCollector(connector, dcp)
initializeMetricCollector(connector, dcpClient)

return connector, nil
}
Expand Down Expand Up @@ -176,23 +177,23 @@ func createKafkaClient(cc *config.Connector, connector *connector) (kafka.Client
return kafkaClient, nil
}

func createDcp(cfg any, listener models.Listener, logger logger.Logger, errorLogger logger.Logger) (godcpclient.Dcp, error) {
func createDcp(cfg any, listener models.Listener, logger logger.Logger, errorLogger logger.Logger) (dcp.Dcp, error) {
switch v := cfg.(type) {
case dcpClientConfig.Dcp:
return godcpclient.NewDcpWithLoggers(v.Dcp, listener, logger, errorLogger)
case dcpConfig.Dcp:
return dcp.NewDcpWithLoggers(v.Dcp, listener, logger, errorLogger)
case string:
return godcpclient.NewDcpWithLoggers(v, listener, logger, errorLogger)
return dcp.NewDcpWithLoggers(v, listener, logger, errorLogger)
default:
return nil, errors.New("invalid config")
}
}

func setKafkaMetadata(kafkaClient kafka.Client, dcpConfig *dcpClientConfig.Dcp, connector *connector, dcp godcpclient.Dcp) {
func setKafkaMetadata(kafkaClient kafka.Client, dcpConfig *dcpConfig.Dcp, connector *connector, dcp dcp.Dcp) {
kafkaMetadata := metadata.NewKafkaMetadata(kafkaClient, dcpConfig.Metadata.Config, connector.logger, connector.errorLogger)
dcp.SetMetadata(kafkaMetadata)
}

func initializeMetricCollector(connector *connector, dcp godcpclient.Dcp) {
func initializeMetricCollector(connector *connector, dcp dcp.Dcp) {
metricCollector := metric.NewMetricCollector(connector.producer)
dcp.SetMetricCollectors(metricCollector)
}
Expand Down
18 changes: 9 additions & 9 deletions example/README.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
# How to Create a New Connector

If you want to create a new connector that streams mutations from Couchbase to Kafka, you can use the `go-kafka-connect-couchbase` library.
If you want to create a new connector that streams mutations from Couchbase to Kafka, you can use the `go-dcp-kafka` library.
This library provides a simple and flexible way to implement connectors that can be customized to your needs.

## Step 1: Installing the Library

To use the `go-kafka-connect-couchbase` library, you first need to install it. You can do this using the `go get` command:
To use the `go-dcp-kafka` library, you first need to install it. You can do this using the `go get` command:

```
$ go get github.com/Trendyol/go-kafka-connect-couchbase
$ go get github.com/Trendyol/go-dcp-kafka
```

Expand All @@ -23,8 +23,8 @@ Here's an example mapper implementation:
package main

import (
"github.com/Trendyol/go-kafka-connect-couchbase/couchbase"
"github.com/Trendyol/go-kafka-connect-couchbase/kafka/message"
"github.com/Trendyol/go-dcp-kafka/couchbase"
"github.com/Trendyol/go-dcp-kafka/kafka/message"
)

func mapper(event couchbase.Event) []message.KafkaMessage {
Expand All @@ -41,13 +41,13 @@ func mapper(event couchbase.Event) []message.KafkaMessage {

## Step 3: Configuring the Connector

The configuration for the connector is provided via a YAML file. Here's an example [configuration](https://github.com/Trendyol/go-kafka-connect-couchbase/blob/master/example/config.yml):
The configuration for the connector is provided via a YAML file. Here's an example [configuration](https://github.com/Trendyol/go-dcp-kafka/blob/master/example/config.yml):

You can find explanation of [configurations](https://github.com/Trendyol/go-dcp-client#configuration)
You can find explanation of [configurations](https://github.com/Trendyol/go-dcp#configuration)

You can pass this configuration file to the connector by providing the path to the file when creating the connector:
```go
connector, err := gokafkaconnectcouchbase.NewConnector("path-to-config.yml", mapper)
connector, err := dcpkafka.NewConnector("path-to-config.yml", mapper)
if err != nil {
panic(err)
}
Expand All @@ -68,4 +68,4 @@ This will start the connector, which will continuously listen for Couchbase muta

The connector features an API that allows for management and observation, and it also exposes multiple metrics to facilitate these tasks.

You can find explanation [here](https://github.com/Trendyol/go-dcp-client#monitoring)
You can find explanation [here](https://github.com/Trendyol/go-dcp#monitoring)
Loading

0 comments on commit 7cabe74

Please sign in to comment.