From 37f3933db2d8a5a909323cf8eca58646d0f1e04b Mon Sep 17 00:00:00 2001 From: Dan Cline <6798349+Rjected@users.noreply.github.com> Date: Tue, 10 Dec 2024 15:56:32 -0500 Subject: [PATCH] feat: add NetworkPrimitives to NetworkBuilder (#13169) Co-authored-by: Arsenii Kulikov --- Cargo.lock | 1 - crates/e2e-test-utils/src/network.rs | 6 +- crates/ethereum/consensus/src/lib.rs | 120 ++++++++++-------- crates/ethereum/consensus/src/validation.rs | 25 ++-- crates/ethereum/node/src/node.rs | 4 +- crates/net/downloaders/src/bodies/bodies.rs | 5 +- crates/net/downloaders/src/bodies/queue.rs | 2 +- crates/net/downloaders/src/bodies/request.rs | 2 +- crates/net/network-api/src/events.rs | 7 +- crates/net/network-api/src/lib.rs | 5 +- crates/net/network/src/network.rs | 6 +- crates/net/p2p/src/bodies/client.rs | 6 +- crates/net/p2p/src/bodies/downloader.rs | 2 +- crates/net/p2p/src/headers/client.rs | 3 +- crates/node/builder/Cargo.toml | 1 - crates/node/builder/src/builder/mod.rs | 32 +++-- crates/node/builder/src/components/builder.rs | 37 +++++- crates/node/builder/src/components/mod.rs | 23 ++-- crates/node/builder/src/components/network.rs | 16 ++- crates/node/builder/src/launch/common.rs | 3 +- crates/node/builder/src/setup.rs | 4 +- crates/optimism/node/src/node.rs | 4 +- crates/stages/stages/src/sets.rs | 6 +- crates/stages/stages/src/stages/execution.rs | 9 +- crates/stages/stages/src/stages/merkle.rs | 10 +- .../stages/src/stages/sender_recovery.rs | 5 +- 26 files changed, 201 insertions(+), 143 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d7540dc437c8..27a33b69f853 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8118,7 +8118,6 @@ dependencies = [ name = "reth-node-builder" version = "1.1.3" dependencies = [ - "alloy-consensus", "alloy-primitives", "alloy-rpc-types", "aquamarine", diff --git a/crates/e2e-test-utils/src/network.rs b/crates/e2e-test-utils/src/network.rs index ce9d0b94612b..8d8ea68aa93a 100644 --- a/crates/e2e-test-utils/src/network.rs +++ b/crates/e2e-test-utils/src/network.rs @@ -1,7 +1,7 @@ use futures_util::StreamExt; use reth_network_api::{ events::PeerEvent, test_utils::PeersHandleProvider, NetworkEvent, NetworkEventListenerProvider, - PeersInfo, + PeerRequest, PeersInfo, }; use reth_network_peers::{NodeRecord, PeerId}; use reth_tokio_util::EventStream; @@ -9,8 +9,8 @@ use reth_tracing::tracing::info; /// Helper for network operations #[derive(Debug)] -pub struct NetworkTestContext { - network_events: EventStream, +pub struct NetworkTestContext { + network_events: EventStream>>, network: Network, } diff --git a/crates/ethereum/consensus/src/lib.rs b/crates/ethereum/consensus/src/lib.rs index 4d3ba6282694..2eef9188671f 100644 --- a/crates/ethereum/consensus/src/lib.rs +++ b/crates/ethereum/consensus/src/lib.rs @@ -8,7 +8,8 @@ #![cfg_attr(not(test), warn(unused_crate_dependencies))] #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] -use alloy_consensus::{Header, EMPTY_OMMER_ROOT_HASH}; +use alloy_consensus::{BlockHeader, EMPTY_OMMER_ROOT_HASH}; +use alloy_eips::merge::ALLOWED_FUTURE_BLOCK_TIME_SECONDS; use alloy_primitives::U256; use reth_chainspec::{EthChainSpec, EthereumHardfork, EthereumHardforks}; use reth_consensus::{ @@ -20,10 +21,8 @@ use reth_consensus_common::validation::{ validate_against_parent_timestamp, validate_block_pre_execution, validate_body_against_header, validate_header_base_fee, validate_header_extradata, validate_header_gas, }; -use reth_primitives::{ - Block, BlockBody, BlockWithSenders, NodePrimitives, Receipt, SealedBlock, SealedHeader, -}; -use reth_primitives_traits::constants::MINIMUM_GAS_LIMIT; +use reth_primitives::{BlockWithSenders, NodePrimitives, Receipt, SealedBlock, SealedHeader}; +use reth_primitives_traits::{constants::MINIMUM_GAS_LIMIT, BlockBody}; use std::{fmt::Debug, sync::Arc, time::SystemTime}; /// The bound divisor of the gas limit, used in update calculations. @@ -51,43 +50,46 @@ impl EthBeaconConsensus /// /// The maximum allowable difference between self and parent gas limits is determined by the /// parent's gas limit divided by the [`GAS_LIMIT_BOUND_DIVISOR`]. - fn validate_against_parent_gas_limit( + fn validate_against_parent_gas_limit( &self, - header: &SealedHeader, - parent: &SealedHeader, + header: &SealedHeader, + parent: &SealedHeader, ) -> Result<(), ConsensusError> { // Determine the parent gas limit, considering elasticity multiplier on the London fork. let parent_gas_limit = - if self.chain_spec.fork(EthereumHardfork::London).transitions_at_block(header.number) { - parent.gas_limit * + if self.chain_spec.fork(EthereumHardfork::London).transitions_at_block(header.number()) + { + parent.gas_limit() * self.chain_spec - .base_fee_params_at_timestamp(header.timestamp) + .base_fee_params_at_timestamp(header.timestamp()) .elasticity_multiplier as u64 } else { - parent.gas_limit + parent.gas_limit() }; // Check for an increase in gas limit beyond the allowed threshold. - - if header.gas_limit > parent_gas_limit { - if header.gas_limit - parent_gas_limit >= parent_gas_limit / GAS_LIMIT_BOUND_DIVISOR { + if header.gas_limit() > parent_gas_limit { + if header.gas_limit() - parent_gas_limit >= parent_gas_limit / GAS_LIMIT_BOUND_DIVISOR { return Err(ConsensusError::GasLimitInvalidIncrease { parent_gas_limit, - child_gas_limit: header.gas_limit, + child_gas_limit: header.gas_limit(), }) } } // Check for a decrease in gas limit beyond the allowed threshold. - else if parent_gas_limit - header.gas_limit >= parent_gas_limit / GAS_LIMIT_BOUND_DIVISOR + else if parent_gas_limit - header.gas_limit() >= + parent_gas_limit / GAS_LIMIT_BOUND_DIVISOR { return Err(ConsensusError::GasLimitInvalidDecrease { parent_gas_limit, - child_gas_limit: header.gas_limit, + child_gas_limit: header.gas_limit(), }) } // Check if the self gas limit is below the minimum required limit. - else if header.gas_limit < MINIMUM_GAS_LIMIT { - return Err(ConsensusError::GasLimitInvalidMinimum { child_gas_limit: header.gas_limit }) + else if header.gas_limit() < MINIMUM_GAS_LIMIT { + return Err(ConsensusError::GasLimitInvalidMinimum { + child_gas_limit: header.gas_limit(), + }) } Ok(()) @@ -97,72 +99,75 @@ impl EthBeaconConsensus impl FullConsensus for EthBeaconConsensus where ChainSpec: Send + Sync + EthChainSpec + EthereumHardforks + Debug, - N: NodePrimitives< - BlockHeader = Header, - BlockBody = BlockBody, - Block = Block, - Receipt = Receipt, - >, + N: NodePrimitives, { fn validate_block_post_execution( &self, - block: &BlockWithSenders, + block: &BlockWithSenders, input: PostExecutionInput<'_>, ) -> Result<(), ConsensusError> { validate_block_post_execution(block, &self.chain_spec, input.receipts, input.requests) } } -impl Consensus +impl Consensus for EthBeaconConsensus +where + H: BlockHeader, + B: BlockBody, { fn validate_body_against_header( &self, - body: &BlockBody, - header: &SealedHeader, + body: &B, + header: &SealedHeader, ) -> Result<(), ConsensusError> { validate_body_against_header(body, header.header()) } - fn validate_block_pre_execution(&self, block: &SealedBlock) -> Result<(), ConsensusError> { + fn validate_block_pre_execution( + &self, + block: &SealedBlock, + ) -> Result<(), ConsensusError> { validate_block_pre_execution(block, &self.chain_spec) } } -impl HeaderValidator +impl HeaderValidator for EthBeaconConsensus +where + H: BlockHeader, { - fn validate_header(&self, header: &SealedHeader) -> Result<(), ConsensusError> { + fn validate_header(&self, header: &SealedHeader) -> Result<(), ConsensusError> { validate_header_gas(header.header())?; validate_header_base_fee(header.header(), &self.chain_spec)?; // EIP-4895: Beacon chain push withdrawals as operations - if self.chain_spec.is_shanghai_active_at_timestamp(header.timestamp) && - header.withdrawals_root.is_none() + if self.chain_spec.is_shanghai_active_at_timestamp(header.timestamp()) && + header.withdrawals_root().is_none() { return Err(ConsensusError::WithdrawalsRootMissing) - } else if !self.chain_spec.is_shanghai_active_at_timestamp(header.timestamp) && - header.withdrawals_root.is_some() + } else if !self.chain_spec.is_shanghai_active_at_timestamp(header.timestamp()) && + header.withdrawals_root().is_some() { return Err(ConsensusError::WithdrawalsRootUnexpected) } // Ensures that EIP-4844 fields are valid once cancun is active. - if self.chain_spec.is_cancun_active_at_timestamp(header.timestamp) { + if self.chain_spec.is_cancun_active_at_timestamp(header.timestamp()) { validate_4844_header_standalone(header.header())?; - } else if header.blob_gas_used.is_some() { + } else if header.blob_gas_used().is_some() { return Err(ConsensusError::BlobGasUsedUnexpected) - } else if header.excess_blob_gas.is_some() { + } else if header.excess_blob_gas().is_some() { return Err(ConsensusError::ExcessBlobGasUnexpected) - } else if header.parent_beacon_block_root.is_some() { + } else if header.parent_beacon_block_root().is_some() { return Err(ConsensusError::ParentBeaconBlockRootUnexpected) } - if self.chain_spec.is_prague_active_at_timestamp(header.timestamp) { - if header.requests_hash.is_none() { + if self.chain_spec.is_prague_active_at_timestamp(header.timestamp()) { + if header.requests_hash().is_none() { return Err(ConsensusError::RequestsHashMissing) } - } else if header.requests_hash.is_some() { + } else if header.requests_hash().is_some() { return Err(ConsensusError::RequestsHashUnexpected) } @@ -171,8 +176,8 @@ impl HeaderVa fn validate_header_against_parent( &self, - header: &SealedHeader, - parent: &SealedHeader, + header: &SealedHeader, + parent: &SealedHeader, ) -> Result<(), ConsensusError> { validate_against_parent_hash_number(header.header(), parent)?; @@ -189,7 +194,7 @@ impl HeaderVa )?; // ensure that the blob gas fields for this block - if self.chain_spec.is_cancun_active_at_timestamp(header.timestamp) { + if self.chain_spec.is_cancun_active_at_timestamp(header.timestamp()) { validate_against_parent_4844(header.header(), parent.header())?; } @@ -198,24 +203,26 @@ impl HeaderVa fn validate_header_with_total_difficulty( &self, - header: &Header, + header: &H, total_difficulty: U256, ) -> Result<(), ConsensusError> { let is_post_merge = self .chain_spec .fork(EthereumHardfork::Paris) - .active_at_ttd(total_difficulty, header.difficulty); + .active_at_ttd(total_difficulty, header.difficulty()); if is_post_merge { - if !header.is_zero_difficulty() { + // TODO: add `is_zero_difficulty` to `alloy_consensus::BlockHeader` trait + if !header.difficulty().is_zero() { return Err(ConsensusError::TheMergeDifficultyIsNotZero) } - if !header.nonce.is_zero() { + // TODO: helper fn in `alloy_consensus::BlockHeader` trait + if !header.nonce().is_some_and(|nonce| nonce.is_zero()) { return Err(ConsensusError::TheMergeNonceIsNotZero) } - if header.ommers_hash != EMPTY_OMMER_ROOT_HASH { + if header.ommers_hash() != EMPTY_OMMER_ROOT_HASH { return Err(ConsensusError::TheMergeOmmerRootIsNotEmpty) } @@ -241,9 +248,10 @@ impl HeaderVa let present_timestamp = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); - if header.exceeds_allowed_future_timestamp(present_timestamp) { + // TODO: move this to `alloy_consensus::BlockHeader` + if header.timestamp() > present_timestamp + ALLOWED_FUTURE_BLOCK_TIME_SECONDS { return Err(ConsensusError::TimestampIsInFuture { - timestamp: header.timestamp, + timestamp: header.timestamp(), present_timestamp, }) } @@ -263,7 +271,7 @@ mod tests { use reth_primitives::proofs; fn header_with_gas_limit(gas_limit: u64) -> SealedHeader { - let header = Header { gas_limit, ..Default::default() }; + let header = reth_primitives::Header { gas_limit, ..Default::default() }; SealedHeader::new(header, B256::ZERO) } @@ -343,7 +351,7 @@ mod tests { // that the header is valid let chain_spec = Arc::new(ChainSpecBuilder::mainnet().shanghai_activated().build()); - let header = Header { + let header = reth_primitives::Header { base_fee_per_gas: Some(1337), withdrawals_root: Some(proofs::calculate_withdrawals_root(&[])), ..Default::default() diff --git a/crates/ethereum/consensus/src/validation.rs b/crates/ethereum/consensus/src/validation.rs index c339c8d25c6f..b9b38b6d51c2 100644 --- a/crates/ethereum/consensus/src/validation.rs +++ b/crates/ethereum/consensus/src/validation.rs @@ -1,26 +1,31 @@ -use alloy_consensus::{proofs::calculate_receipt_root, TxReceipt}; +use alloy_consensus::{proofs::calculate_receipt_root, BlockHeader, TxReceipt}; use alloy_eips::eip7685::Requests; use alloy_primitives::{Bloom, B256}; use reth_chainspec::EthereumHardforks; use reth_consensus::ConsensusError; use reth_primitives::{gas_spent_by_transactions, BlockWithSenders, GotExpected, Receipt}; +use reth_primitives_traits::Block; /// Validate a block with regard to execution results: /// /// - Compares the receipts root in the block header to the block body /// - Compares the gas used in the block header to the actual gas usage after execution -pub fn validate_block_post_execution( - block: &BlockWithSenders, +pub fn validate_block_post_execution( + block: &BlockWithSenders, chain_spec: &ChainSpec, receipts: &[Receipt], requests: &Requests, -) -> Result<(), ConsensusError> { +) -> Result<(), ConsensusError> +where + B: Block, + ChainSpec: EthereumHardforks, +{ // Check if gas used matches the value set in header. let cumulative_gas_used = receipts.last().map(|receipt| receipt.cumulative_gas_used).unwrap_or(0); - if block.gas_used != cumulative_gas_used { + if block.header().gas_used() != cumulative_gas_used { return Err(ConsensusError::BlockGasUsed { - gas: GotExpected { got: cumulative_gas_used, expected: block.gas_used }, + gas: GotExpected { got: cumulative_gas_used, expected: block.header().gas_used() }, gas_spent_by_tx: gas_spent_by_transactions(receipts), }) } @@ -29,9 +34,9 @@ pub fn validate_block_post_execution( // operation as hashing that is required for state root got calculated in every // transaction This was replaced with is_success flag. // See more about EIP here: https://eips.ethereum.org/EIPS/eip-658 - if chain_spec.is_byzantium_active_at_block(block.header.number) { + if chain_spec.is_byzantium_active_at_block(block.header().number()) { if let Err(error) = - verify_receipts(block.header.receipts_root, block.header.logs_bloom, receipts) + verify_receipts(block.header().receipts_root(), block.header().logs_bloom(), receipts) { tracing::debug!(%error, ?receipts, "receipts verification failed"); return Err(error) @@ -39,8 +44,8 @@ pub fn validate_block_post_execution( } // Validate that the header requests hash matches the calculated requests hash - if chain_spec.is_prague_active_at_timestamp(block.timestamp) { - let Some(header_requests_hash) = block.header.requests_hash else { + if chain_spec.is_prague_active_at_timestamp(block.header().timestamp()) { + let Some(header_requests_hash) = block.header().requests_hash() else { return Err(ConsensusError::RequestsHashMissing) }; let requests_hash = requests.requests_hash(); diff --git a/crates/ethereum/node/src/node.rs b/crates/ethereum/node/src/node.rs index 54707e69b26b..4d87be68f5dc 100644 --- a/crates/ethereum/node/src/node.rs +++ b/crates/ethereum/node/src/node.rs @@ -10,7 +10,7 @@ use reth_ethereum_engine_primitives::{ }; use reth_evm::execute::BasicBlockExecutorProvider; use reth_evm_ethereum::execute::EthExecutionStrategyFactory; -use reth_network::{NetworkHandle, PeersInfo}; +use reth_network::{EthNetworkPrimitives, NetworkHandle, PeersInfo}; use reth_node_api::{ AddOnsContext, ConfigureEvm, FullNodeComponents, HeaderTy, NodeTypesWithDB, TxTy, }; @@ -318,6 +318,8 @@ where > + Unpin + 'static, { + type Primitives = EthNetworkPrimitives; + async fn build_network( self, ctx: &BuilderContext, diff --git a/crates/net/downloaders/src/bodies/bodies.rs b/crates/net/downloaders/src/bodies/bodies.rs index 54026070ec8b..2f6015a09166 100644 --- a/crates/net/downloaders/src/bodies/bodies.rs +++ b/crates/net/downloaders/src/bodies/bodies.rs @@ -464,7 +464,10 @@ impl OrderedBodiesResponse { } } -impl OrderedBodiesResponse { +impl OrderedBodiesResponse +where + H: BlockHeader, +{ /// Returns the block number of the first element /// /// # Panics diff --git a/crates/net/downloaders/src/bodies/queue.rs b/crates/net/downloaders/src/bodies/queue.rs index ed8c425e6114..892eae14cbb1 100644 --- a/crates/net/downloaders/src/bodies/queue.rs +++ b/crates/net/downloaders/src/bodies/queue.rs @@ -54,7 +54,6 @@ where self.inner.clear(); self.last_requested_block_number.take(); } - /// Add new request to the queue. /// Expects a sorted list of headers. pub(crate) fn push_new_request( @@ -71,6 +70,7 @@ where None => last.number(), }) .or(self.last_requested_block_number); + // Create request and push into the queue. self.inner.push( BodiesRequestFuture::new(client, consensus, self.metrics.clone()).with_headers(request), diff --git a/crates/net/downloaders/src/bodies/request.rs b/crates/net/downloaders/src/bodies/request.rs index 92f46fa6fdd6..a3ad1f3b9dc2 100644 --- a/crates/net/downloaders/src/bodies/request.rs +++ b/crates/net/downloaders/src/bodies/request.rs @@ -56,8 +56,8 @@ pub(crate) struct BodiesRequestFuture { impl BodiesRequestFuture where - B: BodiesClient + 'static, H: BlockHeader, + B: BodiesClient + 'static, { /// Returns an empty future. Use [`BodiesRequestFuture::with_headers`] to set the request. pub(crate) fn new( diff --git a/crates/net/network-api/src/events.rs b/crates/net/network-api/src/events.rs index e17cedef11fc..39c89f4c4e21 100644 --- a/crates/net/network-api/src/events.rs +++ b/crates/net/network-api/src/events.rs @@ -133,9 +133,12 @@ pub trait NetworkPeersEvents: Send + Sync { /// Provides event subscription for the network. #[auto_impl::auto_impl(&, Arc)] -pub trait NetworkEventListenerProvider: NetworkPeersEvents { +pub trait NetworkEventListenerProvider: NetworkPeersEvents { + /// The primitive types to use in the `PeerRequest` used in the stream. + type Primitives: NetworkPrimitives; + /// Creates a new [`NetworkEvent`] listener channel. - fn event_listener(&self) -> EventStream>; + fn event_listener(&self) -> EventStream>>; /// Returns a new [`DiscoveryEvent`] stream. /// /// This stream yields [`DiscoveryEvent`]s for each peer that is discovered. diff --git a/crates/net/network-api/src/lib.rs b/crates/net/network-api/src/lib.rs index 986d490c34f9..6163c8730033 100644 --- a/crates/net/network-api/src/lib.rs +++ b/crates/net/network-api/src/lib.rs @@ -36,7 +36,6 @@ pub use events::{ use std::{future::Future, net::SocketAddr, sync::Arc, time::Instant}; use reth_eth_wire_types::{capability::Capabilities, DisconnectReason, EthVersion, Status}; -use reth_network_p2p::EthBlockClient; use reth_network_peers::NodeRecord; /// The `PeerId` type. @@ -44,7 +43,7 @@ pub type PeerId = alloy_primitives::B512; /// Helper trait that unifies network API needed to launch node. pub trait FullNetwork: - BlockDownloaderProvider + BlockDownloaderProvider + NetworkSyncUpdater + NetworkInfo + NetworkEventListenerProvider @@ -56,7 +55,7 @@ pub trait FullNetwork: } impl FullNetwork for T where - T: BlockDownloaderProvider + T: BlockDownloaderProvider + NetworkSyncUpdater + NetworkInfo + NetworkEventListenerProvider diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs index 68c57724f0df..a25ad0490818 100644 --- a/crates/net/network/src/network.rs +++ b/crates/net/network/src/network.rs @@ -205,8 +205,10 @@ impl NetworkPeersEvents for NetworkHandle { } } -impl NetworkEventListenerProvider> for NetworkHandle { - fn event_listener(&self) -> EventStream>> { +impl NetworkEventListenerProvider for NetworkHandle { + type Primitives = N; + + fn event_listener(&self) -> EventStream>> { self.inner.event_sender.new_listener() } diff --git a/crates/net/p2p/src/bodies/client.rs b/crates/net/p2p/src/bodies/client.rs index d48fccc6d000..b31954ff1a00 100644 --- a/crates/net/p2p/src/bodies/client.rs +++ b/crates/net/p2p/src/bodies/client.rs @@ -6,17 +6,17 @@ use std::{ use crate::{download::DownloadClient, error::PeerRequestResult, priority::Priority}; use alloy_primitives::B256; use futures::{Future, FutureExt}; -use reth_primitives::BlockBody; +use reth_primitives_traits::BlockBody; /// The bodies future type -pub type BodiesFut = +pub type BodiesFut = Pin>> + Send + Sync>>; /// A client capable of downloading block bodies. #[auto_impl::auto_impl(&, Arc, Box)] pub trait BodiesClient: DownloadClient { /// The body type this client fetches. - type Body: Send + Sync + Unpin + 'static; + type Body: BlockBody; /// The output of the request future for querying block bodies. type Output: Future>> + Sync + Send + Unpin; diff --git a/crates/net/p2p/src/bodies/downloader.rs b/crates/net/p2p/src/bodies/downloader.rs index b80a308d8a18..ce7827c8e885 100644 --- a/crates/net/p2p/src/bodies/downloader.rs +++ b/crates/net/p2p/src/bodies/downloader.rs @@ -15,7 +15,7 @@ pub type BodyDownloaderResult = DownloadResult>>; pub trait BodyDownloader: Send + Sync + Stream> + Unpin { - /// The type of header that can be returned in a blck + /// The type of header that is being used type Header: Debug + Send + Sync + Unpin + 'static; /// The type of the body that is being downloaded. diff --git a/crates/net/p2p/src/headers/client.rs b/crates/net/p2p/src/headers/client.rs index 4be6208c4a2c..606d8f389a84 100644 --- a/crates/net/p2p/src/headers/client.rs +++ b/crates/net/p2p/src/headers/client.rs @@ -3,6 +3,7 @@ use alloy_consensus::Header; use alloy_eips::BlockHashOrNumber; use futures::{Future, FutureExt}; pub use reth_eth_wire_types::{BlockHeaders, HeadersDirection}; +use reth_primitives_traits::BlockHeader; use std::{ fmt::Debug, pin::Pin, @@ -57,7 +58,7 @@ pub type HeadersFut = #[auto_impl::auto_impl(&, Arc, Box)] pub trait HeadersClient: DownloadClient { /// The header type this client fetches. - type Header: Send + Sync + Unpin; + type Header: BlockHeader; /// The headers future type type Output: Future>> + Sync + Send + Unpin; diff --git a/crates/node/builder/Cargo.toml b/crates/node/builder/Cargo.toml index 26d157e1e0cb..1a0b5bad0a13 100644 --- a/crates/node/builder/Cargo.toml +++ b/crates/node/builder/Cargo.toml @@ -61,7 +61,6 @@ reth-transaction-pool.workspace = true ## ethereum alloy-primitives.workspace = true alloy-rpc-types = { workspace = true, features = ["engine"] } -alloy-consensus.workspace = true revm-primitives.workspace = true ## async diff --git a/crates/node/builder/src/builder/mod.rs b/crates/node/builder/src/builder/mod.rs index e2b18f666c76..e38882fa5d8a 100644 --- a/crates/node/builder/src/builder/mod.rs +++ b/crates/node/builder/src/builder/mod.rs @@ -20,7 +20,7 @@ use reth_db_api::{ use reth_exex::ExExContext; use reth_network::{ transactions::TransactionsManagerConfig, NetworkBuilder, NetworkConfig, NetworkConfigBuilder, - NetworkHandle, NetworkManager, + NetworkHandle, NetworkManager, NetworkPrimitives, }; use reth_node_api::{ FullNodePrimitives, FullNodeTypes, FullNodeTypesAdapter, NodeAddOns, NodeTypes, @@ -648,19 +648,24 @@ impl BuilderContext { /// /// Spawns the configured network and associated tasks and returns the [`NetworkHandle`] /// connected to that network. - pub fn start_network(&self, builder: NetworkBuilder<(), ()>, pool: Pool) -> NetworkHandle + pub fn start_network( + &self, + builder: NetworkBuilder<(), (), N>, + pool: Pool, + ) -> NetworkHandle where + N: NetworkPrimitives, Pool: TransactionPool< Transaction: PoolTransaction< - Consensus = reth_primitives::TransactionSigned, - Pooled = reth_primitives::PooledTransactionsElement, + Consensus = N::BroadcastedTransaction, + Pooled = N::PooledTransaction, >, > + Unpin + 'static, Node::Provider: BlockReader< - Block = reth_primitives::Block, Receipt = reth_primitives::Receipt, - Header = reth_primitives::Header, + Block = N::Block, + Header = N::BlockHeader, >, { self.start_network_with(builder, pool, Default::default()) @@ -672,24 +677,25 @@ impl BuilderContext { /// /// Spawns the configured network and associated tasks and returns the [`NetworkHandle`] /// connected to that network. - pub fn start_network_with( + pub fn start_network_with( &self, - builder: NetworkBuilder<(), ()>, + builder: NetworkBuilder<(), (), N>, pool: Pool, tx_config: TransactionsManagerConfig, - ) -> NetworkHandle + ) -> NetworkHandle where + N: NetworkPrimitives, Pool: TransactionPool< Transaction: PoolTransaction< - Consensus = reth_primitives::TransactionSigned, - Pooled = reth_primitives::PooledTransactionsElement, + Consensus = N::BroadcastedTransaction, + Pooled = N::PooledTransaction, >, > + Unpin + 'static, Node::Provider: BlockReader< - Block = reth_primitives::Block, Receipt = reth_primitives::Receipt, - Header = reth_primitives::Header, + Block = N::Block, + Header = N::BlockHeader, >, { let (handle, network, txpool, eth) = builder diff --git a/crates/node/builder/src/components/builder.rs b/crates/node/builder/src/components/builder.rs index 7e2d0eb43cc0..ce24c8bff8df 100644 --- a/crates/node/builder/src/components/builder.rs +++ b/crates/node/builder/src/components/builder.rs @@ -9,7 +9,8 @@ use crate::{ }; use reth_consensus::FullConsensus; use reth_evm::execute::BlockExecutorProvider; -use reth_node_api::{HeaderTy, NodeTypes, NodeTypesWithEngine, TxTy}; +use reth_network::NetworkPrimitives; +use reth_node_api::{BodyTy, HeaderTy, NodeTypes, NodeTypesWithEngine, TxTy}; use reth_payload_builder::PayloadBuilderHandle; use reth_transaction_pool::{PoolTransaction, TransactionPool}; use std::{future::Future, marker::PhantomData}; @@ -295,13 +296,34 @@ impl NodeComponentsBuilder for ComponentsBuilder where Node: FullNodeTypes, - PoolB: PoolBuilder, - NetworkB: NetworkBuilder, + PoolB: PoolBuilder< + Node, + Pool: TransactionPool< + Transaction: PoolTransaction< + Pooled = ::PooledTransaction, + >, + >, + >, + NetworkB: NetworkBuilder< + Node, + PoolB::Pool, + Primitives: NetworkPrimitives< + BlockHeader = HeaderTy, + BlockBody = BodyTy, + >, + >, PayloadB: PayloadServiceBuilder, ExecB: ExecutorBuilder, ConsB: ConsensusBuilder, { - type Components = Components; + type Components = Components< + Node, + NetworkB::Primitives, + PoolB::Pool, + ExecB::EVM, + ExecB::Executor, + ConsB::Consensus, + >; async fn build_components( self, @@ -369,11 +391,12 @@ pub trait NodeComponentsBuilder: Send { ) -> impl Future> + Send; } -impl NodeComponentsBuilder for F +impl NodeComponentsBuilder for F where + N: NetworkPrimitives, BlockBody = BodyTy>, Node: FullNodeTypes, F: FnOnce(&BuilderContext) -> Fut + Send, - Fut: Future>> + Send, + Fut: Future>> + Send, Pool: TransactionPool>> + Unpin + 'static, @@ -381,7 +404,7 @@ where Executor: BlockExecutorProvider::Primitives>, Cons: FullConsensus<::Primitives> + Clone + Unpin + 'static, { - type Components = Components; + type Components = Components; fn build_components( self, diff --git a/crates/node/builder/src/components/mod.rs b/crates/node/builder/src/components/mod.rs index d62e74bda296..892380a4c6ca 100644 --- a/crates/node/builder/src/components/mod.rs +++ b/crates/node/builder/src/components/mod.rs @@ -20,13 +20,14 @@ pub use execute::*; pub use network::*; pub use payload::*; pub use pool::*; +use reth_network_p2p::BlockClient; use crate::{ConfigureEvm, FullNodeTypes}; use reth_consensus::FullConsensus; use reth_evm::execute::BlockExecutorProvider; -use reth_network::NetworkHandle; +use reth_network::{NetworkHandle, NetworkPrimitives}; use reth_network_api::FullNetwork; -use reth_node_api::{HeaderTy, NodeTypes, NodeTypesWithEngine, PayloadBuilder, TxTy}; +use reth_node_api::{BodyTy, HeaderTy, NodeTypes, NodeTypesWithEngine, PayloadBuilder, TxTy}; use reth_payload_builder::PayloadBuilderHandle; use reth_transaction_pool::{PoolTransaction, TransactionPool}; @@ -49,7 +50,9 @@ pub trait NodeComponents: Clone + Unpin + Send + Sync + 'stati type Consensus: FullConsensus<::Primitives> + Clone + Unpin + 'static; /// Network API. - type Network: FullNetwork; + type Network: FullNetwork< + Client: BlockClient
, Body = BodyTy>, + >; /// Builds new blocks. type PayloadBuilder: PayloadBuilder::Engine> @@ -78,7 +81,7 @@ pub trait NodeComponents: Clone + Unpin + Send + Sync + 'stati /// /// This provides access to all the components of the node. #[derive(Debug)] -pub struct Components { +pub struct Components { /// The transaction pool of the node. pub transaction_pool: Pool, /// The node's EVM configuration, defining settings for the Ethereum Virtual Machine. @@ -88,14 +91,15 @@ pub struct Components { /// The consensus implementation of the node. pub consensus: Consensus, /// The network implementation of the node. - pub network: NetworkHandle, + pub network: NetworkHandle, /// The handle to the payload builder service. pub payload_builder: PayloadBuilderHandle<::Engine>, } -impl NodeComponents - for Components +impl NodeComponents + for Components where + N: NetworkPrimitives, BlockBody = BodyTy>, Node: FullNodeTypes, Pool: TransactionPool>> + Unpin @@ -108,7 +112,7 @@ where type Evm = EVM; type Executor = Executor; type Consensus = Cons; - type Network = NetworkHandle; + type Network = NetworkHandle; type PayloadBuilder = PayloadBuilderHandle<::Engine>; fn pool(&self) -> &Self::Pool { @@ -136,8 +140,9 @@ where } } -impl Clone for Components +impl Clone for Components where + N: NetworkPrimitives, Node: FullNodeTypes, Pool: TransactionPool, EVM: ConfigureEvm
, Transaction = TxTy>, diff --git a/crates/node/builder/src/components/network.rs b/crates/node/builder/src/components/network.rs index 5f473e408f69..33f128a69788 100644 --- a/crates/node/builder/src/components/network.rs +++ b/crates/node/builder/src/components/network.rs @@ -2,33 +2,39 @@ use std::future::Future; -use reth_network::NetworkHandle; +use reth_network::{NetworkHandle, NetworkPrimitives}; use reth_transaction_pool::TransactionPool; use crate::{BuilderContext, FullNodeTypes}; /// A type that knows how to build the network implementation. pub trait NetworkBuilder: Send { + /// The primitive types to use for the network. + type Primitives: NetworkPrimitives; + /// Launches the network implementation and returns the handle to it. fn build_network( self, ctx: &BuilderContext, pool: Pool, - ) -> impl Future> + Send; + ) -> impl Future>> + Send; } -impl NetworkBuilder for F +impl NetworkBuilder for F where Node: FullNodeTypes, + P: NetworkPrimitives, Pool: TransactionPool, F: Fn(&BuilderContext, Pool) -> Fut + Send, - Fut: Future> + Send, + Fut: Future>> + Send, { + type Primitives = P; + fn build_network( self, ctx: &BuilderContext, pool: Pool, - ) -> impl Future> + Send { + ) -> impl Future>> + Send { self(ctx, pool) } } diff --git a/crates/node/builder/src/launch/common.rs b/crates/node/builder/src/launch/common.rs index 62226cb0b1cb..c5275647aa43 100644 --- a/crates/node/builder/src/launch/common.rs +++ b/crates/node/builder/src/launch/common.rs @@ -29,6 +29,7 @@ use reth_node_core::{ args::InvalidBlockHookType, dirs::{ChainPath, DataDirPath}, node_config::NodeConfig, + primitives::BlockHeader, version::{ BUILD_PROFILE_NAME, CARGO_PKG_VERSION, VERGEN_BUILD_TIMESTAMP, VERGEN_CARGO_FEATURES, VERGEN_CARGO_TARGET_TRIPLE, VERGEN_GIT_SHA, @@ -719,7 +720,7 @@ where /// necessary pub async fn max_block(&self, client: C) -> eyre::Result> where - C: HeadersClient
, + C: HeadersClient, { self.node_config().max_block(client, self.provider_factory().clone()).await } diff --git a/crates/node/builder/src/setup.rs b/crates/node/builder/src/setup.rs index 6dff28bd39b7..62cfbac9bea8 100644 --- a/crates/node/builder/src/setup.rs +++ b/crates/node/builder/src/setup.rs @@ -14,7 +14,7 @@ use reth_exex::ExExManagerHandle; use reth_network_p2p::{ bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader, BlockClient, }; -use reth_node_api::{BodyTy, HeaderTy, NodePrimitives}; +use reth_node_api::{BodyTy, HeaderTy}; use reth_provider::{providers::ProviderNodeTypes, ProviderFactory}; use reth_stages::{prelude::DefaultStages, stages::ExecutionStage, Pipeline, StageSet}; use reth_static_file::StaticFileProducer; @@ -41,7 +41,6 @@ where N: ProviderNodeTypes, Client: BlockClient
, Body = BodyTy> + 'static, Executor: BlockExecutorProvider, - N::Primitives: NodePrimitives, { // building network downloaders using the fetch client let header_downloader = ReverseHeadersDownloaderBuilder::new(config.headers) @@ -89,7 +88,6 @@ where H: HeaderDownloader
> + 'static, B: BodyDownloader
, Body = BodyTy> + 'static, Executor: BlockExecutorProvider, - N::Primitives: NodePrimitives, { let mut builder = Pipeline::::builder(); diff --git a/crates/optimism/node/src/node.rs b/crates/optimism/node/src/node.rs index e77b50f1e8f0..43585b3762fb 100644 --- a/crates/optimism/node/src/node.rs +++ b/crates/optimism/node/src/node.rs @@ -11,7 +11,7 @@ use reth_basic_payload_builder::{BasicPayloadJobGenerator, BasicPayloadJobGenera use reth_chainspec::{EthChainSpec, EthereumHardforks, Hardforks}; use reth_db::transaction::{DbTx, DbTxMut}; use reth_evm::{execute::BasicBlockExecutorProvider, ConfigureEvm}; -use reth_network::{NetworkConfig, NetworkHandle, NetworkManager, PeersInfo}; +use reth_network::{EthNetworkPrimitives, NetworkConfig, NetworkHandle, NetworkManager, PeersInfo}; use reth_node_api::{AddOnsContext, EngineValidator, FullNodeComponents, NodeAddOns, TxTy}; use reth_node_builder::{ components::{ @@ -656,6 +656,8 @@ where > + Unpin + 'static, { + type Primitives = EthNetworkPrimitives; + async fn build_network( self, ctx: &BuilderContext, diff --git a/crates/stages/stages/src/sets.rs b/crates/stages/stages/src/sets.rs index df5a4c542bfa..53eb23379646 100644 --- a/crates/stages/stages/src/sets.rs +++ b/crates/stages/stages/src/sets.rs @@ -215,7 +215,7 @@ where impl OnlineStages where P: HeaderSyncGapProvider + 'static, - H: HeaderDownloader
+ 'static, + H: HeaderDownloader + 'static, B: BodyDownloader + 'static, { /// Create a new builder using the given headers stage. @@ -236,7 +236,7 @@ where provider: P, tip: watch::Receiver, header_downloader: H, - consensus: Arc, + consensus: Arc>, stages_config: StageConfig, ) -> StageSetBuilder where @@ -258,7 +258,7 @@ where impl StageSet for OnlineStages where P: HeaderSyncGapProvider + 'static, - H: HeaderDownloader
+ 'static, + H: HeaderDownloader + 'static, B: BodyDownloader + 'static, HeaderStage: Stage, BodyStage: Stage, diff --git a/crates/stages/stages/src/stages/execution.rs b/crates/stages/stages/src/stages/execution.rs index 91afc33efaa0..685b0abb9e60 100644 --- a/crates/stages/stages/src/stages/execution.rs +++ b/crates/stages/stages/src/stages/execution.rs @@ -1,5 +1,5 @@ use crate::stages::MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD; -use alloy_consensus::{BlockHeader, Header}; +use alloy_consensus::{BlockHeader, Header, Sealable}; use alloy_eips::{eip1898::BlockWithParent, NumHash}; use alloy_primitives::BlockNumber; use num_traits::Zero; @@ -194,10 +194,7 @@ where unwind_to: Option, ) -> Result<(), StageError> where - Provider: StaticFileProviderFactory - + DBProvider - + BlockReader - + HeaderProvider
, + Provider: StaticFileProviderFactory + DBProvider + BlockReader + HeaderProvider, { // If thre's any receipts pruning configured, receipts are written directly to database and // inconsistencies are expected. @@ -267,7 +264,7 @@ where impl Stage for ExecutionStage where - E: BlockExecutorProvider>, + E: BlockExecutorProvider, Provider: DBProvider + BlockReader< Block = ::Block, diff --git a/crates/stages/stages/src/stages/merkle.rs b/crates/stages/stages/src/stages/merkle.rs index ff4d37cf3f61..8cd7abc7316c 100644 --- a/crates/stages/stages/src/stages/merkle.rs +++ b/crates/stages/stages/src/stages/merkle.rs @@ -136,7 +136,7 @@ where Provider: DBProvider + TrieWriter + StatsReader - + HeaderProvider
+ + HeaderProvider + StageCheckpointReader + StageCheckpointWriter, { @@ -344,18 +344,18 @@ where /// Check that the computed state root matches the root in the expected header. #[inline] -fn validate_state_root( +fn validate_state_root( got: B256, - expected: SealedHeader, + expected: SealedHeader, target_block: BlockNumber, ) -> Result<(), StageError> { - if got == expected.state_root { + if got == expected.state_root() { Ok(()) } else { error!(target: "sync::stages::merkle", ?target_block, ?got, ?expected, "Failed to verify block state root! {INVALID_STATE_ROOT_ERROR_MESSAGE}"); Err(StageError::Block { error: BlockErrorKind::Validation(ConsensusError::BodyStateRootDiff( - GotExpected { got, expected: expected.state_root }.into(), + GotExpected { got, expected: expected.state_root() }.into(), )), block: Box::new(expected.block_with_parent()), }) diff --git a/crates/stages/stages/src/stages/sender_recovery.rs b/crates/stages/stages/src/stages/sender_recovery.rs index b5506068f481..2dcce61b90d0 100644 --- a/crates/stages/stages/src/stages/sender_recovery.rs +++ b/crates/stages/stages/src/stages/sender_recovery.rs @@ -59,7 +59,7 @@ impl Default for SenderRecoveryStage { impl Stage for SenderRecoveryStage where Provider: DBProvider - + BlockReader
+ + BlockReader + StaticFileProviderFactory> + StatsReader + PruneCheckpointReader, @@ -146,8 +146,7 @@ fn recover_range( senders_cursor: &mut CURSOR, ) -> Result<(), StageError> where - Provider: - DBProvider + HeaderProvider
+ StaticFileProviderFactory, + Provider: DBProvider + HeaderProvider + StaticFileProviderFactory, CURSOR: DbCursorRW, { debug!(target: "sync::stages::sender_recovery", ?tx_range, "Sending batch for processing");