Skip to content

Commit

Permalink
Reconnect to initial list
Browse files Browse the repository at this point in the history
  • Loading branch information
trasc committed Oct 31, 2019
1 parent d3f1b53 commit 25f3852
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 6 deletions.
23 changes: 21 additions & 2 deletions p2p/libp2p/discovery/kadDhtDiscoverer.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package discovery

import (
"fmt"
"sync"
"time"

"github.com/ElrondNetwork/elrond-go/core/logger"
"github.com/ElrondNetwork/elrond-go/p2p"
"github.com/ElrondNetwork/elrond-go/p2p/libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p-kbucket"
)

var peerDiscoveryTimeout = 10 * time.Second
Expand Down Expand Up @@ -101,11 +103,28 @@ func (kdd *KadDhtDiscoverer) connectToInitialAndBootstrap() {

kdd.mutKadDht.Lock()
go func() {
i := 0
for {
if kdd.initc {
err := kdd.kadDHT.BootstrapOnce(ctx, cfg)
if err != nil {
log.Warn("error bootstrapping: %s", err)
if err == kbucket.ErrLookupFailure {
log.Warn(fmt.Sprintf("KDD: Reconnet to init: %s", err))
chanRecInit := kdd.connectToOnePeerFromInitialPeersList(
kdd.refreshInterval,
kdd.initialPeersList)
<-chanRecInit

} else if err != nil {
log.Warn(fmt.Sprintf("KDD: error bootstrapping: %s", err))
}
} else {
i++
if (i % 10) == 0 {
log.Warn("KDD: Reconnet to init in pause")
chanRecInit := kdd.connectToOnePeerFromInitialPeersList(
kdd.refreshInterval,
kdd.initialPeersList)
<-chanRecInit
}
}
select {
Expand Down
15 changes: 11 additions & 4 deletions p2p/libp2p/libp2pConnectionMonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,14 @@ func (lcm *libp2pConnectionMonitor) ThresholdRandomTrim() int {
return math.MaxInt32
}

// Request a reconnet to initial list
func (lcm *libp2pConnectionMonitor) doReconn() {
select {
case lcm.chDoReconnect <- struct{}{}:
default:
}
}

// Connected is called when a connection opened
func (lcm *libp2pConnectionMonitor) Connected(netw network.Network, conn network.Conn) {
if len(netw.Conns()) > lcm.ThresholdDiscoveryPause() {
Expand All @@ -78,21 +86,20 @@ func (lcm *libp2pConnectionMonitor) Connected(netw network.Network, conn network
for i := lcm.ThresholdDiscoveryPause(); i < len(sorted); i++ {
netw.ClosePeer(sorted[i])
}
lcm.doReconn()
}
}

// Disconnected is called when a connection closed
func (lcm *libp2pConnectionMonitor) Disconnected(netw network.Network, conn network.Conn) {
currentConnCount := len(netw.Conns())
if currentConnCount < ThresholdMinimumConnectedPeers {
select {
case lcm.chDoReconnect <- struct{}{}:
default:
}
lcm.doReconn()
}

if currentConnCount < lcm.ThresholdDiscoveryResume() && lcm.reconnecter != nil {
lcm.reconnecter.Resume()
lcm.doReconn()
}
}

Expand Down

0 comments on commit 25f3852

Please sign in to comment.