Skip to content

Commit

Permalink
feat: introduce networkprimitives in transition fetcher (#12889)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsse authored Nov 26, 2024
1 parent e8d63e4 commit 02f3427
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 27 deletions.
43 changes: 22 additions & 21 deletions crates/net/network/src/transactions/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use reth_eth_wire::{
DedupPayload, EthVersion, GetPooledTransactions, HandleMempoolData, HandleVersionedMempoolData,
PartiallyValidData, RequestTxHashes, ValidAnnouncementData,
};
use reth_eth_wire_types::{EthNetworkPrimitives, NetworkPrimitives};
use reth_network_api::PeerRequest;
use reth_network_p2p::error::{RequestError, RequestResult};
use reth_network_peers::PeerId;
Expand All @@ -68,7 +69,7 @@ use validation::FilterOutcome;
/// new requests on announced hashes.
#[derive(Debug)]
#[pin_project]
pub struct TransactionFetcher {
pub struct TransactionFetcher<N: NetworkPrimitives = EthNetworkPrimitives> {
/// All peers with to which a [`GetPooledTransactions`] request is inflight.
pub active_peers: LruMap<PeerId, u8, ByLength>,
/// All currently active [`GetPooledTransactions`] requests.
Expand All @@ -77,7 +78,7 @@ pub struct TransactionFetcher {
/// It's disjoint from the set of hashes which are awaiting an idle fallback peer in order to
/// be fetched.
#[pin]
pub inflight_requests: FuturesUnordered<GetPooledTxRequestFut>,
pub inflight_requests: FuturesUnordered<GetPooledTxRequestFut<N::PooledTransaction>>,
/// Hashes that are awaiting an idle fallback peer so they can be fetched.
///
/// This is a subset of all hashes in the fetcher, and is disjoint from the set of hashes for
Expand All @@ -93,9 +94,7 @@ pub struct TransactionFetcher {
metrics: TransactionFetcherMetrics,
}

// === impl TransactionFetcher ===

impl TransactionFetcher {
impl<N: NetworkPrimitives> TransactionFetcher<N> {
/// Removes the peer from the active set.
pub(crate) fn remove_peer(&mut self, peer_id: &PeerId) {
self.active_peers.remove(peer_id);
Expand Down Expand Up @@ -429,7 +428,7 @@ impl TransactionFetcher {
/// the request by checking the transactions seen by the peer against the buffer.
pub fn on_fetch_pending_hashes(
&mut self,
peers: &HashMap<PeerId, PeerMetadata>,
peers: &HashMap<PeerId, PeerMetadata<N>>,
has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
) {
let init_capacity_req = approx_capacity_get_pooled_transactions_req_eth68(&self.info);
Expand Down Expand Up @@ -632,7 +631,7 @@ impl TransactionFetcher {
pub fn request_transactions_from_peer(
&mut self,
new_announced_hashes: RequestTxHashes,
peer: &PeerMetadata,
peer: &PeerMetadata<N>,
) -> Option<RequestTxHashes> {
let peer_id: PeerId = peer.request_tx.peer_id;
let conn_eth_version = peer.version;
Expand Down Expand Up @@ -896,7 +895,9 @@ impl TransactionFetcher {
approx_capacity_get_pooled_transactions_req_eth66()
}
}
}

impl TransactionFetcher {
/// Processes a resolved [`GetPooledTransactions`] request. Queues the outcome as a
/// [`FetchEvent`], which will then be streamed by
/// [`TransactionsManager`](super::TransactionsManager).
Expand Down Expand Up @@ -1044,7 +1045,7 @@ impl Stream for TransactionFetcher {
}
}

impl Default for TransactionFetcher {
impl<T: NetworkPrimitives> Default for TransactionFetcher<T> {
fn default() -> Self {
Self {
active_peers: LruMap::new(DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS),
Expand Down Expand Up @@ -1091,13 +1092,13 @@ impl TxFetchMetadata {

/// Represents possible events from fetching transactions.
#[derive(Debug)]
pub enum FetchEvent {
pub enum FetchEvent<T = PooledTransactionsElement> {
/// Triggered when transactions are successfully fetched.
TransactionsFetched {
/// The ID of the peer from which transactions were fetched.
peer_id: PeerId,
/// The transactions that were fetched, if available.
transactions: PooledTransactions,
transactions: PooledTransactions<T>,
},
/// Triggered when there is an error in fetching transactions.
FetchError {
Expand All @@ -1115,47 +1116,47 @@ pub enum FetchEvent {

/// An inflight request for [`PooledTransactions`] from a peer.
#[derive(Debug)]
pub struct GetPooledTxRequest {
pub struct GetPooledTxRequest<T = PooledTransactionsElement> {
peer_id: PeerId,
/// Transaction hashes that were requested, for cleanup purposes
requested_hashes: RequestTxHashes,
response: oneshot::Receiver<RequestResult<PooledTransactions>>,
response: oneshot::Receiver<RequestResult<PooledTransactions<T>>>,
}

/// Upon reception of a response, a [`GetPooledTxRequest`] is deconstructed to form a
/// [`GetPooledTxResponse`].
#[derive(Debug)]
pub struct GetPooledTxResponse {
pub struct GetPooledTxResponse<T = PooledTransactionsElement> {
peer_id: PeerId,
/// Transaction hashes that were requested, for cleanup purposes, since peer may only return a
/// subset of requested hashes.
requested_hashes: RequestTxHashes,
result: Result<RequestResult<PooledTransactions>, RecvError>,
result: Result<RequestResult<PooledTransactions<T>>, RecvError>,
}

/// Stores the response receiver made by sending a [`GetPooledTransactions`] request to a peer's
/// session.
#[must_use = "futures do nothing unless polled"]
#[pin_project::pin_project]
#[derive(Debug)]
pub struct GetPooledTxRequestFut {
pub struct GetPooledTxRequestFut<T = PooledTransactionsElement> {
#[pin]
inner: Option<GetPooledTxRequest>,
inner: Option<GetPooledTxRequest<T>>,
}

impl GetPooledTxRequestFut {
impl<T> GetPooledTxRequestFut<T> {
#[inline]
const fn new(
peer_id: PeerId,
requested_hashes: RequestTxHashes,
response: oneshot::Receiver<RequestResult<PooledTransactions>>,
response: oneshot::Receiver<RequestResult<PooledTransactions<T>>>,
) -> Self {
Self { inner: Some(GetPooledTxRequest { peer_id, requested_hashes, response }) }
}
}

impl Future for GetPooledTxRequestFut {
type Output = GetPooledTxResponse;
impl<T> Future for GetPooledTxRequestFut<T> {
type Output = GetPooledTxResponse<T>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut req = self.as_mut().project().inner.take().expect("polled after completion");
Expand Down Expand Up @@ -1372,7 +1373,7 @@ mod test {

// RIG TEST

let tx_fetcher = &mut TransactionFetcher::default();
let tx_fetcher = &mut TransactionFetcher::<EthNetworkPrimitives>::default();

let eth68_hashes = [
B256::from_slice(&[1; 32]),
Expand Down
12 changes: 6 additions & 6 deletions crates/net/network/src/transactions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ pub struct TransactionsManager<Pool, N: NetworkPrimitives = EthNetworkPrimitives
/// From which we get all new incoming transaction related messages.
network_events: EventStream<NetworkEvent<PeerRequest<N>>>,
/// Transaction fetcher to handle inflight and missing transaction requests.
transaction_fetcher: TransactionFetcher,
transaction_fetcher: TransactionFetcher<N>,
/// All currently pending transactions grouped by peers.
///
/// This way we can track incoming transactions and prevent multiple pool imports for the same
Expand All @@ -235,7 +235,7 @@ pub struct TransactionsManager<Pool, N: NetworkPrimitives = EthNetworkPrimitives
/// Bad imports.
bad_imports: LruCache<TxHash>,
/// All the connected peers.
peers: HashMap<PeerId, PeerMetadata>,
peers: HashMap<PeerId, PeerMetadata<N>>,
/// Send half for the command channel.
///
/// This is kept so that a new [`TransactionsHandle`] can be created at any time.
Expand Down Expand Up @@ -1731,23 +1731,23 @@ impl TransactionSource {

/// Tracks a single peer in the context of [`TransactionsManager`].
#[derive(Debug)]
pub struct PeerMetadata {
pub struct PeerMetadata<N: NetworkPrimitives = EthNetworkPrimitives> {
/// Optimistically keeps track of transactions that we know the peer has seen. Optimistic, in
/// the sense that transactions are preemptively marked as seen by peer when they are sent to
/// the peer.
seen_transactions: LruCache<TxHash>,
/// A communication channel directly to the peer's session task.
request_tx: PeerRequestSender,
request_tx: PeerRequestSender<PeerRequest<N>>,
/// negotiated version of the session.
version: EthVersion,
/// The peer's client version.
client_version: Arc<str>,
}

impl PeerMetadata {
impl<N: NetworkPrimitives> PeerMetadata<N> {
/// Returns a new instance of [`PeerMetadata`].
fn new(
request_tx: PeerRequestSender,
request_tx: PeerRequestSender<PeerRequest<N>>,
version: EthVersion,
client_version: Arc<str>,
max_transactions_seen_by_peer: u32,
Expand Down

0 comments on commit 02f3427

Please sign in to comment.