diff --git a/rskj-core/src/integrationTest/java/co/rsk/snapshotsync/SnapshotSyncIntegrationTest.java b/rskj-core/src/integrationTest/java/co/rsk/snapshotsync/SnapshotSyncIntegrationTest.java index db5bb3ab38e..e6a2c90ca46 100644 --- a/rskj-core/src/integrationTest/java/co/rsk/snapshotsync/SnapshotSyncIntegrationTest.java +++ b/rskj-core/src/integrationTest/java/co/rsk/snapshotsync/SnapshotSyncIntegrationTest.java @@ -104,7 +104,7 @@ public void whenStartTheServerAndClientNodes_thenTheClientWillSynchWithServer() boolean isClientSynced = false; while (System.currentTimeMillis() < endTime) { - if (clientNode.getOutput().contains("CLIENT - Starting Snapshot sync.") && clientNode.getOutput().contains("CLIENT - Snapshot sync finished!")) { + if (clientNode.getOutput().contains("CLIENT - Starting Snapshot sync.") && clientNode.getOutput().contains("CLIENT - Snapshot sync finished successfully!")) { try { JsonNode jsonResponse = OkHttpClientTestFixture.getJsonResponseForGetBestBlockMessage(portClientRpc, serverBestBlockNumber); JsonNode jsonResult = jsonResponse.get(0).get("result"); diff --git a/rskj-core/src/main/java/co/rsk/net/SnapshotProcessor.java b/rskj-core/src/main/java/co/rsk/net/SnapshotProcessor.java index d2f8ec68811..862ddc6fed0 100644 --- a/rskj-core/src/main/java/co/rsk/net/SnapshotProcessor.java +++ b/rskj-core/src/main/java/co/rsk/net/SnapshotProcessor.java @@ -29,7 +29,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.commons.lang3.tuple.Pair; import org.ethereum.core.Block; import org.ethereum.core.Blockchain; import org.ethereum.core.TransactionPool; @@ -63,6 +62,7 @@ public class SnapshotProcessor implements InternalService { public static final int BLOCK_NUMBER_CHECKPOINT = 5000; public static final int BLOCK_CHUNK_SIZE = 400; public static final int BLOCKS_REQUIRED = 6000; + public static final long CHUNK_ITEM_SIZE = 1024L; private final Blockchain blockchain; private final TrieStore trieStore; private final BlockStore blockStore; @@ -78,7 +78,6 @@ public class SnapshotProcessor implements InternalService { private volatile Boolean isRunning; private final Thread thread; - public SnapshotProcessor(Blockchain blockchain, TrieStore trieStore, SnapshotPeersInformation peersInformation, @@ -91,13 +90,13 @@ public SnapshotProcessor(Blockchain blockchain, @VisibleForTesting SnapshotProcessor(Blockchain blockchain, - TrieStore trieStore, - SnapshotPeersInformation peersInformation, - BlockStore blockStore, - TransactionPool transactionPool, - int chunkSize, - boolean isParallelEnabled, - @Nullable SyncMessageHandler.Listener listener) { + TrieStore trieStore, + SnapshotPeersInformation peersInformation, + BlockStore blockStore, + TransactionPool transactionPool, + int chunkSize, + boolean isParallelEnabled, + @Nullable SyncMessageHandler.Listener listener) { this.blockchain = blockchain; this.trieStore = trieStore; this.peersInformation = peersInformation; @@ -158,6 +157,7 @@ void processSnapStatusRequestInternal(Peer sender, SnapStatusRequestMessage igno logger.debug("SERVER - Processing snapshot status request."); long bestBlockNumber = blockchain.getBestBlock().getNumber(); long checkpointBlockNumber = bestBlockNumber - (bestBlockNumber % BLOCK_NUMBER_CHECKPOINT); + logger.debug("SERVER - checkpointBlockNumber: {}, bestBlockNumber: {}", checkpointBlockNumber, bestBlockNumber); List blocks = Lists.newArrayList(); List difficulties = Lists.newArrayList(); for (long i = checkpointBlockNumber - BLOCK_CHUNK_SIZE; i < checkpointBlockNumber; i++) { @@ -166,8 +166,10 @@ void processSnapStatusRequestInternal(Peer sender, SnapStatusRequestMessage igno difficulties.add(blockStore.getTotalDifficultyForHash(block.getHash().getBytes())); } + logger.trace("SERVER - Sending snapshot status response. From block {} to block {} - chunksize {}", blocks.get(0).getNumber(), blocks.get(blocks.size() - 1).getNumber(), BLOCK_CHUNK_SIZE); Block checkpointBlock = blockchain.getBlockByNumber(checkpointBlockNumber); blocks.add(checkpointBlock); + logger.trace("SERVER - adding checkpoint block: {}", checkpointBlock.getNumber()); difficulties.add(blockStore.getTotalDifficultyForHash(checkpointBlock.getHash().getBytes())); byte[] rootHash = checkpointBlock.getStateRoot(); Optional opt = trieStore.retrieveDTO(rootHash); @@ -192,12 +194,12 @@ public void processSnapStatusResponse(SnapSyncState state, Peer sender, SnapStat state.setLastBlockDifficulty(lastBlock.getCumulativeDifficulty()); state.setRemoteRootHash(lastBlock.getStateRoot()); state.setRemoteTrieSize(responseMessage.getTrieSize()); - List> blocks = state.getBlocks(); for (int i = 0; i < blocksFromResponse.size(); i++) { - blocks.add(new ImmutablePair<>(blocksFromResponse.get(i), difficultiesFromResponse.get(i))); + state.addBlock(new ImmutablePair<>(blocksFromResponse.get(i), difficultiesFromResponse.get(i))); } - logger.debug("CLIENT - Processing snapshot status response - last blockNumber: {} rootHash: {} triesize: {}", lastBlock.getNumber(), state.getRemoteRootHash(), state.getRemoteTrieSize()); + logger.debug("CLIENT - Processing snapshot status response - last blockNumber: {} triesize: {}", lastBlock.getNumber(), state.getRemoteTrieSize()); + logger.debug("Blocks included in the response: {} from {} to {}", blocksFromResponse.size(), blocksFromResponse.get(0).getNumber(), blocksFromResponse.get(blocksFromResponse.size() - 1).getNumber()); requestBlocksChunk(sender, blocksFromResponse.get(0).getNumber()); generateChunkRequestTasks(state); startRequestingChunks(state); @@ -234,27 +236,32 @@ void processSnapBlocksRequestInternal(Peer sender, SnapBlocksRequestMessage requ logger.debug("SERVER - Processing snap blocks request"); List blocks = Lists.newArrayList(); List difficulties = Lists.newArrayList(); - for (long i = requestMessage.getBlockNumber() - BLOCK_CHUNK_SIZE; i < requestMessage.getBlockNumber(); i++) { + long startingBlockNumber = requestMessage.getBlockNumber() - BLOCK_CHUNK_SIZE; + for (long i = startingBlockNumber; i < requestMessage.getBlockNumber(); i++) { Block block = blockchain.getBlockByNumber(i); blocks.add(block); difficulties.add(blockStore.getTotalDifficultyForHash(block.getHash().getBytes())); } + logger.debug("SERVER - Sending snap blocks response. From block {} to block {} - chunksize {}", blocks.get(0).getNumber(), blocks.get(blocks.size() - 1).getNumber(), BLOCK_CHUNK_SIZE); SnapBlocksResponseMessage responseMessage = new SnapBlocksResponseMessage(blocks, difficulties); sender.sendMessage(responseMessage); } public void processSnapBlocksResponse(SnapSyncState state, Peer sender, SnapBlocksResponseMessage responseMessage) { - logger.debug("CLIENT - Processing snap blocks response"); + long lastRequiredBlock = state.getLastBlock().getNumber() - BLOCKS_REQUIRED; List blocksFromResponse = responseMessage.getBlocks(); + logger.debug("CLIENT - Processing snap blocks response. Receiving from block {} to block {} Objective: {}.", blocksFromResponse.get(0).getNumber(), blocksFromResponse.get(blocksFromResponse.size() - 1).getNumber(), lastRequiredBlock); List difficultiesFromResponse = responseMessage.getDifficulties(); + for (int i = 0; i < blocksFromResponse.size(); i++) { - state.getBlocks().add(new ImmutablePair<>(blocksFromResponse.get(i), difficultiesFromResponse.get(i))); + state.addBlock(new ImmutablePair<>(blocksFromResponse.get(i), difficultiesFromResponse.get(i))); } long nextChunk = blocksFromResponse.get(0).getNumber(); - if (nextChunk > state.getLastBlock().getNumber() - BLOCKS_REQUIRED) { + logger.debug("CLIENT - SnapBlock - nexChunk : {} - lastRequired {}, missing {}", nextChunk, lastRequiredBlock, nextChunk - lastRequiredBlock); + if (nextChunk > lastRequiredBlock) { requestBlocksChunk(sender, nextChunk); } else { - logger.info("CLIENT - Finished Snap blocks sync."); + logger.info("CLIENT - Finished Snap blocks request sending."); } } @@ -262,10 +269,9 @@ public void processSnapBlocksResponse(SnapSyncState state, Peer sender, SnapBloc * STATE CHUNK */ private void requestStateChunk(Peer peer, long from, long blockNumber, int chunkSize) { - logger.debug("CLIENT - Requesting state chunk to node {} - block {} - from {}", peer.getPeerNodeID(), blockNumber, from); + logger.debug("CLIENT - Requesting state chunk to node {} - block {} - chunkNumber {}", peer.getPeerNodeID(), blockNumber, from / chunkSize); SnapStateChunkRequestMessage message = new SnapStateChunkRequestMessage(messageId++, blockNumber, from, chunkSize); peer.sendMessage(message); - logger.debug("CLIENT - Request sent state chunk to node {} - block {} - from {}", peer.getPeerNodeID(), blockNumber, from); } public void processStateChunkRequest(Peer sender, SnapStateChunkRequestMessage requestMessage) { @@ -289,10 +295,12 @@ public void run() { void processStateChunkRequestInternal(Peer sender, SnapStateChunkRequestMessage request) { long startChunk = System.currentTimeMillis(); - logger.debug("SERVER - Processing state chunk request from node {}", sender.getPeerNodeID()); + List trieEncoded = new ArrayList<>(); Block block = blockchain.getBlockByNumber(request.getBlockNumber()); - final long to = request.getFrom() + (request.getChunkSize() * 1024); + final long to = request.getFrom() + (request.getChunkSize() * CHUNK_ITEM_SIZE); + logger.debug("SERVER - Processing state chunk request from node {}. From {} to calculated {} being chunksize {}", sender.getPeerNodeID(), request.getFrom(), to, request.getChunkSize()); + logger.debug("SERVER - Sending state chunk from {} to {}", request.getFrom(), to); TrieDTOInOrderIterator it = new TrieDTOInOrderIterator(trieStore, block.getStateRoot(), request.getFrom(), to); // First we add the root nodes on the left of the current node. They are used to validate the chunk. @@ -312,7 +320,6 @@ void processStateChunkRequestInternal(Peer sender, SnapStateChunkRequestMessage byte[] firstNodeLeftHash = RLP.encodeElement(first.getLeftHash()); byte[] nodesBytes = RLP.encodeList(trieEncoded.toArray(new byte[0][0])); byte[] lastNodeHashes = last != null ? RLP.encodeList(RLP.encodeElement(getBytes(last.getLeftHash())), RLP.encodeElement(getBytes(last.getRightHash()))) : RLP.encodedEmptyList(); - // Last we add the root nodes on the right of the last visited node. They are used to validate the chunk. List postRootNodes = it.getNodesLeftVisiting().stream().map((t) -> RLP.encodeList(RLP.encodeElement(t.getEncoded()), RLP.encodeElement(getBytes(t.getRightHash())))).collect(Collectors.toList()); byte[] postRootNodesBytes = !postRootNodes.isEmpty() ? RLP.encodeList(postRootNodes.toArray(new byte[0][0])) : RLP.encodedEmptyList(); @@ -327,7 +334,7 @@ void processStateChunkRequestInternal(Peer sender, SnapStateChunkRequestMessage } public void processStateChunkResponse(SnapSyncState state, Peer peer, SnapStateChunkResponseMessage responseMessage) { - logger.debug("CLIENT - State chunk received from: {}", responseMessage.getFrom()); + logger.debug("CLIENT - State chunk received chunkNumber {}. From {} to {} of total size {}", responseMessage.getFrom() / CHUNK_ITEM_SIZE, responseMessage.getFrom(), responseMessage.getTo(), state.getRemoteTrieSize()); PriorityQueue queue = state.getSnapStateChunkQueue(); queue.add(responseMessage); @@ -337,125 +344,134 @@ public void processStateChunkResponse(SnapSyncState state, Peer peer, SnapStateC long nextExpectedFrom = state.getNextExpectedFrom(); logger.debug("CLIENT - State chunk dequeued from: {} - expected: {}", nextMessage.getFrom(), nextExpectedFrom); if (nextMessage.getFrom() == nextExpectedFrom) { - processOrderedStateChunkResponse(state, peer, queue.poll()); - state.setNextExpectedFrom(nextExpectedFrom + chunkSize * 1024L); + try { + processOrderedStateChunkResponse(state, peer, queue.poll()); + state.setNextExpectedFrom(nextExpectedFrom + chunkSize * CHUNK_ITEM_SIZE); + } catch (Exception e) { + logger.error("Error while processing chunk response. {}", e.getMessage(), e); + onStateChunkResponseError(peer, nextMessage); + } } else { break; } } if (!responseMessage.isComplete()) { + logger.debug("CLIENT - State chunk response not complete. Requesting next chunk."); executeNextChunkRequestTask(state, peer); } } - private void processOrderedStateChunkResponse(SnapSyncState state, Peer peer, SnapStateChunkResponseMessage message) { - try { - logger.debug("CLIENT - Processing State chunk received from: {}", message.getFrom()); - peersInformation.getOrRegisterPeer(peer); - state.onNewChunk(); - - RLPList nodeLists = RLP.decodeList(message.getChunkOfTrieKeyValue()); - final RLPList preRootElements = RLP.decodeList(nodeLists.get(0).getRLPData()); - final RLPList trieElements = RLP.decodeList(nodeLists.get(1).getRLPData()); - byte[] firstNodeLeftHash = nodeLists.get(2).getRLPData(); - final RLPList lastNodeHashes = RLP.decodeList(nodeLists.get(3).getRLPData()); - final RLPList postRootElements = RLP.decodeList(nodeLists.get(4).getRLPData()); - logger.debug( - "CLIENT - Received state chunk of {} elements ({} bytes).", - trieElements.size(), - message.getChunkOfTrieKeyValue().length - ); - List preRootNodes = new ArrayList<>(); - List nodes = new ArrayList<>(); - List postRootNodes = new ArrayList<>(); - - - for (int i = 0; i < preRootElements.size(); i++) { - final RLPList trieElement = (RLPList) preRootElements.get(i); - final byte[] value = trieElement.get(0).getRLPData(); - final byte[] leftHash = trieElement.get(1).getRLPData(); - TrieDTO node = TrieDTO.decodeFromSync(value); - node.setLeftHash(leftHash); - preRootNodes.add(node); - } + @VisibleForTesting + void onStateChunkResponseError(Peer peer, SnapStateChunkResponseMessage responseMessage) { + logger.error("Error while processing chunk response from {} of peer {}. Asking for chunk again.", responseMessage.getFrom(), peer.getPeerNodeID()); + Peer alternativePeer = peersInformation.getBestSnapPeerCandidates().stream() + .filter(listedPeer -> !listedPeer.getPeerNodeID().equals(peer.getPeerNodeID())) + .findFirst() + .orElse(peer); + logger.debug("Requesting state chunk \"from\" {} to peer {}", responseMessage.getFrom(), peer.getPeerNodeID()); + requestStateChunk(alternativePeer, responseMessage.getFrom(), responseMessage.getBlockNumber(), chunkSize); + } - if (trieElements.size() > 0) { - for (int i = 0; i < trieElements.size(); i++) { - final RLPElement trieElement = trieElements.get(i); - byte[] value = trieElement.getRLPData(); - nodes.add(TrieDTO.decodeFromSync(value)); - } - nodes.get(0).setLeftHash(firstNodeLeftHash); - } - if (lastNodeHashes.size() > 0) { - TrieDTO lastNode = nodes.get(nodes.size() - 1); - lastNode.setLeftHash(lastNodeHashes.get(0).getRLPData()); - lastNode.setRightHash(lastNodeHashes.get(1).getRLPData()); - } + private void processOrderedStateChunkResponse(SnapSyncState state, Peer peer, SnapStateChunkResponseMessage message) throws Exception { + logger.debug("CLIENT - Processing State chunk received from {} to {}", message.getFrom(), message.getTo()); + peersInformation.getOrRegisterPeer(peer); + state.onNewChunk(); + + RLPList nodeLists = RLP.decodeList(message.getChunkOfTrieKeyValue()); + final RLPList preRootElements = RLP.decodeList(nodeLists.get(0).getRLPData()); + final RLPList trieElements = RLP.decodeList(nodeLists.get(1).getRLPData()); + byte[] firstNodeLeftHash = nodeLists.get(2).getRLPData(); + final RLPList lastNodeHashes = RLP.decodeList(nodeLists.get(3).getRLPData()); + final RLPList postRootElements = RLP.decodeList(nodeLists.get(4).getRLPData()); + List preRootNodes = new ArrayList<>(); + List nodes = new ArrayList<>(); + List postRootNodes = new ArrayList<>(); + + + for (int i = 0; i < preRootElements.size(); i++) { + final RLPList trieElement = (RLPList) preRootElements.get(i); + final byte[] value = trieElement.get(0).getRLPData(); + final byte[] leftHash = trieElement.get(1).getRLPData(); + TrieDTO node = TrieDTO.decodeFromSync(value); + node.setLeftHash(leftHash); + preRootNodes.add(node); + } - for (int i = 0; i < postRootElements.size(); i++) { - final RLPList trieElement = (RLPList) postRootElements.get(i); - final byte[] value = trieElement.get(0).getRLPData(); - final byte[] rightHash = trieElement.get(1).getRLPData(); - TrieDTO node = TrieDTO.decodeFromSync(value); - node.setRightHash(rightHash); - postRootNodes.add(node); + if (trieElements.size() > 0) { + for (int i = 0; i < trieElements.size(); i++) { + final RLPElement trieElement = trieElements.get(i); + byte[] value = trieElement.getRLPData(); + nodes.add(TrieDTO.decodeFromSync(value)); } + nodes.get(0).setLeftHash(firstNodeLeftHash); + } - if (TrieDTOInOrderRecoverer.verifyChunk(state.getRemoteRootHash(), preRootNodes, nodes, postRootNodes)) { - state.getAllNodes().addAll(nodes); - state.setStateSize(state.getStateSize().add(BigInteger.valueOf(trieElements.size()))); - state.setStateChunkSize(state.getStateChunkSize().add(BigInteger.valueOf(message.getChunkOfTrieKeyValue().length))); - logger.debug("CLIENT - State progress: {} chunks ({} bytes)", state.getStateSize(), state.getStateChunkSize()); - if (!message.isComplete()) { - executeNextChunkRequestTask(state, peer); - } else { - rebuildStateAndSave(state); - logger.info("CLIENT - Snapshot sync finished!"); - stopSyncing(state); - } + if (lastNodeHashes.size() > 0) { + TrieDTO lastNode = nodes.get(nodes.size() - 1); + lastNode.setLeftHash(lastNodeHashes.get(0).getRLPData()); + lastNode.setRightHash(lastNodeHashes.get(1).getRLPData()); + } + + for (int i = 0; i < postRootElements.size(); i++) { + final RLPList trieElement = (RLPList) postRootElements.get(i); + final byte[] value = trieElement.get(0).getRLPData(); + final byte[] rightHash = trieElement.get(1).getRLPData(); + TrieDTO node = TrieDTO.decodeFromSync(value); + node.setRightHash(rightHash); + postRootNodes.add(node); + } + + if (TrieDTOInOrderRecoverer.verifyChunk(state.getRemoteRootHash(), preRootNodes, nodes, postRootNodes)) { + state.getAllNodes().addAll(nodes); + state.setStateSize(state.getStateSize().add(BigInteger.valueOf(trieElements.size()))); + state.setStateChunkSize(state.getStateChunkSize().add(BigInteger.valueOf(message.getChunkOfTrieKeyValue().length))); + if (!message.isComplete()) { + executeNextChunkRequestTask(state, peer); } else { - logger.error("Error while verifying chunk response: {}", message); - throw new Exception("Error verifying chunk."); + boolean result = rebuildStateAndSave(state); + logger.info("CLIENT - Snapshot sync finished {}! ", result ? "successfully" : "with errors"); + stopSyncing(state); } - } catch (Exception e) { - logger.error("Error while processing chunk response.", e); + } else { + logger.error("Error while verifying chunk response: {}", message); + throw new Exception("Error verifying chunk."); } } /** * Once state share is received, rebuild the trie, save it in db and save all the blocks. */ - private void rebuildStateAndSave(SnapSyncState state) { - logger.debug("CLIENT - State Completed! {} chunks ({} bytes) - chunk size = {}", - state.getStateSize(), state.getStateChunkSize(), this.chunkSize); + private boolean rebuildStateAndSave(SnapSyncState state) { + logger.info("CLIENT - Recovering trie..."); final TrieDTO[] nodeArray = state.getAllNodes().toArray(new TrieDTO[0]); - logger.debug("CLIENT - Recovering trie..."); Optional result = TrieDTOInOrderRecoverer.recoverTrie(nodeArray, this.trieStore::saveDTO); - if (!result.isPresent() || !Arrays.equals(state.getRemoteRootHash(), result.get().calculateHash())) { - logger.error("CLIENT - State final validation FAILED"); - } else { - logger.debug("CLIENT - State final validation OK!"); - } - logger.debug("CLIENT - Saving previous blocks..."); - this.blockchain.removeBlocksByNumber(0); - BlockConnectorHelper blockConnector = new BlockConnectorHelper(this.blockStore, state.getBlocks()); - blockConnector.startConnecting(); - logger.debug("CLIENT - Setting last block as best block..."); - this.blockchain.setStatus(state.getLastBlock(), state.getLastBlockDifficulty()); - this.transactionPool.setBestBlock(state.getLastBlock()); + if (result.isPresent() && Arrays.equals(state.getRemoteRootHash(), result.get().calculateHash())) { + logger.info("CLIENT - State final validation OK!"); + + this.blockchain.removeBlocksByNumber(0); + //genesis is removed so backwards sync will always start. + + BlockConnectorHelper blockConnector = new BlockConnectorHelper(this.blockStore); + state.connectBlocks(blockConnector); + logger.info("CLIENT - Setting last block as best block..."); + this.blockchain.setStatus(state.getLastBlock(), state.getLastBlockDifficulty()); + this.transactionPool.setBestBlock(state.getLastBlock()); + return true; + } + logger.error("CLIENT - State final validation FAILED"); + return false; } private void generateChunkRequestTasks(SnapSyncState state) { long from = 0; - logger.debug("Generating chunk request tasks..."); + logger.debug("Generating chunk request tasks... chunksize {}", chunkSize); while (from < state.getRemoteTrieSize()) { ChunkTask task = new ChunkTask(state.getLastBlock().getNumber(), from); state.getChunkTaskQueue().add(task); - from += chunkSize * 1024L; + from += chunkSize * CHUNK_ITEM_SIZE; } } diff --git a/rskj-core/src/main/java/co/rsk/net/sync/BlockConnectorHelper.java b/rskj-core/src/main/java/co/rsk/net/sync/BlockConnectorHelper.java index 99b30e6d9fd..58191074de5 100644 --- a/rskj-core/src/main/java/co/rsk/net/sync/BlockConnectorHelper.java +++ b/rskj-core/src/main/java/co/rsk/net/sync/BlockConnectorHelper.java @@ -30,27 +30,31 @@ public class BlockConnectorHelper { private static final Logger logger = LoggerFactory.getLogger("SnapBlockConnector"); private final BlockStore blockStore; - private final List> blockAndDifficultiesList; - public BlockConnectorHelper(BlockStore blockStore, List> blockAndDifficultiesList) { + public BlockConnectorHelper(BlockStore blockStore) { this.blockStore = blockStore; - this.blockAndDifficultiesList = blockAndDifficultiesList; - blockAndDifficultiesList.sort(new BlockAndDiffComparator()); } - public void startConnecting() { - Block child = null; - logger.info("Start connecting Blocks"); + public void startConnecting(List> blockAndDifficultiesList) { if (blockAndDifficultiesList.isEmpty()) { logger.debug("Block list is empty, nothing to connect"); return; } + + blockAndDifficultiesList.sort(new BlockAndDiffComparator()); + Block child = null; + logger.info("Start connecting blocks ranging from {} to {} - Total: {}", + blockAndDifficultiesList.get(0).getKey().getNumber(), + blockAndDifficultiesList.get(blockAndDifficultiesList.size() - 1).getKey().getNumber(), + blockAndDifficultiesList.size()); + int blockIndex = blockAndDifficultiesList.size() - 1; if (blockStore.isEmpty()) { - Pair blockAndDifficulty = blockAndDifficultiesList.get(blockIndex); + Pair blockAndDifficulty = blockAndDifficultiesList.get(blockIndex); child = blockAndDifficulty.getLeft(); logger.debug("BlockStore is empty, setting child block number the last block from the list: {}", child.getNumber()); blockStore.saveBlock(child, blockAndDifficulty.getRight(), true); + logger.debug("Block number: {} saved", child.getNumber()); blockIndex--; } else { logger.debug("BlockStore is not empty, getting best block"); @@ -58,9 +62,9 @@ public void startConnecting() { logger.debug("Best block number: {}", child.getNumber()); } while (blockIndex >= 0) { - Pair currentBlockAndDifficulty = blockAndDifficultiesList.get(blockIndex); + Pair currentBlockAndDifficulty = blockAndDifficultiesList.get(blockIndex); Block currentBlock = currentBlockAndDifficulty.getLeft(); - logger.info("Connecting block number: {}", currentBlock.getNumber()); + logger.trace("Connecting block number: {}", currentBlock.getNumber()); if (!currentBlock.isParentOf(child)) { throw new BlockConnectorException(currentBlock.getNumber(), child.getNumber()); @@ -69,13 +73,13 @@ public void startConnecting() { child = currentBlock; blockIndex--; } - logger.info("Finished connecting blocks"); + logger.info("Finished connecting blocks. Last saved block: {}",child.getNumber()); } - static class BlockAndDiffComparator implements java.util.Comparator> { + static class BlockAndDiffComparator implements java.util.Comparator> { @Override - public int compare(Pair o1, Pair o2) { - return Long.compare(o1.getLeft().getNumber(),o2.getLeft().getNumber()); + public int compare(Pair o1, Pair o2) { + return Long.compare(o1.getLeft().getNumber(), o2.getLeft().getNumber()); } } } diff --git a/rskj-core/src/main/java/co/rsk/net/sync/PeerAndModeDecidingSyncState.java b/rskj-core/src/main/java/co/rsk/net/sync/PeerAndModeDecidingSyncState.java index a238092c270..1389d0e9ad1 100644 --- a/rskj-core/src/main/java/co/rsk/net/sync/PeerAndModeDecidingSyncState.java +++ b/rskj-core/src/main/java/co/rsk/net/sync/PeerAndModeDecidingSyncState.java @@ -107,7 +107,7 @@ private boolean tryStartSnapshotSync() { } // we consider Snap as part of the Long Sync - if (!shouldSnapSync(peerBestBlockNumOpt.get())) { + if (!isValidSnapDistance(peerBestBlockNumOpt.get())) { logger.debug("Snap syncing not required (long sync not required)"); return false; } @@ -163,9 +163,9 @@ private boolean shouldLongSync(long peerBestBlockNumber) { return distanceToTip > syncConfiguration.getLongSyncLimit() || checkGenesisConnected(); } - private boolean shouldSnapSync(long peerBestBlockNumber) { + private boolean isValidSnapDistance(long peerBestBlockNumber) { long distanceToTip = peerBestBlockNumber - blockStore.getBestBlock().getNumber(); - return distanceToTip > syncConfiguration.getSnapshotSyncLimit() && syncConfiguration.isClientSnapSyncEnabled(); + return distanceToTip > syncConfiguration.getSnapshotSyncLimit(); } private Optional getPeerBestBlockNumber(Peer peer) { diff --git a/rskj-core/src/main/java/co/rsk/net/sync/SnapSyncState.java b/rskj-core/src/main/java/co/rsk/net/sync/SnapSyncState.java index 3b8d7d7f047..65494b86129 100644 --- a/rskj-core/src/main/java/co/rsk/net/sync/SnapSyncState.java +++ b/rskj-core/src/main/java/co/rsk/net/sync/SnapSyncState.java @@ -41,7 +41,7 @@ public class SnapSyncState extends BaseSyncState { - private static final Logger logger = LoggerFactory.getLogger("syncprocessor"); + private static final Logger logger = LoggerFactory.getLogger("SnapSyncState"); private final SnapshotProcessor snapshotProcessor; @@ -76,7 +76,7 @@ public SnapSyncState(SyncEventsHandler syncEventsHandler, SnapshotProcessor snap @VisibleForTesting SnapSyncState(SyncEventsHandler syncEventsHandler, SnapshotProcessor snapshotProcessor, - SyncConfiguration syncConfiguration, @Nullable SyncMessageHandler.Listener listener) { + SyncConfiguration syncConfiguration, @Nullable SyncMessageHandler.Listener listener) { super(syncEventsHandler, syncConfiguration); this.snapshotProcessor = snapshotProcessor; // TODO(snap-poc) code in SnapshotProcessor should be moved here probably this.allNodes = Lists.newArrayList(); @@ -207,8 +207,16 @@ public void setRemoteTrieSize(long remoteTrieSize) { this.remoteTrieSize = remoteTrieSize; } - public List> getBlocks() { - return blocks; + public void addBlock(Pair blockPair) { + blocks.add(blockPair); + } + + public void addAllBlocks(List> blocks) { + this.blocks.addAll(blocks); + } + + public void connectBlocks(BlockConnectorHelper blockConnectorHelper) { + blockConnectorHelper.startConnecting(blocks); } public List getAllNodes() { diff --git a/rskj-core/src/test/java/co/rsk/net/SnapshotProcessorTest.java b/rskj-core/src/test/java/co/rsk/net/SnapshotProcessorTest.java index 9e869d6a89b..0a6b126bb2d 100644 --- a/rskj-core/src/test/java/co/rsk/net/SnapshotProcessorTest.java +++ b/rskj-core/src/test/java/co/rsk/net/SnapshotProcessorTest.java @@ -29,6 +29,7 @@ import org.ethereum.core.Blockchain; import org.ethereum.core.TransactionPool; import org.ethereum.db.BlockStore; +import org.ethereum.util.RLP; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -104,7 +105,7 @@ void givenSnapStatusResponseCalled_thenSnapChunkRequestsAreMade() { TEST_CHUNK_SIZE, false); - for (long blockNumber = 0; blockNumber < blockchain.getSize(); blockNumber ++){ + for (long blockNumber = 0; blockNumber < blockchain.getSize(); blockNumber++) { Block currentBlock = blockchain.getBlockByNumber(blockNumber); blocks.add(currentBlock); difficulties.add(blockStore.getTotalDifficultyForHash(currentBlock.getHash().getBytes())); @@ -181,7 +182,7 @@ void givenSnapBlocksResponseReceived_thenSnapBlocksRequestMessageIsSent() { 200, false); - for (long blockNumber = 0; blockNumber < blockchain.getSize(); blockNumber ++){ + for (long blockNumber = 0; blockNumber < blockchain.getSize(); blockNumber++) { Block currentBlock = blockchain.getBlockByNumber(blockNumber); blocks.add(currentBlock); difficulties.add(blockStore.getTotalDifficultyForHash(currentBlock.getHash().getBytes())); @@ -216,7 +217,7 @@ void givenSnapStateChunkRequest_thenSnapStateChunkResponseMessageIsSent() { TEST_CHUNK_SIZE, false); - SnapStateChunkRequestMessage snapStateChunkRequestMessage = new SnapStateChunkRequestMessage(1L, 1L,1, TEST_CHUNK_SIZE); + SnapStateChunkRequestMessage snapStateChunkRequestMessage = new SnapStateChunkRequestMessage(1L, 1L, 1, TEST_CHUNK_SIZE); //when underTest.processStateChunkRequestInternal(peer, snapStateChunkRequestMessage); @@ -333,6 +334,35 @@ void processStateChunkRequestInternal(Peer sender, SnapStateChunkRequestMessage assertEquals(msg, jobArg.getValue().getMsg()); } + @Test + void givenErrorRLPData_thenOnStateChunkErrorIsCalled() { + underTest = new SnapshotProcessor( + blockchain, + trieStore, + peersInformation, + blockStore, + transactionPool, + TEST_CHUNK_SIZE, + false); + + PriorityQueue queue = new PriorityQueue<>( + Comparator.comparingLong(SnapStateChunkResponseMessage::getFrom)); + when(snapSyncState.getSnapStateChunkQueue()).thenReturn(queue); + when(snapSyncState.getChunkTaskQueue()).thenReturn(new LinkedList<>()); + SnapStateChunkResponseMessage responseMessage = mock(SnapStateChunkResponseMessage.class); + when(snapSyncState.getNextExpectedFrom()).thenReturn(1L); + when(responseMessage.getFrom()).thenReturn(1L); + when(responseMessage.getChunkOfTrieKeyValue()).thenReturn(RLP.encodedEmptyList()); + underTest = spy(underTest); + + underTest.processStateChunkResponse(snapSyncState, peer, responseMessage); + + verify(snapSyncState, times(1)).onNewChunk(); + verify(underTest, times(1)).onStateChunkResponseError(peer, responseMessage); + verify(peer, times(1)).sendMessage(any(SnapStateChunkRequestMessage.class)); + + } + private void initializeBlockchainWithAmountOfBlocks(int numberOfBlocks) { BlockChainBuilder blockChainBuilder = new BlockChainBuilder(); blockchain = blockChainBuilder.ofSize(numberOfBlocks); diff --git a/rskj-core/src/test/java/co/rsk/net/sync/BlockConnectorHelperTest.java b/rskj-core/src/test/java/co/rsk/net/sync/BlockConnectorHelperTest.java index f29fefb8e40..4f80a509a12 100644 --- a/rskj-core/src/test/java/co/rsk/net/sync/BlockConnectorHelperTest.java +++ b/rskj-core/src/test/java/co/rsk/net/sync/BlockConnectorHelperTest.java @@ -44,8 +44,8 @@ void setUp() { @Test void testStartConnectingWhenBlockListIsEmpty() { - blockConnectorHelper = new BlockConnectorHelper(blockStore, Collections.emptyList()); - blockConnectorHelper.startConnecting(); + blockConnectorHelper = new BlockConnectorHelper(blockStore); + blockConnectorHelper.startConnecting(Collections.emptyList()); verify(blockStore, never()).saveBlock(any(), any(), anyBoolean()); } @@ -69,9 +69,9 @@ void testStartConnectingWhenBlockStoreIsEmpty() { blockAndDifficultiesList = buildBlockDifficulties(Arrays.asList(block1, block2,block3), Arrays.asList(diff1, diff2,diff3)); - blockConnectorHelper = new BlockConnectorHelper(blockStore, blockAndDifficultiesList); + blockConnectorHelper = new BlockConnectorHelper(blockStore); - blockConnectorHelper.startConnecting(); + blockConnectorHelper.startConnecting(blockAndDifficultiesList); verify(blockStore, times(3)).saveBlock(blockCaptor.capture(), difficultyCaptor.capture(), anyBoolean()); verify(blockStore, times(0)).getBestBlock(); @@ -101,9 +101,9 @@ void testStartConnectingWhenBlockStoreIsEmptyAndNotOrderedList() { blockAndDifficultiesList = buildBlockDifficulties(Arrays.asList(block2, block1), Arrays.asList(diff2, diff1)); - blockConnectorHelper = new BlockConnectorHelper(blockStore, blockAndDifficultiesList); + blockConnectorHelper = new BlockConnectorHelper(blockStore); - blockConnectorHelper.startConnecting(); + blockConnectorHelper.startConnecting(blockAndDifficultiesList); verify(blockStore, times(2)).saveBlock(blockCaptor.capture(), difficultyCaptor.capture(), anyBoolean()); verify(blockStore, times(0)).getBestBlock(); @@ -131,9 +131,9 @@ void testStartConnectingWhenBlockStoreIsNotEmpty() { when(blockStore.getBestBlock()).thenReturn(block3); blockAndDifficultiesList = buildBlockDifficulties(Arrays.asList(block1, block2), Arrays.asList(mock(BlockDifficulty.class), mock(BlockDifficulty.class))); - blockConnectorHelper = new BlockConnectorHelper(blockStore, blockAndDifficultiesList); + blockConnectorHelper = new BlockConnectorHelper(blockStore); - blockConnectorHelper.startConnecting(); + blockConnectorHelper.startConnecting(blockAndDifficultiesList); verify(blockStore, times(1)).getBestBlock(); verify(blockStore, times(2)).saveBlock(any(), any(), anyBoolean()); } @@ -148,12 +148,12 @@ void whenBlockIsNotParentOfExistingBestBlock() { blockAndDifficultiesList = buildBlockDifficulties(Collections.singletonList(block2), Collections.singletonList(mock(BlockDifficulty.class))); - blockConnectorHelper = new BlockConnectorHelper(blockStore, blockAndDifficultiesList); + blockConnectorHelper = new BlockConnectorHelper(blockStore); when(blockStore.isEmpty()).thenReturn(false); when(blockStore.getBestBlock()).thenReturn(block3); - assertThrows(BlockConnectorException.class, () -> blockConnectorHelper.startConnecting()); + assertThrows(BlockConnectorException.class, () -> blockConnectorHelper.startConnecting(blockAndDifficultiesList)); } @Test @@ -166,9 +166,9 @@ void testStartConnectingWhenBlockIsNotParentOfChild() { when(blockStore.isEmpty()).thenReturn(true); blockAndDifficultiesList = buildBlockDifficulties(Arrays.asList(block1, block2), Arrays.asList(mock(BlockDifficulty.class), mock(BlockDifficulty.class))); - blockConnectorHelper = new BlockConnectorHelper(blockStore, blockAndDifficultiesList); + blockConnectorHelper = new BlockConnectorHelper(blockStore); - assertThrows(BlockConnectorException.class, () -> blockConnectorHelper.startConnecting()); + assertThrows(BlockConnectorException.class, () -> blockConnectorHelper.startConnecting(blockAndDifficultiesList)); } List> buildBlockDifficulties(List blocks, List difficulties) { diff --git a/rskj-core/src/test/java/co/rsk/net/sync/SnapSyncStateTest.java b/rskj-core/src/test/java/co/rsk/net/sync/SnapSyncStateTest.java index 9b51929c4e5..8ce31717683 100644 --- a/rskj-core/src/test/java/co/rsk/net/sync/SnapSyncStateTest.java +++ b/rskj-core/src/test/java/co/rsk/net/sync/SnapSyncStateTest.java @@ -18,29 +18,35 @@ */ package co.rsk.net.sync; +import co.rsk.core.BlockDifficulty; import co.rsk.net.NodeID; import co.rsk.net.Peer; import co.rsk.net.SnapshotProcessor; import co.rsk.net.messages.SnapBlocksResponseMessage; import co.rsk.net.messages.SnapStateChunkResponseMessage; import co.rsk.net.messages.SnapStatusResponseMessage; +import org.apache.commons.lang3.tuple.Pair; +import org.ethereum.core.Block; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; +import java.math.BigInteger; import java.net.InetAddress; import java.net.UnknownHostException; import java.time.Duration; +import java.util.List; import java.util.Optional; +import java.util.PriorityQueue; +import java.util.Queue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.*; class SnapSyncStateTest { @@ -56,8 +62,8 @@ class SnapSyncStateTest { private final SnapSyncState underTest = new SnapSyncState(syncEventsHandler, snapshotProcessor, syncConfiguration, listener); @BeforeEach - void setUp(){ - reset(syncEventsHandler,peersInformation, snapshotProcessor); + void setUp() { + reset(syncEventsHandler, peersInformation, snapshotProcessor); } @AfterEach @@ -66,7 +72,7 @@ void tearDown() { } @Test - void givenOnEnterWasCalledAndNotRunningYet_thenSyncingStartsWithTestObjectAsParameter(){ + void givenOnEnterWasCalledAndNotRunningYet_thenSyncingStartsWithTestObjectAsParameter() { //given-when underTest.onEnter(); //then @@ -74,7 +80,7 @@ void givenOnEnterWasCalledAndNotRunningYet_thenSyncingStartsWithTestObjectAsPara } @Test - void givenFinishWasCalledTwice_thenStopSyncingOnlyOnce(){ + void givenFinishWasCalledTwice_thenStopSyncingOnlyOnce() { //given-when underTest.setRunning(); underTest.finish(); @@ -84,7 +90,7 @@ void givenFinishWasCalledTwice_thenStopSyncingOnlyOnce(){ } @Test - void givenOnEnterWasCalledTwice_thenSyncingStartsOnlyOnce(){ + void givenOnEnterWasCalledTwice_thenSyncingStartsOnlyOnce() { //given-when underTest.onEnter(); underTest.onEnter(); @@ -93,7 +99,7 @@ void givenOnEnterWasCalledTwice_thenSyncingStartsOnlyOnce(){ } @Test - void givenOnMessageTimeOutCalled_thenSyncingStops(){ + void givenOnMessageTimeOutCalled_thenSyncingStops() { //given-when underTest.setRunning(); underTest.onMessageTimeOut(); @@ -102,7 +108,7 @@ void givenOnMessageTimeOutCalled_thenSyncingStops(){ } @Test - void givenNewChunk_thenTimerIsReset(){ + void givenNewChunk_thenTimerIsReset() { //given underTest.timeElapsed = Duration.ofMinutes(1); assertThat(underTest.timeElapsed, greaterThan(Duration.ZERO)); @@ -114,7 +120,7 @@ void givenNewChunk_thenTimerIsReset(){ } @Test - void givenTickIsCalledBeforeTimeout_thenTimerIsUpdated_andNoTimeoutHappens(){ + void givenTickIsCalledBeforeTimeout_thenTimerIsUpdated_andNoTimeoutHappens() { //given Duration elapsedTime = Duration.ofMillis(10); underTest.timeElapsed = Duration.ZERO; @@ -123,7 +129,7 @@ void givenTickIsCalledBeforeTimeout_thenTimerIsUpdated_andNoTimeoutHappens(){ //then assertThat(underTest.timeElapsed, equalTo(elapsedTime)); verify(syncEventsHandler, never()).stopSyncing(); - verify(syncEventsHandler, never()).onErrorSyncing(any(),any(),any(),any()); + verify(syncEventsHandler, never()).onErrorSyncing(any(), any(), any(), any()); } @Test @@ -145,7 +151,7 @@ void givenTickIsCalledAfterTimeout_thenTimerIsUpdated_andTimeoutHappens() throws } @Test - void givenFinishIsCalled_thenSyncEventHandlerStopsSync(){ + void givenFinishIsCalled_thenSyncEventHandlerStopsSync() { //given-when underTest.setRunning(); underTest.finish(); @@ -219,10 +225,86 @@ void givenOnSnapStateChunkIsCalled_thenJobIsAddedAndRun() throws InterruptedExce assertEquals(msg, jobArg.getValue().getMsg()); } + @Test + void testSetAndGetLastBlock() { + Block mockBlock = mock(Block.class); + underTest.setLastBlock(mockBlock); + assertEquals(mockBlock, underTest.getLastBlock()); + } + + @Test + void testSetAndGetStateChunkSize() { + BigInteger expectedSize = BigInteger.valueOf(100L); + underTest.setStateChunkSize(expectedSize); + assertEquals(expectedSize, underTest.getStateChunkSize()); + } + + @Test + void testSetAndGetStateSize() { + BigInteger expectedSize = BigInteger.valueOf(1000L); + underTest.setStateSize(expectedSize); + assertEquals(expectedSize, underTest.getStateSize()); + } + + @Test + void testGetChunkTaskQueue() { + Queue queue = underTest.getChunkTaskQueue(); + assertNotNull(queue); + } + + @Test + void testSetAndGetNextExpectedFrom() { + long expectedValue = 100L; + underTest.setNextExpectedFrom(expectedValue); + assertEquals(expectedValue, underTest.getNextExpectedFrom()); + } + private static void doCountDownOnQueueEmpty(SyncMessageHandler.Listener listener, CountDownLatch latch) { doAnswer(invocation -> { latch.countDown(); return null; }).when(listener).onQueueEmpty(); } + + @Test + void testGetSnapStateChunkQueue() { + PriorityQueue queue = underTest.getSnapStateChunkQueue(); + assertNotNull(queue); + } + + @Test + void testSetAndGetLastBlockDifficulty() { + BlockDifficulty mockBlockDifficulty = mock(BlockDifficulty.class); + underTest.setLastBlockDifficulty(mockBlockDifficulty); + assertEquals(mockBlockDifficulty, underTest.getLastBlockDifficulty()); + } + + @Test + void testSetAndGetRemoteRootHash() { + byte[] mockRootHash = new byte[]{1, 2, 3}; + underTest.setRemoteRootHash(mockRootHash); + assertArrayEquals(mockRootHash, underTest.getRemoteRootHash()); + } + + @Test + void testSetAndGetRemoteTrieSize() { + long expectedSize = 12345L; + underTest.setRemoteTrieSize(expectedSize); + assertEquals(expectedSize, underTest.getRemoteTrieSize()); + } + + @Test + void testConnectBlocks() { + BlockConnectorHelper blockConnectorHelper = mock(BlockConnectorHelper.class); + Pair mockBlockPair = mock(Pair.class); + underTest.addBlock(mockBlockPair); + ArgumentCaptor>> captor = ArgumentCaptor.forClass(List.class); + + underTest.connectBlocks(blockConnectorHelper); + + verify(blockConnectorHelper, times(1)).startConnecting(captor.capture()); + assertTrue(captor.getValue().contains(mockBlockPair)); + } + + }