Skip to content

Commit

Permalink
Updating logged info and small refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
asoto-iov committed Aug 2, 2024
1 parent ecdb18b commit 464c53a
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 58 deletions.
69 changes: 41 additions & 28 deletions rskj-core/src/main/java/co/rsk/net/SnapshotProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.ethereum.core.Block;
import org.ethereum.core.Blockchain;
import org.ethereum.core.TransactionPool;
Expand Down Expand Up @@ -91,13 +90,13 @@ public SnapshotProcessor(Blockchain blockchain,

@VisibleForTesting
SnapshotProcessor(Blockchain blockchain,
TrieStore trieStore,
SnapshotPeersInformation peersInformation,
BlockStore blockStore,
TransactionPool transactionPool,
int chunkSize,
boolean isParallelEnabled,
@Nullable SyncMessageHandler.Listener listener) {
TrieStore trieStore,
SnapshotPeersInformation peersInformation,
BlockStore blockStore,
TransactionPool transactionPool,
int chunkSize,
boolean isParallelEnabled,
@Nullable SyncMessageHandler.Listener listener) {
this.blockchain = blockchain;
this.trieStore = trieStore;
this.peersInformation = peersInformation;
Expand Down Expand Up @@ -192,12 +191,12 @@ public void processSnapStatusResponse(SnapSyncState state, Peer sender, SnapStat
state.setLastBlockDifficulty(lastBlock.getCumulativeDifficulty());
state.setRemoteRootHash(lastBlock.getStateRoot());
state.setRemoteTrieSize(responseMessage.getTrieSize());
List<Pair<Block, BlockDifficulty>> blocks = state.getBlocks();

for (int i = 0; i < blocksFromResponse.size(); i++) {
blocks.add(new ImmutablePair<>(blocksFromResponse.get(i), difficultiesFromResponse.get(i)));
state.addBlock(new ImmutablePair<>(blocksFromResponse.get(i), difficultiesFromResponse.get(i)));
}
logger.debug("CLIENT - Processing snapshot status response - last blockNumber: {} rootHash: {} triesize: {}", lastBlock.getNumber(), state.getRemoteRootHash(), state.getRemoteTrieSize());
logger.debug("CLIENT - Processing snapshot status response - last blockNumber: {} triesize: {}", lastBlock.getNumber(), state.getRemoteTrieSize());
logger.debug("Blocks included in the response: {} from {} to {}", blocksFromResponse.size(), blocksFromResponse.get(0).getNumber(),blocksFromResponse.get(blocksFromResponse.size()-1).getNumber());
requestBlocksChunk(sender, blocksFromResponse.get(0).getNumber());
generateChunkRequestTasks(state);
startRequestingChunks(state);
Expand Down Expand Up @@ -234,27 +233,33 @@ void processSnapBlocksRequestInternal(Peer sender, SnapBlocksRequestMessage requ
logger.debug("SERVER - Processing snap blocks request");
List<Block> blocks = Lists.newArrayList();
List<BlockDifficulty> difficulties = Lists.newArrayList();
for (long i = requestMessage.getBlockNumber() - BLOCK_CHUNK_SIZE; i < requestMessage.getBlockNumber(); i++) {
long startingBlockNumber = requestMessage.getBlockNumber() - BLOCK_CHUNK_SIZE;
for (long i = startingBlockNumber; i < requestMessage.getBlockNumber(); i++) {
Block block = blockchain.getBlockByNumber(i);
blocks.add(block);
difficulties.add(blockStore.getTotalDifficultyForHash(block.getHash().getBytes()));
}
logger.debug("SERVER - Sending snap blocks response. From block {} to block {} - chunksize {}", blocks.get(0).getNumber(), blocks.get(blocks.size() - 1).getNumber(), BLOCK_CHUNK_SIZE);
SnapBlocksResponseMessage responseMessage = new SnapBlocksResponseMessage(blocks, difficulties);
sender.sendMessage(responseMessage);
}

//TODO no multipeer here.
public void processSnapBlocksResponse(SnapSyncState state, Peer sender, SnapBlocksResponseMessage responseMessage) {
logger.debug("CLIENT - Processing snap blocks response");
long lastRequiredBlock = state.getLastBlock().getNumber() - BLOCKS_REQUIRED;
List<Block> blocksFromResponse = responseMessage.getBlocks();
logger.debug("CLIENT - Processing snap blocks response. Receiving from block {} to block {} Objective: {}.", blocksFromResponse.get(0).getNumber(), blocksFromResponse.get(blocksFromResponse.size() - 1).getNumber(), lastRequiredBlock);
List<BlockDifficulty> difficultiesFromResponse = responseMessage.getDifficulties();

for (int i = 0; i < blocksFromResponse.size(); i++) {
state.getBlocks().add(new ImmutablePair<>(blocksFromResponse.get(i), difficultiesFromResponse.get(i)));
state.addBlock(new ImmutablePair<>(blocksFromResponse.get(i), difficultiesFromResponse.get(i)));
}
long nextChunk = blocksFromResponse.get(0).getNumber();
if (nextChunk > state.getLastBlock().getNumber() - BLOCKS_REQUIRED) {
logger.debug("CLIENT - SnapBlock - nexChunk : {} - lastRequired {}, missing {}", nextChunk, lastRequiredBlock, nextChunk-lastRequiredBlock);
if (nextChunk > lastRequiredBlock) {
requestBlocksChunk(sender, nextChunk);
} else {
logger.info("CLIENT - Finished Snap blocks sync.");
logger.info("CLIENT - Finished Snap blocks request sending.");
}
}

Expand Down Expand Up @@ -289,10 +294,12 @@ public void run() {

void processStateChunkRequestInternal(Peer sender, SnapStateChunkRequestMessage request) {
long startChunk = System.currentTimeMillis();
logger.debug("SERVER - Processing state chunk request from node {}", sender.getPeerNodeID());

List<byte[]> trieEncoded = new ArrayList<>();
Block block = blockchain.getBlockByNumber(request.getBlockNumber());
final long to = request.getFrom() + (request.getChunkSize() * 1024);
logger.debug("SERVER - Processing state chunk request from node {}. From {} to calculated {} being chunksize {}", sender.getPeerNodeID(), request.getFrom(), to, request.getChunkSize());
logger.debug("SERVER - Sending state chunk from {} to {}", request.getFrom(), to);
TrieDTOInOrderIterator it = new TrieDTOInOrderIterator(trieStore, block.getStateRoot(), request.getFrom(), to);

// First we add the root nodes on the left of the current node. They are used to validate the chunk.
Expand Down Expand Up @@ -327,7 +334,7 @@ void processStateChunkRequestInternal(Peer sender, SnapStateChunkRequestMessage
}

public void processStateChunkResponse(SnapSyncState state, Peer peer, SnapStateChunkResponseMessage responseMessage) {
logger.debug("CLIENT - State chunk received from: {}", responseMessage.getFrom());
logger.debug("CLIENT - State chunk received from {} to {} of {}", responseMessage.getFrom(), responseMessage.getTo(), state.getLastBlock());

PriorityQueue<SnapStateChunkResponseMessage> queue = state.getSnapStateChunkQueue();
queue.add(responseMessage);
Expand All @@ -337,21 +344,27 @@ public void processStateChunkResponse(SnapSyncState state, Peer peer, SnapStateC
long nextExpectedFrom = state.getNextExpectedFrom();
logger.debug("CLIENT - State chunk dequeued from: {} - expected: {}", nextMessage.getFrom(), nextExpectedFrom);
if (nextMessage.getFrom() == nextExpectedFrom) {
processOrderedStateChunkResponse(state, peer, queue.poll());
try {
processOrderedStateChunkResponse(state, peer, queue.poll());
} catch (Exception e) {
logger.error("Error while processing chunk response. {}", e.getMessage(), e);
}
state.setNextExpectedFrom(nextExpectedFrom + chunkSize * 1024L);
} else {
break;
}
}

if (!responseMessage.isComplete()) {
logger.debug("CLIENT - State chunk response not complete. Requesting next chunk." );
executeNextChunkRequestTask(state, peer);
}
}


private void processOrderedStateChunkResponse(SnapSyncState state, Peer peer, SnapStateChunkResponseMessage message) {
try {
logger.debug("CLIENT - Processing State chunk received from: {}", message.getFrom());
logger.debug("CLIENT - Processing State chunk received from {} to {}", message.getFrom(), message.getTo());
peersInformation.getOrRegisterPeer(peer);
state.onNewChunk();

Expand Down Expand Up @@ -429,29 +442,29 @@ private void processOrderedStateChunkResponse(SnapSyncState state, Peer peer, Sn
* Once state share is received, rebuild the trie, save it in db and save all the blocks.
*/
private void rebuildStateAndSave(SnapSyncState state) {
logger.debug("CLIENT - State Completed! {} chunks ({} bytes) - chunk size = {}",
logger.info("CLIENT - State Completed! {} chunks ({} bytes) - chunk size = {}",
state.getStateSize(), state.getStateChunkSize(), this.chunkSize);
final TrieDTO[] nodeArray = state.getAllNodes().toArray(new TrieDTO[0]);
logger.debug("CLIENT - Recovering trie...");
logger.info("CLIENT - Recovering trie...");
Optional<TrieDTO> result = TrieDTOInOrderRecoverer.recoverTrie(nodeArray, this.trieStore::saveDTO);
if (!result.isPresent() || !Arrays.equals(state.getRemoteRootHash(), result.get().calculateHash())) {
logger.error("CLIENT - State final validation FAILED");
} else {
logger.debug("CLIENT - State final validation OK!");
logger.info("CLIENT - State final validation OK!");
}

logger.debug("CLIENT - Saving previous blocks...");
logger.info("CLIENT - Saving previous blocks...");
this.blockchain.removeBlocksByNumber(0);
BlockConnectorHelper blockConnector = new BlockConnectorHelper(this.blockStore, state.getBlocks());
blockConnector.startConnecting();
logger.debug("CLIENT - Setting last block as best block...");
BlockConnectorHelper blockConnector = new BlockConnectorHelper(this.blockStore);
state.connectBlocks(blockConnector);
logger.info("CLIENT - Setting last block as best block...");
this.blockchain.setStatus(state.getLastBlock(), state.getLastBlockDifficulty());
this.transactionPool.setBestBlock(state.getLastBlock());
}

private void generateChunkRequestTasks(SnapSyncState state) {
long from = 0;
logger.debug("Generating chunk request tasks...");
logger.debug("Generating chunk request tasks... chunksize {}", chunkSize);
while (from < state.getRemoteTrieSize()) {
ChunkTask task = new ChunkTask(state.getLastBlock().getNumber(), from);
state.getChunkTaskQueue().add(task);
Expand Down
24 changes: 13 additions & 11 deletions rskj-core/src/main/java/co/rsk/net/sync/BlockConnectorHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,26 @@
public class BlockConnectorHelper {
private static final Logger logger = LoggerFactory.getLogger("SnapBlockConnector");
private final BlockStore blockStore;
private final List<Pair<Block,BlockDifficulty>> blockAndDifficultiesList;

public BlockConnectorHelper(BlockStore blockStore, List<Pair<Block,BlockDifficulty>> blockAndDifficultiesList) {
public BlockConnectorHelper(BlockStore blockStore) {
this.blockStore = blockStore;
this.blockAndDifficultiesList = blockAndDifficultiesList;
blockAndDifficultiesList.sort(new BlockAndDiffComparator());
}

public void startConnecting() {
public void startConnecting(List<Pair<Block, BlockDifficulty>> blockAndDifficultiesList) {
blockAndDifficultiesList.sort(new BlockAndDiffComparator());
Block child = null;
logger.info("Start connecting Blocks");
logger.info("Start connecting Blocks. To connect from {} to {} - Total: {}",
blockAndDifficultiesList.get(0).getKey().getNumber(),
blockAndDifficultiesList.get(blockAndDifficultiesList.size() - 1).getKey().getNumber(),
blockAndDifficultiesList.size());

if (blockAndDifficultiesList.isEmpty()) {
logger.debug("Block list is empty, nothing to connect");
return;
}
int blockIndex = blockAndDifficultiesList.size() - 1;
if (blockStore.isEmpty()) {
Pair<Block,BlockDifficulty> blockAndDifficulty = blockAndDifficultiesList.get(blockIndex);
Pair<Block, BlockDifficulty> blockAndDifficulty = blockAndDifficultiesList.get(blockIndex);
child = blockAndDifficulty.getLeft();
logger.debug("BlockStore is empty, setting child block number the last block from the list: {}", child.getNumber());
blockStore.saveBlock(child, blockAndDifficulty.getRight(), true);
Expand All @@ -58,7 +60,7 @@ public void startConnecting() {
logger.debug("Best block number: {}", child.getNumber());
}
while (blockIndex >= 0) {
Pair<Block,BlockDifficulty> currentBlockAndDifficulty = blockAndDifficultiesList.get(blockIndex);
Pair<Block, BlockDifficulty> currentBlockAndDifficulty = blockAndDifficultiesList.get(blockIndex);
Block currentBlock = currentBlockAndDifficulty.getLeft();
logger.info("Connecting block number: {}", currentBlock.getNumber());

Expand All @@ -72,10 +74,10 @@ public void startConnecting() {
logger.info("Finished connecting blocks");
}

static class BlockAndDiffComparator implements java.util.Comparator<Pair<Block,BlockDifficulty>> {
static class BlockAndDiffComparator implements java.util.Comparator<Pair<Block, BlockDifficulty>> {
@Override
public int compare(Pair<Block,BlockDifficulty> o1, Pair<Block,BlockDifficulty> o2) {
return Long.compare(o1.getLeft().getNumber(),o2.getLeft().getNumber());
public int compare(Pair<Block, BlockDifficulty> o1, Pair<Block, BlockDifficulty> o2) {
return Long.compare(o1.getLeft().getNumber(), o2.getLeft().getNumber());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ private boolean tryStartSnapshotSync() {
}

// we consider Snap as part of the Long Sync
if (!shouldSnapSync(peerBestBlockNumOpt.get())) {
if (!isValidSnapDistance(peerBestBlockNumOpt.get())) {
logger.debug("Snap syncing not required (long sync not required)");
return false;
}
Expand Down Expand Up @@ -163,9 +163,9 @@ private boolean shouldLongSync(long peerBestBlockNumber) {
return distanceToTip > syncConfiguration.getLongSyncLimit() || checkGenesisConnected();
}

private boolean shouldSnapSync(long peerBestBlockNumber) {
private boolean isValidSnapDistance(long peerBestBlockNumber) {
long distanceToTip = peerBestBlockNumber - blockStore.getBestBlock().getNumber();
return distanceToTip > syncConfiguration.getSnapshotSyncLimit() && syncConfiguration.isClientSnapSyncEnabled();
return distanceToTip > syncConfiguration.getSnapshotSyncLimit();
}

private Optional<Long> getPeerBestBlockNumber(Peer peer) {
Expand Down
16 changes: 12 additions & 4 deletions rskj-core/src/main/java/co/rsk/net/sync/SnapSyncState.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@

public class SnapSyncState extends BaseSyncState {

private static final Logger logger = LoggerFactory.getLogger("syncprocessor");
private static final Logger logger = LoggerFactory.getLogger("snapSyncState");

private final SnapshotProcessor snapshotProcessor;

Expand Down Expand Up @@ -76,7 +76,7 @@ public SnapSyncState(SyncEventsHandler syncEventsHandler, SnapshotProcessor snap

@VisibleForTesting
SnapSyncState(SyncEventsHandler syncEventsHandler, SnapshotProcessor snapshotProcessor,
SyncConfiguration syncConfiguration, @Nullable SyncMessageHandler.Listener listener) {
SyncConfiguration syncConfiguration, @Nullable SyncMessageHandler.Listener listener) {
super(syncEventsHandler, syncConfiguration);
this.snapshotProcessor = snapshotProcessor; // TODO(snap-poc) code in SnapshotProcessor should be moved here probably
this.allNodes = Lists.newArrayList();
Expand Down Expand Up @@ -207,8 +207,16 @@ public void setRemoteTrieSize(long remoteTrieSize) {
this.remoteTrieSize = remoteTrieSize;
}

public List<Pair<Block, BlockDifficulty>> getBlocks() {
return blocks;
public void addBlock(Pair<Block, BlockDifficulty> blockPair) {
blocks.add(blockPair);
}

public void addAllBlocks(List<Pair<Block, BlockDifficulty>> blocks) {
this.blocks.addAll(blocks);
}

public void connectBlocks(BlockConnectorHelper blockConnectorHelper) {
blockConnectorHelper.startConnecting(blocks);
}

public List<TrieDTO> getAllNodes() {
Expand Down
Loading

0 comments on commit 464c53a

Please sign in to comment.