Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add on error handler for snap blocks #2666

Merged
merged 5 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void whenStartTheServerAndClientNodes_thenTheClientWillSynchWithServer()
boolean isClientSynced = false;

while (System.currentTimeMillis() < endTime) {
if (clientNode.getOutput().contains("CLIENT - Starting Snapshot sync.") && clientNode.getOutput().contains("CLIENT - Snapshot sync finished!")) {
if (clientNode.getOutput().contains("CLIENT - Starting Snapshot sync.") && clientNode.getOutput().contains("CLIENT - Snapshot sync finished successfully!")) {
try {
JsonNode jsonResponse = OkHttpClientTestFixture.getJsonResponseForGetBestBlockMessage(portClientRpc, serverBestBlockNumber);
JsonNode jsonResult = jsonResponse.get(0).get("result");
Expand Down
234 changes: 125 additions & 109 deletions rskj-core/src/main/java/co/rsk/net/SnapshotProcessor.java

Large diffs are not rendered by default.

32 changes: 18 additions & 14 deletions rskj-core/src/main/java/co/rsk/net/sync/BlockConnectorHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,37 +30,41 @@
public class BlockConnectorHelper {
private static final Logger logger = LoggerFactory.getLogger("SnapBlockConnector");
private final BlockStore blockStore;
private final List<Pair<Block,BlockDifficulty>> blockAndDifficultiesList;

public BlockConnectorHelper(BlockStore blockStore, List<Pair<Block,BlockDifficulty>> blockAndDifficultiesList) {
public BlockConnectorHelper(BlockStore blockStore) {
this.blockStore = blockStore;
this.blockAndDifficultiesList = blockAndDifficultiesList;
blockAndDifficultiesList.sort(new BlockAndDiffComparator());
}

public void startConnecting() {
Block child = null;
logger.info("Start connecting Blocks");
public void startConnecting(List<Pair<Block, BlockDifficulty>> blockAndDifficultiesList) {
if (blockAndDifficultiesList.isEmpty()) {
logger.debug("Block list is empty, nothing to connect");
return;
}

blockAndDifficultiesList.sort(new BlockAndDiffComparator());
Block child = null;
logger.info("Start connecting blocks ranging from {} to {} - Total: {}",
blockAndDifficultiesList.get(0).getKey().getNumber(),
blockAndDifficultiesList.get(blockAndDifficultiesList.size() - 1).getKey().getNumber(),
blockAndDifficultiesList.size());

int blockIndex = blockAndDifficultiesList.size() - 1;
if (blockStore.isEmpty()) {
Pair<Block,BlockDifficulty> blockAndDifficulty = blockAndDifficultiesList.get(blockIndex);
Pair<Block, BlockDifficulty> blockAndDifficulty = blockAndDifficultiesList.get(blockIndex);
child = blockAndDifficulty.getLeft();
logger.debug("BlockStore is empty, setting child block number the last block from the list: {}", child.getNumber());
blockStore.saveBlock(child, blockAndDifficulty.getRight(), true);
logger.debug("Block number: {} saved", child.getNumber());
blockIndex--;
} else {
logger.debug("BlockStore is not empty, getting best block");
child = blockStore.getBestBlock();
logger.debug("Best block number: {}", child.getNumber());
}
while (blockIndex >= 0) {
Pair<Block,BlockDifficulty> currentBlockAndDifficulty = blockAndDifficultiesList.get(blockIndex);
Pair<Block, BlockDifficulty> currentBlockAndDifficulty = blockAndDifficultiesList.get(blockIndex);
Block currentBlock = currentBlockAndDifficulty.getLeft();
logger.info("Connecting block number: {}", currentBlock.getNumber());
logger.trace("Connecting block number: {}", currentBlock.getNumber());

if (!currentBlock.isParentOf(child)) {
throw new BlockConnectorException(currentBlock.getNumber(), child.getNumber());
Expand All @@ -69,13 +73,13 @@ public void startConnecting() {
child = currentBlock;
blockIndex--;
}
logger.info("Finished connecting blocks");
logger.info("Finished connecting blocks. Last saved block: {}",child.getNumber());
}

static class BlockAndDiffComparator implements java.util.Comparator<Pair<Block,BlockDifficulty>> {
static class BlockAndDiffComparator implements java.util.Comparator<Pair<Block, BlockDifficulty>> {
@Override
public int compare(Pair<Block,BlockDifficulty> o1, Pair<Block,BlockDifficulty> o2) {
return Long.compare(o1.getLeft().getNumber(),o2.getLeft().getNumber());
public int compare(Pair<Block, BlockDifficulty> o1, Pair<Block, BlockDifficulty> o2) {
return Long.compare(o1.getLeft().getNumber(), o2.getLeft().getNumber());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ private boolean tryStartSnapshotSync() {
}

// we consider Snap as part of the Long Sync
if (!shouldSnapSync(peerBestBlockNumOpt.get())) {
if (!isValidSnapDistance(peerBestBlockNumOpt.get())) {
logger.debug("Snap syncing not required (long sync not required)");
return false;
}
Expand Down Expand Up @@ -163,9 +163,9 @@ private boolean shouldLongSync(long peerBestBlockNumber) {
return distanceToTip > syncConfiguration.getLongSyncLimit() || checkGenesisConnected();
}

private boolean shouldSnapSync(long peerBestBlockNumber) {
private boolean isValidSnapDistance(long peerBestBlockNumber) {
long distanceToTip = peerBestBlockNumber - blockStore.getBestBlock().getNumber();
return distanceToTip > syncConfiguration.getSnapshotSyncLimit() && syncConfiguration.isClientSnapSyncEnabled();
return distanceToTip > syncConfiguration.getSnapshotSyncLimit();
}

private Optional<Long> getPeerBestBlockNumber(Peer peer) {
Expand Down
16 changes: 12 additions & 4 deletions rskj-core/src/main/java/co/rsk/net/sync/SnapSyncState.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@

public class SnapSyncState extends BaseSyncState {

private static final Logger logger = LoggerFactory.getLogger("syncprocessor");
private static final Logger logger = LoggerFactory.getLogger("SnapSyncState");

private final SnapshotProcessor snapshotProcessor;

Expand Down Expand Up @@ -76,7 +76,7 @@ public SnapSyncState(SyncEventsHandler syncEventsHandler, SnapshotProcessor snap

@VisibleForTesting
SnapSyncState(SyncEventsHandler syncEventsHandler, SnapshotProcessor snapshotProcessor,
SyncConfiguration syncConfiguration, @Nullable SyncMessageHandler.Listener listener) {
SyncConfiguration syncConfiguration, @Nullable SyncMessageHandler.Listener listener) {
super(syncEventsHandler, syncConfiguration);
this.snapshotProcessor = snapshotProcessor; // TODO(snap-poc) code in SnapshotProcessor should be moved here probably
this.allNodes = Lists.newArrayList();
Expand Down Expand Up @@ -207,8 +207,16 @@ public void setRemoteTrieSize(long remoteTrieSize) {
this.remoteTrieSize = remoteTrieSize;
}

public List<Pair<Block, BlockDifficulty>> getBlocks() {
return blocks;
public void addBlock(Pair<Block, BlockDifficulty> blockPair) {
blocks.add(blockPair);
}

public void addAllBlocks(List<Pair<Block, BlockDifficulty>> blocks) {
this.blocks.addAll(blocks);
}

public void connectBlocks(BlockConnectorHelper blockConnectorHelper) {
blockConnectorHelper.startConnecting(blocks);
}

public List<TrieDTO> getAllNodes() {
Expand Down
36 changes: 33 additions & 3 deletions rskj-core/src/test/java/co/rsk/net/SnapshotProcessorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.ethereum.core.Blockchain;
import org.ethereum.core.TransactionPool;
import org.ethereum.db.BlockStore;
import org.ethereum.util.RLP;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -104,7 +105,7 @@ void givenSnapStatusResponseCalled_thenSnapChunkRequestsAreMade() {
TEST_CHUNK_SIZE,
false);

for (long blockNumber = 0; blockNumber < blockchain.getSize(); blockNumber ++){
for (long blockNumber = 0; blockNumber < blockchain.getSize(); blockNumber++) {
Block currentBlock = blockchain.getBlockByNumber(blockNumber);
blocks.add(currentBlock);
difficulties.add(blockStore.getTotalDifficultyForHash(currentBlock.getHash().getBytes()));
Expand Down Expand Up @@ -181,7 +182,7 @@ void givenSnapBlocksResponseReceived_thenSnapBlocksRequestMessageIsSent() {
200,
false);

for (long blockNumber = 0; blockNumber < blockchain.getSize(); blockNumber ++){
for (long blockNumber = 0; blockNumber < blockchain.getSize(); blockNumber++) {
Block currentBlock = blockchain.getBlockByNumber(blockNumber);
blocks.add(currentBlock);
difficulties.add(blockStore.getTotalDifficultyForHash(currentBlock.getHash().getBytes()));
Expand Down Expand Up @@ -216,7 +217,7 @@ void givenSnapStateChunkRequest_thenSnapStateChunkResponseMessageIsSent() {
TEST_CHUNK_SIZE,
false);

SnapStateChunkRequestMessage snapStateChunkRequestMessage = new SnapStateChunkRequestMessage(1L, 1L,1, TEST_CHUNK_SIZE);
SnapStateChunkRequestMessage snapStateChunkRequestMessage = new SnapStateChunkRequestMessage(1L, 1L, 1, TEST_CHUNK_SIZE);

//when
underTest.processStateChunkRequestInternal(peer, snapStateChunkRequestMessage);
Expand Down Expand Up @@ -333,6 +334,35 @@ void processStateChunkRequestInternal(Peer sender, SnapStateChunkRequestMessage
assertEquals(msg, jobArg.getValue().getMsg());
}

@Test
fmacleal marked this conversation as resolved.
Show resolved Hide resolved
void givenErrorRLPData_thenOnStateChunkErrorIsCalled() {
underTest = new SnapshotProcessor(
blockchain,
trieStore,
peersInformation,
blockStore,
transactionPool,
TEST_CHUNK_SIZE,
false);

PriorityQueue<SnapStateChunkResponseMessage> queue = new PriorityQueue<>(
Comparator.comparingLong(SnapStateChunkResponseMessage::getFrom));
when(snapSyncState.getSnapStateChunkQueue()).thenReturn(queue);
when(snapSyncState.getChunkTaskQueue()).thenReturn(new LinkedList<>());
SnapStateChunkResponseMessage responseMessage = mock(SnapStateChunkResponseMessage.class);
when(snapSyncState.getNextExpectedFrom()).thenReturn(1L);
when(responseMessage.getFrom()).thenReturn(1L);
when(responseMessage.getChunkOfTrieKeyValue()).thenReturn(RLP.encodedEmptyList());
underTest = spy(underTest);

underTest.processStateChunkResponse(snapSyncState, peer, responseMessage);

verify(snapSyncState, times(1)).onNewChunk();
verify(underTest, times(1)).onStateChunkResponseError(peer, responseMessage);
verify(peer, times(1)).sendMessage(any(SnapStateChunkRequestMessage.class));

}

private void initializeBlockchainWithAmountOfBlocks(int numberOfBlocks) {
BlockChainBuilder blockChainBuilder = new BlockChainBuilder();
blockchain = blockChainBuilder.ofSize(numberOfBlocks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ void setUp() {

@Test
void testStartConnectingWhenBlockListIsEmpty() {
blockConnectorHelper = new BlockConnectorHelper(blockStore, Collections.emptyList());
blockConnectorHelper.startConnecting();
blockConnectorHelper = new BlockConnectorHelper(blockStore);
blockConnectorHelper.startConnecting(Collections.emptyList());
verify(blockStore, never()).saveBlock(any(), any(), anyBoolean());
}

Expand All @@ -69,9 +69,9 @@ void testStartConnectingWhenBlockStoreIsEmpty() {
blockAndDifficultiesList = buildBlockDifficulties(Arrays.asList(block1, block2,block3),
Arrays.asList(diff1, diff2,diff3));

blockConnectorHelper = new BlockConnectorHelper(blockStore, blockAndDifficultiesList);
blockConnectorHelper = new BlockConnectorHelper(blockStore);

blockConnectorHelper.startConnecting();
blockConnectorHelper.startConnecting(blockAndDifficultiesList);

verify(blockStore, times(3)).saveBlock(blockCaptor.capture(), difficultyCaptor.capture(), anyBoolean());
verify(blockStore, times(0)).getBestBlock();
Expand Down Expand Up @@ -101,9 +101,9 @@ void testStartConnectingWhenBlockStoreIsEmptyAndNotOrderedList() {
blockAndDifficultiesList = buildBlockDifficulties(Arrays.asList(block2, block1),
Arrays.asList(diff2, diff1));

blockConnectorHelper = new BlockConnectorHelper(blockStore, blockAndDifficultiesList);
blockConnectorHelper = new BlockConnectorHelper(blockStore);

blockConnectorHelper.startConnecting();
blockConnectorHelper.startConnecting(blockAndDifficultiesList);

verify(blockStore, times(2)).saveBlock(blockCaptor.capture(), difficultyCaptor.capture(), anyBoolean());
verify(blockStore, times(0)).getBestBlock();
Expand Down Expand Up @@ -131,9 +131,9 @@ void testStartConnectingWhenBlockStoreIsNotEmpty() {
when(blockStore.getBestBlock()).thenReturn(block3);

blockAndDifficultiesList = buildBlockDifficulties(Arrays.asList(block1, block2), Arrays.asList(mock(BlockDifficulty.class), mock(BlockDifficulty.class)));
blockConnectorHelper = new BlockConnectorHelper(blockStore, blockAndDifficultiesList);
blockConnectorHelper = new BlockConnectorHelper(blockStore);

blockConnectorHelper.startConnecting();
blockConnectorHelper.startConnecting(blockAndDifficultiesList);
verify(blockStore, times(1)).getBestBlock();
verify(blockStore, times(2)).saveBlock(any(), any(), anyBoolean());
}
Expand All @@ -148,12 +148,12 @@ void whenBlockIsNotParentOfExistingBestBlock() {
blockAndDifficultiesList = buildBlockDifficulties(Collections.singletonList(block2),
Collections.singletonList(mock(BlockDifficulty.class)));

blockConnectorHelper = new BlockConnectorHelper(blockStore, blockAndDifficultiesList);
blockConnectorHelper = new BlockConnectorHelper(blockStore);

when(blockStore.isEmpty()).thenReturn(false);
when(blockStore.getBestBlock()).thenReturn(block3);

assertThrows(BlockConnectorException.class, () -> blockConnectorHelper.startConnecting());
assertThrows(BlockConnectorException.class, () -> blockConnectorHelper.startConnecting(blockAndDifficultiesList));
}

@Test
Expand All @@ -166,9 +166,9 @@ void testStartConnectingWhenBlockIsNotParentOfChild() {
when(blockStore.isEmpty()).thenReturn(true);
blockAndDifficultiesList = buildBlockDifficulties(Arrays.asList(block1, block2),
Arrays.asList(mock(BlockDifficulty.class), mock(BlockDifficulty.class)));
blockConnectorHelper = new BlockConnectorHelper(blockStore, blockAndDifficultiesList);
blockConnectorHelper = new BlockConnectorHelper(blockStore);

assertThrows(BlockConnectorException.class, () -> blockConnectorHelper.startConnecting());
assertThrows(BlockConnectorException.class, () -> blockConnectorHelper.startConnecting(blockAndDifficultiesList));
}

List<Pair<Block,BlockDifficulty>> buildBlockDifficulties(List<Block> blocks, List<BlockDifficulty> difficulties) {
Expand Down
Loading
Loading