-
Notifications
You must be signed in to change notification settings - Fork 0
/
publisher_test.go
105 lines (89 loc) · 2.7 KB
/
publisher_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package kafka
import (
"context"
crand "crypto/rand"
"math/rand"
"testing"
"time"
"github.com/IBM/sarama"
"github.com/stretchr/testify/assert"
)
// testMessage is the payload the tests in this file hand to the publisher.
type testMessage struct {
	// ID carries the JSON tag "id".
	ID int `json:"id"`
	// Title carries the JSON tag "title".
	Title string `json:"title"`
}
// TestSending publishes messageCount messages through a publisher connected
// to sarama mock brokers and expects one success callback per message.
func TestSending(t *testing.T) {
	const (
		topic        = "my_topic"
		messageCount = 10
	)

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// The seed broker is primed with a metadata response that names the
	// leader for partition 0; the leader is primed with a successful
	// produce response.
	seed := sarama.NewMockBroker(t, 1)
	leader := sarama.NewMockBroker(t, 2)
	defer leader.Close()
	defer seed.Close()

	metadata := &sarama.MetadataResponse{}
	metadata.AddBroker(leader.Addr(), leader.BrokerID())
	metadata.AddTopicPartition(topic, 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError)
	seed.Returns(metadata)

	produceOK := &sarama.ProduceResponse{}
	produceOK.AddTopicPartition(topic, 0, sarama.ErrNoError)
	leader.Returns(produceOK)

	// Both handlers report into one channel: nil for a delivered message,
	// the producer error otherwise.
	delivered := 0
	results := make(chan error, messageCount)
	onSuccess := func(*sarama.ProducerMessage) { results <- nil }
	onError := func(err *sarama.ProducerError) { results <- err }

	cfg := sarama.NewConfig()
	cfg.Version = sarama.MinVersion
	cfg.Producer.Flush.Messages = messageCount
	cfg.Producer.Return.Successes = true

	// Connect the publisher
	publisher := MustNewPublisher(
		ctx,
		WithBrokers(seed.Addr()),
		WithTopics(topic),
		WithClientID("test"),
		WithSaramaConfig(cfg),
		WithPublisherSuccessHandler(onSuccess),
		WithPublisherErrorHandler(onError),
		WithCompression(sarama.CompressionNone, 0),
		WithFlashMessages(messageCount),
		WithFlashFrequency(0),
	)

	for sent := 0; sent < messageCount; sent++ {
		msg := &testMessage{ID: rand.Int(), Title: "test"}
		assert.NoError(t, publisher.Publish(ctx, msg), "send message")
	}

	// Collect one result per published message, giving up on the first
	// receive that takes longer than a second.
wait:
	for i := 0; i < messageCount; i++ {
		select {
		case err := <-results:
			if err == nil {
				delivered++
				continue
			}
			t.Errorf("kafka publisher test: %s", err)
		case <-time.After(time.Second):
			t.Errorf("Timeout waiting for msg #%d", i)
			break wait
		}
	}

	assert.NoError(t, publisher.Close(), "close connection")
	assert.Equal(t, messageCount, delivered, "not all messages are success")
}
// TestNewPublisherError checks that NewPublisher reports an error when it is
// called with a context and no further arguments.
func TestNewPublisherError(t *testing.T) {
	ctx := context.TODO()
	_, err := NewPublisher(ctx)
	assert.Error(t, err)
}
// TestNewPublisherPanic verifies that MustNewPublisher panics when it is
// passed two nil arguments after the context.
func TestNewPublisherPanic(t *testing.T) {
	// assert.Panics recovers the panic itself and fails the test when the
	// call returns normally, so neither a hand-written recover nor a sleep
	// after the call is needed: a panic can only be recovered on the calling
	// goroutine, and it happens (or not) before the call returns.
	assert.Panics(t, func() {
		MustNewPublisher(context.TODO(), nil, nil)
	})
}
// strFromDict returns a string of strSize characters, each picked from a
// fixed alphabet of digits, ASCII letters and a handful of symbols. The
// picks are driven by bytes read from crypto/rand.
func strFromDict(strSize int) string {
	const alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz!@#$%^&*_+-=~"

	buf := make([]byte, strSize)
	// The read error is deliberately ignored.
	_, _ = crand.Read(buf)

	// Fold every random byte onto an alphabet index, in place.
	for i := range buf {
		buf[i] = alphabet[int(buf[i])%len(alphabet)]
	}
	return string(buf)
}