diff --git a/connector.go b/connector.go index 0bfc69d..a3d81d2 100644 --- a/connector.go +++ b/connector.go @@ -96,7 +96,7 @@ func (c *connector) getTopicName(collectionName string, messageTopic string) str return topic } -func newConnector(cfg any, mapper Mapper) (Connector, error) { +func newConnector(cfg any, mapper Mapper, sinkResponseHandler kafka.SinkResponseHandler) (Connector, error) { c, err := newConfig(cfg) if err != nil { return nil, err @@ -128,7 +128,8 @@ func newConnector(cfg any, mapper Mapper) (Connector, error) { connector.dcp = dcpClient - connector.producer, err = producer.NewProducer(kafkaClient, c, dcpClient.Commit) + connector.producer, err = producer.NewProducer(kafkaClient, c, dcpClient.Commit, sinkResponseHandler) + if err != nil { logger.Log.Error("kafka error: %v", err) return nil, err @@ -200,14 +201,16 @@ func newConnectorConfigFromPath(path string) (*config.Connector, error) { } type ConnectorBuilder struct { - mapper Mapper - config any + mapper Mapper + config any + sinkResponseHandler kafka.SinkResponseHandler } func NewConnectorBuilder(config any) *ConnectorBuilder { return &ConnectorBuilder{ - config: config, - mapper: DefaultMapper, + config: config, + mapper: DefaultMapper, + sinkResponseHandler: nil, } } @@ -216,8 +219,13 @@ func (c *ConnectorBuilder) SetMapper(mapper Mapper) *ConnectorBuilder { return c } +func (c *ConnectorBuilder) SetSinkResponseHandler(sinkResponseHandler kafka.SinkResponseHandler) *ConnectorBuilder { + c.sinkResponseHandler = sinkResponseHandler + return c +} + func (c *ConnectorBuilder) Build() (Connector, error) { - return newConnector(c.config, c.mapper) + return newConnector(c.config, c.mapper, c.sinkResponseHandler) } func (c *ConnectorBuilder) SetLogger(l *logrus.Logger) *ConnectorBuilder { diff --git a/example/README.md b/example/README.md index 1224731..9d803a4 100644 --- a/example/README.md +++ b/example/README.md @@ -39,7 +39,25 @@ func mapper(event couchbase.Event) []message.KafkaMessage { } 
``` -## Step 3: Configuring the Connector +## Step 3: Implementing the SinkResponseHandler + +The handler's methods are called after the event is published and take a `*kafka.SinkResponseHandlerContext` parameter, which carries the produced `message.KafkaMessage` and any error. +Here's an example SinkResponseHandler implementation: + +```go +type sinkResponseHandler struct { +} + +func (s *sinkResponseHandler) OnSuccess(ctx *kafka.SinkResponseHandlerContext) { +fmt.Printf("OnSuccess %v\n", string(ctx.Message.Value)) +} + +func (s *sinkResponseHandler) OnError(ctx *kafka.SinkResponseHandlerContext) { +fmt.Printf("OnError %v\n", string(ctx.Message.Value)) +} +``` + +## Step 4: Configuring the Connector The configuration for the connector is provided via a YAML file. Here's an example [configuration](https://github.com/Trendyol/go-dcp-kafka/blob/master/example/config.yml): @@ -47,13 +65,17 @@ You can find explanation of [configurations](https://github.com/Trendyol/go-dcp# You can pass this configuration file to the connector by providing the path to the file when creating the connector: ```go -connector, err := dcpkafka.NewConnector("path-to-config.yml", mapper) +connector, err := dcpkafka.NewConnectorBuilder("config.yml"). + SetMapper(mapper). + SetSinkResponseHandler(&sinkResponseHandler{}). 
// if you want to add callback func + Build() + if err != nil { -panic(err) + panic(err) } ``` -## Step 4: Starting and Closing the Connector +## Step 5: Starting and Closing the Connector Once you have implemented the mapper and configured the connector, you can start/stop the connector: diff --git a/example/simple/main.go b/example/simple/main.go index 849a020..dcbeac9 100644 --- a/example/simple/main.go +++ b/example/simple/main.go @@ -1,8 +1,10 @@ package main import ( - "github.com/Trendyol/go-dcp-kafka" + "fmt" + dcpkafka "github.com/Trendyol/go-dcp-kafka" "github.com/Trendyol/go-dcp-kafka/couchbase" + "github.com/Trendyol/go-dcp-kafka/kafka" "github.com/Trendyol/go-dcp-kafka/kafka/message" ) @@ -17,8 +19,22 @@ func mapper(event couchbase.Event) []message.KafkaMessage { } } +type sinkResponseHandler struct { +} + +func (s *sinkResponseHandler) OnSuccess(ctx *kafka.SinkResponseHandlerContext) { + fmt.Printf("OnSuccess %v\n", string(ctx.Message.Value)) +} + +func (s *sinkResponseHandler) OnError(ctx *kafka.SinkResponseHandlerContext) { + fmt.Printf("OnError %v\n", string(ctx.Message.Value)) +} + func main() { - c, err := dcpkafka.NewConnectorBuilder("config.yml").SetMapper(mapper).Build() + c, err := dcpkafka.NewConnectorBuilder("config.yml"). + SetMapper(mapper). + SetSinkResponseHandler(&sinkResponseHandler{}). 
+ Build() if err != nil { panic(err) } diff --git a/kafka/producer/producer.go b/kafka/producer/producer.go index 71e2f57..69aac35 100644 --- a/kafka/producer/producer.go +++ b/kafka/producer/producer.go @@ -23,6 +23,7 @@ type Producer struct { func NewProducer(kafkaClient gKafka.Client, config *config.Connector, dcpCheckpointCommit func(), + sinkResponseHandler gKafka.SinkResponseHandler, ) (Producer, error) { writer := kafkaClient.Producer() @@ -33,6 +34,7 @@ func NewProducer(kafkaClient gKafka.Client, config.Kafka.ProducerBatchSize, int64(helpers.ResolveUnionIntOrStringValue(config.Kafka.ProducerBatchBytes)), dcpCheckpointCommit, + sinkResponseHandler, ), }, nil } diff --git a/kafka/producer/producer_batch.go b/kafka/producer/producer_batch.go index 5c8773c..4b0ce7f 100644 --- a/kafka/producer/producer_batch.go +++ b/kafka/producer/producer_batch.go @@ -10,6 +10,8 @@ import ( "syscall" "time" + gKafka "github.com/Trendyol/go-dcp-kafka/kafka" + "github.com/Trendyol/go-dcp-kafka/kafka/message" "github.com/Trendyol/go-dcp/logger" "github.com/Trendyol/go-dcp/models" @@ -17,6 +19,7 @@ import ( ) type Batch struct { + sinkResponseHandler gKafka.SinkResponseHandler batchTicker *time.Ticker Writer *kafka.Writer dcpCheckpointCommit func() @@ -36,6 +39,7 @@ func newBatch( batchLimit int, batchBytes int64, dcpCheckpointCommit func(), + sinkResponseHandler gKafka.SinkResponseHandler, ) *Batch { batch := &Batch{ batchTickerDuration: batchTime, @@ -46,6 +50,7 @@ func newBatch( batchLimit: batchLimit, dcpCheckpointCommit: dcpCheckpointCommit, batchBytes: batchBytes, + sinkResponseHandler: sinkResponseHandler, } return batch } @@ -108,15 +113,31 @@ func (b *Batch) FlushMessages() { if len(b.messages) > 0 { startedTime := time.Now() err := b.Writer.WriteMessages(context.Background(), b.messages...) 
- if err != nil { + + if err != nil && b.sinkResponseHandler == nil { if isFatalError(err) { panic(fmt.Errorf("permanent error on Kafka side %v", err)) } logger.Log.Error("batch producer flush error %v", err) return } + b.metric.BatchProduceLatency = time.Since(startedTime).Milliseconds() + if b.sinkResponseHandler != nil { + switch e := err.(type) { + case nil: + b.handleResponseSuccess() + case kafka.WriteErrors: + b.handleWriteError(e) + case kafka.MessageTooLargeError: + b.handleMessageTooLargeError(e) + return + default: + logger.Log.Error("batch producer flush error %v", err) + return + } + } b.messages = b.messages[:0] b.currentMessageBytes = 0 b.batchTicker.Reset(b.batchTickerDuration) @@ -136,3 +157,44 @@ func isFatalError(err error) bool { } return true } + +func (b *Batch) handleWriteError(writeErrors kafka.WriteErrors) { + for i := range writeErrors { + if writeErrors[i] != nil { + b.sinkResponseHandler.OnError(&gKafka.SinkResponseHandlerContext{ + Message: convertKafkaMessage(b.messages[i]), + Err: writeErrors[i], + }) + } else { + b.sinkResponseHandler.OnSuccess(&gKafka.SinkResponseHandlerContext{ + Message: convertKafkaMessage(b.messages[i]), + Err: nil, + }) + } + } +} + +func (b *Batch) handleResponseSuccess() { + for _, msg := range b.messages { + b.sinkResponseHandler.OnSuccess(&gKafka.SinkResponseHandlerContext{ + Message: convertKafkaMessage(msg), + Err: nil, + }) + } +} + +func (b *Batch) handleMessageTooLargeError(mTooLargeError kafka.MessageTooLargeError) { + b.sinkResponseHandler.OnError(&gKafka.SinkResponseHandlerContext{ + Message: convertKafkaMessage(mTooLargeError.Message), + Err: mTooLargeError, + }) +} + +func convertKafkaMessage(src kafka.Message) *message.KafkaMessage { + return &message.KafkaMessage{ + Topic: src.Topic, + Headers: src.Headers, + Key: src.Key, + Value: src.Value, + } +} diff --git a/kafka/sink_response_handler.go b/kafka/sink_response_handler.go new file mode 100644 index 0000000..401840a --- /dev/null +++ 
b/kafka/sink_response_handler.go @@ -0,0 +1,15 @@ +package kafka + +import ( + "github.com/Trendyol/go-dcp-kafka/kafka/message" +) + +type SinkResponseHandlerContext struct { + Message *message.KafkaMessage + Err error +} + +type SinkResponseHandler interface { + OnSuccess(ctx *SinkResponseHandlerContext) + OnError(ctx *SinkResponseHandlerContext) +}