Skip to content

Commit

Permalink
Issue go-zeromq#108: moved subscribe logic from pub to sub and xsub s…
Browse files Browse the repository at this point in the history
…ockets.
  • Loading branch information
Tina Mancuso authored and Tina Mancuso committed Sep 13, 2021
1 parent 7ab29a1 commit 62a12e4
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 44 deletions.
7 changes: 7 additions & 0 deletions proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ func (p *Proxy) init(front, back, capture Socket) {
}
)

// Subscribe all. If we don't do this, the sub socket will drop all messages.
// It has no effect for other socket types.
if err := front.SetOption(OptionSubscribe, ""); err != nil {
log.Fatalf("Subscribe error: %v\n", err)
return
}

// workers makes sure all goroutines are launched and scheduled.
var workers sync.WaitGroup
workers.Add(len(pipes) + 1)
Expand Down
36 changes: 2 additions & 34 deletions pub.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,6 @@ import (
"sync"
)

// Topics is an interface that wraps the basic Topics method.
type Topics interface {
// Topics returns the sorted list of topics a socket is subscribed to.
Topics() []string
}

// NewPub returns a new PUB ZeroMQ socket.
// The returned socket value is initially unbound.
func NewPub(ctx context.Context, opts ...Option) Socket {
Expand Down Expand Up @@ -109,11 +103,6 @@ func (pub *pubSocket) SetOption(name string, value interface{}) error {
return nil
}

// Topics returns the sorted list of topics a socket is subscribed to.
func (pub *pubSocket) Topics() []string {
return pub.sck.topics()
}

// pubQReader is a queued-message reader.
type pubQReader struct {
ctx context.Context
Expand Down Expand Up @@ -194,28 +183,11 @@ func (q *pubQReader) listen(ctx context.Context, r *Conn) {
if msg.err != nil {
return
}
switch {
case q.topic(msg):
r.subscribe(msg)
default:
q.c <- msg
}
q.c <- msg
}
}
}

func (q *pubQReader) topic(msg Msg) bool {
if len(msg.Frames) != 1 {
return false
}
frame := msg.Frames[0]
if len(frame) == 0 {
return false
}
topic := frame[0]
return topic == 0 || topic == 1
}

type pubMWriter struct {
ctx context.Context
mu sync.Mutex
Expand Down Expand Up @@ -309,21 +281,17 @@ func (w *pubMWriter) write(ctx context.Context, msg Msg) error {
}

func (w *pubMWriter) sendMsg(msg Msg) {
topic := string(msg.Frames[0])
w.mu.Lock()
defer w.mu.Unlock()
// TODO(inphi): distribute messages across subscribers at once
for i := range w.ws {
ww := w.ws[i]
if ww.subscribed(topic) {
_ = ww.SendMsg(msg)
}
_ = ww.SendMsg(msg)
}
}

var (
_ rpool = (*pubQReader)(nil)
_ wpool = (*pubMWriter)(nil)
_ Socket = (*pubSocket)(nil)
_ Topics = (*pubSocket)(nil)
)
38 changes: 31 additions & 7 deletions sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"net"
"sort"
"strings"
"sync"
)

Expand Down Expand Up @@ -48,7 +49,17 @@ func (sub *subSocket) SendMulti(msg Msg) error {

// Recv receives a complete message.
func (sub *subSocket) Recv() (Msg, error) {
return sub.sck.Recv()
// If we're not subscribed to this message, we keep looping until we get one we are subscribed to, or an empty message or an error.
for {
msg, err := sub.sck.Recv()
if err != nil || len(msg.Frames) == 0 || string(msg.Frames[0]) == "" {
return msg, err
}
t := string(msg.Frames[0])
if sub.subscribed(t) {
return msg, err
}
}
}

// Listen connects a local endpoint to the Socket.
Expand All @@ -58,11 +69,7 @@ func (sub *subSocket) Listen(ep string) error {

// Dial connects a remote endpoint to the Socket.
func (sub *subSocket) Dial(ep string) error {
err := sub.sck.Dial(ep)
if err != nil {
return err
}
return nil
return sub.sck.Dial(ep)
}

// Type returns the type of this Socket (PUB, SUB, ...)
Expand Down Expand Up @@ -138,7 +145,24 @@ func (sub *subSocket) subscribe(topic string, v int) {
sub.mu.Unlock()
}

func (sub *subSocket) subscribed(topic string) bool {
sub.mu.RLock()
defer sub.mu.RUnlock()
if _, ok := sub.topics[""]; ok {
return true
}
if _, ok := sub.topics[topic]; ok {
return true
}
for k := range sub.topics {
if strings.HasPrefix(topic, k) {
return true
}
}
return false
}

var (
_ Socket = (*subSocket)(nil)
_ Topics = (*subSocket)(nil)
//_ Topics = (*subSocket)(nil)
)
79 changes: 76 additions & 3 deletions xsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,24 @@ package zmq4
import (
"context"
"net"
"strings"
"sync"
)

// NewXSub returns a new XSUB ZeroMQ socket.
// The returned socket value is initially unbound.
func NewXSub(ctx context.Context, opts ...Option) Socket {
xsub := &xsubSocket{newSocket(ctx, XSub, opts...)}
xsub := &xsubSocket{newSocket(ctx, XSub, opts...), sync.RWMutex{}, make(map[string]struct{})}
xsub.sck.r = newQReader(xsub.sck.ctx)
xsub.topics = make(map[string]struct{})
return xsub
}

// xsubSocket is a XSUB ZeroMQ socket.
type xsubSocket struct {
sck *socket
mu sync.RWMutex
topics map[string]struct{}
}

// Close closes the open Socket
Expand All @@ -41,7 +47,17 @@ func (xsub *xsubSocket) SendMulti(msg Msg) error {

// Recv receives a complete message.
func (xsub *xsubSocket) Recv() (Msg, error) {
return xsub.sck.Recv()
// If we're not subscribed to this message, we keep looping until we get one we are subscribed to, or an error or empty message.
for {
msg, err := xsub.sck.Recv()
if err != nil || len(msg.Frames) == 0 || string(msg.Frames[0]) == "" {
return msg, err
}
t := string(msg.Frames[0])
if xsub.subscribed(t) {
return msg, err
}
}
}

// Listen connects a local endpoint to the Socket.
Expand Down Expand Up @@ -72,7 +88,64 @@ func (xsub *xsubSocket) GetOption(name string) (interface{}, error) {

// SetOption is used to set an option for a socket.
func (xsub *xsubSocket) SetOption(name string, value interface{}) error {
return xsub.sck.SetOption(name, value)
err := xsub.sck.SetOption(name, value)
if err != nil {
return err
}

var (
topic []byte
)

switch name {
case OptionSubscribe:
k := value.(string)
xsub.subscribe(k, 1)
topic = append([]byte{1}, k...)

case OptionUnsubscribe:
k := value.(string)
topic = append([]byte{0}, k...)
xsub.subscribe(k, 0)

default:
return ErrBadProperty
}

xsub.sck.mu.RLock()
if len(xsub.sck.conns) > 0 {
err = xsub.Send(NewMsg(topic))
}
xsub.sck.mu.RUnlock()
return err
}

func (xsub *xsubSocket) subscribe(topic string, v int) {
xsub.mu.Lock()
switch v {
case 0:
delete(xsub.topics, topic)
case 1:
xsub.topics[topic] = struct{}{}
}
xsub.mu.Unlock()
}

func (xsub *xsubSocket) subscribed(topic string) bool {
xsub.mu.RLock()
defer xsub.mu.RUnlock()
if _, ok := xsub.topics[""]; ok {
return true
}
if _, ok := xsub.topics[topic]; ok {
return true
}
for k := range xsub.topics {
if strings.HasPrefix(topic, k) {
return true
}
}
return false
}

var (
Expand Down

0 comments on commit 62a12e4

Please sign in to comment.