Skip to content

Commit

Permalink
Fix the Router's Ability to Prune the Mesh Periodically (#589)
Browse files Browse the repository at this point in the history
When a new peer wants to graft us into their mesh, we check our current
mesh size to determine whether we can add any more new peers to it. This
is done to prevent our mesh size from being greater than `Dhi` and
prevent mesh takeover attacks here:


https://github.com/libp2p/go-libp2p-pubsub/blob/c06df2f9a38e9382e644b241adf0e96e5ca00955/gossipsub.go#L943

During every heartbeat we check our mesh size and if it is **greater**
than `Dhi` then we will prune our mesh back down to `D`.

https://github.com/libp2p/go-libp2p-pubsub/blob/c06df2f9a38e9382e644b241adf0e96e5ca00955/gossipsub.go#L1608

However if you look closely at both lines there is a problematic end
result. Since we only stop grafting new peers into our mesh if our
current mesh size is **greater than or equal to** `Dhi` and we only
prune peers if the current mesh size is greater than `Dhi`.

This would result in the mesh being in a state of stasis at `Dhi`.
Rather than float between `D` and `Dhi` , the mesh stagnates at `Dhi` .
This would end up increasing the target degree of the node to `Dhi` from
`D`. This had been observed in ethereum mainnet by recording mesh
interactions and message fulfillment from those peers.

This PR fixes it by adding an equality check to the conditional so that
it can be periodically pruned. The PR also adds a regression test for
this particular case.
  • Loading branch information
nisdas authored Dec 26, 2024
1 parent c06df2f commit 3536508
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 1 deletion.
2 changes: 1 addition & 1 deletion gossipsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -1605,7 +1605,7 @@ func (gs *GossipSubRouter) heartbeat() {
}

// do we have too many peers?
if len(peers) > gs.params.Dhi {
if len(peers) >= gs.params.Dhi {
plst := peerMapToList(peers)

// sort by score (but shuffle first for the case we don't use the score)
Expand Down
47 changes: 47 additions & 0 deletions gossipsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3176,6 +3176,53 @@ func TestGossipsubIdontwantClear(t *testing.T) {
<-ctx.Done()
}

func TestGossipsubPruneMeshCorrectly(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getDefaultHosts(t, 9)

msgID := func(pmsg *pb.Message) string {
// silly content-based test message-ID: just use the data as whole
return base64.URLEncoding.EncodeToString(pmsg.Data)
}

params := DefaultGossipSubParams()
params.Dhi = 8

psubs := make([]*PubSub, 9)
for i := 0; i < 9; i++ {
psubs[i] = getGossipsub(ctx, hosts[i],
WithGossipSubParams(params),
WithMessageIdFn(msgID))
}

topic := "foobar"
for _, ps := range psubs {
_, err := ps.Subscribe(topic)
if err != nil {
t.Fatal(err)
}
}

// Connect first peer with the rest of the 8 other
// peers.
for i := 1; i < 9; i++ {
connect(t, hosts[0], hosts[i])
}

// Wait for 2 heartbeats to be able to prune excess peers back down to D.
totalTimeToWait := params.HeartbeatInitialDelay + 2*params.HeartbeatInterval
time.Sleep(totalTimeToWait)

meshPeers, ok := psubs[0].rt.(*GossipSubRouter).mesh[topic]
if !ok {
t.Fatal("mesh does not exist for topic")
}
if len(meshPeers) != params.D {
t.Fatalf("mesh does not have the correct number of peers. Wanted %d but got %d", params.D, len(meshPeers))
}
}

func BenchmarkAllocDoDropRPC(b *testing.B) {
gs := GossipSubRouter{tracer: &pubsubTracer{}}

Expand Down

0 comments on commit 3536508

Please sign in to comment.