Skip to content

Commit

Permalink
Added subret for delivery of retained messages to subs
Browse files Browse the repository at this point in the history
  • Loading branch information
levb committed Jan 8, 2024
1 parent 87ee78f commit 7a66171
Show file tree
Hide file tree
Showing 6 changed files with 220 additions and 34 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
shell: bash --noprofile --norc -x -eo pipefail {0}
run: |
cd go/src/github.com/ConnectEverything/mqtt-test
go install -v
go install -v .
- name: Run 'MQTTEx from nats-server'
shell: bash --noprofile --norc -x -eo pipefail {0}
Expand Down
8 changes: 3 additions & 5 deletions common.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ var (
Retain bool
Servers []string
Size int
Topic string
Username string
MatchTopicPrefix string
Quiet bool
Expand All @@ -53,10 +52,9 @@ type PubValue struct {
}

type MQTTBenchmarkResult struct {
Ops int `json:"ops"`
NS time.Duration `json:"ns"`
Unit string `json:"unit"`
Bytes int64 `json:"bytes"`
Ops int `json:"ops"`
NS map[string]time.Duration `json:"ns"`
Bytes int64 `json:"bytes"`
}

func randomPayload(sz int) []byte {
Expand Down
8 changes: 7 additions & 1 deletion connect.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"log"
"time"

paho "github.com/eclipse/paho.mqtt.golang"
Expand All @@ -26,7 +27,12 @@ func connect(clientID string, cleanSession bool, setoptsF func(*paho.ClientOptio
SetProtocolVersion(4).
SetUsername(Username).
SetPassword(Password).
SetStore(paho.NewMemoryStore())
SetStore(paho.NewMemoryStore()).
SetAutoReconnect(false).
SetDefaultPublishHandler(func(client paho.Client, msg paho.Message) {
log.Fatalf("received an unexpected message on %q (default handler)", msg.Topic())
})

for _, s := range Servers {
clientOpts.AddBroker(s)
}
Expand Down
27 changes: 13 additions & 14 deletions pub.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ func init() {
})

cmd.Flags().IntVar(&NPublishers, "num-publishers", 1, `Number of publishers to run concurrently, at --mps each`)
cmd.Flags().StringVar(&Topic, "topic", "", "MQTT topic")
cmd.Flags().IntVar(&NTopics, "num-topics", 0, `Cycle through NTopics appending "-{n}" where n starts with --num-topics-start; 0 means use --topic`)
cmd.Flags().IntVar(&NTopicsStart, "num-topics-start", 0, `Start topic suffixes with this number (default 0)`)
cmd.Flags().IntVar(&MPS, "mps", 1000, `Publish mps messages per second; 0 means no delay`)
Expand All @@ -35,10 +34,7 @@ func runPub(_ *cobra.Command, _ []string) {
if clientID == "" {
clientID = Name + "-pub-" + nuid.Next()
}
topic := Topic
if topic == "" {
topic = Name + nuid.Next()
}
topic := "/" + Name + "/" + nuid.Next()
msgChan := make(chan *MQTTBenchmarkResult)
errChan := make(chan error)

Expand All @@ -61,19 +57,19 @@ func runPub(_ *cobra.Command, _ []string) {
}

// wait for messages to arrive
result := MQTTBenchmarkResult{
Unit: "pub",
}
pubOps := 0
pubNS := time.Duration(0)
pubBytes := int64(0)
timeout := time.NewTimer(Timeout)
defer timeout.Stop()

// get back 1 report per publisher
for n := 0; n < NPublishers; {
select {
case r := <-msgChan:
result.Ops += r.Ops
result.NS += r.NS
result.Bytes += r.Bytes
pubOps += r.Ops
pubNS += r.NS["pub"]
pubBytes += r.Bytes
n++

case err := <-errChan:
Expand All @@ -84,7 +80,11 @@ func runPub(_ *cobra.Command, _ []string) {
}
}

bb, _ := json.Marshal(result)
bb, _ := json.Marshal(MQTTBenchmarkResult{
Ops: pubOps,
NS: map[string]time.Duration{"pub": pubNS},
Bytes: pubBytes,
})
os.Stdout.Write(bb)
}

Expand Down Expand Up @@ -138,8 +138,7 @@ func publish(cl paho.Client, topic string) (*MQTTBenchmarkResult, error) {

return &MQTTBenchmarkResult{
Ops: N,
NS: elapsed,
Unit: "pub",
NS: map[string]time.Duration{"pub": elapsed},
Bytes: int64(bc),
}, nil
}
21 changes: 8 additions & 13 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
func init() {
cmd := initPubSub(&cobra.Command{
Use: "pubsub [--flags...]",
Short: "Subscribe and receive N messages",
Short: "Subscribe and receive N published messages",
Run: runPubSub,
Args: cobra.NoArgs,
})
Expand All @@ -28,7 +28,7 @@ func init() {
mainCmd.AddCommand(cmd)
}

func receivedMsgHandler(topic string, errChan chan error, msgChan chan MQTTBenchmarkResult,
func pubsubMsgHandler(topic string, errChan chan error, msgChan chan MQTTBenchmarkResult,
) func(paho.Client, paho.Message) {
return func(client paho.Client, msg paho.Message) {
opts := client.OptionsReader()
Expand Down Expand Up @@ -59,8 +59,7 @@ func receivedMsgHandler(topic string, errChan chan error, msgChan chan MQTTBench
msgChan <- MQTTBenchmarkResult{
Ops: 1,
Bytes: int64(len(msg.Payload())),
NS: elapsed,
Unit: "pubsub",
NS: map[string]time.Duration{"receive": elapsed},
}
logOp(clientID, "REC ->", elapsed, "Received %d bytes on %q, qos:%v", len(msg.Payload()), msg.Topic(), QOS)
}
Expand All @@ -71,10 +70,7 @@ func runPubSub(_ *cobra.Command, _ []string) {
if clientID == "" {
clientID = Name + "-sub-" + nuid.Next()
}
topic := Topic
if topic == "" {
topic = "/" + Name + "/" + nuid.Next()
}
topic := "/" + Name + "/" + nuid.Next()

clientCleanupChan := make(chan func())
subsChan := make(chan struct{})
Expand All @@ -89,7 +85,7 @@ func runPubSub(_ *cobra.Command, _ []string) {
opts.
SetOnConnectHandler(func(cl paho.Client) {
start := time.Now()
token := cl.Subscribe(topic, byte(QOS), receivedMsgHandler(topic, errChan, msgChan))
token := cl.Subscribe(topic, byte(QOS), pubsubMsgHandler(topic, errChan, msgChan))
if token.Wait() && token.Error() != nil {
errChan <- token.Error()
return
Expand All @@ -98,7 +94,7 @@ func runPubSub(_ *cobra.Command, _ []string) {
subsChan <- struct{}{}
}).
SetDefaultPublishHandler(func(client paho.Client, msg paho.Message) {
log.Printf("<>/<> NON-SUBSCRIBED MESSAGE\n")
errChan <- fmt.Errorf("received an unexpected message on %q", msg.Topic())
})
})
if err != nil {
Expand Down Expand Up @@ -152,7 +148,7 @@ func runPubSub(_ *cobra.Command, _ []string) {
for n := 0; n < N*NSubscribers; {
select {
case r := <-msgChan:
elapsed += r.NS
elapsed += r.NS["receive"]
bc += r.Bytes
n++

Expand All @@ -166,8 +162,7 @@ func runPubSub(_ *cobra.Command, _ []string) {

bb, _ := json.Marshal(MQTTBenchmarkResult{
Ops: N * NSubscribers,
NS: elapsed,
Unit: "receive",
NS: map[string]time.Duration{"receive": elapsed},
Bytes: bc,
})
os.Stdout.Write(bb)
Expand Down
188 changes: 188 additions & 0 deletions subret.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
package main

import (
"encoding/json"
"log"
"os"
"strconv"
"strings"
"sync/atomic"
"time"

paho "github.com/eclipse/paho.mqtt.golang"
"github.com/nats-io/nuid"
"github.com/spf13/cobra"
)

func init() {
cmd := initPubSub(&cobra.Command{
Use: "subret [--flags...]",
Short: "Subscribe N times, and receive NTopics retained messages",
Run: runSubRet,
Args: cobra.NoArgs,
})

cmd.Flags().IntVar(&NSubscribers, "num-subscribers", 1, `Number of subscribers to run concurrently`)
cmd.Flags().IntVar(&NTopics, "num-topics", 0, `Use this many topics with retained messages`)

mainCmd.AddCommand(cmd)
}

func subretMsgHandler(
topicPrefix string,
expectNRetained int,
start time.Time,
doneChan chan MQTTBenchmarkResult,
) func(paho.Client, paho.Message) {
var cRetained atomic.Int32
var bc atomic.Int64
return func(client paho.Client, msg paho.Message) {
opts := client.OptionsReader()
clientID := opts.ClientID()
switch {
case !strings.HasPrefix(msg.Topic(), topicPrefix):
log.Printf("Received a QOS %d message on unexpected topic: %s\n", msg.Qos(), msg.Topic())
// ignore

case msg.Duplicate():
log.Fatal("received unexpected duplicate message")
return

case msg.Retained():
newC := cRetained.Add(1)
bc.Add(int64(len(msg.Payload())))
switch {
case newC < int32(expectNRetained):
logOp(clientID, "REC ->", time.Since(start), "Received %d bytes on %q, qos:%v", len(msg.Payload()), msg.Topic(), QOS)
// skip it
return
case newC > int32(expectNRetained):
log.Fatal("received unexpected retained message")
default: // last expected retained message
elapsed := time.Since(start)
r := MQTTBenchmarkResult{
Ops: 1,
NS: map[string]time.Duration{"receive": elapsed},
Bytes: bc.Load(),
}
doneChan <- r
}
}
}
}

func runSubRet(_ *cobra.Command, _ []string) {
topic := "/" + Name + "/" + nuid.Next()

clientID := ClientID
if clientID == "" {
clientID = Name + "-pub-" + nuid.Next()
} else {
clientID = clientID + "-pub"
}
nTopics := NTopics
if nTopics == 0 {
nTopics = 1
}

// Publish NTopics retained messages, 1 per topic; Use at least QoS1 to
// ensure the retained messages are fully processed by the time the
// publisher exits.
cl, disconnect, err := connect(clientID, CleanSession, nil)
if err != nil {
log.Fatal("Error connecting: ", err)
}
for i := 0; i < nTopics; i++ {
t := topic + "/" + strconv.Itoa(i)
payload := randomPayload(Size)
start := time.Now()
publishQOS := 1
if publishQOS < QOS {
publishQOS = QOS
}
if token := cl.Publish(t, byte(publishQOS), true, payload); token.Wait() && token.Error() != nil {
log.Fatal("Error publishing: ", token.Error())
}
logOp(clientID, "PUB <-", time.Since(start), "Published: %d bytes to %q, qos:%v, retain:%v", len(payload), t, QOS, true)
}
disconnect()

// Now subscribe and verify that all subs receive all messages
clientID = ClientID
if clientID == "" {
clientID = Name + "-sub-" + nuid.Next()
}

// Connect all subscribers (and subscribe to a wildcard topic that includes
// all published retained messages).
doneChan := make(chan MQTTBenchmarkResult)
for i := 0; i < NSubscribers; i++ {
id := clientID + "-" + strconv.Itoa(i)
prefix := topic
t := topic + "/+"
go subscribeAndReceiveRetained(id, t, prefix, N, nTopics, doneChan)
}

var cDone int
total := MQTTBenchmarkResult{
NS: map[string]time.Duration{},
}
timeout := time.NewTimer(Timeout)
defer timeout.Stop()

for cDone < NSubscribers {
select {
case r := <-doneChan:
total.Ops += r.Ops
total.NS["sub"] += r.NS["sub"]
total.NS["receive"] += r.NS["receive"]
total.Bytes += r.Bytes
cDone++
case <-timeout.C:
log.Fatalf("timeout waiting for connections")
}
}

bb, _ := json.Marshal(total)
os.Stdout.Write(bb)
}

func subscribeAndReceiveRetained(id string, subTopic string, pubTopicPrefix string, n, expected int, doneChan chan MQTTBenchmarkResult) {
subNS := time.Duration(0)
receiveNS := time.Duration(0)
receiveBytes := int64(0)
cl, cleanup, err := connect(id, CleanSession, nil)
if err != nil {
log.Fatal(err)
}
defer cleanup()

for i := 0; i < n; i++ {
start := time.Now()
doneChan := make(chan MQTTBenchmarkResult)
token := cl.Subscribe(subTopic, byte(QOS),
subretMsgHandler(pubTopicPrefix, expected, start, doneChan))
if token.Wait() && token.Error() != nil {
log.Fatal(token.Error())
}
subElapsed := time.Since(start)
subNS += subElapsed
logOp(id, "SUB", subElapsed, "Subscribed to %q", subTopic)

r := <-doneChan
receiveNS += r.NS["receive"]
receiveBytes += r.Bytes
logOp(id, "SUBRET", r.NS["receive"], "Received %d messages (%d bytes) on %q", expected, r.Bytes, subTopic)

// Unsubscribe
if token = cl.Unsubscribe(subTopic); token.Wait() && token.Error() != nil {
log.Fatal(token.Error())
}
}

doneChan <- MQTTBenchmarkResult{
Ops: N,
NS: map[string]time.Duration{"sub": subNS, "receive": receiveNS},
Bytes: receiveBytes,
}
}

0 comments on commit 7a66171

Please sign in to comment.