From 04c50486b87c8e11dfcb987b3b27a2aabb134bd2 Mon Sep 17 00:00:00 2001 From: qcloud Date: Fri, 31 May 2024 18:33:37 +0800 Subject: [PATCH] refactor: contentLookup and traceContentLookup --- go.sum | 8 -- p2p/discover/api.go | 7 +- p2p/discover/portal_protocol.go | 193 ++++++++++++--------------- p2p/discover/portal_protocol_test.go | 14 +- 4 files changed, 100 insertions(+), 122 deletions(-) diff --git a/go.sum b/go.sum index 6390a1f2700b..adff9bbe105e 100644 --- a/go.sum +++ b/go.sum @@ -429,12 +429,6 @@ github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= -github.com/optimism-java/utp-go v0.0.0-20240309041853-b6b3a0dea581 h1:ZxgrtI0xIw+clB32iDDDWaiTcCizTeN7rNyzH9YorPI= -github.com/optimism-java/utp-go v0.0.0-20240309041853-b6b3a0dea581/go.mod h1:DZ0jYzLzt4ZsCmhI/iqYgGFoNx45OfpEoKzXB8HVALQ= -github.com/optimism-java/utp-go v0.0.0-20240530085325-d8dd9d262631 h1:01AecSuOSS6fsIU/oTVG/C70hIl3xPen99qy2hGr57w= -github.com/optimism-java/utp-go v0.0.0-20240530085325-d8dd9d262631/go.mod h1:DZ0jYzLzt4ZsCmhI/iqYgGFoNx45OfpEoKzXB8HVALQ= -github.com/optimism-java/utp-go v0.0.0-20240531021243-e12d25b6be38 h1:t0gRqfM7wUrFyryagUpw4TmYY0DLt+rjPaBd92i+W2M= -github.com/optimism-java/utp-go v0.0.0-20240531021243-e12d25b6be38/go.mod h1:DZ0jYzLzt4ZsCmhI/iqYgGFoNx45OfpEoKzXB8HVALQ= github.com/optimism-java/utp-go v0.0.0-20240531024756-00da67044c50 h1:I1jGQkNEWq7BTFZkCJKLDrqFLC1jR3EC7jz3to4kpLg= github.com/optimism-java/utp-go v0.0.0-20240531024756-00da67044c50/go.mod h1:DZ0jYzLzt4ZsCmhI/iqYgGFoNx45OfpEoKzXB8HVALQ= github.com/optimism-java/utp-go v0.0.0-20240603010819-75be99daf402 h1:jssfGQq6xdzgs0ZI/O/S2dKAyh2fIDiOTWdrbqat1Ls= @@ -715,8 +709,6 @@ golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= -golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= diff --git a/p2p/discover/api.go b/p2p/discover/api.go index 3f64b5f42adc..9e4ac67ddd00 100644 --- a/p2p/discover/api.go +++ b/p2p/discover/api.go @@ -56,7 +56,7 @@ type Trace struct { Origin string `json:"origin"` // local node id TargetId string `json:"targetId"` // target content id ReceivedFrom string `json:"receivedFrom"` // the node id of which content from - Responses map[string][]string `json:"responses"` // the node id and there response nodeIds + Responses map[string]RespByNode `json:"responses"` // the node id and there response nodeIds Metadata map[string]*NodeMetadata `json:"metadata"` // node id and there metadata object StartedAtMs int `json:"startedAtMs"` // timestamp of the beginning of this request in milliseconds Cancelled []string `json:"cancelled"` // the node ids which are send but cancelled @@ -67,6 +67,11 @@ type NodeMetadata struct { Distance string `json:"distance"` } +type RespByNode struct { + DurationMs int32 `json:"durationMs"` + RespondedWith []string `json:"respondedWith"` +} + type Enrs struct { Enrs []string `json:"enrs"` } diff --git a/p2p/discover/portal_protocol.go b/p2p/discover/portal_protocol.go index b1e222b3801d..a302bc4adbbd 100644 --- a/p2p/discover/portal_protocol.go +++ b/p2p/discover/portal_protocol.go @@ -16,6 +16,7 @@ import ( "slices" "sort" "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/common/hexutil" @@ -1488,64 +1489,44 @@ func (p *PortalProtocol) collectTableNodes(rip net.IP, distances []uint, limit i func (p *PortalProtocol) ContentLookup(contentKey, contentId []byte) ([]byte, bool, error) { lookupContext, cancel := context.WithCancel(context.Background()) - defer cancel() - resChan := make(chan *ContentInfoResp, 1) - defer close(resChan) + + resChan := make(chan *traceContentInfoResp, alpha) + hasResult := int32(0) + + result := ContentInfoResp{} + + var wg sync.WaitGroup + wg.Add(1) + + go func() { + defer wg.Done() + for res := range resChan { + if res.Flag != portalwire.ContentEnrsSelector { + result.Content = res.Content.([]byte) + result.UtpTransfer = res.UtpTransfer + } + } + }() + newLookup(lookupContext, p.table, enode.ID(contentId), func(n *node) ([]*node, error) { - return p.contentLookupWorker(unwrapNode(n), contentKey, resChan) + return p.contentLookupWorker(unwrapNode(n), contentKey, resChan, cancel, &hasResult) }).run() + close(resChan) - if len(resChan) > 0 { - res := <-resChan - return res.Content, res.UtpTransfer, nil + wg.Wait() + if hasResult == 1 { + return result.Content, result.UtpTransfer, nil } + defer cancel() return nil, false, ContentNotFound } -func (p *PortalProtocol) contentLookupWorker(n *enode.Node, contentKey []byte, resChan chan<- *ContentInfoResp) ([]*node, error) { - wrapedNode := make([]*node, 0) - flag, content, err := p.findContent(n, contentKey) - if err != nil { - p.Log.Error("contentLookupWorker failed", "ip", n.IP().String(), "err", err) - return nil, err - } - p.Log.Debug("contentLookupWorker reveice response", "ip", n.IP().String(), "flag", flag) - // has find content - if len(resChan) > 0 { - return []*node{}, nil - } - switch flag { - case portalwire.ContentRawSelector, portalwire.ContentConnIdSelector: - content, ok := content.([]byte) - if !ok { - return wrapedNode, fmt.Errorf("failed to assert to raw content, value is: %v", content) - } - res := &ContentInfoResp{ - Content: content, - } - if flag == portalwire.ContentConnIdSelector { - res.UtpTransfer = true - } - resChan <- res - return wrapedNode, err - case portalwire.ContentEnrsSelector: - nodes, ok := content.([]*enode.Node) - if !ok { - return wrapedNode, fmt.Errorf("failed to assert to enrs content, value is: %v", content) - } - return wrapNodes(nodes), nil - } - return wrapedNode, nil -} - func (p *PortalProtocol) TraceContentLookup(contentKey, contentId []byte) (*TraceContentResult, error) { lookupContext, cancel := context.WithCancel(context.Background()) - defer cancel() - requestNodeChan := make(chan *enode.Node, 3) - resChan := make(chan *traceContentInfoResp, 3) + // resp channel + resChan := make(chan *traceContentInfoResp, alpha) - requestNode := make([]*enode.Node, 0) - requestRes := make(map[string]*traceContentInfoResp) + hasResult := int32(0) traceContentRes := &TraceContentResult{} @@ -1555,7 +1536,7 @@ func (p *PortalProtocol) TraceContentLookup(contentKey, contentId []byte) (*Trac Origin: selfHexId, TargetId: hexutil.Encode(contentId), StartedAtMs: int(time.Now().UnixMilli()), - Responses: make(map[string][]string), + Responses: make(map[string]RespByNode), Metadata: make(map[string]*NodeMetadata), Cancelled: make([]string, 0), } @@ -1567,7 +1548,10 @@ func (p *PortalProtocol) TraceContentLookup(contentKey, contentId []byte) (*Trac id := "0x" + node.ID().String() localResponse = append(localResponse, id) } - trace.Responses[selfHexId] = localResponse + trace.Responses[selfHexId] = RespByNode{ + DurationMs: 0, + RespondedWith: localResponse, + } dis := p.Distance(p.Self().ID(), enode.ID(contentId)) @@ -1577,82 +1561,73 @@ func (p *PortalProtocol) TraceContentLookup(contentKey, contentId []byte) (*Trac } var wg sync.WaitGroup - wg.Add(2) - go func() { - defer wg.Done() - for node := range requestNodeChan { - requestNode = append(requestNode, node) - } - }() + wg.Add(1) go func() { defer wg.Done() for res := range resChan { - key := res.Node.ID().String() - requestRes[key] = res - if res.Flag == portalwire.ContentRawSelector || res.Flag == portalwire.ContentConnIdSelector { - // get the content - return + node := res.Node + hexId := "0x" + node.ID().String() + dis := p.Distance(node.ID(), enode.ID(contentId)) + p.Log.Debug("reveice res", "id", hexId, "flag", res.Flag) + trace.Metadata[hexId] = &NodeMetadata{ + Enr: node.String(), + Distance: hexutil.Encode(dis[:]), + } + // 没有返回 content + if traceContentRes.Content == "" { + if res.Flag == portalwire.ContentRawSelector || res.Flag == portalwire.ContentConnIdSelector { + trace.ReceivedFrom = hexId + content := res.Content.([]byte) + traceContentRes.Content = hexutil.Encode(content) + traceContentRes.UtpTransfer = res.UtpTransfer + trace.Responses[hexId] = RespByNode{} + } else { + nodes := res.Content.([]*enode.Node) + respByNode := RespByNode{ + RespondedWith: make([]string, 0, len(nodes)), + } + for _, node := range nodes { + idInner := "0x" + node.ID().String() + respByNode.RespondedWith = append(respByNode.RespondedWith, idInner) + if _, ok := trace.Metadata[idInner]; !ok { + dis := p.Distance(node.ID(), enode.ID(contentId)) + trace.Metadata[idInner] = &NodeMetadata{ + Enr: node.String(), + Distance: hexutil.Encode(dis[:]), + } + } + trace.Responses[hexId] = respByNode + } + } + } else { + trace.Cancelled = append(trace.Cancelled, hexId) } } }() - newLookup(lookupContext, p.table, enode.ID(contentId), func(n *node) ([]*node, error) { - node := unwrapNode(n) - requestNodeChan <- node - return p.traceContentLookupWorker(node, contentKey, resChan) - }).run() - - close(requestNodeChan) + lookup := newLookup(lookupContext, p.table, enode.ID(contentId), func(n *node) ([]*node, error) { + return p.contentLookupWorker(unwrapNode(n), contentKey, resChan, cancel, &hasResult) + }) + lookup.run() close(resChan) wg.Wait() - - for _, node := range requestNode { - id := node.ID().String() - hexId := "0x" + id - dis := p.Distance(node.ID(), enode.ID(contentId)) - trace.Metadata[hexId] = &NodeMetadata{ - Enr: node.String(), - Distance: hexutil.Encode(dis[:]), - } - if res, ok := requestRes[id]; ok { - if res.Flag == portalwire.ContentRawSelector || res.Flag == portalwire.ContentConnIdSelector { - trace.ReceivedFrom = hexId - content := res.Content.([]byte) - traceContentRes.Content = hexutil.Encode(content) - traceContentRes.UtpTransfer = res.UtpTransfer - trace.Responses[hexId] = make([]string, 0) - } else { - content := res.Content.([]*enode.Node) - ids := make([]string, 0) - for _, n := range content { - hexId := "0x" + n.ID().String() - ids = append(ids, hexId) - } - trace.Responses[hexId] = ids - } - } else { - trace.Cancelled = append(trace.Cancelled, id) - } + if hasResult == 0 { + cancel() } - traceContentRes.Trace = *trace return traceContentRes, nil } -func (p *PortalProtocol) traceContentLookupWorker(n *enode.Node, contentKey []byte, resChan chan<- *traceContentInfoResp) ([]*node, error) { +func (p *PortalProtocol) contentLookupWorker(n *enode.Node, contentKey []byte, resChan chan<- *traceContentInfoResp, cancel context.CancelFunc, done *int32) ([]*node, error) { wrapedNode := make([]*node, 0) flag, content, err := p.findContent(n, contentKey) if err != nil { return nil, err } p.Log.Debug("traceContentLookupWorker reveice response", "ip", n.IP().String(), "flag", flag) - // has find content - if len(resChan) > 0 { - return []*node{}, nil - } switch flag { case portalwire.ContentRawSelector, portalwire.ContentConnIdSelector: @@ -1669,17 +1644,23 @@ func (p *PortalProtocol) traceContentLookupWorker(n *enode.Node, contentKey []by if flag == portalwire.ContentConnIdSelector { res.UtpTransfer = true } - resChan <- res + if atomic.CompareAndSwapInt32(done, 0, 1) { + p.Log.Debug("contentLookupWorker find content", "ip", n.IP().String(), "port", n.UDP()) + resChan <- res + cancel() + } return wrapedNode, err case portalwire.ContentEnrsSelector: nodes, ok := content.([]*enode.Node) if !ok { return wrapedNode, fmt.Errorf("failed to assert to enrs content, value is: %v", content) } - resChan <- &traceContentInfoResp{Node: n, + resChan <- &traceContentInfoResp{ + Node: n, Flag: flag, Content: content, - UtpTransfer: false} + UtpTransfer: false, + } return wrapNodes(nodes), nil } return wrapedNode, nil diff --git a/p2p/discover/portal_protocol_test.go b/p2p/discover/portal_protocol_test.go index 331ba5ddc0d3..c6462579c55f 100644 --- a/p2p/discover/portal_protocol_test.go +++ b/p2p/discover/portal_protocol_test.go @@ -458,6 +458,10 @@ func TestTraceContentLookup(t *testing.T) { err = node3.Start() assert.NoError(t, err) + defer node1.Stop() + defer node2.Stop() + defer node3.Stop() + contentKey := []byte{0x3, 0x4} content := []byte{0x1, 0x2} contentId := node1.toContentId(contentKey) @@ -495,15 +499,11 @@ func TestTraceContentLookup(t *testing.T) { // check response node3Response := res.Trace.Responses[node3Id] - assert.Equal(t, node3Response, []string{node2Id}) + assert.Equal(t, node3Response.RespondedWith, []string{node2Id}) node2Response := res.Trace.Responses[node2Id] - assert.Equal(t, node2Response, []string{node1Id}) + assert.Equal(t, node2Response.RespondedWith, []string{node1Id}) node1Response := res.Trace.Responses[node1Id] - assert.Equal(t, node1Response, []string{}) - - // res, _, err = node1.ContentLookup([]byte{0x2, 0x4}) - // assert.Equal(t, ContentNotFound, err) - // assert.Nil(t, res) + assert.Equal(t, node1Response.RespondedWith, ([]string)(nil)) }