Skip to content

Commit

Permalink
Refactored, added sub (#2)
Browse files Browse the repository at this point in the history
* Refactored, added sub

* Added (c) notes

* Updated README.md

* Fixed subret synchronization

* Updated server ref for CI

* Updated server ref for CI, 2
  • Loading branch information
levb authored Feb 15, 2024
1 parent 4dd571c commit 54ed0c1
Show file tree
Hide file tree
Showing 14 changed files with 895 additions and 654 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ jobs:
with:
repository: nats-io/nats-server
path: go/src/github.com/nats-io/nats-server
ref: lev-mqttex-test-retained

- name: Build and install
shell: bash --noprofile --norc -x -eo pipefail {0}
Expand All @@ -34,4 +35,4 @@ jobs:
shell: bash --noprofile --norc -x -eo pipefail {0}
run: |
cd go/src/github.com/nats-io/nats-server
go test -v --run='-' --bench 'MQTTEx' --benchtime=100x ./server | tee /tmp/current-bench-result.txt
go test -v --run='MQTTEx' --bench 'MQTTEx' --benchtime=100x ./server
83 changes: 50 additions & 33 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,64 +7,81 @@ Outputs JSON results that can be reported in a `go test --bench` wrapper.
##### Subcommands and common flags

```sh
mqtt-test [pub|pubsub|subret] [flags...]
mqtt-test [pub|sub|pubsub|subret] [flags...]
```

Available Commands:
- [pub](#pub) - Publish N messages
- [pubsub](#pubsub) - Subscribe and receive N published messages
- [subret](#subret) - Subscribe N times, and receive NTopics retained messages
Common Flags:
- [pub](#pub) - Publish MQTT messages.
- [sub](#sub) - Subscribe, receive all messages, unsubscribe, {repeat} times.
- [pubsub](#pubsub) - Subscribe and receive published messages.
- [subret](#subret) - Publish {topics} retained messages, subscribe {repeat} times, and receive all retained messages.

Common flags:
```
-h, --help help for mqtt-test
--id string MQTT client ID
-n, --n int Number of transactions to run, see the specific command (default 1)
-p, --password string MQTT client password (empty if auth disabled)
-q, --quiet Quiet mode, only print results
-s, --servers stringArray MQTT servers endpoint as host:port (default [tcp://localhost:1883])
-u, --username string MQTT client username (empty if auth disabled)
--version version for mqtt-test
-v, --very-verbose Very verbose, print everything we can
-h, --help help for mqtt-test
--id string MQTT client ID (default "mqtt-test-bssJjZUs1vhTvf6KpTpTLw")
-q, --quiet Quiet mode, only print results
-s, --server stringArray MQTT endpoint as username:password@host:port (default [tcp://localhost:1883])
--version version for mqtt-test
-v, --very-verbose Very verbose, print everything we can
```

##### pub

Publishes N messages using the flags and reports the results. Used with `--num-publishers` can run several concurrent publish connections.
Publishes messages using the flags and reports the results. Used with `--publishers` can run several concurrent publish connections.

Flags:

```
--messages int Number of transactions to run, see the specific command (default 1)
--mps int Publish mps messages per second; 0 means no delay (default 1000)
--publishers int Number of publishers to run concurrently, at --mps each (default 1)
--qos int MQTT QOS
--retain Mark each published message as retained
--size int Approximate size of each message (pub adds a timestamp)
--timestamp Prepend a timestamp to each message
--topic string Base topic (prefix) to publish into (/{n} will be added if --topics > 0) (default "mqtt-test/fIqfOq5Lg5wk636V4sLXoc")
--topics int Cycle through NTopics appending "/{n}"
```

##### sub

Subscribe, receive all expected messages, unsubscribe, {repeat} times.

Flags:

```
--mps int Publish mps messages per second; 0 means no delay (default 1000)
--num-publishers int Number of publishers to run concurrently, at --mps each (default 1)
--num-topics int Cycle through NTopics appending "-{n}" where n starts with --num-topics-start; 0 means use --topic
--num-topics-start int Start topic suffixes with this number (default 0)
--qos int MQTT QOS
--retain Mark each published message as retained
--size int Approximate size of each message (pub adds a timestamp)
--topic string MQTT topic
--messages int Expect to receive this many published messages
--qos int MQTT QOS
--repeat int Subscribe, receive retained messages, and unsubscribe N times (default 1)
--retained int Expect to receive this many retained messages
--subscribers int Number of subscribers to run concurrently (default 1)
--topic string Base topic for the test, will subscribe to {topic}/+
```

##### pubsub

Publishes N messages, and waits for all of them to be received by subscribers. Measures end-end delivery time on the messages. Used with `--num-subscribers` can run several concurrent subscriber connections.

```
--mps int Publish mps messages per second; 0 means all ASAP (default 1000)
--num-subscribers int Number of subscribers to run concurrently (default 1)
--qos int MQTT QOS
--size int Approximate size of each message (pub adds a timestamp)
--topic string MQTT topic
--messages int Number of messages to publish and receive (default 1)
--qos int MQTT QOS
--size int Approximate size of each message (pub adds a timestamp)
--subscribers int Number of subscribers to run concurrently (default 1)
--topic string Topic to publish and subscribe to (default "mqtt-test/JPrbNU6U3IbVQLIyazkP4y")
```

##### subret

Publishes retained messages into NTopics, then subscribes to a wildcard with all
topics N times. Measures time to SUBACK and to all retained messages received.
Used with `--num-subscribers` can run several concurrent subscriber connections.
Used with `--subscribers` can run several concurrent subscriber connections.

```
--num-subscribers int Number of subscribers to run concurrently (default 1)
--num-topics int Use this many topics with retained messages
--qos int MQTT QOS
--size int Approximate size of each message (pub adds a timestamp)
--qos int MQTT QOS
--repeat int Subscribe, receive retained messages, and unsubscribe N times (default 1)
--size int Approximate size of each message (pub adds a timestamp)
--subscribers int Number of subscribers to run concurrently (default 1)
--topic string Base topic (prefix) for the test (default "mqtt-test/yNkmAFnFHETSGnQJNjwGdN")
--topics int Number of sub-topics to publish retained messages to (default 1)
```
97 changes: 97 additions & 0 deletions command-pub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright 2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"encoding/json"
"log"
"os"
"strconv"
"time"

"github.com/spf13/cobra"
)

type pubCommand struct {
publisher
publishers int
timestamp bool
}

func newPubCommand() *cobra.Command {
c := &pubCommand{}

cmd := &cobra.Command{
Use: "pub [--flags...]",
Short: "Publish MQTT messages",
Run: c.run,
Args: cobra.NoArgs,
}

// Message options
cmd.Flags().StringVar(&c.topic, "topic", defaultTopic(), "Base topic (prefix) to publish into (/{n} will be added if --topics > 0)")
cmd.Flags().IntVar(&c.qos, "qos", DefaultQOS, "MQTT QOS")
cmd.Flags().IntVar(&c.size, "size", 0, "Approximate size of each message (pub adds a timestamp)")
cmd.Flags().BoolVar(&c.retain, "retain", false, "Mark each published message as retained")
cmd.Flags().BoolVar(&c.timestamp, "timestamp", false, "Prepend a timestamp to each message")

// Test options
cmd.Flags().IntVar(&c.mps, "mps", 1000, `Publish mps messages per second; 0 means no delay`)
cmd.Flags().IntVar(&c.messages, "messages", 1, "Number of transactions to run, see the specific command")
cmd.Flags().IntVar(&c.publishers, "publishers", 1, `Number of publishers to run concurrently, at --mps each`)
cmd.Flags().IntVar(&c.topics, "topics", 0, `Cycle through NTopics appending "/{n}"`)

return cmd
}

func (c *pubCommand) run(_ *cobra.Command, _ []string) {
msgChan := make(chan *Stat)
errChan := make(chan error)

for i := 0; i < c.publishers; i++ {
p := c.publisher // copy
p.clientID = ClientID + "-" + strconv.Itoa(i)
go p.publish(msgChan, errChan, c.timestamp)
}

pubOps := 0
pubNS := time.Duration(0)
pubBytes := int64(0)
timeout := time.NewTimer(Timeout)
defer timeout.Stop()

// get back 1 report per publisher
for n := 0; n < c.publishers; {
select {
case stat := <-msgChan:
pubOps += stat.Ops
pubNS += stat.NS["pub"]
pubBytes += stat.Bytes
n++

case err := <-errChan:
log.Fatalf("Error: %v", err)

case <-timeout.C:
log.Fatalf("Error: timeout waiting for publishers")
}
}

bb, _ := json.Marshal(Stat{
Ops: pubOps,
NS: map[string]time.Duration{"pub": pubNS},
Bytes: pubBytes,
})
os.Stdout.Write(bb)
}
120 changes: 120 additions & 0 deletions command-pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright 2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"encoding/json"
"log"
"os"
"strconv"
"time"

"github.com/spf13/cobra"
)

type pubsubCommand struct {
messageOpts

messages int
subscribers int
}

func newPubSubCommand() *cobra.Command {
c := &pubsubCommand{}

cmd := &cobra.Command{
Use: "pubsub [--flags...]",
Short: "Subscribe and receive N published messages",
Run: c.run,
Args: cobra.NoArgs,
}

// Message options
cmd.Flags().IntVar(&c.messages, "messages", 1, "Number of messages to publish and receive")
cmd.Flags().StringVar(&c.topic, "topic", defaultTopic(), "Topic to publish and subscribe to")
cmd.Flags().IntVar(&c.qos, "qos", DefaultQOS, "MQTT QOS")
cmd.Flags().IntVar(&c.size, "size", 0, "Approximate size of each message (pub adds a timestamp)")

// Test options
cmd.Flags().IntVar(&c.subscribers, "subscribers", 1, `Number of subscribers to run concurrently`)

return cmd
}

func (c *pubsubCommand) run(_ *cobra.Command, _ []string) {
clientID := ClientID + "-sub"
readyCh := make(chan struct{})
errCh := make(chan error)
statsCh := make(chan *Stat)

// Connect all subscribers (and subscribe)
for i := 0; i < c.subscribers; i++ {
r := &receiver{
clientID: clientID + "-" + strconv.Itoa(i),
topic: c.topic,
qos: c.qos,
expectPublished: c.messages,
repeat: 1,
}
go r.receive(readyCh, statsCh, errCh)
}

// Wait for all subscriptions to signal ready
cSub := 0
timeout := time.NewTimer(Timeout)
defer timeout.Stop()
for cSub < c.subscribers {
select {
case <-readyCh:
cSub++
case err := <-errCh:
log.Fatal(err)
case <-timeout.C:
log.Fatalf("timeout waiting for subscribers to be ready")
}
}

// ready to receive, start publishing. The publisher will exit when done, no need to wait for it.
p := &publisher{
clientID: ClientID + "-pub",
messageOpts: c.messageOpts,
messages: c.messages,
mps: 1000,
}
go p.publish(nil, errCh, true)

// wait for the stats
total := Stat{
NS: make(map[string]time.Duration),
}
timeout = time.NewTimer(Timeout)
defer timeout.Stop()
for i := 0; i < c.subscribers; i++ {
select {
case stat := <-statsCh:
total.Ops += stat.Ops
total.Bytes += stat.Bytes
for k, v := range stat.NS {
total.NS[k] += v
}
case err := <-errCh:
log.Fatalf("Error: %v", err)
case <-timeout.C:
log.Fatalf("Error: timeout waiting for messages")
}
}

bb, _ := json.Marshal(total)
os.Stdout.Write(bb)
}
Loading

0 comments on commit 54ed0c1

Please sign in to comment.