Skip to content

Commit

Permalink
Adding on snap block error behaviour
Browse files Browse the repository at this point in the history
  • Loading branch information
asoto-iov committed Aug 2, 2024
1 parent 464c53a commit 46d1f85
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 98 deletions.
187 changes: 95 additions & 92 deletions rskj-core/src/main/java/co/rsk/net/SnapshotProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class SnapshotProcessor implements InternalService {
public static final int BLOCK_NUMBER_CHECKPOINT = 5000;
public static final int BLOCK_CHUNK_SIZE = 400;
public static final int BLOCKS_REQUIRED = 6000;
public static final long CHUNK_ITEM_SIZE = 1024L;
private final Blockchain blockchain;
private final TrieStore trieStore;
private final BlockStore blockStore;
Expand All @@ -77,7 +78,6 @@ public class SnapshotProcessor implements InternalService {

private volatile Boolean isRunning;
private final Thread thread;

public SnapshotProcessor(Blockchain blockchain,
TrieStore trieStore,
SnapshotPeersInformation peersInformation,
Expand Down Expand Up @@ -157,6 +157,7 @@ void processSnapStatusRequestInternal(Peer sender, SnapStatusRequestMessage igno
logger.debug("SERVER - Processing snapshot status request.");
long bestBlockNumber = blockchain.getBestBlock().getNumber();
long checkpointBlockNumber = bestBlockNumber - (bestBlockNumber % BLOCK_NUMBER_CHECKPOINT);
logger.debug("SERVER - checkpointBlockNumber: {}, bestBlockNumber: {}", checkpointBlockNumber, bestBlockNumber);
List<Block> blocks = Lists.newArrayList();
List<BlockDifficulty> difficulties = Lists.newArrayList();
for (long i = checkpointBlockNumber - BLOCK_CHUNK_SIZE; i < checkpointBlockNumber; i++) {
Expand All @@ -165,8 +166,10 @@ void processSnapStatusRequestInternal(Peer sender, SnapStatusRequestMessage igno
difficulties.add(blockStore.getTotalDifficultyForHash(block.getHash().getBytes()));
}

logger.trace("SERVER - Sending snapshot status response. From block {} to block {} - chunksize {}", blocks.get(0).getNumber(), blocks.get(blocks.size() - 1).getNumber(), BLOCK_CHUNK_SIZE);
Block checkpointBlock = blockchain.getBlockByNumber(checkpointBlockNumber);
blocks.add(checkpointBlock);
logger.trace("SERVER - adding checkpoint block: {}", checkpointBlock.getNumber());
difficulties.add(blockStore.getTotalDifficultyForHash(checkpointBlock.getHash().getBytes()));
byte[] rootHash = checkpointBlock.getStateRoot();
Optional<TrieDTO> opt = trieStore.retrieveDTO(rootHash);
Expand Down Expand Up @@ -196,7 +199,7 @@ public void processSnapStatusResponse(SnapSyncState state, Peer sender, SnapStat
state.addBlock(new ImmutablePair<>(blocksFromResponse.get(i), difficultiesFromResponse.get(i)));
}
logger.debug("CLIENT - Processing snapshot status response - last blockNumber: {} triesize: {}", lastBlock.getNumber(), state.getRemoteTrieSize());
logger.debug("Blocks included in the response: {} from {} to {}", blocksFromResponse.size(), blocksFromResponse.get(0).getNumber(),blocksFromResponse.get(blocksFromResponse.size()-1).getNumber());
logger.debug("Blocks included in the response: {} from {} to {}", blocksFromResponse.size(), blocksFromResponse.get(0).getNumber(), blocksFromResponse.get(blocksFromResponse.size() - 1).getNumber());
requestBlocksChunk(sender, blocksFromResponse.get(0).getNumber());
generateChunkRequestTasks(state);
startRequestingChunks(state);
Expand Down Expand Up @@ -255,7 +258,7 @@ public void processSnapBlocksResponse(SnapSyncState state, Peer sender, SnapBloc
state.addBlock(new ImmutablePair<>(blocksFromResponse.get(i), difficultiesFromResponse.get(i)));
}
long nextChunk = blocksFromResponse.get(0).getNumber();
logger.debug("CLIENT - SnapBlock - nexChunk : {} - lastRequired {}, missing {}", nextChunk, lastRequiredBlock, nextChunk-lastRequiredBlock);
logger.debug("CLIENT - SnapBlock - nexChunk : {} - lastRequired {}, missing {}", nextChunk, lastRequiredBlock, nextChunk - lastRequiredBlock);
if (nextChunk > lastRequiredBlock) {
requestBlocksChunk(sender, nextChunk);
} else {
Expand All @@ -267,10 +270,9 @@ public void processSnapBlocksResponse(SnapSyncState state, Peer sender, SnapBloc
* STATE CHUNK
*/
private void requestStateChunk(Peer peer, long from, long blockNumber, int chunkSize) {
logger.debug("CLIENT - Requesting state chunk to node {} - block {} - from {}", peer.getPeerNodeID(), blockNumber, from);
logger.debug("CLIENT - Requesting state chunk to node {} - block {} - chunkNumber {}", peer.getPeerNodeID(), blockNumber, from / chunkSize);
SnapStateChunkRequestMessage message = new SnapStateChunkRequestMessage(messageId++, blockNumber, from, chunkSize);
peer.sendMessage(message);
logger.debug("CLIENT - Request sent state chunk to node {} - block {} - from {}", peer.getPeerNodeID(), blockNumber, from);
}

public void processStateChunkRequest(Peer sender, SnapStateChunkRequestMessage requestMessage) {
Expand All @@ -297,7 +299,7 @@ void processStateChunkRequestInternal(Peer sender, SnapStateChunkRequestMessage

List<byte[]> trieEncoded = new ArrayList<>();
Block block = blockchain.getBlockByNumber(request.getBlockNumber());
final long to = request.getFrom() + (request.getChunkSize() * 1024);
final long to = request.getFrom() + (request.getChunkSize() * CHUNK_ITEM_SIZE);
logger.debug("SERVER - Processing state chunk request from node {}. From {} to calculated {} being chunksize {}", sender.getPeerNodeID(), request.getFrom(), to, request.getChunkSize());
logger.debug("SERVER - Sending state chunk from {} to {}", request.getFrom(), to);
TrieDTOInOrderIterator it = new TrieDTOInOrderIterator(trieStore, block.getStateRoot(), request.getFrom(), to);
Expand All @@ -319,7 +321,6 @@ void processStateChunkRequestInternal(Peer sender, SnapStateChunkRequestMessage
byte[] firstNodeLeftHash = RLP.encodeElement(first.getLeftHash());
byte[] nodesBytes = RLP.encodeList(trieEncoded.toArray(new byte[0][0]));
byte[] lastNodeHashes = last != null ? RLP.encodeList(RLP.encodeElement(getBytes(last.getLeftHash())), RLP.encodeElement(getBytes(last.getRightHash()))) : RLP.encodedEmptyList();

// Last we add the root nodes on the right of the last visited node. They are used to validate the chunk.
List<byte[]> postRootNodes = it.getNodesLeftVisiting().stream().map((t) -> RLP.encodeList(RLP.encodeElement(t.getEncoded()), RLP.encodeElement(getBytes(t.getRightHash())))).collect(Collectors.toList());
byte[] postRootNodesBytes = !postRootNodes.isEmpty() ? RLP.encodeList(postRootNodes.toArray(new byte[0][0])) : RLP.encodedEmptyList();
Expand All @@ -334,7 +335,7 @@ void processStateChunkRequestInternal(Peer sender, SnapStateChunkRequestMessage
}

public void processStateChunkResponse(SnapSyncState state, Peer peer, SnapStateChunkResponseMessage responseMessage) {
logger.debug("CLIENT - State chunk received from {} to {} of {}", responseMessage.getFrom(), responseMessage.getTo(), state.getLastBlock());
logger.debug("CLIENT - State chunk received chunkNumber {}. From {} to {} of total size {}", responseMessage.getFrom() / CHUNK_ITEM_SIZE, responseMessage.getFrom(), responseMessage.getTo(), state.getRemoteTrieSize());

PriorityQueue<SnapStateChunkResponseMessage> queue = state.getSnapStateChunkQueue();
queue.add(responseMessage);
Expand All @@ -346,120 +347,122 @@ public void processStateChunkResponse(SnapSyncState state, Peer peer, SnapStateC
if (nextMessage.getFrom() == nextExpectedFrom) {
try {
processOrderedStateChunkResponse(state, peer, queue.poll());
state.setNextExpectedFrom(nextExpectedFrom + chunkSize * CHUNK_ITEM_SIZE);
} catch (Exception e) {
logger.error("Error while processing chunk response. {}", e.getMessage(), e);
onStateChunkResponseError(peer, nextMessage);
}
state.setNextExpectedFrom(nextExpectedFrom + chunkSize * 1024L);
} else {
break;
}
}

if (!responseMessage.isComplete()) {
logger.debug("CLIENT - State chunk response not complete. Requesting next chunk." );
logger.debug("CLIENT - State chunk response not complete. Requesting next chunk.");
executeNextChunkRequestTask(state, peer);
}
}

/**
 * Recovers from a failure while processing a state chunk response by asking for the same chunk again.
 *
 * <p>The first candidate returned by {@code peersInformation.getBestSnapPeerCandidates()} whose node id
 * differs from the failing peer's is used for the retry; when no such candidate exists the request is
 * sent back to the same peer.
 *
 * @param peer            the peer whose chunk response could not be processed
 * @param responseMessage the response that failed; its {@code from} and block number identify the chunk
 *                        that is requested again
 */
private void onStateChunkResponseError(Peer peer, SnapStateChunkResponseMessage responseMessage) {
    logger.error("Error while processing chunk response from {} of peer {}. Asking for chunk again.", responseMessage.getFrom(), peer.getPeerNodeID());
    Peer alternativePeer = peersInformation.getBestSnapPeerCandidates().stream()
            .filter(listedPeer -> !listedPeer.getPeerNodeID().equals(peer.getPeerNodeID()))
            .findFirst()
            .orElse(peer);
    // Log the peer that will actually receive the retry, not the one that just failed.
    logger.debug("Requesting state chunk \"from\" {} to peer {}", responseMessage.getFrom(), alternativePeer.getPeerNodeID());
    requestStateChunk(alternativePeer, responseMessage.getFrom(), responseMessage.getBlockNumber(), chunkSize);
}

private void processOrderedStateChunkResponse(SnapSyncState state, Peer peer, SnapStateChunkResponseMessage message) {
try {
logger.debug("CLIENT - Processing State chunk received from {} to {}", message.getFrom(), message.getTo());
peersInformation.getOrRegisterPeer(peer);
state.onNewChunk();

RLPList nodeLists = RLP.decodeList(message.getChunkOfTrieKeyValue());
final RLPList preRootElements = RLP.decodeList(nodeLists.get(0).getRLPData());
final RLPList trieElements = RLP.decodeList(nodeLists.get(1).getRLPData());
byte[] firstNodeLeftHash = nodeLists.get(2).getRLPData();
final RLPList lastNodeHashes = RLP.decodeList(nodeLists.get(3).getRLPData());
final RLPList postRootElements = RLP.decodeList(nodeLists.get(4).getRLPData());
logger.debug(
"CLIENT - Received state chunk of {} elements ({} bytes).",
trieElements.size(),
message.getChunkOfTrieKeyValue().length
);
List<TrieDTO> preRootNodes = new ArrayList<>();
List<TrieDTO> nodes = new ArrayList<>();
List<TrieDTO> postRootNodes = new ArrayList<>();


for (int i = 0; i < preRootElements.size(); i++) {
final RLPList trieElement = (RLPList) preRootElements.get(i);
final byte[] value = trieElement.get(0).getRLPData();
final byte[] leftHash = trieElement.get(1).getRLPData();
TrieDTO node = TrieDTO.decodeFromSync(value);
node.setLeftHash(leftHash);
preRootNodes.add(node);
}

if (trieElements.size() > 0) {
for (int i = 0; i < trieElements.size(); i++) {
final RLPElement trieElement = trieElements.get(i);
byte[] value = trieElement.getRLPData();
nodes.add(TrieDTO.decodeFromSync(value));
}
nodes.get(0).setLeftHash(firstNodeLeftHash);
}
private void processOrderedStateChunkResponse(SnapSyncState state, Peer peer, SnapStateChunkResponseMessage message) throws Exception {
logger.debug("CLIENT - Processing State chunk received from {} to {}", message.getFrom(), message.getTo());
peersInformation.getOrRegisterPeer(peer);
state.onNewChunk();

RLPList nodeLists = RLP.decodeList(message.getChunkOfTrieKeyValue());
final RLPList preRootElements = RLP.decodeList(nodeLists.get(0).getRLPData());
final RLPList trieElements = RLP.decodeList(nodeLists.get(1).getRLPData());
byte[] firstNodeLeftHash = nodeLists.get(2).getRLPData();
final RLPList lastNodeHashes = RLP.decodeList(nodeLists.get(3).getRLPData());
final RLPList postRootElements = RLP.decodeList(nodeLists.get(4).getRLPData());
List<TrieDTO> preRootNodes = new ArrayList<>();
List<TrieDTO> nodes = new ArrayList<>();
List<TrieDTO> postRootNodes = new ArrayList<>();


for (int i = 0; i < preRootElements.size(); i++) {
final RLPList trieElement = (RLPList) preRootElements.get(i);
final byte[] value = trieElement.get(0).getRLPData();
final byte[] leftHash = trieElement.get(1).getRLPData();
TrieDTO node = TrieDTO.decodeFromSync(value);
node.setLeftHash(leftHash);
preRootNodes.add(node);
}

if (lastNodeHashes.size() > 0) {
TrieDTO lastNode = nodes.get(nodes.size() - 1);
lastNode.setLeftHash(lastNodeHashes.get(0).getRLPData());
lastNode.setRightHash(lastNodeHashes.get(1).getRLPData());
if (trieElements.size() > 0) {
for (int i = 0; i < trieElements.size(); i++) {
final RLPElement trieElement = trieElements.get(i);
byte[] value = trieElement.getRLPData();
nodes.add(TrieDTO.decodeFromSync(value));
}
nodes.get(0).setLeftHash(firstNodeLeftHash);
}

for (int i = 0; i < postRootElements.size(); i++) {
final RLPList trieElement = (RLPList) postRootElements.get(i);
final byte[] value = trieElement.get(0).getRLPData();
final byte[] rightHash = trieElement.get(1).getRLPData();
TrieDTO node = TrieDTO.decodeFromSync(value);
node.setRightHash(rightHash);
postRootNodes.add(node);
}
if (lastNodeHashes.size() > 0) {
TrieDTO lastNode = nodes.get(nodes.size() - 1);
lastNode.setLeftHash(lastNodeHashes.get(0).getRLPData());
lastNode.setRightHash(lastNodeHashes.get(1).getRLPData());
}

if (TrieDTOInOrderRecoverer.verifyChunk(state.getRemoteRootHash(), preRootNodes, nodes, postRootNodes)) {
state.getAllNodes().addAll(nodes);
state.setStateSize(state.getStateSize().add(BigInteger.valueOf(trieElements.size())));
state.setStateChunkSize(state.getStateChunkSize().add(BigInteger.valueOf(message.getChunkOfTrieKeyValue().length)));
logger.debug("CLIENT - State progress: {} chunks ({} bytes)", state.getStateSize(), state.getStateChunkSize());
if (!message.isComplete()) {
executeNextChunkRequestTask(state, peer);
} else {
rebuildStateAndSave(state);
logger.info("CLIENT - Snapshot sync finished!");
stopSyncing(state);
}
for (int i = 0; i < postRootElements.size(); i++) {
final RLPList trieElement = (RLPList) postRootElements.get(i);
final byte[] value = trieElement.get(0).getRLPData();
final byte[] rightHash = trieElement.get(1).getRLPData();
TrieDTO node = TrieDTO.decodeFromSync(value);
node.setRightHash(rightHash);
postRootNodes.add(node);
}

if (TrieDTOInOrderRecoverer.verifyChunk(state.getRemoteRootHash(), preRootNodes, nodes, postRootNodes)) {
state.getAllNodes().addAll(nodes);
state.setStateSize(state.getStateSize().add(BigInteger.valueOf(trieElements.size())));
state.setStateChunkSize(state.getStateChunkSize().add(BigInteger.valueOf(message.getChunkOfTrieKeyValue().length)));
if (!message.isComplete()) {
executeNextChunkRequestTask(state, peer);
} else {
logger.error("Error while verifying chunk response: {}", message);
throw new Exception("Error verifying chunk.");
boolean result = rebuildStateAndSave(state);
logger.info("CLIENT - Snapshot sync finished {}! ", result ? "successfully" : "with errors");
stopSyncing(state);
}
} catch (Exception e) {
logger.error("Error while processing chunk response.", e);
} else {
logger.error("Error while verifying chunk response: {}", message);
throw new Exception("Error verifying chunk.");
}
}

/**
* Once state share is received, rebuild the trie, save it in db and save all the blocks.
*/
private void rebuildStateAndSave(SnapSyncState state) {
logger.info("CLIENT - State Completed! {} chunks ({} bytes) - chunk size = {}",
state.getStateSize(), state.getStateChunkSize(), this.chunkSize);
final TrieDTO[] nodeArray = state.getAllNodes().toArray(new TrieDTO[0]);
private boolean rebuildStateAndSave(SnapSyncState state) {
logger.info("CLIENT - Recovering trie...");
final TrieDTO[] nodeArray = state.getAllNodes().toArray(new TrieDTO[0]);
Optional<TrieDTO> result = TrieDTOInOrderRecoverer.recoverTrie(nodeArray, this.trieStore::saveDTO);
if (!result.isPresent() || !Arrays.equals(state.getRemoteRootHash(), result.get().calculateHash())) {
logger.error("CLIENT - State final validation FAILED");
} else {

if (result.isPresent() && Arrays.equals(state.getRemoteRootHash(), result.get().calculateHash())) {
logger.info("CLIENT - State final validation OK!");
}

logger.info("CLIENT - Saving previous blocks...");
this.blockchain.removeBlocksByNumber(0);
BlockConnectorHelper blockConnector = new BlockConnectorHelper(this.blockStore);
state.connectBlocks(blockConnector);
logger.info("CLIENT - Setting last block as best block...");
this.blockchain.setStatus(state.getLastBlock(), state.getLastBlockDifficulty());
this.transactionPool.setBestBlock(state.getLastBlock());
this.blockchain.removeBlocksByNumber(0);
//genesis is removed so backwards sync will always start.

BlockConnectorHelper blockConnector = new BlockConnectorHelper(this.blockStore);
state.connectBlocks(blockConnector);
logger.info("CLIENT - Setting last block as best block...");
this.blockchain.setStatus(state.getLastBlock(), state.getLastBlockDifficulty());
this.transactionPool.setBestBlock(state.getLastBlock());
return true;
}
logger.error("CLIENT - State final validation FAILED");
return false;
}

private void generateChunkRequestTasks(SnapSyncState state) {
Expand All @@ -468,7 +471,7 @@ private void generateChunkRequestTasks(SnapSyncState state) {
while (from < state.getRemoteTrieSize()) {
ChunkTask task = new ChunkTask(state.getLastBlock().getNumber(), from);
state.getChunkTaskQueue().add(task);
from += chunkSize * 1024L;
from += chunkSize * CHUNK_ITEM_SIZE;
}
}

Expand Down
Loading

0 comments on commit 46d1f85

Please sign in to comment.