Skip to content

Commit

Permalink
- new pubsub lib & adaptation
Browse files Browse the repository at this point in the history
  • Loading branch information
iulianpascalau committed Mar 4, 2022
1 parent 53fd226 commit 6c40856
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 66 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,5 @@ replace github.com/ElrondNetwork/arwen-wasm-vm/v1_2 v1.2.30 => github.com/Elrond
replace github.com/ElrondNetwork/arwen-wasm-vm/v1_3 v1.3.30 => github.com/ElrondNetwork/arwen-wasm-vm v1.3.30

replace github.com/ElrondNetwork/arwen-wasm-vm/v1_4 v1.4.22 => github.com/ElrondNetwork/arwen-wasm-vm v1.4.22

replace github.com/libp2p/go-libp2p-pubsub v0.5.5 => github.com/ElrondNetwork/go-libp2p-pubsub v0.5.5-gamma
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ github.com/ElrondNetwork/elrond-vm-common v1.2.1/go.mod h1:07N31evc3GKh+tcmOXpc3
github.com/ElrondNetwork/elrond-vm-common v1.2.3/go.mod h1:07N31evc3GKh+tcmOXpc3xz/YsgV4yUHMo3LSlF0DIs=
github.com/ElrondNetwork/elrond-vm-common v1.2.4-rc1 h1:fXuoEFwwgyT9f+P13CuYQOXM1yW48ilmov4M4B8fTXY=
github.com/ElrondNetwork/elrond-vm-common v1.2.4-rc1/go.mod h1:07N31evc3GKh+tcmOXpc3xz/YsgV4yUHMo3LSlF0DIs=
github.com/ElrondNetwork/go-libp2p-pubsub v0.5.5-gamma h1:k3Ko5UI2HNZlrU9laVeWx13+jnm79Maame4wIhf6J7Y=
github.com/ElrondNetwork/go-libp2p-pubsub v0.5.5-gamma/go.mod h1:gVOzwebXVdSMDQBTfH8ACO5EJ4SQrvsHqCmYsCZpD0E=
github.com/ElrondNetwork/notifier-go v1.0.2 h1:s/cJMRr4o5CD07M8ElNFRnJf/deUt2vtSBU6Z8Ow6uM=
github.com/ElrondNetwork/notifier-go v1.0.2/go.mod h1:2O5LGUMdvYSoff5oxq79kJ4uOSP3PSoF4k/7nPND2ic=
github.com/ElrondNetwork/protobuf v1.3.2 h1:qoCSYiO+8GtXBEZWEjw0WPcZfM3g7QuuJrwpN+y6Mvg=
Expand Down Expand Up @@ -565,8 +567,6 @@ github.com/libp2p/go-libp2p-peerstore v0.2.8 h1:nJghUlUkFVvyk7ccsM67oFA6kqUkwyCM
github.com/libp2p/go-libp2p-peerstore v0.2.8/go.mod h1:gGiPlXdz7mIHd2vfAsHzBNAMqSDkt2UBFwgcITgw1lA=
github.com/libp2p/go-libp2p-pnet v0.2.0 h1:J6htxttBipJujEjz1y0a5+eYoiPcFHhSYHH6na5f0/k=
github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA=
github.com/libp2p/go-libp2p-pubsub v0.5.5 h1:7jTdWQsB9eHO8prTK95/0vFxekO1wQJ7OMWr8w4wt54=
github.com/libp2p/go-libp2p-pubsub v0.5.5/go.mod h1:gVOzwebXVdSMDQBTfH8ACO5EJ4SQrvsHqCmYsCZpD0E=
github.com/libp2p/go-libp2p-quic-transport v0.10.0/go.mod h1:RfJbZ8IqXIhxBRm5hqUEJqjiiY8xmEuq3HUDS993MkA=
github.com/libp2p/go-libp2p-quic-transport v0.11.2 h1:p1YQDZRHH4Cv2LPtHubqlQ9ggz4CKng/REZuXZbZMhM=
github.com/libp2p/go-libp2p-quic-transport v0.11.2/go.mod h1:wlanzKtIh6pHrq+0U3p3DY9PJfGqxMgPaGKaK5LifwQ=
Expand Down
8 changes: 4 additions & 4 deletions p2p/libp2p/directSender.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,22 +68,22 @@ func NewDirectSender(
mutexForPeer: mutexForPeer,
}

//wire-up a handler for direct messages
// wire-up a handler for direct messages
h.SetStreamHandler(DirectSendID, ds.directStreamHandler)

return ds, nil
}

func (ds *directSender) directStreamHandler(s network.Stream) {
reader := ggio.NewDelimitedReader(s, 1<<20)
reader := ggio.NewDelimitedReader(s, maxSendBuffSize)

go func(r ggio.ReadCloser) {
for {
msg := &pubsubPb.Message{}

err := reader.ReadMsg(msg)
if err != nil {
//stream has encountered an error, close this go routine
// stream has encountered an error, close this go routine

if err != io.EOF {
_ = s.Reset()
Expand Down Expand Up @@ -198,7 +198,7 @@ func (ds *directSender) getConnection(p core.PeerID) (network.Conn, error) {
return nil, p2p.ErrPeerNotDirectlyConnected
}

//return the connection that has the highest number of streams
// return the connection that has the highest number of streams
lStreams := 0
var conn network.Conn
for _, c := range conns {
Expand Down
32 changes: 16 additions & 16 deletions p2p/libp2p/netMessenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ const (
refreshPeersOnTopic = time.Second * 3
ttlPeersOnTopic = time.Second * 10
pubsubTimeCacheDuration = 10 * time.Minute
acceptMessagesInAdvanceDuration = 20 * time.Second //we are accepting the messages with timestamp in the future only for this delta
acceptMessagesInAdvanceDuration = 20 * time.Second // we are accepting the messages with timestamp in the future only for this delta
broadcastGoRoutines = 1000
timeBetweenPeerPrints = time.Second * 20
timeBetweenExternalLoggersCheck = time.Second * 20
minRangePortValue = 1025
noSignPolicy = pubsub.MessageSignaturePolicy(0) //should be used only in tests
noSignPolicy = pubsub.MessageSignaturePolicy(0) // should be used only in tests
msgBindError = "address already in use"
maxRetriesIfBindError = 10
)
Expand All @@ -78,10 +78,10 @@ const (
preventReusePorts reusePortsConfig = false
)

//TODO remove the header size of the message when commit d3c5ecd3a3e884206129d9f2a9a4ddfd5e7c8951 from
// TODO remove the header size of the message when commit d3c5ecd3a3e884206129d9f2a9a4ddfd5e7c8951 from
// https://github.com/libp2p/go-libp2p-pubsub/pull/189/commits will be part of a new release
var messageHeader = 64 * 1024 //64kB
var maxSendBuffSize = (1 << 20) - messageHeader
var messageHeader = 64 * 1024 // 64kB
var maxSendBuffSize = (1 << 21) - messageHeader
var log = logger.GetOrCreate("p2p/libp2p")

var _ p2p.Messenger = (*networkMessenger)(nil)
Expand All @@ -95,15 +95,15 @@ func init() {
}
}

//TODO refactor this struct to have be a wrapper (with logic) over a glue code
// TODO refactor this struct to have be a wrapper (with logic) over a glue code
type networkMessenger struct {
ctx context.Context
cancelFunc context.CancelFunc
p2pHost ConnectableHost
port int
pb *pubsub.PubSub
ds p2p.DirectSender
//TODO refactor this (connMonitor & connMonitorWrapper)
// TODO refactor this (connMonitor & connMonitorWrapper)
connMonitor ConnectionMonitor
connMonitorWrapper p2p.ConnectionMonitorWrapper
peerDiscoverer p2p.PeerDiscoverer
Expand Down Expand Up @@ -200,7 +200,7 @@ func constructNode(
libp2p.DefaultMuxers,
libp2p.DefaultSecurity,
transportOption,
//we need the disable relay option in order to save the node's bandwidth as much as possible
// we need the disable relay option in order to save the node's bandwidth as much as possible
libp2p.DisableRelay(),
libp2p.NATPortMap(),
}
Expand Down Expand Up @@ -237,7 +237,7 @@ func constructNodeWithPortRetry(

lastErr = err
if !strings.Contains(err.Error(), msgBindError) {
//not a bind error, return directly
// not a bind error, return directly
return nil, err
}

Expand Down Expand Up @@ -736,15 +736,15 @@ func (netMes *networkMessenger) PeerAddresses(pid core.PeerID) []string {
h := netMes.p2pHost
result := make([]string, 0)

//check if the peer is connected to return it's connected address
// check if the peer is connected to return it's connected address
for _, c := range h.Network().Conns() {
if string(c.RemotePeer()) == string(pid.Bytes()) {
result = append(result, c.RemoteMultiaddr().String())
break
}
}

//check in peerstore (maybe it is known but not connected)
// check in peerstore (maybe it is known but not connected)
addresses := h.Peerstore().Addrs(peer.ID(pid.Bytes()))
for _, addr := range addresses {
result = append(result, addr.String())
Expand Down Expand Up @@ -797,7 +797,7 @@ func (netMes *networkMessenger) CreateTopic(name string, createChannelForTopic b
err = netMes.outgoingPLB.AddChannel(name)
}

//just a dummy func to consume messages received by the newly created topic
// just a dummy func to consume messages received by the newly created topic
go func() {
var errSubscrNext error
for {
Expand Down Expand Up @@ -937,7 +937,7 @@ func (netMes *networkMessenger) pubsubCallback(topicProcs *topicProcessors, topi
func (netMes *networkMessenger) transformAndCheckMessage(pbMsg *pubsub.Message, pid core.PeerID, topic string) (p2p.MessageP2P, error) {
msg, errUnmarshal := NewMessage(pbMsg, netMes.marshalizer)
if errUnmarshal != nil {
//this error is so severe that will need to blacklist both the originator and the connected peer as there is
// this error is so severe that will need to blacklist both the originator and the connected peer as there is
// no way this node can communicate with them
pidFrom := core.PeerID(pbMsg.From)
netMes.blacklistPid(pid, common.WrongP2PMessageBlacklistDuration)
Expand All @@ -948,7 +948,7 @@ func (netMes *networkMessenger) transformAndCheckMessage(pbMsg *pubsub.Message,

err := netMes.validMessageByTimestamp(msg)
if err != nil {
//not reprocessing nor re-broadcasting the same message over and over again
// not reprocessing nor re-broadcasting the same message over and over again
log.Trace("received an invalid message",
"originator pid", p2p.MessageOriginatorPid(msg),
"from connected pid", p2p.PeerIdToShortString(pid),
Expand Down Expand Up @@ -1138,7 +1138,7 @@ func (netMes *networkMessenger) directMessageHandler(message *pubsub.Message, fr
return
}

//we won't recheck the message id against the cacher here as there might be collisions since we are using
// we won't recheck the message id against the cacher here as there might be collisions since we are using
// a separate sequence counter for direct sender
messageOk := true
for index, handler := range handlers {
Expand Down Expand Up @@ -1205,7 +1205,7 @@ func (netMes *networkMessenger) SetPeerShardResolver(peerShardResolver p2p.PeerS
}

// SetPeerDenialEvaluator sets the peer black list handler
//TODO decide if we continue on using setters or switch to options. Refactor if necessary
// TODO decide if we continue on using setters or switch to options. Refactor if necessary
func (netMes *networkMessenger) SetPeerDenialEvaluator(handler p2p.PeerDenialEvaluator) error {
return netMes.connMonitorWrapper.SetPeerDenialEvaluator(handler)
}
Expand Down
Loading

0 comments on commit 6c40856

Please sign in to comment.