diff --git a/gossip_tracer.go b/gossip_tracer.go index f9d04068..083327fb 100644 --- a/gossip_tracer.go +++ b/gossip_tracer.go @@ -162,6 +162,7 @@ func (gt *gossipTracer) DuplicateMessage(msg *Message) {} func (gt *gossipTracer) RecvRPC(rpc *RPC) {} func (gt *gossipTracer) SendRPC(rpc *RPC, p peer.ID) {} func (gt *gossipTracer) DropRPC(rpc *RPC, p peer.ID) {} +func (gt *gossipTracer) DroppedInSubscribe(msg *Message) {} func (gt *gossipTracer) ThrottlePeer(p peer.ID) { gt.Lock() diff --git a/peer_gater.go b/peer_gater.go index 6907d138..291fc9fc 100644 --- a/peer_gater.go +++ b/peer_gater.go @@ -449,3 +449,5 @@ func (pg *peerGater) RecvRPC(rpc *RPC) {} func (pg *peerGater) SendRPC(rpc *RPC, p peer.ID) {} func (pg *peerGater) DropRPC(rpc *RPC, p peer.ID) {} + +func (pg *peerGater) DroppedInSubscribe(msg *Message) {} diff --git a/pubsub.go b/pubsub.go index 185386b1..84d4bfdf 100644 --- a/pubsub.go +++ b/pubsub.go @@ -842,6 +842,7 @@ func (p *PubSub) notifySubs(msg *Message) { select { case f.ch <- msg: default: + p.tracer.DroppedInSubscribe(msg) log.Infof("Can't deliver message to subscription for topic %s; subscriber too slow", topic) } } diff --git a/score.go b/score.go index 2e552fc8..bbd79d34 100644 --- a/score.go +++ b/score.go @@ -825,6 +825,8 @@ func (ps *peerScore) SendRPC(rpc *RPC, p peer.ID) {} func (ps *peerScore) DropRPC(rpc *RPC, p peer.ID) {} +func (ps *peerScore) DroppedInSubscribe(msg *Message) {} + // message delivery records func (d *messageDeliveries) getRecord(id string) *deliveryRecord { rec, ok := d.records[id] diff --git a/tag_tracer.go b/tag_tracer.go index b1969577..2182b8da 100644 --- a/tag_tracer.go +++ b/tag_tracer.go @@ -251,8 +251,9 @@ func (t *tagTracer) RejectMessage(msg *Message, reason string) { } } -func (t *tagTracer) RemovePeer(peer.ID) {} -func (t *tagTracer) ThrottlePeer(p peer.ID) {} -func (t *tagTracer) RecvRPC(rpc *RPC) {} -func (t *tagTracer) SendRPC(rpc *RPC, p peer.ID) {} -func (t *tagTracer) DropRPC(rpc *RPC, p peer.ID) {} +func (t *tagTracer) RemovePeer(peer.ID) {} +func (t *tagTracer) ThrottlePeer(p peer.ID) {} +func (t *tagTracer) RecvRPC(rpc *RPC) {} +func (t *tagTracer) SendRPC(rpc *RPC, p peer.ID) {} +func (t *tagTracer) DropRPC(rpc *RPC, p peer.ID) {} +func (t *tagTracer) DroppedInSubscribe(msg *Message) {} diff --git a/trace.go b/trace.go index c11424b3..c7ac52e1 100644 --- a/trace.go +++ b/trace.go @@ -16,7 +16,7 @@ type EventTracer interface { Trace(evt *pb.TraceEvent) } -// RawTracer is a low level tracing interace that allows an application to trace the internal +// RawTracer is a low level tracing interface that allows an application to trace the internal // operation of the pubsub subsystem. // // Note that the tracers are invoked synchronously, which means that application tracers must @@ -54,6 +54,9 @@ type RawTracer interface { SendRPC(rpc *RPC, p peer.ID) // DropRPC is invoked when an outbound RPC is dropped, typically because of a queue full. DropRPC(rpc *RPC, p peer.ID) + // DroppedInSubscribe is invoked when the consumer of Subscribe is not reading messages fast enough and + // the pressure release mechanism trigger, dropping messages. + DroppedInSubscribe(msg *Message) } // pubsub tracer details @@ -325,6 +328,16 @@ func (t *pubsubTracer) DropRPC(rpc *RPC, p peer.ID) { t.tracer.Trace(evt) } +func (t *pubsubTracer) DroppedInSubscribe(msg *Message) { + if t == nil { + return + } + + for _, tr := range t.raw { + tr.DroppedInSubscribe(msg) + } +} + func (t *pubsubTracer) traceRPCMeta(rpc *RPC) *pb.TraceEvent_RPCMeta { rpcMeta := new(pb.TraceEvent_RPCMeta)