diff --git a/internal/peermanager/peerwantmanager.go b/internal/peermanager/peerwantmanager.go index ee81649a..46a3ac34 100644 --- a/internal/peermanager/peerwantmanager.go +++ b/internal/peermanager/peerwantmanager.go @@ -84,25 +84,28 @@ func (pwm *peerWantManager) removePeer(p peer.ID) { // Clean up want-blocks _ = pws.wantBlocks.ForEach(func(c cid.Cid) error { // Clean up want-blocks from the reverse index - removedLastPeer := pwm.reverseIndexRemove(c, p) + pwm.reverseIndexRemove(c, p) // Decrement the gauges by the number of pending want-blocks to the peer - if removedLastPeer { + peerCounts := pwm.wantPeerCounts(c) + if peerCounts.wantBlock == 0 { pwm.wantBlockGauge.Dec() - if !pwm.broadcastWants.Has(c) { - pwm.wantGauge.Dec() - } } + if !peerCounts.wanted() { + pwm.wantGauge.Dec() + } + return nil }) // Clean up want-haves _ = pws.wantHaves.ForEach(func(c cid.Cid) error { // Clean up want-haves from the reverse index - removedLastPeer := pwm.reverseIndexRemove(c, p) + pwm.reverseIndexRemove(c, p) // Decrement the gauge by the number of pending want-haves to the peer - if removedLastPeer && !pwm.broadcastWants.Has(c) { + peerCounts := pwm.wantPeerCounts(c) + if !peerCounts.wanted() { pwm.wantGauge.Dec() } return nil @@ -122,8 +125,9 @@ func (pwm *peerWantManager) broadcastWantHaves(wantHaves []cid.Cid) { pwm.broadcastWants.Add(c) unsent = append(unsent, c) - // Increment the total wants gauge + // If no peer has a pending want for the key if _, ok := pwm.wantPeers[c]; !ok { + // Increment the total wants gauge pwm.wantGauge.Inc() } } @@ -168,27 +172,30 @@ func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves // Iterate over the requested want-blocks for _, c := range wantBlocks { // If the want-block hasn't been sent to the peer - if !pws.wantBlocks.Has(c) { - // Record that the CID was sent as a want-block - pws.wantBlocks.Add(c) + if pws.wantBlocks.Has(c) { + continue + } - // Add the CID to the results - fltWantBlks = append(fltWantBlks, c) + // Increment the want gauges + peerCounts := pwm.wantPeerCounts(c) + if peerCounts.wantBlock == 0 { + pwm.wantBlockGauge.Inc() + } + if !peerCounts.wanted() { + pwm.wantGauge.Inc() + } - // Make sure the CID is no longer recorded as a want-have - pws.wantHaves.Remove(c) + // Make sure the CID is no longer recorded as a want-have + pws.wantHaves.Remove(c) - // Update the reverse index - isNew := pwm.reverseIndexAdd(c, p) - - // Increment the want gauges - if isNew { - pwm.wantBlockGauge.Inc() - if !pwm.broadcastWants.Has(c) { - pwm.wantGauge.Inc() - } - } - } + // Record that the CID was sent as a want-block + pws.wantBlocks.Add(c) + + // Add the CID to the results + fltWantBlks = append(fltWantBlks, c) + + // Update the reverse index + pwm.reverseIndexAdd(c, p) } // Iterate over the requested want-haves @@ -201,6 +208,12 @@ func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves // If the CID has not been sent as a want-block or want-have if !pws.wantBlocks.Has(c) && !pws.wantHaves.Has(c) { + // Increment the total wants gauge + peerCounts := pwm.wantPeerCounts(c) + if !peerCounts.wanted() { + pwm.wantGauge.Inc() + } + // Record that the CID was sent as a want-have pws.wantHaves.Add(c) @@ -208,12 +221,7 @@ func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves fltWantHvs = append(fltWantHvs, c) // Update the reverse index - isNew := pwm.reverseIndexAdd(c, p) - - // Increment the total wants gauge - if isNew && !pwm.broadcastWants.Has(c) { - pwm.wantGauge.Inc() - } + pwm.reverseIndexAdd(c, p) } } @@ -228,6 +236,13 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) { return } + // Record how many peers have a pending want-block and want-have for each + // key to be cancelled + peerCounts := make(map[cid.Cid]wantPeerCnts, len(cancelKs)) + for _, c := range cancelKs { + peerCounts[c] = pwm.wantPeerCounts(c) + } + // Create a buffer to use for filtering cancels per peer, with the // broadcast wants at the front of the buffer (broadcast wants are sent to // all peers) @@ -238,9 +253,6 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) { } } - cancelledWantBlocks := cid.NewSet() - cancelledWantHaves := cid.NewSet() - // Send cancels to a particular peer send := func(p peer.ID, pws *peerWant) { // Start from the broadcast cancels @@ -249,15 +261,7 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) { // For each key to be cancelled for _, c := range cancelKs { // Check if a want was sent for the key - wantBlock := pws.wantBlocks.Has(c) - wantHave := pws.wantHaves.Has(c) - - // Update the want gauges - if wantBlock { - cancelledWantBlocks.Add(c) - } else if wantHave { - cancelledWantHaves.Add(c) - } else { + if !pws.wantBlocks.Has(c) && !pws.wantHaves.Has(c) { continue } @@ -304,33 +308,70 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) { } } - // Remove cancelled broadcast wants - for _, c := range broadcastCancels { - pwm.broadcastWants.Remove(c) + // Decrement the wants gauges + for _, c := range cancelKs { + peerCnts := peerCounts[c] - // Decrement the total wants gauge for broadcast wants - if !cancelledWantHaves.Has(c) && !cancelledWantBlocks.Has(c) { + // If there were any peers that had a pending want-block for the key + if peerCnts.wantBlock > 0 { + // Decrement the want-block gauge + pwm.wantBlockGauge.Dec() + } + + // If there was a peer that had a pending want or it was a broadcast want + if peerCnts.wanted() { + // Decrement the total wants gauge pwm.wantGauge.Dec() } } - // Decrement the total wants gauge for peer wants - _ = cancelledWantHaves.ForEach(func(c cid.Cid) error { - pwm.wantGauge.Dec() - return nil - }) - _ = cancelledWantBlocks.ForEach(func(c cid.Cid) error { - pwm.wantGauge.Dec() - pwm.wantBlockGauge.Dec() - return nil - }) + // Remove cancelled broadcast wants + for _, c := range broadcastCancels { + pwm.broadcastWants.Remove(c) + } - // Finally, batch-remove the reverse-index. There's no need to - // clear this index peer-by-peer. + // Batch-remove the reverse-index. There's no need to clear this index + // peer-by-peer. for _, c := range cancelKs { delete(pwm.wantPeers, c) } +} + +// wantPeerCnts stores the number of peers that have pending wants for a CID +type wantPeerCnts struct { + // number of peers that have a pending want-block for the CID + wantBlock int + // number of peers that have a pending want-have for the CID + wantHave int + // whether the CID is a broadcast want + isBroadcast bool +} + +// wanted returns true if any peer wants the CID or it's a broadcast want +func (pwm *wantPeerCnts) wanted() bool { + return pwm.wantBlock > 0 || pwm.wantHave > 0 || pwm.isBroadcast +} + +// wantPeerCounts counts how many peers have a pending want-block and want-have +// for the given CID +func (pwm *peerWantManager) wantPeerCounts(c cid.Cid) wantPeerCnts { + blockCount := 0 + haveCount := 0 + for p := range pwm.wantPeers[c] { + pws, ok := pwm.peerWants[p] + if !ok { + log.Errorf("reverse index has extra peer %s for key %s in peerWantManager", string(p), c) + continue + } + + if pws.wantBlocks.Has(c) { + blockCount++ + } else if pws.wantHaves.Has(c) { + haveCount++ + } + } + return wantPeerCnts{blockCount, haveCount, pwm.broadcastWants.Has(c)} } // Add the peer to the list of peers that have sent a want with the cid @@ -345,16 +386,13 @@ func (pwm *peerWantManager) reverseIndexAdd(c cid.Cid, p peer.ID) bool { } // Remove the peer from the list of peers that have sent a want with the cid -func (pwm *peerWantManager) reverseIndexRemove(c cid.Cid, p peer.ID) bool { +func (pwm *peerWantManager) reverseIndexRemove(c cid.Cid, p peer.ID) { if peers, ok := pwm.wantPeers[c]; ok { delete(peers, p) if len(peers) == 0 { delete(pwm.wantPeers, c) - return true } } - - return false } // GetWantBlocks returns the set of all want-blocks sent to all peers diff --git a/internal/peermanager/peerwantmanager_test.go b/internal/peermanager/peerwantmanager_test.go index 60b7c8e7..5a00f27f 100644 --- a/internal/peermanager/peerwantmanager_test.go +++ b/internal/peermanager/peerwantmanager_test.go @@ -436,3 +436,81 @@ func TestStats(t *testing.T) { t.Fatal("Expected 0 want-blocks") } } + +func TestStatsOverlappingWantBlockWantHave(t *testing.T) { + g := &gauge{} + wbg := &gauge{} + pwm := newPeerWantManager(g, wbg) + + peers := testutil.GeneratePeers(2) + p0 := peers[0] + p1 := peers[1] + cids := testutil.GenerateCids(2) + cids2 := testutil.GenerateCids(2) + + pwm.addPeer(&mockPQ{}, p0) + pwm.addPeer(&mockPQ{}, p1) + + // Send 2 want-blocks and 2 want-haves to p0 + pwm.sendWants(p0, cids, cids2) + + // Send opposite: + // 2 want-haves and 2 want-blocks to p1 + pwm.sendWants(p1, cids2, cids) + + if g.count != 4 { + t.Fatal("Expected 4 wants") + } + if wbg.count != 4 { + t.Fatal("Expected 4 want-blocks") + } + + // Cancel 1 of each group of cids + pwm.sendCancels([]cid.Cid{cids[0], cids2[0]}) + + if g.count != 2 { + t.Fatal("Expected 2 wants") + } + if wbg.count != 2 { + t.Fatal("Expected 2 want-blocks") + } +} + +func TestStatsRemovePeerOverlappingWantBlockWantHave(t *testing.T) { + g := &gauge{} + wbg := &gauge{} + pwm := newPeerWantManager(g, wbg) + + peers := testutil.GeneratePeers(2) + p0 := peers[0] + p1 := peers[1] + cids := testutil.GenerateCids(2) + cids2 := testutil.GenerateCids(2) + + pwm.addPeer(&mockPQ{}, p0) + pwm.addPeer(&mockPQ{}, p1) + + // Send 2 want-blocks and 2 want-haves to p0 + pwm.sendWants(p0, cids, cids2) + + // Send opposite: + // 2 want-haves and 2 want-blocks to p1 + pwm.sendWants(p1, cids2, cids) + + if g.count != 4 { + t.Fatal("Expected 4 wants") + } + if wbg.count != 4 { + t.Fatal("Expected 4 want-blocks") + } + + // Remove p0 + pwm.removePeer(p0) + + if g.count != 4 { + t.Fatal("Expected 4 wants") + } + if wbg.count != 2 { + t.Fatal("Expected 2 want-blocks") + } +}