Skip to content

Commit

Permalink
Fixed subret synchronization
Browse files Browse the repository at this point in the history
  • Loading branch information
levb committed Feb 12, 2024
1 parent 5c147d6 commit 27cdbf0
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 65 deletions.
99 changes: 98 additions & 1 deletion command-sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ package main

import (
"encoding/json"
"log"
"os"
"strconv"
"time"

"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -52,7 +55,101 @@ func newSubCommand() *cobra.Command {
}

func (c *subCommand) run(_ *cobra.Command, _ []string) {
total := runSubWithPubret(c.subscribers, c.repeat, c.expectRetained, c.expectPublished, c.messageOpts, false)
total := runSubPrepublishRetained(c.subscribers, c.repeat, c.expectRetained, c.expectPublished, c.messageOpts, false)
bb, _ := json.Marshal(total)
os.Stdout.Write(bb)
}

func runSubPrepublishRetained(
nSubscribers int,
repeat int,
expectRetained,
expectPublished int,
messageOpts messageOpts,
prepublishRetained bool,
) *Stat {
errCh := make(chan error)
receiverReadyCh := make(chan struct{})
statsCh := make(chan *Stat)

if prepublishRetained {
if expectPublished != 0 {
log.Fatalf("Error: --messages is not supported with --retained")
}

// We need to wait for all prepublished retained messages to be processed.
// To ensure, subscribe once before we pre-publish and receive all published
// messages.
r := &receiver{
clientID: ClientID + "-sub-init",
filterPrefix: messageOpts.topic,
topic: messageOpts.topic + "/+",
qos: messageOpts.qos,
expectRetained: 0,
expectPublished: expectRetained,
repeat: 1,
}
go r.receive(receiverReadyCh, statsCh, errCh)
<-receiverReadyCh

// Pre-publish retained messages.
p := &publisher{
clientID: ClientID + "-pub",
messages: expectRetained,
topics: expectRetained,
messageOpts: messageOpts,
}
p.messageOpts.retain = true
go p.publish(nil, errCh, true)

// wait for the initial subscription to have received all messages
timeout := time.NewTimer(Timeout)
defer timeout.Stop()
select {
case err := <-errCh:
log.Fatalf("Error: %v", err)
case <-timeout.C:
log.Fatalf("Error: timeout waiting for messages in initial subscription")
case <-statsCh:
// all received
}

}

// Connect all subscribers (and subscribe to a wildcard topic that includes
// all published retained messages).
for i := 0; i < nSubscribers; i++ {
r := &receiver{
clientID: ClientID + "-sub-" + strconv.Itoa(i),
filterPrefix: messageOpts.topic,
topic: messageOpts.topic + "/+",
qos: messageOpts.qos,
expectRetained: expectRetained,
expectPublished: expectPublished,
repeat: repeat,
}
go r.receive(nil, statsCh, errCh)
}

// wait for the stats
total := &Stat{
NS: make(map[string]time.Duration),
}
timeout := time.NewTimer(Timeout)
defer timeout.Stop()
for i := 0; i < nSubscribers*repeat; i++ {
select {
case stat := <-statsCh:
total.Ops += stat.Ops
total.Bytes += stat.Bytes
for k, v := range stat.NS {
total.NS[k] += v
}
case err := <-errCh:
log.Fatalf("Error: %v", err)
case <-timeout.C:
log.Fatalf("Error: timeout waiting for messages")
}
}
return total
}
4 changes: 2 additions & 2 deletions command-subret.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type subretCommand struct {
// test options
repeat int
subscribers int
messages int
messages int
}

func newSubRetCommand() *cobra.Command {
Expand All @@ -51,7 +51,7 @@ func newSubRetCommand() *cobra.Command {
}

func (c *subretCommand) run(_ *cobra.Command, _ []string) {
total := runSubWithPubret(c.subscribers, c.repeat, c.messages, 0, c.messageOpts, true)
total := runSubPrepublishRetained(c.subscribers, c.repeat, c.messages, 0, c.messageOpts, true)
bb, _ := json.Marshal(total)
os.Stdout.Write(bb)
}
64 changes: 2 additions & 62 deletions receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"encoding/json"
"fmt"
"log"
"strconv"
"strings"
"sync/atomic"
"time"
Expand Down Expand Up @@ -124,7 +123,8 @@ func (r *receiver) msgHandler(client paho.Client, msg paho.Message) {
default:
newC := r.cPublished.Add(1)
if newC > int32(r.expectPublished) {
r.errCh <- fmt.Errorf("received unexpected published message")
r.errCh <- fmt.Errorf("received unexpected published message: dup:%v, topic: %s, qos:%v, retained:%v, payload: %q",
msg.Duplicate(), msg.Topic(), msg.Qos(), msg.Retained(), msg.Payload())
return
}

Expand All @@ -151,63 +151,3 @@ func (r *receiver) msgHandler(client paho.Client, msg paho.Message) {
}
}
}

func runSubWithPubret(
nSubscribers int,
repeat int,
expectRetained,
expectPublished int,
messageOpts messageOpts,
prepublishRetained bool,
) *Stat {
errCh := make(chan error)

if prepublishRetained {
p := &publisher{
clientID: ClientID + "-pub",
messages: expectRetained,
topics: expectRetained,
messageOpts: messageOpts,
}
p.messageOpts.retain = true
p.publish(nil, errCh, true)
}

// Connect all subscribers (and subscribe to a wildcard topic that includes
// all published retained messages).
statsCh := make(chan *Stat)
for i := 0; i < nSubscribers; i++ {
r := &receiver{
clientID: ClientID + "-sub-" + strconv.Itoa(i),
filterPrefix: messageOpts.topic,
topic: messageOpts.topic + "/+",
qos: messageOpts.qos,
expectRetained: expectRetained,
expectPublished: expectPublished,
repeat: repeat,
}
go r.receive(nil, statsCh, errCh)
}

// wait for the stats
total := &Stat{
NS: make(map[string]time.Duration),
}
timeout := time.NewTimer(Timeout)
defer timeout.Stop()
for i := 0; i < nSubscribers*repeat; i++ {
select {
case stat := <-statsCh:
total.Ops += stat.Ops
total.Bytes += stat.Bytes
for k, v := range stat.NS {
total.NS[k] += v
}
case err := <-errCh:
log.Fatalf("Error: %v", err)
case <-timeout.C:
log.Fatalf("Error: timeout waiting for messages")
}
}
return total
}

0 comments on commit 27cdbf0

Please sign in to comment.