Skip to content

Commit

Permalink
Add pending message metrics.
Browse files Browse the repository at this point in the history
Also added associated tests. Part of #8.
  • Loading branch information
George Vanburgh committed Dec 30, 2015
1 parent e075cdb commit 17c4fd0
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 1 deletion.
3 changes: 2 additions & 1 deletion queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,11 @@ func (q *Queue) logMetrics() {
break
}

// Print out various metrics
// Send various metrics
currentMessageRate := atomic.SwapUint64(&q.messagesSentLastSecond, 0)

q.metrics <- &Metric{Name: q.Name + ".messagerate", Value: int64(currentMessageRate), Type: "counter"}
q.metrics <- &Metric{Name: q.Name + ".subscribers", Value: int64(len(q.subscribers)), Type: "guage"}
q.metrics <- &Metric{Name: q.Name + ".pending", Value: int64(len(q.messageOutput)), Type: "guage"}
}
}
72 changes: 72 additions & 0 deletions queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package gamq
import (
"bufio"
"bytes"
"strings"
"testing"

"github.com/onsi/gomega"
Expand Down Expand Up @@ -40,6 +41,49 @@ func TestQueue_sendMessage_messageReceivedSuccessfully(t *testing.T) {
}).Should(gomega.Equal(testMessage))
}

func TestQueue_sendMessage_generatesMetrics(t *testing.T) {
// More async testing
gomega.RegisterTestingT(t)

// We should receive metrics ending in these names from a queue during
// normal operation
expectedMetricNames := [...]string{"messagerate", "subscribers", "pending"}

// Mocking
dummyMetricsChannel := make(chan *Metric)
dummyClosingChannel := make(chan *string)

underTest := Queue{}
underTest.Initialize(dummyMetricsChannel, dummyClosingChannel)

// After a subscriber is added, we should start receiving metrics
dummySubscriber := Client{Closed: new(chan bool)}
underTest.AddSubscriber(&dummySubscriber)

seenMetricNames := make(map[string]bool)
go func() {
for {
metric := <-dummyMetricsChannel
metricNameChunks := strings.Split(metric.Name, ".")
finalMetricName := metricNameChunks[len(metricNameChunks)-1]
seenMetricNames[finalMetricName] = true
}
}()

// Check we've received metrics ending in all the expected names
// NOTE: It might take longer than the default gomega 1 second timeout to
// receive all the metrics we're expecting
gomega.Eventually(func() bool {
toReturn := true
for _, metricName := range expectedMetricNames {
if !seenMetricNames[metricName] {
toReturn = false
}
}
return toReturn
}, "5s").Should(gomega.BeTrue()) // Timeout upped to 5 seconds
}

// A unsubscribing client should not be considered for message delivery
func TestQueue_sendMessageAfterUnsubscribe_messageReceivedSuccessfully(t *testing.T) {
// Need gomega for async testing
Expand Down Expand Up @@ -99,6 +143,34 @@ func TestQueue_sendMessageAfterUnsubscribe_messageReceivedSuccessfully(t *testin
}).Should(gomega.Equal(testMessage))
}

func TestQueue_xPendingMetrics_producesCorrectMetric(t *testing.T) {
// Need gomega for async testing
gomega.RegisterTestingT(t)

numberOfMessagesToSend := 10

underTest := Queue{Name: TEST_QUEUE_NAME}
testMessage := "Testing!"

dummyMetricsPipe := make(chan *Metric)
dummyClosingPipe := make(chan *string)
underTest.Initialize(dummyMetricsPipe, dummyClosingPipe)

for i := 0; i < numberOfMessagesToSend; i++ {
underTest.Publish(&testMessage)
}

// Eventually, we should see `numberOfMessagesToSend` pending messages
gomega.Eventually(func() int {
metric := <-dummyMetricsPipe
if strings.Contains(metric.Name, "pending") {
return int(metric.Value)
} else {
return -1
}
}, "5s").Should(gomega.Equal(numberOfMessagesToSend))
}

func TestQueue_initialize_completesSuccessfully(t *testing.T) {
underTest := Queue{Name: TEST_QUEUE_NAME}

Expand Down

0 comments on commit 17c4fd0

Please sign in to comment.