diff --git a/pubsub.go b/pubsub.go index 7442a349..cf4644a9 100644 --- a/pubsub.go +++ b/pubsub.go @@ -170,6 +170,12 @@ type PubSub struct { protoMatchFunc ProtocolMatchFn ctx context.Context + + // appSpecificRpcInspector is an auxiliary that may be set by the application to inspect incoming RPCs prior to + // processing them. The inspector is invoked on an accepted RPC right prior to handling it. + // The return value of the inspector function is an error indicating whether the RPC should be processed or not. + // If the error is nil, the RPC is processed as usual. If the error is non-nil, the RPC is dropped. + appSpecificRpcInspector func(peer.ID, *RPC) error } // PubSubRouter is the message router component of PubSub. @@ -527,6 +533,13 @@ func WithSeenMessagesTTL(ttl time.Duration) Option { } } +func WithAppSpecificRpcInspector(inspector func(peer.ID, *RPC) error) Option { + return func(ps *PubSub) error { + ps.appSpecificRpcInspector = inspector + return nil + } +} + // processLoop handles all inputs arriving on the channels func (p *PubSub) processLoop(ctx context.Context) { defer func() { @@ -1005,6 +1018,15 @@ func (p *PubSub) notifyLeave(topic string, pid peer.ID) { } func (p *PubSub) handleIncomingRPC(rpc *RPC) { + // pass the rpc through app specific validation (if any available). + if p.appSpecificRpcInspector != nil { + // check if the RPC is allowed by the external inspector + if err := p.appSpecificRpcInspector(rpc.from, rpc); err != nil { + log.Debugf("application-specific inspection failed, rejecting incoming rpc: %s", err) + return // reject the RPC + } + } + p.tracer.RecvRPC(rpc) subs := rpc.GetSubscriptions()