From 6c40856c55fbf29d5b4837cafff95293c232bec0 Mon Sep 17 00:00:00 2001 From: Iulian Pascalau Date: Fri, 4 Mar 2022 20:11:57 +0200 Subject: [PATCH] - new pubsub lib & adaptation --- go.mod | 2 + go.sum | 4 +- p2p/libp2p/directSender.go | 8 +-- p2p/libp2p/netMessenger.go | 32 ++++++------ p2p/libp2p/netMessenger_test.go | 88 ++++++++++++++++----------------- 5 files changed, 68 insertions(+), 66 deletions(-) diff --git a/go.mod b/go.mod index 0bd2ff65404..a845f6b5a1f 100644 --- a/go.mod +++ b/go.mod @@ -56,3 +56,5 @@ replace github.com/ElrondNetwork/arwen-wasm-vm/v1_2 v1.2.30 => github.com/Elrond replace github.com/ElrondNetwork/arwen-wasm-vm/v1_3 v1.3.30 => github.com/ElrondNetwork/arwen-wasm-vm v1.3.30 replace github.com/ElrondNetwork/arwen-wasm-vm/v1_4 v1.4.22 => github.com/ElrondNetwork/arwen-wasm-vm v1.4.22 + +replace github.com/libp2p/go-libp2p-pubsub v0.5.5 => github.com/ElrondNetwork/go-libp2p-pubsub v0.5.5-gamma diff --git a/go.sum b/go.sum index 237b64ff7ba..d4e955cd0d3 100644 --- a/go.sum +++ b/go.sum @@ -44,6 +44,8 @@ github.com/ElrondNetwork/elrond-vm-common v1.2.1/go.mod h1:07N31evc3GKh+tcmOXpc3 github.com/ElrondNetwork/elrond-vm-common v1.2.3/go.mod h1:07N31evc3GKh+tcmOXpc3xz/YsgV4yUHMo3LSlF0DIs= github.com/ElrondNetwork/elrond-vm-common v1.2.4-rc1 h1:fXuoEFwwgyT9f+P13CuYQOXM1yW48ilmov4M4B8fTXY= github.com/ElrondNetwork/elrond-vm-common v1.2.4-rc1/go.mod h1:07N31evc3GKh+tcmOXpc3xz/YsgV4yUHMo3LSlF0DIs= +github.com/ElrondNetwork/go-libp2p-pubsub v0.5.5-gamma h1:k3Ko5UI2HNZlrU9laVeWx13+jnm79Maame4wIhf6J7Y= +github.com/ElrondNetwork/go-libp2p-pubsub v0.5.5-gamma/go.mod h1:gVOzwebXVdSMDQBTfH8ACO5EJ4SQrvsHqCmYsCZpD0E= github.com/ElrondNetwork/notifier-go v1.0.2 h1:s/cJMRr4o5CD07M8ElNFRnJf/deUt2vtSBU6Z8Ow6uM= github.com/ElrondNetwork/notifier-go v1.0.2/go.mod h1:2O5LGUMdvYSoff5oxq79kJ4uOSP3PSoF4k/7nPND2ic= github.com/ElrondNetwork/protobuf v1.3.2 h1:qoCSYiO+8GtXBEZWEjw0WPcZfM3g7QuuJrwpN+y6Mvg= @@ -565,8 +567,6 @@ github.com/libp2p/go-libp2p-peerstore v0.2.8 h1:nJghUlUkFVvyk7ccsM67oFA6kqUkwyCM github.com/libp2p/go-libp2p-peerstore v0.2.8/go.mod h1:gGiPlXdz7mIHd2vfAsHzBNAMqSDkt2UBFwgcITgw1lA= github.com/libp2p/go-libp2p-pnet v0.2.0 h1:J6htxttBipJujEjz1y0a5+eYoiPcFHhSYHH6na5f0/k= github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA= -github.com/libp2p/go-libp2p-pubsub v0.5.5 h1:7jTdWQsB9eHO8prTK95/0vFxekO1wQJ7OMWr8w4wt54= -github.com/libp2p/go-libp2p-pubsub v0.5.5/go.mod h1:gVOzwebXVdSMDQBTfH8ACO5EJ4SQrvsHqCmYsCZpD0E= github.com/libp2p/go-libp2p-quic-transport v0.10.0/go.mod h1:RfJbZ8IqXIhxBRm5hqUEJqjiiY8xmEuq3HUDS993MkA= github.com/libp2p/go-libp2p-quic-transport v0.11.2 h1:p1YQDZRHH4Cv2LPtHubqlQ9ggz4CKng/REZuXZbZMhM= github.com/libp2p/go-libp2p-quic-transport v0.11.2/go.mod h1:wlanzKtIh6pHrq+0U3p3DY9PJfGqxMgPaGKaK5LifwQ= diff --git a/p2p/libp2p/directSender.go b/p2p/libp2p/directSender.go index d2ac3e3c723..5cfd7e01275 100644 --- a/p2p/libp2p/directSender.go +++ b/p2p/libp2p/directSender.go @@ -68,14 +68,14 @@ func NewDirectSender( mutexForPeer: mutexForPeer, } - //wire-up a handler for direct messages + // wire-up a handler for direct messages h.SetStreamHandler(DirectSendID, ds.directStreamHandler) return ds, nil } func (ds *directSender) directStreamHandler(s network.Stream) { - reader := ggio.NewDelimitedReader(s, 1<<20) + reader := ggio.NewDelimitedReader(s, maxSendBuffSize) go func(r ggio.ReadCloser) { for { @@ -83,7 +83,7 @@ func (ds *directSender) directStreamHandler(s network.Stream) { err := reader.ReadMsg(msg) if err != nil { - //stream has encountered an error, close this go routine + // stream has encountered an error, close this go routine if err != io.EOF { _ = s.Reset() @@ -198,7 +198,7 @@ func (ds *directSender) getConnection(p core.PeerID) (network.Conn, error) { return nil, p2p.ErrPeerNotDirectlyConnected } - //return the connection that has the highest number of streams + // return the connection that has the highest number of streams lStreams := 0 var conn network.Conn for _, c := range conns { diff --git a/p2p/libp2p/netMessenger.go b/p2p/libp2p/netMessenger.go index 4955685acdd..4fda72edeca 100644 --- a/p2p/libp2p/netMessenger.go +++ b/p2p/libp2p/netMessenger.go @@ -54,12 +54,12 @@ const ( refreshPeersOnTopic = time.Second * 3 ttlPeersOnTopic = time.Second * 10 pubsubTimeCacheDuration = 10 * time.Minute - acceptMessagesInAdvanceDuration = 20 * time.Second //we are accepting the messages with timestamp in the future only for this delta + acceptMessagesInAdvanceDuration = 20 * time.Second // we are accepting the messages with timestamp in the future only for this delta broadcastGoRoutines = 1000 timeBetweenPeerPrints = time.Second * 20 timeBetweenExternalLoggersCheck = time.Second * 20 minRangePortValue = 1025 - noSignPolicy = pubsub.MessageSignaturePolicy(0) //should be used only in tests + noSignPolicy = pubsub.MessageSignaturePolicy(0) // should be used only in tests msgBindError = "address already in use" maxRetriesIfBindError = 10 ) @@ -78,10 +78,10 @@ const ( preventReusePorts reusePortsConfig = false ) -//TODO remove the header size of the message when commit d3c5ecd3a3e884206129d9f2a9a4ddfd5e7c8951 from +// TODO remove the header size of the message when commit d3c5ecd3a3e884206129d9f2a9a4ddfd5e7c8951 from // https://github.com/libp2p/go-libp2p-pubsub/pull/189/commits will be part of a new release -var messageHeader = 64 * 1024 //64kB -var maxSendBuffSize = (1 << 20) - messageHeader +var messageHeader = 64 * 1024 // 64kB +var maxSendBuffSize = (1 << 21) - messageHeader var log = logger.GetOrCreate("p2p/libp2p") var _ p2p.Messenger = (*networkMessenger)(nil) @@ -95,7 +95,7 @@ func init() { } } -//TODO refactor this struct to have be a wrapper (with logic) over a glue code +// TODO refactor this struct to have be a wrapper (with logic) over a glue code type networkMessenger struct { ctx context.Context cancelFunc context.CancelFunc @@ -103,7 +103,7 @@ type networkMessenger struct { port int pb *pubsub.PubSub ds p2p.DirectSender - //TODO refactor this (connMonitor & connMonitorWrapper) + // TODO refactor this (connMonitor & connMonitorWrapper) connMonitor ConnectionMonitor connMonitorWrapper p2p.ConnectionMonitorWrapper peerDiscoverer p2p.PeerDiscoverer @@ -200,7 +200,7 @@ func constructNode( libp2p.DefaultMuxers, libp2p.DefaultSecurity, transportOption, - //we need the disable relay option in order to save the node's bandwidth as much as possible + // we need the disable relay option in order to save the node's bandwidth as much as possible libp2p.DisableRelay(), libp2p.NATPortMap(), } @@ -237,7 +237,7 @@ func constructNodeWithPortRetry( lastErr = err if !strings.Contains(err.Error(), msgBindError) { - //not a bind error, return directly + // not a bind error, return directly return nil, err } @@ -736,7 +736,7 @@ func (netMes *networkMessenger) PeerAddresses(pid core.PeerID) []string { h := netMes.p2pHost result := make([]string, 0) - //check if the peer is connected to return it's connected address + // check if the peer is connected to return it's connected address for _, c := range h.Network().Conns() { if string(c.RemotePeer()) == string(pid.Bytes()) { result = append(result, c.RemoteMultiaddr().String()) @@ -744,7 +744,7 @@ func (netMes *networkMessenger) PeerAddresses(pid core.PeerID) []string { } } - //check in peerstore (maybe it is known but not connected) + // check in peerstore (maybe it is known but not connected) addresses := h.Peerstore().Addrs(peer.ID(pid.Bytes())) for _, addr := range addresses { result = append(result, addr.String()) @@ -797,7 +797,7 @@ func (netMes *networkMessenger) CreateTopic(name string, createChannelForTopic b err = netMes.outgoingPLB.AddChannel(name) } - //just a dummy func to consume messages received by the newly created topic + // just a dummy func to consume messages received by the newly created topic go func() { var errSubscrNext error for { @@ -937,7 +937,7 @@ func (netMes *networkMessenger) pubsubCallback(topicProcs *topicProcessors, topi func (netMes *networkMessenger) transformAndCheckMessage(pbMsg *pubsub.Message, pid core.PeerID, topic string) (p2p.MessageP2P, error) { msg, errUnmarshal := NewMessage(pbMsg, netMes.marshalizer) if errUnmarshal != nil { - //this error is so severe that will need to blacklist both the originator and the connected peer as there is + // this error is so severe that will need to blacklist both the originator and the connected peer as there is // no way this node can communicate with them pidFrom := core.PeerID(pbMsg.From) netMes.blacklistPid(pid, common.WrongP2PMessageBlacklistDuration) @@ -948,7 +948,7 @@ func (netMes *networkMessenger) transformAndCheckMessage(pbMsg *pubsub.Message, err := netMes.validMessageByTimestamp(msg) if err != nil { - //not reprocessing nor re-broadcasting the same message over and over again + // not reprocessing nor re-broadcasting the same message over and over again log.Trace("received an invalid message", "originator pid", p2p.MessageOriginatorPid(msg), "from connected pid", p2p.PeerIdToShortString(pid), @@ -1138,7 +1138,7 @@ func (netMes *networkMessenger) directMessageHandler(message *pubsub.Message, fr return } - //we won't recheck the message id against the cacher here as there might be collisions since we are using + // we won't recheck the message id against the cacher here as there might be collisions since we are using // a separate sequence counter for direct sender messageOk := true for index, handler := range handlers { @@ -1205,7 +1205,7 @@ func (netMes *networkMessenger) SetPeerShardResolver(peerShardResolver p2p.PeerS } // SetPeerDenialEvaluator sets the peer black list handler -//TODO decide if we continue on using setters or switch to options. Refactor if necessary +// TODO decide if we continue on using setters or switch to options. Refactor if necessary func (netMes *networkMessenger) SetPeerDenialEvaluator(handler p2p.PeerDenialEvaluator) error { return netMes.connMonitorWrapper.SetPeerDenialEvaluator(handler) } diff --git a/p2p/libp2p/netMessenger_test.go b/p2p/libp2p/netMessenger_test.go index 217f9de9cdd..e58c1fd826e 100644 --- a/p2p/libp2p/netMessenger_test.go +++ b/p2p/libp2p/netMessenger_test.go @@ -152,7 +152,7 @@ func containsPeerID(list []core.PeerID, searchFor core.PeerID) bool { return false } -//------- NewMemoryLibp2pMessenger +// ------- NewMemoryLibp2pMessenger func TestNewMemoryLibp2pMessenger_NilMockNetShouldErr(t *testing.T) { args := createMockNetworkArgs() @@ -173,7 +173,7 @@ func TestNewMemoryLibp2pMessenger_OkValsWithoutDiscoveryShouldWork(t *testing.T) _ = mes.Close() } -//------- NewNetworkMessenger +// ------- NewNetworkMessenger func TestNewNetworkMessenger_NilMessengerShouldErr(t *testing.T) { arg := createMockNetworkArgs() @@ -253,7 +253,7 @@ func TestNewNetworkMessenger_WithKadDiscovererListSharderShouldWork(t *testing.T _ = mes.Close() } -//------- Messenger functionality +// ------- Messenger functionality func TestLibp2pMessenger_ConnectToPeerShouldCallUpgradedHost(t *testing.T) { netw := mocknet.New(context.Background()) @@ -371,9 +371,9 @@ func TestLibp2pMessenger_RegisterTopicValidatorOkValsShouldWork(t *testing.T) { func TestLibp2pMessenger_RegisterTopicValidatorReregistrationShouldErr(t *testing.T) { mes := createMockMessenger() _ = mes.CreateTopic("test", false) - //registration + // registration _ = mes.RegisterMessageProcessor("test", "identifier", &mock.MessageProcessorStub{}) - //re-registration + // re-registration err := mes.RegisterMessageProcessor("test", "identifier", &mock.MessageProcessorStub{}) assert.True(t, errors.Is(err, p2p.ErrMessageProcessorAlreadyDefined)) @@ -397,10 +397,10 @@ func TestLibp2pMessenger_UnregisterTopicValidatorShouldWork(t *testing.T) { _ = mes.CreateTopic("test", false) - //registration + // registration _ = mes.RegisterMessageProcessor("test", "identifier", &mock.MessageProcessorStub{}) - //unregistration + // unregistration err := mes.UnregisterMessageProcessor("test", "identifier") assert.Nil(t, err) @@ -411,12 +411,12 @@ func TestLibp2pMessenger_UnregisterTopicValidatorShouldWork(t *testing.T) { func TestLibp2pMessenger_UnregisterAllTopicValidatorShouldWork(t *testing.T) { mes := createMockMessenger() _ = mes.CreateTopic("test", false) - //registration + // registration _ = mes.CreateTopic("test1", false) _ = mes.RegisterMessageProcessor("test1", "identifier", &mock.MessageProcessorStub{}) _ = mes.CreateTopic("test2", false) _ = mes.RegisterMessageProcessor("test2", "identifier", &mock.MessageProcessorStub{}) - //unregistration + // unregistration err := mes.UnregisterAllMessageProcessors() assert.Nil(t, err) err = mes.RegisterMessageProcessor("test1", "identifier", &mock.MessageProcessorStub{}) @@ -569,7 +569,7 @@ func TestLibp2pMessenger_BroadcastOnChannelBlockingShouldLimitNumberOfGoRoutines } func TestLibp2pMessenger_BroadcastDataBetween2PeersWithLargeMsgShouldWork(t *testing.T) { - msg := make([]byte, libp2p.MaxSendBuffSize) + msg := bytes.Repeat([]byte{'A'}, libp2p.MaxSendBuffSize) _, mes1, mes2 := createMockNetworkOf2() @@ -613,7 +613,7 @@ func TestLibp2pMessenger_Peers(t *testing.T) { _ = mes1.ConnectToPeer(adr2) - //should know both peers + // should know both peers foundCurrent := false foundConnected := false @@ -647,12 +647,12 @@ func TestLibp2pMessenger_ConnectedPeers(t *testing.T) { _ = mes1.ConnectToPeer(adr2) _ = mes3.ConnectToPeer(adr2) - //connected peers: 1 ----- 2 ----- 3 + // connected peers: 1 ----- 2 ----- 3 assert.Equal(t, []core.PeerID{mes2.ID()}, mes1.ConnectedPeers()) assert.Equal(t, []core.PeerID{mes2.ID()}, mes3.ConnectedPeers()) assert.Equal(t, 2, len(mes2.ConnectedPeers())) - //no need to further test that mes2 is connected to mes1 and mes3 s this was tested in first 2 asserts + // no need to further test that mes2 is connected to mes1 and mes3 s this was tested in first 2 asserts _ = mes1.Close() _ = mes2.Close() @@ -672,7 +672,7 @@ func TestLibp2pMessenger_ConnectedAddresses(t *testing.T) { _ = mes1.ConnectToPeer(adr2) _ = mes3.ConnectToPeer(adr2) - //connected peers: 1 ----- 2 ----- 3 + // connected peers: 1 ----- 2 ----- 3 foundAddr1 := false foundAddr3 := false @@ -694,7 +694,7 @@ func TestLibp2pMessenger_ConnectedAddresses(t *testing.T) { assert.True(t, foundAddr1) assert.True(t, foundAddr3) assert.Equal(t, 2, len(mes2.ConnectedAddresses())) - //no need to further test that mes2 is connected to mes1 and mes3 s this was tested in first 2 asserts + // no need to further test that mes2 is connected to mes1 and mes3 s this was tested in first 2 asserts _ = mes1.Close() _ = mes2.Close() @@ -714,7 +714,7 @@ func TestLibp2pMessenger_PeerAddressConnectedPeerShouldWork(t *testing.T) { _ = mes1.ConnectToPeer(adr2) _ = mes3.ConnectToPeer(adr2) - //connected peers: 1 ----- 2 ----- 3 + // connected peers: 1 ----- 2 ----- 3 defer func() { _ = mes1.Close() @@ -726,7 +726,7 @@ func TestLibp2pMessenger_PeerAddressConnectedPeerShouldWork(t *testing.T) { for _, addr := range mes1.Addresses() { for _, addrRecov := range addressesRecov { if strings.Contains(addr, addrRecov) { - //address returned is valid, test is successful + // address returned is valid, test is successful return } } @@ -800,7 +800,7 @@ func TestLibp2pMessenger_PeerAddressDisconnectedPeerShouldWork(t *testing.T) { _ = netw.DisconnectPeers(peer.ID(mes1.ID().Bytes()), peer.ID(mes2.ID().Bytes())) _ = netw.DisconnectPeers(peer.ID(mes2.ID().Bytes()), peer.ID(mes1.ID().Bytes())) - //connected peers: 1 --x-- 2 ----- 3 + // connected peers: 1 --x-- 2 ----- 3 assert.False(t, mes2.IsConnected(mes1.ID())) } @@ -816,7 +816,7 @@ func TestLibp2pMessenger_PeerAddressUnknownPeerShouldReturnEmpty(t *testing.T) { assert.Equal(t, 0, len(adr1Recov)) } -//------- ConnectedPeersOnTopic +// ------- ConnectedPeersOnTopic func TestLibp2pMessenger_ConnectedPeersOnTopicInvalidTopicShouldRetEmptyList(t *testing.T) { netw, mes1, mes2 := createMockNetworkOf2() @@ -828,7 +828,7 @@ func TestLibp2pMessenger_ConnectedPeersOnTopicInvalidTopicShouldRetEmptyList(t * _ = mes1.ConnectToPeer(adr2) _ = mes3.ConnectToPeer(adr2) - //connected peers: 1 ----- 2 ----- 3 + // connected peers: 1 ----- 2 ----- 3 connPeers := mes1.ConnectedPeersOnTopic("non-existent topic") assert.Equal(t, 0, len(connPeers)) @@ -849,15 +849,15 @@ func TestLibp2pMessenger_ConnectedPeersOnTopicOneTopicShouldWork(t *testing.T) { _ = mes1.ConnectToPeer(adr2) _ = mes3.ConnectToPeer(adr2) _ = mes4.ConnectToPeer(adr2) - //connected peers: 1 ----- 2 ----- 3 + // connected peers: 1 ----- 2 ----- 3 // | // 4 - //1, 2, 3 should be on topic "topic123" + // 1, 2, 3 should be on topic "topic123" _ = mes1.CreateTopic("topic123", false) _ = mes2.CreateTopic("topic123", false) _ = mes3.CreateTopic("topic123", false) - //wait a bit for topic announcements + // wait a bit for topic announcements time.Sleep(time.Second) peersOnTopic123 := mes2.ConnectedPeersOnTopic("topic123") @@ -884,21 +884,21 @@ func TestLibp2pMessenger_ConnectedPeersOnTopicOneTopicDifferentViewsShouldWork(t _ = mes1.ConnectToPeer(adr2) _ = mes3.ConnectToPeer(adr2) _ = mes4.ConnectToPeer(adr2) - //connected peers: 1 ----- 2 ----- 3 + // connected peers: 1 ----- 2 ----- 3 // | // 4 - //1, 2, 3 should be on topic "topic123" + // 1, 2, 3 should be on topic "topic123" _ = mes1.CreateTopic("topic123", false) _ = mes2.CreateTopic("topic123", false) _ = mes3.CreateTopic("topic123", false) - //wait a bit for topic announcements + // wait a bit for topic announcements time.Sleep(time.Second) peersOnTopic123FromMes2 := mes2.ConnectedPeersOnTopic("topic123") peersOnTopic123FromMes4 := mes4.ConnectedPeersOnTopic("topic123") - //keep the same checks as the test above as to be 100% that the returned list are correct + // keep the same checks as the test above as to be 100% that the returned list are correct assert.Equal(t, 2, len(peersOnTopic123FromMes2)) assert.True(t, containsPeerID(peersOnTopic123FromMes2, mes1.ID())) assert.True(t, containsPeerID(peersOnTopic123FromMes2, mes3.ID())) @@ -924,24 +924,24 @@ func TestLibp2pMessenger_ConnectedPeersOnTopicTwoTopicsShouldWork(t *testing.T) _ = mes1.ConnectToPeer(adr2) _ = mes3.ConnectToPeer(adr2) _ = mes4.ConnectToPeer(adr2) - //connected peers: 1 ----- 2 ----- 3 + // connected peers: 1 ----- 2 ----- 3 // | // 4 - //1, 2, 3 should be on topic "topic123" - //2, 4 should be on topic "topic24" + // 1, 2, 3 should be on topic "topic123" + // 2, 4 should be on topic "topic24" _ = mes1.CreateTopic("topic123", false) _ = mes2.CreateTopic("topic123", false) _ = mes2.CreateTopic("topic24", false) _ = mes3.CreateTopic("topic123", false) _ = mes4.CreateTopic("topic24", false) - //wait a bit for topic announcements + // wait a bit for topic announcements time.Sleep(time.Second) peersOnTopic123 := mes2.ConnectedPeersOnTopic("topic123") peersOnTopic24 := mes2.ConnectedPeersOnTopic("topic24") - //keep the same checks as the test above as to be 100% that the returned list are correct + // keep the same checks as the test above as to be 100% that the returned list are correct assert.Equal(t, 2, len(peersOnTopic123)) assert.True(t, containsPeerID(peersOnTopic123, mes1.ID())) assert.True(t, containsPeerID(peersOnTopic123, mes3.ID())) @@ -955,7 +955,7 @@ func TestLibp2pMessenger_ConnectedPeersOnTopicTwoTopicsShouldWork(t *testing.T) _ = mes4.Close() } -//------- ConnectedFullHistoryPeersOnTopic +// ------- ConnectedFullHistoryPeersOnTopic func TestLibp2pMessenger_ConnectedFullHistoryPeersOnTopicShouldWork(t *testing.T) { mes1, mes2, mes3 := createMockNetworkOf3() @@ -967,7 +967,7 @@ func TestLibp2pMessenger_ConnectedFullHistoryPeersOnTopicShouldWork(t *testing.T _ = mes1.ConnectToPeer(adr2) _ = mes3.ConnectToPeer(adr2) _ = mes1.ConnectToPeer(adr3) - //connected peers: 1 ----- 2 + // connected peers: 1 ----- 2 // | | // 3 ------+ @@ -975,7 +975,7 @@ func TestLibp2pMessenger_ConnectedFullHistoryPeersOnTopicShouldWork(t *testing.T _ = mes2.CreateTopic("topic123", false) _ = mes3.CreateTopic("topic123", false) - //wait a bit for topic announcements + // wait a bit for topic announcements time.Sleep(time.Second) assert.Equal(t, 2, len(mes1.ConnectedPeersOnTopic("topic123"))) @@ -1002,7 +1002,7 @@ func TestLibp2pMessenger_ConnectedPeersShouldReturnUniquePeers(t *testing.T) { NetworkCalled: func() network.Network { return &mock.NetworkStub{ ConnsCalled: func() []network.Conn { - //generate a mock list that contain duplicates + // generate a mock list that contain duplicates return []network.Conn{ generateConnWithRemotePeer(pid1), generateConnWithRemotePeer(pid1), @@ -1027,7 +1027,7 @@ func TestLibp2pMessenger_ConnectedPeersShouldReturnUniquePeers(t *testing.T) { netw := mocknet.New(context.Background()) mes, _ := libp2p.NewMockMessenger(createMockNetworkArgs(), netw) - //we can safely close the host as the next operations will be done on a mock + // we can safely close the host as the next operations will be done on a mock _ = mes.Close() mes.SetHost(hs) @@ -1166,7 +1166,7 @@ func TestLibp2pMessenger_SendDirectWithRealNetToSelfShouldWork(t *testing.T) { _ = mes.Close() } -//------- Bootstrap +// ------- Bootstrap func TestNetworkMessenger_BootstrapPeerDiscoveryShouldCallPeerBootstrapper(t *testing.T) { wasCalled := false @@ -1191,7 +1191,7 @@ func TestNetworkMessenger_BootstrapPeerDiscoveryShouldCallPeerBootstrapper(t *te _ = mes.Close() } -//------- SetThresholdMinConnectedPeers +// ------- SetThresholdMinConnectedPeers func TestNetworkMessenger_SetThresholdMinConnectedPeersInvalidValueShouldErr(t *testing.T) { mes := createMockMessenger() @@ -1217,7 +1217,7 @@ func TestNetworkMessenger_SetThresholdMinConnectedPeersShouldWork(t *testing.T) assert.Equal(t, minConnectedPeers, mes.ThresholdMinConnectedPeers()) } -//------- IsConnectedToTheNetwork +// ------- IsConnectedToTheNetwork func TestNetworkMessenger_IsConnectedToTheNetworkRetFalse(t *testing.T) { mes := createMockMessenger() @@ -1243,7 +1243,7 @@ func TestNetworkMessenger_IsConnectedToTheNetworkWithZeroRetTrue(t *testing.T) { assert.True(t, mes.IsConnectedToTheNetwork()) } -//------- SetPeerShardResolver +// ------- SetPeerShardResolver func TestNetworkMessenger_SetPeerShardResolverNilShouldErr(t *testing.T) { mes := createMockMessenger() @@ -1336,8 +1336,8 @@ func TestNetworkMessenger_PreventReprocessingShouldWork(t *testing.T) { ValidatorData: nil, } - assert.False(t, callBackFunc(ctx, pid, msg)) //this will not call - assert.False(t, callBackFunc(ctx, pid, msg)) //this will not call + assert.False(t, callBackFunc(ctx, pid, msg)) // this will not call + assert.False(t, callBackFunc(ctx, pid, msg)) // this will not call assert.Equal(t, uint32(0), atomic.LoadUint32(&numCalled)) _ = mes.Close() @@ -1367,7 +1367,7 @@ func TestNetworkMessenger_PubsubCallbackNotMessageNotValidShouldNotCallHandler(t _ = mes.SetPeerDenialEvaluator(&mock.PeerDenialEvaluatorStub{ UpsertPeerIDCalled: func(pid core.PeerID, duration time.Duration) error { atomic.AddInt32(&numUpserts, 1) - //any error thrown here should not impact the execution + // any error thrown here should not impact the execution return fmt.Errorf("expected error") }, IsDeniedCalled: func(pid core.PeerID) bool {