From d3f151c22474477f19c4379b1dc42aa4862dd6de Mon Sep 17 00:00:00 2001 From: Yahya Hassanzadeh <yhassanzadeh13@ku.edu.tr> Date: Wed, 30 Nov 2022 22:10:07 -0800 Subject: [PATCH] Adds Application Specific RPC Inspector (#509) * Update go.mod * Refactor GossipSub Construction (#1) * Enables non-atomic validation for peer scoring parameters (#499) * decouples topic scoring parameters * adds skiping atomic validation for topic parameters * cleans up * adds skip atomic validation to peer score threshold * adds skip atomic validation for peer parameters * adds test for non-atomic validation * adds tests for peer score * adds tests for peer score thresholds * refactors tests * chore: Update .github/workflows/stale.yml [skip ci] * adds with gossipsub tracker Co-authored-by: libp2p-mgmt-read-write[bot] <104492852+libp2p-mgmt-read-write[bot]@users.noreply.github.com> * decouples options * fixes conflict * reverts back module * fixes peer score helper * Adds send control message to gossipsub router (#2) * adjusts libp2p version (#3) * Update go.mod (#4) * adds app specific rpc handler * Create ci.yml (#5) * Create Makefile (#7) * Revert "Merge branch 'yahya/gossipsub-router-interface'" (#6) This reverts commit 1c91995b7fbce0e4b9c5990c5bfda0d555267182. * Update ci.yml (#9) * Revert "Merge branch 'master' into yahya/adds-rpc-inspector" This reverts commit 352d7471c58580480b7f6592001bc3e9b910fa77. * Revert "Merge remote-tracking branch 'origin/yahya/adds-rpc-inspector' into yahya/adds-rpc-inspector" This reverts commit 586c5cb6eb2a971a1590ea32050de139316984d2. * Revert "Merge branch 'master' into yahya/adds-rpc-inspector" This reverts commit 2e13ee8b95dded5a3401dd86f952fae3419bd86b. * moves app specific inspector to pubsub * removes option from gossipsub * moves app specific rpc inspector up * refactors app specific to return an error Co-authored-by: libp2p-mgmt-read-write[bot] <104492852+libp2p-mgmt-read-write[bot]@users.noreply.github.com> --- pubsub.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/pubsub.go b/pubsub.go index 7442a349..cf4644a9 100644 --- a/pubsub.go +++ b/pubsub.go @@ -170,6 +170,12 @@ type PubSub struct { protoMatchFunc ProtocolMatchFn ctx context.Context + + // appSpecificRpcInspector is an auxiliary that may be set by the application to inspect incoming RPCs prior to + // processing them. The inspector is invoked on an accepted RPC right prior to handling it. + // The return value of the inspector function is an error indicating whether the RPC should be processed or not. + // If the error is nil, the RPC is processed as usual. If the error is non-nil, the RPC is dropped. + appSpecificRpcInspector func(peer.ID, *RPC) error } // PubSubRouter is the message router component of PubSub. @@ -527,6 +533,13 @@ func WithSeenMessagesTTL(ttl time.Duration) Option { } } +func WithAppSpecificRpcInspector(inspector func(peer.ID, *RPC) error) Option { + return func(ps *PubSub) error { + ps.appSpecificRpcInspector = inspector + return nil + } +} + // processLoop handles all inputs arriving on the channels func (p *PubSub) processLoop(ctx context.Context) { defer func() { @@ -1005,6 +1018,15 @@ func (p *PubSub) notifyLeave(topic string, pid peer.ID) { } func (p *PubSub) handleIncomingRPC(rpc *RPC) { + // pass the rpc through app specific validation (if any available). + if p.appSpecificRpcInspector != nil { + // check if the RPC is allowed by the external inspector + if err := p.appSpecificRpcInspector(rpc.from, rpc); err != nil { + log.Debugf("application-specific inspection failed, rejecting incoming rpc: %s", err) + return // reject the RPC + } + } + p.tracer.RecvRPC(rpc) subs := rpc.GetSubscriptions()