diff --git a/gossip_tracer.go b/gossip_tracer.go index a55f38b5..d1fe5a01 100644 --- a/gossip_tracer.go +++ b/gossip_tracer.go @@ -17,6 +17,8 @@ type gossipTracer struct { msgID MsgIdFunction + followUpTime time.Duration + // promises for messages by message ID; for each message tracked, we track the promise // expiration time for each peer. promises map[string]map[peer.ID]time.Time @@ -39,6 +41,7 @@ func (gt *gossipTracer) Start(gs *GossipSubRouter) { } gt.msgID = gs.p.msgID + gt.followUpTime = gs.params.IWantFollowupTime } // track a promise to deliver a message from a list of msgIDs we are requesting @@ -61,7 +64,7 @@ func (gt *gossipTracer) AddPromise(p peer.ID, msgIDs []string) { _, ok = promises[p] if !ok { - promises[p] = time.Now().Add(GossipSubIWantFollowupTime) + promises[p] = time.Now().Add(gt.followUpTime) peerPromises, ok := gt.peerPromises[p] if !ok { peerPromises = make(map[string]struct{}) diff --git a/gossip_tracer_test.go b/gossip_tracer_test.go index afabdb44..a06d240d 100644 --- a/gossip_tracer_test.go +++ b/gossip_tracer_test.go @@ -11,13 +11,8 @@ import ( func TestBrokenPromises(t *testing.T) { // tests that unfullfilled promises are tracked correctly - originalGossipSubIWantFollowupTime := GossipSubIWantFollowupTime - GossipSubIWantFollowupTime = 100 * time.Millisecond - defer func() { - GossipSubIWantFollowupTime = originalGossipSubIWantFollowupTime - }() - gt := newGossipTracer() + gt.followUpTime = 100 * time.Millisecond peerA := peer.ID("A") peerB := peer.ID("B") diff --git a/gossipsub.go b/gossipsub.go index 16081e20..00991630 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -28,175 +28,192 @@ const ( GossipSubID_v11 = protocol.ID("/meshsub/1.1.0") ) +// Defines the default gossipsub parameters. var ( + GossipSubD = 6 + GossipSubDlo = 5 + GossipSubDhi = 12 + GossipSubDscore = 4 + GossipSubDout = 2 + GossipSubHistoryLength = 5 + GossipSubHistoryGossip = 3 + GossipSubDlazy = 6 + GossipSubGossipFactor = 0.25 + GossipSubGossipRetransmission = 3 + GossipSubHeartbeatInitialDelay = 100 * time.Millisecond + GossipSubHeartbeatInterval = 1 * time.Second + GossipSubFanoutTTL = 60 * time.Second + GossipSubPrunePeers = 16 + GossipSubPruneBackoff = time.Minute + GossipSubConnectors = 8 + GossipSubMaxPendingConnections = 128 + GossipSubConnectionTimeout = 30 * time.Second + GossipSubDirectConnectTicks uint64 = 300 + GossipSubDirectConnectInitialDelay = time.Second + GossipSubOpportunisticGraftTicks uint64 = 60 + GossipSubOpportunisticGraftPeers = 2 + GossipSubGraftFloodThreshold = 10 * time.Second + GossipSubMaxIHaveLength = 5000 + GossipSubMaxIHaveMessages = 10 + GossipSubIWantFollowupTime = 3 * time.Second +) + +// GossipSubParams defines all the gossipsub specific parameters. +type GossipSubParams struct { // overlay parameters. - // GossipSubD sets the optimal degree for a GossipSub topic mesh. For example, if GossipSubD == 6, + // D sets the optimal degree for a GossipSub topic mesh. For example, if D == 6, // each peer will want to have about six peers in their mesh for each topic they're subscribed to. - // GossipSubD should be set somewhere between GossipSubDlo and GossipSubDhi. - GossipSubD = 6 + // D should be set somewhere between Dlo and Dhi. + D int - // GossipSubDlo sets the lower bound on the number of peers we keep in a GossipSub topic mesh. - // If we have fewer than GossipSubDlo peers, we will attempt to graft some more into the mesh at + // Dlo sets the lower bound on the number of peers we keep in a GossipSub topic mesh. + // If we have fewer than Dlo peers, we will attempt to graft some more into the mesh at // the next heartbeat. - GossipSubDlo = 5 + Dlo int - // GossipSubDhi sets the upper bound on the number of peers we keep in a GossipSub topic mesh. - // If we have more than GossipSubDhi peers, we will select some to prune from the mesh at the next heartbeat. - GossipSubDhi = 12 + // Dhi sets the upper bound on the number of peers we keep in a GossipSub topic mesh. + // If we have more than Dhi peers, we will select some to prune from the mesh at the next heartbeat. + Dhi int - // GossipSubDscore affects how peers are selected when pruning a mesh due to over subscription. - // At least GossipSubDscore of the retained peers will be high-scoring, while the remainder are + // Dscore affects how peers are selected when pruning a mesh due to over subscription. + // At least Dscore of the retained peers will be high-scoring, while the remainder are // chosen randomly. - GossipSubDscore = 4 + Dscore int - // GossipSubDout sets the quota for the number of outbound connections to maintain in a topic mesh. + // Dout sets the quota for the number of outbound connections to maintain in a topic mesh. // When the mesh is pruned due to over subscription, we make sure that we have outbound connections - // to at least GossipSubDout of the survivor peers. This prevents sybil attackers from overwhelming + // to at least Dout of the survivor peers. This prevents sybil attackers from overwhelming // our mesh with incoming connections. // - // GossipSubDout must be set below GossipSubDlo, and must not exceed GossipSubD / 2. - GossipSubDout = 2 + // Dout must be set below Dlo, and must not exceed D / 2. + Dout int // gossip parameters - // GossipSubHistoryLength controls the size of the message cache used for gossip. - // The message cache will remember messages for GossipSubHistoryLength heartbeats. - GossipSubHistoryLength = 5 + // HistoryLength controls the size of the message cache used for gossip. + // The message cache will remember messages for HistoryLength heartbeats. + HistoryLength int - // GossipSubHistoryGossip controls how many cached message ids we will advertise in + // HistoryGossip controls how many cached message ids we will advertise in // IHAVE gossip messages. When asked for our seen message IDs, we will return - // only those from the most recent GossipSubHistoryGossip heartbeats. The slack between - // GossipSubHistoryGossip and GossipSubHistoryLength allows us to avoid advertising messages + // only those from the most recent HistoryGossip heartbeats. The slack between + // HistoryGossip and HistoryLength allows us to avoid advertising messages // that will be expired by the time they're requested. // - // GossipSubHistoryGossip must be less than or equal to GossipSubHistoryLength to + // HistoryGossip must be less than or equal to HistoryLength to // avoid a runtime panic. - GossipSubHistoryGossip = 3 + HistoryGossip int - // GossipSubDlazy affects how many peers we will emit gossip to at each heartbeat. - // We will send gossip to at least GossipSubDlazy peers outside our mesh. The actual - // number may be more, depending on GossipSubGossipFactor and how many peers we're + // Dlazy affects how many peers we will emit gossip to at each heartbeat. + // We will send gossip to at least Dlazy peers outside our mesh. The actual + // number may be more, depending on GossipFactor and how many peers we're // connected to. - GossipSubDlazy = 6 + Dlazy int - // GossipSubGossipFactor affects how many peers we will emit gossip to at each heartbeat. - // We will send gossip to GossipSubGossipFactor * (total number of non-mesh peers), or - // GossipSubDlazy, whichever is greater. - GossipSubGossipFactor = 0.25 + // GossipFactor affects how many peers we will emit gossip to at each heartbeat. + // We will send gossip to GossipFactor * (total number of non-mesh peers), or + // Dlazy, whichever is greater. + GossipFactor float64 - // GossipSubGossipRetransmission controls how many times we will allow a peer to request + // GossipRetransmission controls how many times we will allow a peer to request // the same message id through IWANT gossip before we start ignoring them. This is designed // to prevent peers from spamming us with requests and wasting our resources. - GossipSubGossipRetransmission = 3 + GossipRetransmission int // heartbeat interval - // GossipSubHeartbeatInitialDelay is the short delay before the heartbeat timer begins + // HeartbeatInitialDelay is the short delay before the heartbeat timer begins // after the router is initialized. - GossipSubHeartbeatInitialDelay = 100 * time.Millisecond + HeartbeatInitialDelay time.Duration - // GossipSubHeartbeatInterval controls the time between heartbeats. - GossipSubHeartbeatInterval = 1 * time.Second + // HeartbeatInterval controls the time between heartbeats. + HeartbeatInterval time.Duration - // GossipSubFanoutTTL controls how long we keep track of the fanout state. If it's been - // GossipSubFanoutTTL since we've published to a topic that we're not subscribed to, + // FanoutTTL controls how long we keep track of the fanout state. If it's been + // FanoutTTL since we've published to a topic that we're not subscribed to, // we'll delete the fanout map for that topic. - GossipSubFanoutTTL = 60 * time.Second + FanoutTTL time.Duration - // GossipSubPrunePeers controls the number of peers to include in prune Peer eXchange. + // PrunePeers controls the number of peers to include in prune Peer eXchange. // When we prune a peer that's eligible for PX (has a good score, etc), we will try to - // send them signed peer records for up to GossipSubPrunePeers other peers that we + // send them signed peer records for up to PrunePeers other peers that we // know of. - GossipSubPrunePeers = 16 + PrunePeers int - // GossipSubPruneBackoff controls the backoff time for pruned peers. This is how long + // PruneBackoff controls the backoff time for pruned peers. This is how long // a peer must wait before attempting to graft into our mesh again after being pruned. - // When pruning a peer, we send them our value of GossipSubPruneBackoff so they know + // When pruning a peer, we send them our value of PruneBackoff so they know // the minimum time to wait. Peers running older versions may not send a backoff time, - // so if we receive a prune message without one, we will wait at least GossipSubPruneBackoff + // so if we receive a prune message without one, we will wait at least PruneBackoff // before attempting to re-graft. - GossipSubPruneBackoff = time.Minute + PruneBackoff time.Duration - // GossipSubConnectors controls the number of active connection attempts for peers obtained through PX. - GossipSubConnectors = 8 + // Connectors controls the number of active connection attempts for peers obtained through PX. + Connectors int - // GossipSubMaxPendingConnections sets the maximum number of pending connections for peers attempted through px. - GossipSubMaxPendingConnections = 128 + // MaxPendingConnections sets the maximum number of pending connections for peers attempted through px. + MaxPendingConnections int - // GossipSubConnectionTimeout controls the timeout for connection attempts. - GossipSubConnectionTimeout = 30 * time.Second + // ConnectionTimeout controls the timeout for connection attempts. + ConnectionTimeout time.Duration - // GossipSubDirectConnectTicks is the number of heartbeat ticks for attempting to reconnect direct peers + // DirectConnectTicks is the number of heartbeat ticks for attempting to reconnect direct peers // that are not currently connected. - GossipSubDirectConnectTicks uint64 = 300 + DirectConnectTicks uint64 - // GossipSubDirectConnectInitialDelay is the initial delay before opening connections to direct peers - GossipSubDirectConnectInitialDelay = time.Second + // DirectConnectInitialDelay is the initial delay before opening connections to direct peers + DirectConnectInitialDelay time.Duration - // GossipSubOpportunisticGraftTicks is the number of heartbeat ticks for attempting to improve the mesh - // with opportunistic grafting. Every GossipSubOpportunisticGraftTicks we will attempt to select some + // OpportunisticGraftTicks is the number of heartbeat ticks for attempting to improve the mesh + // with opportunistic grafting. Every OpportunisticGraftTicks we will attempt to select some // high-scoring mesh peers to replace lower-scoring ones, if the median score of our mesh peers falls // below a threshold (see https://godoc.org/github.com/libp2p/go-libp2p-pubsub#PeerScoreThresholds). - GossipSubOpportunisticGraftTicks uint64 = 60 + OpportunisticGraftTicks uint64 - // GossipSubOpportunisticGraftPeers is the number of peers to opportunistically graft. - GossipSubOpportunisticGraftPeers = 2 + // OpportunisticGraftPeers is the number of peers to opportunistically graft. + OpportunisticGraftPeers int - // If a GRAFT comes before GossipSubGraftFloodThreshold has elapsed since the last PRUNE, + // If a GRAFT comes before GraftFloodThreshold has elapsed since the last PRUNE, // then there is an extra score penalty applied to the peer through P7. - GossipSubGraftFloodThreshold = 10 * time.Second + GraftFloodThreshold time.Duration - // GossipSubMaxIHaveLength is the maximum number of messages to include in an IHAVE message. + // MaxIHaveLength is the maximum number of messages to include in an IHAVE message. // Also controls the maximum number of IHAVE ids we will accept and request with IWANT from a // peer within a heartbeat, to protect from IHAVE floods. You should adjust this value from the - // default if your system is pushing more than 5000 messages in GossipSubHistoryGossip heartbeats; + // default if your system is pushing more than 5000 messages in HistoryGossip heartbeats; // with the defaults this is 1666 messages/s. - GossipSubMaxIHaveLength = 5000 + MaxIHaveLength int - // GossipSubMaxIHaveMessages is the maximum number of IHAVE messages to accept from a peer within a heartbeat. - GossipSubMaxIHaveMessages = 10 + // MaxIHaveMessages is the maximum number of IHAVE messages to accept from a peer within a heartbeat. + MaxIHaveMessages int // Time to wait for a message requested through IWANT following an IHAVE advertisement. // If the message is not received within this window, a broken promise is declared and // the router may apply bahavioural penalties. - GossipSubIWantFollowupTime = 3 * time.Second -) + IWantFollowupTime time.Duration +} // NewGossipSub returns a new PubSub object using GossipSubRouter as the router. func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) { + params := DefaultGossipSubParams() rt := &GossipSubRouter{ - peers: make(map[peer.ID]protocol.ID), - mesh: make(map[string]map[peer.ID]struct{}), - fanout: make(map[string]map[peer.ID]struct{}), - lastpub: make(map[string]int64), - gossip: make(map[peer.ID][]*pb.ControlIHave), - control: make(map[peer.ID]*pb.ControlMessage), - backoff: make(map[string]map[peer.ID]time.Time), - peerhave: make(map[peer.ID]int), - iasked: make(map[peer.ID]int), - outbound: make(map[peer.ID]bool), - connect: make(chan connectInfo, GossipSubMaxPendingConnections), - mcache: NewMessageCache(GossipSubHistoryGossip, GossipSubHistoryLength), - + peers: make(map[peer.ID]protocol.ID), + mesh: make(map[string]map[peer.ID]struct{}), + fanout: make(map[string]map[peer.ID]struct{}), + lastpub: make(map[string]int64), + gossip: make(map[peer.ID][]*pb.ControlIHave), + control: make(map[peer.ID]*pb.ControlMessage), + backoff: make(map[string]map[peer.ID]time.Time), + peerhave: make(map[peer.ID]int), + iasked: make(map[peer.ID]int), + outbound: make(map[peer.ID]bool), + connect: make(chan connectInfo, params.MaxPendingConnections), + mcache: NewMessageCache(params.HistoryGossip, params.HistoryLength), protos: GossipSubDefaultProtocols, feature: GossipSubDefaultFeatures, - - // these are configured per router to allow variation in tests - D: GossipSubD, - Dlo: GossipSubDlo, - Dhi: GossipSubDhi, - Dscore: GossipSubDscore, - Dout: GossipSubDout, - Dlazy: GossipSubDlazy, - - // these must be pulled in to resolve races in tests... sigh. - directConnectTicks: GossipSubDirectConnectTicks, - opportunisticGraftTicks: GossipSubOpportunisticGraftTicks, - - fanoutTTL: GossipSubFanoutTTL, - tagTracer: newTagTracer(h.ConnManager()), + params: params, } // hook the tag tracer @@ -204,6 +221,39 @@ func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er return NewPubSub(ctx, h, rt, opts...) } +// DefaultGossipSubParams returns the default gossip sub parameters +// as a config. +func DefaultGossipSubParams() GossipSubParams { + return GossipSubParams{ + D: GossipSubD, + Dlo: GossipSubDlo, + Dhi: GossipSubDhi, + Dscore: GossipSubDscore, + Dout: GossipSubDout, + HistoryLength: GossipSubHistoryLength, + HistoryGossip: GossipSubHistoryLength, + Dlazy: GossipSubDlazy, + GossipFactor: GossipSubGossipFactor, + GossipRetransmission: GossipSubGossipRetransmission, + HeartbeatInitialDelay: GossipSubHeartbeatInitialDelay, + HeartbeatInterval: GossipSubHeartbeatInterval, + FanoutTTL: GossipSubFanoutTTL, + PrunePeers: GossipSubPrunePeers, + PruneBackoff: GossipSubPruneBackoff, + Connectors: GossipSubConnectors, + MaxPendingConnections: GossipSubMaxPendingConnections, + ConnectionTimeout: GossipSubConnectionTimeout, + DirectConnectTicks: GossipSubDirectConnectTicks, + DirectConnectInitialDelay: GossipSubDirectConnectInitialDelay, + OpportunisticGraftTicks: GossipSubOpportunisticGraftTicks, + OpportunisticGraftPeers: GossipSubOpportunisticGraftPeers, + GraftFloodThreshold: GossipSubGraftFloodThreshold, + MaxIHaveLength: GossipSubMaxIHaveLength, + MaxIHaveMessages: GossipSubMaxIHaveMessages, + IWantFollowupTime: GossipSubIWantFollowupTime, + } +} + // WithPeerScore is a gossipsub router option that enables peer scoring. func WithPeerScore(params *PeerScoreParams, thresholds *PeerScoreThresholds) Option { return func(ps *PubSub) error { @@ -318,7 +368,24 @@ func WithDirectConnectTicks(t uint64) Option { if !ok { return fmt.Errorf("pubsub router is not gossipsub") } - gs.directConnectTicks = t + gs.params.DirectConnectTicks = t + return nil + } +} + +// WithGossipSubParams is a gossip sub router option that allows a custom +// config to be set when instantiating the gossipsub router. +func WithGossipSubParams(cfg GossipSubParams) Option { + return func(ps *PubSub) error { + gs, ok := ps.rt.(*GossipSubRouter) + if !ok { + return fmt.Errorf("pubsub router is not gossipsub") + } + // Overwrite current config and associated variables in the router. + gs.params = cfg + gs.connect = make(chan connectInfo, cfg.MaxPendingConnections) + gs.mcache = NewMessageCache(cfg.HistoryGossip, cfg.HistoryLength) + return nil } } @@ -355,6 +422,9 @@ type GossipSubRouter struct { tagTracer *tagTracer gate *peerGater + // config for gossipsub parameters + params GossipSubParams + // whether PX is enabled; this should be enabled in bootstrappers and other well connected/trusted // nodes. doPX bool @@ -384,19 +454,6 @@ type GossipSubRouter struct { // number of heartbeats since the beginning of time; this allows us to amortize some resource // clean up -- eg backoff clean up. heartbeatTicks uint64 - - // overly parameter "constants" - // these are pulled from their global value or else the race detector is angry on travis - // it also allows us to change them per peer in tests, which is a plus - D, Dlo, Dhi, Dscore, Dout, Dlazy int - - // tick "constants" for triggering direct connect and opportunistic grafting - // these are pulled from their global value or else the race detector is angry on travis - directConnectTicks, opportunisticGraftTicks uint64 - - // fanout expiry ttl "constant" - // this is pulled from its global value or else the race detector is angry on travis - fanoutTTL time.Duration } type connectInfo struct { @@ -428,15 +485,15 @@ func (gs *GossipSubRouter) Attach(p *PubSub) { go gs.heartbeatTimer() // start the PX connectors - for i := 0; i < GossipSubConnectors; i++ { + for i := 0; i < gs.params.Connectors; i++ { go gs.connector() } // connect to direct peers if len(gs.direct) > 0 { go func() { - if GossipSubDirectConnectInitialDelay > 0 { - time.Sleep(GossipSubDirectConnectInitialDelay) + if gs.params.DirectConnectInitialDelay > 0 { + time.Sleep(gs.params.DirectConnectInitialDelay) } for p := range gs.direct { gs.connect <- connectInfo{p: p} @@ -508,10 +565,10 @@ func (gs *GossipSubRouter) EnoughPeers(topic string, suggested int) bool { gsPeers = len(gs.mesh[topic]) if suggested == 0 { - suggested = gs.Dlo + suggested = gs.params.Dlo } - if fsPeers+gsPeers >= suggested || gsPeers >= gs.Dhi { + if fsPeers+gsPeers >= suggested || gsPeers >= gs.params.Dhi { return true } @@ -560,12 +617,12 @@ func (gs *GossipSubRouter) handleIHave(p peer.ID, ctl *pb.ControlMessage) []*pb. // IHAVE flood protection gs.peerhave[p]++ - if gs.peerhave[p] > GossipSubMaxIHaveMessages { + if gs.peerhave[p] > gs.params.MaxIHaveMessages { log.Debugf("IHAVE: peer %s has advertised too many times (%d) within this heartbeat interval; ignoring", p, gs.peerhave[p]) return nil } - if gs.iasked[p] >= GossipSubMaxIHaveLength { + if gs.iasked[p] >= gs.params.MaxIHaveLength { log.Debugf("IHAVE: peer %s has already advertised too many messages (%d); ignoring", p, gs.iasked[p]) return nil } @@ -591,8 +648,8 @@ func (gs *GossipSubRouter) handleIHave(p peer.ID, ctl *pb.ControlMessage) []*pb. } iask := len(iwant) - if iask+gs.iasked[p] > GossipSubMaxIHaveLength { - iask = GossipSubMaxIHaveLength - gs.iasked[p] + if iask+gs.iasked[p] > gs.params.MaxIHaveLength { + iask = gs.params.MaxIHaveLength - gs.iasked[p] } log.Debugf("IHAVE: Asking for %d out of %d messages from %s", iask, len(iwant), p) @@ -630,7 +687,7 @@ func (gs *GossipSubRouter) handleIWant(p peer.ID, ctl *pb.ControlMessage) []*pb. continue } - if count > GossipSubGossipRetransmission { + if count > gs.params.GossipRetransmission { log.Debugf("IWANT: Peer %s has asked for message %s too many times; ignoring request", p, mid) continue } @@ -696,7 +753,7 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb. // no PX doPX = false // check the flood cutoff -- is the GRAFT coming too fast? - floodCutoff := expire.Add(GossipSubGraftFloodThreshold - GossipSubPruneBackoff) + floodCutoff := expire.Add(gs.params.GraftFloodThreshold - gs.params.PruneBackoff) if now.Before(floodCutoff) { // extra penalty gs.score.AddPenalty(p, 1) @@ -723,7 +780,7 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb. // check the number of mesh peers; if it is at (or over) Dhi, we only accept grafts // from peers with outbound connections; this is a defensive check to restrict potential // mesh takeover attacks combined with love bombing - if len(peers) >= gs.Dhi && !gs.outbound[p] { + if len(peers) >= gs.params.Dhi && !gs.outbound[p] { prune = append(prune, topic) gs.addBackoff(p, topic) continue @@ -781,7 +838,7 @@ func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) { } func (gs *GossipSubRouter) addBackoff(p peer.ID, topic string) { - gs.doAddBackoff(p, topic, GossipSubPruneBackoff) + gs.doAddBackoff(p, topic, gs.params.PruneBackoff) } func (gs *GossipSubRouter) doAddBackoff(p peer.ID, topic string, interval time.Duration) { @@ -797,9 +854,9 @@ func (gs *GossipSubRouter) doAddBackoff(p peer.ID, topic string, interval time.D } func (gs *GossipSubRouter) pxConnect(peers []*pb.PeerInfo) { - if len(peers) > GossipSubPrunePeers { + if len(peers) > gs.params.PrunePeers { shufflePeerInfo(peers) - peers = peers[:GossipSubPrunePeers] + peers = peers[:gs.params.PrunePeers] } toconnect := make([]connectInfo, 0, len(peers)) @@ -866,7 +923,7 @@ func (gs *GossipSubRouter) connector() { } } - ctx, cancel := context.WithTimeout(gs.p.ctx, GossipSubConnectionTimeout) + ctx, cancel := context.WithTimeout(gs.p.ctx, gs.params.ConnectionTimeout) err := gs.p.host.Connect(ctx, peer.AddrInfo{ID: ci.p}) cancel() if err != nil { @@ -923,7 +980,7 @@ func (gs *GossipSubRouter) Publish(msg *Message) { gmap, ok = gs.fanout[topic] if !ok || len(gmap) == 0 { // we don't have any, pick some with score above the publish threshold - peers := gs.getPeers(topic, gs.D, func(p peer.ID) bool { + peers := gs.getPeers(topic, gs.params.D, func(p peer.ID) bool { _, direct := gs.direct[p] return !direct && gs.score.Score(p) >= gs.publishThreshold }) @@ -970,9 +1027,9 @@ func (gs *GossipSubRouter) Join(topic string) { } } - if len(gmap) < gs.D { + if len(gmap) < gs.params.D { // we need more peers; eager, as this would get fixed in the next heartbeat - more := gs.getPeers(topic, gs.D-len(gmap), func(p peer.ID) bool { + more := gs.getPeers(topic, gs.params.D-len(gmap), func(p peer.ID) bool { // filter our current peers, direct peers, and peers with negative scores _, inMesh := gmap[p] _, direct := gs.direct[p] @@ -986,7 +1043,7 @@ func (gs *GossipSubRouter) Join(topic string) { delete(gs.fanout, topic) delete(gs.lastpub, topic) } else { - peers := gs.getPeers(topic, gs.D, func(p peer.ID) bool { + peers := gs.getPeers(topic, gs.params.D, func(p peer.ID) bool { // filter direct peers and peers with negative score _, direct := gs.direct[p] return !direct && gs.score.Score(p) >= 0 @@ -1215,14 +1272,14 @@ func fragmentMessageIds(msgIds []string, limit int) [][]string { } func (gs *GossipSubRouter) heartbeatTimer() { - time.Sleep(GossipSubHeartbeatInitialDelay) + time.Sleep(gs.params.HeartbeatInitialDelay) select { case gs.p.eval <- gs.heartbeat: case <-gs.p.ctx.Done(): return } - ticker := time.NewTicker(GossipSubHeartbeatInterval) + ticker := time.NewTicker(gs.params.HeartbeatInterval) defer ticker.Stop() for { @@ -1299,9 +1356,9 @@ func (gs *GossipSubRouter) heartbeat() { } // do we have enough peers? - if l := len(peers); l < gs.Dlo { + if l := len(peers); l < gs.params.Dlo { backoff := gs.backoff[topic] - ineed := gs.D - l + ineed := gs.params.D - l plst := gs.getPeers(topic, ineed, func(p peer.ID) bool { // filter our current and direct peers, peers we are backing off, and peers with negative score _, inMesh := peers[p] @@ -1316,7 +1373,7 @@ func (gs *GossipSubRouter) heartbeat() { } // do we have too many peers? - if len(peers) > gs.Dhi { + if len(peers) > gs.params.Dhi { plst := peerMapToList(peers) // sort by score (but shuffle first for the case we don't use the score) @@ -1327,18 +1384,18 @@ func (gs *GossipSubRouter) heartbeat() { // We keep the first D_score peers by score and the remaining up to D randomly // under the constraint that we keep D_out peers in the mesh (if we have that many) - shufflePeers(plst[gs.Dscore:]) + shufflePeers(plst[gs.params.Dscore:]) // count the outbound peers we are keeping outbound := 0 - for _, p := range plst[:gs.D] { + for _, p := range plst[:gs.params.D] { if gs.outbound[p] { outbound++ } } // if it's less than D_out, bubble up some outbound peers from the random selection - if outbound < gs.Dout { + if outbound < gs.params.Dout { rotate := func(i int) { // rotate the plst to the right and put the ith peer in the front p := plst[i] @@ -1351,7 +1408,7 @@ func (gs *GossipSubRouter) heartbeat() { // first bubble up all outbound peers already in the selection to the front if outbound > 0 { ihave := outbound - for i := 1; i < gs.D && ihave > 0; i++ { + for i := 1; i < gs.params.D && ihave > 0; i++ { p := plst[i] if gs.outbound[p] { rotate(i) @@ -1361,8 +1418,8 @@ func (gs *GossipSubRouter) heartbeat() { } // now bubble up enough outbound peers outside the selection to the front - ineed := gs.Dout - outbound - for i := gs.D; i < len(plst) && ineed > 0; i++ { + ineed := gs.params.Dout - outbound + for i := gs.params.D; i < len(plst) && ineed > 0; i++ { p := plst[i] if gs.outbound[p] { rotate(i) @@ -1372,14 +1429,14 @@ func (gs *GossipSubRouter) heartbeat() { } // prune the excess peers - for _, p := range plst[gs.D:] { + for _, p := range plst[gs.params.D:] { log.Debugf("HEARTBEAT: Remove mesh link to %s in %s", p, topic) prunePeer(p) } } // do we have enough outboud peers? - if len(peers) >= gs.Dlo { + if len(peers) >= gs.params.Dlo { // count the outbound peers we have outbound := 0 for p := range peers { @@ -1389,8 +1446,8 @@ func (gs *GossipSubRouter) heartbeat() { } // if it's less than D_out, select some peers with outbound connections and graft them - if outbound < gs.Dout { - ineed := gs.Dout - outbound + if outbound < gs.params.Dout { + ineed := gs.params.Dout - outbound backoff := gs.backoff[topic] plst := gs.getPeers(topic, ineed, func(p peer.ID) bool { // filter our current and direct peers, peers we are backing off, and peers with negative score @@ -1407,7 +1464,7 @@ func (gs *GossipSubRouter) heartbeat() { } // should we try to improve the mesh with opportunistic grafting? - if gs.heartbeatTicks%gs.opportunisticGraftTicks == 0 && len(peers) > 1 { + if gs.heartbeatTicks%gs.params.OpportunisticGraftTicks == 0 && len(peers) > 1 { // Opportunistic grafting works as follows: we check the median score of peers in the // mesh; if this score is below the opportunisticGraftThreshold, we select a few peers at // random with score over the median. @@ -1426,7 +1483,7 @@ func (gs *GossipSubRouter) heartbeat() { // if the median score is below the threshold, select a better peer (if any) and GRAFT if medianScore < gs.opportunisticGraftThreshold { backoff := gs.backoff[topic] - plst = gs.getPeers(topic, GossipSubOpportunisticGraftPeers, func(p peer.ID) bool { + plst = gs.getPeers(topic, gs.params.OpportunisticGraftPeers, func(p peer.ID) bool { _, inMesh := peers[p] _, doBackoff := backoff[p] _, direct := gs.direct[p] @@ -1448,7 +1505,7 @@ func (gs *GossipSubRouter) heartbeat() { // expire fanout for topics we haven't published to in a while now := time.Now().UnixNano() for topic, lastpub := range gs.lastpub { - if lastpub+int64(gs.fanoutTTL) < now { + if lastpub+int64(gs.params.FanoutTTL) < now { delete(gs.fanout, topic) delete(gs.lastpub, topic) } @@ -1465,8 +1522,8 @@ func (gs *GossipSubRouter) heartbeat() { } // do we need more peers? - if len(peers) < gs.D { - ineed := gs.D - len(peers) + if len(peers) < gs.params.D { + ineed := gs.params.D - len(peers) plst := gs.getPeers(topic, ineed, func(p peer.ID) bool { // filter our current and direct peers and peers with score above the publish threshold _, inFanout := peers[p] @@ -1537,7 +1594,7 @@ func (gs *GossipSubRouter) clearBackoff() { func (gs *GossipSubRouter) directConnect() { // we donly do this every some ticks to allow pending connections to complete and account // for restarts/downtime - if gs.heartbeatTicks%gs.directConnectTicks != 0 { + if gs.heartbeatTicks%gs.params.DirectConnectTicks != 0 { return } @@ -1608,7 +1665,7 @@ func (gs *GossipSubRouter) emitGossip(topic string, exclude map[peer.ID]struct{} shuffleStrings(mids) // if we are emitting more than GossipSubMaxIHaveLength mids, truncate the list - if len(mids) > GossipSubMaxIHaveLength { + if len(mids) > gs.params.MaxIHaveLength { // we do the truncation (with shuffling) per peer below log.Debugf("too many messages for gossip; will truncate IHAVE list (%d messages)", len(mids)) } @@ -1626,8 +1683,8 @@ func (gs *GossipSubRouter) emitGossip(topic string, exclude map[peer.ID]struct{} } } - target := gs.Dlazy - factor := int(GossipSubGossipFactor * float64(len(peers))) + target := gs.params.Dlazy + factor := int(gs.params.GossipFactor * float64(len(peers))) if factor > target { target = factor } @@ -1642,11 +1699,11 @@ func (gs *GossipSubRouter) emitGossip(topic string, exclude map[peer.ID]struct{} // Emit the IHAVE gossip to the selected peers. for _, p := range peers { peerMids := mids - if len(mids) > GossipSubMaxIHaveLength { + if len(mids) > gs.params.MaxIHaveLength { // we do this per peer so that we emit a different set for each peer. // we have enough redundancy in the system that this will significantly increase the message // coverage when we do truncate. - peerMids = make([]string, GossipSubMaxIHaveLength) + peerMids = make([]string, gs.params.MaxIHaveLength) shuffleStrings(mids) copy(peerMids, mids) } @@ -1749,11 +1806,11 @@ func (gs *GossipSubRouter) makePrune(p peer.ID, topic string, doPX bool) *pb.Con return &pb.ControlPrune{TopicID: &topic} } - backoff := uint64(GossipSubPruneBackoff / time.Second) + backoff := uint64(gs.params.PruneBackoff / time.Second) var px []*pb.PeerInfo if doPX { // select peers for Peer eXchange - peers := gs.getPeers(topic, GossipSubPrunePeers, func(xp peer.ID) bool { + peers := gs.getPeers(topic, gs.params.PrunePeers, func(xp peer.ID) bool { return p != xp && gs.score.Score(xp) >= 0 }) diff --git a/gossipsub_test.go b/gossipsub_test.go index 11f9ed35..e7e3762c 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -967,10 +967,10 @@ func TestGossipsubStarTopology(t *testing.T) { // configure the center of the star with a very low D psubs[0].eval <- func() { gs := psubs[0].rt.(*GossipSubRouter) - gs.D = 0 - gs.Dlo = 0 - gs.Dhi = 0 - gs.Dscore = 0 + gs.params.D = 0 + gs.params.Dlo = 0 + gs.params.Dhi = 0 + gs.params.Dscore = 0 } // add all peer addresses to the peerstores @@ -1051,10 +1051,10 @@ func TestGossipsubStarTopologyWithSignedPeerRecords(t *testing.T) { // configure the center of the star with a very low D psubs[0].eval <- func() { gs := psubs[0].rt.(*GossipSubRouter) - gs.D = 0 - gs.Dlo = 0 - gs.Dhi = 0 - gs.Dscore = 0 + gs.params.D = 0 + gs.params.Dlo = 0 + gs.params.Dhi = 0 + gs.params.Dscore = 0 } // manually create signed peer records for each host and add them to the @@ -1346,6 +1346,45 @@ func TestGossipsubEnoughPeers(t *testing.T) { } } +func TestGossipsubCustomParams(t *testing.T) { + // in this test we score sinkhole a peer to exercise code paths relative to negative scores + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + params := DefaultGossipSubParams() + + wantedFollowTime := 1 * time.Second + params.IWantFollowupTime = wantedFollowTime + + customGossipFactor := 0.12 + params.GossipFactor = customGossipFactor + + wantedMaxPendingConns := 23 + params.MaxPendingConnections = wantedMaxPendingConns + hosts := getNetHosts(t, ctx, 1) + psubs := getGossipsubs(ctx, hosts, + WithGossipSubParams(params)) + + if len(psubs) != 1 { + t.Fatalf("incorrect number of pusbub objects received: wanted %d but got %d", 1, len(psubs)) + } + + rt, ok := psubs[0].rt.(*GossipSubRouter) + if !ok { + t.Fatal("Did not get gossip sub router from pub sub object") + } + + if rt.params.IWantFollowupTime != wantedFollowTime { + t.Errorf("Wanted %d of param GossipSubIWantFollowupTime but got %d", wantedFollowTime, rt.params.IWantFollowupTime) + } + if rt.params.GossipFactor != customGossipFactor { + t.Errorf("Wanted %f of param GossipSubGossipFactor but got %f", customGossipFactor, rt.params.GossipFactor) + } + if rt.params.MaxPendingConnections != wantedMaxPendingConns { + t.Errorf("Wanted %d of param GossipSubMaxPendingConnections but got %d", wantedMaxPendingConns, rt.params.MaxPendingConnections) + } +} + func TestGossipsubNegativeScore(t *testing.T) { // in this test we score sinkhole a peer to exercise code paths relative to negative scores ctx, cancel := context.WithCancel(context.Background())