Skip to content

Commit

Permalink
Merge pull request #2230 from rsksmart/reconect_previous_peers
Browse files Browse the repository at this point in the history
Load active peers from previous session saved into file.
  • Loading branch information
Vovchyk authored Apr 10, 2024
2 parents d4c1ed0 + aa6bce5 commit 12c9b32
Show file tree
Hide file tree
Showing 15 changed files with 452 additions and 36 deletions.
46 changes: 35 additions & 11 deletions rskj-core/src/main/java/co/rsk/RskContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import co.rsk.metrics.HashRateCalculatorNonMining;
import co.rsk.mine.*;
import co.rsk.net.*;
import co.rsk.net.discovery.KnownPeersHandler;
import co.rsk.net.discovery.PeerExplorer;
import co.rsk.net.discovery.UDPServer;
import co.rsk.net.discovery.table.KademliaOptions;
Expand Down Expand Up @@ -88,6 +89,7 @@
import co.rsk.util.RskCustomCache;
import co.rsk.validators.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import org.ethereum.config.Constants;
import org.ethereum.config.SystemProperties;
import org.ethereum.config.blockchain.upgrades.ActivationConfig;
Expand Down Expand Up @@ -157,7 +159,6 @@ public class RskContext implements NodeContext, NodeBootstrapper {
private static final String CACHE_FILE_NAME = "rskcache";

private final CliArgs<NodeCliOptions, NodeCliFlags> cliArgs;

private RskSystemProperties rskSystemProperties;
private Blockchain blockchain;
private MiningMainchainView miningMainchainView;
Expand Down Expand Up @@ -252,7 +253,6 @@ public class RskContext implements NodeContext, NodeBootstrapper {
private TxQuotaChecker txQuotaChecker;
private GasPriceTracker gasPriceTracker;
private BlockChainFlusher blockChainFlusher;

private final Map<String, DbKind> dbPathToDbKindMap = new HashMap<>();

private volatile boolean closed;
Expand Down Expand Up @@ -1584,17 +1584,16 @@ protected PeerExplorer getPeerExplorer() {
rskSystemProperties.getPublicIp(),
rskSystemProperties.getPeerPort()
);
List<String> initialBootNodes = rskSystemProperties.peerDiscoveryIPList();
List<Node> activePeers = rskSystemProperties.peerActive();
if (activePeers != null) {
for (Node n : activePeers) {
InetSocketAddress address = n.getAddress();
initialBootNodes.add(address.getHostName() + ":" + address.getPort());
}

KnownPeersHandler knownPeersHandler = null;
if (rskSystemProperties.usePeersFromLastSession()) {
knownPeersHandler = new KnownPeersHandler(getRskSystemProperties().getLastKnewPeersFilePath());

}

int bucketSize = rskSystemProperties.discoveryBucketSize();
peerExplorer = new PeerExplorer(
initialBootNodes,
getInitialBootNodes(knownPeersHandler),
localNode,
new NodeDistanceTable(KademliaOptions.BINS, bucketSize, localNode),
key,
Expand All @@ -1604,13 +1603,38 @@ protected PeerExplorer getPeerExplorer() {
rskSystemProperties.networkId(),
getPeerScoringManager(),
rskSystemProperties.allowMultipleConnectionsPerHostPort(),
rskSystemProperties.peerDiscoveryMaxBootRetries()
rskSystemProperties.peerDiscoveryMaxBootRetries(),
knownPeersHandler
);
}

return peerExplorer;
}

@VisibleForTesting
List<String> getInitialBootNodes(KnownPeersHandler knownPeersHandler) {
Set<String> initialBootNodes = new HashSet<>();
RskSystemProperties rskSystemProperties = getRskSystemProperties();
List<String> peerDiscoveryIPList = rskSystemProperties.peerDiscoveryIPList();
if (peerDiscoveryIPList != null) {
initialBootNodes.addAll(peerDiscoveryIPList);
}
List<Node> activePeers = rskSystemProperties.peerActive();
if (activePeers != null) {
for (Node n : activePeers) {
InetSocketAddress address = n.getAddress();
initialBootNodes.add(address.getHostName() + ":" + address.getPort());
}
}

if (rskSystemProperties.usePeersFromLastSession()) {
List<String> peerLastSession = knownPeersHandler.readPeers();
logger.debug("Loading peers from previous session: {}",peerLastSession);
initialBootNodes.addAll(peerLastSession);
}
return new ArrayList<>(initialBootNodes);
}

private BlockChainLoader getBlockChainLoader() {
if (blockChainLoader == null) {
blockChainLoader = new BlockChainLoader(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class RskSystemProperties extends SystemProperties {
private static final int CHUNK_SIZE = 192;

public static final String PROPERTY_SYNC_TOP_BEST = "sync.topBest";
public static final String USE_PEERS_FROM_LAST_SESSION = "peer.discovery.usePeersFromLastSession";

//TODO: REMOVE THIS WHEN THE LocalBLockTests starts working with REMASC
private boolean remascEnabled = true;
Expand Down Expand Up @@ -254,6 +255,10 @@ public boolean skipRemasc() {
return getBoolean("rpc.skipRemasc", false);
}

public boolean usePeersFromLastSession() {
return getBoolean(USE_PEERS_FROM_LAST_SESSION, false);
}

public long peerDiscoveryMessageTimeOut() {
return getLong("peer.discovery.msg.timeout", PD_DEFAULT_TIMEOUT_MESSAGE);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* This file is part of RskJ
* Copyright (C) 2024 RSK Labs Ltd.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package co.rsk.net.discovery;

import co.rsk.util.SimpleFileWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

public class KnownPeersHandler {
private static final Logger logger = LoggerFactory.getLogger(KnownPeersHandler.class);
private final Path peerListFileDir;
private final SimpleFileWriter fileDataSaver;

public KnownPeersHandler(Path peerListFileDir) {
this(peerListFileDir, SimpleFileWriter.getInstance());
}

public KnownPeersHandler(Path peerListFileDir, SimpleFileWriter fileDataSaver) {
this.peerListFileDir = peerListFileDir;
this.fileDataSaver = fileDataSaver;
}
public void savePeers(Map<String,String> knownPeers) {
logger.debug("Saving peers {} to file in {}", knownPeers, peerListFileDir);
Properties props = new Properties();
props.putAll(knownPeers);
try {
fileDataSaver.savePropertiesIntoFile(props, peerListFileDir);
} catch (IOException e) {
logger.error("Error saving active peers to file: {}", e.getMessage());
}
}

public List<String> readPeers(){
File file = peerListFileDir.toFile();
Properties props = new Properties();
if (file.canRead()) {
try (FileReader reader = new FileReader(file)) {
props.load(reader);
} catch (IOException e) {
logger.error("Error reading active peers from file: {}", e.getMessage());
return Collections.emptyList();
}
}
return props.values().stream().map(Object::toString).collect(Collectors.toList());
}
}
23 changes: 22 additions & 1 deletion rskj-core/src/main/java/co/rsk/net/discovery/PeerExplorer.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,20 @@ public class PeerExplorer {

private UDPChannel udpChannel;

private final KnownPeersHandler knownPeersHandler;

public PeerExplorer(List<String> initialBootNodes,
Node localNode, NodeDistanceTable distanceTable, ECKey key,
long reqTimeOut, long updatePeriod, long cleanPeriod, Integer networkId,
PeerScoringManager peerScoringManager, boolean allowMultipleConnectionsPerHostPort, long maxBootRetries) {
this(initialBootNodes, localNode, distanceTable, key, reqTimeOut, updatePeriod, cleanPeriod, networkId, peerScoringManager, allowMultipleConnectionsPerHostPort, maxBootRetries, null);
}

public PeerExplorer(List<String> initialBootNodes,
Node localNode, NodeDistanceTable distanceTable, ECKey key,
long reqTimeOut, long updatePeriod, long cleanPeriod, Integer networkId,
PeerScoringManager peerScoringManager, boolean allowMultipleConnectionsPerHostPort,
long maxBootRetries, KnownPeersHandler knownPeersHandler) {
this.localNode = localNode;
this.key = key;
this.distanceTable = distanceTable;
Expand All @@ -108,13 +118,13 @@ public PeerExplorer(List<String> initialBootNodes,
this.cleaner = new PeerExplorerCleaner(updatePeriod, cleanPeriod, this);
this.challengeManager = new NodeChallengeManager();
this.requestTimeout = reqTimeOut;

this.peerScoringManager = peerScoringManager;

this.knownHosts = new ConcurrentHashMap<>();
this.allowMultipleConnectionsPerHostPort = allowMultipleConnectionsPerHostPort;

this.maxBootRetries = maxBootRetries;
this.knownPeersHandler = knownPeersHandler;
}

void start() {
Expand Down Expand Up @@ -142,6 +152,13 @@ public synchronized void dispose() {
}
state = ExecState.FINISHED;

if (knownPeersHandler != null) {
Map<String, String> knownPeers = getKnownHosts().entrySet().stream()
.collect(Collectors.toMap(e -> e.getValue().toString(), Map.Entry::getKey));
if (knownPeers.size() > 0) {
knownPeersHandler.savePeers(knownPeers);
}
}
this.cleaner.dispose();
}

Expand Down Expand Up @@ -601,4 +618,8 @@ private boolean isBanned(Node node) {

return address != null && this.peerScoringManager.isAddressBanned(address) || this.peerScoringManager.isNodeIDBanned(node.getId());
}

Map<String, NodeID> getKnownHosts() {
return knownHosts;
}
}
62 changes: 62 additions & 0 deletions rskj-core/src/main/java/co/rsk/util/SimpleFileWriter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* This file is part of RskJ
* Copyright (C) 2023 RSK Labs Ltd.
* (derived from ethereumJ library, Copyright (c) 2016 <ether.camp>)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package co.rsk.util;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;

public class SimpleFileWriter {
private static final String TMP = ".tmp";
private static SimpleFileWriter instance;

private SimpleFileWriter() {
}

public static SimpleFileWriter getInstance() {
if (instance == null) {
instance = new SimpleFileWriter();
}
return instance;
}

public void savePropertiesIntoFile(Properties properties, Path filePath) throws IOException {
File tempFile = File.createTempFile(filePath.toString(), TMP);
try (FileWriter writer = new FileWriter(tempFile, false)) {
properties.store(writer, null);
}
filePath.toFile().getParentFile().mkdirs();
Files.move(tempFile.toPath(), filePath, REPLACE_EXISTING);
}
public void saveDataIntoFile(String data, Path filePath) throws IOException {

File tempFile = File.createTempFile(filePath.toString(), TMP);
try (FileWriter writer = new FileWriter(tempFile, false)) {
writer.write(data);
}
filePath.toFile().getParentFile().mkdirs();
Files.move(tempFile.toPath(), filePath, REPLACE_EXISTING);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import java.net.URL;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -106,6 +108,7 @@ public abstract class SystemProperties {
public static final String PROPERTY_PERSIST_BLOOMS_CACHE_SNAPSHOT = "cache.blooms.persist-snapshot";

/* Testing */
public static final String LAST_KNEW_PEERS_FILE = "lastPeers.properties";
private static final Boolean DEFAULT_VMTEST_LOAD_LOCAL = false;

protected final Config configFromFiles;
Expand Down Expand Up @@ -312,6 +315,10 @@ public String databaseDir() {
return databaseDir == null ? configFromFiles.getString(PROPERTY_BASE_PATH) : databaseDir;
}

public Path getLastKnewPeersFilePath() {
return Paths.get(databaseDir(), LAST_KNEW_PEERS_FILE);
}

public void setDataBaseDir(String dataBaseDir) {
this.databaseDir = dataBaseDir;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
package org.ethereum.net.server;

import co.rsk.config.RskSystemProperties;
import co.rsk.net.Peer;
import co.rsk.net.NodeID;
import co.rsk.net.Peer;
import co.rsk.net.Status;
import co.rsk.net.messages.*;
import co.rsk.scoring.InetAddressUtils;
Expand Down
1 change: 1 addition & 0 deletions rskj-core/src/main/resources/expected.conf
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ peer = {
allowMultipleConnectionsPerHostPort = <bool>
maxBootRetries = <long>
bucketSize = <number>
usePeersFromLastSession = <boolean>
}
port = <port>
networkId = <networkId>
Expand Down
8 changes: 6 additions & 2 deletions rskj-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,12 @@ peer {
discovery = {
# allow multiple connections per host by default
allowMultipleConnectionsPerHostPort = true
# allows to specify a number of attempts to discover at least one peer. By default, it's -1, which means an infinite number of attempts
maxBootRetries = -1

# If true, the node will try to connect to the peers from the last session
usePeersFromLastSession = false

# allows to specify a number of attempts to discover at least one peer. By default, it's -1, which means an infinite number of attempts
maxBootRetries = -1
}

# flag that allows to propagate a received block without executing it and only checking basic validation rules.
Expand Down
2 changes: 2 additions & 0 deletions rskj-core/src/test/java/co/rsk/NodeRunnerImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,6 @@ void nodeIsAlreadyStopped_WhenStopNode_ThenShouldNotThrowError() throws Exceptio
fail();
}
}


}
Loading

0 comments on commit 12c9b32

Please sign in to comment.