Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decouple block queue pruning from block announcements #3027

Open
wants to merge 2 commits into
base: albatross
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 31 additions & 24 deletions consensus/src/sync/live/block_queue/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,15 +176,7 @@ impl<N: Network> BlockQueue<N> {

let parent_known = blockchain.contains(block.parent_hash(), true);
drop(blockchain);

// Check if a macro block boundary was passed.
// If so prune the block buffer as well as pending requests.
let macro_height = Policy::last_macro_block(head_height);
if macro_height > self.current_macro_height {
self.current_macro_height = macro_height;
self.prune_pending_requests();
self.prune_buffer();
}

if block_number < head_height.saturating_sub(self.config.tolerate_past_max) {
block_source.ignore_block(&self.network);
Expand Down Expand Up @@ -622,6 +614,18 @@ impl<N: Network> BlockQueue<N> {
});
}

fn check_and_prune(&mut self) {
let block_macro_height = self.blockchain.read().macro_head().block_number();

// Check if a macro block boundary was passed.
// If so prune the block buffer as well as pending requests.
if self.current_macro_height < block_macro_height {
self.current_macro_height = block_macro_height;
self.prune_pending_requests();
self.prune_buffer();
}
}

/// Cleans up buffered blocks and removes blocks that precede the current macro block.
fn prune_buffer(&mut self) {
self.buffer.retain(|&block_number, blocks| {
Expand Down Expand Up @@ -801,22 +805,8 @@ impl<N: Network> Stream for BlockQueue<N> {
}
}

// Get as many blocks from the gossipsub stream as possible.
loop {
match self.block_stream.poll_next_unpin(cx) {
Poll::Ready(Some((block, block_source))) => {
if self.num_peers() > 0 {
log::debug!(%block, peer_id = %block_source.peer_id(), "Received block via gossipsub");
if let Some(block) = self.check_announced_block(block, block_source) {
return Poll::Ready(Some(block));
}
}
}
// If the block_stream is exhausted, we quit as well.
Poll::Ready(None) => return Poll::Ready(None),
Poll::Pending => break,
}
}
// Prune anything that is no longer relevant before adding more requests and blocks to our structs.
self.check_and_prune();

// Read all the responses we got for our missing blocks requests.
loop {
Expand Down Expand Up @@ -859,6 +849,23 @@ impl<N: Network> Stream for BlockQueue<N> {
}
}

// Get as many blocks from the gossipsub stream as possible.
loop {
match self.block_stream.poll_next_unpin(cx) {
Poll::Ready(Some((block, block_source))) => {
if self.num_peers() > 0 {
log::debug!(%block, peer_id = %block_source.peer_id(), "Received block via gossipsub");
if let Some(block) = self.check_announced_block(block, block_source) {
return Poll::Ready(Some(block));
}
}
}
// If the block_stream is exhausted, we quit as well.
Poll::Ready(None) => return Poll::Ready(None),
Poll::Pending => break,
}
}

self.waker.store_waker(cx);
Poll::Pending
}
Expand Down
4 changes: 0 additions & 4 deletions consensus/tests/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,25 +253,21 @@ async fn send_micro_blocks_out_of_order() {
let blockchain1 = blockchain();
let blockchain_proxy_1 = BlockchainProxy::from(&blockchain1);
let blockchain2 = blockchain();

let mut hub = MockHub::new();
let network = Arc::new(hub.new_network());
let (block_tx, block_rx) = mpsc::channel(32);

let block_queue = BlockQueue::with_gossipsub_block_stream(
blockchain_proxy_1.clone(),
Arc::clone(&network),
ReceiverStream::new(block_rx).boxed(),
QueueConfig::default(),
);

let live_sync = BlockLiveSync::with_queue(
blockchain_proxy_1.clone(),
Arc::clone(&network),
block_queue,
bls_cache(),
);

let mut syncer = Syncer::new(
blockchain_proxy_1,
Arc::clone(&network),
Expand Down
Loading