Skip to content

Commit

Permalink
Merge pull request #2666 from rsksmart/add_on_error_handler_for_snap_…
Browse files Browse the repository at this point in the history
…blocks

Add on error handler for snap blocks
  • Loading branch information
fmacleal authored Aug 21, 2024
2 parents 80483ff + 4460e83 commit 8fc7aa0
Show file tree
Hide file tree
Showing 8 changed files with 298 additions and 158 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void whenStartTheServerAndClientNodes_thenTheClientWillSynchWithServer()
boolean isClientSynced = false;

while (System.currentTimeMillis() < endTime) {
if (clientNode.getOutput().contains("CLIENT - Starting Snapshot sync.") && clientNode.getOutput().contains("CLIENT - Snapshot sync finished!")) {
if (clientNode.getOutput().contains("CLIENT - Starting Snapshot sync.") && clientNode.getOutput().contains("CLIENT - Snapshot sync finished successfully!")) {
try {
JsonNode jsonResponse = OkHttpClientTestFixture.getJsonResponseForGetBestBlockMessage(portClientRpc, serverBestBlockNumber);
JsonNode jsonResult = jsonResponse.get(0).get("result");
Expand Down
234 changes: 125 additions & 109 deletions rskj-core/src/main/java/co/rsk/net/SnapshotProcessor.java

Large diffs are not rendered by default.

32 changes: 18 additions & 14 deletions rskj-core/src/main/java/co/rsk/net/sync/BlockConnectorHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,37 +30,41 @@
public class BlockConnectorHelper {
private static final Logger logger = LoggerFactory.getLogger("SnapBlockConnector");
private final BlockStore blockStore;
private final List<Pair<Block,BlockDifficulty>> blockAndDifficultiesList;

public BlockConnectorHelper(BlockStore blockStore, List<Pair<Block,BlockDifficulty>> blockAndDifficultiesList) {
public BlockConnectorHelper(BlockStore blockStore) {
this.blockStore = blockStore;
this.blockAndDifficultiesList = blockAndDifficultiesList;
blockAndDifficultiesList.sort(new BlockAndDiffComparator());
}

public void startConnecting() {
Block child = null;
logger.info("Start connecting Blocks");
public void startConnecting(List<Pair<Block, BlockDifficulty>> blockAndDifficultiesList) {
if (blockAndDifficultiesList.isEmpty()) {
logger.debug("Block list is empty, nothing to connect");
return;
}

blockAndDifficultiesList.sort(new BlockAndDiffComparator());
Block child = null;
logger.info("Start connecting blocks ranging from {} to {} - Total: {}",
blockAndDifficultiesList.get(0).getKey().getNumber(),
blockAndDifficultiesList.get(blockAndDifficultiesList.size() - 1).getKey().getNumber(),
blockAndDifficultiesList.size());

int blockIndex = blockAndDifficultiesList.size() - 1;
if (blockStore.isEmpty()) {
Pair<Block,BlockDifficulty> blockAndDifficulty = blockAndDifficultiesList.get(blockIndex);
Pair<Block, BlockDifficulty> blockAndDifficulty = blockAndDifficultiesList.get(blockIndex);
child = blockAndDifficulty.getLeft();
logger.debug("BlockStore is empty, setting child block number the last block from the list: {}", child.getNumber());
blockStore.saveBlock(child, blockAndDifficulty.getRight(), true);
logger.debug("Block number: {} saved", child.getNumber());
blockIndex--;
} else {
logger.debug("BlockStore is not empty, getting best block");
child = blockStore.getBestBlock();
logger.debug("Best block number: {}", child.getNumber());
}
while (blockIndex >= 0) {
Pair<Block,BlockDifficulty> currentBlockAndDifficulty = blockAndDifficultiesList.get(blockIndex);
Pair<Block, BlockDifficulty> currentBlockAndDifficulty = blockAndDifficultiesList.get(blockIndex);
Block currentBlock = currentBlockAndDifficulty.getLeft();
logger.info("Connecting block number: {}", currentBlock.getNumber());
logger.trace("Connecting block number: {}", currentBlock.getNumber());

if (!currentBlock.isParentOf(child)) {
throw new BlockConnectorException(currentBlock.getNumber(), child.getNumber());
Expand All @@ -69,13 +73,13 @@ public void startConnecting() {
child = currentBlock;
blockIndex--;
}
logger.info("Finished connecting blocks");
logger.info("Finished connecting blocks. Last saved block: {}",child.getNumber());
}

static class BlockAndDiffComparator implements java.util.Comparator<Pair<Block,BlockDifficulty>> {
static class BlockAndDiffComparator implements java.util.Comparator<Pair<Block, BlockDifficulty>> {
@Override
public int compare(Pair<Block,BlockDifficulty> o1, Pair<Block,BlockDifficulty> o2) {
return Long.compare(o1.getLeft().getNumber(),o2.getLeft().getNumber());
public int compare(Pair<Block, BlockDifficulty> o1, Pair<Block, BlockDifficulty> o2) {
return Long.compare(o1.getLeft().getNumber(), o2.getLeft().getNumber());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ private boolean tryStartSnapshotSync() {
}

// we consider Snap as part of the Long Sync
if (!shouldSnapSync(peerBestBlockNumOpt.get())) {
if (!isValidSnapDistance(peerBestBlockNumOpt.get())) {
logger.debug("Snap syncing not required (long sync not required)");
return false;
}
Expand Down Expand Up @@ -163,9 +163,9 @@ private boolean shouldLongSync(long peerBestBlockNumber) {
return distanceToTip > syncConfiguration.getLongSyncLimit() || checkGenesisConnected();
}

private boolean shouldSnapSync(long peerBestBlockNumber) {
private boolean isValidSnapDistance(long peerBestBlockNumber) {
long distanceToTip = peerBestBlockNumber - blockStore.getBestBlock().getNumber();
return distanceToTip > syncConfiguration.getSnapshotSyncLimit() && syncConfiguration.isClientSnapSyncEnabled();
return distanceToTip > syncConfiguration.getSnapshotSyncLimit();
}

private Optional<Long> getPeerBestBlockNumber(Peer peer) {
Expand Down
16 changes: 12 additions & 4 deletions rskj-core/src/main/java/co/rsk/net/sync/SnapSyncState.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@

public class SnapSyncState extends BaseSyncState {

private static final Logger logger = LoggerFactory.getLogger("syncprocessor");
private static final Logger logger = LoggerFactory.getLogger("SnapSyncState");

private final SnapshotProcessor snapshotProcessor;

Expand Down Expand Up @@ -76,7 +76,7 @@ public SnapSyncState(SyncEventsHandler syncEventsHandler, SnapshotProcessor snap

@VisibleForTesting
SnapSyncState(SyncEventsHandler syncEventsHandler, SnapshotProcessor snapshotProcessor,
SyncConfiguration syncConfiguration, @Nullable SyncMessageHandler.Listener listener) {
SyncConfiguration syncConfiguration, @Nullable SyncMessageHandler.Listener listener) {
super(syncEventsHandler, syncConfiguration);
this.snapshotProcessor = snapshotProcessor; // TODO(snap-poc) code in SnapshotProcessor should be moved here probably
this.allNodes = Lists.newArrayList();
Expand Down Expand Up @@ -207,8 +207,16 @@ public void setRemoteTrieSize(long remoteTrieSize) {
this.remoteTrieSize = remoteTrieSize;
}

public List<Pair<Block, BlockDifficulty>> getBlocks() {
return blocks;
public void addBlock(Pair<Block, BlockDifficulty> blockPair) {
blocks.add(blockPair);
}

public void addAllBlocks(List<Pair<Block, BlockDifficulty>> blocks) {
this.blocks.addAll(blocks);
}

public void connectBlocks(BlockConnectorHelper blockConnectorHelper) {
blockConnectorHelper.startConnecting(blocks);
}

public List<TrieDTO> getAllNodes() {

Check notice

Code scanning / CodeQL

Exposing internal representation Note

getAllNodes exposes the internal representation stored in field allNodes. The value may be modified
after this call to getAllNodes
.
Expand Down
36 changes: 33 additions & 3 deletions rskj-core/src/test/java/co/rsk/net/SnapshotProcessorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.ethereum.core.Blockchain;
import org.ethereum.core.TransactionPool;
import org.ethereum.db.BlockStore;
import org.ethereum.util.RLP;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -104,7 +105,7 @@ void givenSnapStatusResponseCalled_thenSnapChunkRequestsAreMade() {
TEST_CHUNK_SIZE,
false);

for (long blockNumber = 0; blockNumber < blockchain.getSize(); blockNumber ++){
for (long blockNumber = 0; blockNumber < blockchain.getSize(); blockNumber++) {
Block currentBlock = blockchain.getBlockByNumber(blockNumber);
blocks.add(currentBlock);
difficulties.add(blockStore.getTotalDifficultyForHash(currentBlock.getHash().getBytes()));
Expand Down Expand Up @@ -181,7 +182,7 @@ void givenSnapBlocksResponseReceived_thenSnapBlocksRequestMessageIsSent() {
200,
false);

for (long blockNumber = 0; blockNumber < blockchain.getSize(); blockNumber ++){
for (long blockNumber = 0; blockNumber < blockchain.getSize(); blockNumber++) {
Block currentBlock = blockchain.getBlockByNumber(blockNumber);
blocks.add(currentBlock);
difficulties.add(blockStore.getTotalDifficultyForHash(currentBlock.getHash().getBytes()));
Expand Down Expand Up @@ -216,7 +217,7 @@ void givenSnapStateChunkRequest_thenSnapStateChunkResponseMessageIsSent() {
TEST_CHUNK_SIZE,
false);

SnapStateChunkRequestMessage snapStateChunkRequestMessage = new SnapStateChunkRequestMessage(1L, 1L,1, TEST_CHUNK_SIZE);
SnapStateChunkRequestMessage snapStateChunkRequestMessage = new SnapStateChunkRequestMessage(1L, 1L, 1, TEST_CHUNK_SIZE);

//when
underTest.processStateChunkRequestInternal(peer, snapStateChunkRequestMessage);
Expand Down Expand Up @@ -333,6 +334,35 @@ void processStateChunkRequestInternal(Peer sender, SnapStateChunkRequestMessage
assertEquals(msg, jobArg.getValue().getMsg());
}

@Test
void givenErrorRLPData_thenOnStateChunkErrorIsCalled() {
underTest = new SnapshotProcessor(
blockchain,
trieStore,
peersInformation,
blockStore,
transactionPool,
TEST_CHUNK_SIZE,
false);

PriorityQueue<SnapStateChunkResponseMessage> queue = new PriorityQueue<>(
Comparator.comparingLong(SnapStateChunkResponseMessage::getFrom));
when(snapSyncState.getSnapStateChunkQueue()).thenReturn(queue);
when(snapSyncState.getChunkTaskQueue()).thenReturn(new LinkedList<>());
SnapStateChunkResponseMessage responseMessage = mock(SnapStateChunkResponseMessage.class);
when(snapSyncState.getNextExpectedFrom()).thenReturn(1L);
when(responseMessage.getFrom()).thenReturn(1L);
when(responseMessage.getChunkOfTrieKeyValue()).thenReturn(RLP.encodedEmptyList());
underTest = spy(underTest);

underTest.processStateChunkResponse(snapSyncState, peer, responseMessage);

verify(snapSyncState, times(1)).onNewChunk();
verify(underTest, times(1)).onStateChunkResponseError(peer, responseMessage);
verify(peer, times(1)).sendMessage(any(SnapStateChunkRequestMessage.class));

}

private void initializeBlockchainWithAmountOfBlocks(int numberOfBlocks) {
BlockChainBuilder blockChainBuilder = new BlockChainBuilder();
blockchain = blockChainBuilder.ofSize(numberOfBlocks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ void setUp() {

@Test
void testStartConnectingWhenBlockListIsEmpty() {
blockConnectorHelper = new BlockConnectorHelper(blockStore, Collections.emptyList());
blockConnectorHelper.startConnecting();
blockConnectorHelper = new BlockConnectorHelper(blockStore);
blockConnectorHelper.startConnecting(Collections.emptyList());
verify(blockStore, never()).saveBlock(any(), any(), anyBoolean());
}

Expand All @@ -69,9 +69,9 @@ void testStartConnectingWhenBlockStoreIsEmpty() {
blockAndDifficultiesList = buildBlockDifficulties(Arrays.asList(block1, block2,block3),
Arrays.asList(diff1, diff2,diff3));

blockConnectorHelper = new BlockConnectorHelper(blockStore, blockAndDifficultiesList);
blockConnectorHelper = new BlockConnectorHelper(blockStore);

blockConnectorHelper.startConnecting();
blockConnectorHelper.startConnecting(blockAndDifficultiesList);

verify(blockStore, times(3)).saveBlock(blockCaptor.capture(), difficultyCaptor.capture(), anyBoolean());
verify(blockStore, times(0)).getBestBlock();
Expand Down Expand Up @@ -101,9 +101,9 @@ void testStartConnectingWhenBlockStoreIsEmptyAndNotOrderedList() {
blockAndDifficultiesList = buildBlockDifficulties(Arrays.asList(block2, block1),
Arrays.asList(diff2, diff1));

blockConnectorHelper = new BlockConnectorHelper(blockStore, blockAndDifficultiesList);
blockConnectorHelper = new BlockConnectorHelper(blockStore);

blockConnectorHelper.startConnecting();
blockConnectorHelper.startConnecting(blockAndDifficultiesList);

verify(blockStore, times(2)).saveBlock(blockCaptor.capture(), difficultyCaptor.capture(), anyBoolean());
verify(blockStore, times(0)).getBestBlock();
Expand Down Expand Up @@ -131,9 +131,9 @@ void testStartConnectingWhenBlockStoreIsNotEmpty() {
when(blockStore.getBestBlock()).thenReturn(block3);

blockAndDifficultiesList = buildBlockDifficulties(Arrays.asList(block1, block2), Arrays.asList(mock(BlockDifficulty.class), mock(BlockDifficulty.class)));
blockConnectorHelper = new BlockConnectorHelper(blockStore, blockAndDifficultiesList);
blockConnectorHelper = new BlockConnectorHelper(blockStore);

blockConnectorHelper.startConnecting();
blockConnectorHelper.startConnecting(blockAndDifficultiesList);
verify(blockStore, times(1)).getBestBlock();
verify(blockStore, times(2)).saveBlock(any(), any(), anyBoolean());
}
Expand All @@ -148,12 +148,12 @@ void whenBlockIsNotParentOfExistingBestBlock() {
blockAndDifficultiesList = buildBlockDifficulties(Collections.singletonList(block2),
Collections.singletonList(mock(BlockDifficulty.class)));

blockConnectorHelper = new BlockConnectorHelper(blockStore, blockAndDifficultiesList);
blockConnectorHelper = new BlockConnectorHelper(blockStore);

when(blockStore.isEmpty()).thenReturn(false);
when(blockStore.getBestBlock()).thenReturn(block3);

assertThrows(BlockConnectorException.class, () -> blockConnectorHelper.startConnecting());
assertThrows(BlockConnectorException.class, () -> blockConnectorHelper.startConnecting(blockAndDifficultiesList));
}

@Test
Expand All @@ -166,9 +166,9 @@ void testStartConnectingWhenBlockIsNotParentOfChild() {
when(blockStore.isEmpty()).thenReturn(true);
blockAndDifficultiesList = buildBlockDifficulties(Arrays.asList(block1, block2),
Arrays.asList(mock(BlockDifficulty.class), mock(BlockDifficulty.class)));
blockConnectorHelper = new BlockConnectorHelper(blockStore, blockAndDifficultiesList);
blockConnectorHelper = new BlockConnectorHelper(blockStore);

assertThrows(BlockConnectorException.class, () -> blockConnectorHelper.startConnecting());
assertThrows(BlockConnectorException.class, () -> blockConnectorHelper.startConnecting(blockAndDifficultiesList));
}

List<Pair<Block,BlockDifficulty>> buildBlockDifficulties(List<Block> blocks, List<BlockDifficulty> difficulties) {
Expand Down
Loading

0 comments on commit 8fc7aa0

Please sign in to comment.