-
-
Notifications
You must be signed in to change notification settings - Fork 195
/
main.go
150 lines (125 loc) · 4.12 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package main
import (
	"context"
	"errors"
	"flag"
	"fmt"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/twmb/franz-go/pkg/kerr"
	"github.com/twmb/franz-go/pkg/kgo"
)
// Command-line flags. -produce-to and -eos-to have no default and must both
// be supplied (main checks this); every other flag has a usable default.
var (
	seedBrokers  = flag.String("brokers", "localhost:9092", "comma delimited list of seed brokers")
	produceTo    = flag.String("produce-to", "", "input topic to transactionally produce to")
	eosTo        = flag.String("eos-to", "", "consume from produce-to, modify, and write to eos-to")
	group        = flag.String("group", "eos-example-group", "group to use for EOS consuming")
	produceTxnID = flag.String("produce-txn-id", "eos-example-input-producer", "transactional ID to use for the input producer")
	consumeTxnID = flag.String("consume-txn-id", "eos-example-eos-consumer", "transactional ID to use for the EOS consumer/producer")
)
// die formats msg with args, writes the result to standard error, and exits
// the process with status 1. It never returns.
//
// A trailing newline is appended when the formatted message does not already
// end with one, so the final error line is always terminated.
func die(msg string, args ...any) {
	out := fmt.Sprintf(msg, args...)
	if !strings.HasSuffix(out, "\n") {
		out += "\n"
	}
	fmt.Fprint(os.Stderr, out)
	os.Exit(1)
}
// main parses the flags, insists that both topic flags were supplied, starts
// the input producer and the EOS consumer in their own goroutines, and then
// blocks forever. The process only ends when one of the goroutines calls die
// or it is killed externally.
func main() {
	flag.Parse()

	haveInput := *produceTo != ""
	haveOutput := *eosTo != ""
	if !(haveInput && haveOutput) {
		die("missing either -produce-to (%s) or -eos-to (%s)", *produceTo, *eosTo)
	}

	for _, run := range []func(){inputProducer, eosConsumer} {
		go run()
	}

	// Park the main goroutine; the workers never return.
	select {}
}
// inputProducer loops forever, producing ten records to the -produce-to topic
// inside a transaction and then sleeping for ten seconds. Iterations alternate
// between committing the transaction ("commit N" records) and deliberately
// aborting it ("abort N" records). Any client error that cannot be handled
// here terminates the process through die.
func inputProducer() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers(strings.Split(*seedBrokers, ",")...),
		kgo.DefaultProduceTopic(*produceTo),
		kgo.TransactionalID(*produceTxnID),
		kgo.WithLogger(kgo.BasicLogger(os.Stderr, kgo.LogLevelInfo, func() string {
			return "[input producer] "
		})),
	)
	if err != nil {
		die("unable to create input producer: %v", err)
	}

	ctx := context.Background()

	for doCommit := true; ; doCommit = !doCommit {
		if err := cl.BeginTransaction(); err != nil {
			// We are unable to start a transaction if the client
			// is not transactional or if we are already in a
			// transaction. A proper transactional loop will never
			// encounter either error.
			die("unable to start transaction: %v", err)
		}

		msg := "commit "
		if !doCommit {
			msg = "abort "
		}

		e := kgo.AbortingFirstErrPromise(cl)
		for i := 0; i < 10; i++ {
			cl.Produce(ctx, kgo.StringRecord(msg+strconv.Itoa(i)), e.Promise())
		}

		// Always evaluate e.Err(), even on abort iterations: folding it
		// into `doCommit && e.Err() == nil` would short circuit and skip
		// the call whenever doCommit is false.
		perr := e.Err()
		if perr != nil {
			// Surface the reason a commit iteration is being downgraded
			// to an abort instead of dropping it silently.
			fmt.Fprintf(os.Stderr, "[input producer] produce failed, aborting transaction: %v\n", perr)
		}

		tryCommit := doCommit && perr == nil
		endErr := cl.EndTransaction(ctx, kgo.TransactionEndTry(tryCommit))
		switch {
		case endErr == nil:
			// Transaction ended as requested.
		case errors.Is(endErr, kerr.OperationNotAttempted):
			// NOTE(review): presumably the client declined to attempt
			// the requested end; fall back to an explicit abort so the
			// next BeginTransaction can succeed.
			if err := cl.EndTransaction(ctx, kgo.TryAbort); err != nil {
				die("abort failed: %v", err)
			}
		case tryCommit:
			die("commit failed: %v", endErr)
		default:
			// We asked for an abort, so report it as such.
			die("abort failed: %v", endErr)
		}

		time.Sleep(10 * time.Second)
	}
}
// eosConsumer loops forever as a group transact session: it polls records from
// the -produce-to topic (read-committed isolation), re-produces each one to
// the -eos-to topic with an "eos " prefix inside a transaction, and ends the
// transaction through the session. Errors that the session reports when
// beginning or ending a transaction terminate the process through die.
func eosConsumer() {
	sess, err := kgo.NewGroupTransactSession(
		kgo.SeedBrokers(strings.Split(*seedBrokers, ",")...),
		kgo.DefaultProduceTopic(*eosTo),
		kgo.TransactionalID(*consumeTxnID),
		kgo.FetchIsolationLevel(kgo.ReadCommitted()),
		kgo.WithLogger(kgo.BasicLogger(os.Stderr, kgo.LogLevelInfo, func() string {
			return "[eos consumer] "
		})),
		kgo.ConsumerGroup(*group),
		kgo.ConsumeTopics(*produceTo),
		kgo.RequireStableFetchOffsets(),
	)
	if err != nil {
		die("unable to create eos consumer/producer: %v", err)
	}
	// Only reached if the loop below is ever changed to return; die exits
	// the process without running deferred calls.
	defer sess.Close()

	ctx := context.Background()

	for {
		fetches := sess.PollFetches(ctx)

		// The errors may be fatal for the partition (auth problems),
		// but we can still process any records if there are any, so
		// they are only reported here. Each error gets its own line.
		for _, fetchErr := range fetches.Errors() {
			fmt.Printf("error consuming from topic: topic=%s, partition=%d, err=%v\n",
				fetchErr.Topic, fetchErr.Partition, fetchErr.Err)
		}

		if err := sess.Begin(); err != nil {
			// Similar to the producer, we only encounter errors
			// here if we are not transactional or are already in a
			// transaction. We should not hit this error.
			die("unable to start transaction: %v", err)
		}

		e := kgo.AbortingFirstErrPromise(sess.Client())
		fetches.EachRecord(func(r *kgo.Record) {
			sess.Produce(ctx, kgo.StringRecord("eos "+string(r.Value)), e.Promise())
		})

		// Keep the produce error: when it is non-nil we ask End to
		// abort, and End's own error may then be nil.
		perr := e.Err()

		committed, endErr := sess.End(ctx, perr == nil)
		switch {
		case endErr != nil:
			die("unable to eos commit: %v", endErr)
		case perr != nil:
			// End aborted at our request; report the error that
			// caused the abort rather than a nil End error.
			die("unable to eos commit: produce failed: %v", perr)
		case committed:
			fmt.Println("eos commit successful!")
		default:
			// NOTE(review): not committed and no error. Presumably
			// the session aborted on its own (e.g. a group
			// rebalance during the transaction) — confirm against
			// the GroupTransactSession.End documentation. Nothing
			// failed, so poll again instead of exiting.
			fmt.Fprintln(os.Stderr, "eos transaction aborted without error; polling again")
		}
	}
}