Skip to content

Commit

Permalink
implement synchronous validation for locally published messages
Browse files Browse the repository at this point in the history
So that locally published messages are never dropped because of full
validation queues.
  • Loading branch information
vyzo committed Mar 31, 2021
1 parent 5457a28 commit 005ac15
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 54 deletions.
71 changes: 35 additions & 36 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,6 @@ type PubSub struct {
// incoming messages from other peers
incoming chan *RPC

// messages we are publishing out to our peers
publish chan *Message

// addSub is a control channel for us to add and remove subscriptions
addSub chan *addSubReq

Expand Down Expand Up @@ -234,7 +231,6 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
signKey: nil,
signPolicy: StrictSign,
incoming: make(chan *RPC, 32),
publish: make(chan *Message),
newPeers: make(chan peer.ID),
newPeerStream: make(chan network.Stream),
newPeerError: make(chan peer.ID),
Expand Down Expand Up @@ -589,10 +585,6 @@ func (p *PubSub) processLoop(ctx context.Context) {
case rpc := <-p.incoming:
p.handleIncomingRPC(rpc)

case msg := <-p.publish:
p.tracer.PublishMessage(msg)
p.pushMsg(msg)

case msg := <-p.sendMsg:
p.publishMessage(msg)

Expand Down Expand Up @@ -999,59 +991,66 @@ func (p *PubSub) pushMsg(msg *Message) {
return
}

err := p.checkSignature(msg)
if err != nil {
log.Debugf("dropping message from %s: %s", src, err)
return
}

// reject messages claiming to be from ourselves but not locally published
self := p.host.ID()
if peer.ID(msg.GetFrom()) == self && src != self {
log.Debugf("dropping message claiming to be from self but forwarded from %s", src)
p.tracer.RejectMessage(msg, RejectSelfOrigin)
return
}

// have we already seen and validated this message?
id := p.msgID(msg.Message)
if p.seenMessage(id) {
p.tracer.DuplicateMessage(msg)
return
}

if !p.val.Push(src, msg) {
return
}

if p.markSeen(id) {
p.publishMessage(msg)
}
}

func (p *PubSub) checkSignature(msg *Message) error {
// reject unsigned messages when strict before we even process the id
if p.signPolicy.mustVerify() {
if p.signPolicy.mustSign() {
if msg.Signature == nil {
log.Debugf("dropping unsigned message from %s", src)
p.tracer.RejectMessage(msg, RejectMissingSignature)
return
return ValidationError{Reason: RejectMissingSignature}
}
// Actual signature verification happens in the validation pipeline,
// after checking if the message was already seen or not,
// to avoid unnecessary signature verification processing-cost.
} else {
if msg.Signature != nil {
log.Debugf("dropping message with unexpected signature from %s", src)
p.tracer.RejectMessage(msg, RejectUnexpectedSignature)
return
return ValidationError{Reason: RejectUnexpectedSignature}
}
// If we are expecting signed messages, and not authoring messages,
// then do no accept seq numbers, from data, or key data.
// The default msgID function still relies on Seqno and From,
// but is not used if we are not authoring messages ourselves.
if p.signID == "" {
if msg.Seqno != nil || msg.From != nil || msg.Key != nil {
log.Debugf("dropping message with unexpected auth info from %s", src)
p.tracer.RejectMessage(msg, RejectUnexpectedAuthInfo)
return
return ValidationError{Reason: RejectUnexpectedAuthInfo}
}
}
}
}

// reject messages claiming to be from ourselves but not locally published
self := p.host.ID()
if peer.ID(msg.GetFrom()) == self && src != self {
log.Debugf("dropping message claiming to be from self but forwarded from %s", src)
p.tracer.RejectMessage(msg, RejectSelfOrigin)
return
}

// have we already seen and validated this message?
id := p.msgID(msg.Message)
if p.seenMessage(id) {
p.tracer.DuplicateMessage(msg)
return
}

if !p.val.Push(src, msg) {
return
}

if p.markSeen(id) {
p.publishMessage(msg)
}
return nil
}

func (p *PubSub) publishMessage(msg *Message) {
Expand Down
8 changes: 1 addition & 7 deletions topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,13 +241,7 @@ func (t *Topic) Publish(ctx context.Context, data []byte, opts ...PubOpt) error
t.p.disc.Bootstrap(ctx, t.topic, pub.ready)
}

select {
case t.p.publish <- &Message{m, t.p.host.ID(), nil}:
case <-t.p.ctx.Done():
return t.p.ctx.Err()
}

return nil
return t.p.val.Publish(&Message{m, t.p.host.ID(), nil})
}

// WithReadiness returns a publishing option for only publishing when the router is ready.
Expand Down
61 changes: 50 additions & 11 deletions validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"runtime"
"sync"
"time"

"github.com/libp2p/go-libp2p-core/peer"
Expand All @@ -15,6 +16,16 @@ const (
defaultValidateThrottle = 8192
)

// ValidationError is an error that may be signalled from message publication when the message
// fails validation
type ValidationError struct {
Reason string
}

func (e ValidationError) Error() string {
return e.Reason
}

// Validator is a function that validates a message with a binary decision: accept or reject.
type Validator func(context.Context, peer.ID, *Message) bool

Expand Down Expand Up @@ -56,6 +67,8 @@ type validation struct {

tracer *pubsubTracer

// mx protects the validator map
mx sync.Mutex
// topicVals tracks per topic validators
topicVals map[string]*topicVal

Expand Down Expand Up @@ -123,6 +136,9 @@ func (v *validation) Start(p *PubSub) {

// AddValidator adds a new validator
func (v *validation) AddValidator(req *addValReq) {
v.mx.Lock()
defer v.mx.Unlock()

topic := req.topic

_, ok := v.topicVals[topic]
Expand Down Expand Up @@ -180,6 +196,9 @@ func (v *validation) AddValidator(req *addValReq) {

// RemoveValidator removes an existing validator
func (v *validation) RemoveValidator(req *rmValReq) {
v.mx.Lock()
defer v.mx.Unlock()

topic := req.topic

_, ok := v.topicVals[topic]
Expand All @@ -191,6 +210,20 @@ func (v *validation) RemoveValidator(req *rmValReq) {
}
}

// Publish synchronously accepts a locally published message, performs applicable
// validations and pushes the message for propagate by the pubsub system
func (v *validation) Publish(msg *Message) error {
v.p.tracer.PublishMessage(msg)

err := v.p.checkSignature(msg)
if err != nil {
return err
}

vals := v.getValidators(msg)
return v.validate(vals, msg.ReceivedFrom, msg, true)
}

// Push pushes a message into the validation pipeline.
// It returns true if the message can be forwarded immediately without validation.
func (v *validation) Push(src peer.ID, msg *Message) bool {
Expand All @@ -211,6 +244,9 @@ func (v *validation) Push(src peer.ID, msg *Message) bool {

// getValidators returns all validators that apply to a given message
func (v *validation) getValidators(msg *Message) []*topicVal {
v.mx.Lock()
defer v.mx.Unlock()

topic := msg.GetTopic()

val, ok := v.topicVals[topic]
Expand All @@ -226,24 +262,22 @@ func (v *validation) validateWorker() {
for {
select {
case req := <-v.validateQ:
v.validate(req.vals, req.src, req.msg)
v.validate(req.vals, req.src, req.msg, false)
case <-v.p.ctx.Done():
return
}
}
}

// validate performs validation and only sends the message if all validators succeed
// signature validation is performed synchronously, while user validators are invoked
// asynchronously, throttled by the global validation throttle.
func (v *validation) validate(vals []*topicVal, src peer.ID, msg *Message) {
func (v *validation) validate(vals []*topicVal, src peer.ID, msg *Message, synchronous bool) error {
// If signature verification is enabled, but signing is disabled,
// the Signature is required to be nil upon receiving the message in PubSub.pushMsg.
if msg.Signature != nil {
if !v.validateSignature(msg) {
log.Debugf("message signature validation failed; dropping message from %s", src)
v.tracer.RejectMessage(msg, RejectInvalidSignature)
return
return ValidationError{Reason: RejectInvalidSignature}
}
}

Expand All @@ -252,14 +286,14 @@ func (v *validation) validate(vals []*topicVal, src peer.ID, msg *Message) {
id := v.p.msgID(msg.Message)
if !v.p.markSeen(id) {
v.tracer.DuplicateMessage(msg)
return
return nil
} else {
v.tracer.ValidateMessage(msg)
}

var inline, async []*topicVal
for _, val := range vals {
if val.validateInline {
if val.validateInline || synchronous {
inline = append(inline, val)
} else {
async = append(async, val)
Expand All @@ -283,7 +317,7 @@ loop:
if result == ValidationReject {
log.Debugf("message validation failed; dropping message from %s", src)
v.tracer.RejectMessage(msg, RejectValidationFailed)
return
return ValidationError{Reason: RejectValidationFailed}
}

// apply async validators
Expand All @@ -298,16 +332,21 @@ loop:
log.Debugf("message validation throttled; dropping message from %s", src)
v.tracer.RejectMessage(msg, RejectValidationThrottled)
}
return
return nil
}

if result == ValidationIgnore {
v.tracer.RejectMessage(msg, RejectValidationIgnored)
return
return ValidationError{Reason: RejectValidationIgnored}
}

// no async validators, accepted message, send it!
v.p.sendMsg <- msg
select {
case v.p.sendMsg <- msg:
return nil
case <-v.p.ctx.Done():
return v.p.ctx.Err()
}
}

func (v *validation) validateSignature(msg *Message) bool {
Expand Down

0 comments on commit 005ac15

Please sign in to comment.