-
Notifications
You must be signed in to change notification settings - Fork 0
/
stream.go
111 lines (100 loc) · 2.58 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package easykafka
import (
"context"
"github.com/segmentio/kafka-go"
"time"
)
// StreamRoutingRule inspects a decoded message (and its raw kafka.Message)
// and optionally rewrites it for re-production. Returning a nil newMessage
// means "do not produce anything for this rule"; returning a non-nil error
// aborts processing of the current message.
type StreamRoutingRule[T any] func(message *T, kafkaMessage *kafka.Message) (newMessage *kafka.Message, err error)
// Stream is a wrapper around Consumer and Producer: it consumes messages,
// runs them through routing rules, and re-produces the results.
type Stream[T any] struct {
	LoggerContainer // embedded; provides the logger used by log/error helpers

	brokers []string // Kafka broker addresses shared by consumer and producer
	groupId string   // consumer group id
	consumer *Consumer[T]
	producer *BaseProducer
	topics []string // NOTE(review): never assigned in this file — confirm intended use
	consumerOptions []ConsumerOption[T] // extra options forwarded to InitConsumer
	producerOptions []BaseProducerOption // extra options forwarded to InitBaseProducer
	parallelJobs uint // consumer concurrency (defaults to 5 in InitStream)
}
// StreamOption is a functional option that mutates a Stream during
// InitStream; a non-nil error causes InitStream to panic.
type StreamOption[T any] func(stream *Stream[T]) error
// InitStream initializes a new Stream instance wired to the given brokers,
// topic list, and consumer group, and returns it together with a close
// function that shuts down both the producer and the consumer.
//
// Option errors cause a panic: the signature has no error return, so the
// caller cannot otherwise observe a misconfigured option.
func InitStream[T any](
	brokers []string,
	topicsList []string,
	groupId string,
	opts ...StreamOption[T],
) (*Stream[T], func() error) {
	s := &Stream[T]{
		brokers:      brokers,
		groupId:      groupId,
		topics:       topicsList, // record the topics on the stream (was previously left unset)
		parallelJobs: 5,          // default consumer concurrency
	}
	for _, opt := range opts {
		if err := opt(s); err != nil {
			panic(err)
		}
	}

	consumer, closeConsumer := InitConsumer[T](
		brokers,
		topicsList,
		groupId,
		append(
			s.consumerOptions,
			ConsumerWithLogger[T](s.logger),
			ConsumerWithErrorLogger[T](s.logger),
			ConsumerConcurrency[T](s.parallelJobs),
		)...,
	)
	s.consumer = consumer

	producer, closeProducer := InitBaseProducer(
		brokers,
		append(
			s.producerOptions,
			BaseProducerWithLogger(s.logger),
			BaseProducerWithErrorLogger(s.logger),
			BaseProducerWithWriterConfig(&kafka.Writer{
				Balancer: &kafka.LeastBytes{},
				// Flush each message immediately: the stream forwards
				// messages one at a time, so batching adds only latency.
				BatchTimeout: time.Nanosecond,
			}),
		)...,
	)
	s.producer = producer

	return s, func() error {
		// Always close both sides. The original returned early when
		// closeProducer failed, leaking the consumer; here the consumer
		// is closed regardless and the first error wins.
		producerErr := closeProducer()
		consumerErr := closeConsumer()
		if producerErr != nil {
			return producerErr
		}
		return consumerErr
	}
}
// Run starts the stream: it consumes messages and applies each routing rule
// in order. A rule returning a non-nil kafka.Message causes that message to
// be produced; a nil result means the rule emits nothing for this message.
// The first rule or produce error aborts processing and is returned to the
// consumer.
func (s *Stream[T]) Run(ctx context.Context, routingRules ...StreamRoutingRule[T]) {
	s.log("starting stream with %d routing rules", len(routingRules))

	handle := func(message *T, kafkaMessage *kafka.Message) error {
		begin := time.Now()
		// Log message topic and partition
		s.log("received message from topic %s partition %d", kafkaMessage.Topic, kafkaMessage.Partition)

		for _, rule := range routingRules {
			routed, err := rule(message, kafkaMessage)
			if err != nil {
				return err
			}
			if routed == nil {
				continue
			}
			if err := s.producer.Produce(ctx, routed); err != nil {
				s.error("error producing message: %s", err.Error())
				return err
			}
		}

		s.log("message processed in %s", time.Since(begin))
		return nil
	}

	s.consumer.Consume(ctx, handle)
}