From 597d21ec85c0612e2c14a9cf82668bb789520afd Mon Sep 17 00:00:00 2001 From: iulianpascalau Date: Fri, 21 Jun 2019 21:13:33 +0300 Subject: [PATCH 1/4] fixed directSender by eliminating the nested go routine call. NetMessenger now sends async requests and always return nil error. --- .../block/interceptedRequestHdr_test.go | 24 ++++++------ p2p/libp2p/directSender.go | 39 +++++++++++-------- p2p/libp2p/netMessenger.go | 4 +- p2p/p2p.go | 1 + 4 files changed, 38 insertions(+), 30 deletions(-) diff --git a/integrationTests/singleShard/block/interceptedRequestHdr_test.go b/integrationTests/singleShard/block/interceptedRequestHdr_test.go index 09df0218029..fe508f511d1 100644 --- a/integrationTests/singleShard/block/interceptedRequestHdr_test.go +++ b/integrationTests/singleShard/block/interceptedRequestHdr_test.go @@ -4,6 +4,7 @@ import ( "encoding/base64" "fmt" "reflect" + "sync" "testing" "time" @@ -108,8 +109,14 @@ func TestNode_GenerateSendInterceptHeaderByNonceWithNetMessenger(t *testing.T) { storeResolver.GetStorer(dataRetriever.ShardHdrNonceHashDataUnit).Put(uint64Converter.ToByteSlice(1), hdrHash2) //Step 3. wire up a received handler - chanDone1 := make(chan struct{}) - chanDone2 := make(chan struct{}) + chanDone := make(chan struct{}) + wg := sync.WaitGroup{} + wg.Add(2) + + go func() { + wg.Wait() + chanDone <- struct{}{} + }() dPoolRequestor.Headers().RegisterHandler(func(key []byte) { hdrStored, _ := dPoolRequestor.Headers().Peek(key) @@ -118,13 +125,13 @@ func TestNode_GenerateSendInterceptHeaderByNonceWithNetMessenger(t *testing.T) { if reflect.DeepEqual(hdrStored, &hdr1) && hdr1.Signature != nil { assert.Equal(t, hdrStored, &hdr1) fmt.Printf("Recieved header with hash %v\n", base64.StdEncoding.EncodeToString(key)) - chanDone1 <- struct{}{} + wg.Done() } if reflect.DeepEqual(hdrStored, &hdr2) && hdr2.Signature != nil { assert.Equal(t, hdrStored, &hdr2) fmt.Printf("Recieved header with hash %v\n", base64.StdEncoding.EncodeToString(key)) - chanDone2 <- struct{}{} + wg.Done() } }) @@ -134,18 +141,11 @@ func TestNode_GenerateSendInterceptHeaderByNonceWithNetMessenger(t *testing.T) { hdrResolver := res.(*resolvers.HeaderResolver) hdrResolver.RequestDataFromNonce(0) - select { - case <-chanDone1: - case <-time.After(time.Second * 10): - assert.Fail(t, "timeout") - } - - // TODO fix bug in directSender.go so requests can be made one after another (also concurrent) //Step 5. request header that is stored hdrResolver.RequestDataFromNonce(1) select { - case <-chanDone2: + case <-chanDone: case <-time.After(time.Second * 10): assert.Fail(t, "timeout") } diff --git a/p2p/libp2p/directSender.go b/p2p/libp2p/directSender.go index 5f3b16a3364..ca4ad5b1a4c 100644 --- a/p2p/libp2p/directSender.go +++ b/p2p/libp2p/directSender.go @@ -12,7 +12,6 @@ import ( "github.com/ElrondNetwork/elrond-go/p2p" ggio "github.com/gogo/protobuf/io" - "github.com/gogo/protobuf/proto" "github.com/libp2p/go-libp2p-core/helpers" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" @@ -161,27 +160,33 @@ func (ds *directSender) Send(topic string, buff []byte, peer p2p.PeerID) error { bufw := bufio.NewWriter(stream) w := ggio.NewDelimitedWriter(bufw) - go func(msg proto.Message) { - err := w.WriteMsg(msg) - if err != nil { - log.LogIfError(err) - _ = stream.Reset() - _ = helpers.FullClose(stream) - return - } + err = w.WriteMsg(msg) + if err != nil { + _ = stream.Reset() + _ = helpers.FullClose(stream) + return err + } - err = bufw.Flush() - if err != nil { - log.LogIfError(err) - _ = stream.Reset() - _ = helpers.FullClose(stream) - return - } - }(msg) + err = bufw.Flush() + if err != nil { + _ = stream.Reset() + _ = helpers.FullClose(stream) + return err + } return nil } +// SendAsync will asynchronously send a direct message to the connected peer +func (ds *directSender) SendAsync(topic string, buff []byte, peer p2p.PeerID) { + go func() { + err := ds.Send(topic, buff, peer) + if err != nil { + log.Debug(err.Error()) + } + }() +} + func (ds *directSender) getConnection(p p2p.PeerID) (network.Conn, error) { conns := ds.hostP2P.Network().ConnsToPeer(peer.ID(p)) diff --git a/p2p/libp2p/netMessenger.go b/p2p/libp2p/netMessenger.go index f648084e09c..df23b27694e 100644 --- a/p2p/libp2p/netMessenger.go +++ b/p2p/libp2p/netMessenger.go @@ -452,7 +452,9 @@ func (netMes *networkMessenger) UnregisterMessageProcessor(topic string) error { // SendToConnectedPeer sends a direct message to a connected peer func (netMes *networkMessenger) SendToConnectedPeer(topic string, buff []byte, peerID p2p.PeerID) error { - return netMes.ds.Send(topic, buff, peerID) + netMes.ds.SendAsync(topic, buff, peerID) + + return nil } func (netMes *networkMessenger) directMessageHandler(message p2p.MessageP2P) error { diff --git a/p2p/p2p.go b/p2p/p2p.go index 8546277bd21..5ff50f3b43e 100644 --- a/p2p/p2p.go +++ b/p2p/p2p.go @@ -174,6 +174,7 @@ type ChannelLoadBalancer interface { type DirectSender interface { NextSeqno(counter *uint64) []byte Send(topic string, buff []byte, peer PeerID) error + SendAsync(topic string, buff []byte, peer PeerID) } // PeerDiscoveryFactory defines the factory for peer discoverer implementation From 0b947c649dca67308b7d9ea540ac1a29480b9d66 Mon Sep 17 00:00:00 2001 From: iulianpascalau Date: Mon, 24 Jun 2019 14:22:35 +0300 Subject: [PATCH 2/4] changed the tx data pool to LRU. --- cmd/node/config/config.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/node/config/config.toml b/cmd/node/config/config.toml index 72e8b90c51f..697020c0893 100644 --- a/cmd/node/config/config.toml +++ b/cmd/node/config/config.toml @@ -148,7 +148,7 @@ [TxDataPool] Size = 100000 - Type = "FIFOSharded" + Type = "LRU" Shards = 32 [SmartContractDataPool] From 0b58de856ac43653ac577a78c6d04cfe779b63a8 Mon Sep 17 00:00:00 2001 From: iulianpascalau Date: Mon, 24 Jun 2019 20:05:24 +0300 Subject: [PATCH 3/4] changed back the cache type on tx processor --- cmd/node/config/config.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/node/config/config.toml b/cmd/node/config/config.toml index 697020c0893..72e8b90c51f 100644 --- a/cmd/node/config/config.toml +++ b/cmd/node/config/config.toml @@ -148,7 +148,7 @@ [TxDataPool] Size = 100000 - Type = "LRU" + Type = "FIFOSharded" Shards = 32 [SmartContractDataPool] From 6c4fc1a7a4e31581d06ebe8f68509234bdc136e2 Mon Sep 17 00:00:00 2001 From: iulianpascalau Date: Mon, 24 Jun 2019 20:29:39 +0300 Subject: [PATCH 4/4] changed log level to info when returning error on directSender.Send --- p2p/libp2p/directSender.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/libp2p/directSender.go b/p2p/libp2p/directSender.go index ca4ad5b1a4c..c5b998dfd44 100644 --- a/p2p/libp2p/directSender.go +++ b/p2p/libp2p/directSender.go @@ -182,7 +182,7 @@ func (ds *directSender) SendAsync(topic string, buff []byte, peer p2p.PeerID) { go func() { err := ds.Send(topic, buff, peer) if err != nil { - log.Debug(err.Error()) + log.Info(err.Error()) } }() }