-
Notifications
You must be signed in to change notification settings - Fork 5
/
messagequeue_test.go
188 lines (147 loc) · 5.46 KB
/
messagequeue_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
package gamq
import (
"bufio"
"bytes"
"github.com/FireEater64/gamq/message"
"strings"
"testing"
"github.com/onsi/gomega"
)
// TEST_QUEUE_NAME is the queue name shared by every test in this file.
const TEST_QUEUE_NAME = "TestQueue"
// TestQueue_sendMessage_messageReceivedSuccessfully publishes one message to
// a queue with a single subscriber and waits for the subscriber's buffer to
// contain the payload followed by "\r\n.\r\n".
func TestQueue_sendMessage_messageReceivedSuccessfully(t *testing.T) {
	// gomega supplies the polling (Eventually) assertion used at the end.
	gomega.RegisterTestingT(t)

	payload := []byte("Testing!")
	want := []byte("Testing!\r\n.\r\n")
	msg := message.NewHeaderlessMessage(&payload)

	// Nothing in this test reads from the metrics or closing channels.
	metricsPipe := make(chan<- *Metric)
	closingPipe := make(chan<- *string)
	underTest := newMessageQueue(TEST_QUEUE_NAME, metricsPipe, closingPipe)

	// The subscriber writes into an in-memory buffer so the test can
	// inspect what was delivered.
	output := new(bytes.Buffer)
	closed := make(chan bool)
	subscriber := Client{
		Name:   "Test",
		Writer: bufio.NewWriter(output),
		Closed: &closed,
	}

	// Subscribe first, then publish.
	underTest.AddSubscriber(&subscriber)
	underTest.Publish(msg)

	// Poll the buffer until it holds exactly the expected bytes.
	gomega.Eventually(output.Bytes).Should(gomega.Equal(want))
}
// TestQueue_sendMessage_generatesMetrics checks that, once a subscriber has
// been added, the queue emits metrics whose final dot-separated name segment
// covers "messagerate", "subscribers" and "pending".
func TestQueue_sendMessage_generatesMetrics(t *testing.T) {
	// More async testing
	gomega.RegisterTestingT(t)

	// We should receive metrics ending in these names from a queue during
	// normal operation
	expectedMetricNames := [...]string{"messagerate", "subscribers", "pending"}

	// Mocking
	dummyMetricsChannel := make(chan *Metric)
	dummyClosingChannel := make(chan *string)
	underTest := newMessageQueue(TEST_QUEUE_NAME, dummyMetricsChannel, dummyClosingChannel)

	// After a subscriber is added, we should start receiving metrics
	dummySubscriber := Client{Closed: new(chan bool)}
	underTest.AddSubscriber(&dummySubscriber)

	// done stops the collector goroutine when the test returns, so it does
	// not outlive the test.
	done := make(chan struct{})
	defer close(done)

	// The collector forwards the last name segment of every metric over a
	// buffered channel. Only the polling function below touches the
	// seenMetricNames map, so the map is never accessed from two
	// goroutines at once.
	metricNames := make(chan string, 64)
	go func() {
		for {
			select {
			case metric := <-dummyMetricsChannel:
				metricNameChunks := strings.Split(metric.Name, ".")
				finalMetricName := metricNameChunks[len(metricNameChunks)-1]
				select {
				case metricNames <- finalMetricName:
				case <-done:
					return
				}
			case <-done:
				return
			}
		}
	}()

	seenMetricNames := make(map[string]bool)

	// Check we've received metrics ending in all the expected names
	// NOTE: It might take longer than the default gomega 1 second timeout to
	// receive all the metrics we're expecting
	gomega.Eventually(func() bool {
		// Drain whatever names have arrived since the previous poll
		// without blocking.
		for drained := false; !drained; {
			select {
			case name := <-metricNames:
				seenMetricNames[name] = true
			default:
				drained = true
			}
		}

		for _, metricName := range expectedMetricNames {
			if !seenMetricNames[metricName] {
				return false
			}
		}
		return true
	}, "5s").Should(gomega.BeTrue()) // Timeout upped to 5 seconds
}
// An unsubscribing client should not be considered for message delivery.
//
// The test subscribes two clients, confirms one of them receives a published
// message, closes the first client, waits for it to disappear from the
// queue's subscriber map, and then confirms the second client receives the
// next message.
func TestQueue_sendMessageAfterUnsubscribe_messageReceivedSuccessfully(t *testing.T) {
	// gomega supplies the polling (Eventually) assertions used below.
	gomega.RegisterTestingT(t)

	payload := []byte("Testing!")
	want := []byte("Testing!\r\n.\r\n")
	msg := message.NewHeaderlessMessage(&payload)

	metricsPipe := make(chan<- *Metric, 10)
	closingPipe := make(chan<- *string)
	underTest := newMessageQueue(TEST_QUEUE_NAME, metricsPipe, closingPipe)

	// Two subscribers, each writing into its own in-memory buffer.
	firstOut := new(bytes.Buffer)
	firstClosed := make(chan bool)
	first := Client{
		Name:   "Test1",
		Writer: bufio.NewWriter(firstOut),
		Closed: &firstClosed,
	}

	secondOut := new(bytes.Buffer)
	secondClosed := make(chan bool)
	second := Client{
		Name:   "Test2",
		Writer: bufio.NewWriter(secondOut),
		Closed: &secondClosed,
	}

	underTest.AddSubscriber(&first)
	underTest.AddSubscriber(&second)

	underTest.Publish(msg)

	// Only one subscriber gets the message and we don't know which, so
	// inspect whichever buffer is non-empty (falling back to the second).
	gomega.Eventually(func() []byte {
		if firstOut.Len() > 0 {
			return firstOut.Bytes()
		}
		return secondOut.Bytes()
	}).Should(gomega.Equal(want))

	// The buffers are reused for the second half of the test.
	firstOut.Reset()
	secondOut.Reset()

	// Signal that the first client has closed.
	*first.Closed <- true

	// Its entry should vanish from the subscriber map.
	gomega.Eventually(func() bool {
		return underTest.subscribers[first.Name] == nil
	}).Should(gomega.BeTrue())

	// A message published now must reach the remaining client.
	underTest.Publish(msg)

	gomega.Eventually(secondOut.Bytes).Should(gomega.Equal(want))
}
// TestQueue_xPendingMetrics_producesCorrectMetric publishes a fixed number of
// messages to a queue with no subscribers and waits for a metric whose name
// contains "pending" to report that same number.
func TestQueue_xPendingMetrics_producesCorrectMetric(t *testing.T) {
	// gomega supplies the polling (Eventually) assertion used below.
	gomega.RegisterTestingT(t)

	messageCount := 10

	payload := []byte("Testing!")
	msg := message.NewHeaderlessMessage(&payload)

	metricsPipe := make(chan *Metric)
	closingPipe := make(chan *string)
	underTest := newMessageQueue(TEST_QUEUE_NAME, metricsPipe, closingPipe)

	for remaining := messageCount; remaining > 0; remaining-- {
		underTest.Publish(msg)
	}

	// Each poll consumes one metric; anything that is not a "pending"
	// metric is mapped to -1 so it can never match the expected count.
	// NOTE(review): the receive blocks until the queue sends a metric, so
	// the 5s timeout presumably cannot fire while a poll is waiting on the
	// channel - confirm against gomega's Eventually semantics.
	gomega.Eventually(func() int {
		metric := <-metricsPipe
		if !strings.Contains(metric.Name, "pending") {
			return -1
		}
		return int(metric.Value)
	}, "5s").Should(gomega.Equal(messageCount))
}
// TestQueue_initialize_completesSuccessfully checks that a freshly
// constructed queue exposes the name it was created with.
func TestQueue_initialize_completesSuccessfully(t *testing.T) {
	dummyMetricsPipe := make(chan<- *Metric)
	dummyClosingPipe := make(chan<- *string)

	underTest := newMessageQueue(TEST_QUEUE_NAME, dummyMetricsPipe, dummyClosingPipe)

	// Queue should be named correctly; report both values so a failure is
	// diagnosable from the test output alone.
	if underTest.Name != TEST_QUEUE_NAME {
		t.Errorf("queue name = %q, want %q", underTest.Name, TEST_QUEUE_NAME)
	}
}