Skip to content

Commit

Permalink
add a new RawTracer event to track messages dropped in Subscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelMure authored and aschmahmann committed Jul 13, 2021
1 parent 3c7689d commit 9be1c59
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 6 deletions.
1 change: 1 addition & 0 deletions gossip_tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func (gt *gossipTracer) DuplicateMessage(msg *Message) {}
func (gt *gossipTracer) RecvRPC(rpc *RPC) {}
func (gt *gossipTracer) SendRPC(rpc *RPC, p peer.ID) {}
func (gt *gossipTracer) DropRPC(rpc *RPC, p peer.ID) {}
func (gt *gossipTracer) DroppedInSubscribe(msg *Message) {}

func (gt *gossipTracer) ThrottlePeer(p peer.ID) {
gt.Lock()
Expand Down
2 changes: 2 additions & 0 deletions peer_gater.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,3 +449,5 @@ func (pg *peerGater) RecvRPC(rpc *RPC) {}
func (pg *peerGater) SendRPC(rpc *RPC, p peer.ID) {}

func (pg *peerGater) DropRPC(rpc *RPC, p peer.ID) {}

func (pg *peerGater) DroppedInSubscribe(msg *Message) {}
1 change: 1 addition & 0 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,7 @@ func (p *PubSub) notifySubs(msg *Message) {
select {
case f.ch <- msg:
default:
p.tracer.DroppedInSubscribe(msg)
log.Infof("Can't deliver message to subscription for topic %s; subscriber too slow", topic)
}
}
Expand Down
2 changes: 2 additions & 0 deletions score.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,8 @@ func (ps *peerScore) SendRPC(rpc *RPC, p peer.ID) {}

func (ps *peerScore) DropRPC(rpc *RPC, p peer.ID) {}

func (ps *peerScore) DroppedInSubscribe(msg *Message) {}

// message delivery records
func (d *messageDeliveries) getRecord(id string) *deliveryRecord {
rec, ok := d.records[id]
Expand Down
11 changes: 6 additions & 5 deletions tag_tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,9 @@ func (t *tagTracer) RejectMessage(msg *Message, reason string) {
}
}

func (t *tagTracer) RemovePeer(peer.ID) {}
func (t *tagTracer) ThrottlePeer(p peer.ID) {}
func (t *tagTracer) RecvRPC(rpc *RPC) {}
func (t *tagTracer) SendRPC(rpc *RPC, p peer.ID) {}
func (t *tagTracer) DropRPC(rpc *RPC, p peer.ID) {}
func (t *tagTracer) RemovePeer(peer.ID) {}
func (t *tagTracer) ThrottlePeer(p peer.ID) {}
func (t *tagTracer) RecvRPC(rpc *RPC) {}
func (t *tagTracer) SendRPC(rpc *RPC, p peer.ID) {}
func (t *tagTracer) DropRPC(rpc *RPC, p peer.ID) {}
func (t *tagTracer) DroppedInSubscribe(msg *Message) {}
15 changes: 14 additions & 1 deletion trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type EventTracer interface {
Trace(evt *pb.TraceEvent)
}

// RawTracer is a low level tracing interace that allows an application to trace the internal
// RawTracer is a low level tracing interface that allows an application to trace the internal
// operation of the pubsub subsystem.
//
// Note that the tracers are invoked synchronously, which means that application tracers must
Expand Down Expand Up @@ -54,6 +54,9 @@ type RawTracer interface {
SendRPC(rpc *RPC, p peer.ID)
// DropRPC is invoked when an outbound RPC is dropped, typically because of a queue full.
DropRPC(rpc *RPC, p peer.ID)
// DroppedInSubscribe is invoked when the consumer of Subscribe is not reading messages fast enough and
// the pressure release mechanism trigger, dropping messages.
DroppedInSubscribe(msg *Message)
}

// pubsub tracer details
Expand Down Expand Up @@ -325,6 +328,16 @@ func (t *pubsubTracer) DropRPC(rpc *RPC, p peer.ID) {
t.tracer.Trace(evt)
}

func (t *pubsubTracer) DroppedInSubscribe(msg *Message) {
if t == nil {
return
}

for _, tr := range t.raw {
tr.DroppedInSubscribe(msg)
}
}

func (t *pubsubTracer) traceRPCMeta(rpc *RPC) *pb.TraceEvent_RPCMeta {
rpcMeta := new(pb.TraceEvent_RPCMeta)

Expand Down

0 comments on commit 9be1c59

Please sign in to comment.