Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revisit block announce handling #70

Merged
merged 10 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ hex-literal = "0.4.1"
indexmap = "2.2.6"
ip_network = "0.4.1"
log = { version = "0.4", default-features = false }
once_cell = "1.19.0"
parking_lot = "0.12"
scale-info = { version = "2.6.0", default-features = false }
serde = "1"
Expand Down
1 change: 0 additions & 1 deletion crates/subcoin-network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ fastrand = { workspace = true }
futures = { workspace = true }
indexmap = { workspace = true }
ip_network = { workspace = true }
once_cell = { workspace = true }
parking_lot = { workspace = true }
sc-client-api = { workspace = true }
sc-consensus = { workspace = true }
Expand Down
4 changes: 2 additions & 2 deletions crates/subcoin-network/src/checkpoint.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use bitcoin::p2p::message::MAX_INV_SIZE;
use once_cell::sync::Lazy;
use std::sync::LazyLock;
use subcoin_primitives::IndexedBlock;

// NOTE: The checkpoints were initially copied from btcd.
static CHECKPOINTS: Lazy<Vec<IndexedBlock>> = Lazy::new(|| {
static CHECKPOINTS: LazyLock<Vec<IndexedBlock>> = LazyLock::new(|| {
let mut start = 0u32;
[
(
Expand Down
20 changes: 9 additions & 11 deletions crates/subcoin-network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,21 @@

mod address_book;
mod checkpoint;
mod connection;
mod metrics;
mod net_processor;
mod network_api;
mod network_processor;
mod peer_connection;
mod peer_manager;
mod peer_store;
mod sync;
#[cfg(test)]
mod tests;
mod transaction_manager;

use crate::connection::ConnectionInitiator;
use crate::metrics::BandwidthMetrics;
use crate::net_processor::NetworkProcessor;
use crate::network_api::NetworkProcessorMessage;
use crate::network_processor::NetworkProcessor;
use crate::peer_connection::ConnectionInitiator;
use crate::peer_store::{PersistentPeerStore, PersistentPeerStoreHandle};
use bitcoin::p2p::ServiceFlags;
use bitcoin::{BlockHash, Network as BitcoinNetwork};
Expand Down Expand Up @@ -120,8 +120,8 @@ pub enum Error {
SlowPeer(Latency),
#[error("Unexpected pong message")]
UnexpectedPong,
#[error("Invalid pong message: bad nonce")]
BadPong,
#[error("Bad nonce in pong, expected: {expected}, got: {got}")]
BadPong { expected: u64, got: u64 },
#[error("Cannot find the parent of the first header in headers message")]
MissingFirstHeaderParent,
#[error("Other: {0}")]
Expand Down Expand Up @@ -364,8 +364,8 @@ where

spawn_handle.spawn("peer-store", None, persistent_peer_store.run(receiver));

let network_processor = NetworkProcessor::new(
net_processor::Params {
let net_processor = NetworkProcessor::new(
network_processor::Params {
client: client.clone(),
header_verifier: HeaderVerifier::new(
client.clone(),
Expand Down Expand Up @@ -461,9 +461,7 @@ where
}
}

network_processor
.run(processor_msg_receiver, bandwidth)
.await;
net_processor.run(processor_msg_receiver, bandwidth).await;

Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::connection::{ConnectionInitiator, Direction, NewConnection};
use crate::metrics::Metrics;
use crate::network_api::{
IncomingTransaction, NetworkProcessorMessage, NetworkStatus, SendTransactionResult,
};
use crate::peer_connection::{ConnectionInitiator, Direction, NewConnection};
use crate::peer_manager::{Config, NewPeer, PeerManager, SlowPeer, PEER_LATENCY_THRESHOLD};
use crate::peer_store::PeerStore;
use crate::sync::{ChainSync, LocatorRequest, RestartReason, SyncAction, SyncRequest};
Expand Down Expand Up @@ -80,7 +80,7 @@ pub struct NetworkProcessor<Block, Client> {
transaction_manager: TransactionManager,
network_event_receiver: UnboundedReceiver<Event>,
/// Broadcasted blocks that are being requested.
inflight_announced_blocks: HashMap<PeerId, HashSet<BlockHash>>,
requested_block_announce: HashMap<PeerId, HashSet<BlockHash>>,
metrics: Option<Metrics>,
}

Expand Down Expand Up @@ -139,7 +139,7 @@ where
peer_manager,
header_verifier,
transaction_manager: TransactionManager::new(),
inflight_announced_blocks: HashMap::new(),
requested_block_announce: HashMap::new(),
network_event_receiver,
metrics,
}
Expand Down Expand Up @@ -466,7 +466,7 @@ where

let mut is_new_block_announce = false;

self.inflight_announced_blocks
self.requested_block_announce
.entry(from)
.and_modify(|announcements| {
is_new_block_announce = announcements.insert(block_hash);
Expand All @@ -493,13 +493,13 @@ where
let block_hash = block.block_hash();

if self
.inflight_announced_blocks
.requested_block_announce
.get(&from)
.map(|annoucements| annoucements.contains(&block_hash))
.unwrap_or(false)
.map_or(false, |annoucements| annoucements.contains(&block_hash))
{
tracing::debug!("Recv announced block {block_hash} from {from:?}");
self.inflight_announced_blocks.entry(from).and_modify(|e| {

self.requested_block_announce.entry(from).and_modify(|e| {
e.remove(&block_hash);
});

Expand All @@ -509,19 +509,23 @@ where
tracing::debug!(?block_hash, "No height in coinbase transaction");
}

let Some(best_hash) = self.client.block_hash(self.client.best_number()) else {
return Ok(SyncAction::None);
};

if block.header.prev_blockhash == best_hash
&& self.client.block_number(block_hash).is_none()
{
self.chain_sync
.import_queue
.import_blocks(sc_consensus_nakamoto::ImportBlocks {
origin: sp_consensus::BlockOrigin::NetworkBroadcast,
blocks: vec![block],
});
if self.client.substrate_block_hash_for(block_hash).is_some() {
// Block has already been processed.
} else {
let best_hash = self
.client
.block_hash(self.client.best_number())
.expect("Best hash must exist; qed");

// TODO: handle the orphan block?
if block.header.prev_blockhash == best_hash {
self.chain_sync.import_queue.import_blocks(
sc_consensus_nakamoto::ImportBlocks {
origin: sp_consensus::BlockOrigin::NetworkBroadcast,
blocks: vec![block],
},
);
}
}

return Ok(SyncAction::None);
Expand Down Expand Up @@ -562,7 +566,7 @@ where
.map(|header| {
let block_hash = header.block_hash();

self.inflight_announced_blocks
self.requested_block_announce
.entry(from)
.and_modify(|e| {
e.insert(block_hash);
Expand Down Expand Up @@ -613,7 +617,7 @@ where
fn do_sync_action(&mut self, sync_action: SyncAction) {
match sync_action {
SyncAction::Request(sync_request) => match sync_request {
SyncRequest::GetHeaders(request) => {
SyncRequest::Header(request) => {
let LocatorRequest {
locator_hashes,
stop_hash,
Expand All @@ -629,20 +633,20 @@ where
let _ = self.send(to, NetworkMessage::GetHeaders(msg));
}
}
SyncRequest::GetBlocks(request) => {
self.send_get_blocks_request(request);
SyncRequest::Inventory(request) => {
self.send_get_blocks_message(request);
}
SyncRequest::GetData(invs, to) => {
SyncRequest::Data(invs, to) => {
if !invs.is_empty() {
let _ = self.send(to, NetworkMessage::GetData(invs));
}
}
},
SyncAction::SwitchToBlocksFirstSync => {
if let Some(SyncAction::Request(SyncRequest::GetBlocks(request))) =
if let Some(SyncAction::Request(SyncRequest::Inventory(request))) =
self.chain_sync.attempt_blocks_first_sync()
{
self.send_get_blocks_request(request);
self.send_get_blocks_message(request);
}
}
SyncAction::SetIdle => {
Expand All @@ -660,7 +664,7 @@ where
}
}

fn send_get_blocks_request(&self, request: LocatorRequest) {
fn send_get_blocks_message(&self, request: LocatorRequest) {
let LocatorRequest {
locator_hashes,
stop_hash,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::net_processor::Event;
use crate::network_processor::Event;
use crate::{Bandwidth, Error, PeerId};
use bitcoin::consensus::{encode, Decodable, Encodable};
use bitcoin::p2p::message::{NetworkMessage, RawNetworkMessage, MAX_MSG_SIZE};
Expand Down
15 changes: 8 additions & 7 deletions crates/subcoin-network/src/peer_manager.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::address_book::AddressBook;
use crate::connection::{
use crate::metrics::Metrics;
use crate::peer_connection::{
ConnectionCloser, ConnectionInitiator, ConnectionWriter, Direction, NewConnection,
};
use crate::metrics::Metrics;
use crate::{validate_outbound_services, Error, Latency, LocalTime, PeerId};
use bitcoin::p2p::address::AddrV2Message;
use bitcoin::p2p::message::NetworkMessage;
Expand Down Expand Up @@ -434,7 +434,7 @@ where
}

pub(crate) fn on_outbound_connection_failure(&mut self, addr: PeerId, err: Error) {
tracing::trace!(?err, ?addr, "Failed to initiate outbound connection");
tracing::trace!(?err, "Failed to initiate outbound connection to {addr:?}");

self.address_book.note_failed_address(addr);

Expand Down Expand Up @@ -465,7 +465,7 @@ where
}

if let Some(connection) = self.connections.remove(&peer_id) {
tracing::debug!(?reason, ?peer_id, "💔 Disconnecting peer");
tracing::debug!(?reason, "💔 Disconnecting peer {peer_id:?}");
connection.closer.terminate();

if let Some(metrics) = &self.metrics {
Expand Down Expand Up @@ -710,7 +710,7 @@ where

match direction {
Direction::Inbound => {
// Do not log the inbound connection success as what Bitcoin Core does.
// Do not log the inbound connection success, following Bitcoin Core's behaviour.
#[cfg(test)]
tracing::debug!(?direction, "🤝 New peer {peer_id:?}");
}
Expand Down Expand Up @@ -769,8 +769,9 @@ where
last_ping_at,
nonce: expected,
} => {
if nonce != expected {
return Err(Error::BadPong);
let got = nonce;
if got != expected {
return Err(Error::BadPong { expected, got });
}

let duration = last_ping_at.elapsed();
Expand Down
55 changes: 37 additions & 18 deletions crates/subcoin-network/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,13 @@ pub(crate) struct LocatorRequest {

/// Represents different kinds of sync requests.
#[derive(Debug)]
// We prefer the variant to align with the actual message mame.
#[allow(clippy::enum_variant_names)]
pub(crate) enum SyncRequest {
/// Request headers via `getheaders`.
GetHeaders(LocatorRequest),
Header(LocatorRequest),
/// Request inventories via `getblocks`.
GetBlocks(LocatorRequest),
Inventory(LocatorRequest),
/// Request blocks via `getdata`.
GetData(Vec<Inventory>, PeerId),
Data(Vec<Inventory>, PeerId),
}

/// Represents actions that can be taken during the syncing.
Expand All @@ -128,6 +126,20 @@ pub(crate) enum SyncAction {
None,
}

impl SyncAction {
pub(crate) fn get_headers(request: LocatorRequest) -> Self {
Self::Request(SyncRequest::Header(request))
}

pub(crate) fn get_inventory(request: LocatorRequest) -> Self {
Self::Request(SyncRequest::Inventory(request))
}

pub(crate) fn get_data(inv: Vec<Inventory>, from: PeerId) -> Self {
Self::Request(SyncRequest::Data(inv, from))
}
}

#[derive(Debug)]
pub(crate) enum RestartReason {
Stalled,
Expand Down Expand Up @@ -257,18 +269,25 @@ where
}

pub(super) fn update_peer_best(&mut self, peer_id: PeerId, peer_best: u32) {
let mut peer_best_updated = false;

self.peers.entry(peer_id).and_modify(|e| {
tracing::debug!(
"Tip of {peer_id:?} updated from #{} to #{peer_best}",
e.best_number
);
e.best_number = peer_best;
if peer_best > e.best_number {
e.best_number = peer_best;
peer_best_updated = true;
tracing::debug!(
"Tip of {peer_id:?} updated from #{} to #{peer_best}",
e.best_number
);
}
});

match &mut self.syncing {
Syncing::Idle => {}
Syncing::BlocksFirst(strategy) => strategy.update_peer_best(peer_id, peer_best),
Syncing::HeadersFirst(strategy) => strategy.update_peer_best(peer_id, peer_best),
if peer_best_updated {
match &mut self.syncing {
Syncing::Idle => {}
Syncing::BlocksFirst(strategy) => strategy.set_peer_best(peer_id, peer_best),
Syncing::HeadersFirst(strategy) => strategy.set_peer_best(peer_id, peer_best),
}
}
}

Expand Down Expand Up @@ -397,8 +416,8 @@ where
pub(super) fn update_sync_peer_on_lower_latency(&mut self) {
let maybe_sync_peer_id = match &self.syncing {
Syncing::Idle => return,
Syncing::BlocksFirst(strategy) => strategy.replaceable_sync_peer(),
Syncing::HeadersFirst(strategy) => strategy.replaceable_sync_peer(),
Syncing::BlocksFirst(strategy) => strategy.can_swap_sync_peer(),
Syncing::HeadersFirst(strategy) => strategy.can_swap_sync_peer(),
};

let Some(current_sync_peer_id) = maybe_sync_peer_id else {
Expand Down Expand Up @@ -445,11 +464,11 @@ where

let sync_peer_updated = match &mut self.syncing {
Syncing::BlocksFirst(strategy) => {
strategy.replace_sync_peer(peer_id, target_block_number);
strategy.swap_sync_peer(peer_id, target_block_number);
true
}
Syncing::HeadersFirst(strategy) => {
strategy.replace_sync_peer(peer_id, target_block_number);
strategy.swap_sync_peer(peer_id, target_block_number);
true
}
Syncing::Idle => unreachable!("Must not be Idle as checked; qed"),
Expand Down
Loading