From f8429633c35a31588ccc0e28381173f6550cf3b6 Mon Sep 17 00:00:00 2001 From: Matus Kysel Date: Thu, 28 Nov 2024 09:27:16 +0100 Subject: [PATCH] p2p/discovery: pr reviews and cleanup --- p2p/discover/node.go | 6 ------ p2p/discover/table.go | 36 +++++++++++++++++++++++++++--------- p2p/discover/table_reval.go | 3 +-- p2p/discover/table_test.go | 8 ++++---- 4 files changed, 32 insertions(+), 21 deletions(-) diff --git a/p2p/discover/node.go b/p2p/discover/node.go index 8454ed9b61..ac34b7c5b2 100644 --- a/p2p/discover/node.go +++ b/p2p/discover/node.go @@ -17,7 +17,6 @@ package discover import ( - "net" "slices" "sort" "time" @@ -51,11 +50,6 @@ func unwrapNodes(ns []*tableNode) []*enode.Node { return result } -//nolint:unused -func (n *tableNode) addr() *net.UDPAddr { - return &net.UDPAddr{IP: n.IP(), Port: n.UDP()} -} - func (n *tableNode) String() string { return n.Node.String() } diff --git a/p2p/discover/table.go b/p2p/discover/table.go index 6f4cc97c05..7c95fffd43 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -112,9 +112,10 @@ type bucket struct { } type addNodeOp struct { - node *enode.Node - isInbound bool - forceSetLive bool // for tests + node *enode.Node + isInbound bool + forceSetLive bool // for tests + syncExecution bool // for tests } type trackRequestOp struct { @@ -320,7 +321,7 @@ func (tab *Table) len() (n int) { // // The caller must not hold tab.mutex. func (tab *Table) addFoundNode(n *enode.Node, forceSetLive bool) bool { - op := addNodeOp{node: n, isInbound: false, forceSetLive: forceSetLive} + op := addNodeOp{node: n, isInbound: false, forceSetLive: forceSetLive, syncExecution: true} select { case tab.addNodeCh <- op: return <-tab.addNodeHandled @@ -347,6 +348,17 @@ func (tab *Table) addInboundNode(n *enode.Node) { } } +// Only for testing purposes +func (tab *Table) addInboundNodeSync(n *enode.Node) bool { + op := addNodeOp{node: n, isInbound: true, syncExecution: true} + select { + case tab.addNodeCh <- op: + return <-tab.addNodeHandled + case <-tab.closeReq: + return false + } +} + func (tab *Table) trackRequest(n *enode.Node, success bool, foundNodes []*enode.Node) { op := trackRequestOp{n, foundNodes, success} select { @@ -387,9 +399,16 @@ loop: tab.revalidation.handleResponse(tab, r) case op := <-tab.addNodeCh: - go func() { - tab.addNodeHandled <- tab.handleAddNode(op) - }() + // only happens in tests + if op.syncExecution { + ok := tab.handleAddNode(op) + tab.addNodeHandled <- ok + } else { + // async execution as handleAddNode is blocking + go func() { + tab.handleAddNode(op) + }() + } case op := <-tab.trackRequestCh: tab.handleTrackRequest(op) @@ -489,7 +508,6 @@ func (tab *Table) bucketAtDistance(d int) *bucket { return tab.buckets[d-bucketMinDistance-1] } -//nolint:unused func (tab *Table) filterNode(n *enode.Node) bool { if tab.enrFilter == nil { return false @@ -716,7 +734,7 @@ func (tab *Table) handleTrackRequest(op trackRequestOp) { // Add found nodes. for _, n := range op.foundNodes { - go tab.handleAddNode(addNodeOp{n, false, false}) + go tab.handleAddNode(addNodeOp{n, false, false, false}) } } diff --git a/p2p/discover/table_reval.go b/p2p/discover/table_reval.go index 0404d122ff..d4326fbcab 100644 --- a/p2p/discover/table_reval.go +++ b/p2p/discover/table_reval.go @@ -17,7 +17,6 @@ package discover import ( - "errors" "fmt" "math" "slices" @@ -176,7 +175,7 @@ func (tr *tableRevalidation) handleResponse(tab *Table, resp revalidationRespons var endpointChanged bool if resp.newRecord != nil { if tab.enrFilter != nil && !tab.enrFilter(resp.newRecord.Record()) { - tab.log.Trace("ENR record filter out", "id", n.ID(), "err", errors.New("filtered node")) + tab.log.Trace("ENR record filter out", "id", n.ID()) tab.deleteInBucket(b, n.ID()) return } diff --git a/p2p/discover/table_test.go b/p2p/discover/table_test.go index a6666fdcbf..ae981f8b28 100644 --- a/p2p/discover/table_test.go +++ b/p2p/discover/table_test.go @@ -296,7 +296,7 @@ func TestTable_addInboundNode(t *testing.T) { newrec := n2.Record() newrec.Set(enr.IP{99, 99, 99, 99}) n2v2 := enode.SignNull(newrec, n2.ID()) - tab.addInboundNode(n2v2) + tab.addInboundNodeSync(n2v2) checkBucketContent(t, tab, []*enode.Node{n1, n2v2}) // Try updating n2 without sequence number change. The update is accepted @@ -305,7 +305,7 @@ func TestTable_addInboundNode(t *testing.T) { newrec.Set(enr.IP{100, 100, 100, 100}) newrec.SetSeq(n2.Seq()) n2v3 := enode.SignNull(newrec, n2.ID()) - tab.addInboundNode(n2v3) + tab.addInboundNodeSync(n2v3) checkBucketContent(t, tab, []*enode.Node{n1, n2v3}) } @@ -349,13 +349,13 @@ func TestTable_addInboundNodeUpdateV4Accept(t *testing.T) { // Add a v4 node. key, _ := crypto.HexToECDSA("dd3757a8075e88d0f2b1431e7d3c5b1562e1c0aab9643707e8cbfcc8dae5cfe3") n1 := enode.NewV4(&key.PublicKey, net.IP{88, 77, 66, 1}, 9000, 9000) - tab.addInboundNode(n1) + tab.addInboundNodeSync(n1) checkBucketContent(t, tab, []*enode.Node{n1}) // Add an updated version with changed IP. // The update will be accepted because it is inbound. n1v2 := enode.NewV4(&key.PublicKey, net.IP{99, 99, 99, 99}, 9000, 9000) - tab.addInboundNode(n1v2) + tab.addInboundNodeSync(n1v2) checkBucketContent(t, tab, []*enode.Node{n1v2}) }