Skip to content

Commit

Permalink
add support for custom protocol matching function
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos authored and vyzo committed Jul 30, 2021
1 parent 02dae65 commit faffd2a
Showing 1 changed file with 20 additions and 1 deletion.
21 changes: 20 additions & 1 deletion pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ var (

var log = logging.Logger("pubsub")

type MatchingFunction func(string) func(string) bool

// PubSub is the implementation of the pubsub system.
type PubSub struct {
// atomic counter for seqnos
Expand Down Expand Up @@ -157,6 +159,9 @@ type PubSub struct {
// filter for tracking subscriptions in topics of interest; if nil, then we track all subscriptions
subFilter SubscriptionFilter

// protoMatchFunc is a matching function for protocol selection.
protoMatchFunc *MatchingFunction

ctx context.Context
}

Expand Down Expand Up @@ -235,6 +240,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
peerOutboundQueueSize: 32,
signID: h.ID(),
signKey: nil,
protoMatchFunc: nil,
signPolicy: StrictSign,
incoming: make(chan *RPC, 32),
newPeers: make(chan struct{}, 1),
Expand Down Expand Up @@ -292,7 +298,11 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
rt.Attach(ps)

for _, id := range rt.Protocols() {
h.SetStreamHandler(id, ps.handleNewStream)
if ps.protoMatchFunc != nil {
h.SetStreamHandlerMatch(id, (*ps.protoMatchFunc)(string(id)), ps.handleNewStream)
} else {
h.SetStreamHandler(id, ps.handleNewStream)
}
}
h.Network().Notify((*PubSubNotif)(ps))

Expand Down Expand Up @@ -475,6 +485,15 @@ func WithMaxMessageSize(maxMessageSize int) Option {
}
}

// WithProtocolMatchFunction sets a custom matching function for protocol
// selection to be used by the protocol handler on the Host's Mux
func WithProtocolMatchFunction(m MatchingFunction) Option {
return func(ps *PubSub) error {
ps.protoMatchFunc = &m
return nil
}
}

// processLoop handles all inputs arriving on the channels
func (p *PubSub) processLoop(ctx context.Context) {
defer func() {
Expand Down

0 comments on commit faffd2a

Please sign in to comment.