Skip to content

Commit

Permalink
Merge pull request #229 from ElrondNetwork/bugs/EN-2588-fix-direct-se…
Browse files Browse the repository at this point in the history
…nder

Bugs/en 2588 fix direct sender
  • Loading branch information
iulianpascalau authored Jun 24, 2019
2 parents 5f583d7 + 623ab52 commit 0543638
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 30 deletions.
24 changes: 12 additions & 12 deletions integrationTests/singleShard/block/interceptedRequestHdr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/base64"
"fmt"
"reflect"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -108,21 +109,27 @@ func TestNode_GenerateSendInterceptHeaderByNonceWithNetMessenger(t *testing.T) {
_ = storeResolver.GetStorer(dataRetriever.ShardHdrNonceHashDataUnit).Put(uint64Converter.ToByteSlice(1), hdrHash2)

//Step 3. wire up a received handler
chanDone1 := make(chan struct{})
chanDone2 := make(chan struct{})
chanDone := make(chan struct{})
wg := sync.WaitGroup{}
wg.Add(2)

go func() {
wg.Wait()
chanDone <- struct{}{}
}()

dPoolRequester.Headers().RegisterHandler(func(key []byte) {
hdrStored, _ := dPoolRequester.Headers().Peek(key)
fmt.Printf("Recieved hash %v\n", base64.StdEncoding.EncodeToString(key))

if reflect.DeepEqual(hdrStored, &hdr1) && hdr1.Signature != nil {
fmt.Printf("Recieved header with hash %v\n", base64.StdEncoding.EncodeToString(key))
chanDone1 <- struct{}{}
wg.Done()
}

if reflect.DeepEqual(hdrStored, &hdr2) && hdr2.Signature != nil {
fmt.Printf("Recieved header with hash %v\n", base64.StdEncoding.EncodeToString(key))
chanDone2 <- struct{}{}
wg.Done()
}
})

Expand All @@ -132,18 +139,11 @@ func TestNode_GenerateSendInterceptHeaderByNonceWithNetMessenger(t *testing.T) {
hdrResolver := res.(*resolvers.HeaderResolver)
_ = hdrResolver.RequestDataFromNonce(0)

select {
case <-chanDone1:
case <-time.After(time.Second * 10):
assert.Fail(t, "timeout")
}

// TODO fix bug in directSender.go so requests can be made one after another (also concurrent)
//Step 5. request header that is stored
_ = hdrResolver.RequestDataFromNonce(1)

select {
case <-chanDone2:
case <-chanDone:
case <-time.After(time.Second * 10):
assert.Fail(t, "timeout")
}
Expand Down
39 changes: 22 additions & 17 deletions p2p/libp2p/directSender.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

"github.com/ElrondNetwork/elrond-go/p2p"
ggio "github.com/gogo/protobuf/io"
"github.com/gogo/protobuf/proto"
"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
Expand Down Expand Up @@ -161,27 +160,33 @@ func (ds *directSender) Send(topic string, buff []byte, peer p2p.PeerID) error {
bufw := bufio.NewWriter(stream)
w := ggio.NewDelimitedWriter(bufw)

go func(msg proto.Message) {
err := w.WriteMsg(msg)
if err != nil {
log.LogIfError(err)
_ = stream.Reset()
_ = helpers.FullClose(stream)
return
}
err = w.WriteMsg(msg)
if err != nil {
_ = stream.Reset()
_ = helpers.FullClose(stream)
return err
}

err = bufw.Flush()
if err != nil {
log.LogIfError(err)
_ = stream.Reset()
_ = helpers.FullClose(stream)
return
}
}(msg)
err = bufw.Flush()
if err != nil {
_ = stream.Reset()
_ = helpers.FullClose(stream)
return err
}

return nil
}

// SendAsync will asynchronously send a direct message to the connected peer
func (ds *directSender) SendAsync(topic string, buff []byte, peer p2p.PeerID) {
go func() {
err := ds.Send(topic, buff, peer)
if err != nil {
log.Info(err.Error())
}
}()
}

func (ds *directSender) getConnection(p p2p.PeerID) (network.Conn, error) {
conns := ds.hostP2P.Network().ConnsToPeer(peer.ID(p))

Expand Down
4 changes: 3 additions & 1 deletion p2p/libp2p/netMessenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,9 @@ func (netMes *networkMessenger) UnregisterMessageProcessor(topic string) error {

// SendToConnectedPeer sends a direct message to a connected peer
func (netMes *networkMessenger) SendToConnectedPeer(topic string, buff []byte, peerID p2p.PeerID) error {
return netMes.ds.Send(topic, buff, peerID)
netMes.ds.SendAsync(topic, buff, peerID)

return nil
}

func (netMes *networkMessenger) directMessageHandler(message p2p.MessageP2P) error {
Expand Down
1 change: 1 addition & 0 deletions p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ type ChannelLoadBalancer interface {
type DirectSender interface {
NextSeqno(counter *uint64) []byte
Send(topic string, buff []byte, peer PeerID) error
SendAsync(topic string, buff []byte, peer PeerID)
}

// PeerDiscoveryFactory defines the factory for peer discoverer implementation
Expand Down

0 comments on commit 0543638

Please sign in to comment.