From f03bce592f20c9dc619a5269fc18940795444f6e Mon Sep 17 00:00:00 2001
From: kirill lykov
Date: Mon, 14 Oct 2024 16:25:12 +0200
Subject: [PATCH] Add a new client implementation targeting TPU (#2905)

Although tpu-client, the component currently used for sending transactions
over TPU, is well suited for sending transactions in bulk, it was not
designed to handle a continuous stream of transactions. Additionally, the
call stack for sending transactions using tpu-client has grown too deep,
making the code difficult to maintain. This motivated us to create a new
client implementation that is optimized for handling transaction streams
and is built using Tokio from the start.
---
 Cargo.lock                                    |  24 +
 Cargo.toml                                    |   1 +
 tpu-client-next/Cargo.toml                    |  34 +
 tpu-client-next/src/connection_worker.rs      | 258 +++
 .../src/connection_workers_scheduler.rs       | 213 ++++
 tpu-client-next/src/leader_updater.rs         | 124 ++++
 tpu-client-next/src/lib.rs                    |  12 +
 tpu-client-next/src/quic_networking.rs        |  70 ++
 tpu-client-next/src/quic_networking/error.rs  |  49 ++
 .../quic_client_certificate.rs                |  17 +
 .../skip_server_verification.rs               |  74 ++
 tpu-client-next/src/send_transaction_stats.rs | 166 +++++
 tpu-client-next/src/transaction_batch.rs      |  33 +
 tpu-client-next/src/workers_cache.rs          | 184 +++++
 .../connection_workers_scheduler_test.rs      | 671 ++++++++++++++++++
 15 files changed, 1930 insertions(+)
 create mode 100644 tpu-client-next/Cargo.toml
 create mode 100644 tpu-client-next/src/connection_worker.rs
 create mode 100644 tpu-client-next/src/connection_workers_scheduler.rs
 create mode 100644 tpu-client-next/src/leader_updater.rs
 create mode 100644 tpu-client-next/src/lib.rs
 create mode 100644 tpu-client-next/src/quic_networking.rs
 create mode 100644 tpu-client-next/src/quic_networking/error.rs
 create mode 100644 tpu-client-next/src/quic_networking/quic_client_certificate.rs
 create mode 100644 tpu-client-next/src/quic_networking/skip_server_verification.rs
 create mode 100644 tpu-client-next/src/send_transaction_stats.rs
 create mode 100644 tpu-client-next/src/transaction_batch.rs
 create mode 100644 tpu-client-next/src/workers_cache.rs
 create mode 100644 tpu-client-next/tests/connection_workers_scheduler_test.rs

diff --git a/Cargo.lock b/Cargo.lock
index 0e9569f47e6a82..8be26642cb791d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8407,6 +8407,30 @@ dependencies = [
 "tokio",
 ]
 
+[[package]]
+name = "solana-tpu-client-next"
+version = "2.1.0"
+dependencies = [
+ "async-trait",
+ "crossbeam-channel",
+ "futures 0.3.31",
+ "log",
+ "lru",
+ "quinn",
+ "rustls 0.23.14",
+ "solana-cli-config",
+ "solana-connection-cache",
+ "solana-logger",
+ "solana-measure",
+ "solana-rpc-client",
+ "solana-sdk",
+ "solana-streamer",
+ "solana-tpu-client",
+ "thiserror",
+ "tokio",
+ "tokio-util 0.7.12",
+]
+
 [[package]]
 name = "solana-transaction-dos"
 version = "2.1.0"
diff --git a/Cargo.toml b/Cargo.toml
index 925d3c01cb13e3..db33afda423e7f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -148,6 +148,7 @@ members = [
     "tokens",
     "tps-client",
     "tpu-client",
+    "tpu-client-next",
     "transaction-dos",
     "transaction-metrics-tracker",
     "transaction-status",
diff --git a/tpu-client-next/Cargo.toml b/tpu-client-next/Cargo.toml
new file mode 100644
index 00000000000000..13ceb0e584a10f
--- /dev/null
+++ b/tpu-client-next/Cargo.toml
@@ -0,0 +1,34 @@
+[package]
+name = "solana-tpu-client-next"
+description = "Client code to send transactions to the TPU."
+version = { workspace = true } +authors = { workspace = true } +repository = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +edition = { workspace = true } + +[dependencies] +async-trait = { workspace = true } +log = { workspace = true } +lru = { workspace = true } +quinn = { workspace = true } +rustls = { workspace = true } +solana-connection-cache = { workspace = true } +solana-logger = { workspace = true } +solana-measure = { workspace = true } +solana-rpc-client = { workspace = true } +solana-sdk = { workspace = true } +solana-streamer = { workspace = true } +solana-tpu-client = { workspace = true } +thiserror = { workspace = true } +tokio = { workspace = true } +tokio-util = { workspace = true } + +[dev-dependencies] +crossbeam-channel = { workspace = true } +futures = { workspace = true } +solana-cli-config = { workspace = true } + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] diff --git a/tpu-client-next/src/connection_worker.rs b/tpu-client-next/src/connection_worker.rs new file mode 100644 index 00000000000000..7d77bc3f6ed2a2 --- /dev/null +++ b/tpu-client-next/src/connection_worker.rs @@ -0,0 +1,258 @@ +//! This module defines [`ConnectionWorker`] which encapsulates the functionality +//! needed to handle one connection within the scope of task. + +use { + super::SendTransactionStats, + crate::{ + quic_networking::send_data_over_stream, send_transaction_stats::record_error, + transaction_batch::TransactionBatch, + }, + log::*, + quinn::{ConnectError, Connection, Endpoint}, + solana_measure::measure::Measure, + solana_sdk::{ + clock::{DEFAULT_MS_PER_SLOT, MAX_PROCESSING_AGE, NUM_CONSECUTIVE_LEADER_SLOTS}, + timing::timestamp, + }, + std::net::SocketAddr, + tokio::{ + sync::mpsc, + time::{sleep, Duration}, + }, + tokio_util::sync::CancellationToken, +}; + +/// Interval between retry attempts for creating a new connection. This value is +/// a best-effort estimate, based on current network conditions. +const RETRY_SLEEP_INTERVAL: Duration = + Duration::from_millis(NUM_CONSECUTIVE_LEADER_SLOTS * DEFAULT_MS_PER_SLOT); + +/// Maximum age (in milliseconds) of a blockhash, beyond which transaction +/// batches are dropped. +const MAX_PROCESSING_AGE_MS: u64 = MAX_PROCESSING_AGE as u64 * DEFAULT_MS_PER_SLOT; + +/// [`ConnectionState`] represents the current state of a quic connection. +/// +/// It tracks the lifecycle of connection from initial setup to closing phase. +/// The transition function between states is defined in `ConnectionWorker` +/// implementation. +enum ConnectionState { + NotSetup, + Active(Connection), + Retry(usize), + Closing, +} + +impl Drop for ConnectionState { + /// When [`ConnectionState`] is dropped, underlying connection is closed + /// which means that there is no guarantee that the open streams will + /// finish. + fn drop(&mut self) { + if let Self::Active(connection) = self { + debug!( + "Close connection with {:?}, stats: {:?}. All pending streams will be dropped.", + connection.remote_address(), + connection.stats() + ); + connection.close(0u32.into(), b"done"); + } + } +} + +/// [`ConnectionWorker`] holds connection to the validator with address `peer`. +/// +/// If connection has been closed, [`ConnectionWorker`] tries to reconnect +/// `max_reconnect_attempts` times. If connection is in `Active` state, it sends +/// transactions received from `transactions_receiver`. Additionally, it +/// accumulates statistics about connections and streams failures. 
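+///
+/// Internally, the worker drives a small state machine over
+/// [`ConnectionState`]: it starts in `NotSetup`, becomes `Active` once a
+/// connection has been established, moves to `Retry` after a failed connect
+/// or send (retrying up to `max_reconnect_attempts` times), and finishes in
+/// `Closing`.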
+pub(crate) struct ConnectionWorker { + endpoint: Endpoint, + peer: SocketAddr, + transactions_receiver: mpsc::Receiver, + connection: ConnectionState, + skip_check_transaction_age: bool, + max_reconnect_attempts: usize, + send_txs_stats: SendTransactionStats, + cancel: CancellationToken, +} + +impl ConnectionWorker { + /// Constructs a [`ConnectionWorker`]. + /// + /// [`ConnectionWorker`] maintains a connection to a `peer` and processes + /// transactions from `transactions_receiver`. If + /// `skip_check_transaction_age` is set to `true`, the worker skips checking + /// for transaction blockhash expiration. The `max_reconnect_attempts` + /// parameter controls how many times the worker will attempt to reconnect + /// in case of connection failure. Returns the created `ConnectionWorker` + /// along with a cancellation token that can be used by the caller to stop + /// the worker. + pub fn new( + endpoint: Endpoint, + peer: SocketAddr, + transactions_receiver: mpsc::Receiver, + skip_check_transaction_age: bool, + max_reconnect_attempts: usize, + ) -> (Self, CancellationToken) { + let cancel = CancellationToken::new(); + + let this = Self { + endpoint, + peer, + transactions_receiver, + connection: ConnectionState::NotSetup, + skip_check_transaction_age, + max_reconnect_attempts, + send_txs_stats: SendTransactionStats::default(), + cancel: cancel.clone(), + }; + + (this, cancel) + } + + /// Starts the main loop of the [`ConnectionWorker`]. + /// + /// This method manages the connection to the peer and handles state + /// transitions. It runs indefinitely until the connection is closed or an + /// unrecoverable error occurs. + pub async fn run(&mut self) { + let cancel = self.cancel.clone(); + + let main_loop = async move { + loop { + match &self.connection { + ConnectionState::Closing => { + break; + } + ConnectionState::NotSetup => { + self.create_connection(0).await; + } + ConnectionState::Active(connection) => { + let Some(transactions) = self.transactions_receiver.recv().await else { + debug!("Transactions sender has been dropped."); + self.connection = ConnectionState::Closing; + continue; + }; + self.send_transactions(connection.clone(), transactions) + .await; + } + ConnectionState::Retry(num_reconnects) => { + if *num_reconnects > self.max_reconnect_attempts { + error!("Failed to establish connection: reach max reconnect attempts."); + self.connection = ConnectionState::Closing; + continue; + } + sleep(RETRY_SLEEP_INTERVAL).await; + self.reconnect(*num_reconnects).await; + } + } + } + }; + + tokio::select! { + () = main_loop => (), + () = cancel.cancelled() => (), + } + } + + /// Retrieves the statistics for transactions sent by this worker. + pub fn transaction_stats(&self) -> &SendTransactionStats { + &self.send_txs_stats + } + + /// Sends a batch of transactions using the provided `connection`. + /// + /// Each transaction in the batch is sent over the QUIC streams one at the + /// time, which prevents traffic fragmentation and shows better TPS in + /// comparison with multistream send. If the batch is determined to be + /// outdated and flag `skip_check_transaction_age` is unset, it will be + /// dropped without being sent. + /// + /// In case of error, it doesn't retry to send the same transactions again. 
+ async fn send_transactions(&mut self, connection: Connection, transactions: TransactionBatch) { + let now = timestamp(); + if !self.skip_check_transaction_age + && now.saturating_sub(transactions.timestamp()) > MAX_PROCESSING_AGE_MS + { + debug!("Drop outdated transaction batch."); + return; + } + let mut measure_send = Measure::start("send transaction batch"); + for data in transactions.into_iter() { + let result = send_data_over_stream(&connection, &data).await; + + if let Err(error) = result { + trace!("Failed to send transaction over stream with error: {error}."); + record_error(error, &mut self.send_txs_stats); + self.connection = ConnectionState::Retry(0); + } else { + self.send_txs_stats.successfully_sent = + self.send_txs_stats.successfully_sent.saturating_add(1); + } + } + measure_send.stop(); + debug!( + "Time to send transactions batch: {} us", + measure_send.as_us() + ); + } + + /// Attempts to create a new connection to the specified `peer` address. + /// + /// If the connection is successful, the state is updated to `Active`. + /// + /// If an error occurs, the state may transition to `Retry` or `Closing`, + /// depending on the nature of the error. + async fn create_connection(&mut self, max_retries_attempt: usize) { + let connecting = self.endpoint.connect(self.peer, "connect"); + match connecting { + Ok(connecting) => { + let mut measure_connection = Measure::start("establish connection"); + let res = connecting.await; + measure_connection.stop(); + debug!( + "Establishing connection with {} took: {} us", + self.peer, + measure_connection.as_us() + ); + match res { + Ok(connection) => { + self.connection = ConnectionState::Active(connection); + } + Err(err) => { + warn!("Connection error {}: {}", self.peer, err); + record_error(err.into(), &mut self.send_txs_stats); + self.connection = + ConnectionState::Retry(max_retries_attempt.saturating_add(1)); + } + } + } + Err(connecting_error) => { + record_error(connecting_error.clone().into(), &mut self.send_txs_stats); + match connecting_error { + ConnectError::EndpointStopping => { + debug!("Endpoint stopping, exit connection worker."); + self.connection = ConnectionState::Closing; + } + ConnectError::InvalidRemoteAddress(_) => { + warn!("Invalid remote address."); + self.connection = ConnectionState::Closing; + } + e => { + error!("Unexpected error has happen while trying to create connection {e}"); + self.connection = ConnectionState::Closing; + } + } + } + } + } + + /// Attempts to reconnect to the peer after a connection failure. + async fn reconnect(&mut self, num_reconnects: usize) { + debug!("Trying to reconnect. Reopen connection, 0rtt is not implemented yet."); + // We can reconnect using 0rtt, but not a priority for now. Check if we + // need to call config.enable_0rtt() on the client side and where + // session tickets are stored. + self.create_connection(num_reconnects).await; + } +} diff --git a/tpu-client-next/src/connection_workers_scheduler.rs b/tpu-client-next/src/connection_workers_scheduler.rs new file mode 100644 index 00000000000000..82b038827b48eb --- /dev/null +++ b/tpu-client-next/src/connection_workers_scheduler.rs @@ -0,0 +1,213 @@ +//! This module defines [`ConnectionWorkersScheduler`] which sends transactions +//! to the upcoming leaders. 
+ +use { + super::{leader_updater::LeaderUpdater, SendTransactionStatsPerAddr}, + crate::{ + connection_worker::ConnectionWorker, + quic_networking::{ + create_client_config, create_client_endpoint, QuicClientCertificate, QuicError, + }, + transaction_batch::TransactionBatch, + workers_cache::{WorkerInfo, WorkersCache, WorkersCacheError}, + }, + log::*, + quinn::Endpoint, + solana_sdk::signature::Keypair, + std::{net::SocketAddr, sync::Arc}, + thiserror::Error, + tokio::sync::mpsc, + tokio_util::sync::CancellationToken, +}; + +/// The [`ConnectionWorkersScheduler`] sends transactions from the provided +/// receiver channel to upcoming leaders. It obtains information about future +/// leaders from the implementation of the [`LeaderUpdater`] trait. +/// +/// Internally, it enables the management and coordination of multiple network +/// connections, schedules and oversees connection workers. +pub struct ConnectionWorkersScheduler; + +/// Errors that arise from running [`ConnectionWorkersSchedulerError`]. +#[derive(Debug, Error, PartialEq)] +pub enum ConnectionWorkersSchedulerError { + #[error(transparent)] + QuicError(#[from] QuicError), + #[error(transparent)] + WorkersCacheError(#[from] WorkersCacheError), + #[error("Leader receiver unexpectedly dropped.")] + LeaderReceiverDropped, +} + +/// Configuration for the [`ConnectionWorkersScheduler`]. +/// +/// This struct holds the necessary settings to initialize and manage connection +/// workers, including network binding, identity, connection limits, and +/// behavior related to transaction handling. +pub struct ConnectionWorkersSchedulerConfig { + /// The local address to bind the scheduler to. + pub bind: SocketAddr, + + /// Optional stake identity keypair used in the endpoint certificate for + /// identifying the sender. + pub stake_identity: Option, + + /// The number of connections to be maintained by the scheduler. + pub num_connections: usize, + + /// Whether to skip checking the transaction blockhash expiration. + pub skip_check_transaction_age: bool, + + /// The size of the channel used to transmit transaction batches to the + /// worker tasks. + pub worker_channel_size: usize, + + /// The maximum number of reconnection attempts allowed in case of + /// connection failure. + pub max_reconnect_attempts: usize, + + /// The number of slots to look ahead during the leader estimation + /// procedure. Determines how far into the future leaders are estimated, + /// allowing connections to be established with those leaders in advance. + pub lookahead_slots: u64, +} + +impl ConnectionWorkersScheduler { + /// Starts the scheduler, which manages the distribution of transactions to + /// the network's upcoming leaders. + /// + /// Runs the main loop that handles worker scheduling and management for + /// connections. Returns the error quic statistics per connection address or + /// an error. + /// + /// Importantly, if some transactions were not delivered due to network + /// problems, they will not be retried when the problem is resolved. 
+ pub async fn run( + ConnectionWorkersSchedulerConfig { + bind, + stake_identity: validator_identity, + num_connections, + skip_check_transaction_age, + worker_channel_size, + max_reconnect_attempts, + lookahead_slots, + }: ConnectionWorkersSchedulerConfig, + mut leader_updater: Box, + mut transaction_receiver: mpsc::Receiver, + cancel: CancellationToken, + ) -> Result { + let endpoint = Self::setup_endpoint(bind, validator_identity)?; + debug!("Client endpoint bind address: {:?}", endpoint.local_addr()); + let mut workers = WorkersCache::new(num_connections, cancel.clone()); + + loop { + let transaction_batch = tokio::select! { + recv_res = transaction_receiver.recv() => match recv_res { + Some(txs) => txs, + None => { + debug!("End of `transaction_receiver`: shutting down."); + break; + } + }, + () = cancel.cancelled() => { + debug!("Cancelled: Shutting down"); + break; + } + }; + let updated_leaders = leader_updater.next_leaders(lookahead_slots); + let new_leader = &updated_leaders[0]; + let future_leaders = &updated_leaders[1..]; + if !workers.contains(new_leader) { + debug!("No existing workers for {new_leader:?}, starting a new one."); + let worker = Self::spawn_worker( + &endpoint, + new_leader, + worker_channel_size, + skip_check_transaction_age, + max_reconnect_attempts, + ); + workers.push(*new_leader, worker).await; + } + + tokio::select! { + send_res = workers.send_transactions_to_address(new_leader, transaction_batch) => match send_res { + Ok(()) => (), + Err(WorkersCacheError::ShutdownError) => { + debug!("Connection to {new_leader} was closed, worker cache shutdown"); + } + Err(err) => { + warn!("Connection to {new_leader} was closed, worker error: {err}"); + // If we has failed to send batch, it will be dropped. + } + }, + () = cancel.cancelled() => { + debug!("Cancelled: Shutting down"); + break; + } + }; + + // Regardless of who is leader, add future leaders to the cache to + // hide the latency of opening the connection. + for peer in future_leaders { + if !workers.contains(peer) { + let worker = Self::spawn_worker( + &endpoint, + peer, + worker_channel_size, + skip_check_transaction_age, + max_reconnect_attempts, + ); + workers.push(*peer, worker).await; + } + } + } + + workers.shutdown().await; + + endpoint.close(0u32.into(), b"Closing connection"); + leader_updater.stop().await; + Ok(workers.transaction_stats().clone()) + } + + /// Sets up the QUIC endpoint for the scheduler to handle connections. + fn setup_endpoint( + bind: SocketAddr, + validator_identity: Option, + ) -> Result { + let client_certificate = if let Some(validator_identity) = validator_identity { + Arc::new(QuicClientCertificate::new(&validator_identity)) + } else { + Arc::new(QuicClientCertificate::new(&Keypair::new())) + }; + let client_config = create_client_config(client_certificate); + let endpoint = create_client_endpoint(bind, client_config)?; + Ok(endpoint) + } + + /// Spawns a worker to handle communication with a given peer. 
+ fn spawn_worker( + endpoint: &Endpoint, + peer: &SocketAddr, + worker_channel_size: usize, + skip_check_transaction_age: bool, + max_reconnect_attempts: usize, + ) -> WorkerInfo { + let (txs_sender, txs_receiver) = mpsc::channel(worker_channel_size); + let endpoint = endpoint.clone(); + let peer = *peer; + + let (mut worker, cancel) = ConnectionWorker::new( + endpoint, + peer, + txs_receiver, + skip_check_transaction_age, + max_reconnect_attempts, + ); + let handle = tokio::spawn(async move { + worker.run().await; + worker.transaction_stats().clone() + }); + + WorkerInfo::new(txs_sender, handle, cancel) + } +} diff --git a/tpu-client-next/src/leader_updater.rs b/tpu-client-next/src/leader_updater.rs new file mode 100644 index 00000000000000..5e07b9b0bfe612 --- /dev/null +++ b/tpu-client-next/src/leader_updater.rs @@ -0,0 +1,124 @@ +//! This module provides [`LeaderUpdater`] trait along with +//! `create_leader_updater` function to create an instance of this trait. +//! +//! Currently, the main purpose of [`LeaderUpdater`] is to abstract over leader +//! updates, hiding the details of how leaders are retrieved and which +//! structures are used. It contains trait implementations +//! `LeaderUpdaterService` and `PinnedLeaderUpdater`, where +//! `LeaderUpdaterService` keeps [`LeaderTpuService`] internal to this module. +//! Yet, it also allows to implement custom leader estimation. + +use { + async_trait::async_trait, + log::*, + solana_connection_cache::connection_cache::Protocol, + solana_rpc_client::nonblocking::rpc_client::RpcClient, + solana_tpu_client::nonblocking::tpu_client::LeaderTpuService, + std::{ + fmt, + net::SocketAddr, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + }, +}; + +/// [`LeaderUpdater`] trait abstracts out functionality required for the +/// [`ConnectionWorkersScheduler`](crate::ConnectionWorkersScheduler) to +/// identify next leaders to send transactions to. +#[async_trait] +pub trait LeaderUpdater: Send { + /// Returns next unique leaders for the next `lookahead_slots` starting from + /// current estimated slot. + /// + /// If the current leader estimation is incorrect and transactions are sent to + /// only one estimated leader, there is a risk of losing all the transactions, + /// depending on the forwarding policy. + fn next_leaders(&self, lookahead_slots: u64) -> Vec; + + /// Stop [`LeaderUpdater`] and releases all associated resources. + async fn stop(&mut self); +} + +/// Error type for [`LeaderUpdater`]. +pub struct LeaderUpdaterError; + +impl fmt::Display for LeaderUpdaterError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Leader updater encountered an error") + } +} + +impl fmt::Debug for LeaderUpdaterError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "LeaderUpdaterError") + } +} + +/// Creates a [`LeaderUpdater`] based on the configuration provided by the +/// caller. +/// +/// If `pinned_address` is provided, it returns a `PinnedLeaderUpdater` that +/// always returns the provided address instead of checking leader schedule. +/// Otherwise, it creates a `LeaderUpdaterService` which dynamically updates the +/// leaders by connecting to the network via the [`LeaderTpuService`]. 
+pub async fn create_leader_updater( + rpc_client: Arc, + websocket_url: String, + pinned_address: Option, +) -> Result, LeaderUpdaterError> { + if let Some(pinned_address) = pinned_address { + return Ok(Box::new(PinnedLeaderUpdater { + address: vec![pinned_address], + })); + } + + let exit = Arc::new(AtomicBool::new(false)); + let leader_tpu_service = + LeaderTpuService::new(rpc_client, &websocket_url, Protocol::QUIC, exit.clone()) + .await + .map_err(|error| { + error!("Failed to create a LeaderTpuService: {error}"); + LeaderUpdaterError + })?; + Ok(Box::new(LeaderUpdaterService { + leader_tpu_service, + exit, + })) +} + +/// `LeaderUpdaterService` is an implementation of the [`LeaderUpdater`] trait +/// that dynamically retrieves the current and upcoming leaders by communicating +/// with the Solana network using [`LeaderTpuService`]. +struct LeaderUpdaterService { + leader_tpu_service: LeaderTpuService, + exit: Arc, +} + +#[async_trait] +impl LeaderUpdater for LeaderUpdaterService { + fn next_leaders(&self, lookahead_slots: u64) -> Vec { + self.leader_tpu_service.leader_tpu_sockets(lookahead_slots) + } + + async fn stop(&mut self) { + self.exit.store(true, Ordering::Relaxed); + self.leader_tpu_service.join().await; + } +} + +/// `PinnedLeaderUpdater` is an implementation of [`LeaderUpdater`] that always +/// returns a fixed, "pinned" leader address. It is mainly used for testing. +struct PinnedLeaderUpdater { + address: Vec, +} + +#[async_trait] +impl LeaderUpdater for PinnedLeaderUpdater { + fn next_leaders(&self, _lookahead_slots: u64) -> Vec { + self.address.clone() + } + + async fn stop(&mut self) {} +} diff --git a/tpu-client-next/src/lib.rs b/tpu-client-next/src/lib.rs new file mode 100644 index 00000000000000..720b3876b47cb4 --- /dev/null +++ b/tpu-client-next/src/lib.rs @@ -0,0 +1,12 @@ +pub(crate) mod connection_worker; +pub mod connection_workers_scheduler; +pub mod send_transaction_stats; +pub(crate) mod workers_cache; +pub use crate::{ + connection_workers_scheduler::{ConnectionWorkersScheduler, ConnectionWorkersSchedulerError}, + send_transaction_stats::{SendTransactionStats, SendTransactionStatsPerAddr}, +}; +pub(crate) mod quic_networking; +pub(crate) use crate::quic_networking::QuicError; +pub mod leader_updater; +pub mod transaction_batch; diff --git a/tpu-client-next/src/quic_networking.rs b/tpu-client-next/src/quic_networking.rs new file mode 100644 index 00000000000000..b18fa469241da9 --- /dev/null +++ b/tpu-client-next/src/quic_networking.rs @@ -0,0 +1,70 @@ +//! Utility code to handle quic networking. 
+ +use { + quinn::{ + crypto::rustls::QuicClientConfig, ClientConfig, Connection, Endpoint, IdleTimeout, + TransportConfig, + }, + skip_server_verification::SkipServerVerification, + solana_sdk::quic::{QUIC_KEEP_ALIVE, QUIC_MAX_TIMEOUT}, + solana_streamer::nonblocking::quic::ALPN_TPU_PROTOCOL_ID, + std::{net::SocketAddr, sync::Arc}, +}; + +pub mod error; +pub mod quic_client_certificate; +pub mod skip_server_verification; + +pub use { + error::{IoErrorWithPartialEq, QuicError}, + quic_client_certificate::QuicClientCertificate, +}; + +pub(crate) fn create_client_config(client_certificate: Arc) -> ClientConfig { + // adapted from QuicLazyInitializedEndpoint::create_endpoint + let mut crypto = rustls::ClientConfig::builder() + .dangerous() + .with_custom_certificate_verifier(SkipServerVerification::new()) + .with_client_auth_cert( + vec![client_certificate.certificate.clone()], + client_certificate.key.clone_key(), + ) + .expect("Failed to set QUIC client certificates"); + crypto.enable_early_data = true; + crypto.alpn_protocols = vec![ALPN_TPU_PROTOCOL_ID.to_vec()]; + + let transport_config = { + let mut res = TransportConfig::default(); + + let timeout = IdleTimeout::try_from(QUIC_MAX_TIMEOUT).unwrap(); + res.max_idle_timeout(Some(timeout)); + res.keep_alive_interval(Some(QUIC_KEEP_ALIVE)); + + res + }; + + let mut config = ClientConfig::new(Arc::new(QuicClientConfig::try_from(crypto).unwrap())); + config.transport_config(Arc::new(transport_config)); + + config +} + +pub(crate) fn create_client_endpoint( + bind_addr: SocketAddr, + client_config: ClientConfig, +) -> Result { + let mut endpoint = Endpoint::client(bind_addr).map_err(IoErrorWithPartialEq::from)?; + endpoint.set_default_client_config(client_config); + Ok(endpoint) +} + +pub(crate) async fn send_data_over_stream( + connection: &Connection, + data: &[u8], +) -> Result<(), QuicError> { + let mut send_stream = connection.open_uni().await?; + send_stream.write_all(data).await.map_err(QuicError::from)?; + + // Stream will be finished when dropped. Finishing here explicitly is a noop. + Ok(()) +} diff --git a/tpu-client-next/src/quic_networking/error.rs b/tpu-client-next/src/quic_networking/error.rs new file mode 100644 index 00000000000000..8fa79265cb69a4 --- /dev/null +++ b/tpu-client-next/src/quic_networking/error.rs @@ -0,0 +1,49 @@ +use { + quinn::{ConnectError, ConnectionError, WriteError}, + std::{ + fmt::{self, Formatter}, + io, + }, + thiserror::Error, +}; + +/// Wrapper for [`io::Error`] implementing [`PartialEq`] to simplify error +/// checking for the [`QuicError`] type. The reasons why [`io::Error`] doesn't +/// implement [`PartialEq`] are discusses in +/// . +#[derive(Debug, Error)] +pub struct IoErrorWithPartialEq(pub io::Error); + +impl PartialEq for IoErrorWithPartialEq { + fn eq(&self, other: &Self) -> bool { + let formatted_self = format!("{self:?}"); + let formatted_other = format!("{other:?}"); + formatted_self == formatted_other + } +} + +impl fmt::Display for IoErrorWithPartialEq { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} + +impl From for IoErrorWithPartialEq { + fn from(err: io::Error) -> Self { + IoErrorWithPartialEq(err) + } +} + +/// Error types that can occur when dealing with QUIC connections or +/// transmissions. 
+#[derive(Error, Debug, PartialEq)] +pub enum QuicError { + #[error(transparent)] + StreamWrite(#[from] WriteError), + #[error(transparent)] + Connection(#[from] ConnectionError), + #[error(transparent)] + Connect(#[from] ConnectError), + #[error(transparent)] + Endpoint(#[from] IoErrorWithPartialEq), +} diff --git a/tpu-client-next/src/quic_networking/quic_client_certificate.rs b/tpu-client-next/src/quic_networking/quic_client_certificate.rs new file mode 100644 index 00000000000000..b9f0c8d1cf27a6 --- /dev/null +++ b/tpu-client-next/src/quic_networking/quic_client_certificate.rs @@ -0,0 +1,17 @@ +use { + rustls::pki_types::{CertificateDer, PrivateKeyDer}, + solana_sdk::signature::Keypair, + solana_streamer::tls_certificates::new_dummy_x509_certificate, +}; + +pub struct QuicClientCertificate { + pub certificate: CertificateDer<'static>, + pub key: PrivateKeyDer<'static>, +} + +impl QuicClientCertificate { + pub fn new(keypair: &Keypair) -> Self { + let (certificate, key) = new_dummy_x509_certificate(keypair); + Self { certificate, key } + } +} diff --git a/tpu-client-next/src/quic_networking/skip_server_verification.rs b/tpu-client-next/src/quic_networking/skip_server_verification.rs new file mode 100644 index 00000000000000..70b17e0fcd5bab --- /dev/null +++ b/tpu-client-next/src/quic_networking/skip_server_verification.rs @@ -0,0 +1,74 @@ +use { + rustls::{ + client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier}, + crypto::{ring, verify_tls12_signature, verify_tls13_signature, CryptoProvider}, + pki_types::{CertificateDer, ServerName, UnixTime}, + DigitallySignedStruct, Error, SignatureScheme, + }, + std::{ + fmt::{self, Debug, Formatter}, + sync::Arc, + }, +}; + +/// Implementation of [`ServerCertVerifier`] that ignores the server +/// certificate. Yet still checks the TLS signatures. +pub(crate) struct SkipServerVerification(Arc); + +impl SkipServerVerification { + pub fn new() -> Arc { + Arc::new(Self(Arc::new(ring::default_provider()))) + } +} + +impl ServerCertVerifier for SkipServerVerification { + fn verify_tls12_signature( + &self, + message: &[u8], + cert: &CertificateDer<'_>, + dss: &DigitallySignedStruct, + ) -> Result { + verify_tls12_signature( + message, + cert, + dss, + &self.0.signature_verification_algorithms, + ) + } + + fn verify_tls13_signature( + &self, + message: &[u8], + cert: &CertificateDer<'_>, + dss: &DigitallySignedStruct, + ) -> Result { + verify_tls13_signature( + message, + cert, + dss, + &self.0.signature_verification_algorithms, + ) + } + + fn supported_verify_schemes(&self) -> Vec { + self.0.signature_verification_algorithms.supported_schemes() + } + + fn verify_server_cert( + &self, + _end_entity: &CertificateDer<'_>, + _intermediates: &[CertificateDer<'_>], + _server_name: &ServerName, + _ocsp_response: &[u8], + _now: UnixTime, + ) -> Result { + Ok(ServerCertVerified::assertion()) + } +} + +impl Debug for SkipServerVerification { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("SkipServerVerification") + .finish_non_exhaustive() + } +} diff --git a/tpu-client-next/src/send_transaction_stats.rs b/tpu-client-next/src/send_transaction_stats.rs new file mode 100644 index 00000000000000..abe68b8bf60213 --- /dev/null +++ b/tpu-client-next/src/send_transaction_stats.rs @@ -0,0 +1,166 @@ +//! This module defines [`SendTransactionStats`] which is used to collect per IP +//! statistics about relevant network errors. 
+ +use { + super::QuicError, + quinn::{ConnectError, ConnectionError, WriteError}, + std::{collections::HashMap, fmt, net::IpAddr}, +}; + +/// [`SendTransactionStats`] aggregates counters related to sending transactions. +#[derive(Debug, Default, Clone, PartialEq)] +pub struct SendTransactionStats { + pub successfully_sent: u64, + pub connect_error_cids_exhausted: u64, + pub connect_error_invalid_remote_address: u64, + pub connect_error_other: u64, + pub connection_error_application_closed: u64, + pub connection_error_cids_exhausted: u64, + pub connection_error_connection_closed: u64, + pub connection_error_locally_closed: u64, + pub connection_error_reset: u64, + pub connection_error_timed_out: u64, + pub connection_error_transport_error: u64, + pub connection_error_version_mismatch: u64, + pub write_error_closed_stream: u64, + pub write_error_connection_lost: u64, + pub write_error_stopped: u64, + pub write_error_zero_rtt_rejected: u64, +} + +#[allow(clippy::arithmetic_side_effects)] +pub fn record_error(err: QuicError, stats: &mut SendTransactionStats) { + match err { + QuicError::Connect(ConnectError::EndpointStopping) => { + stats.connect_error_other += 1; + } + QuicError::Connect(ConnectError::CidsExhausted) => { + stats.connect_error_cids_exhausted += 1; + } + QuicError::Connect(ConnectError::InvalidServerName(_)) => { + stats.connect_error_other += 1; + } + QuicError::Connect(ConnectError::InvalidRemoteAddress(_)) => { + stats.connect_error_invalid_remote_address += 1; + } + QuicError::Connect(ConnectError::NoDefaultClientConfig) => { + stats.connect_error_other += 1; + } + QuicError::Connect(ConnectError::UnsupportedVersion) => { + stats.connect_error_other += 1; + } + QuicError::Connection(ConnectionError::VersionMismatch) => { + stats.connection_error_version_mismatch += 1; + } + QuicError::Connection(ConnectionError::TransportError(_)) => { + stats.connection_error_transport_error += 1; + } + QuicError::Connection(ConnectionError::ConnectionClosed(_)) => { + stats.connection_error_connection_closed += 1; + } + QuicError::Connection(ConnectionError::ApplicationClosed(_)) => { + stats.connection_error_application_closed += 1; + } + QuicError::Connection(ConnectionError::Reset) => { + stats.connection_error_reset += 1; + } + QuicError::Connection(ConnectionError::TimedOut) => { + stats.connection_error_timed_out += 1; + } + QuicError::Connection(ConnectionError::LocallyClosed) => { + stats.connection_error_locally_closed += 1; + } + QuicError::Connection(ConnectionError::CidsExhausted) => { + stats.connection_error_cids_exhausted += 1; + } + QuicError::StreamWrite(WriteError::Stopped(_)) => { + stats.write_error_stopped += 1; + } + QuicError::StreamWrite(WriteError::ConnectionLost(_)) => { + stats.write_error_connection_lost += 1; + } + QuicError::StreamWrite(WriteError::ClosedStream) => { + stats.write_error_closed_stream += 1; + } + QuicError::StreamWrite(WriteError::ZeroRttRejected) => { + stats.write_error_zero_rtt_rejected += 1; + } + // Endpoint is created on the scheduler level and handled separately + // No counters are used for this case. + QuicError::Endpoint(_) => (), + } +} + +pub type SendTransactionStatsPerAddr = HashMap; + +macro_rules! add_fields { + ($self:ident += $other:ident for: $( $field:ident ),* $(,)? 
) => { + $( + $self.$field = $self.$field.saturating_add($other.$field); + )* + }; +} + +impl SendTransactionStats { + pub fn add(&mut self, other: &SendTransactionStats) { + add_fields!( + self += other for: + successfully_sent, + connect_error_cids_exhausted, + connect_error_invalid_remote_address, + connect_error_other, + connection_error_application_closed, + connection_error_cids_exhausted, + connection_error_connection_closed, + connection_error_locally_closed, + connection_error_reset, + connection_error_timed_out, + connection_error_transport_error, + connection_error_version_mismatch, + write_error_closed_stream, + write_error_connection_lost, + write_error_stopped, + write_error_zero_rtt_rejected, + ); + } +} + +macro_rules! display_send_transaction_stats_body { + ($self:ident, $f:ident, $($field:ident),* $(,)?) => { + write!( + $f, + concat!( + "SendTransactionStats:\n", + $( + "\x20 ", stringify!($field), ": {},\n", + )* + ), + $($self.$field),* + ) + }; +} + +impl fmt::Display for SendTransactionStats { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + display_send_transaction_stats_body!( + self, + f, + successfully_sent, + connect_error_cids_exhausted, + connect_error_invalid_remote_address, + connect_error_other, + connection_error_application_closed, + connection_error_cids_exhausted, + connection_error_connection_closed, + connection_error_locally_closed, + connection_error_reset, + connection_error_timed_out, + connection_error_transport_error, + connection_error_version_mismatch, + write_error_closed_stream, + write_error_connection_lost, + write_error_stopped, + write_error_zero_rtt_rejected, + ) + } +} diff --git a/tpu-client-next/src/transaction_batch.rs b/tpu-client-next/src/transaction_batch.rs new file mode 100644 index 00000000000000..a3c2d92fc8386e --- /dev/null +++ b/tpu-client-next/src/transaction_batch.rs @@ -0,0 +1,33 @@ +//! This module holds [`TransactionBatch`] structure. + +use solana_sdk::timing::timestamp; + +/// Batch of generated transactions timestamp is used to discard batches which +/// are too old to have valid blockhash. +#[derive(Clone, PartialEq)] +pub struct TransactionBatch { + wired_transactions: Vec, + timestamp: u64, +} + +type WiredTransaction = Vec; + +impl IntoIterator for TransactionBatch { + type Item = Vec; + type IntoIter = std::vec::IntoIter; + fn into_iter(self) -> Self::IntoIter { + self.wired_transactions.into_iter() + } +} + +impl TransactionBatch { + pub fn new(wired_transactions: Vec) -> Self { + Self { + wired_transactions, + timestamp: timestamp(), + } + } + pub fn timestamp(&self) -> u64 { + self.timestamp + } +} diff --git a/tpu-client-next/src/workers_cache.rs b/tpu-client-next/src/workers_cache.rs new file mode 100644 index 00000000000000..90d2954b669d7f --- /dev/null +++ b/tpu-client-next/src/workers_cache.rs @@ -0,0 +1,184 @@ +//! This module defines `WorkersCache` along with aux struct `WorkerInfo`. These +//! structures provide mechanisms for caching workers, sending transaction +//! batches, and gathering send transaction statistics. + +use { + super::SendTransactionStats, + crate::transaction_batch::TransactionBatch, + log::*, + lru::LruCache, + std::{ + collections::HashMap, + net::{IpAddr, SocketAddr}, + }, + thiserror::Error, + tokio::{sync::mpsc, task::JoinHandle}, + tokio_util::sync::CancellationToken, +}; + +/// [`WorkerInfo`] holds information about a worker responsible for sending +/// transaction batches. 
+pub(crate) struct WorkerInfo { + pub sender: mpsc::Sender, + pub handle: JoinHandle, + pub cancel: CancellationToken, +} + +impl WorkerInfo { + pub fn new( + sender: mpsc::Sender, + handle: JoinHandle, + cancel: CancellationToken, + ) -> Self { + Self { + sender, + handle, + cancel, + } + } + + async fn send_transactions( + &self, + txs_batch: TransactionBatch, + ) -> Result<(), WorkersCacheError> { + self.sender + .send(txs_batch) + .await + .map_err(|_| WorkersCacheError::ReceiverDropped)?; + Ok(()) + } + + /// Closes the worker by dropping the sender and awaiting the worker's + /// statistics. + async fn shutdown(self) -> Result { + self.cancel.cancel(); + drop(self.sender); + let stats = self + .handle + .await + .map_err(|_| WorkersCacheError::TaskJoinFailure)?; + Ok(stats) + } +} + +/// [`WorkersCache`] manages and caches workers. It uses an LRU cache to store and +/// manage workers. It also tracks transaction statistics for each peer. +pub(crate) struct WorkersCache { + workers: LruCache, + send_stats_per_addr: HashMap, + + /// Indicates that the `WorkersCache` is been `shutdown()`, interrupting any outstanding + /// `send_txs()` invocations. + cancel: CancellationToken, +} + +#[derive(Debug, Error, PartialEq)] +pub enum WorkersCacheError { + /// typically happens when the client could not establish the connection. + #[error("Work receiver has been dropped unexpectedly.")] + ReceiverDropped, + + #[error("Task failed to join.")] + TaskJoinFailure, + + #[error("The WorkersCache is being shutdown.")] + ShutdownError, +} + +impl WorkersCache { + pub fn new(capacity: usize, cancel: CancellationToken) -> Self { + Self { + workers: LruCache::new(capacity), + send_stats_per_addr: HashMap::new(), + cancel, + } + } + + pub fn contains(&self, peer: &SocketAddr) -> bool { + self.workers.contains(peer) + } + + pub async fn push(&mut self, peer: SocketAddr, peer_worker: WorkerInfo) { + // Although there might be concerns about the performance implications + // of waiting for the worker to be closed when trying to add a new + // worker, the idea is that these workers are almost always created in + // advance so the latency is hidden. + if let Some((leader, popped_worker)) = self.workers.push(peer, peer_worker) { + self.shutdown_worker(leader, popped_worker).await; + } + } + + /// Sends a batch of transactions to the worker for a given peer. If the + /// worker for the peer is disconnected or fails, it is removed from the + /// cache. + pub async fn send_transactions_to_address( + &mut self, + peer: &SocketAddr, + txs_batch: TransactionBatch, + ) -> Result<(), WorkersCacheError> { + let Self { + workers, cancel, .. + } = self; + + let body = async move { + let current_worker = workers.get(peer).expect( + "Failed to fetch worker for peer {peer}.\n\ + Peer existence must be checked before this call using `contains` method.", + ); + let send_res = current_worker.send_transactions(txs_batch).await; + + if let Err(WorkersCacheError::ReceiverDropped) = send_res { + // Remove the worker from the cache, if the peer has disconnected. + if let Some(current_worker) = workers.pop(peer) { + // To avoid obscuring the error from send, ignore a possible + // `TaskJoinFailure`. + let close_result = current_worker.shutdown().await; + if let Err(error) = close_result { + error!("Error while closing worker: {error}."); + } + } + } + + send_res + }; + + tokio::select! 
{ + send_res = body => send_res, + () = cancel.cancelled() => Err(WorkersCacheError::ShutdownError), + } + } + + pub fn transaction_stats(&self) -> &HashMap { + &self.send_stats_per_addr + } + + /// Closes and removes all workers in the cache. This is typically done when + /// shutting down the system. + pub async fn shutdown(&mut self) { + // Interrupt any outstanding `send_txs()` calls. + self.cancel.cancel(); + + while let Some((leader, worker)) = self.workers.pop_lru() { + self.shutdown_worker(leader, worker).await; + } + } + + /// Shuts down a worker for a given peer by closing the worker and gathering + /// its transaction statistics. + async fn shutdown_worker(&mut self, leader: SocketAddr, worker: WorkerInfo) { + let res = worker.shutdown().await; + + let stats = match res { + Ok(stats) => stats, + Err(err) => { + debug!("Error while shutting down worker for {leader}: {err}"); + return; + } + }; + + self.send_stats_per_addr + .entry(leader.ip()) + .and_modify(|e| e.add(&stats)) + .or_insert(stats); + } +} diff --git a/tpu-client-next/tests/connection_workers_scheduler_test.rs b/tpu-client-next/tests/connection_workers_scheduler_test.rs new file mode 100644 index 00000000000000..0ffabb6640f7a3 --- /dev/null +++ b/tpu-client-next/tests/connection_workers_scheduler_test.rs @@ -0,0 +1,671 @@ +use { + crossbeam_channel::Receiver as CrossbeamReceiver, + futures::future::BoxFuture, + solana_cli_config::ConfigInput, + solana_rpc_client::nonblocking::rpc_client::RpcClient, + solana_sdk::{ + commitment_config::CommitmentConfig, + pubkey::Pubkey, + signer::{keypair::Keypair, Signer}, + }, + solana_streamer::{ + nonblocking::testing_utilities::{ + make_client_endpoint, setup_quic_server, SpawnTestServerResult, TestServerConfig, + }, + packet::PacketBatch, + streamer::StakedNodes, + }, + solana_tpu_client_next::{ + connection_workers_scheduler::ConnectionWorkersSchedulerConfig, + leader_updater::create_leader_updater, transaction_batch::TransactionBatch, + ConnectionWorkersScheduler, ConnectionWorkersSchedulerError, SendTransactionStats, + SendTransactionStatsPerAddr, + }, + std::{ + collections::HashMap, + net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, + num::Saturating, + str::FromStr, + sync::{atomic::Ordering, Arc}, + time::Duration, + }, + tokio::{ + sync::{ + mpsc::{channel, Receiver}, + oneshot, + }, + task::JoinHandle, + time::{sleep, Instant}, + }, + tokio_util::sync::CancellationToken, +}; + +fn test_config(validator_identity: Option) -> ConnectionWorkersSchedulerConfig { + ConnectionWorkersSchedulerConfig { + bind: SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 0), + stake_identity: validator_identity, + num_connections: 1, + skip_check_transaction_age: false, + worker_channel_size: 2, + max_reconnect_attempts: 4, + lookahead_slots: 1, + } +} + +async fn setup_connection_worker_scheduler( + tpu_address: SocketAddr, + transaction_receiver: Receiver, + validator_identity: Option, +) -> ( + JoinHandle>, + CancellationToken, +) { + let json_rpc_url = "http://127.0.0.1:8899"; + let (_, websocket_url) = ConfigInput::compute_websocket_url_setting("", "", json_rpc_url, ""); + + let rpc_client = Arc::new(RpcClient::new_with_commitment( + json_rpc_url.to_string(), + CommitmentConfig::confirmed(), + )); + + // Setup sending txs + let leader_updater = create_leader_updater(rpc_client, websocket_url, Some(tpu_address)) + .await + .expect("Leader updates was successfully created"); + + let cancel = CancellationToken::new(); + let config = test_config(validator_identity); + let scheduler = 
tokio::spawn(ConnectionWorkersScheduler::run( + config, + leader_updater, + transaction_receiver, + cancel.clone(), + )); + + (scheduler, cancel) +} + +async fn join_scheduler( + scheduler_handle: JoinHandle< + Result, + >, +) -> SendTransactionStats { + let stats_per_ip = scheduler_handle + .await + .unwrap() + .expect("Scheduler should stop successfully."); + stats_per_ip + .get(&IpAddr::from_str("127.0.0.1").unwrap()) + .expect("setup_connection_worker_scheduler() connected to a leader at 127.0.0.1") + .clone() +} + +// Specify the pessimistic time to finish generation and result checks. +const TEST_MAX_TIME: Duration = Duration::from_millis(2500); + +struct SpawnTxGenerator { + tx_receiver: Receiver, + tx_sender_shutdown: BoxFuture<'static, ()>, + tx_sender_done: oneshot::Receiver<()>, +} + +/// Generates `num_tx_batches` batches of transactions, each holding a single transaction of +/// `tx_size` bytes. +/// +/// It will not close the returned `tx_receiver` until `tx_sender_shutdown` is invoked. Otherwise, +/// there is a race condition, that exists between the last transaction being scheduled for delivery +/// and the server connection being closed. +fn spawn_tx_sender( + tx_size: usize, + num_tx_batches: usize, + time_per_tx: Duration, +) -> SpawnTxGenerator { + let num_tx_batches: u32 = num_tx_batches + .try_into() + .expect("`num_tx_batches` fits into u32 for all the tests"); + let (tx_sender, tx_receiver) = channel(1); + let cancel = CancellationToken::new(); + let (done_sender, tx_sender_done) = oneshot::channel(); + + let sender = tokio::spawn({ + let start = Instant::now(); + + let tx_sender = tx_sender.clone(); + + let main_loop = async move { + for i in 0..num_tx_batches { + let txs = vec![vec![i as u8; tx_size]; 1]; + tx_sender + .send(TransactionBatch::new(txs)) + .await + .expect("Receiver should not close their side"); + + // Pretend the client runs at the specified TPS. + let sleep_time = time_per_tx + .saturating_mul(i) + .saturating_sub(start.elapsed()); + if !sleep_time.is_zero() { + sleep(sleep_time).await; + } + } + + // It is OK if the receiver has disconnected. + let _ = done_sender.send(()); + }; + + let cancel = cancel.clone(); + async move { + tokio::select! { + () = main_loop => (), + () = cancel.cancelled() => (), + } + } + }); + + let tx_sender_shutdown = Box::pin(async move { + cancel.cancel(); + // This makes sure that the sender exists up until the shutdown is invoked. + drop(tx_sender); + + sender.await.unwrap(); + }); + + SpawnTxGenerator { + tx_receiver, + tx_sender_shutdown, + tx_sender_done, + } +} + +#[tokio::test] +async fn test_basic_transactions_sending() { + let SpawnTestServerResult { + join_handle: server_handle, + exit, + receiver, + server_address, + stats: _stats, + } = setup_quic_server(None, TestServerConfig::default()); + + // Setup sending txs + let tx_size = 1; + let expected_num_txs: usize = 100; + // Pretend that we are running at ~100 TPS. + let SpawnTxGenerator { + tx_receiver, + tx_sender_shutdown, + .. + } = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(10)); + + let (scheduler_handle, _scheduler_cancel) = + setup_connection_worker_scheduler(server_address, tx_receiver, None).await; + + // Check results + let mut received_data = Vec::with_capacity(expected_num_txs); + let now = Instant::now(); + let mut actual_num_packets = 0; + while actual_num_packets < expected_num_txs { + { + let elapsed = now.elapsed(); + assert!( + elapsed < TEST_MAX_TIME, + "Failed to send {} transaction in {:?}. 
Only sent {}", + expected_num_txs, + elapsed, + actual_num_packets, + ); + } + + let Ok(packets) = receiver.try_recv() else { + sleep(Duration::from_millis(10)).await; + continue; + }; + + actual_num_packets += packets.len(); + for p in packets.iter() { + let packet_id = p.data(0).expect("Data should not be lost by server."); + received_data.push(*packet_id); + assert_eq!(p.meta().size, 1); + } + } + + received_data.sort_unstable(); + for i in 1..received_data.len() { + assert_eq!(received_data[i - 1] + 1, received_data[i]); + } + + // Stop sending + tx_sender_shutdown.await; + let localhost_stats = join_scheduler(scheduler_handle).await; + assert_eq!( + localhost_stats, + SendTransactionStats { + successfully_sent: expected_num_txs as u64, + ..Default::default() + } + ); + + // Stop server + exit.store(true, Ordering::Relaxed); + server_handle.await.unwrap(); +} + +async fn count_received_packets_for( + receiver: CrossbeamReceiver, + expected_tx_size: usize, + receive_duration: Duration, +) -> usize { + let now = Instant::now(); + let mut num_packets_received = Saturating(0usize); + + while now.elapsed() < receive_duration { + if let Ok(packets) = receiver.try_recv() { + num_packets_received += packets.len(); + for p in packets.iter() { + assert_eq!(p.meta().size, expected_tx_size); + } + } else { + sleep(Duration::from_millis(100)).await; + } + } + + num_packets_received.0 +} + +// Check that client can create connection even if the first several attempts were unsuccessful. +#[tokio::test] +async fn test_connection_denied_until_allowed() { + let SpawnTestServerResult { + join_handle: server_handle, + exit, + receiver, + server_address, + stats: _stats, + } = setup_quic_server(None, TestServerConfig::default()); + + // To prevent server from accepting a new connection, we use the following observation. + // Since max_connections_per_peer == 1 (< max_unstaked_connections == 500), if we create a first + // connection and later try another one, the second connection will be immediately closed. + // + // Since client is not retrying sending failed transactions, this leads to the packets loss. + // The connection has been created and closed when we already have sent the data. + let throttling_connection = make_client_endpoint(&server_address, None).await; + + // Setup sending txs + let tx_size = 1; + let expected_num_txs: usize = 10; + let SpawnTxGenerator { + tx_receiver, + tx_sender_shutdown, + .. + } = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(100)); + + let (scheduler_handle, _scheduler_cancel) = + setup_connection_worker_scheduler(server_address, tx_receiver, None).await; + + // Check results + let actual_num_packets = count_received_packets_for(receiver, tx_size, TEST_MAX_TIME).await; + assert!( + actual_num_packets < expected_num_txs, + "Expected to receive {expected_num_txs} packets in {TEST_MAX_TIME:?}\n\ + Got packets: {actual_num_packets}" + ); + + // Wait for the exchange to finish. + tx_sender_shutdown.await; + let localhost_stats = join_scheduler(scheduler_handle).await; + // in case of pruning, server closes the connection with code 1 and error + // message b"dropped". This might lead to connection error + // (ApplicationClosed::ApplicationClose) or to stream error + // (ConnectionLost::ApplicationClosed::ApplicationClose). 
+ assert_eq!( + localhost_stats.write_error_connection_lost + + localhost_stats.connection_error_application_closed, + 1 + ); + + drop(throttling_connection); + + // Exit server + exit.store(true, Ordering::Relaxed); + server_handle.await.unwrap(); +} + +// Check that if the client connection has been pruned, client manages to +// reestablish it. Pruning will lead to 1 packet loss, because when we send the +// next packet we will reestablish connection. +#[tokio::test] +async fn test_connection_pruned_and_reopened() { + let SpawnTestServerResult { + join_handle: server_handle, + exit, + receiver, + server_address, + stats: _stats, + } = setup_quic_server( + None, + TestServerConfig { + max_connections_per_peer: 100, + max_unstaked_connections: 1, + ..Default::default() + }, + ); + + // Setup sending txs + let tx_size = 1; + let expected_num_txs: usize = 16; + let SpawnTxGenerator { + tx_receiver, + tx_sender_shutdown, + .. + } = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(100)); + + let (scheduler_handle, _scheduler_cancel) = + setup_connection_worker_scheduler(server_address, tx_receiver, None).await; + + sleep(Duration::from_millis(400)).await; + let _connection_to_prune_client = make_client_endpoint(&server_address, None).await; + + // Check results + let actual_num_packets = count_received_packets_for(receiver, tx_size, TEST_MAX_TIME).await; + assert!(actual_num_packets < expected_num_txs); + + // Wait for the exchange to finish. + tx_sender_shutdown.await; + let localhost_stats = join_scheduler(scheduler_handle).await; + // in case of pruning, server closes the connection with code 1 and error + // message b"dropped". This might lead to connection error + // (ApplicationClosed::ApplicationClose) or to stream error + // (ConnectionLost::ApplicationClosed::ApplicationClose). + assert_eq!( + localhost_stats.connection_error_application_closed + + localhost_stats.write_error_connection_lost, + 1, + ); + + // Exit server + exit.store(true, Ordering::Relaxed); + server_handle.await.unwrap(); +} + +/// Check that client creates staked connection. To do that prohibit unstaked +/// connection and verify that all the txs has been received. +#[tokio::test] +async fn test_staked_connection() { + let validator_identity = Keypair::new(); + let stakes = HashMap::from([(validator_identity.pubkey(), 100_000)]); + let staked_nodes = StakedNodes::new(Arc::new(stakes), HashMap::::default()); + + let SpawnTestServerResult { + join_handle: server_handle, + exit, + receiver, + server_address, + stats: _stats, + } = setup_quic_server( + Some(staked_nodes), + TestServerConfig { + // Must use at least the number of endpoints (10) because + // `max_staked_connections` and `max_unstaked_connections` are + // cumulative for all the endpoints. + max_staked_connections: 10, + max_unstaked_connections: 0, + ..Default::default() + }, + ); + + // Setup sending txs + let tx_size = 1; + let expected_num_txs: usize = 10; + let SpawnTxGenerator { + tx_receiver, + tx_sender_shutdown, + .. + } = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(100)); + + let (scheduler_handle, _scheduler_cancel) = + setup_connection_worker_scheduler(server_address, tx_receiver, Some(validator_identity)) + .await; + + // Check results + let actual_num_packets = count_received_packets_for(receiver, tx_size, TEST_MAX_TIME).await; + assert_eq!(actual_num_packets, expected_num_txs); + + // Wait for the exchange to finish. 
+ tx_sender_shutdown.await; + let localhost_stats = join_scheduler(scheduler_handle).await; + assert_eq!( + localhost_stats, + SendTransactionStats { + successfully_sent: expected_num_txs as u64, + ..Default::default() + } + ); + + // Exit server + exit.store(true, Ordering::Relaxed); + server_handle.await.unwrap(); +} + +// Check that if client sends transactions at a reasonably high rate that is +// higher than what the server accepts, nevertheless all the transactions are +// delivered and there are no errors on the client side. +#[tokio::test] +async fn test_connection_throttling() { + let SpawnTestServerResult { + join_handle: server_handle, + exit, + receiver, + server_address, + stats: _stats, + } = setup_quic_server(None, TestServerConfig::default()); + + // Setup sending txs + let tx_size = 1; + let expected_num_txs: usize = 50; + // Send at 1000 TPS - x10 more than the throttling interval of 10ms used in other tests allows. + let SpawnTxGenerator { + tx_receiver, + tx_sender_shutdown, + .. + } = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(1)); + + let (scheduler_handle, _scheduler_cancel) = + setup_connection_worker_scheduler(server_address, tx_receiver, None).await; + + // Check results + let actual_num_packets = + count_received_packets_for(receiver, tx_size, Duration::from_secs(1)).await; + assert_eq!(actual_num_packets, expected_num_txs); + + // Stop sending + tx_sender_shutdown.await; + let localhost_stats = join_scheduler(scheduler_handle).await; + assert_eq!( + localhost_stats, + SendTransactionStats { + successfully_sent: expected_num_txs as u64, + ..Default::default() + } + ); + + // Exit server + exit.store(true, Ordering::Relaxed); + server_handle.await.unwrap(); +} + +// Check that when the host cannot be reached, the client exits gracefully. +#[tokio::test] +async fn test_no_host() { + // A "black hole" address for the TPU. + let server_ip = IpAddr::V6(Ipv6Addr::new(0x100, 0, 0, 0, 0, 0, 0, 1)); + let server_address = SocketAddr::new(server_ip, 49151); + + // Setup sending side. + let tx_size = 1; + let max_send_attempts: usize = 10; + + let SpawnTxGenerator { + tx_receiver, + tx_sender_shutdown, + tx_sender_done, + .. + } = spawn_tx_sender(tx_size, max_send_attempts, Duration::from_millis(10)); + + let (scheduler_handle, _scheduler_cancel) = + setup_connection_worker_scheduler(server_address, tx_receiver, None).await; + + // Wait for all the transactions to be sent, and some extra time for the delivery to be + // attempted. + tx_sender_done.await.unwrap(); + sleep(Duration::from_millis(100)).await; + + // Wait for the generator to finish. + tx_sender_shutdown.await; + + // While attempting to establish a connection with a nonexistent host, we fill the worker's + // channel. Transactions from this channel will never be sent and will eventually be dropped + // without increasing the `SendTransactionStats` counters. + let stats = scheduler_handle + .await + .expect("Scheduler should stop successfully") + .expect("Scheduler execution was successful"); + assert_eq!(stats, HashMap::new()); +} + +// Check that when the client is rate-limited by server, we update counters +// accordingly. To implement it we: +// * set the connection limit per minute to 1 +// * create a dummy connection to reach the limit and immediately close it +// * set up client which will try to create a new connection which it will be +// rate-limited. This test doesn't check what happens when the rate-limiting +// period ends because it too long for test (1min). 
+#[tokio::test] +async fn test_rate_limiting() { + let SpawnTestServerResult { + join_handle: server_handle, + exit, + receiver, + server_address, + stats: _stats, + } = setup_quic_server( + None, + TestServerConfig { + max_connections_per_peer: 100, + max_connections_per_ipaddr_per_minute: 1, + ..Default::default() + }, + ); + + let connection_to_reach_limit = make_client_endpoint(&server_address, None).await; + drop(connection_to_reach_limit); + + // Setup sending txs + let tx_size = 1; + let expected_num_txs: usize = 16; + let SpawnTxGenerator { + tx_receiver, + tx_sender_shutdown, + .. + } = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(100)); + + let (scheduler_handle, scheduler_cancel) = + setup_connection_worker_scheduler(server_address, tx_receiver, None).await; + + let actual_num_packets = count_received_packets_for(receiver, tx_size, TEST_MAX_TIME).await; + assert_eq!(actual_num_packets, 0); + + // Stop the sender. + tx_sender_shutdown.await; + + // And the scheduler. + scheduler_cancel.cancel(); + let localhost_stats = join_scheduler(scheduler_handle).await; + + // We do not expect to see any errors, as the connection is in the pending state still, when we + // do the shutdown. If we increase the time we wait in `count_received_packets_for`, we would + // start seeing a `connection_error_timed_out` incremented to 1. Potentially, we may want to + // accept both 0 and 1 as valid values for it. + assert_eq!(localhost_stats, SendTransactionStats::default()); + + // Stop the server. + exit.store(true, Ordering::Relaxed); + server_handle.await.unwrap(); +} + +// The same as test_rate_limiting but here we wait for 1 min to check that the +// connection has been established. +#[tokio::test] +// TODO Provide an alternative testing interface for `streamer::nonblocking::quic::spawn_server` +// that would accept throttling at a granularity below 1 minute. +#[ignore = "takes 70s to complete"] +async fn test_rate_limiting_establish_connection() { + let SpawnTestServerResult { + join_handle: server_handle, + exit, + receiver, + server_address, + stats: _stats, + } = setup_quic_server( + None, + TestServerConfig { + max_connections_per_peer: 100, + max_connections_per_ipaddr_per_minute: 1, + ..Default::default() + }, + ); + + let connection_to_reach_limit = make_client_endpoint(&server_address, None).await; + drop(connection_to_reach_limit); + + // Setup sending txs + let tx_size = 1; + let expected_num_txs: usize = 65; + let SpawnTxGenerator { + tx_receiver, + tx_sender_shutdown, + .. + } = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(1000)); + + let (scheduler_handle, scheduler_cancel) = + setup_connection_worker_scheduler(server_address, tx_receiver, None).await; + + let actual_num_packets = + count_received_packets_for(receiver, tx_size, Duration::from_secs(70)).await; + assert!( + actual_num_packets > 0, + "As we wait longer than 1 minute, at least one transaction should be delivered. \ + After 1 minute the server is expected to accept our connection.\n\ + Actual packets delivered: {actual_num_packets}" + ); + + // Stop the sender. + tx_sender_shutdown.await; + + // And the scheduler. 
+ scheduler_cancel.cancel(); + let mut localhost_stats = join_scheduler(scheduler_handle).await; + assert!( + localhost_stats.connection_error_timed_out > 0, + "As the quinn timeout is below 1 minute, a few connections will fail to connect during \ + the 1 minute delay.\n\ + Actual connection_error_timed_out: {}", + localhost_stats.connection_error_timed_out + ); + assert!( + localhost_stats.successfully_sent > 0, + "As we run the test for longer than 1 minute, we expect a connection to be established, \ + and a number of transactions to be delivered.\n\ + Actual successfully_sent: {}", + localhost_stats.successfully_sent + ); + + // All the rest of the error counters should be 0. + localhost_stats.connection_error_timed_out = 0; + localhost_stats.successfully_sent = 0; + assert_eq!(localhost_stats, SendTransactionStats::default()); + + // Stop the server. + exit.store(true, Ordering::Relaxed); + server_handle.await.unwrap(); +}
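
For reviewers, below is a minimal wiring sketch of the new API, mirroring the
integration tests above. It is illustrative and not part of the diff: the RPC
and WebSocket URLs, the channel capacity, and the `send_one_batch` helper name
are assumptions; the configuration values are copied from `test_config()`.

use {
    solana_rpc_client::nonblocking::rpc_client::RpcClient,
    solana_tpu_client_next::{
        connection_workers_scheduler::ConnectionWorkersSchedulerConfig,
        leader_updater::create_leader_updater, transaction_batch::TransactionBatch,
        ConnectionWorkersScheduler,
    },
    std::{
        net::{Ipv4Addr, SocketAddr},
        sync::Arc,
    },
    tokio::sync::mpsc,
    tokio_util::sync::CancellationToken,
};

// Send one batch of already-serialized transactions to the upcoming leaders
// of a locally running cluster (endpoints are illustrative).
async fn send_one_batch(wired_txs: Vec<Vec<u8>>) {
    let rpc_client = Arc::new(RpcClient::new("http://127.0.0.1:8899".to_string()));
    let leader_updater = create_leader_updater(
        rpc_client,
        "ws://127.0.0.1:8900".to_string(),
        /*pinned_address:*/ None,
    )
    .await
    .expect("leader updater is created");

    let (tx_sender, tx_receiver) = mpsc::channel(16);
    let cancel = CancellationToken::new();
    let config = ConnectionWorkersSchedulerConfig {
        bind: SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 0),
        stake_identity: None,
        num_connections: 1,
        skip_check_transaction_age: false,
        worker_channel_size: 2,
        max_reconnect_attempts: 4,
        lookahead_slots: 1,
    };

    // The scheduler runs until the sender side of the channel is dropped or
    // `cancel` is triggered, then returns per-address send statistics.
    let scheduler = tokio::spawn(ConnectionWorkersScheduler::run(
        config,
        leader_updater,
        tx_receiver,
        cancel.clone(),
    ));

    tx_sender
        .send(TransactionBatch::new(wired_txs))
        .await
        .expect("scheduler is still running");
    drop(tx_sender);

    let stats = scheduler
        .await
        .expect("scheduler task is joined")
        .expect("scheduler exits without error");
    println!("send stats per address: {stats:?}");
}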