diff --git a/command-sub.go b/command-sub.go index 531dcaa..73bf462 100644 --- a/command-sub.go +++ b/command-sub.go @@ -15,7 +15,10 @@ package main import ( "encoding/json" + "log" "os" + "strconv" + "time" "github.com/spf13/cobra" ) @@ -52,7 +55,101 @@ func newSubCommand() *cobra.Command { } func (c *subCommand) run(_ *cobra.Command, _ []string) { - total := runSubWithPubret(c.subscribers, c.repeat, c.expectRetained, c.expectPublished, c.messageOpts, false) + total := runSubPrepublishRetained(c.subscribers, c.repeat, c.expectRetained, c.expectPublished, c.messageOpts, false) bb, _ := json.Marshal(total) os.Stdout.Write(bb) } + +func runSubPrepublishRetained( + nSubscribers int, + repeat int, + expectRetained, + expectPublished int, + messageOpts messageOpts, + prepublishRetained bool, +) *Stat { + errCh := make(chan error) + receiverReadyCh := make(chan struct{}) + statsCh := make(chan *Stat) + + if prepublishRetained { + if expectPublished != 0 { + log.Fatalf("Error: --messages is not supported with --retained") + } + + // We need to wait for all prepublished retained messages to be processed. + // To ensure, subscribe once before we pre-publish and receive all published + // messages. + r := &receiver{ + clientID: ClientID + "-sub-init", + filterPrefix: messageOpts.topic, + topic: messageOpts.topic + "/+", + qos: messageOpts.qos, + expectRetained: 0, + expectPublished: expectRetained, + repeat: 1, + } + go r.receive(receiverReadyCh, statsCh, errCh) + <-receiverReadyCh + + // Pre-publish retained messages. + p := &publisher{ + clientID: ClientID + "-pub", + messages: expectRetained, + topics: expectRetained, + messageOpts: messageOpts, + } + p.messageOpts.retain = true + go p.publish(nil, errCh, true) + + // wait for the initial subscription to have received all messages + timeout := time.NewTimer(Timeout) + defer timeout.Stop() + select { + case err := <-errCh: + log.Fatalf("Error: %v", err) + case <-timeout.C: + log.Fatalf("Error: timeout waiting for messages in initial subscription") + case <-statsCh: + // all received + } + + } + + // Connect all subscribers (and subscribe to a wildcard topic that includes + // all published retained messages). + for i := 0; i < nSubscribers; i++ { + r := &receiver{ + clientID: ClientID + "-sub-" + strconv.Itoa(i), + filterPrefix: messageOpts.topic, + topic: messageOpts.topic + "/+", + qos: messageOpts.qos, + expectRetained: expectRetained, + expectPublished: expectPublished, + repeat: repeat, + } + go r.receive(nil, statsCh, errCh) + } + + // wait for the stats + total := &Stat{ + NS: make(map[string]time.Duration), + } + timeout := time.NewTimer(Timeout) + defer timeout.Stop() + for i := 0; i < nSubscribers*repeat; i++ { + select { + case stat := <-statsCh: + total.Ops += stat.Ops + total.Bytes += stat.Bytes + for k, v := range stat.NS { + total.NS[k] += v + } + case err := <-errCh: + log.Fatalf("Error: %v", err) + case <-timeout.C: + log.Fatalf("Error: timeout waiting for messages") + } + } + return total +} diff --git a/command-subret.go b/command-subret.go index 87fe445..9370bf5 100644 --- a/command-subret.go +++ b/command-subret.go @@ -27,7 +27,7 @@ type subretCommand struct { // test options repeat int subscribers int - messages int + messages int } func newSubRetCommand() *cobra.Command { @@ -51,7 +51,7 @@ func newSubRetCommand() *cobra.Command { } func (c *subretCommand) run(_ *cobra.Command, _ []string) { - total := runSubWithPubret(c.subscribers, c.repeat, c.messages, 0, c.messageOpts, true) + total := runSubPrepublishRetained(c.subscribers, c.repeat, c.messages, 0, c.messageOpts, true) bb, _ := json.Marshal(total) os.Stdout.Write(bb) } diff --git a/receive.go b/receive.go index 3d691d0..ae7ecb1 100644 --- a/receive.go +++ b/receive.go @@ -18,7 +18,6 @@ import ( "encoding/json" "fmt" "log" - "strconv" "strings" "sync/atomic" "time" @@ -124,7 +123,8 @@ func (r *receiver) msgHandler(client paho.Client, msg paho.Message) { default: newC := r.cPublished.Add(1) if newC > int32(r.expectPublished) { - r.errCh <- fmt.Errorf("received unexpected published message") + r.errCh <- fmt.Errorf("received unexpected published message: dup:%v, topic: %s, qos:%v, retained:%v, payload: %q", + msg.Duplicate(), msg.Topic(), msg.Qos(), msg.Retained(), msg.Payload()) return } @@ -151,63 +151,3 @@ func (r *receiver) msgHandler(client paho.Client, msg paho.Message) { } } } - -func runSubWithPubret( - nSubscribers int, - repeat int, - expectRetained, - expectPublished int, - messageOpts messageOpts, - prepublishRetained bool, -) *Stat { - errCh := make(chan error) - - if prepublishRetained { - p := &publisher{ - clientID: ClientID + "-pub", - messages: expectRetained, - topics: expectRetained, - messageOpts: messageOpts, - } - p.messageOpts.retain = true - p.publish(nil, errCh, true) - } - - // Connect all subscribers (and subscribe to a wildcard topic that includes - // all published retained messages). - statsCh := make(chan *Stat) - for i := 0; i < nSubscribers; i++ { - r := &receiver{ - clientID: ClientID + "-sub-" + strconv.Itoa(i), - filterPrefix: messageOpts.topic, - topic: messageOpts.topic + "/+", - qos: messageOpts.qos, - expectRetained: expectRetained, - expectPublished: expectPublished, - repeat: repeat, - } - go r.receive(nil, statsCh, errCh) - } - - // wait for the stats - total := &Stat{ - NS: make(map[string]time.Duration), - } - timeout := time.NewTimer(Timeout) - defer timeout.Stop() - for i := 0; i < nSubscribers*repeat; i++ { - select { - case stat := <-statsCh: - total.Ops += stat.Ops - total.Bytes += stat.Bytes - for k, v := range stat.NS { - total.NS[k] += v - } - case err := <-errCh: - log.Fatalf("Error: %v", err) - case <-timeout.C: - log.Fatalf("Error: timeout waiting for messages") - } - } - return total -}