Skip to content

Commit

Permalink
initial implementation (#1)
Browse files Browse the repository at this point in the history
undefined
  • Loading branch information
levb authored Jan 17, 2024
1 parent faccce0 commit 4dd571c
Show file tree
Hide file tree
Showing 11 changed files with 855 additions and 0 deletions.
37 changes: 37 additions & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
name: Test with nats-server
on: [push, pull_request]

jobs:
test:
env:
GOPATH: /home/runner/work/mqtt-test/go
GO111MODULE: "on"
runs-on: ubuntu-latest
steps:
- name: Setup Go
uses: actions/setup-go@v4
with:
go-version: 1.21

- name: Checkout code
uses: actions/checkout@v4
with:
path: go/src/github.com/ConnectEverything/mqtt-test

- name: Checkout nats-server
uses: actions/checkout@v4
with:
repository: nats-io/nats-server
path: go/src/github.com/nats-io/nats-server

- name: Build and install
shell: bash --noprofile --norc -x -eo pipefail {0}
run: |
cd go/src/github.com/ConnectEverything/mqtt-test
go install -v .
- name: Run 'MQTTEx from nats-server'
shell: bash --noprofile --norc -x -eo pipefail {0}
run: |
cd go/src/github.com/nats-io/nats-server
go test -v --run='-' --bench 'MQTTEx' --benchtime=100x ./server | tee /tmp/current-bench-result.txt
70 changes: 70 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
MQTT Test is a CLI command used to test and benchmark the MQTT support in [NATS Server](https://github.com/nats-io/nats-server)

Outputs JSON results that can be reported in a `go test --bench` wrapper.

#### Usage

##### Subcommands and common flags

```sh
mqtt-test [pub|pubsub|subret] [flags...]
```

Available Commands:
- [pub](#pub) - Publish N messages
- [pubsub](#pubsub) - Subscribe and receive N published messages
- [subret](#subret) - Subscribe N times, and receive NTopics retained messages
Common Flags:
```
-h, --help help for mqtt-test
--id string MQTT client ID
-n, --n int Number of transactions to run, see the specific command (default 1)
-p, --password string MQTT client password (empty if auth disabled)
-q, --quiet Quiet mode, only print results
-s, --servers stringArray MQTT servers endpoint as host:port (default [tcp://localhost:1883])
-u, --username string MQTT client username (empty if auth disabled)
--version version for mqtt-test
-v, --very-verbose Very verbose, print everything we can
```

##### pub

Publishes N messages using the flags and reports the results. Used with `--num-publishers` can run several concurrent publish connections.

Flags:

```
--mps int Publish mps messages per second; 0 means no delay (default 1000)
--num-publishers int Number of publishers to run concurrently, at --mps each (default 1)
--num-topics int Cycle through NTopics appending "-{n}" where n starts with --num-topics-start; 0 means use --topic
--num-topics-start int Start topic suffixes with this number (default 0)
--qos int MQTT QOS
--retain Mark each published message as retained
--size int Approximate size of each message (pub adds a timestamp)
--topic string MQTT topic
```

##### pubsub

Publishes N messages, and waits for all of them to be received by subscribers. Measures end-end delivery time on the messages. Used with `--num-subscribers` can run several concurrent subscriber connections.

```
--mps int Publish mps messages per second; 0 means all ASAP (default 1000)
--num-subscribers int Number of subscribers to run concurrently (default 1)
--qos int MQTT QOS
--size int Approximate size of each message (pub adds a timestamp)
--topic string MQTT topic
```

##### subret

Publishes retained messages into NTopics, then subscribes to a wildcard with all
topics N times. Measures time to SUBACK and to all retained messages received.
Used with `--num-subscribers` can run several concurrent subscriber connections.

```
--num-subscribers int Number of subscribers to run concurrently (default 1)
--num-topics int Use this many topics with retained messages
--qos int MQTT QOS
--size int Approximate size of each message (pub adds a timestamp)
```
92 changes: 92 additions & 0 deletions common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package main

import (
"fmt"
"log"
"math/rand"
"time"

"github.com/spf13/cobra"
)

const (
Name = "mqtt-test"
Version = "v0.0.1"

DefaultServer = "tcp://localhost:1883"
DefaultQOS = 0

Timeout = 10 * time.Second
DisconnectCleanupTimeout = 500 // milliseconds

)

var (
ClientID string
MPS int
N int
NPublishers int
NSubscribers int
NTopics int
NTopicsStart int
Password string
QOS int
Retain bool
Servers []string
Size int
Username string
MatchTopicPrefix string
Quiet bool
Verbose bool
)

func initPubSub(cmd *cobra.Command) *cobra.Command {
cmd.Flags().IntVar(&QOS, "qos", DefaultQOS, "MQTT QOS")
cmd.Flags().IntVar(&Size, "size", 0, "Approximate size of each message (pub adds a timestamp)")
return cmd
}

type PubValue struct {
Seq int `json:"seq"`
Timestamp int64 `json:"timestamp"`
}

type MQTTBenchmarkResult struct {
Ops int `json:"ops"`
NS map[string]time.Duration `json:"ns"`
Bytes int64 `json:"bytes"`
}

func randomPayload(sz int) []byte {
const ch = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@$#%^&*()"
b := make([]byte, sz)
for i := range b {
b[i] = ch[rand.Intn(len(ch))]
}
return b
}

func mqttVarIntLen(value int) int {
c := 0
for ; value > 0; value >>= 7 {
c++
}
return c
}

func mqttPublishLen(topic string, qos byte, retained bool, msg []byte) int {
// Compute len (will have to add packet id if message is sent as QoS>=1)
pkLen := 2 + len(topic) + len(msg)
if qos > 0 {
pkLen += 2
}
return 1 + mqttVarIntLen(pkLen) + pkLen
}

func logOp(clientID, op string, dur time.Duration, f string, args ...interface{}) {
log.Printf("%8s %-6s %30s\t"+f, append([]any{
fmt.Sprintf("%.3fms", float64(dur)/float64(time.Millisecond)),
op,
clientID + ":"},
args...)...)
}
59 changes: 59 additions & 0 deletions connect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package main

import (
"log"
"time"

paho "github.com/eclipse/paho.mqtt.golang"
"github.com/nats-io/nuid"
)

const (
CleanSession = true
PersistentSession = false
)

func connect(clientID string, cleanSession bool, setoptsF func(*paho.ClientOptions)) (paho.Client, func(), error) {
if clientID == "" {
clientID = ClientID
}
if clientID == "" {
clientID = Name + "-" + nuid.Next()
}

clientOpts := paho.NewClientOptions().
SetClientID(clientID).
SetCleanSession(cleanSession).
SetProtocolVersion(4).
SetUsername(Username).
SetPassword(Password).
SetStore(paho.NewMemoryStore()).
SetAutoReconnect(false).
SetDefaultPublishHandler(func(client paho.Client, msg paho.Message) {
log.Fatalf("received an unexpected message on %q (default handler)", msg.Topic())
})

for _, s := range Servers {
clientOpts.AddBroker(s)
}
if setoptsF != nil {
setoptsF(clientOpts)
}

cl := paho.NewClient(clientOpts)

disconnectedWG.Add(1)
start := time.Now()
if t := cl.Connect(); t.Wait() && t.Error() != nil {
disconnectedWG.Done()
return nil, func() {}, t.Error()
}

logOp(clientOpts.ClientID, "CONN", time.Since(start), "Connected to %q\n", Servers)
return cl,
func() {
cl.Disconnect(DisconnectCleanupTimeout)
disconnectedWG.Done()
},
nil
}
10 changes: 10 additions & 0 deletions dependencies.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# External Dependencies

This file lists the dependencies used in this repository.

| Dependency | License |
|--------------------------------------|------------------------------------------|
| Go | BSD 3-Clause |
| github.com/eclipse/paho.mqtt.golang | Eclipse Public License - v 2.0 (EPL-2.0) |
| github.com/spf13/cobra v1.8.0 | Apache-2.0 |
| github.com/nats-io/nuid | Apache-2.0 |
17 changes: 17 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
module github.com/ConnectEverything/mqtt-test

go 1.21

require (
github.com/eclipse/paho.mqtt.golang v1.4.3
github.com/nats-io/nuid v1.0.1
github.com/spf13/cobra v1.8.0
)

require (
github.com/gorilla/websocket v1.5.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sync v0.1.0 // indirect
)
20 changes: 20 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik=
github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0=
github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
49 changes: 49 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package main

import (
"io"
"log"
"os"
"sync"

paho "github.com/eclipse/paho.mqtt.golang"
"github.com/spf13/cobra"
)

var mainCmd = &cobra.Command{
Use: Name + " [conn|pub|sub|...|] [--flags...]",
Short: "MQTT Test/Benchmark Utility",
Version: Version,
}

func init() {
mainCmd.PersistentFlags().StringVar(&ClientID, "id", "", "MQTT client ID")
mainCmd.PersistentFlags().StringArrayVarP(&Servers, "servers", "s", []string{DefaultServer}, "MQTT servers endpoint as host:port")
mainCmd.PersistentFlags().StringVarP(&Username, "username", "u", "", "MQTT client username (empty if auth disabled)")
mainCmd.PersistentFlags().StringVarP(&Password, "password", "p", "", "MQTT client password (empty if auth disabled)")
mainCmd.PersistentFlags().IntVarP(&N, "n", "n", 1, "Number of transactions to run, see the specific command")
mainCmd.PersistentFlags().BoolVarP(&Quiet, "quiet", "q", false, "Quiet mode, only print results")
mainCmd.PersistentFlags().BoolVarP(&Verbose, "very-verbose", "v", false, "Very verbose, print everything we can")

mainCmd.PersistentPreRun = func(cmd *cobra.Command, args []string) {
paho.CRITICAL = log.New(os.Stderr, "[MQTT CRIT] ", 0)
if Quiet {
Verbose = false
log.SetOutput(io.Discard)
}
if !Quiet {
paho.ERROR = log.New(os.Stderr, "[MQTT ERROR] ", 0)
}
if Verbose {
paho.WARN = log.New(os.Stderr, "[MQTT WARN] ", 0)
paho.DEBUG = log.New(os.Stderr, "[MQTT DEBUG] ", 0)
}
}
}

var disconnectedWG = sync.WaitGroup{}

func main() {
_ = mainCmd.Execute()
disconnectedWG.Wait()
}
Loading

0 comments on commit 4dd571c

Please sign in to comment.