forked from segmentio/kafka-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
createtopics_test.go
116 lines (102 loc) · 2.04 KB
/
createtopics_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package kafka
import (
"bufio"
"bytes"
"context"
"reflect"
"testing"
)
// TestClientCreateTopics sends one CreateTopics request covering three topics
// and verifies that the response carries an entry for each requested topic
// with no error attached to any of them.
func TestClientCreateTopics(t *testing.T) {
	const (
		topic1 = "client-topic-1"
		topic2 = "client-topic-2"
		topic3 = "client-topic-3"
	)

	client, shutdown := newLocalClient()
	defer shutdown()

	// The same configuration entries are attached to every topic below.
	config := []ConfigEntry{{
		ConfigName:  "retention.ms",
		ConfigValue: "3600000",
	}}

	res, err := client.CreateTopics(context.Background(), &CreateTopicsRequest{
		Topics: []TopicConfig{
			{
				// topic1 lists the replicas of each partition explicitly and
				// sets NumPartitions and ReplicationFactor to -1.
				Topic:             topic1,
				NumPartitions:     -1,
				ReplicationFactor: -1,
				ReplicaAssignments: []ReplicaAssignment{
					{
						Partition: 0,
						Replicas:  []int{1},
					},
					{
						Partition: 1,
						Replicas:  []int{1},
					},
					{
						Partition: 2,
						Replicas:  []int{1},
					},
				},
				ConfigEntries: config,
			},
			{
				Topic:             topic2,
				NumPartitions:     2,
				ReplicationFactor: 1,
				ConfigEntries:     config,
			},
			{
				Topic:             topic3,
				NumPartitions:     1,
				ReplicationFactor: 1,
				ConfigEntries:     config,
			},
		},
	})
	if err != nil {
		t.Fatal(err)
	}

	// Cleanup is registered only once the request itself has succeeded.
	defer deleteTopic(t, topic1, topic2, topic3)

	// Every topic reported in the response is removed from this set; whatever
	// is left afterwards was requested but never reported.
	expectTopics := map[string]struct{}{
		topic1: {},
		topic2: {},
		topic3: {},
	}

	// The loop variable is named topicErr rather than error so that the
	// predeclared error type is not shadowed inside the loop body.
	for topic, topicErr := range res.Errors {
		delete(expectTopics, topic)

		if topicErr != nil {
			t.Errorf("%s => %s", topic, topicErr)
		}
	}

	for topic := range expectTopics {
		t.Errorf("topic missing in response: %s", topic)
	}
}
// TestCreateTopicsResponseV0 round-trips a createTopicsResponseV0 value:
// it is serialized with writeTo into an in-memory buffer, parsed back with
// readFrom, and the test requires a nil error, a zero remainder, and a
// decoded value deeply equal to the one that was written.
func TestCreateTopicsResponseV0(t *testing.T) {
	want := createTopicsResponseV0{
		TopicErrors: []createTopicsResponseV0TopicError{
			{Topic: "topic", ErrorCode: 2},
		},
	}

	buf := bytes.NewBuffer(nil)
	want.writeTo(&writeBuffer{w: buf})

	// Capture the encoded size before handing the buffer to the reader.
	size := buf.Len()

	var got createTopicsResponseV0
	remain, err := got.readFrom(bufio.NewReader(buf), size)
	if err != nil {
		t.Fatal(err)
	}
	if remain != 0 {
		t.Fatalf("expected 0 remain, got %v", remain)
	}
	if !reflect.DeepEqual(want, got) {
		t.Fatal("expected item and found to be the same")
	}
}