Skip to content

Commit

Permalink
expose internalTracer as RawTracer
Browse files Browse the repository at this point in the history
  • Loading branch information
vyzo committed Mar 16, 2021
1 parent 05c505e commit 5457a28
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 36 deletions.
2 changes: 1 addition & 1 deletion gossip_tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (gt *gossipTracer) GetBrokenPromises() map[peer.ID]int {
return res
}

var _ internalTracer = (*gossipTracer)(nil)
var _ RawTracer = (*gossipTracer)(nil)

func (gt *gossipTracer) fulfillPromise(msg *Message) {
mid := gt.msgID(msg.Message)
Expand Down
12 changes: 6 additions & 6 deletions gossipsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,8 @@ func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er
tagTracer: newTagTracer(h.ConnManager()),
}

// use the withInternalTracer option to hook up the tag tracer
opts = append(opts, withInternalTracer(rt.tagTracer))
// hook the tag tracer
opts = append(opts, WithRawTracer(rt.tagTracer))
return NewPubSub(ctx, h, rt, opts...)
}

Expand Down Expand Up @@ -232,12 +232,12 @@ func WithPeerScore(params *PeerScoreParams, thresholds *PeerScoreThresholds) Opt

// hook the tracer
if ps.tracer != nil {
ps.tracer.internal = append(ps.tracer.internal, gs.score, gs.gossipTracer)
ps.tracer.raw = append(ps.tracer.raw, gs.score, gs.gossipTracer)
} else {
ps.tracer = &pubsubTracer{
internal: []internalTracer{gs.score, gs.gossipTracer},
pid: ps.host.ID(),
msgID: ps.msgID,
raw: []RawTracer{gs.score, gs.gossipTracer},
pid: ps.host.ID(),
msgID: ps.msgID,
}
}

Expand Down
8 changes: 4 additions & 4 deletions peer_gater.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,12 @@ func WithPeerGater(params *PeerGaterParams) Option {

// hook the tracer
if ps.tracer != nil {
ps.tracer.internal = append(ps.tracer.internal, gs.gate)
ps.tracer.raw = append(ps.tracer.raw, gs.gate)
} else {
ps.tracer = &pubsubTracer{
internal: []internalTracer{gs.gate},
pid: ps.host.ID(),
msgID: ps.msgID,
raw: []RawTracer{gs.gate},
pid: ps.host.ID(),
msgID: ps.msgID,
}
}

Expand Down
9 changes: 5 additions & 4 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,13 +430,14 @@ func WithEventTracer(tracer EventTracer) Option {
}
}

// withInternalTracer adds an internal event tracer to the pubsub system
func withInternalTracer(tracer internalTracer) Option {
// WithRawTracer adds a raw tracer to the pubsub system.
// Multiple tracers can be added using multiple invocations of the option.
func WithRawTracer(tracer RawTracer) Option {
return func(p *PubSub) error {
if p.tracer != nil {
p.tracer.internal = append(p.tracer.internal, tracer)
p.tracer.raw = append(p.tracer.raw, tracer)
} else {
p.tracer = &pubsubTracer{internal: []internalTracer{tracer}, pid: p.host.ID(), msgID: p.msgID}
p.tracer = &pubsubTracer{raw: []RawTracer{tracer}, pid: p.host.ID(), msgID: p.msgID}
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion score.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type peerScore struct {
inspectPeriod time.Duration
}

var _ internalTracer = (*peerScore)(nil)
var _ RawTracer = (*peerScore)(nil)

type messageDeliveries struct {
records map[string]*deliveryRecord
Expand Down
4 changes: 2 additions & 2 deletions tag_tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,8 @@ func (t *tagTracer) nearFirstPeers(msg *Message) []peer.ID {
return peers
}

// -- internalTracer interface methods
var _ internalTracer = (*tagTracer)(nil)
// -- RawTracer interface methods
var _ RawTracer = (*tagTracer)(nil)

func (t *tagTracer) AddPeer(p peer.ID, proto protocol.ID) {
t.tagPeerIfDirect(p)
Expand Down
57 changes: 39 additions & 18 deletions trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,53 @@ import (
pb "github.com/libp2p/go-libp2p-pubsub/pb"
)

// Generic event tracer interface
// EventTracer is a generic event tracer interface.
// This is a high level tracing interface which delivers tracing events, as defined by the protobuf
// schema in pb/trace.proto.
type EventTracer interface {
Trace(evt *pb.TraceEvent)
}

// internal interface for score tracing
type internalTracer interface {
// RawTracer is a low level tracing interace that allows an application to trace the internal
// operation of the pubsub subsystem.
//
// Note that the tracers are invoked synchronously, which means that application tracers must
// take care to not block or modify arguments.
//
// Warning: this interface is not fixed, we may be adding new methods as necessitated by the system
// in the future.
type RawTracer interface {
// AddPeer is invoked when a new peer is added.
AddPeer(p peer.ID, proto protocol.ID)
// RemovePeer is invoked when a peer is removed.
RemovePeer(p peer.ID)
// Join is invoked when a new topic is joined
Join(topic string)
// Leave is invoked when a topic is abandoned
Leave(topic string)
// Graft is invoked when a new peer is grafted on the mesh (gossipsub)
Graft(p peer.ID, topic string)
// Prune is invoked when a peer is pruned from the message (gossipsub)
Prune(p peer.ID, topic string)
// ValidateMessage is invoked when a message first enters the validation pipeline.
ValidateMessage(msg *Message)
// DeliverMessage is invoked when a message is delivered
DeliverMessage(msg *Message)
// RejectMessage is invoked when a message is Rejected or Ignored.
// The reason argument can be one of the named strings Reject*.
RejectMessage(msg *Message, reason string)
// DuplicateMessage is invoked when a duplicate message is dropped.
DuplicateMessage(msg *Message)
// ThrottlePeer is invoked when a peer is throttled by the peer gater.
ThrottlePeer(p peer.ID)
}

// pubsub tracer details
type pubsubTracer struct {
tracer EventTracer
internal []internalTracer
pid peer.ID
msgID MsgIdFunction
tracer EventTracer
raw []RawTracer
pid peer.ID
msgID MsgIdFunction
}

func (t *pubsubTracer) PublishMessage(msg *Message) {
Expand Down Expand Up @@ -66,7 +87,7 @@ func (t *pubsubTracer) ValidateMessage(msg *Message) {
}

if msg.ReceivedFrom != t.pid {
for _, tr := range t.internal {
for _, tr := range t.raw {
tr.ValidateMessage(msg)
}
}
Expand All @@ -78,7 +99,7 @@ func (t *pubsubTracer) RejectMessage(msg *Message, reason string) {
}

if msg.ReceivedFrom != t.pid {
for _, tr := range t.internal {
for _, tr := range t.raw {
tr.RejectMessage(msg, reason)
}
}
Expand Down Expand Up @@ -109,7 +130,7 @@ func (t *pubsubTracer) DuplicateMessage(msg *Message) {
}

if msg.ReceivedFrom != t.pid {
for _, tr := range t.internal {
for _, tr := range t.raw {
tr.DuplicateMessage(msg)
}
}
Expand Down Expand Up @@ -139,7 +160,7 @@ func (t *pubsubTracer) DeliverMessage(msg *Message) {
}

if msg.ReceivedFrom != t.pid {
for _, tr := range t.internal {
for _, tr := range t.raw {
tr.DeliverMessage(msg)
}
}
Expand Down Expand Up @@ -168,7 +189,7 @@ func (t *pubsubTracer) AddPeer(p peer.ID, proto protocol.ID) {
return
}

for _, tr := range t.internal {
for _, tr := range t.raw {
tr.AddPeer(p, proto)
}

Expand Down Expand Up @@ -196,7 +217,7 @@ func (t *pubsubTracer) RemovePeer(p peer.ID) {
return
}

for _, tr := range t.internal {
for _, tr := range t.raw {
tr.RemovePeer(p)
}

Expand Down Expand Up @@ -366,7 +387,7 @@ func (t *pubsubTracer) Join(topic string) {
return
}

for _, tr := range t.internal {
for _, tr := range t.raw {
tr.Join(topic)
}

Expand All @@ -392,7 +413,7 @@ func (t *pubsubTracer) Leave(topic string) {
return
}

for _, tr := range t.internal {
for _, tr := range t.raw {
tr.Leave(topic)
}

Expand All @@ -418,7 +439,7 @@ func (t *pubsubTracer) Graft(p peer.ID, topic string) {
return
}

for _, tr := range t.internal {
for _, tr := range t.raw {
tr.Graft(p, topic)
}

Expand All @@ -445,7 +466,7 @@ func (t *pubsubTracer) Prune(p peer.ID, topic string) {
return
}

for _, tr := range t.internal {
for _, tr := range t.raw {
tr.Prune(p, topic)
}

Expand All @@ -472,7 +493,7 @@ func (t *pubsubTracer) ThrottlePeer(p peer.ID) {
return
}

for _, tr := range t.internal {
for _, tr := range t.raw {
tr.ThrottlePeer(p)
}
}

0 comments on commit 5457a28

Please sign in to comment.