This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Merge pull request #416 from ipfs/fix/want-gauge
fix want gauge calculation
Stebalien authored Jun 10, 2020
2 parents 06129d6 + 654e5b4 commit f29c774
Showing 2 changed files with 181 additions and 65 deletions.
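In short: before this change the gauges were adjusted per peer (via removedLastPeer and the cancelledWantBlocks/cancelledWantHaves sets), so a CID wanted by several peers — for example as a want-block from one peer and a want-have from another — could decrement the gauges while other peers still had pending wants. The diff below replaces that bookkeeping with a per-CID count of peers (wantPeerCnts), and only moves the gauges when that count crosses zero. The following is a simplified, self-contained sketch of that accounting; the names mirror the diff, but it is an illustration, not the go-bitswap API:

package main

import "fmt"

// wantPeerCnts mirrors the struct added in the diff: a per-CID count of peers
// with pending wants. (Sketch only; the real type lives in peerwantmanager.go.)
type wantPeerCnts struct {
	wantBlock   int  // peers with a pending want-block for the CID
	wantHave    int  // peers with a pending want-have for the CID
	isBroadcast bool // whether the CID is also a broadcast want
}

// wanted reports whether any peer still wants the CID, or it is a broadcast want.
func (w wantPeerCnts) wanted() bool {
	return w.wantBlock > 0 || w.wantHave > 0 || w.isBroadcast
}

func main() {
	// p0 holds a want-block and p1 holds a want-have for the same CID.
	counts := wantPeerCnts{wantBlock: 1, wantHave: 1}

	// Cancelling p0's want-block: the want-block gauge can drop (no peer has a
	// pending want-block any more), but the total-wants gauge must not, because
	// p1 still has a pending want-have.
	counts.wantBlock--
	fmt.Println("decrement want-block gauge:", counts.wantBlock == 0) // true
	fmt.Println("decrement total wants gauge:", !counts.wanted())     // false
}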
168 changes: 103 additions & 65 deletions internal/peermanager/peerwantmanager.go
@@ -84,25 +84,28 @@ func (pwm *peerWantManager) removePeer(p peer.ID) {
// Clean up want-blocks
_ = pws.wantBlocks.ForEach(func(c cid.Cid) error {
// Clean up want-blocks from the reverse index
removedLastPeer := pwm.reverseIndexRemove(c, p)
pwm.reverseIndexRemove(c, p)

// Decrement the gauges by the number of pending want-blocks to the peer
if removedLastPeer {
peerCounts := pwm.wantPeerCounts(c)
if peerCounts.wantBlock == 0 {
pwm.wantBlockGauge.Dec()
if !pwm.broadcastWants.Has(c) {
pwm.wantGauge.Dec()
}
}
if !peerCounts.wanted() {
pwm.wantGauge.Dec()
}

return nil
})

// Clean up want-haves
_ = pws.wantHaves.ForEach(func(c cid.Cid) error {
// Clean up want-haves from the reverse index
removedLastPeer := pwm.reverseIndexRemove(c, p)
pwm.reverseIndexRemove(c, p)

// Decrement the gauge by the number of pending want-haves to the peer
if removedLastPeer && !pwm.broadcastWants.Has(c) {
peerCounts := pwm.wantPeerCounts(c)
if !peerCounts.wanted() {
pwm.wantGauge.Dec()
}
return nil
@@ -122,8 +125,9 @@ func (pwm *peerWantManager) broadcastWantHaves(wantHaves []cid.Cid) {
pwm.broadcastWants.Add(c)
unsent = append(unsent, c)

// Increment the total wants gauge
// If no peer has a pending want for the key
if _, ok := pwm.wantPeers[c]; !ok {
// Increment the total wants gauge
pwm.wantGauge.Inc()
}
}
@@ -168,27 +172,30 @@ func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves
// Iterate over the requested want-blocks
for _, c := range wantBlocks {
// If the want-block hasn't been sent to the peer
if !pws.wantBlocks.Has(c) {
// Record that the CID was sent as a want-block
pws.wantBlocks.Add(c)
if pws.wantBlocks.Has(c) {
continue
}

// Add the CID to the results
fltWantBlks = append(fltWantBlks, c)
// Increment the want gauges
peerCounts := pwm.wantPeerCounts(c)
if peerCounts.wantBlock == 0 {
pwm.wantBlockGauge.Inc()
}
if !peerCounts.wanted() {
pwm.wantGauge.Inc()
}

// Make sure the CID is no longer recorded as a want-have
pws.wantHaves.Remove(c)
// Make sure the CID is no longer recorded as a want-have
pws.wantHaves.Remove(c)

// Update the reverse index
isNew := pwm.reverseIndexAdd(c, p)

// Increment the want gauges
if isNew {
pwm.wantBlockGauge.Inc()
if !pwm.broadcastWants.Has(c) {
pwm.wantGauge.Inc()
}
}
}
// Record that the CID was sent as a want-block
pws.wantBlocks.Add(c)

// Add the CID to the results
fltWantBlks = append(fltWantBlks, c)

// Update the reverse index
pwm.reverseIndexAdd(c, p)
}

// Iterate over the requested want-haves
@@ -201,19 +208,20 @@ func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves

// If the CID has not been sent as a want-block or want-have
if !pws.wantBlocks.Has(c) && !pws.wantHaves.Has(c) {
// Increment the total wants gauge
peerCounts := pwm.wantPeerCounts(c)
if !peerCounts.wanted() {
pwm.wantGauge.Inc()
}

// Record that the CID was sent as a want-have
pws.wantHaves.Add(c)

// Add the CID to the results
fltWantHvs = append(fltWantHvs, c)

// Update the reverse index
isNew := pwm.reverseIndexAdd(c, p)

// Increment the total wants gauge
if isNew && !pwm.broadcastWants.Has(c) {
pwm.wantGauge.Inc()
}
pwm.reverseIndexAdd(c, p)
}
}

@@ -228,6 +236,13 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
return
}

// Record how many peers have a pending want-block and want-have for each
// key to be cancelled
peerCounts := make(map[cid.Cid]wantPeerCnts, len(cancelKs))
for _, c := range cancelKs {
peerCounts[c] = pwm.wantPeerCounts(c)
}

// Create a buffer to use for filtering cancels per peer, with the
// broadcast wants at the front of the buffer (broadcast wants are sent to
// all peers)
@@ -238,9 +253,6 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
}
}

cancelledWantBlocks := cid.NewSet()
cancelledWantHaves := cid.NewSet()

// Send cancels to a particular peer
send := func(p peer.ID, pws *peerWant) {
// Start from the broadcast cancels
@@ -249,15 +261,7 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
// For each key to be cancelled
for _, c := range cancelKs {
// Check if a want was sent for the key
wantBlock := pws.wantBlocks.Has(c)
wantHave := pws.wantHaves.Has(c)

// Update the want gauges
if wantBlock {
cancelledWantBlocks.Add(c)
} else if wantHave {
cancelledWantHaves.Add(c)
} else {
if !pws.wantBlocks.Has(c) && !pws.wantHaves.Has(c) {
continue
}

@@ -304,33 +308,70 @@
}
}

// Remove cancelled broadcast wants
for _, c := range broadcastCancels {
pwm.broadcastWants.Remove(c)
// Decrement the wants gauges
for _, c := range cancelKs {
peerCnts := peerCounts[c]

// Decrement the total wants gauge for broadcast wants
if !cancelledWantHaves.Has(c) && !cancelledWantBlocks.Has(c) {
// If there were any peers that had a pending want-block for the key
if peerCnts.wantBlock > 0 {
// Decrement the want-block gauge
pwm.wantBlockGauge.Dec()
}

// If there was a peer that had a pending want or it was a broadcast want
if peerCnts.wanted() {
// Decrement the total wants gauge
pwm.wantGauge.Dec()
}
}

// Decrement the total wants gauge for peer wants
_ = cancelledWantHaves.ForEach(func(c cid.Cid) error {
pwm.wantGauge.Dec()
return nil
})
_ = cancelledWantBlocks.ForEach(func(c cid.Cid) error {
pwm.wantGauge.Dec()
pwm.wantBlockGauge.Dec()
return nil
})
// Remove cancelled broadcast wants
for _, c := range broadcastCancels {
pwm.broadcastWants.Remove(c)
}

// Finally, batch-remove the reverse-index. There's no need to
// clear this index peer-by-peer.
// Batch-remove the reverse-index. There's no need to clear this index
// peer-by-peer.
for _, c := range cancelKs {
delete(pwm.wantPeers, c)
}
}

// wantPeerCnts stores the number of peers that have pending wants for a CID
type wantPeerCnts struct {
// number of peers that have a pending want-block for the CID
wantBlock int
// number of peers that have a pending want-have for the CID
wantHave int
// whether the CID is a broadcast want
isBroadcast bool
}

// wanted returns true if any peer wants the CID or it's a broadcast want
func (pwm *wantPeerCnts) wanted() bool {
return pwm.wantBlock > 0 || pwm.wantHave > 0 || pwm.isBroadcast
}

// wantPeerCounts counts how many peers have a pending want-block and want-have
// for the given CID
func (pwm *peerWantManager) wantPeerCounts(c cid.Cid) wantPeerCnts {
blockCount := 0
haveCount := 0
for p := range pwm.wantPeers[c] {
pws, ok := pwm.peerWants[p]
if !ok {
log.Errorf("reverse index has extra peer %s for key %s in peerWantManager", string(p), c)
continue
}

if pws.wantBlocks.Has(c) {
blockCount++
} else if pws.wantHaves.Has(c) {
haveCount++
}
}

return wantPeerCnts{blockCount, haveCount, pwm.broadcastWants.Has(c)}
}

// Add the peer to the list of peers that have sent a want with the cid
@@ -345,16 +386,13 @@ func (pwm *peerWantManager) reverseIndexAdd(c cid.Cid, p peer.ID) bool {
}

// Remove the peer from the list of peers that have sent a want with the cid
func (pwm *peerWantManager) reverseIndexRemove(c cid.Cid, p peer.ID) bool {
func (pwm *peerWantManager) reverseIndexRemove(c cid.Cid, p peer.ID) {
if peers, ok := pwm.wantPeers[c]; ok {
delete(peers, p)
if len(peers) == 0 {
delete(pwm.wantPeers, c)
return true
}
}

return false
}

// GetWantBlocks returns the set of all want-blocks sent to all peers
78 changes: 78 additions & 0 deletions internal/peermanager/peerwantmanager_test.go
@@ -436,3 +436,81 @@ func TestStats(t *testing.T) {
t.Fatal("Expected 0 want-blocks")
}
}

func TestStatsOverlappingWantBlockWantHave(t *testing.T) {
g := &gauge{}
wbg := &gauge{}
pwm := newPeerWantManager(g, wbg)

peers := testutil.GeneratePeers(2)
p0 := peers[0]
p1 := peers[1]
cids := testutil.GenerateCids(2)
cids2 := testutil.GenerateCids(2)

pwm.addPeer(&mockPQ{}, p0)
pwm.addPeer(&mockPQ{}, p1)

// Send 2 want-blocks and 2 want-haves to p0
pwm.sendWants(p0, cids, cids2)

// Send opposite:
// 2 want-haves and 2 want-blocks to p1
pwm.sendWants(p1, cids2, cids)

if g.count != 4 {
t.Fatal("Expected 4 wants")
}
if wbg.count != 4 {
t.Fatal("Expected 4 want-blocks")
}

// Cancel 1 of each group of cids
pwm.sendCancels([]cid.Cid{cids[0], cids2[0]})

if g.count != 2 {
t.Fatal("Expected 2 wants")
}
if wbg.count != 2 {
t.Fatal("Expected 2 want-blocks")
}
}

func TestStatsRemovePeerOverlappingWantBlockWantHave(t *testing.T) {
g := &gauge{}
wbg := &gauge{}
pwm := newPeerWantManager(g, wbg)

peers := testutil.GeneratePeers(2)
p0 := peers[0]
p1 := peers[1]
cids := testutil.GenerateCids(2)
cids2 := testutil.GenerateCids(2)

pwm.addPeer(&mockPQ{}, p0)
pwm.addPeer(&mockPQ{}, p1)

// Send 2 want-blocks and 2 want-haves to p0
pwm.sendWants(p0, cids, cids2)

// Send opposite:
// 2 want-haves and 2 want-blocks to p1
pwm.sendWants(p1, cids2, cids)

if g.count != 4 {
t.Fatal("Expected 4 wants")
}
if wbg.count != 4 {
t.Fatal("Expected 4 want-blocks")
}

// Remove p0
pwm.removePeer(p0)

if g.count != 4 {
t.Fatal("Expected 4 wants")
}
if wbg.count != 2 {
t.Fatal("Expected 2 want-blocks")
}
}
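
For reference, the tests above read g.count and wbg.count directly; a minimal gauge compatible with that usage would look roughly like the sketch below (the actual test helper is defined elsewhere in peerwantmanager_test.go, so its exact shape is an assumption):

// Assumed shape of the test gauge: it only needs Inc/Dec plus a readable count.
type gauge struct {
	count int
}

func (g *gauge) Inc() { g.count++ }
func (g *gauge) Dec() { g.count-- }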
