Skip to content

Commit

Permalink
feat: add producer interceptor interface (#145)
Browse files Browse the repository at this point in the history
* feat: add producer interceptor interface

* fix: redpanda docker image

* fix: producer tests

* fix: linter

* feat: add multi interceptor support

* fix: refactoring code

* docs: add producer interceptor example
  • Loading branch information
ademekici authored Sep 27, 2024
1 parent a40194d commit 81762bd
Show file tree
Hide file tree
Showing 7 changed files with 271 additions and 37 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ After running `docker-compose up` command, you can run any application you want.

</details>

#### With Producer Interceptor

Please refer to [Producer Interceptor Example](examples/with-kafka-producer-interceptor)

#### With Distributed Tracing Support

Please refer to [Tracing Example](examples/with-tracing/README.md)
Expand Down
38 changes: 38 additions & 0 deletions examples/with-kafka-producer-interceptor/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package main

import (
"context"
"fmt"
"github.com/Trendyol/kafka-konsumer/v2"
)

func main() {
producer, _ := kafka.NewProducer(&kafka.ProducerConfig{
Writer: kafka.WriterConfig{
Brokers: []string{"localhost:29092"},
},
}, newProducerInterceptor()...)

const topicName = "standart-topic"

_ = producer.Produce(context.Background(), kafka.Message{
Topic: topicName,
Key: []byte("1"),
Value: []byte(`{ "foo": "bar" }`),
})

_ = producer.ProduceBatch(context.Background(), []kafka.Message{
{
Topic: topicName,
Key: []byte("1"),
Value: []byte(`{ "foo": "bar" }`),
},
{
Topic: topicName,
Key: []byte("2"),
Value: []byte(`{ "foo2": "bar2" }`),
},
})

fmt.Println("Messages sended...!")
}
16 changes: 16 additions & 0 deletions examples/with-kafka-producer-interceptor/producer-interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package main

import "github.com/Trendyol/kafka-konsumer/v2"

type producerInterceptor struct{}

func (i *producerInterceptor) OnProduce(ctx kafka.ProducerInterceptorContext) {
ctx.Message.Headers = append(ctx.Message.Headers, kafka.Header{
Key: "x-source-app",
Value: []byte("kafka-konsumer"),
})
}

func newProducerInterceptor() []kafka.ProducerInterceptor {
return []kafka.ProducerInterceptor{&producerInterceptor{}}
}
34 changes: 25 additions & 9 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ type Writer interface {
}

type producer struct {
w Writer
w Writer
interceptors []ProducerInterceptor
}

func NewProducer(cfg *ProducerConfig) (Producer, error) {
func NewProducer(cfg *ProducerConfig, interceptors ...ProducerInterceptor) (Producer, error) {
kafkaWriter := &kafka.Writer{
Addr: kafka.TCP(cfg.Writer.Brokers...),
Topic: cfg.Writer.Topic,
Expand Down Expand Up @@ -51,7 +52,7 @@ func NewProducer(cfg *ProducerConfig) (Producer, error) {
kafkaWriter.Transport = transport
}

p := &producer{w: kafkaWriter}
p := &producer{w: kafkaWriter, interceptors: interceptors}

if cfg.DistributedTracingEnabled {
otelWriter, err := NewOtelProducer(cfg, kafkaWriter)
Expand All @@ -64,18 +65,33 @@ func NewProducer(cfg *ProducerConfig) (Producer, error) {
return p, nil
}

func (c *producer) Produce(ctx context.Context, message Message) error {
return c.w.WriteMessages(ctx, message.toKafkaMessage())
func (p *producer) Produce(ctx context.Context, message Message) error {
if len(p.interceptors) > 0 {
p.executeInterceptors(ctx, &message)
}

return p.w.WriteMessages(ctx, message.toKafkaMessage())
}

func (c *producer) ProduceBatch(ctx context.Context, messages []Message) error {
func (p *producer) ProduceBatch(ctx context.Context, messages []Message) error {
kafkaMessages := make([]kafka.Message, 0, len(messages))
for i := range messages {
if len(p.interceptors) > 0 {
p.executeInterceptors(ctx, &messages[i])
}

kafkaMessages = append(kafkaMessages, messages[i].toKafkaMessage())
}
return c.w.WriteMessages(ctx, kafkaMessages...)

return p.w.WriteMessages(ctx, kafkaMessages...)
}

func (p *producer) executeInterceptors(ctx context.Context, message *Message) {
for _, interceptor := range p.interceptors {
interceptor.OnProduce(ProducerInterceptorContext{Context: ctx, Message: message})
}
}

func (c *producer) Close() error {
return c.w.Close()
func (p *producer) Close() error {
return p.w.Close()
}
14 changes: 14 additions & 0 deletions producer_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package kafka

import (
"context"
)

type ProducerInterceptorContext struct {
Context context.Context
Message *Message
}

type ProducerInterceptor interface {
OnProduce(ctx ProducerInterceptorContext)
}
51 changes: 50 additions & 1 deletion producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"testing"

"github.com/gofiber/fiber/v2/utils"

"github.com/segmentio/kafka-go"
)

Expand All @@ -20,6 +22,26 @@ func Test_producer_Produce_Successfully(t *testing.T) {
}
}

func Test_producer_Produce_interceptor_Successfully(t *testing.T) {
// Given
mw := &mockWriter{}
msg := Message{Headers: make([]Header, 0)}
msg.Headers = append(msg.Headers, kafka.Header{
Key: "x-correlation-id",
Value: []byte(utils.UUIDv4()),
})
interceptor := newMockProducerInterceptor()

p := producer{w: mw, interceptors: interceptor}

// When
err := p.Produce(context.Background(), msg)
// Then
if err != nil {
t.Fatalf("Producing err %s", err.Error())
}
}

func Test_producer_ProduceBatch_Successfully(t *testing.T) {
// Given
mw := &mockWriter{}
Expand All @@ -33,6 +55,20 @@ func Test_producer_ProduceBatch_Successfully(t *testing.T) {
}
}

func Test_producer_ProduceBatch_interceptor_Successfully(t *testing.T) {
// Given
mw := &mockWriter{}
interceptor := newMockProducerInterceptor()
p := producer{w: mw, interceptors: interceptor}

// When
err := p.ProduceBatch(context.Background(), []Message{{}, {}, {}})
// Then
if err != nil {
t.Fatalf("Batch Producing err %s", err.Error())
}
}

func Test_producer_Close_Successfully(t *testing.T) {
// Given
mw := &mockWriter{}
Expand All @@ -48,10 +84,23 @@ func Test_producer_Close_Successfully(t *testing.T) {

type mockWriter struct{}

func (m *mockWriter) WriteMessages(_ context.Context, _ ...kafka.Message) error {
func (m *mockWriter) WriteMessages(_ context.Context, msg ...kafka.Message) error {
return nil
}

func (m *mockWriter) Close() error {
return nil
}

type mockProducerInterceptor struct{}

func (i *mockProducerInterceptor) OnProduce(ctx ProducerInterceptorContext) {
ctx.Message.Headers = append(ctx.Message.Headers, kafka.Header{
Key: "test",
Value: []byte("test"),
})
}

func newMockProducerInterceptor() []ProducerInterceptor {
return []ProducerInterceptor{&mockProducerInterceptor{}}
}
Loading

0 comments on commit 81762bd

Please sign in to comment.