-
Notifications
You must be signed in to change notification settings - Fork 792
/
addpartitionstotxn_test.go
126 lines (112 loc) · 2.69 KB
/
addpartitionstotxn_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package kafka
import (
"context"
"net"
"strconv"
"testing"
"time"
ktesting "github.com/segmentio/kafka-go/testing"
)
func TestClientAddPartitionsToTxn(t *testing.T) {
if !ktesting.KafkaIsAtLeast("0.11.0") {
t.Skip("Skipping test because kafka version is not high enough.")
}
topic1 := makeTopic()
topic2 := makeTopic()
client, shutdown := newLocalClient()
defer shutdown()
err := clientCreateTopic(client, topic1, 3)
if err != nil {
t.Fatal(err)
}
err = clientCreateTopic(client, topic2, 3)
if err != nil {
t.Fatal(err)
}
transactionalID := makeTransactionalID()
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
respc, err := waitForCoordinatorIndefinitely(ctx, client, &FindCoordinatorRequest{
Addr: client.Addr,
Key: transactionalID,
KeyType: CoordinatorKeyTypeTransaction,
})
if err != nil {
t.Fatal(err)
}
transactionCoordinator := TCP(net.JoinHostPort(respc.Coordinator.Host, strconv.Itoa(int(respc.Coordinator.Port))))
client, shutdown = newClient(transactionCoordinator)
defer shutdown()
ipResp, err := client.InitProducerID(ctx, &InitProducerIDRequest{
TransactionalID: transactionalID,
TransactionTimeoutMs: 10000,
})
if err != nil {
t.Fatal(err)
}
if ipResp.Error != nil {
t.Fatal(ipResp.Error)
}
defer func() {
err := clientEndTxn(client, &EndTxnRequest{
TransactionalID: transactionalID,
ProducerID: ipResp.Producer.ProducerID,
ProducerEpoch: ipResp.Producer.ProducerEpoch,
Committed: false,
})
if err != nil {
t.Fatal(err)
}
}()
ctx, cancel = context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
resp, err := client.AddPartitionsToTxn(ctx, &AddPartitionsToTxnRequest{
TransactionalID: transactionalID,
ProducerID: ipResp.Producer.ProducerID,
ProducerEpoch: ipResp.Producer.ProducerEpoch,
Topics: map[string][]AddPartitionToTxn{
topic1: {
{
Partition: 0,
},
{
Partition: 1,
},
{
Partition: 2,
},
},
topic2: {
{
Partition: 0,
},
{
Partition: 2,
},
},
},
})
if err != nil {
t.Fatal(err)
}
if len(resp.Topics) != 2 {
t.Errorf("expected responses for 2 topics; got: %d", len(resp.Topics))
}
for topic, partitions := range resp.Topics {
if topic == topic1 {
if len(partitions) != 3 {
t.Errorf("expected 3 partitions in response for topic %s; got: %d", topic, len(partitions))
}
}
if topic == topic2 {
if len(partitions) != 2 {
t.Errorf("expected 2 partitions in response for topic %s; got: %d", topic, len(partitions))
}
}
for _, partition := range partitions {
if partition.Error != nil {
t.Error(partition.Error)
}
}
}
}