diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 8526697..94848d4 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -29,7 +29,7 @@ jobs: shell: bash --noprofile --norc -x -eo pipefail {0} run: | cd go/src/github.com/ConnectEverything/mqtt-test - go install -v + go install -v . - name: Run 'MQTTEx from nats-server' shell: bash --noprofile --norc -x -eo pipefail {0} diff --git a/common.go b/common.go index 23c39e1..1a969c5 100644 --- a/common.go +++ b/common.go @@ -34,7 +34,6 @@ var ( Retain bool Servers []string Size int - Topic string Username string MatchTopicPrefix string Quiet bool @@ -53,10 +52,9 @@ type PubValue struct { } type MQTTBenchmarkResult struct { - Ops int `json:"ops"` - NS time.Duration `json:"ns"` - Unit string `json:"unit"` - Bytes int64 `json:"bytes"` + Ops int `json:"ops"` + NS map[string]time.Duration `json:"ns"` + Bytes int64 `json:"bytes"` } func randomPayload(sz int) []byte { diff --git a/connect.go b/connect.go index e7fee79..983f96b 100644 --- a/connect.go +++ b/connect.go @@ -1,6 +1,7 @@ package main import ( + "log" "time" paho "github.com/eclipse/paho.mqtt.golang" @@ -26,7 +27,12 @@ func connect(clientID string, cleanSession bool, setoptsF func(*paho.ClientOptio SetProtocolVersion(4). SetUsername(Username). SetPassword(Password). - SetStore(paho.NewMemoryStore()) + SetStore(paho.NewMemoryStore()). + SetAutoReconnect(false). + SetDefaultPublishHandler(func(client paho.Client, msg paho.Message) { + log.Fatalf("received an unexpected message on %q (default handler)", msg.Topic()) + }) + for _, s := range Servers { clientOpts.AddBroker(s) } diff --git a/pub.go b/pub.go index 1cd2add..aadc1d0 100644 --- a/pub.go +++ b/pub.go @@ -21,7 +21,6 @@ func init() { }) cmd.Flags().IntVar(&NPublishers, "num-publishers", 1, `Number of publishers to run concurrently, at --mps each`) - cmd.Flags().StringVar(&Topic, "topic", "", "MQTT topic") cmd.Flags().IntVar(&NTopics, "num-topics", 0, `Cycle through NTopics appending "-{n}" where n starts with --num-topics-start; 0 means use --topic`) cmd.Flags().IntVar(&NTopicsStart, "num-topics-start", 0, `Start topic suffixes with this number (default 0)`) cmd.Flags().IntVar(&MPS, "mps", 1000, `Publish mps messages per second; 0 means no delay`) @@ -35,10 +34,7 @@ func runPub(_ *cobra.Command, _ []string) { if clientID == "" { clientID = Name + "-pub-" + nuid.Next() } - topic := Topic - if topic == "" { - topic = Name + nuid.Next() - } + topic := "/" + Name + "/" + nuid.Next() msgChan := make(chan *MQTTBenchmarkResult) errChan := make(chan error) @@ -61,9 +57,9 @@ func runPub(_ *cobra.Command, _ []string) { } // wait for messages to arrive - result := MQTTBenchmarkResult{ - Unit: "pub", - } + pubOps := 0 + pubNS := time.Duration(0) + pubBytes := int64(0) timeout := time.NewTimer(Timeout) defer timeout.Stop() @@ -71,9 +67,9 @@ func runPub(_ *cobra.Command, _ []string) { for n := 0; n < NPublishers; { select { case r := <-msgChan: - result.Ops += r.Ops - result.NS += r.NS - result.Bytes += r.Bytes + pubOps += r.Ops + pubNS += r.NS["pub"] + pubBytes += r.Bytes n++ case err := <-errChan: @@ -84,7 +80,11 @@ func runPub(_ *cobra.Command, _ []string) { } } - bb, _ := json.Marshal(result) + bb, _ := json.Marshal(MQTTBenchmarkResult{ + Ops: pubOps, + NS: map[string]time.Duration{"pub": pubNS}, + Bytes: pubBytes, + }) os.Stdout.Write(bb) } @@ -138,8 +138,7 @@ func publish(cl paho.Client, topic string) (*MQTTBenchmarkResult, error) { return &MQTTBenchmarkResult{ Ops: N, - NS: elapsed, - Unit: "pub", + NS: map[string]time.Duration{"pub": elapsed}, Bytes: int64(bc), }, nil } diff --git a/pubsub.go b/pubsub.go index 2d36b18..91422cc 100644 --- a/pubsub.go +++ b/pubsub.go @@ -17,7 +17,7 @@ import ( func init() { cmd := initPubSub(&cobra.Command{ Use: "pubsub [--flags...]", - Short: "Subscribe and receive N messages", + Short: "Subscribe and receive N published messages", Run: runPubSub, Args: cobra.NoArgs, }) @@ -28,7 +28,7 @@ func init() { mainCmd.AddCommand(cmd) } -func receivedMsgHandler(topic string, errChan chan error, msgChan chan MQTTBenchmarkResult, +func pubsubMsgHandler(topic string, errChan chan error, msgChan chan MQTTBenchmarkResult, ) func(paho.Client, paho.Message) { return func(client paho.Client, msg paho.Message) { opts := client.OptionsReader() @@ -59,8 +59,7 @@ func receivedMsgHandler(topic string, errChan chan error, msgChan chan MQTTBench msgChan <- MQTTBenchmarkResult{ Ops: 1, Bytes: int64(len(msg.Payload())), - NS: elapsed, - Unit: "pubsub", + NS: map[string]time.Duration{"receive": elapsed}, } logOp(clientID, "REC ->", elapsed, "Received %d bytes on %q, qos:%v", len(msg.Payload()), msg.Topic(), QOS) } @@ -71,10 +70,7 @@ func runPubSub(_ *cobra.Command, _ []string) { if clientID == "" { clientID = Name + "-sub-" + nuid.Next() } - topic := Topic - if topic == "" { - topic = "/" + Name + "/" + nuid.Next() - } + topic := "/" + Name + "/" + nuid.Next() clientCleanupChan := make(chan func()) subsChan := make(chan struct{}) @@ -89,7 +85,7 @@ func runPubSub(_ *cobra.Command, _ []string) { opts. SetOnConnectHandler(func(cl paho.Client) { start := time.Now() - token := cl.Subscribe(topic, byte(QOS), receivedMsgHandler(topic, errChan, msgChan)) + token := cl.Subscribe(topic, byte(QOS), pubsubMsgHandler(topic, errChan, msgChan)) if token.Wait() && token.Error() != nil { errChan <- token.Error() return @@ -98,7 +94,7 @@ func runPubSub(_ *cobra.Command, _ []string) { subsChan <- struct{}{} }). SetDefaultPublishHandler(func(client paho.Client, msg paho.Message) { - log.Printf("<>/<> NON-SUBSCRIBED MESSAGE\n") + errChan <- fmt.Errorf("received an unexpected message on %q", msg.Topic()) }) }) if err != nil { @@ -152,7 +148,7 @@ func runPubSub(_ *cobra.Command, _ []string) { for n := 0; n < N*NSubscribers; { select { case r := <-msgChan: - elapsed += r.NS + elapsed += r.NS["receive"] bc += r.Bytes n++ @@ -166,8 +162,7 @@ func runPubSub(_ *cobra.Command, _ []string) { bb, _ := json.Marshal(MQTTBenchmarkResult{ Ops: N * NSubscribers, - NS: elapsed, - Unit: "receive", + NS: map[string]time.Duration{"receive": elapsed}, Bytes: bc, }) os.Stdout.Write(bb) diff --git a/subret.go b/subret.go new file mode 100644 index 0000000..d72e8ce --- /dev/null +++ b/subret.go @@ -0,0 +1,188 @@ +package main + +import ( + "encoding/json" + "log" + "os" + "strconv" + "strings" + "sync/atomic" + "time" + + paho "github.com/eclipse/paho.mqtt.golang" + "github.com/nats-io/nuid" + "github.com/spf13/cobra" +) + +func init() { + cmd := initPubSub(&cobra.Command{ + Use: "subret [--flags...]", + Short: "Subscribe N times, and receive NTopics retained messages", + Run: runSubRet, + Args: cobra.NoArgs, + }) + + cmd.Flags().IntVar(&NSubscribers, "num-subscribers", 1, `Number of subscribers to run concurrently`) + cmd.Flags().IntVar(&NTopics, "num-topics", 0, `Use this many topics with retained messages`) + + mainCmd.AddCommand(cmd) +} + +func subretMsgHandler( + topicPrefix string, + expectNRetained int, + start time.Time, + doneChan chan MQTTBenchmarkResult, +) func(paho.Client, paho.Message) { + var cRetained atomic.Int32 + var bc atomic.Int64 + return func(client paho.Client, msg paho.Message) { + opts := client.OptionsReader() + clientID := opts.ClientID() + switch { + case !strings.HasPrefix(msg.Topic(), topicPrefix): + log.Printf("Received a QOS %d message on unexpected topic: %s\n", msg.Qos(), msg.Topic()) + // ignore + + case msg.Duplicate(): + log.Fatal("received unexpected duplicate message") + return + + case msg.Retained(): + newC := cRetained.Add(1) + bc.Add(int64(len(msg.Payload()))) + switch { + case newC < int32(expectNRetained): + logOp(clientID, "REC ->", time.Since(start), "Received %d bytes on %q, qos:%v", len(msg.Payload()), msg.Topic(), QOS) + // skip it + return + case newC > int32(expectNRetained): + log.Fatal("received unexpected retained message") + default: // last expected retained message + elapsed := time.Since(start) + r := MQTTBenchmarkResult{ + Ops: 1, + NS: map[string]time.Duration{"receive": elapsed}, + Bytes: bc.Load(), + } + doneChan <- r + } + } + } +} + +func runSubRet(_ *cobra.Command, _ []string) { + topic := "/" + Name + "/" + nuid.Next() + + clientID := ClientID + if clientID == "" { + clientID = Name + "-pub-" + nuid.Next() + } else { + clientID = clientID + "-pub" + } + nTopics := NTopics + if nTopics == 0 { + nTopics = 1 + } + + // Publish NTopics retained messages, 1 per topic; Use at least QoS1 to + // ensure the retained messages are fully processed by the time the + // publisher exits. + cl, disconnect, err := connect(clientID, CleanSession, nil) + if err != nil { + log.Fatal("Error connecting: ", err) + } + for i := 0; i < nTopics; i++ { + t := topic + "/" + strconv.Itoa(i) + payload := randomPayload(Size) + start := time.Now() + publishQOS := 1 + if publishQOS < QOS { + publishQOS = QOS + } + if token := cl.Publish(t, byte(publishQOS), true, payload); token.Wait() && token.Error() != nil { + log.Fatal("Error publishing: ", token.Error()) + } + logOp(clientID, "PUB <-", time.Since(start), "Published: %d bytes to %q, qos:%v, retain:%v", len(payload), t, QOS, true) + } + disconnect() + + // Now subscribe and verify that all subs receive all messages + clientID = ClientID + if clientID == "" { + clientID = Name + "-sub-" + nuid.Next() + } + + // Connect all subscribers (and subscribe to a wildcard topic that includes + // all published retained messages). + doneChan := make(chan MQTTBenchmarkResult) + for i := 0; i < NSubscribers; i++ { + id := clientID + "-" + strconv.Itoa(i) + prefix := topic + t := topic + "/+" + go subscribeAndReceiveRetained(id, t, prefix, N, nTopics, doneChan) + } + + var cDone int + total := MQTTBenchmarkResult{ + NS: map[string]time.Duration{}, + } + timeout := time.NewTimer(Timeout) + defer timeout.Stop() + + for cDone < NSubscribers { + select { + case r := <-doneChan: + total.Ops += r.Ops + total.NS["sub"] += r.NS["sub"] + total.NS["receive"] += r.NS["receive"] + total.Bytes += r.Bytes + cDone++ + case <-timeout.C: + log.Fatalf("timeout waiting for connections") + } + } + + bb, _ := json.Marshal(total) + os.Stdout.Write(bb) +} + +func subscribeAndReceiveRetained(id string, subTopic string, pubTopicPrefix string, n, expected int, doneChan chan MQTTBenchmarkResult) { + subNS := time.Duration(0) + receiveNS := time.Duration(0) + receiveBytes := int64(0) + cl, cleanup, err := connect(id, CleanSession, nil) + if err != nil { + log.Fatal(err) + } + defer cleanup() + + for i := 0; i < n; i++ { + start := time.Now() + doneChan := make(chan MQTTBenchmarkResult) + token := cl.Subscribe(subTopic, byte(QOS), + subretMsgHandler(pubTopicPrefix, expected, start, doneChan)) + if token.Wait() && token.Error() != nil { + log.Fatal(token.Error()) + } + subElapsed := time.Since(start) + subNS += subElapsed + logOp(id, "SUB", subElapsed, "Subscribed to %q", subTopic) + + r := <-doneChan + receiveNS += r.NS["receive"] + receiveBytes += r.Bytes + logOp(id, "SUBRET", r.NS["receive"], "Received %d messages (%d bytes) on %q", expected, r.Bytes, subTopic) + + // Unsubscribe + if token = cl.Unsubscribe(subTopic); token.Wait() && token.Error() != nil { + log.Fatal(token.Error()) + } + } + + doneChan <- MQTTBenchmarkResult{ + Ops: N, + NS: map[string]time.Duration{"sub": subNS, "receive": receiveNS}, + Bytes: receiveBytes, + } +}