Skip to content

Commit

Permalink
v0.2.0 candidate
Browse files Browse the repository at this point in the history
  • Loading branch information
levb committed Feb 27, 2024
1 parent 54ed0c1 commit 8926d09
Show file tree
Hide file tree
Showing 10 changed files with 482 additions and 462 deletions.
33 changes: 20 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Common flags:
--id string MQTT client ID (default "mqtt-test-bssJjZUs1vhTvf6KpTpTLw")
-q, --quiet Quiet mode, only print results
-s, --server stringArray MQTT endpoint as username:password@host:port (default [tcp://localhost:1883])
--timeout duration Timeout for the test (default 10s)
--version version for mqtt-test
-v, --very-verbose Very verbose, print everything we can
```
Expand All @@ -40,7 +41,7 @@ Flags:
--retain Mark each published message as retained
--size int Approximate size of each message (pub adds a timestamp)
--timestamp Prepend a timestamp to each message
--topic string Base topic (prefix) to publish into (/{n} will be added if --topics > 0) (default "mqtt-test/fIqfOq5Lg5wk636V4sLXoc")
--topic string Base topic (prefix) to publish into (/{n} will be added if --topics > 0)
--topics int Cycle through NTopics appending "/{n}"
```

Expand All @@ -56,19 +57,23 @@ Flags:
--repeat int Subscribe, receive retained messages, and unsubscribe N times (default 1)
--retained int Expect to receive this many retained messages
--subscribers int Number of subscribers to run concurrently (default 1)
--topic string Base topic for the test, will subscribe to {topic}/+
--timestamp Expect a timestamp in the payload and use it to calculate receive time
--topic string Topic to subscribe to
```

##### pubsub

Publishes N messages, and waits for all of them to be received by subscribers. Measures end-end delivery time on the messages. Used with `--num-subscribers` can run several concurrent subscriber connections.

```
--messages int Number of messages to publish and receive (default 1)
--qos int MQTT QOS
--size int Approximate size of each message (pub adds a timestamp)
--subscribers int Number of subscribers to run concurrently (default 1)
--topic string Topic to publish and subscribe to (default "mqtt-test/JPrbNU6U3IbVQLIyazkP4y")
--messages int Number of messages to publish and receive (default 1)
--mps int Publish mps messages per second; 0 means no delay (default 1000)
--pub-server string Server to publish to. Defaults to the first server in --servers
--qos int MQTT QOS
--size int Message extra payload size (in addition to the JSON timestamp)
--subscribers int Number of subscribers to run concurrently (default 1)
--topic string Topic (or base topic if --topics > 1)
--topics int Number of topics to use, If more than one will add /1, /2, ... to --topic when publishing, and subscribe to topic/+ (default 1)
```

##### subret
Expand All @@ -78,10 +83,12 @@ topics N times. Measures time to SUBACK and to all retained messages received.
Used with `--subscribers` can run several concurrent subscriber connections.

```
--qos int MQTT QOS
--repeat int Subscribe, receive retained messages, and unsubscribe N times (default 1)
--size int Approximate size of each message (pub adds a timestamp)
--subscribers int Number of subscribers to run concurrently (default 1)
--topic string Base topic (prefix) for the test (default "mqtt-test/yNkmAFnFHETSGnQJNjwGdN")
--topics int Number of sub-topics to publish retained messages to (default 1)
--mps int Publish mps messages per second; 0 means no delay (default 1000)
--pub-server stringArray Server(s) to publish to. Defaults to --servers
--qos int MQTT QOS for subscriptions. Messages are published as QOS1.
--repeat int Subscribe, receive retained messages, and unsubscribe N times (default 1)
--retained int Number of retained messages to publish and receive (default 1)
--size int Message payload size
--subscribers int Number of subscribers to run concurrently (default 1)
--topic string base topic (if --retaned > 1 will be published to topic/1, topic/2, ...)
```
68 changes: 17 additions & 51 deletions command-pub.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,14 @@
package main

import (
"encoding/json"
"log"
"os"
"strconv"
"time"

"github.com/spf13/cobra"
)

type pubCommand struct {
publisher
opts publisher
publishers int
timestamp bool
}

func newPubCommand() *cobra.Command {
Expand All @@ -39,59 +34,30 @@ func newPubCommand() *cobra.Command {
Args: cobra.NoArgs,
}

// Message options
cmd.Flags().StringVar(&c.topic, "topic", defaultTopic(), "Base topic (prefix) to publish into (/{n} will be added if --topics > 0)")
cmd.Flags().IntVar(&c.qos, "qos", DefaultQOS, "MQTT QOS")
cmd.Flags().IntVar(&c.size, "size", 0, "Approximate size of each message (pub adds a timestamp)")
cmd.Flags().BoolVar(&c.retain, "retain", false, "Mark each published message as retained")
cmd.Flags().BoolVar(&c.timestamp, "timestamp", false, "Prepend a timestamp to each message")

// Test options
cmd.Flags().IntVar(&c.mps, "mps", 1000, `Publish mps messages per second; 0 means no delay`)
cmd.Flags().IntVar(&c.messages, "messages", 1, "Number of transactions to run, see the specific command")
cmd.Flags().IntVar(&c.opts.messages, "messages", 1, "Number of transactions to run, see the specific command")
cmd.Flags().IntVar(&c.opts.mps, "mps", 1000, `Publish mps messages per second; 0 means no delay`)
cmd.Flags().IntVar(&c.opts.qos, "qos", DefaultQOS, "MQTT QOS")
cmd.Flags().BoolVar(&c.opts.retain, "retain", false, "Mark each published message as retained")
cmd.Flags().IntVar(&c.opts.size, "size", 0, "Approximate size of each message (pub adds a timestamp)")
cmd.Flags().BoolVar(&c.opts.timestamp, "timestamp", false, "Prepend a timestamp to each message")
cmd.Flags().StringVar(&c.opts.topic, "topic", defaultTopic(), "Base topic (prefix) to publish into (/{n} will be added if --topics > 0)")
cmd.Flags().IntVar(&c.opts.topics, "topics", 0, `Cycle through NTopics appending "/{n}"`)
cmd.Flags().IntVar(&c.publishers, "publishers", 1, `Number of publishers to run concurrently, at --mps each`)
cmd.Flags().IntVar(&c.topics, "topics", 0, `Cycle through NTopics appending "/{n}"`)

return cmd
}

func (c *pubCommand) run(_ *cobra.Command, _ []string) {
msgChan := make(chan *Stat)
errChan := make(chan error)

doneCh := make(chan struct{})
for i := 0; i < c.publishers; i++ {
p := c.publisher // copy
p.clientID = ClientID + "-" + strconv.Itoa(i)
go p.publish(msgChan, errChan, c.timestamp)
}

pubOps := 0
pubNS := time.Duration(0)
pubBytes := int64(0)
timeout := time.NewTimer(Timeout)
defer timeout.Stop()

// get back 1 report per publisher
for n := 0; n < c.publishers; {
select {
case stat := <-msgChan:
pubOps += stat.Ops
pubNS += stat.NS["pub"]
pubBytes += stat.Bytes
n++

case err := <-errChan:
log.Fatalf("Error: %v", err)

case <-timeout.C:
log.Fatalf("Error: timeout waiting for publishers")
p := c.opts // copy
p.dials = dials(Servers)
p.clientID = ClientID
if c.publishers > 1 {
p.clientID = p.clientID + "-" + strconv.Itoa(i)
}
go p.publish(doneCh)
}

bb, _ := json.Marshal(Stat{
Ops: pubOps,
NS: map[string]time.Duration{"pub": pubNS},
Bytes: pubBytes,
})
os.Stdout.Write(bb)
waitN(doneCh, c.publishers, "publisher to finish")
}
128 changes: 56 additions & 72 deletions command-pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,107 +14,91 @@
package main

import (
"encoding/json"
"log"
"os"
"strconv"
"time"

"github.com/spf13/cobra"
)

type pubsubCommand struct {
messageOpts

messages int
pubOpts publisher
subOpts receiver
subscribers int
pubServer string
}

func newPubSubCommand() *cobra.Command {
c := &pubsubCommand{}

cmd := &cobra.Command{
Use: "pubsub [--flags...]",
Short: "Subscribe and receive N published messages",
Run: c.run,
Args: cobra.NoArgs,
}

// Message options
cmd.Flags().IntVar(&c.messages, "messages", 1, "Number of messages to publish and receive")
cmd.Flags().StringVar(&c.topic, "topic", defaultTopic(), "Topic to publish and subscribe to")
cmd.Flags().IntVar(&c.qos, "qos", DefaultQOS, "MQTT QOS")
cmd.Flags().IntVar(&c.size, "size", 0, "Approximate size of each message (pub adds a timestamp)")

// Test options
cmd.Flags().IntVar(&c.pubOpts.messages, "messages", 1, "Number of messages to publish and receive")
cmd.Flags().IntVar(&c.pubOpts.mps, "mps", 1000, `Publish mps messages per second; 0 means no delay`)
cmd.Flags().IntVar(&c.pubOpts.qos, "qos", DefaultQOS, "MQTT QOS")
cmd.Flags().IntVar(&c.pubOpts.size, "size", 0, "Message extra payload size (in addition to the JSON timestamp)")
cmd.Flags().StringVar(&c.pubOpts.topic, "topic", defaultTopic(), "Topic (or base topic if --topics > 1)")
cmd.Flags().IntVar(&c.pubOpts.topics, "topics", 1, "Number of topics to use, If more than one will add /1, /2, ... to --topic when publishing, and subscribe to topic/+")
cmd.Flags().StringVar(&c.pubServer, "pub-server", "", "Server to publish to. Defaults to the first server in --servers")
cmd.Flags().IntVar(&c.subscribers, "subscribers", 1, `Number of subscribers to run concurrently`)

cmd.PreRun = func(_ *cobra.Command, _ []string) {
c.pubOpts.clientID = ClientID + "-pub"
c.pubOpts.timestamp = true
s := c.pubServer
if s == "" {
s = Servers[0]
}
c.pubOpts.dials = []dial{dial(s)}

c.subOpts.clientID = ClientID + "-sub"
c.subOpts.expectPublished = c.pubOpts.messages
c.subOpts.expectTimestamp = true
c.subOpts.filterPrefix = c.pubOpts.topic
c.subOpts.qos = c.pubOpts.qos
c.subOpts.repeat = 1
c.subOpts.topic = c.pubOpts.topic
if c.pubOpts.topics > 1 {
c.subOpts.topic = c.pubOpts.topic + "/+"
}
}

return cmd
}

func (c *pubsubCommand) run(_ *cobra.Command, _ []string) {
clientID := ClientID + "-sub"
readyCh := make(chan struct{})
errCh := make(chan error)
statsCh := make(chan *Stat)
doneCh := make(chan struct{})

// Connect all subscribers (and subscribe)
for i := 0; i < c.subscribers; i++ {
r := &receiver{
clientID: clientID + "-" + strconv.Itoa(i),
topic: c.topic,
qos: c.qos,
expectPublished: c.messages,
repeat: 1,
}
go r.receive(readyCh, statsCh, errCh)
counter := 0
if len(Servers) > 1 || c.subscribers > 1 {
counter = 1
}

// Wait for all subscriptions to signal ready
cSub := 0
timeout := time.NewTimer(Timeout)
defer timeout.Stop()
for cSub < c.subscribers {
select {
case <-readyCh:
cSub++
case err := <-errCh:
log.Fatal(err)
case <-timeout.C:
log.Fatalf("timeout waiting for subscribers to be ready")
}
}

// ready to receive, start publishing. The publisher will exit when done, no need to wait for it.
p := &publisher{
clientID: ClientID + "-pub",
messageOpts: c.messageOpts,
messages: c.messages,
mps: 1000,
}
go p.publish(nil, errCh, true)

// wait for the stats
total := Stat{
NS: make(map[string]time.Duration),
}
timeout = time.NewTimer(Timeout)
defer timeout.Stop()
for i := 0; i < c.subscribers; i++ {
select {
case stat := <-statsCh:
total.Ops += stat.Ops
total.Bytes += stat.Bytes
for k, v := range stat.NS {
total.NS[k] += v
N := c.subscribers * len(Servers)

// Connect all subscribers and subscribe. Wait for all subscriptions to
// signal ready before publishing.
for _, d := range dials(Servers) {
for i := 0; i < c.subscribers; i++ {
r := c.subOpts // copy
if r.clientID == "" {
r.clientID = ClientID
}
case err := <-errCh:
log.Fatalf("Error: %v", err)
case <-timeout.C:
log.Fatalf("Error: timeout waiting for messages")
if counter != 0 {
r.clientID = r.clientID + "-" + strconv.Itoa(counter)
counter++
}
r.dial = d
go r.receive(readyCh, doneCh)
}
}
waitN(readyCh, N, "subscribers to be ready")

// ready to receive, start publishing. Give the publisher the same done
// channel, will wait for one more.
go c.pubOpts.publish(doneCh)

bb, _ := json.Marshal(total)
os.Stdout.Write(bb)
waitN(doneCh, N+1, "publisher and all subscribers to finish")
}
Loading

0 comments on commit 8926d09

Please sign in to comment.