forked from segmentio/kafka-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
dialer_test.go
403 lines (358 loc) · 11.2 KB
/
dialer_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
package kafka
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"net"
"reflect"
"sort"
"testing"
"time"
)
func TestDialer(t *testing.T) {
tests := []struct {
scenario string
function func(*testing.T, context.Context, *Dialer)
}{
{
scenario: "looking up partitions returns the list of available partitions for a topic",
function: testDialerLookupPartitions,
},
}
for _, test := range tests {
testFunc := test.function
t.Run(test.scenario, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
testFunc(t, ctx, &Dialer{})
})
}
}
func testDialerLookupPartitions(t *testing.T, ctx context.Context, d *Dialer) {
client, topic, shutdown := newLocalClientAndTopic()
defer shutdown()
// Write a message to ensure the partition gets created.
w := &Writer{
Addr: TCP("localhost:9092"),
Topic: topic,
Transport: client.Transport,
}
w.WriteMessages(ctx, Message{})
w.Close()
partitions, err := d.LookupPartitions(ctx, "tcp", "localhost:9092", topic)
if err != nil {
t.Error(err)
return
}
sort.Slice(partitions, func(i int, j int) bool {
return partitions[i].ID < partitions[j].ID
})
want := []Partition{
{
Topic: topic,
Leader: Broker{Host: "localhost", Port: 9092, ID: 1},
Replicas: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
Isr: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
ID: 0,
},
}
if !reflect.DeepEqual(partitions, want) {
t.Errorf("bad partitions:\ngot: %+v\nwant: %+v", partitions, want)
}
}
func tlsConfig(t *testing.T) *tls.Config {
const (
certPEM = `-----BEGIN CERTIFICATE-----
MIID2zCCAsOgAwIBAgIJAMSqbewCgw4xMA0GCSqGSIb3DQEBCwUAMGAxCzAJBgNV
BAYTAlVTMRMwEQYDVQQIDApDYWxpZm9ybmlhMRYwFAYDVQQHDA1TYW4gRnJhbmNp
c2NvMRAwDgYDVQQKDAdTZWdtZW50MRIwEAYDVQQDDAlsb2NhbGhvc3QwHhcNMTcx
MjIzMTU1NzAxWhcNMjcxMjIxMTU1NzAxWjBgMQswCQYDVQQGEwJVUzETMBEGA1UE
CAwKQ2FsaWZvcm5pYTEWMBQGA1UEBwwNU2FuIEZyYW5jaXNjbzEQMA4GA1UECgwH
U2VnbWVudDESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEFAAOC
AQ8AMIIBCgKCAQEAtda9OWKYNtINe/BKAoB+/zLg2qbaTeHN7L722Ug7YoY6zMVB
aQEHrUmshw/TOrT7GLN/6e6rFN74UuNg72C1tsflZvxqkGdrup3I3jxMh2ApAxLi
zem/M6Eke2OAqt+SzRPqc5GXH/nrWVd3wqg48DZOAR0jVTY2e0fWy+Er/cPJI1lc
L6ZMIRJikHTXkaiFj2Jct1iWvgizx5HZJBxXJn2Awix5nvc+zmXM0ZhoedbJRoBC
dGkRXd3xv2F4lqgVHtP3Ydjc/wYoPiGudSAkhyl9tnkHjvIjA/LeRNshWHbCIaQX
yemnXIcyyf+W+7EK0gXio7uiP+QSoM5v/oeVMQIDAQABo4GXMIGUMHoGA1UdIwRz
MHGhZKRiMGAxCzAJBgNVBAYTAlVTMRMwEQYDVQQIDApDYWxpZm9ybmlhMRYwFAYD
VQQHDA1TYW4gRnJhbmNpc2NvMRAwDgYDVQQKDAdTZWdtZW50MRIwEAYDVQQDDAls
b2NhbGhvc3SCCQCBYUuEuypDMTAJBgNVHRMEAjAAMAsGA1UdDwQEAwIE8DANBgkq
hkiG9w0BAQsFAAOCAQEATk6IlVsXtNp4C1yeegaM+jE8qgKJfNm1sV27zKx8HPiO
F7LvTGYIG7zd+bf3pDSwRxfBhsLEwmN9TUN1d6Aa9zeu95qOnR76POfHILgttu2w
IzegO8I7BycnLjU9o/l9gCpusnN95tIYQhfD08ygUpYTQRuI0cmZ/Dp3xb0S9f5N
miYTuUoStYSA4RWbDWo+Is9YWPu7rwieziOZ96oguGz3mtqvkjxVAQH1xZr3bKHr
HU9LpQh0i6oTK0UCqnDwlhJl1c7A3UooxFpc3NGxyjogzTfI/gnBKfPo7eeswwsV
77rjIkhBW49L35KOo1uyblgK1vTT7VPtzJnuDq3ORg==
-----END CERTIFICATE-----`
keyPEM = `-----BEGIN RSA PRIVATE KEY-----
MIIEowIBAAKCAQEAtda9OWKYNtINe/BKAoB+/zLg2qbaTeHN7L722Ug7YoY6zMVB
aQEHrUmshw/TOrT7GLN/6e6rFN74UuNg72C1tsflZvxqkGdrup3I3jxMh2ApAxLi
zem/M6Eke2OAqt+SzRPqc5GXH/nrWVd3wqg48DZOAR0jVTY2e0fWy+Er/cPJI1lc
L6ZMIRJikHTXkaiFj2Jct1iWvgizx5HZJBxXJn2Awix5nvc+zmXM0ZhoedbJRoBC
dGkRXd3xv2F4lqgVHtP3Ydjc/wYoPiGudSAkhyl9tnkHjvIjA/LeRNshWHbCIaQX
yemnXIcyyf+W+7EK0gXio7uiP+QSoM5v/oeVMQIDAQABAoIBAQCa6roHW8JGYipu
vsau3v5TOOtsHN67n3arDf6MGwfM5oLN1ffmF6SMs8myv36781hBMRv3FwjWHSf+
pgz9o6zsbd05Ii8/m3yiXq609zZT107ZeYuU1mG5AL5uCNWjvhn5cdA6aX0RFwC0
+tnjEyJ/NCS8ujBR9n/wA8IxrEKoTGcxRb6qFPPKWYoBevu34td1Szf0kH8AKjtQ
rdPK0Of/ZEiAUxNMLTBEOmC0ZabxJV/YGWcUU4DpmEDZSgQSr4yLT4BFUwF2VC8t
8VXn5dBP3RMo4h7JlteulcKYsMQZXD6KvUwY2LaEpFM/b14r+TZTUQGhwS+Ha11m
xa4eNwFhAoGBANshGlpR9cUUq8vNex0Wb63P9BTRTXwg1yEJVMSua+DlaaqaX/hS
hOxl3K4y2V5OCK31C+SOAqqbrGtMXVym5c5pX8YyC11HupFJwdFLUEc74uF3CtWY
GMMvEvItCK5ZvYvS5I2CQGcp1fhEMle/Uz+hFi1eeWepMqgHbVx5vkdtAoGBANRv
XYQsTAGSkhcHB++/ASDskAew5EoHfwtJzSX0BZC6DCACF/U4dCKzBVndOrELOPXs
2CZXCG4ptWzNgt6YTlMX9U7nLei5pPjoivIJsMudnc22DrDS7C94rCk++M3JeLOM
KSN0ou9+1iEdE7rQdMgZMryaY71OBonCIDsWgJZVAoGAB+k0CFq5IrpSUXNDpJMw
yPee+jlsMLUGzzyFAOzDHEVsASq9mDtybQ5oXymay1rJ2W3lVgUCd6JTITSKklO8
LC2FtaQM4Ps78w7Unne3mDrDQByKGZf6HOHQL0oM7C51N10Pv0Qaix7piKL9pklT
+hIYuN6WR3XGTGaoPhRvGCkCgYBqaQ5y8q1v7Dd5iXAUS50JHPZYo+b2niKpSOKW
LFHNWSRRtDrD/u9Nolb/2K1ZmcGCjo0HR3lVlVbnlVoEnk49mTaru2lntfZJKFLR
QsFofR9at+NL95uPe+bhEkYW7uCjL4Y72GT1ipdAJwyG+3xD7ztW9g8X+EmWH8N9
VZw7sQKBgGxp820jbjWhG1O9RnYLwflcZzUlSkhWJDg9tKJXBjD+hFX98Okuf0gu
DUpdbxbJHSi0xAjOjLVswNws4pVwzgtZVK8R7k8j3Z5TtYTJTSQLfgVowuyEdAaI
C8OxVJ/At/IJGnWSIz8z+/YCUf7p4jd2LJgmZVVzXeDsOFcH62gu
-----END RSA PRIVATE KEY-----`
caPEM = `-----BEGIN CERTIFICATE-----
MIIDPDCCAiQCCQCBYUuEuypDMTANBgkqhkiG9w0BAQsFADBgMQswCQYDVQQGEwJV
UzETMBEGA1UECAwKQ2FsaWZvcm5pYTEWMBQGA1UEBwwNU2FuIEZyYW5jaXNjbzEQ
MA4GA1UECgwHU2VnbWVudDESMBAGA1UEAwwJbG9jYWxob3N0MB4XDTE3MTIyMzE1
NTMxOVoXDTI3MTIyMTE1NTMxOVowYDELMAkGA1UEBhMCVVMxEzARBgNVBAgMCkNh
bGlmb3JuaWExFjAUBgNVBAcMDVNhbiBGcmFuY2lzY28xEDAOBgNVBAoMB1NlZ21l
bnQxEjAQBgNVBAMMCWxvY2FsaG9zdDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCC
AQoCggEBAJwB+Yp6MyUepgtaRDxVjpMI2RmlAaV1qApMWu60LWGKJs4KWoIoLl6p
oSEqnWrpMmb38pyGP99X1+t3uZjiK9L8nFhuKZ581tsTKLxaSl+YVg7JbH5LVCS6
opsfB5ON1gJxf1HA9YyMqKHkBFh8/hdOGR0T6Bll9TPO1NQB/UqMy/tKr3sA3KZm
XVDbRKSuUAQWz5J9/hLPmVMU41F/uD7mvyDY+x8GymInZjUXG4e0oq2RJgU6SYZ8
mkscM6qhKY3mL487w/kHVFtFlMkOhvI7LIh3zVvWwgGSAoAv9yai9BDZNFSk0cEb
bb/IK7BQW9sNI3lcnGirdbnjV94X9/sCAwEAATANBgkqhkiG9w0BAQsFAAOCAQEA
MJLeGdYO3dpsPx2R39Bw0qa5cUh42huPf8n7rp4a4Ca5jJjcAlCYV8HzqOzpiKYy
ZNuHy8LnNVYYh5Qoh8EO45bplMV1wnHfi6hW6DY5j3SQdcxkoVsW5R7rBF7a7SDg
6uChVRPHgsnALUUc7Wvvd3sAs/NKHzHu86mgD3EefkdqWAaCapzcqT9mo9KXkWJM
DhSJS+/iIaroc8umDnbPfhhgnlMf0/D4q0TjiLSSqyLzVifxnv9yHz56TrhHG/QP
E/8+FEGCHYKM4JLr5smGlzv72Kfx9E1CkG6TgFNIHjipVv1AtYDvaNMdPF2533+F
wE3YmpC3Q0g9r44nEbz4Bw==
-----END CERTIFICATE-----`
)
// Define TLS configuration
certificate, err := tls.X509KeyPair([]byte(certPEM), []byte(keyPEM))
if err != nil {
t.Error(err)
t.FailNow()
}
caCertPool := x509.NewCertPool()
if ok := caCertPool.AppendCertsFromPEM([]byte(caPEM)); !ok {
t.Error(err)
t.FailNow()
}
return &tls.Config{
Certificates: []tls.Certificate{certificate},
RootCAs: caCertPool,
InsecureSkipVerify: true,
}
}
func TestDialerTLS(t *testing.T) {
client, topic, shutdown := newLocalClientAndTopic()
defer shutdown()
// Write a message to ensure the partition gets created.
w := &Writer{
Addr: TCP("localhost:9092"),
Topic: topic,
Transport: client.Transport,
}
w.WriteMessages(context.Background(), Message{})
w.Close()
// Create an SSL proxy using the tls.Config that connects to the
// docker-composed kafka
config := tlsConfig(t)
l, err := tls.Listen("tcp", "127.0.0.1:", config)
if err != nil {
t.Error(err)
return
}
defer l.Close()
go func() {
for {
conn, err := l.Accept()
if err != nil {
return // intentionally ignored
}
go func(in net.Conn) {
out, err := net.Dial("tcp", "localhost:9092")
if err != nil {
t.Error(err)
return
}
defer out.Close()
go io.Copy(in, out)
io.Copy(out, in)
}(conn)
}
}()
// Use the tls.Config and connect to the SSL proxy
d := &Dialer{
TLS: config,
}
partitions, err := d.LookupPartitions(context.Background(), "tcp", l.Addr().String(), topic)
if err != nil {
t.Error(err)
return
}
// Verify returned partition data is what we expect
sort.Slice(partitions, func(i int, j int) bool {
return partitions[i].ID < partitions[j].ID
})
want := []Partition{
{
Topic: topic,
Leader: Broker{Host: "localhost", Port: 9092, ID: 1},
Replicas: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
Isr: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
ID: 0,
},
}
if !reflect.DeepEqual(partitions, want) {
t.Errorf("bad partitions:\ngot: %+v\nwant: %+v", partitions, want)
}
}
type MockConn struct {
net.Conn
done chan struct{}
partitions []Partition
}
func (m *MockConn) Read(b []byte) (n int, err error) {
select {
case <-time.After(time.Minute):
case <-m.done:
return 0, context.Canceled
}
return 0, io.EOF
}
func (m *MockConn) Write(b []byte) (n int, err error) {
select {
case <-time.After(time.Minute):
case <-m.done:
return 0, context.Canceled
}
return 0, io.EOF
}
func (m *MockConn) Close() error {
select {
case <-m.done:
default:
close(m.done)
}
return nil
}
func (m *MockConn) ReadPartitions(topics ...string) (partitions []Partition, err error) {
return m.partitions, err
}
func TestDialerConnectTLSHonorsContext(t *testing.T) {
config := tlsConfig(t)
d := &Dialer{
TLS: config,
}
conn := &MockConn{
done: make(chan struct{}),
}
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*25)
defer cancel()
_, err := d.connectTLS(ctx, conn, d.TLS)
if context.DeadlineExceeded != err {
t.Errorf("expected err to be %v; got %v", context.DeadlineExceeded, err)
t.FailNow()
}
}
func TestDialerResolver(t *testing.T) {
ctx := context.TODO()
tests := []struct {
scenario string
address string
resolver map[string][]string
}{
{
scenario: "resolve domain to ip",
address: "example.com",
resolver: map[string][]string{
"example.com": {"127.0.0.1"},
},
},
{
scenario: "resolve domain to ip and port",
address: "example.com",
resolver: map[string][]string{
"example.com": {"127.0.0.1:9092"},
},
},
{
scenario: "resolve domain with port to ip",
address: "example.com:9092",
resolver: map[string][]string{
"example.com": {"127.0.0.1:9092"},
},
},
{
scenario: "resolve domain with port to ip with different port",
address: "example.com:9092",
resolver: map[string][]string{
"example.com": {"127.0.0.1:80"},
},
},
{
scenario: "resolve domain with port to ip",
address: "example.com:9092",
resolver: map[string][]string{
"example.com": {"127.0.0.1"},
},
},
}
for _, test := range tests {
t.Run(test.scenario, func(t *testing.T) {
topic := makeTopic()
createTopic(t, topic, 1)
defer deleteTopic(t, topic)
d := Dialer{
Resolver: &mockResolver{addrs: test.resolver},
}
// Write a message to ensure the partition gets created.
w := NewWriter(WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: topic,
Dialer: &d,
})
w.WriteMessages(context.Background(), Message{})
w.Close()
partitions, err := d.LookupPartitions(ctx, "tcp", test.address, topic)
if err != nil {
t.Error(err)
return
}
sort.Slice(partitions, func(i int, j int) bool {
return partitions[i].ID < partitions[j].ID
})
want := []Partition{
{
Topic: topic,
Leader: Broker{Host: "localhost", Port: 9092, ID: 1},
Replicas: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
Isr: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
ID: 0,
},
}
if !reflect.DeepEqual(partitions, want) {
t.Errorf("bad partitions:\ngot: %+v\nwant: %+v", partitions, want)
}
})
}
}
type mockResolver struct {
addrs map[string][]string
}
func (mr *mockResolver) LookupHost(ctx context.Context, host string) ([]string, error) {
if addrs, ok := mr.addrs[host]; !ok {
return nil, fmt.Errorf("unrecognized host %s", host)
} else {
return addrs, nil
}
}