From 54ed0c18141f62958a5dcc6ce03ce92c23107503 Mon Sep 17 00:00:00 2001 From: Lev <1187448+levb@users.noreply.github.com> Date: Thu, 15 Feb 2024 08:58:53 -0800 Subject: [PATCH] Refactored, added sub (#2) * Refactored, added sub * Added (c) notes * Updated README.md * Fixed subret synchronization * Updated server ref for CI * Updated server ref for CI, 2 --- .github/workflows/test.yaml | 3 +- README.md | 83 +++++++++------- command-pub.go | 97 +++++++++++++++++++ command-pubsub.go | 120 +++++++++++++++++++++++ command-sub.go | 155 +++++++++++++++++++++++++++++ command-subret.go | 57 +++++++++++ common.go | 92 ------------------ connect.go | 78 ++++++++++++--- main.go | 109 ++++++++++++++++++--- pub.go | 144 --------------------------- publish.go | 101 +++++++++++++++++++ pubsub.go | 169 -------------------------------- receive.go | 153 +++++++++++++++++++++++++++++ subret.go | 188 ------------------------------------ 14 files changed, 895 insertions(+), 654 deletions(-) create mode 100644 command-pub.go create mode 100644 command-pubsub.go create mode 100644 command-sub.go create mode 100644 command-subret.go delete mode 100644 common.go delete mode 100644 pub.go create mode 100644 publish.go delete mode 100644 pubsub.go create mode 100644 receive.go delete mode 100644 subret.go diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index f5925e3..1b0181c 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -23,6 +23,7 @@ jobs: with: repository: nats-io/nats-server path: go/src/github.com/nats-io/nats-server + ref: lev-mqttex-test-retained - name: Build and install shell: bash --noprofile --norc -x -eo pipefail {0} @@ -34,4 +35,4 @@ jobs: shell: bash --noprofile --norc -x -eo pipefail {0} run: | cd go/src/github.com/nats-io/nats-server - go test -v --run='-' --bench 'MQTTEx' --benchtime=100x ./server | tee /tmp/current-bench-result.txt + go test -v --run='MQTTEx' --bench 'MQTTEx' --benchtime=100x ./server diff --git a/README.md b/README.md index bb022e3..0ed50c3 100644 --- a/README.md +++ b/README.md @@ -7,41 +7,56 @@ Outputs JSON results that can be reported in a `go test --bench` wrapper. ##### Subcommands and common flags ```sh -mqtt-test [pub|pubsub|subret] [flags...] +mqtt-test [pub|sub|pubsub|subret] [flags...] ``` Available Commands: -- [pub](#pub) - Publish N messages -- [pubsub](#pubsub) - Subscribe and receive N published messages -- [subret](#subret) - Subscribe N times, and receive NTopics retained messages -Common Flags: +- [pub](#pub) - Publish MQTT messages. +- [sub](#sub) - Subscribe, receive all messages, unsubscribe, {repeat} times. +- [pubsub](#pubsub) - Subscribe and receive published messages. +- [subret](#subret) - Publish {topics} retained messages, subscribe {repeat} times, and receive all retained messages. + +Common flags: ``` --h, --help help for mqtt-test - --id string MQTT client ID --n, --n int Number of transactions to run, see the specific command (default 1) --p, --password string MQTT client password (empty if auth disabled) --q, --quiet Quiet mode, only print results --s, --servers stringArray MQTT servers endpoint as host:port (default [tcp://localhost:1883]) --u, --username string MQTT client username (empty if auth disabled) - --version version for mqtt-test --v, --very-verbose Very verbose, print everything we can + -h, --help help for mqtt-test + --id string MQTT client ID (default "mqtt-test-bssJjZUs1vhTvf6KpTpTLw") + -q, --quiet Quiet mode, only print results + -s, --server stringArray MQTT endpoint as username:password@host:port (default [tcp://localhost:1883]) + --version version for mqtt-test + -v, --very-verbose Very verbose, print everything we can ``` ##### pub -Publishes N messages using the flags and reports the results. Used with `--num-publishers` can run several concurrent publish connections. +Publishes messages using the flags and reports the results. Used with `--publishers` can run several concurrent publish connections. + +Flags: + +``` +--messages int Number of transactions to run, see the specific command (default 1) +--mps int Publish mps messages per second; 0 means no delay (default 1000) +--publishers int Number of publishers to run concurrently, at --mps each (default 1) +--qos int MQTT QOS +--retain Mark each published message as retained +--size int Approximate size of each message (pub adds a timestamp) +--timestamp Prepend a timestamp to each message +--topic string Base topic (prefix) to publish into (/{n} will be added if --topics > 0) (default "mqtt-test/fIqfOq5Lg5wk636V4sLXoc") +--topics int Cycle through NTopics appending "/{n}" +``` + +##### sub + +Subscribe, receive all expected messages, unsubscribe, {repeat} times. Flags: ``` ---mps int Publish mps messages per second; 0 means no delay (default 1000) ---num-publishers int Number of publishers to run concurrently, at --mps each (default 1) ---num-topics int Cycle through NTopics appending "-{n}" where n starts with --num-topics-start; 0 means use --topic ---num-topics-start int Start topic suffixes with this number (default 0) ---qos int MQTT QOS ---retain Mark each published message as retained ---size int Approximate size of each message (pub adds a timestamp) ---topic string MQTT topic +--messages int Expect to receive this many published messages +--qos int MQTT QOS +--repeat int Subscribe, receive retained messages, and unsubscribe N times (default 1) +--retained int Expect to receive this many retained messages +--subscribers int Number of subscribers to run concurrently (default 1) +--topic string Base topic for the test, will subscribe to {topic}/+ ``` ##### pubsub @@ -49,22 +64,24 @@ Flags: Publishes N messages, and waits for all of them to be received by subscribers. Measures end-end delivery time on the messages. Used with `--num-subscribers` can run several concurrent subscriber connections. ``` ---mps int Publish mps messages per second; 0 means all ASAP (default 1000) ---num-subscribers int Number of subscribers to run concurrently (default 1) ---qos int MQTT QOS ---size int Approximate size of each message (pub adds a timestamp) ---topic string MQTT topic +--messages int Number of messages to publish and receive (default 1) +--qos int MQTT QOS +--size int Approximate size of each message (pub adds a timestamp) +--subscribers int Number of subscribers to run concurrently (default 1) +--topic string Topic to publish and subscribe to (default "mqtt-test/JPrbNU6U3IbVQLIyazkP4y") ``` ##### subret Publishes retained messages into NTopics, then subscribes to a wildcard with all topics N times. Measures time to SUBACK and to all retained messages received. -Used with `--num-subscribers` can run several concurrent subscriber connections. +Used with `--subscribers` can run several concurrent subscriber connections. ``` ---num-subscribers int Number of subscribers to run concurrently (default 1) ---num-topics int Use this many topics with retained messages ---qos int MQTT QOS ---size int Approximate size of each message (pub adds a timestamp) +--qos int MQTT QOS +--repeat int Subscribe, receive retained messages, and unsubscribe N times (default 1) +--size int Approximate size of each message (pub adds a timestamp) +--subscribers int Number of subscribers to run concurrently (default 1) +--topic string Base topic (prefix) for the test (default "mqtt-test/yNkmAFnFHETSGnQJNjwGdN") +--topics int Number of sub-topics to publish retained messages to (default 1) ``` diff --git a/command-pub.go b/command-pub.go new file mode 100644 index 0000000..c42bb80 --- /dev/null +++ b/command-pub.go @@ -0,0 +1,97 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "encoding/json" + "log" + "os" + "strconv" + "time" + + "github.com/spf13/cobra" +) + +type pubCommand struct { + publisher + publishers int + timestamp bool +} + +func newPubCommand() *cobra.Command { + c := &pubCommand{} + + cmd := &cobra.Command{ + Use: "pub [--flags...]", + Short: "Publish MQTT messages", + Run: c.run, + Args: cobra.NoArgs, + } + + // Message options + cmd.Flags().StringVar(&c.topic, "topic", defaultTopic(), "Base topic (prefix) to publish into (/{n} will be added if --topics > 0)") + cmd.Flags().IntVar(&c.qos, "qos", DefaultQOS, "MQTT QOS") + cmd.Flags().IntVar(&c.size, "size", 0, "Approximate size of each message (pub adds a timestamp)") + cmd.Flags().BoolVar(&c.retain, "retain", false, "Mark each published message as retained") + cmd.Flags().BoolVar(&c.timestamp, "timestamp", false, "Prepend a timestamp to each message") + + // Test options + cmd.Flags().IntVar(&c.mps, "mps", 1000, `Publish mps messages per second; 0 means no delay`) + cmd.Flags().IntVar(&c.messages, "messages", 1, "Number of transactions to run, see the specific command") + cmd.Flags().IntVar(&c.publishers, "publishers", 1, `Number of publishers to run concurrently, at --mps each`) + cmd.Flags().IntVar(&c.topics, "topics", 0, `Cycle through NTopics appending "/{n}"`) + + return cmd +} + +func (c *pubCommand) run(_ *cobra.Command, _ []string) { + msgChan := make(chan *Stat) + errChan := make(chan error) + + for i := 0; i < c.publishers; i++ { + p := c.publisher // copy + p.clientID = ClientID + "-" + strconv.Itoa(i) + go p.publish(msgChan, errChan, c.timestamp) + } + + pubOps := 0 + pubNS := time.Duration(0) + pubBytes := int64(0) + timeout := time.NewTimer(Timeout) + defer timeout.Stop() + + // get back 1 report per publisher + for n := 0; n < c.publishers; { + select { + case stat := <-msgChan: + pubOps += stat.Ops + pubNS += stat.NS["pub"] + pubBytes += stat.Bytes + n++ + + case err := <-errChan: + log.Fatalf("Error: %v", err) + + case <-timeout.C: + log.Fatalf("Error: timeout waiting for publishers") + } + } + + bb, _ := json.Marshal(Stat{ + Ops: pubOps, + NS: map[string]time.Duration{"pub": pubNS}, + Bytes: pubBytes, + }) + os.Stdout.Write(bb) +} diff --git a/command-pubsub.go b/command-pubsub.go new file mode 100644 index 0000000..27bbaeb --- /dev/null +++ b/command-pubsub.go @@ -0,0 +1,120 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "encoding/json" + "log" + "os" + "strconv" + "time" + + "github.com/spf13/cobra" +) + +type pubsubCommand struct { + messageOpts + + messages int + subscribers int +} + +func newPubSubCommand() *cobra.Command { + c := &pubsubCommand{} + + cmd := &cobra.Command{ + Use: "pubsub [--flags...]", + Short: "Subscribe and receive N published messages", + Run: c.run, + Args: cobra.NoArgs, + } + + // Message options + cmd.Flags().IntVar(&c.messages, "messages", 1, "Number of messages to publish and receive") + cmd.Flags().StringVar(&c.topic, "topic", defaultTopic(), "Topic to publish and subscribe to") + cmd.Flags().IntVar(&c.qos, "qos", DefaultQOS, "MQTT QOS") + cmd.Flags().IntVar(&c.size, "size", 0, "Approximate size of each message (pub adds a timestamp)") + + // Test options + cmd.Flags().IntVar(&c.subscribers, "subscribers", 1, `Number of subscribers to run concurrently`) + + return cmd +} + +func (c *pubsubCommand) run(_ *cobra.Command, _ []string) { + clientID := ClientID + "-sub" + readyCh := make(chan struct{}) + errCh := make(chan error) + statsCh := make(chan *Stat) + + // Connect all subscribers (and subscribe) + for i := 0; i < c.subscribers; i++ { + r := &receiver{ + clientID: clientID + "-" + strconv.Itoa(i), + topic: c.topic, + qos: c.qos, + expectPublished: c.messages, + repeat: 1, + } + go r.receive(readyCh, statsCh, errCh) + } + + // Wait for all subscriptions to signal ready + cSub := 0 + timeout := time.NewTimer(Timeout) + defer timeout.Stop() + for cSub < c.subscribers { + select { + case <-readyCh: + cSub++ + case err := <-errCh: + log.Fatal(err) + case <-timeout.C: + log.Fatalf("timeout waiting for subscribers to be ready") + } + } + + // ready to receive, start publishing. The publisher will exit when done, no need to wait for it. + p := &publisher{ + clientID: ClientID + "-pub", + messageOpts: c.messageOpts, + messages: c.messages, + mps: 1000, + } + go p.publish(nil, errCh, true) + + // wait for the stats + total := Stat{ + NS: make(map[string]time.Duration), + } + timeout = time.NewTimer(Timeout) + defer timeout.Stop() + for i := 0; i < c.subscribers; i++ { + select { + case stat := <-statsCh: + total.Ops += stat.Ops + total.Bytes += stat.Bytes + for k, v := range stat.NS { + total.NS[k] += v + } + case err := <-errCh: + log.Fatalf("Error: %v", err) + case <-timeout.C: + log.Fatalf("Error: timeout waiting for messages") + } + } + + bb, _ := json.Marshal(total) + os.Stdout.Write(bb) +} diff --git a/command-sub.go b/command-sub.go new file mode 100644 index 0000000..73bf462 --- /dev/null +++ b/command-sub.go @@ -0,0 +1,155 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "encoding/json" + "log" + "os" + "strconv" + "time" + + "github.com/spf13/cobra" +) + +type subCommand struct { + // message options + messageOpts + + // test options + repeat int + subscribers int + expectRetained int + expectPublished int +} + +func newSubCommand() *cobra.Command { + c := &subCommand{} + + cmd := &cobra.Command{ + Use: "sub [--flags...]", + Short: "Subscribe, receive all messages, unsubscribe, {repeat} times.", + Run: c.run, + Args: cobra.NoArgs, + } + + cmd.Flags().StringVar(&c.topic, "topic", "", "Base topic for the test, will subscribe to {topic}/+") + cmd.Flags().IntVar(&c.qos, "qos", DefaultQOS, "MQTT QOS") + cmd.Flags().IntVar(&c.repeat, "repeat", 1, "Subscribe, receive retained messages, and unsubscribe N times") + cmd.Flags().IntVar(&c.subscribers, "subscribers", 1, `Number of subscribers to run concurrently`) + cmd.Flags().IntVar(&c.expectRetained, "retained", 0, `Expect to receive this many retained messages`) + cmd.Flags().IntVar(&c.expectPublished, "messages", 0, `Expect to receive this many published messages`) + + return cmd +} + +func (c *subCommand) run(_ *cobra.Command, _ []string) { + total := runSubPrepublishRetained(c.subscribers, c.repeat, c.expectRetained, c.expectPublished, c.messageOpts, false) + bb, _ := json.Marshal(total) + os.Stdout.Write(bb) +} + +func runSubPrepublishRetained( + nSubscribers int, + repeat int, + expectRetained, + expectPublished int, + messageOpts messageOpts, + prepublishRetained bool, +) *Stat { + errCh := make(chan error) + receiverReadyCh := make(chan struct{}) + statsCh := make(chan *Stat) + + if prepublishRetained { + if expectPublished != 0 { + log.Fatalf("Error: --messages is not supported with --retained") + } + + // We need to wait for all prepublished retained messages to be processed. + // To ensure, subscribe once before we pre-publish and receive all published + // messages. + r := &receiver{ + clientID: ClientID + "-sub-init", + filterPrefix: messageOpts.topic, + topic: messageOpts.topic + "/+", + qos: messageOpts.qos, + expectRetained: 0, + expectPublished: expectRetained, + repeat: 1, + } + go r.receive(receiverReadyCh, statsCh, errCh) + <-receiverReadyCh + + // Pre-publish retained messages. + p := &publisher{ + clientID: ClientID + "-pub", + messages: expectRetained, + topics: expectRetained, + messageOpts: messageOpts, + } + p.messageOpts.retain = true + go p.publish(nil, errCh, true) + + // wait for the initial subscription to have received all messages + timeout := time.NewTimer(Timeout) + defer timeout.Stop() + select { + case err := <-errCh: + log.Fatalf("Error: %v", err) + case <-timeout.C: + log.Fatalf("Error: timeout waiting for messages in initial subscription") + case <-statsCh: + // all received + } + + } + + // Connect all subscribers (and subscribe to a wildcard topic that includes + // all published retained messages). + for i := 0; i < nSubscribers; i++ { + r := &receiver{ + clientID: ClientID + "-sub-" + strconv.Itoa(i), + filterPrefix: messageOpts.topic, + topic: messageOpts.topic + "/+", + qos: messageOpts.qos, + expectRetained: expectRetained, + expectPublished: expectPublished, + repeat: repeat, + } + go r.receive(nil, statsCh, errCh) + } + + // wait for the stats + total := &Stat{ + NS: make(map[string]time.Duration), + } + timeout := time.NewTimer(Timeout) + defer timeout.Stop() + for i := 0; i < nSubscribers*repeat; i++ { + select { + case stat := <-statsCh: + total.Ops += stat.Ops + total.Bytes += stat.Bytes + for k, v := range stat.NS { + total.NS[k] += v + } + case err := <-errCh: + log.Fatalf("Error: %v", err) + case <-timeout.C: + log.Fatalf("Error: timeout waiting for messages") + } + } + return total +} diff --git a/command-subret.go b/command-subret.go new file mode 100644 index 0000000..9370bf5 --- /dev/null +++ b/command-subret.go @@ -0,0 +1,57 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "encoding/json" + "os" + + "github.com/spf13/cobra" +) + +type subretCommand struct { + // message options + messageOpts + + // test options + repeat int + subscribers int + messages int +} + +func newSubRetCommand() *cobra.Command { + c := &subretCommand{} + + cmd := &cobra.Command{ + Use: "subret [--flags...]", + Short: "Publish {topics} retained messages, subscribe {repeat} times, and receive all retained messages.", + Run: c.run, + Args: cobra.NoArgs, + } + + cmd.Flags().StringVar(&c.topic, "topic", defaultTopic(), "Base topic (prefix) for the test") + cmd.Flags().IntVar(&c.qos, "qos", DefaultQOS, "MQTT QOS") + cmd.Flags().IntVar(&c.size, "size", 0, "Approximate size of each message (pub adds a timestamp)") + cmd.Flags().IntVar(&c.repeat, "repeat", 1, "Subscribe, receive retained messages, and unsubscribe N times") + cmd.Flags().IntVar(&c.subscribers, "subscribers", 1, `Number of subscribers to run concurrently`) + cmd.Flags().IntVar(&c.messages, "topics", 1, `Number of sub-topics to publish retained messages to`) + + return cmd +} + +func (c *subretCommand) run(_ *cobra.Command, _ []string) { + total := runSubPrepublishRetained(c.subscribers, c.repeat, c.messages, 0, c.messageOpts, true) + bb, _ := json.Marshal(total) + os.Stdout.Write(bb) +} diff --git a/common.go b/common.go deleted file mode 100644 index 1a969c5..0000000 --- a/common.go +++ /dev/null @@ -1,92 +0,0 @@ -package main - -import ( - "fmt" - "log" - "math/rand" - "time" - - "github.com/spf13/cobra" -) - -const ( - Name = "mqtt-test" - Version = "v0.0.1" - - DefaultServer = "tcp://localhost:1883" - DefaultQOS = 0 - - Timeout = 10 * time.Second - DisconnectCleanupTimeout = 500 // milliseconds - -) - -var ( - ClientID string - MPS int - N int - NPublishers int - NSubscribers int - NTopics int - NTopicsStart int - Password string - QOS int - Retain bool - Servers []string - Size int - Username string - MatchTopicPrefix string - Quiet bool - Verbose bool -) - -func initPubSub(cmd *cobra.Command) *cobra.Command { - cmd.Flags().IntVar(&QOS, "qos", DefaultQOS, "MQTT QOS") - cmd.Flags().IntVar(&Size, "size", 0, "Approximate size of each message (pub adds a timestamp)") - return cmd -} - -type PubValue struct { - Seq int `json:"seq"` - Timestamp int64 `json:"timestamp"` -} - -type MQTTBenchmarkResult struct { - Ops int `json:"ops"` - NS map[string]time.Duration `json:"ns"` - Bytes int64 `json:"bytes"` -} - -func randomPayload(sz int) []byte { - const ch = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@$#%^&*()" - b := make([]byte, sz) - for i := range b { - b[i] = ch[rand.Intn(len(ch))] - } - return b -} - -func mqttVarIntLen(value int) int { - c := 0 - for ; value > 0; value >>= 7 { - c++ - } - return c -} - -func mqttPublishLen(topic string, qos byte, retained bool, msg []byte) int { - // Compute len (will have to add packet id if message is sent as QoS>=1) - pkLen := 2 + len(topic) + len(msg) - if qos > 0 { - pkLen += 2 - } - return 1 + mqttVarIntLen(pkLen) + pkLen -} - -func logOp(clientID, op string, dur time.Duration, f string, args ...interface{}) { - log.Printf("%8s %-6s %30s\t"+f, append([]any{ - fmt.Sprintf("%.3fms", float64(dur)/float64(time.Millisecond)), - op, - clientID + ":"}, - args...)...) -} diff --git a/connect.go b/connect.go index 983f96b..0eb0173 100644 --- a/connect.go +++ b/connect.go @@ -2,18 +2,35 @@ package main import ( "log" + "strings" + "sync/atomic" "time" paho "github.com/eclipse/paho.mqtt.golang" "github.com/nats-io/nuid" ) +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + const ( CleanSession = true PersistentSession = false ) -func connect(clientID string, cleanSession bool, setoptsF func(*paho.ClientOptions)) (paho.Client, func(), error) { +var nextConnectServerIndex = atomic.Uint64{} + +func connect(clientID string, cleanSession bool) (paho.Client, *Stat, func(), error) { if clientID == "" { clientID = ClientID } @@ -21,36 +38,65 @@ func connect(clientID string, cleanSession bool, setoptsF func(*paho.ClientOptio clientID = Name + "-" + nuid.Next() } - clientOpts := paho.NewClientOptions(). + parseDial := func(in string) (u, p, s, c string) { + if in == "" { + return "", "", DefaultServer, "" + } + + if i := strings.LastIndex(in, "#"); i != -1 { + c = in[i+1:] + in = in[:i] + } + + if i := strings.LastIndex(in, "@"); i != -1 { + up := in[:i] + in = in[i+1:] + u = up + if i := strings.Index(up, ":"); i != -1 { + u = up[:i] + p = up[i+1:] + } + } + + s = in + return u, p, s, c + } + + // round-robin the servers. since we start at 0 and add first, subtract 1 to + // compensate and start at 0! + next := int((nextConnectServerIndex.Add(1) - 1) % uint64(len(Servers))) + u, p, s, c := parseDial(Servers[next]) + + cl := paho.NewClient(paho.NewClientOptions(). SetClientID(clientID). SetCleanSession(cleanSession). SetProtocolVersion(4). - SetUsername(Username). - SetPassword(Password). + AddBroker(s). + SetUsername(u). + SetPassword(p). SetStore(paho.NewMemoryStore()). SetAutoReconnect(false). SetDefaultPublishHandler(func(client paho.Client, msg paho.Message) { log.Fatalf("received an unexpected message on %q (default handler)", msg.Topic()) - }) - - for _, s := range Servers { - clientOpts.AddBroker(s) - } - if setoptsF != nil { - setoptsF(clientOpts) - } - - cl := paho.NewClient(clientOpts) + })) disconnectedWG.Add(1) start := time.Now() if t := cl.Connect(); t.Wait() && t.Error() != nil { disconnectedWG.Done() - return nil, func() {}, t.Error() + return nil, nil, nil, t.Error() } - logOp(clientOpts.ClientID, "CONN", time.Since(start), "Connected to %q\n", Servers) + if c != "" { + logOp(clientID, "CONN", time.Since(start), "Connected to %q (%s)\n", s, c) + } else { + logOp(clientID, "CONN", time.Since(start), "Connected to %q\n", s) + } return cl, + &Stat{ + Ops: 1, + NS: map[string]time.Duration{"conn": time.Since(start)}, + }, func() { cl.Disconnect(DisconnectCleanupTimeout) disconnectedWG.Done() diff --git a/main.go b/main.go index 0b02bcb..5c39c37 100644 --- a/main.go +++ b/main.go @@ -1,30 +1,72 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package main import ( + "fmt" "io" "log" + "math/rand" "os" "sync" + "time" paho "github.com/eclipse/paho.mqtt.golang" + "github.com/nats-io/nuid" "github.com/spf13/cobra" ) +const ( + Name = "mqtt-test" + Version = "v0.1.0" + DefaultServer = "tcp://localhost:1883" + DefaultQOS = 0 + Timeout = 10 * time.Second + DisconnectCleanupTimeout = 500 // milliseconds +) + +var ( + ClientID string + Password string + Quiet bool + Servers []string + Username string + Verbose bool +) + +var disconnectedWG = sync.WaitGroup{} + +func main() { + _ = mainCmd.Execute() + disconnectedWG.Wait() +} + var mainCmd = &cobra.Command{ - Use: Name + " [conn|pub|sub|...|] [--flags...]", - Short: "MQTT Test/Benchmark Utility", + Use: Name + " [pub|sub|subret|...] [--flags...]", + Short: "MQTT Test and Benchmark Utility", Version: Version, } func init() { - mainCmd.PersistentFlags().StringVar(&ClientID, "id", "", "MQTT client ID") - mainCmd.PersistentFlags().StringArrayVarP(&Servers, "servers", "s", []string{DefaultServer}, "MQTT servers endpoint as host:port") - mainCmd.PersistentFlags().StringVarP(&Username, "username", "u", "", "MQTT client username (empty if auth disabled)") - mainCmd.PersistentFlags().StringVarP(&Password, "password", "p", "", "MQTT client password (empty if auth disabled)") - mainCmd.PersistentFlags().IntVarP(&N, "n", "n", 1, "Number of transactions to run, see the specific command") + mainCmd.PersistentFlags().StringVar(&ClientID, "id", Name+"-"+nuid.Next(), "MQTT client ID") + mainCmd.PersistentFlags().StringArrayVarP(&Servers, "server", "s", []string{DefaultServer}, "MQTT endpoint as username:password@host:port") mainCmd.PersistentFlags().BoolVarP(&Quiet, "quiet", "q", false, "Quiet mode, only print results") mainCmd.PersistentFlags().BoolVarP(&Verbose, "very-verbose", "v", false, "Very verbose, print everything we can") + mainCmd.PersistentFlags().StringArrayVar(&Servers, "servers", []string{DefaultServer}, "MQTT endpoint as username:password@host:port") + mainCmd.PersistentFlags().MarkDeprecated("servers", "please use server instead.") + mainCmd.PersistentPreRun = func(cmd *cobra.Command, args []string) { paho.CRITICAL = log.New(os.Stderr, "[MQTT CRIT] ", 0) if Quiet { @@ -39,11 +81,56 @@ func init() { paho.DEBUG = log.New(os.Stderr, "[MQTT DEBUG] ", 0) } } + + mainCmd.AddCommand(newPubCommand()) + mainCmd.AddCommand(newPubSubCommand()) + mainCmd.AddCommand(newSubCommand()) + mainCmd.AddCommand(newSubRetCommand()) } -var disconnectedWG = sync.WaitGroup{} +type PubValue struct { + Seq int `json:"seq"` + Timestamp int64 `json:"timestamp"` +} -func main() { - _ = mainCmd.Execute() - disconnectedWG.Wait() +type Stat struct { + Ops int `json:"ops"` + NS map[string]time.Duration `json:"ns"` + Bytes int64 `json:"bytes"` +} + +func randomPayload(sz int) []byte { + const ch = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@$#%^&*()" + b := make([]byte, sz) + for i := range b { + b[i] = ch[rand.Intn(len(ch))] + } + return b +} + +func mqttVarIntLen(value int) int { + c := 0 + for ; value > 0; value >>= 7 { + c++ + } + return c +} + +func mqttPublishLen(topic string, qos byte, retained bool, msg []byte) int { + // Compute len (will have to add packet id if message is sent as QoS>=1) + pkLen := 2 + len(topic) + len(msg) + if qos > 0 { + pkLen += 2 + } + return 1 + mqttVarIntLen(pkLen) + pkLen +} + +func defaultTopic() string { return Name + "/" + nuid.Next() } + +func logOp(clientID, op string, dur time.Duration, f string, args ...interface{}) { + log.Printf("%8s %-6s %30s\t"+f, append([]any{ + fmt.Sprintf("%.3fms", float64(dur)/float64(time.Millisecond)), + op, + clientID + ":"}, + args...)...) } diff --git a/pub.go b/pub.go deleted file mode 100644 index aadc1d0..0000000 --- a/pub.go +++ /dev/null @@ -1,144 +0,0 @@ -package main - -import ( - "encoding/json" - "log" - "os" - "strconv" - "time" - - paho "github.com/eclipse/paho.mqtt.golang" - "github.com/nats-io/nuid" - "github.com/spf13/cobra" -) - -func init() { - cmd := initPubSub(&cobra.Command{ - Use: "pub [--flags...]", - Short: "Publish N messages", - Run: runPub, - Args: cobra.NoArgs, - }) - - cmd.Flags().IntVar(&NPublishers, "num-publishers", 1, `Number of publishers to run concurrently, at --mps each`) - cmd.Flags().IntVar(&NTopics, "num-topics", 0, `Cycle through NTopics appending "-{n}" where n starts with --num-topics-start; 0 means use --topic`) - cmd.Flags().IntVar(&NTopicsStart, "num-topics-start", 0, `Start topic suffixes with this number (default 0)`) - cmd.Flags().IntVar(&MPS, "mps", 1000, `Publish mps messages per second; 0 means no delay`) - cmd.Flags().BoolVar(&Retain, "retain", false, "Mark each published message as retained") - - mainCmd.AddCommand(cmd) -} - -func runPub(_ *cobra.Command, _ []string) { - clientID := ClientID - if clientID == "" { - clientID = Name + "-pub-" + nuid.Next() - } - topic := "/" + Name + "/" + nuid.Next() - msgChan := make(chan *MQTTBenchmarkResult) - errChan := make(chan error) - - for i := 0; i < NPublishers; i++ { - id := clientID + "-" + strconv.Itoa(i) - go func() { - cl, cleanup, err := connect(id, CleanSession, nil) - if err != nil { - log.Fatal(err) - } - defer cleanup() - - r, err := publish(cl, topic) - if err == nil { - msgChan <- r - } else { - errChan <- err - } - }() - } - - // wait for messages to arrive - pubOps := 0 - pubNS := time.Duration(0) - pubBytes := int64(0) - timeout := time.NewTimer(Timeout) - defer timeout.Stop() - - // get back 1 report per publisher - for n := 0; n < NPublishers; { - select { - case r := <-msgChan: - pubOps += r.Ops - pubNS += r.NS["pub"] - pubBytes += r.Bytes - n++ - - case err := <-errChan: - log.Fatalf("Error: %v", err) - - case <-timeout.C: - log.Fatalf("Error: timeout waiting for publishers") - } - } - - bb, _ := json.Marshal(MQTTBenchmarkResult{ - Ops: pubOps, - NS: map[string]time.Duration{"pub": pubNS}, - Bytes: pubBytes, - }) - os.Stdout.Write(bb) -} - -func publish(cl paho.Client, topic string) (*MQTTBenchmarkResult, error) { - opts := cl.OptionsReader() - start := time.Now() - var elapsed time.Duration - bc := 0 - iTopic := 0 - - for n := 0; n < N; n++ { - now := time.Now() - if n > 0 && MPS > 0 { - next := start.Add(time.Duration(n) * time.Second / time.Duration(MPS)) - time.Sleep(next.Sub(now)) - } - - // payload always starts with JSON containing timestamp, etc. The JSON - // is always terminated with a '-', which can not be part of the random - // fill. payload is then filled to the requested size with random data. - payload := randomPayload(Size) - structuredPayload, _ := json.Marshal(PubValue{ - Seq: n, - Timestamp: time.Now().UnixNano(), - }) - structuredPayload = append(structuredPayload, '\n') - if len(structuredPayload) > len(payload) { - payload = structuredPayload - } else { - copy(payload, structuredPayload) - } - - currTopic := topic - if NTopics > 0 { - currTopic = topic + "-" + strconv.Itoa(iTopic+NTopicsStart) - iTopic++ - if iTopic >= NTopics { - iTopic = 0 - } - } - - startPublish := time.Now() - if token := cl.Publish(currTopic, byte(QOS), Retain, payload); token.Wait() && token.Error() != nil { - return nil, token.Error() - } - elapsedPublish := time.Since(startPublish) - elapsed += elapsedPublish - logOp(opts.ClientID(), "PUB <-", elapsedPublish, "Published: %d bytes to %q, qos:%v, retain:%v", len(payload), currTopic, QOS, Retain) - bc += mqttPublishLen(currTopic, byte(QOS), Retain, payload) - } - - return &MQTTBenchmarkResult{ - Ops: N, - NS: map[string]time.Duration{"pub": elapsed}, - Bytes: int64(bc), - }, nil -} diff --git a/publish.go b/publish.go new file mode 100644 index 0000000..4528b75 --- /dev/null +++ b/publish.go @@ -0,0 +1,101 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "encoding/json" + "log" + "strconv" + "time" +) + +// Message options +type messageOpts struct { + qos int + retain bool + size int + topic string +} + +type publisher struct { + messageOpts + + mps int + messages int + topics int + clientID string +} + +func (p *publisher) publish(msgCh chan *Stat, errorCh chan error, timestamp bool) { + cl, _, cleanup, err := connect(p.clientID, CleanSession) + if err != nil { + log.Fatal(err) + } + defer cleanup() + + opts := cl.OptionsReader() + start := time.Now() + var elapsed time.Duration + bc := 0 + iTopic := 0 + + for n := 0; n < p.messages; n++ { + now := time.Now() + if n > 0 && p.mps > 0 { + next := start.Add(time.Duration(n) * time.Second / time.Duration(p.mps)) + time.Sleep(next.Sub(now)) + } + + // payload always starts with JSON containing timestamp, etc. The JSON + // is always terminated with a '-', which can not be part of the random + // fill. payload is then filled to the requested size with random data. + payload := randomPayload(p.size) + if timestamp { + structuredPayload, _ := json.Marshal(PubValue{ + Seq: n, + Timestamp: time.Now().UnixNano(), + }) + structuredPayload = append(structuredPayload, '\n') + if len(structuredPayload) > len(payload) { + payload = structuredPayload + } else { + copy(payload, structuredPayload) + } + } + + currTopic := p.topic + if p.topics > 0 { + currTopic = p.topic + "/" + strconv.Itoa(iTopic) + iTopic = (iTopic + 1) % p.topics + } + + startPublish := time.Now() + if token := cl.Publish(currTopic, byte(p.qos), p.retain, payload); token.Wait() && token.Error() != nil { + errorCh <- token.Error() + return + } + elapsedPublish := time.Since(startPublish) + elapsed += elapsedPublish + logOp(opts.ClientID(), "PUB <-", elapsedPublish, "Published: %d bytes to %q, qos:%v, retain:%v", len(payload), currTopic, p.qos, p.retain) + bc += mqttPublishLen(currTopic, byte(p.qos), p.retain, payload) + } + + if msgCh != nil { + msgCh <- &Stat{ + Ops: p.messages, + NS: map[string]time.Duration{"pub": elapsed}, + Bytes: int64(bc), + } + } +} diff --git a/pubsub.go b/pubsub.go deleted file mode 100644 index 91422cc..0000000 --- a/pubsub.go +++ /dev/null @@ -1,169 +0,0 @@ -package main - -import ( - "bytes" - "encoding/json" - "fmt" - "log" - "os" - "strconv" - "time" - - paho "github.com/eclipse/paho.mqtt.golang" - "github.com/nats-io/nuid" - "github.com/spf13/cobra" -) - -func init() { - cmd := initPubSub(&cobra.Command{ - Use: "pubsub [--flags...]", - Short: "Subscribe and receive N published messages", - Run: runPubSub, - Args: cobra.NoArgs, - }) - - cmd.Flags().IntVar(&MPS, "mps", 1000, `Publish mps messages per second; 0 means all ASAP`) - cmd.Flags().IntVar(&NSubscribers, "num-subscribers", 1, `Number of subscribers to run concurrently`) - - mainCmd.AddCommand(cmd) -} - -func pubsubMsgHandler(topic string, errChan chan error, msgChan chan MQTTBenchmarkResult, -) func(paho.Client, paho.Message) { - return func(client paho.Client, msg paho.Message) { - opts := client.OptionsReader() - clientID := opts.ClientID() - switch { - case msg.Topic() != topic: - log.Printf("Received a QOS %d message on unexpected topic: %s\n", msg.Qos(), msg.Topic()) - // ignore - - case msg.Duplicate(): - errChan <- fmt.Errorf("received unexpected duplicate message") - return - - case msg.Retained(): - errChan <- fmt.Errorf("received unexpected retained message") - return - } - - v := PubValue{} - body := msg.Payload() - if i := bytes.IndexByte(body, '\n'); i != -1 { - body = body[:i] - } - if err := json.Unmarshal(body, &v); err != nil { - log.Fatalf("Error parsing message JSON: %v", err) - } - elapsed := time.Since(time.Unix(0, v.Timestamp)) - msgChan <- MQTTBenchmarkResult{ - Ops: 1, - Bytes: int64(len(msg.Payload())), - NS: map[string]time.Duration{"receive": elapsed}, - } - logOp(clientID, "REC ->", elapsed, "Received %d bytes on %q, qos:%v", len(msg.Payload()), msg.Topic(), QOS) - } -} - -func runPubSub(_ *cobra.Command, _ []string) { - clientID := ClientID - if clientID == "" { - clientID = Name + "-sub-" + nuid.Next() - } - topic := "/" + Name + "/" + nuid.Next() - - clientCleanupChan := make(chan func()) - subsChan := make(chan struct{}) - msgChan := make(chan MQTTBenchmarkResult) - errChan := make(chan error) - - // Connect all subscribers (and subscribe) - for i := 0; i < NSubscribers; i++ { - id := clientID + "-" + strconv.Itoa(i) - go func() { - _, cleanup, err := connect(id, CleanSession, func(opts *paho.ClientOptions) { - opts. - SetOnConnectHandler(func(cl paho.Client) { - start := time.Now() - token := cl.Subscribe(topic, byte(QOS), pubsubMsgHandler(topic, errChan, msgChan)) - if token.Wait() && token.Error() != nil { - errChan <- token.Error() - return - } - logOp(id, "SUB", time.Since(start), "Subscribed to %q", topic) - subsChan <- struct{}{} - }). - SetDefaultPublishHandler(func(client paho.Client, msg paho.Message) { - errChan <- fmt.Errorf("received an unexpected message on %q", msg.Topic()) - }) - }) - if err != nil { - errChan <- err - } else { - clientCleanupChan <- cleanup - } - }() - } - - cConn, cSub := 0, 0 - timeout := time.NewTimer(Timeout) - defer timeout.Stop() - for (cConn < NSubscribers) || (cSub < NSubscribers) { - select { - case cleanup := <-clientCleanupChan: - defer cleanup() - cConn++ - case <-subsChan: - cSub++ - case err := <-errChan: - log.Fatal(err) - case <-timeout.C: - log.Fatalf("timeout waiting for connections") - } - } - - // ready to receive, start publishing. The publisher will exit when done, no need to wait for it. - clientID = ClientID - if clientID == "" { - clientID = Name + "-pub-" + nuid.Next() - } else { - clientID = clientID + "-pub" - } - go func() { - cl, cleanup, err := connect(clientID, CleanSession, nil) - defer cleanup() - if err == nil { - _, err = publish(cl, topic) - } - if err != nil { - errChan <- err - } - }() - - // wait for messages to arrive - elapsed := time.Duration(0) - bc := int64(0) - timeout = time.NewTimer(Timeout) - defer timeout.Stop() - for n := 0; n < N*NSubscribers; { - select { - case r := <-msgChan: - elapsed += r.NS["receive"] - bc += r.Bytes - n++ - - case err := <-errChan: - log.Fatalf("Error: %v", err) - - case <-timeout.C: - log.Fatalf("Error: timeout waiting for messages") - } - } - - bb, _ := json.Marshal(MQTTBenchmarkResult{ - Ops: N * NSubscribers, - NS: map[string]time.Duration{"receive": elapsed}, - Bytes: bc, - }) - os.Stdout.Write(bb) -} diff --git a/receive.go b/receive.go new file mode 100644 index 0000000..ae7ecb1 --- /dev/null +++ b/receive.go @@ -0,0 +1,153 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "bytes" + "encoding/json" + "fmt" + "log" + "strings" + "sync/atomic" + "time" + + paho "github.com/eclipse/paho.mqtt.golang" +) + +type receiver struct { + clientID string // MQTT client ID. + topic string // Subscription topic. + filterPrefix string // Only count messages if their topic starts with the prefix. + qos int // MQTT QOS for the subscription. + expectRetained int // expect to receive this many retained messages. + expectPublished int // expect to receive this many published messages. + repeat int // Number of times to repeat subscribe/receive/unsubscribe. + + cRetained atomic.Int32 // Count of retained messages received. + cPublished atomic.Int32 // Count of published messages received. + durPublished atomic.Int64 // Total duration of published messages received (measured from the sent timestamp in the message). + bc atomic.Int64 // Byte count of all messages received. + + start time.Time + errCh chan error + statCh chan *Stat +} + +func (r *receiver) receive(readyCh chan struct{}, statCh chan *Stat, errCh chan error) { + r.errCh = errCh + r.statCh = make(chan *Stat) + if r.filterPrefix == "" { + r.filterPrefix = r.topic + } + + cl, _, cleanup, err := connect(r.clientID, CleanSession) + if err != nil { + errCh <- err + return + } + + for i := 0; i < r.repeat; i++ { + // Reset the stats for each iteration. + r.start = time.Now() + r.cRetained.Store(0) + r.cPublished.Store(0) + r.durPublished.Store(0) + r.bc.Store(0) + + token := cl.Subscribe(r.topic, byte(r.qos), r.msgHandler) + if token.Wait() && token.Error() != nil { + errCh <- token.Error() + return + } + logOp(r.clientID, "SUB", time.Since(r.start), "Subscribed to %q", r.topic) + if readyCh != nil { + readyCh <- struct{}{} + } + + // wait for the stat value, then clean up and forward it to the caller. Errors are handled by the caller. + stat := <-r.statCh + statCh <- stat + + token = cl.Unsubscribe(r.topic) + if token.Wait() && token.Error() != nil { + errCh <- token.Error() + return + } + } + cleanup() +} + +func (r *receiver) msgHandler(client paho.Client, msg paho.Message) { + opts := client.OptionsReader() + clientID := opts.ClientID() + switch { + case !strings.HasPrefix(msg.Topic(), r.filterPrefix): + log.Printf("Received a QOS %d message on unexpected topic: %s\n", msg.Qos(), msg.Topic()) + return + + case msg.Duplicate(): + r.errCh <- fmt.Errorf("received unexpected duplicate message") + return + + case msg.Retained(): + newC := r.cRetained.Add(1) + if newC > int32(r.expectRetained) { + r.errCh <- fmt.Errorf("received unexpected retained message") + return + } + logOp(clientID, "RRET ->", time.Since(r.start), "Received %d bytes on %q, qos:%v", len(msg.Payload()), msg.Topic(), msg.Qos()) + r.bc.Add(int64(len(msg.Payload()))) + + if newC < int32(r.expectRetained) { + return + } + elapsed := time.Since(r.start) + r.statCh <- &Stat{ + Ops: 1, + NS: map[string]time.Duration{fmt.Sprintf("rec%vret", r.expectRetained): elapsed}, + Bytes: r.bc.Load(), + } + return + + default: + newC := r.cPublished.Add(1) + if newC > int32(r.expectPublished) { + r.errCh <- fmt.Errorf("received unexpected published message: dup:%v, topic: %s, qos:%v, retained:%v, payload: %q", + msg.Duplicate(), msg.Topic(), msg.Qos(), msg.Retained(), msg.Payload()) + return + } + + v := PubValue{} + body := msg.Payload() + if i := bytes.IndexByte(body, '\n'); i != -1 { + body = body[:i] + } + if err := json.Unmarshal(body, &v); err != nil { + log.Fatalf("Error parsing message JSON: %v", err) + } + elapsed := time.Since(time.Unix(0, v.Timestamp)) + logOp(clientID, "RPUB ->", elapsed, "Received %d bytes on %q, qos:%v", len(msg.Payload()), msg.Topic(), msg.Qos()) + + dur := r.durPublished.Add(int64(elapsed)) + bb := r.bc.Add(int64(len(msg.Payload()))) + if newC < int32(r.expectPublished) { + return + } + r.statCh <- &Stat{ + Ops: r.expectPublished, + Bytes: bb, + NS: map[string]time.Duration{"receive": time.Duration(dur)}, + } + } +} diff --git a/subret.go b/subret.go deleted file mode 100644 index d72e8ce..0000000 --- a/subret.go +++ /dev/null @@ -1,188 +0,0 @@ -package main - -import ( - "encoding/json" - "log" - "os" - "strconv" - "strings" - "sync/atomic" - "time" - - paho "github.com/eclipse/paho.mqtt.golang" - "github.com/nats-io/nuid" - "github.com/spf13/cobra" -) - -func init() { - cmd := initPubSub(&cobra.Command{ - Use: "subret [--flags...]", - Short: "Subscribe N times, and receive NTopics retained messages", - Run: runSubRet, - Args: cobra.NoArgs, - }) - - cmd.Flags().IntVar(&NSubscribers, "num-subscribers", 1, `Number of subscribers to run concurrently`) - cmd.Flags().IntVar(&NTopics, "num-topics", 0, `Use this many topics with retained messages`) - - mainCmd.AddCommand(cmd) -} - -func subretMsgHandler( - topicPrefix string, - expectNRetained int, - start time.Time, - doneChan chan MQTTBenchmarkResult, -) func(paho.Client, paho.Message) { - var cRetained atomic.Int32 - var bc atomic.Int64 - return func(client paho.Client, msg paho.Message) { - opts := client.OptionsReader() - clientID := opts.ClientID() - switch { - case !strings.HasPrefix(msg.Topic(), topicPrefix): - log.Printf("Received a QOS %d message on unexpected topic: %s\n", msg.Qos(), msg.Topic()) - // ignore - - case msg.Duplicate(): - log.Fatal("received unexpected duplicate message") - return - - case msg.Retained(): - newC := cRetained.Add(1) - bc.Add(int64(len(msg.Payload()))) - switch { - case newC < int32(expectNRetained): - logOp(clientID, "REC ->", time.Since(start), "Received %d bytes on %q, qos:%v", len(msg.Payload()), msg.Topic(), QOS) - // skip it - return - case newC > int32(expectNRetained): - log.Fatal("received unexpected retained message") - default: // last expected retained message - elapsed := time.Since(start) - r := MQTTBenchmarkResult{ - Ops: 1, - NS: map[string]time.Duration{"receive": elapsed}, - Bytes: bc.Load(), - } - doneChan <- r - } - } - } -} - -func runSubRet(_ *cobra.Command, _ []string) { - topic := "/" + Name + "/" + nuid.Next() - - clientID := ClientID - if clientID == "" { - clientID = Name + "-pub-" + nuid.Next() - } else { - clientID = clientID + "-pub" - } - nTopics := NTopics - if nTopics == 0 { - nTopics = 1 - } - - // Publish NTopics retained messages, 1 per topic; Use at least QoS1 to - // ensure the retained messages are fully processed by the time the - // publisher exits. - cl, disconnect, err := connect(clientID, CleanSession, nil) - if err != nil { - log.Fatal("Error connecting: ", err) - } - for i := 0; i < nTopics; i++ { - t := topic + "/" + strconv.Itoa(i) - payload := randomPayload(Size) - start := time.Now() - publishQOS := 1 - if publishQOS < QOS { - publishQOS = QOS - } - if token := cl.Publish(t, byte(publishQOS), true, payload); token.Wait() && token.Error() != nil { - log.Fatal("Error publishing: ", token.Error()) - } - logOp(clientID, "PUB <-", time.Since(start), "Published: %d bytes to %q, qos:%v, retain:%v", len(payload), t, QOS, true) - } - disconnect() - - // Now subscribe and verify that all subs receive all messages - clientID = ClientID - if clientID == "" { - clientID = Name + "-sub-" + nuid.Next() - } - - // Connect all subscribers (and subscribe to a wildcard topic that includes - // all published retained messages). - doneChan := make(chan MQTTBenchmarkResult) - for i := 0; i < NSubscribers; i++ { - id := clientID + "-" + strconv.Itoa(i) - prefix := topic - t := topic + "/+" - go subscribeAndReceiveRetained(id, t, prefix, N, nTopics, doneChan) - } - - var cDone int - total := MQTTBenchmarkResult{ - NS: map[string]time.Duration{}, - } - timeout := time.NewTimer(Timeout) - defer timeout.Stop() - - for cDone < NSubscribers { - select { - case r := <-doneChan: - total.Ops += r.Ops - total.NS["sub"] += r.NS["sub"] - total.NS["receive"] += r.NS["receive"] - total.Bytes += r.Bytes - cDone++ - case <-timeout.C: - log.Fatalf("timeout waiting for connections") - } - } - - bb, _ := json.Marshal(total) - os.Stdout.Write(bb) -} - -func subscribeAndReceiveRetained(id string, subTopic string, pubTopicPrefix string, n, expected int, doneChan chan MQTTBenchmarkResult) { - subNS := time.Duration(0) - receiveNS := time.Duration(0) - receiveBytes := int64(0) - cl, cleanup, err := connect(id, CleanSession, nil) - if err != nil { - log.Fatal(err) - } - defer cleanup() - - for i := 0; i < n; i++ { - start := time.Now() - doneChan := make(chan MQTTBenchmarkResult) - token := cl.Subscribe(subTopic, byte(QOS), - subretMsgHandler(pubTopicPrefix, expected, start, doneChan)) - if token.Wait() && token.Error() != nil { - log.Fatal(token.Error()) - } - subElapsed := time.Since(start) - subNS += subElapsed - logOp(id, "SUB", subElapsed, "Subscribed to %q", subTopic) - - r := <-doneChan - receiveNS += r.NS["receive"] - receiveBytes += r.Bytes - logOp(id, "SUBRET", r.NS["receive"], "Received %d messages (%d bytes) on %q", expected, r.Bytes, subTopic) - - // Unsubscribe - if token = cl.Unsubscribe(subTopic); token.Wait() && token.Error() != nil { - log.Fatal(token.Error()) - } - } - - doneChan <- MQTTBenchmarkResult{ - Ops: N, - NS: map[string]time.Duration{"sub": subNS, "receive": receiveNS}, - Bytes: receiveBytes, - } -}