Skip to content

Commit

Permalink
p2p/discovery: pr reviews and cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
MatusKysel committed Nov 28, 2024
1 parent 106c31a commit f842963
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 21 deletions.
6 changes: 0 additions & 6 deletions p2p/discover/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package discover

import (
"net"
"slices"
"sort"
"time"
Expand Down Expand Up @@ -51,11 +50,6 @@ func unwrapNodes(ns []*tableNode) []*enode.Node {
return result
}

//nolint:unused
func (n *tableNode) addr() *net.UDPAddr {
return &net.UDPAddr{IP: n.IP(), Port: n.UDP()}
}

func (n *tableNode) String() string {
return n.Node.String()
}
Expand Down
36 changes: 27 additions & 9 deletions p2p/discover/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,10 @@ type bucket struct {
}

type addNodeOp struct {
node *enode.Node
isInbound bool
forceSetLive bool // for tests
node *enode.Node
isInbound bool
forceSetLive bool // for tests
syncExecution bool // for tests
}

type trackRequestOp struct {
Expand Down Expand Up @@ -320,7 +321,7 @@ func (tab *Table) len() (n int) {
//
// The caller must not hold tab.mutex.
func (tab *Table) addFoundNode(n *enode.Node, forceSetLive bool) bool {
op := addNodeOp{node: n, isInbound: false, forceSetLive: forceSetLive}
op := addNodeOp{node: n, isInbound: false, forceSetLive: forceSetLive, syncExecution: true}
select {
case tab.addNodeCh <- op:
return <-tab.addNodeHandled
Expand All @@ -347,6 +348,17 @@ func (tab *Table) addInboundNode(n *enode.Node) {
}
}

// Only for testing purposes
func (tab *Table) addInboundNodeSync(n *enode.Node) bool {
op := addNodeOp{node: n, isInbound: true, syncExecution: true}
select {
case tab.addNodeCh <- op:
return <-tab.addNodeHandled
case <-tab.closeReq:
return false
}
}

func (tab *Table) trackRequest(n *enode.Node, success bool, foundNodes []*enode.Node) {
op := trackRequestOp{n, foundNodes, success}
select {
Expand Down Expand Up @@ -387,9 +399,16 @@ loop:
tab.revalidation.handleResponse(tab, r)

case op := <-tab.addNodeCh:
go func() {
tab.addNodeHandled <- tab.handleAddNode(op)
}()
// only happens in tests
if op.syncExecution {
ok := tab.handleAddNode(op)
tab.addNodeHandled <- ok
} else {
// async execution as handleAddNode is blocking
go func() {
tab.handleAddNode(op)
}()
}

case op := <-tab.trackRequestCh:
tab.handleTrackRequest(op)
Expand Down Expand Up @@ -489,7 +508,6 @@ func (tab *Table) bucketAtDistance(d int) *bucket {
return tab.buckets[d-bucketMinDistance-1]
}

//nolint:unused
func (tab *Table) filterNode(n *enode.Node) bool {
if tab.enrFilter == nil {
return false
Expand Down Expand Up @@ -716,7 +734,7 @@ func (tab *Table) handleTrackRequest(op trackRequestOp) {

// Add found nodes.
for _, n := range op.foundNodes {
go tab.handleAddNode(addNodeOp{n, false, false})
go tab.handleAddNode(addNodeOp{n, false, false, false})
}
}

Expand Down
3 changes: 1 addition & 2 deletions p2p/discover/table_reval.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package discover

import (
"errors"
"fmt"
"math"
"slices"
Expand Down Expand Up @@ -176,7 +175,7 @@ func (tr *tableRevalidation) handleResponse(tab *Table, resp revalidationRespons
var endpointChanged bool
if resp.newRecord != nil {
if tab.enrFilter != nil && !tab.enrFilter(resp.newRecord.Record()) {
tab.log.Trace("ENR record filter out", "id", n.ID(), "err", errors.New("filtered node"))
tab.log.Trace("ENR record filter out", "id", n.ID())
tab.deleteInBucket(b, n.ID())
return
}
Expand Down
8 changes: 4 additions & 4 deletions p2p/discover/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func TestTable_addInboundNode(t *testing.T) {
newrec := n2.Record()
newrec.Set(enr.IP{99, 99, 99, 99})
n2v2 := enode.SignNull(newrec, n2.ID())
tab.addInboundNode(n2v2)
tab.addInboundNodeSync(n2v2)
checkBucketContent(t, tab, []*enode.Node{n1, n2v2})

// Try updating n2 without sequence number change. The update is accepted
Expand All @@ -305,7 +305,7 @@ func TestTable_addInboundNode(t *testing.T) {
newrec.Set(enr.IP{100, 100, 100, 100})
newrec.SetSeq(n2.Seq())
n2v3 := enode.SignNull(newrec, n2.ID())
tab.addInboundNode(n2v3)
tab.addInboundNodeSync(n2v3)
checkBucketContent(t, tab, []*enode.Node{n1, n2v3})
}

Expand Down Expand Up @@ -349,13 +349,13 @@ func TestTable_addInboundNodeUpdateV4Accept(t *testing.T) {
// Add a v4 node.
key, _ := crypto.HexToECDSA("dd3757a8075e88d0f2b1431e7d3c5b1562e1c0aab9643707e8cbfcc8dae5cfe3")
n1 := enode.NewV4(&key.PublicKey, net.IP{88, 77, 66, 1}, 9000, 9000)
tab.addInboundNode(n1)
tab.addInboundNodeSync(n1)
checkBucketContent(t, tab, []*enode.Node{n1})

// Add an updated version with changed IP.
// The update will be accepted because it is inbound.
n1v2 := enode.NewV4(&key.PublicKey, net.IP{99, 99, 99, 99}, 9000, 9000)
tab.addInboundNode(n1v2)
tab.addInboundNodeSync(n1v2)
checkBucketContent(t, tab, []*enode.Node{n1v2})
}

Expand Down

0 comments on commit f842963

Please sign in to comment.