Skip to content

Commit

Permalink
feat: use network tx for Pool::Pooled (paradigmxyz#13159)
Browse files Browse the repository at this point in the history
Co-authored-by: Matthias Seitz <[email protected]>
  • Loading branch information
klkvr and mattsse authored Dec 5, 2024
1 parent 4fe5c2a commit 8226fa0
Show file tree
Hide file tree
Showing 17 changed files with 148 additions and 138 deletions.
10 changes: 7 additions & 3 deletions crates/ethereum/node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use reth_node_builder::{
BuilderContext, Node, NodeAdapter, NodeComponentsBuilder, PayloadBuilderConfig, PayloadTypes,
};
use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService};
use reth_primitives::EthPrimitives;
use reth_primitives::{EthPrimitives, PooledTransactionsElement};
use reth_provider::{CanonStateSubscriptions, EthStorage};
use reth_rpc::EthApi;
use reth_tracing::tracing::{debug, info};
Expand Down Expand Up @@ -309,8 +309,12 @@ pub struct EthereumNetworkBuilder {
impl<Node, Pool> NetworkBuilder<Node, Pool> for EthereumNetworkBuilder
where
Node: FullNodeTypes<Types: NodeTypes<ChainSpec = ChainSpec, Primitives = EthPrimitives>>,
Pool: TransactionPool<Transaction: PoolTransaction<Consensus = TxTy<Node::Types>>>
+ Unpin
Pool: TransactionPool<
Transaction: PoolTransaction<
Consensus = TxTy<Node::Types>,
Pooled = PooledTransactionsElement,
>,
> + Unpin
+ 'static,
{
async fn build_network(
Expand Down
26 changes: 19 additions & 7 deletions crates/net/network/src/test_utils/testnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use reth_network_api::{
NetworkEvent, NetworkEventListenerProvider, NetworkInfo, Peers,
};
use reth_network_peers::PeerId;
use reth_primitives::TransactionSigned;
use reth_primitives::{PooledTransactionsElement, TransactionSigned};
use reth_provider::{test_utils::NoopProvider, ChainSpecProvider};
use reth_storage_api::{BlockReader, BlockReaderIdExt, HeaderProvider, StateProviderFactory};
use reth_tasks::TokioTaskExecutor;
Expand Down Expand Up @@ -206,8 +206,12 @@ where
+ Clone
+ Unpin
+ 'static,
Pool: TransactionPool<Transaction: PoolTransaction<Consensus = TransactionSigned>>
+ Unpin
Pool: TransactionPool<
Transaction: PoolTransaction<
Consensus = TransactionSigned,
Pooled = PooledTransactionsElement,
>,
> + Unpin
+ 'static,
{
/// Spawns the testnet to a separate task
Expand Down Expand Up @@ -273,8 +277,12 @@ where
> + HeaderProvider
+ Unpin
+ 'static,
Pool: TransactionPool<Transaction: PoolTransaction<Consensus = TransactionSigned>>
+ Unpin
Pool: TransactionPool<
Transaction: PoolTransaction<
Consensus = TransactionSigned,
Pooled = PooledTransactionsElement,
>,
> + Unpin
+ 'static,
{
type Output = ();
Expand Down Expand Up @@ -476,8 +484,12 @@ where
> + HeaderProvider
+ Unpin
+ 'static,
Pool: TransactionPool<Transaction: PoolTransaction<Consensus = TransactionSigned>>
+ Unpin
Pool: TransactionPool<
Transaction: PoolTransaction<
Consensus = TransactionSigned,
Pooled = PooledTransactionsElement,
>,
> + Unpin
+ 'static,
{
type Output = ();
Expand Down
21 changes: 7 additions & 14 deletions crates/net/network/src/transactions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,7 @@ use reth_network_p2p::{
};
use reth_network_peers::PeerId;
use reth_network_types::ReputationChangeKind;
use reth_primitives::{
transaction::SignedTransactionIntoRecoveredExt, RecoveredTx, TransactionSigned,
};
use reth_primitives::{transaction::SignedTransactionIntoRecoveredExt, TransactionSigned};
use reth_primitives_traits::{SignedTransaction, TxType};
use reth_tokio_util::EventStream;
use reth_transaction_pool::{
Expand Down Expand Up @@ -703,10 +701,8 @@ where
BroadcastedTransaction: SignedTransaction,
PooledTransaction: SignedTransaction,
>,
Pool::Transaction: PoolTransaction<
Consensus = N::BroadcastedTransaction,
Pooled: Into<N::PooledTransaction> + From<RecoveredTx<N::PooledTransaction>>,
>,
Pool::Transaction:
PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
{
/// Invoked when transactions in the local mempool are considered __pending__.
///
Expand Down Expand Up @@ -991,13 +987,12 @@ where
let _ = response.send(Ok(PooledTransactions::default()));
return
}
let transactions = self.pool.get_pooled_transactions_as::<N::PooledTransaction>(
let transactions = self.pool.get_pooled_transaction_elements(
request.0,
GetPooledTransactionLimit::ResponseSizeSoftLimit(
self.transaction_fetcher.info.soft_limit_byte_size_pooled_transactions_response,
),
);

trace!(target: "net::tx::propagation", sent_txs=?transactions.iter().map(|tx| tx.tx_hash()), "Sending requested transactions to peer");

// we sent a response at which point we assume that the peer is aware of the
Expand Down Expand Up @@ -1247,7 +1242,7 @@ where
} else {
// this is a new transaction that should be imported into the pool

let pool_transaction = Pool::Transaction::from_pooled(tx.into());
let pool_transaction = Pool::Transaction::from_pooled(tx);
new_txs.push(pool_transaction);

entry.insert(HashSet::from([peer_id]));
Expand Down Expand Up @@ -1338,10 +1333,8 @@ where
BroadcastedTransaction: SignedTransaction,
PooledTransaction: SignedTransaction,
>,
Pool::Transaction: PoolTransaction<
Consensus = N::BroadcastedTransaction,
Pooled: Into<N::PooledTransaction> + From<RecoveredTx<N::PooledTransaction>>,
>,
Pool::Transaction:
PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
{
type Output = ();

Expand Down
10 changes: 8 additions & 2 deletions crates/node/builder/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,10 @@ impl<Node: FullNodeTypes> BuilderContext<Node> {
pub fn start_network<Pool>(&self, builder: NetworkBuilder<(), ()>, pool: Pool) -> NetworkHandle
where
Pool: TransactionPool<
Transaction: PoolTransaction<Consensus = reth_primitives::TransactionSigned>,
Transaction: PoolTransaction<
Consensus = reth_primitives::TransactionSigned,
Pooled = reth_primitives::PooledTransactionsElement,
>,
> + Unpin
+ 'static,
Node::Provider: BlockReader<
Expand All @@ -677,7 +680,10 @@ impl<Node: FullNodeTypes> BuilderContext<Node> {
) -> NetworkHandle
where
Pool: TransactionPool<
Transaction: PoolTransaction<Consensus = reth_primitives::TransactionSigned>,
Transaction: PoolTransaction<
Consensus = reth_primitives::TransactionSigned,
Pooled = reth_primitives::PooledTransactionsElement,
>,
> + Unpin
+ 'static,
Node::Provider: BlockReader<
Expand Down
10 changes: 7 additions & 3 deletions crates/optimism/node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use reth_optimism_rpc::{
OpEthApi, SequencerClient,
};
use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService};
use reth_primitives::{BlockBody, TransactionSigned};
use reth_primitives::{BlockBody, PooledTransactionsElement, TransactionSigned};
use reth_provider::{
providers::ChainStorage, BlockBodyReader, BlockBodyWriter, CanonStateSubscriptions,
ChainSpecProvider, DBProvider, EthStorage, ProviderResult, ReadBodyInput,
Expand Down Expand Up @@ -633,8 +633,12 @@ impl OpNetworkBuilder {
impl<Node, Pool> NetworkBuilder<Node, Pool> for OpNetworkBuilder
where
Node: FullNodeTypes<Types: NodeTypes<ChainSpec = OpChainSpec, Primitives = OpPrimitives>>,
Pool: TransactionPool<Transaction: PoolTransaction<Consensus = TxTy<Node::Types>>>
+ Unpin
Pool: TransactionPool<
Transaction: PoolTransaction<
Consensus = TxTy<Node::Types>,
Pooled = PooledTransactionsElement,
>,
> + Unpin
+ 'static,
{
async fn build_network(
Expand Down
3 changes: 1 addition & 2 deletions crates/optimism/rpc/src/eth/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ where
/// Returns the hash of the transaction.
async fn send_raw_transaction(&self, tx: Bytes) -> Result<B256, Self::Error> {
let recovered = recover_raw_transaction(tx.clone())?;
let pool_transaction =
<Self::Pool as TransactionPool>::Transaction::from_pooled(recovered.into());
let pool_transaction = <Self::Pool as TransactionPool>::Transaction::from_pooled(recovered);

// On optimism, transactions are forwarded directly to the sequencer to be included in
// blocks that it builds.
Expand Down
2 changes: 1 addition & 1 deletion crates/rpc/rpc-eth-api/src/helpers/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ pub trait EthTransactions: LoadTransaction<Provider: BlockReaderIdExt> {
async move {
let recovered = recover_raw_transaction(tx)?;
let pool_transaction =
<Self::Pool as TransactionPool>::Transaction::from_pooled(recovered.into());
<Self::Pool as TransactionPool>::Transaction::from_pooled(recovered);

// submit the transaction to the pool with a `Local` origin
let hash = self
Expand Down
12 changes: 6 additions & 6 deletions crates/rpc/rpc-eth-types/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
//! Commonly used code snippets
use alloy_eips::eip2718::Decodable2718;
use alloy_primitives::Bytes;
use reth_primitives::{PooledTransactionsElement, PooledTransactionsElementEcRecovered};
use reth_primitives::{transaction::SignedTransactionIntoRecoveredExt, RecoveredTx};
use reth_primitives_traits::SignedTransaction;
use std::future::Future;

use super::{EthApiError, EthResult};

/// Recovers a [`PooledTransactionsElementEcRecovered`] from an enveloped encoded byte stream.
/// Recovers a [`SignedTransaction`] from an enveloped encoded byte stream.
///
/// See [`Decodable2718::decode_2718`]
pub fn recover_raw_transaction(data: Bytes) -> EthResult<PooledTransactionsElementEcRecovered> {
/// See [`alloy_eips::eip2718::Decodable2718::decode_2718`]
pub fn recover_raw_transaction<T: SignedTransaction>(data: Bytes) -> EthResult<RecoveredTx<T>> {
if data.is_empty() {
return Err(EthApiError::EmptyRawTransactionData)
}

let transaction = PooledTransactionsElement::decode_2718(&mut data.as_ref())
let transaction = T::decode_2718(&mut data.as_ref())
.map_err(|_| EthApiError::FailedToDecodeSignedTransaction)?;

transaction.try_into_ecrecovered().or(Err(EthApiError::InvalidTransactionSignature))
Expand Down
2 changes: 1 addition & 1 deletion crates/rpc/rpc/src/eth/bundle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ where

let transactions = txs
.into_iter()
.map(recover_raw_transaction)
.map(recover_raw_transaction::<PooledTransactionsElement>)
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.map(|tx| tx.to_components())
Expand Down
5 changes: 3 additions & 2 deletions crates/rpc/rpc/src/eth/sim_bundle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use alloy_rpc_types_mev::{
use jsonrpsee::core::RpcResult;
use reth_chainspec::EthChainSpec;
use reth_evm::{ConfigureEvm, ConfigureEvmEnv};
use reth_primitives::TransactionSigned;
use reth_primitives::{PooledTransactionsElement, TransactionSigned};
use reth_provider::{ChainSpecProvider, HeaderProvider};
use reth_revm::database::StateProviderDatabase;
use reth_rpc_api::MevSimApiServer;
Expand Down Expand Up @@ -171,7 +171,8 @@ where
match &body[idx] {
BundleItem::Tx { tx, can_revert } => {
let recovered_tx =
recover_raw_transaction(tx.clone()).map_err(EthApiError::from)?;
recover_raw_transaction::<PooledTransactionsElement>(tx.clone())
.map_err(EthApiError::from)?;
let (tx, signer) = recovered_tx.to_components();
let tx = tx.into_transaction();

Expand Down
4 changes: 3 additions & 1 deletion crates/rpc/rpc/src/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use reth_consensus_common::calc::{
base_block_reward, base_block_reward_pre_merge, block_reward, ommer_reward,
};
use reth_evm::ConfigureEvmEnv;
use reth_primitives::PooledTransactionsElement;
use reth_provider::{BlockReader, ChainSpecProvider, EvmEnvProvider, StateProviderFactory};
use reth_revm::database::StateProviderDatabase;
use reth_rpc_api::TraceApiServer;
Expand Down Expand Up @@ -115,7 +116,8 @@ where
trace_types: HashSet<TraceType>,
block_id: Option<BlockId>,
) -> Result<TraceResults, Eth::Error> {
let tx = recover_raw_transaction(tx)?.into_ecrecovered_transaction();
let tx = recover_raw_transaction::<PooledTransactionsElement>(tx)?
.into_ecrecovered_transaction();

let (cfg, block, at) = self.eth_api().evm_env_at(block_id.unwrap_or_default()).await?;

Expand Down
15 changes: 3 additions & 12 deletions crates/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ use alloy_primitives::{Address, TxHash, B256, U256};
use aquamarine as _;
use reth_eth_wire_types::HandleMempoolData;
use reth_execution_types::ChangedAccount;
use reth_primitives::RecoveredTx;
use reth_storage_api::StateProviderFactory;
use std::{collections::HashSet, sync::Arc};
use tokio::sync::mpsc::Receiver;
Expand Down Expand Up @@ -419,21 +420,11 @@ where
self.pool.get_pooled_transaction_elements(tx_hashes, limit)
}

fn get_pooled_transactions_as<P>(
&self,
tx_hashes: Vec<TxHash>,
limit: GetPooledTransactionLimit,
) -> Vec<P>
where
<Self::Transaction as PoolTransaction>::Pooled: Into<P>,
{
self.pool.get_pooled_transactions_as(tx_hashes, limit)
}

fn get_pooled_transaction_element(
&self,
tx_hash: TxHash,
) -> Option<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled> {
) -> Option<RecoveredTx<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
{
self.pool.get_pooled_transaction_element(tx_hash)
}

Expand Down
11 changes: 3 additions & 8 deletions crates/transaction-pool/src/maintain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
blobstore::{BlobStoreCanonTracker, BlobStoreUpdates},
error::PoolError,
metrics::MaintainPoolMetrics,
traits::{CanonicalStateUpdate, TransactionPool, TransactionPoolExt},
traits::{CanonicalStateUpdate, EthPoolTransaction, TransactionPool, TransactionPoolExt},
BlockInfo, PoolTransaction, PoolUpdateKind,
};
use alloy_consensus::BlockHeader;
Expand All @@ -20,8 +20,7 @@ use reth_chainspec::{ChainSpecProvider, EthChainSpec};
use reth_execution_types::ChangedAccount;
use reth_fs_util::FsPathError;
use reth_primitives::{
transaction::SignedTransactionIntoRecoveredExt, PooledTransactionsElementEcRecovered,
SealedHeader, TransactionSigned,
transaction::SignedTransactionIntoRecoveredExt, SealedHeader, TransactionSigned,
};
use reth_primitives_traits::SignedTransaction;
use reth_storage_api::{errors::provider::ProviderError, BlockReaderIdExt, StateProviderFactory};
Expand Down Expand Up @@ -335,13 +334,9 @@ pub async fn maintain_transaction_pool<Client, P, St, Tasks>(
.flatten()
.map(Arc::unwrap_or_clone)
.and_then(|sidecar| {
PooledTransactionsElementEcRecovered::try_from_blob_transaction(
<P as TransactionPool>::Transaction::try_from_eip4844(
tx, sidecar,
)
.ok()
})
.map(|tx| {
<P as TransactionPool>::Transaction::from_pooled(tx.into())
})
} else {
<P as TransactionPool>::Transaction::try_from_consensus(tx).ok()
Expand Down
14 changes: 2 additions & 12 deletions crates/transaction-pool/src/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use alloy_eips::{
};
use alloy_primitives::{Address, TxHash, B256, U256};
use reth_eth_wire_types::HandleMempoolData;
use reth_primitives::RecoveredTx;
use std::{collections::HashSet, marker::PhantomData, sync::Arc};
use tokio::sync::{mpsc, mpsc::Receiver};

Expand Down Expand Up @@ -139,21 +140,10 @@ impl TransactionPool for NoopTransactionPool {
vec![]
}

fn get_pooled_transactions_as<T>(
&self,
_tx_hashes: Vec<TxHash>,
_limit: GetPooledTransactionLimit,
) -> Vec<T>
where
<Self::Transaction as PoolTransaction>::Pooled: Into<T>,
{
vec![]
}

fn get_pooled_transaction_element(
&self,
_tx_hash: TxHash,
) -> Option<<Self::Transaction as PoolTransaction>::Pooled> {
) -> Option<RecoveredTx<<Self::Transaction as PoolTransaction>::Pooled>> {
None
}

Expand Down
Loading

0 comments on commit 8226fa0

Please sign in to comment.