From 2153ecdbc2abe5a5798320a2fd1f421ed1c5f839 Mon Sep 17 00:00:00 2001 From: ii-cruz Date: Tue, 5 Nov 2024 09:46:16 -0600 Subject: [PATCH 1/2] Stop requesting missing blocks if there are already pending missing block requests. Buffer blocks if no missing block requests should be induced --- .../block_queue/block_request_component.rs | 4 + consensus/src/sync/live/block_queue/queue.rs | 74 ++++-- consensus/tests/history.rs | 245 +++++++++++++++--- 3 files changed, 266 insertions(+), 57 deletions(-) diff --git a/consensus/src/sync/live/block_queue/block_request_component.rs b/consensus/src/sync/live/block_queue/block_request_component.rs index 70d5b4135e..27b03b1469 100644 --- a/consensus/src/sync/live/block_queue/block_request_component.rs +++ b/consensus/src/sync/live/block_queue/block_request_component.rs @@ -318,6 +318,10 @@ impl BlockRequestComponent { self.pending_requests.contains(target_block_hash) } + pub fn has_pending_requests(&self) -> bool { + !self.pending_requests.is_empty() + } + pub fn add_peer(&self, peer_id: N::PeerId) { self.peers.write().add_peer(peer_id); } diff --git a/consensus/src/sync/live/block_queue/queue.rs b/consensus/src/sync/live/block_queue/queue.rs index 71520ca976..9e967eba9b 100644 --- a/consensus/src/sync/live/block_queue/queue.rs +++ b/consensus/src/sync/live/block_queue/queue.rs @@ -176,15 +176,7 @@ impl BlockQueue { let parent_known = blockchain.contains(block.parent_hash(), true); drop(blockchain); - - // Check if a macro block boundary was passed. - // If so prune the block buffer as well as pending requests. let macro_height = Policy::last_macro_block(head_height); - if macro_height > self.current_macro_height { - self.current_macro_height = macro_height; - self.prune_pending_requests(); - self.prune_buffer(); - } if block_number < head_height.saturating_sub(self.config.tolerate_past_max) { block_source.ignore_block(&self.network); @@ -224,9 +216,24 @@ impl BlockQueue { macro_height ); block_source.ignore_block(&self.network); - } else { - // Block is inside the buffer window, put it in the buffer. + } else if !self.request_component.has_pending_requests() + || macro_height == Policy::last_macro_block(block_number) + { + // We only allow a new request missing blocks to start if the block is from the + // current batch or if there are no ongoing request. self.buffer_and_request_missing_blocks(block, block_source); + } else { + // If we are on not within the same batch or we already are requesting blocks, + // we just buffer it without requesting for blocks. + // Any potential gaps will be filled after we sync up to the batch. + if self.insert_block_into_buffer(block, block_source) { + log::trace!(block_number, "Buffering block"); + } else { + log::trace!( + block_number, + "Not buffering block - already known or exceeded the per peer limit", + ); + } } None @@ -622,6 +629,18 @@ impl BlockQueue { }); } + fn check_and_prune(&mut self) { + let block_macro_height = self.blockchain.read().macro_head().block_number(); + + // Check if a macro block boundary was passed. + // If so prune the block buffer as well as pending requests. + if self.current_macro_height < block_macro_height { + self.current_macro_height = block_macro_height; + self.prune_pending_requests(); + self.prune_buffer(); + } + } + /// Cleans up buffered blocks and removes blocks that precede the current macro block. fn prune_buffer(&mut self) { self.buffer.retain(|&block_number, blocks| { @@ -801,22 +820,8 @@ impl Stream for BlockQueue { } } - // Get as many blocks from the gossipsub stream as possible. - loop { - match self.block_stream.poll_next_unpin(cx) { - Poll::Ready(Some((block, block_source))) => { - if self.num_peers() > 0 { - log::debug!(%block, peer_id = %block_source.peer_id(), "Received block via gossipsub"); - if let Some(block) = self.check_announced_block(block, block_source) { - return Poll::Ready(Some(block)); - } - } - } - // If the block_stream is exhausted, we quit as well. - Poll::Ready(None) => return Poll::Ready(None), - Poll::Pending => break, - } - } + // Prune anything that is no longer relevant before adding more requests and blocks to our structs. + self.check_and_prune(); // Read all the responses we got for our missing blocks requests. loop { @@ -859,6 +864,23 @@ impl Stream for BlockQueue { } } + // Get as many blocks from the gossipsub stream as possible. + loop { + match self.block_stream.poll_next_unpin(cx) { + Poll::Ready(Some((block, block_source))) => { + if self.num_peers() > 0 { + log::debug!(%block, peer_id = %block_source.peer_id(), "Received block via gossipsub"); + if let Some(block) = self.check_announced_block(block, block_source) { + return Poll::Ready(Some(block)); + } + } + } + // If the block_stream is exhausted, we quit as well. + Poll::Ready(None) => return Poll::Ready(None), + Poll::Pending => break, + } + } + self.waker.store_waker(cx); Poll::Pending } diff --git a/consensus/tests/history.rs b/consensus/tests/history.rs index c854d3c154..47c0c6f742 100644 --- a/consensus/tests/history.rs +++ b/consensus/tests/history.rs @@ -28,11 +28,9 @@ use nimiq_test_utils::{ }, mock_node::MockNode, node::TESTING_BLS_CACHE_MAX_CAPACITY, - test_rng::test_rng, }; use nimiq_utils::time::OffsetTime; use parking_lot::{Mutex, RwLock}; -use rand::Rng; use tokio::{sync::mpsc, task::yield_now}; use tokio_stream::wrappers::ReceiverStream; @@ -249,7 +247,7 @@ async fn send_two_micro_blocks_out_of_order() { } #[test(tokio::test)] -async fn send_micro_blocks_out_of_order() { +async fn try_to_induce_request_missing_blocks_on_next_batch() { let blockchain1 = blockchain(); let blockchain_proxy_1 = BlockchainProxy::from(&blockchain1); let blockchain2 = blockchain(); @@ -279,73 +277,258 @@ async fn send_micro_blocks_out_of_order() { MockHistorySyncStream::new(), ); - let mock_node = - MockNode::with_network_and_blockchain(Arc::new(hub.new_network()), blockchain()); + let mut mock_node = MockNode::with_network_and_blockchain( + Arc::new(hub.new_network()), + Arc::clone(&blockchain2), + ); + network.dial_mock(&mock_node.network); syncer .live_sync .add_peer(mock_node.network.get_local_peer_id()); - let mut rng = test_rng(false); - let mut ordered_blocks = Vec::new(); + let mut blocks = Vec::new(); let mock_id = MockId::new(mock_node.network.get_local_peer_id()); let producer = BlockProducer::new(signing_key(), voting_key()); - let n_blocks = rng.gen_range(2..15); + let first_gossiped_block = 6; - for _ in 0..n_blocks { + for _ in 0..first_gossiped_block + 2 { let block = push_micro_block(&producer, &blockchain2); - ordered_blocks.push(block); + blocks.push(block); } + // Finish producing that batch and add one more block on the next batch. + produce_macro_blocks(&producer, &blockchain2, 1); + let block_next_batch = push_micro_block(&producer, &blockchain2); + let block_next_batch2 = push_micro_block(&producer, &blockchain2); + + // Send block 6. + block_tx + .send((blocks[first_gossiped_block - 1].clone(), mock_id.clone())) + .await + .unwrap(); + // Run the block_queue one iteration, i.e. until it processed one block. + let _ = poll!(syncer.next()); + // Yield to allow the internal BlockQueue task to proceed. + yield_now().await; + + // Only block 6 should be buffered and a request missing blocks should be sent. + assert_eq!( + blockchain1.read().block_number(), + Policy::genesis_block_number() + ); + // Obtain the buffered blocks + assert_eq!(syncer.live_sync.queue().num_buffered_blocks() as u64, 1); - let mut blocks = ordered_blocks.clone(); + // Send block on the next batch and assert that we buffer it without sending missing blocks requests. + block_tx + .send((block_next_batch.clone(), mock_id.clone())) + .await + .unwrap(); + let _ = poll!(syncer.next()); + yield_now().await; - while blocks.len() > 1 { - let index = rng.gen_range(1..blocks.len()); + // No new blocks should be added to the chain. + assert_eq!( + blockchain1.read().block_number(), + Policy::genesis_block_number() + ); + // We should have 2 buffered block, the first block that was gossiped and the new gossiped one. + assert_eq!(syncer.live_sync.queue().num_buffered_blocks() as u64, 2); + // Now send blocks 1-5 to fill the gap. + for i in 0..first_gossiped_block - 1 { block_tx - .send((blocks.remove(index).clone(), mock_id.clone())) + .send((blocks[i].clone(), mock_id.clone())) .await .unwrap(); + syncer.next().await; + } + syncer.next().await; - // Run the block_queue one iteration, i.e. until it processed one block - let _ = poll!(syncer.next()); - // Yield to allow the internal BlockQueue task to proceed. - yield_now().await; + // Verify all blocks except the genesis. + for i in 1..=first_gossiped_block - 1 { + assert_eq!( + blockchain1 + .read() + .get_block_at(i as u32 + Policy::genesis_block_number(), true, None) + .unwrap(), + blocks[(i - 1) as usize] + ); } + assert_eq!(syncer.live_sync.queue().num_buffered_blocks(), 1); - // All blocks should be buffered + // Send block on next batch again, nothing should be done since the block is already buffered. + block_tx + .send((block_next_batch.clone(), mock_id.clone())) + .await + .unwrap(); + let _ = poll!(syncer.next()); + yield_now().await; assert_eq!( blockchain1.read().block_number(), - Policy::genesis_block_number() + first_gossiped_block as u32 + Policy::genesis_block_number() ); + assert_eq!(syncer.live_sync.queue().num_buffered_blocks() as u64, 1); + // Let old missing blocks request be processed. + // The reply will result in the blocks being discarded since they have been applied through the block stream. + mock_node.next().await; + syncer.next().await; + assert_eq!(syncer.live_sync.queue().num_buffered_blocks() as u64, 1); - // Obtain the buffered blocks + // Send block on the next batch. It is already buffered but now a missing blocks request should be sent. + block_tx + .send((block_next_batch2.clone(), mock_id.clone())) + .await + .unwrap(); + _ = poll!(syncer.next()); + yield_now().await; assert_eq!( - syncer.live_sync.queue().num_buffered_blocks() as u64, - n_blocks - 1 + blockchain1.read().block_number(), + first_gossiped_block as u32 + Policy::genesis_block_number() ); + assert_eq!(syncer.live_sync.queue().num_buffered_blocks() as u64, 2); - // Now send block1 to fill the gap - block_tx.send((blocks[0].clone(), mock_id)).await.unwrap(); + // Allow the last requests missing blocks to finish. + mock_node.next().await; + syncer.next().await; + syncer.next().await; // To push the buffered block 1 + assert_eq!(blockchain1.read().head_hash(), block_next_batch.hash()); + syncer.next().await; + _ = poll!(syncer.next()); // To push the buffered block 2 + assert_eq!(blockchain1.read().head_hash(), block_next_batch2.hash()); + assert_eq!(syncer.live_sync.queue().num_buffered_blocks() as u64, 0); +} + +#[test(tokio::test)] +async fn try_to_induce_request_missing_blocks_gaps() { + let blockchain1 = blockchain(); + let blockchain_proxy_1 = BlockchainProxy::from(&blockchain1); + let blockchain2 = blockchain(); + + let mut hub = MockHub::new(); + let network = Arc::new(hub.new_network()); + let (block_tx, block_rx) = mpsc::channel(32); - for _ in 0..n_blocks { + let block_queue = BlockQueue::with_gossipsub_block_stream( + blockchain_proxy_1.clone(), + Arc::clone(&network), + ReceiverStream::new(block_rx).boxed(), + QueueConfig::default(), + ); + + let live_sync = BlockLiveSync::with_queue( + blockchain_proxy_1.clone(), + Arc::clone(&network), + block_queue, + bls_cache(), + ); + + let mut syncer = Syncer::new( + blockchain_proxy_1, + Arc::clone(&network), + live_sync, + MockHistorySyncStream::new(), + ); + + let mut mock_node = MockNode::with_network_and_blockchain( + Arc::new(hub.new_network()), + Arc::clone(&blockchain2), + ); + + network.dial_mock(&mock_node.network); + syncer + .live_sync + .add_peer(mock_node.network.get_local_peer_id()); + + let mut blocks = Vec::new(); + + let mock_id = MockId::new(mock_node.network.get_local_peer_id()); + let producer = BlockProducer::new(signing_key(), voting_key()); + let total_blocks: usize = 10; + let first_gossiped_block: usize = 6; + + for _ in 0..total_blocks { + let block = push_micro_block(&producer, &blockchain2); + blocks.push(block); + } + + // Send block 6. + block_tx + .send((blocks[first_gossiped_block - 1].clone(), mock_id.clone())) + .await + .unwrap(); + // Run the block_queue one iteration, i.e. until it processed one block. + let _ = poll!(syncer.next()); + // Yield to allow the internal BlockQueue task to proceed. + yield_now().await; + + // Only block 6 should be buffered and a request missing blocks should be sent. + assert_eq!( + blockchain1.read().block_number(), + Policy::genesis_block_number() + ); + // Obtain the buffered blocks + assert_eq!(syncer.live_sync.queue().num_buffered_blocks() as u64, 1); + + // Send block 8, thus creating a gap and assert that we request missing blocks for it. + let index = 7; + block_tx + .send((blocks[index].clone(), mock_id.clone())) + .await + .unwrap(); + let _ = poll!(syncer.next()); + yield_now().await; + + assert_eq!( + blockchain1.read().block_number(), + Policy::genesis_block_number() + ); + // One more block should be buffered. + assert_eq!(syncer.live_sync.queue().num_buffered_blocks() as u64, 2); + + // Now send blocks 1-5 to fill the gap. + for i in 0..first_gossiped_block - 1 { + block_tx + .send((blocks[i].clone(), mock_id.clone())) + .await + .unwrap(); syncer.next().await; } + syncer.next().await; - // Verify all blocks except the genesis - for i in 1..=n_blocks { + // Verify all blocks except the genesis. + for i in 1..=first_gossiped_block - 1 { assert_eq!( blockchain1 .read() .get_block_at(i as u32 + Policy::genesis_block_number(), true, None) .unwrap(), - ordered_blocks[(i - 1) as usize] + blocks[(i - 1) as usize] ); } + assert_eq!(syncer.live_sync.queue().num_buffered_blocks(), 1); - // No blocks buffered - assert_eq!(syncer.live_sync.queue().num_buffered_blocks(), 0); + // Let first missing blocks request be processed. + // The reply will result in the blocks for the first request to be discarded + // since they have been applied through the block stream. + mock_node.next().await; + syncer.next().await; + + // Assert that the second missing blocks request is still pending. + assert_eq!( + blockchain1.read().block_number(), + first_gossiped_block as u32 + Policy::genesis_block_number() + ); + assert_eq!(syncer.live_sync.queue().num_buffered_blocks() as u64, 1); + + // Allow the last requests missing blocks to finish. + mock_node.next().await; + syncer.next().await; + syncer.next().await; // To push the buffered block + + assert_eq!(blockchain1.read().head_hash(), blocks[index].hash()); + assert_eq!(syncer.live_sync.queue().num_buffered_blocks() as u64, 0); } #[test(tokio::test)] From 09c424d977106466ff880d97ffc905dc7ab8ba7b Mon Sep 17 00:00:00 2001 From: ii-cruz Date: Tue, 12 Nov 2024 17:35:09 -0600 Subject: [PATCH 2/2] Reverting the limiting of req missing blocks to the current batch. --- .../block_queue/block_request_component.rs | 4 - consensus/src/sync/live/block_queue/queue.rs | 19 +- consensus/tests/history.rs | 249 +++--------------- 3 files changed, 33 insertions(+), 239 deletions(-) diff --git a/consensus/src/sync/live/block_queue/block_request_component.rs b/consensus/src/sync/live/block_queue/block_request_component.rs index 27b03b1469..70d5b4135e 100644 --- a/consensus/src/sync/live/block_queue/block_request_component.rs +++ b/consensus/src/sync/live/block_queue/block_request_component.rs @@ -318,10 +318,6 @@ impl BlockRequestComponent { self.pending_requests.contains(target_block_hash) } - pub fn has_pending_requests(&self) -> bool { - !self.pending_requests.is_empty() - } - pub fn add_peer(&self, peer_id: N::PeerId) { self.peers.write().add_peer(peer_id); } diff --git a/consensus/src/sync/live/block_queue/queue.rs b/consensus/src/sync/live/block_queue/queue.rs index 9e967eba9b..bec0b66f9c 100644 --- a/consensus/src/sync/live/block_queue/queue.rs +++ b/consensus/src/sync/live/block_queue/queue.rs @@ -216,24 +216,9 @@ impl BlockQueue { macro_height ); block_source.ignore_block(&self.network); - } else if !self.request_component.has_pending_requests() - || macro_height == Policy::last_macro_block(block_number) - { - // We only allow a new request missing blocks to start if the block is from the - // current batch or if there are no ongoing request. - self.buffer_and_request_missing_blocks(block, block_source); } else { - // If we are on not within the same batch or we already are requesting blocks, - // we just buffer it without requesting for blocks. - // Any potential gaps will be filled after we sync up to the batch. - if self.insert_block_into_buffer(block, block_source) { - log::trace!(block_number, "Buffering block"); - } else { - log::trace!( - block_number, - "Not buffering block - already known or exceeded the per peer limit", - ); - } + // Block is inside the buffer window, put it in the buffer. + self.buffer_and_request_missing_blocks(block, block_source); } None diff --git a/consensus/tests/history.rs b/consensus/tests/history.rs index 47c0c6f742..67aec7262c 100644 --- a/consensus/tests/history.rs +++ b/consensus/tests/history.rs @@ -28,9 +28,11 @@ use nimiq_test_utils::{ }, mock_node::MockNode, node::TESTING_BLS_CACHE_MAX_CAPACITY, + test_rng::test_rng, }; use nimiq_utils::time::OffsetTime; use parking_lot::{Mutex, RwLock}; +use rand::Rng; use tokio::{sync::mpsc, task::yield_now}; use tokio_stream::wrappers::ReceiverStream; @@ -247,29 +249,25 @@ async fn send_two_micro_blocks_out_of_order() { } #[test(tokio::test)] -async fn try_to_induce_request_missing_blocks_on_next_batch() { +async fn send_micro_blocks_out_of_order() { let blockchain1 = blockchain(); let blockchain_proxy_1 = BlockchainProxy::from(&blockchain1); let blockchain2 = blockchain(); - let mut hub = MockHub::new(); let network = Arc::new(hub.new_network()); let (block_tx, block_rx) = mpsc::channel(32); - let block_queue = BlockQueue::with_gossipsub_block_stream( blockchain_proxy_1.clone(), Arc::clone(&network), ReceiverStream::new(block_rx).boxed(), QueueConfig::default(), ); - let live_sync = BlockLiveSync::with_queue( blockchain_proxy_1.clone(), Arc::clone(&network), block_queue, bls_cache(), ); - let mut syncer = Syncer::new( blockchain_proxy_1, Arc::clone(&network), @@ -277,258 +275,73 @@ async fn try_to_induce_request_missing_blocks_on_next_batch() { MockHistorySyncStream::new(), ); - let mut mock_node = MockNode::with_network_and_blockchain( - Arc::new(hub.new_network()), - Arc::clone(&blockchain2), - ); - + let mock_node = + MockNode::with_network_and_blockchain(Arc::new(hub.new_network()), blockchain()); network.dial_mock(&mock_node.network); syncer .live_sync .add_peer(mock_node.network.get_local_peer_id()); - let mut blocks = Vec::new(); + let mut rng = test_rng(false); + let mut ordered_blocks = Vec::new(); let mock_id = MockId::new(mock_node.network.get_local_peer_id()); let producer = BlockProducer::new(signing_key(), voting_key()); - let first_gossiped_block = 6; + let n_blocks = rng.gen_range(2..15); - for _ in 0..first_gossiped_block + 2 { + for _ in 0..n_blocks { let block = push_micro_block(&producer, &blockchain2); - blocks.push(block); + ordered_blocks.push(block); } - // Finish producing that batch and add one more block on the next batch. - produce_macro_blocks(&producer, &blockchain2, 1); - let block_next_batch = push_micro_block(&producer, &blockchain2); - let block_next_batch2 = push_micro_block(&producer, &blockchain2); - // Send block 6. - block_tx - .send((blocks[first_gossiped_block - 1].clone(), mock_id.clone())) - .await - .unwrap(); - // Run the block_queue one iteration, i.e. until it processed one block. - let _ = poll!(syncer.next()); - // Yield to allow the internal BlockQueue task to proceed. - yield_now().await; + let mut blocks = ordered_blocks.clone(); - // Only block 6 should be buffered and a request missing blocks should be sent. - assert_eq!( - blockchain1.read().block_number(), - Policy::genesis_block_number() - ); - // Obtain the buffered blocks - assert_eq!(syncer.live_sync.queue().num_buffered_blocks() as u64, 1); + while blocks.len() > 1 { + let index = rng.gen_range(1..blocks.len()); - // Send block on the next batch and assert that we buffer it without sending missing blocks requests. - block_tx - .send((block_next_batch.clone(), mock_id.clone())) - .await - .unwrap(); - let _ = poll!(syncer.next()); - yield_now().await; - - // No new blocks should be added to the chain. - assert_eq!( - blockchain1.read().block_number(), - Policy::genesis_block_number() - ); - // We should have 2 buffered block, the first block that was gossiped and the new gossiped one. - assert_eq!(syncer.live_sync.queue().num_buffered_blocks() as u64, 2); - - // Now send blocks 1-5 to fill the gap. - for i in 0..first_gossiped_block - 1 { block_tx - .send((blocks[i].clone(), mock_id.clone())) + .send((blocks.remove(index).clone(), mock_id.clone())) .await .unwrap(); - syncer.next().await; - } - syncer.next().await; - // Verify all blocks except the genesis. - for i in 1..=first_gossiped_block - 1 { - assert_eq!( - blockchain1 - .read() - .get_block_at(i as u32 + Policy::genesis_block_number(), true, None) - .unwrap(), - blocks[(i - 1) as usize] - ); + // Run the block_queue one iteration, i.e. until it processed one block + let _ = poll!(syncer.next()); + // Yield to allow the internal BlockQueue task to proceed. + yield_now().await; } - assert_eq!(syncer.live_sync.queue().num_buffered_blocks(), 1); - // Send block on next batch again, nothing should be done since the block is already buffered. - block_tx - .send((block_next_batch.clone(), mock_id.clone())) - .await - .unwrap(); - let _ = poll!(syncer.next()); - yield_now().await; - assert_eq!( - blockchain1.read().block_number(), - first_gossiped_block as u32 + Policy::genesis_block_number() - ); - assert_eq!(syncer.live_sync.queue().num_buffered_blocks() as u64, 1); - // Let old missing blocks request be processed. - // The reply will result in the blocks being discarded since they have been applied through the block stream. - mock_node.next().await; - syncer.next().await; - assert_eq!(syncer.live_sync.queue().num_buffered_blocks() as u64, 1); - - // Send block on the next batch. It is already buffered but now a missing blocks request should be sent. - block_tx - .send((block_next_batch2.clone(), mock_id.clone())) - .await - .unwrap(); - _ = poll!(syncer.next()); - yield_now().await; - assert_eq!( - blockchain1.read().block_number(), - first_gossiped_block as u32 + Policy::genesis_block_number() - ); - assert_eq!(syncer.live_sync.queue().num_buffered_blocks() as u64, 2); - - // Allow the last requests missing blocks to finish. - mock_node.next().await; - syncer.next().await; - syncer.next().await; // To push the buffered block 1 - assert_eq!(blockchain1.read().head_hash(), block_next_batch.hash()); - syncer.next().await; - _ = poll!(syncer.next()); // To push the buffered block 2 - assert_eq!(blockchain1.read().head_hash(), block_next_batch2.hash()); - assert_eq!(syncer.live_sync.queue().num_buffered_blocks() as u64, 0); -} - -#[test(tokio::test)] -async fn try_to_induce_request_missing_blocks_gaps() { - let blockchain1 = blockchain(); - let blockchain_proxy_1 = BlockchainProxy::from(&blockchain1); - let blockchain2 = blockchain(); - - let mut hub = MockHub::new(); - let network = Arc::new(hub.new_network()); - let (block_tx, block_rx) = mpsc::channel(32); - - let block_queue = BlockQueue::with_gossipsub_block_stream( - blockchain_proxy_1.clone(), - Arc::clone(&network), - ReceiverStream::new(block_rx).boxed(), - QueueConfig::default(), - ); - - let live_sync = BlockLiveSync::with_queue( - blockchain_proxy_1.clone(), - Arc::clone(&network), - block_queue, - bls_cache(), - ); - - let mut syncer = Syncer::new( - blockchain_proxy_1, - Arc::clone(&network), - live_sync, - MockHistorySyncStream::new(), - ); - - let mut mock_node = MockNode::with_network_and_blockchain( - Arc::new(hub.new_network()), - Arc::clone(&blockchain2), - ); - - network.dial_mock(&mock_node.network); - syncer - .live_sync - .add_peer(mock_node.network.get_local_peer_id()); - - let mut blocks = Vec::new(); - - let mock_id = MockId::new(mock_node.network.get_local_peer_id()); - let producer = BlockProducer::new(signing_key(), voting_key()); - let total_blocks: usize = 10; - let first_gossiped_block: usize = 6; - - for _ in 0..total_blocks { - let block = push_micro_block(&producer, &blockchain2); - blocks.push(block); - } - - // Send block 6. - block_tx - .send((blocks[first_gossiped_block - 1].clone(), mock_id.clone())) - .await - .unwrap(); - // Run the block_queue one iteration, i.e. until it processed one block. - let _ = poll!(syncer.next()); - // Yield to allow the internal BlockQueue task to proceed. - yield_now().await; - - // Only block 6 should be buffered and a request missing blocks should be sent. + // All blocks should be buffered assert_eq!( blockchain1.read().block_number(), Policy::genesis_block_number() ); - // Obtain the buffered blocks - assert_eq!(syncer.live_sync.queue().num_buffered_blocks() as u64, 1); - - // Send block 8, thus creating a gap and assert that we request missing blocks for it. - let index = 7; - block_tx - .send((blocks[index].clone(), mock_id.clone())) - .await - .unwrap(); - let _ = poll!(syncer.next()); - yield_now().await; + // Obtain the buffered blocks assert_eq!( - blockchain1.read().block_number(), - Policy::genesis_block_number() + syncer.live_sync.queue().num_buffered_blocks() as u64, + n_blocks - 1 ); - // One more block should be buffered. - assert_eq!(syncer.live_sync.queue().num_buffered_blocks() as u64, 2); - // Now send blocks 1-5 to fill the gap. - for i in 0..first_gossiped_block - 1 { - block_tx - .send((blocks[i].clone(), mock_id.clone())) - .await - .unwrap(); + // Now send block1 to fill the gap + block_tx.send((blocks[0].clone(), mock_id)).await.unwrap(); + + for _ in 0..n_blocks { syncer.next().await; } - syncer.next().await; - // Verify all blocks except the genesis. - for i in 1..=first_gossiped_block - 1 { + // Verify all blocks except the genesis + for i in 1..=n_blocks { assert_eq!( blockchain1 .read() .get_block_at(i as u32 + Policy::genesis_block_number(), true, None) .unwrap(), - blocks[(i - 1) as usize] + ordered_blocks[(i - 1) as usize] ); } - assert_eq!(syncer.live_sync.queue().num_buffered_blocks(), 1); - // Let first missing blocks request be processed. - // The reply will result in the blocks for the first request to be discarded - // since they have been applied through the block stream. - mock_node.next().await; - syncer.next().await; - - // Assert that the second missing blocks request is still pending. - assert_eq!( - blockchain1.read().block_number(), - first_gossiped_block as u32 + Policy::genesis_block_number() - ); - assert_eq!(syncer.live_sync.queue().num_buffered_blocks() as u64, 1); - - // Allow the last requests missing blocks to finish. - mock_node.next().await; - syncer.next().await; - syncer.next().await; // To push the buffered block - - assert_eq!(blockchain1.read().head_hash(), blocks[index].hash()); - assert_eq!(syncer.live_sync.queue().num_buffered_blocks() as u64, 0); + // No blocks buffered + assert_eq!(syncer.live_sync.queue().num_buffered_blocks(), 0); } #[test(tokio::test)]