diff --git a/Cargo.lock b/Cargo.lock index c3e4bcbf4f0d07..42352a5f7ea4c3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7214,8 +7214,23 @@ name = "solana-prio-graph-scheduler" version = "2.1.0" dependencies = [ "ahash 0.8.10", + "arrayvec", + "assert_matches", + "crossbeam-channel", + "itertools 0.12.1", + "log", + "min-max-heap", + "prio-graph", "solana-core", + "solana-cost-model", + "solana-gossip", + "solana-ledger", + "solana-measure", + "solana-metrics", + "solana-poh", + "solana-runtime", "solana-sdk", + "thiserror", ] [[package]] diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 78aec0b62b7e2d..41e7d95595dd84 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -62,7 +62,7 @@ pub mod qos_service; pub mod unprocessed_packet_batches; pub mod unprocessed_transaction_storage; -mod consume_worker; +pub mod consume_worker; mod decision_maker; mod forward_packet_batches_by_accounts; mod forward_worker; @@ -73,7 +73,7 @@ mod multi_iterator_scanner; mod packet_deserializer; mod packet_filter; mod packet_receiver; -mod read_write_account_set; +pub mod read_write_account_set; #[allow(dead_code)] mod scheduler_messages; mod transaction_scheduler; diff --git a/prio-graph-scheduler/Cargo.toml b/prio-graph-scheduler/Cargo.toml index 9bc4b097c82d19..47706ad7f40b1b 100644 --- a/prio-graph-scheduler/Cargo.toml +++ b/prio-graph-scheduler/Cargo.toml @@ -12,9 +12,25 @@ edition.workspace = true [dependencies] solana-core = { workspace = true } solana-sdk = { workspace = true } +solana-poh = { workspace = true } +solana-metrics = { workspace = true } +solana-ledger = { workspace = true } +solana-runtime = { workspace = true } +solana-gossip = { workspace = true } +solana-cost-model = { workspace = true } +solana-measure = { workspace = true } ahash = { workspace = true } +prio-graph = { workspace = true } +thiserror = { workspace = true } +itertools = { workspace = true } +log = { workspace = true } +crossbeam-channel = { workspace = true } +arrayvec = { workspace = true } +min-max-heap = { workspace = true } +[dev-dependencies] +assert_matches = { workspace = true } [package.metadata.docs.rs] targets = ["x86_64-unknown-linux-gnu"] diff --git a/prio-graph-scheduler/src/lib.rs b/prio-graph-scheduler/src/lib.rs index 927e9d77bbc084..0fa6eac41017b1 100644 --- a/prio-graph-scheduler/src/lib.rs +++ b/prio-graph-scheduler/src/lib.rs @@ -3,4 +3,17 @@ pub mod transaction_state; pub mod scheduler_messages; pub mod id_generator; pub mod in_flight_tracker; -pub mod thread_aware_account_locks; \ No newline at end of file +pub mod thread_aware_account_locks; +pub mod transaction_priority_id; +pub mod scheduler_error; +pub mod scheduler_metrics; +// pub mod scheduler_controller; +pub mod transaction_state_container; +pub mod prio_graph_scheduler; + +#[macro_use] +extern crate solana_metrics; + +#[cfg(test)] +#[macro_use] +extern crate assert_matches; \ No newline at end of file diff --git a/prio-graph-scheduler/src/prio_graph_scheduler.rs b/prio-graph-scheduler/src/prio_graph_scheduler.rs new file mode 100644 index 00000000000000..a2f339d3baeaac --- /dev/null +++ b/prio-graph-scheduler/src/prio_graph_scheduler.rs @@ -0,0 +1,907 @@ +use { + crate::scheduler_messages::{ + ConsumeWork, FinishedConsumeWork, TransactionBatchId, TransactionId, + }, + crate::transaction_priority_id::TransactionPriorityId, + crate::transaction_state::TransactionState, + crate::{ + in_flight_tracker::InFlightTracker, + scheduler_error::SchedulerError, + 
thread_aware_account_locks::{ThreadAwareAccountLocks, ThreadId, ThreadSet}, + transaction_state::SanitizedTransactionTTL, + transaction_state_container::TransactionStateContainer, + }, + crossbeam_channel::{Receiver, Sender, TryRecvError}, + itertools::izip, + prio_graph::{AccessKind, PrioGraph}, + solana_core::banking_stage::{ + consumer::TARGET_NUM_TRANSACTIONS_PER_BATCH, read_write_account_set::ReadWriteAccountSet, + }, + solana_cost_model::block_cost_limits::MAX_BLOCK_UNITS, + solana_measure::measure_us, + solana_sdk::{ + pubkey::Pubkey, saturating_add_assign, slot_history::Slot, + transaction::SanitizedTransaction, + }, +}; + +pub struct PrioGraphScheduler { + in_flight_tracker: InFlightTracker, + account_locks: ThreadAwareAccountLocks, + consume_work_senders: Vec>, + finished_consume_work_receiver: Receiver, + look_ahead_window_size: usize, +} + +impl PrioGraphScheduler { + pub fn new( + consume_work_senders: Vec>, + finished_consume_work_receiver: Receiver, + ) -> Self { + let num_threads = consume_work_senders.len(); + Self { + in_flight_tracker: InFlightTracker::new(num_threads), + account_locks: ThreadAwareAccountLocks::new(num_threads), + consume_work_senders, + finished_consume_work_receiver, + look_ahead_window_size: 2048, + } + } + + /// Schedule transactions from the given `TransactionStateContainer` to be + /// consumed by the worker threads. Returns summary of scheduling, or an + /// error. + /// `pre_graph_filter` is used to filter out transactions that should be + /// skipped and dropped before insertion to the prio-graph. This fn should + /// set `false` for transactions that should be dropped, and `true` + /// otherwise. + /// `pre_lock_filter` is used to filter out transactions after they have + /// made it to the top of the prio-graph, and immediately before locks are + /// checked and taken. This fn should return `true` for transactions that + /// should be scheduled, and `false` otherwise. + /// + /// Uses a `PrioGraph` to perform look-ahead during the scheduling of transactions. + /// This, combined with internal tracking of threads' in-flight transactions, allows + /// for load-balancing while prioritizing scheduling transactions onto threads that will + /// not cause conflicts in the near future. + pub fn schedule( + &mut self, + container: &mut TransactionStateContainer, + pre_graph_filter: impl Fn(&[&SanitizedTransaction], &mut [bool]), + pre_lock_filter: impl Fn(&SanitizedTransaction) -> bool, + ) -> Result { + let num_threads = self.consume_work_senders.len(); + let max_cu_per_thread = MAX_BLOCK_UNITS / num_threads as u64; + + let mut schedulable_threads = ThreadSet::any(num_threads); + for thread_id in 0..num_threads { + if self.in_flight_tracker.cus_in_flight_per_thread()[thread_id] >= max_cu_per_thread { + schedulable_threads.remove(thread_id); + } + } + if schedulable_threads.is_empty() { + return Ok(SchedulingSummary { + num_scheduled: 0, + num_unschedulable: 0, + num_filtered_out: 0, + filter_time_us: 0, + }); + } + + let mut batches = Batches::new(num_threads); + // Some transactions may be unschedulable due to multi-thread conflicts. + // These transactions cannot be scheduled until some conflicting work is completed. + // However, the scheduler should not allow other transactions that conflict with + // these transactions to be scheduled before them. 
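// ---------------------------------------------------------------------------
// Example sketch (not part of this patch): how a caller is expected to drive
// `schedule()` given the filter contract in the doc comment above. The
// pre-graph filter writes one bool per transaction into `results` before
// anything enters the prio-graph; the pre-lock filter gets a final say just
// before account locks are taken. `hypothetical_passes_bank_checks` is a
// placeholder for the caller's age/status/fee checks.
fn hypothetical_passes_bank_checks(_tx: &SanitizedTransaction) -> bool {
    true // placeholder for bank-side checks
}

fn example_schedule_call(
    scheduler: &mut PrioGraphScheduler,
    container: &mut TransactionStateContainer,
) -> Result<SchedulingSummary, SchedulerError> {
    scheduler.schedule(
        container,
        // `false` drops the transaction before it is inserted into the graph.
        |txs, results| {
            for (tx, keep) in txs.iter().zip(results.iter_mut()) {
                *keep = hypothetical_passes_bank_checks(tx);
            }
        },
        // Schedule everything that reaches the top of the graph.
        |_tx| true,
    )
}
// ---------------------------------------------------------------------------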
+ let mut unschedulable_ids = Vec::new(); + let mut blocking_locks = ReadWriteAccountSet::default(); + let mut prio_graph = PrioGraph::new(|id: &TransactionPriorityId, _graph_node| *id); + + // Track metrics on filter. + let mut num_filtered_out: usize = 0; + let mut total_filter_time_us: u64 = 0; + + let mut window_budget = self.look_ahead_window_size; + let mut chunked_pops = |container: &mut TransactionStateContainer, + prio_graph: &mut PrioGraph<_, _, _, _>, + window_budget: &mut usize| { + while *window_budget > 0 { + const MAX_FILTER_CHUNK_SIZE: usize = 128; + let mut filter_array = [true; MAX_FILTER_CHUNK_SIZE]; + let mut ids = Vec::with_capacity(MAX_FILTER_CHUNK_SIZE); + let mut txs = Vec::with_capacity(MAX_FILTER_CHUNK_SIZE); + + let chunk_size = (*window_budget).min(MAX_FILTER_CHUNK_SIZE); + for _ in 0..chunk_size { + if let Some(id) = container.pop() { + ids.push(id); + } else { + break; + } + } + *window_budget = window_budget.saturating_sub(chunk_size); + + ids.iter().for_each(|id| { + let transaction = container.get_transaction_ttl(&id.id).unwrap(); + txs.push(&transaction.transaction); + }); + + let (_, filter_us) = + measure_us!(pre_graph_filter(&txs, &mut filter_array[..chunk_size])); + saturating_add_assign!(total_filter_time_us, filter_us); + + for (id, filter_result) in ids.iter().zip(&filter_array[..chunk_size]) { + if *filter_result { + let transaction = container.get_transaction_ttl(&id.id).unwrap(); + prio_graph.insert_transaction( + *id, + Self::get_transaction_account_access(transaction), + ); + } else { + saturating_add_assign!(num_filtered_out, 1); + container.remove_by_id(&id.id); + } + } + + if ids.len() != chunk_size { + break; + } + } + }; + + // Create the initial look-ahead window. + // Check transactions against filter, remove from container if it fails. + chunked_pops(container, &mut prio_graph, &mut window_budget); + + let mut unblock_this_batch = + Vec::with_capacity(self.consume_work_senders.len() * TARGET_NUM_TRANSACTIONS_PER_BATCH); + const MAX_TRANSACTIONS_PER_SCHEDULING_PASS: usize = 100_000; + let mut num_scheduled: usize = 0; + let mut num_sent: usize = 0; + let mut num_unschedulable: usize = 0; + while num_scheduled < MAX_TRANSACTIONS_PER_SCHEDULING_PASS { + // If nothing is in the main-queue of the `PrioGraph` then there's nothing left to schedule. + if prio_graph.is_empty() { + break; + } + + while let Some(id) = prio_graph.pop() { + unblock_this_batch.push(id); + + // Should always be in the container, during initial testing phase panic. + // Later, we can replace with a continue in case this does happen. 
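// ---------------------------------------------------------------------------
// Sketch (not part of this patch): the `thread_selector` closure passed to
// `try_schedule_transaction` a few lines below wraps `Self::select_thread`,
// which picks the schedulable thread with the least outstanding work: fewest
// (batched + in-flight) compute units, with ties broken by transaction count.
// The same ordering in isolation, over plain slices:
fn example_pick_least_loaded(total_cus: &[u64], tx_counts: &[usize]) -> usize {
    (0..total_cus.len())
        .map(|thread_id| (thread_id, total_cus[thread_id], tx_counts[thread_id]))
        .min_by(|a, b| a.1.cmp(&b.1).then_with(|| a.2.cmp(&b.2)))
        .map(|(thread_id, _, _)| thread_id)
        .expect("at least one schedulable thread")
}
// example_pick_least_loaded(&[10_000, 4_000, 4_000], &[3, 2, 1]) == 2:
// threads 1 and 2 tie on compute units, and thread 2 has fewer transactions.
// ---------------------------------------------------------------------------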
+ let Some(transaction_state) = container.get_mut_transaction_state(&id.id) else { + panic!("transaction state must exist") + }; + + let maybe_schedule_info = try_schedule_transaction( + transaction_state, + &pre_lock_filter, + &mut blocking_locks, + &mut self.account_locks, + num_threads, + |thread_set| { + Self::select_thread( + thread_set, + &batches.total_cus, + self.in_flight_tracker.cus_in_flight_per_thread(), + &batches.transactions, + self.in_flight_tracker.num_in_flight_per_thread(), + ) + }, + ); + + match maybe_schedule_info { + Err(TransactionSchedulingError::Filtered) => { + container.remove_by_id(&id.id); + } + Err(TransactionSchedulingError::UnschedulableConflicts) => { + unschedulable_ids.push(id); + saturating_add_assign!(num_unschedulable, 1); + } + Ok(TransactionSchedulingInfo { + thread_id, + transaction, + max_age_slot, + cost, + }) => { + saturating_add_assign!(num_scheduled, 1); + batches.transactions[thread_id].push(transaction); + batches.ids[thread_id].push(id.id); + batches.max_age_slots[thread_id].push(max_age_slot); + saturating_add_assign!(batches.total_cus[thread_id], cost); + + // If target batch size is reached, send only this batch. + if batches.ids[thread_id].len() >= TARGET_NUM_TRANSACTIONS_PER_BATCH { + saturating_add_assign!( + num_sent, + self.send_batch(&mut batches, thread_id)? + ); + } + + // if the thread is at max_cu_per_thread, remove it from the schedulable threads + // if there are no more schedulable threads, stop scheduling. + if self.in_flight_tracker.cus_in_flight_per_thread()[thread_id] + + batches.total_cus[thread_id] + >= max_cu_per_thread + { + schedulable_threads.remove(thread_id); + if schedulable_threads.is_empty() { + break; + } + } + + if num_scheduled >= MAX_TRANSACTIONS_PER_SCHEDULING_PASS { + break; + } + } + } + } + + // Send all non-empty batches + saturating_add_assign!(num_sent, self.send_batches(&mut batches)?); + + // Refresh window budget and do chunked pops + saturating_add_assign!(window_budget, unblock_this_batch.len()); + chunked_pops(container, &mut prio_graph, &mut window_budget); + + // Unblock all transactions that were blocked by the transactions that were just sent. + for id in unblock_this_batch.drain(..) { + prio_graph.unblock(&id); + } + } + + // Send batches for any remaining transactions + saturating_add_assign!(num_sent, self.send_batches(&mut batches)?); + + // Push unschedulable ids back into the container + for id in unschedulable_ids { + container.push_id_into_queue(id); + } + + // Push remaining transactions back into the container + while let Some((id, _)) = prio_graph.pop_and_unblock() { + container.push_id_into_queue(id); + } + + assert_eq!( + num_scheduled, num_sent, + "number of scheduled and sent transactions must match" + ); + + Ok(SchedulingSummary { + num_scheduled, + num_unschedulable, + num_filtered_out, + filter_time_us: total_filter_time_us, + }) + } + + /// Receive completed batches of transactions without blocking. + /// Returns (num_transactions, num_retryable_transactions) on success. 
+ pub fn receive_completed( + &mut self, + container: &mut TransactionStateContainer, + ) -> Result<(usize, usize), SchedulerError> { + let mut total_num_transactions: usize = 0; + let mut total_num_retryable: usize = 0; + loop { + let (num_transactions, num_retryable) = self.try_receive_completed(container)?; + if num_transactions == 0 { + break; + } + saturating_add_assign!(total_num_transactions, num_transactions); + saturating_add_assign!(total_num_retryable, num_retryable); + } + Ok((total_num_transactions, total_num_retryable)) + } + + /// Receive completed batches of transactions. + /// Returns `Ok((num_transactions, num_retryable))` if a batch was received, `Ok((0, 0))` if no batch was received. + fn try_receive_completed( + &mut self, + container: &mut TransactionStateContainer, + ) -> Result<(usize, usize), SchedulerError> { + match self.finished_consume_work_receiver.try_recv() { + Ok(FinishedConsumeWork { + work: + ConsumeWork { + batch_id, + ids, + transactions, + max_age_slots, + }, + retryable_indexes, + }) => { + let num_transactions = ids.len(); + let num_retryable = retryable_indexes.len(); + + // Free the locks + self.complete_batch(batch_id, &transactions); + + // Retryable transactions should be inserted back into the container + let mut retryable_iter = retryable_indexes.into_iter().peekable(); + for (index, (id, transaction, max_age_slot)) in + izip!(ids, transactions, max_age_slots).enumerate() + { + if let Some(retryable_index) = retryable_iter.peek() { + if *retryable_index == index { + container.retry_transaction( + id, + SanitizedTransactionTTL { + transaction, + max_age_slot, + }, + ); + retryable_iter.next(); + continue; + } + } + container.remove_by_id(&id); + } + + Ok((num_transactions, num_retryable)) + } + Err(TryRecvError::Empty) => Ok((0, 0)), + Err(TryRecvError::Disconnected) => Err(SchedulerError::DisconnectedRecvChannel( + "finished consume work", + )), + } + } + + /// Mark a given `TransactionBatchId` as completed. + /// This will update the internal tracking, including account locks. + fn complete_batch( + &mut self, + batch_id: TransactionBatchId, + transactions: &[SanitizedTransaction], + ) { + let thread_id = self.in_flight_tracker.complete_batch(batch_id); + for transaction in transactions { + let message = transaction.message(); + let account_keys = message.account_keys(); + let write_account_locks = account_keys + .iter() + .enumerate() + .filter_map(|(index, key)| message.is_writable(index).then_some(key)); + let read_account_locks = account_keys + .iter() + .enumerate() + .filter_map(|(index, key)| (!message.is_writable(index)).then_some(key)); + self.account_locks + .unlock_accounts(write_account_locks, read_account_locks, thread_id); + } + } + + /// Send all batches of transactions to the worker threads. + /// Returns the number of transactions sent. + fn send_batches(&mut self, batches: &mut Batches) -> Result { + (0..self.consume_work_senders.len()) + .map(|thread_index| self.send_batch(batches, thread_index)) + .sum() + } + + /// Send a batch of transactions to the given thread's `ConsumeWork` channel. + /// Returns the number of transactions sent. 
+ fn send_batch( + &mut self, + batches: &mut Batches, + thread_index: usize, + ) -> Result { + if batches.ids[thread_index].is_empty() { + return Ok(0); + } + + let (ids, transactions, max_age_slots, total_cus) = batches.take_batch(thread_index); + + let batch_id = self + .in_flight_tracker + .track_batch(ids.len(), total_cus, thread_index); + + let num_scheduled = ids.len(); + let work = ConsumeWork { + batch_id, + ids, + transactions, + max_age_slots, + }; + self.consume_work_senders[thread_index] + .send(work) + .map_err(|_| SchedulerError::DisconnectedSendChannel("consume work sender"))?; + + Ok(num_scheduled) + } + + /// Given the schedulable `thread_set`, select the thread with the least amount + /// of work queued up. + /// Currently, "work" is just defined as the number of transactions. + /// + /// If the `chain_thread` is available, this thread will be selected, regardless of + /// load-balancing. + /// + /// Panics if the `thread_set` is empty. This should never happen, see comment + /// on `ThreadAwareAccountLocks::try_lock_accounts`. + fn select_thread( + thread_set: ThreadSet, + batch_cus_per_thread: &[u64], + in_flight_cus_per_thread: &[u64], + batches_per_thread: &[Vec], + in_flight_per_thread: &[usize], + ) -> ThreadId { + thread_set + .contained_threads_iter() + .map(|thread_id| { + ( + thread_id, + batch_cus_per_thread[thread_id] + in_flight_cus_per_thread[thread_id], + batches_per_thread[thread_id].len() + in_flight_per_thread[thread_id], + ) + }) + .min_by(|a, b| a.1.cmp(&b.1).then_with(|| a.2.cmp(&b.2))) + .map(|(thread_id, _, _)| thread_id) + .unwrap() + } + + /// Gets accessed accounts (resources) for use in `PrioGraph`. + fn get_transaction_account_access( + transaction: &SanitizedTransactionTTL, + ) -> impl Iterator + '_ { + let message = transaction.transaction.message(); + message + .account_keys() + .iter() + .enumerate() + .map(|(index, key)| { + if message.is_writable(index) { + (*key, AccessKind::Write) + } else { + (*key, AccessKind::Read) + } + }) + } +} + +/// Metrics from scheduling transactions. +#[derive(Debug, PartialEq, Eq)] +pub struct SchedulingSummary { + /// Number of transactions scheduled. + pub num_scheduled: usize, + /// Number of transactions that were not scheduled due to conflicts. + pub num_unschedulable: usize, + /// Number of transactions that were dropped due to filter. + pub num_filtered_out: usize, + /// Time spent filtering transactions + pub filter_time_us: u64, +} + +struct Batches { + ids: Vec>, + transactions: Vec>, + max_age_slots: Vec>, + total_cus: Vec, +} + +impl Batches { + fn new(num_threads: usize) -> Self { + Self { + ids: vec![Vec::with_capacity(TARGET_NUM_TRANSACTIONS_PER_BATCH); num_threads], + transactions: vec![Vec::with_capacity(TARGET_NUM_TRANSACTIONS_PER_BATCH); num_threads], + max_age_slots: vec![Vec::with_capacity(TARGET_NUM_TRANSACTIONS_PER_BATCH); num_threads], + total_cus: vec![0; num_threads], + } + } + + fn take_batch( + &mut self, + thread_id: ThreadId, + ) -> ( + Vec, + Vec, + Vec, + u64, + ) { + ( + core::mem::replace( + &mut self.ids[thread_id], + Vec::with_capacity(TARGET_NUM_TRANSACTIONS_PER_BATCH), + ), + core::mem::replace( + &mut self.transactions[thread_id], + Vec::with_capacity(TARGET_NUM_TRANSACTIONS_PER_BATCH), + ), + core::mem::replace( + &mut self.max_age_slots[thread_id], + Vec::with_capacity(TARGET_NUM_TRANSACTIONS_PER_BATCH), + ), + core::mem::replace(&mut self.total_cus[thread_id], 0), + ) + } +} + +/// A transaction has been scheduled to a thread. 
+struct TransactionSchedulingInfo { + thread_id: ThreadId, + transaction: SanitizedTransaction, + max_age_slot: Slot, + cost: u64, +} + +/// Error type for reasons a transaction could not be scheduled. +enum TransactionSchedulingError { + /// Transaction was filtered out before locking. + Filtered, + /// Transaction cannot be scheduled due to conflicts, or + /// higher priority conflicting transactions are unschedulable. + UnschedulableConflicts, +} + +fn try_schedule_transaction( + transaction_state: &mut TransactionState, + pre_lock_filter: impl Fn(&SanitizedTransaction) -> bool, + blocking_locks: &mut ReadWriteAccountSet, + account_locks: &mut ThreadAwareAccountLocks, + num_threads: usize, + thread_selector: impl Fn(ThreadSet) -> ThreadId, +) -> Result { + let transaction = &transaction_state.transaction_ttl().transaction; + if !pre_lock_filter(transaction) { + return Err(TransactionSchedulingError::Filtered); + } + + // Check if this transaction conflicts with any blocked transactions + let message = transaction.message(); + if !blocking_locks.check_locks(message) { + blocking_locks.take_locks(message); + return Err(TransactionSchedulingError::UnschedulableConflicts); + } + + // Schedule the transaction if it can be. + let message = transaction.message(); + let account_keys = message.account_keys(); + let write_account_locks = account_keys + .iter() + .enumerate() + .filter_map(|(index, key)| message.is_writable(index).then_some(key)); + let read_account_locks = account_keys + .iter() + .enumerate() + .filter_map(|(index, key)| (!message.is_writable(index)).then_some(key)); + + let Some(thread_id) = account_locks.try_lock_accounts( + write_account_locks, + read_account_locks, + ThreadSet::any(num_threads), + thread_selector, + ) else { + blocking_locks.take_locks(message); + return Err(TransactionSchedulingError::UnschedulableConflicts); + }; + + let sanitized_transaction_ttl = transaction_state.transition_to_pending(); + let cost = transaction_state.cost(); + + Ok(TransactionSchedulingInfo { + thread_id, + transaction: sanitized_transaction_ttl.transaction, + max_age_slot: sanitized_transaction_ttl.max_age_slot, + cost, + }) +} + +#[cfg(test)] +mod tests { + use { + super::*, + crossbeam_channel::{unbounded, Receiver}, + itertools::Itertools, + solana_core::banking_stage::{ + consumer::TARGET_NUM_TRANSACTIONS_PER_BATCH, + immutable_deserialized_packet::ImmutableDeserializedPacket, + }, + solana_sdk::{ + compute_budget::ComputeBudgetInstruction, hash::Hash, message::Message, packet::Packet, + pubkey::Pubkey, signature::Keypair, signer::Signer, system_instruction, + transaction::Transaction, + }, + std::{borrow::Borrow, sync::Arc}, + }; + + macro_rules! txid { + ($value:expr) => { + TransactionId::new($value) + }; + } + + macro_rules! 
txids { + ([$($element:expr),*]) => { + vec![ $(txid!($element)),* ] + }; + } + + fn create_test_frame( + num_threads: usize, + ) -> ( + PrioGraphScheduler, + Vec>, + Sender, + ) { + let (consume_work_senders, consume_work_receivers) = + (0..num_threads).map(|_| unbounded()).unzip(); + let (finished_consume_work_sender, finished_consume_work_receiver) = unbounded(); + let scheduler = + PrioGraphScheduler::new(consume_work_senders, finished_consume_work_receiver); + ( + scheduler, + consume_work_receivers, + finished_consume_work_sender, + ) + } + + fn prioritized_tranfers( + from_keypair: &Keypair, + to_pubkeys: impl IntoIterator>, + lamports: u64, + priority: u64, + ) -> SanitizedTransaction { + let to_pubkeys_lamports = to_pubkeys + .into_iter() + .map(|pubkey| *pubkey.borrow()) + .zip(std::iter::repeat(lamports)) + .collect_vec(); + let mut ixs = + system_instruction::transfer_many(&from_keypair.pubkey(), &to_pubkeys_lamports); + let prioritization = ComputeBudgetInstruction::set_compute_unit_price(priority); + ixs.push(prioritization); + let message = Message::new(&ixs, Some(&from_keypair.pubkey())); + let tx = Transaction::new(&[from_keypair], message, Hash::default()); + SanitizedTransaction::from_transaction_for_tests(tx) + } + + fn create_container( + tx_infos: impl IntoIterator< + Item = ( + impl Borrow, + impl IntoIterator>, + u64, + u64, + ), + >, + ) -> TransactionStateContainer { + let mut container = TransactionStateContainer::with_capacity(10 * 1024); + for (index, (from_keypair, to_pubkeys, lamports, compute_unit_price)) in + tx_infos.into_iter().enumerate() + { + let id = TransactionId::new(index as u64); + let transaction = prioritized_tranfers( + from_keypair.borrow(), + to_pubkeys, + lamports, + compute_unit_price, + ); + let packet = Arc::new( + ImmutableDeserializedPacket::new( + Packet::from_data(None, transaction.to_versioned_transaction()).unwrap(), + ) + .unwrap(), + ); + let transaction_ttl = SanitizedTransactionTTL { + transaction, + max_age_slot: Slot::MAX, + }; + const TEST_TRANSACTION_COST: u64 = 5000; + container.insert_new_transaction( + id, + transaction_ttl, + packet, + compute_unit_price, + TEST_TRANSACTION_COST, + ); + } + + container + } + + fn collect_work( + receiver: &Receiver, + ) -> (Vec, Vec>) { + receiver + .try_iter() + .map(|work| { + let ids = work.ids.clone(); + (work, ids) + }) + .unzip() + } + + fn test_pre_graph_filter(_txs: &[&SanitizedTransaction], results: &mut [bool]) { + results.fill(true); + } + + fn test_pre_lock_filter(_tx: &SanitizedTransaction) -> bool { + true + } + + #[test] + fn test_schedule_disconnected_channel() { + let (mut scheduler, work_receivers, _finished_work_sender) = create_test_frame(1); + let mut container = create_container([(&Keypair::new(), &[Pubkey::new_unique()], 1, 1)]); + + drop(work_receivers); // explicitly drop receivers + assert_matches!( + scheduler.schedule(&mut container, test_pre_graph_filter, test_pre_lock_filter), + Err(SchedulerError::DisconnectedSendChannel(_)) + ); + } + + #[test] + fn test_schedule_single_threaded_no_conflicts() { + let (mut scheduler, work_receivers, _finished_work_sender) = create_test_frame(1); + let mut container = create_container([ + (&Keypair::new(), &[Pubkey::new_unique()], 1, 1), + (&Keypair::new(), &[Pubkey::new_unique()], 2, 2), + ]); + + let scheduling_summary = scheduler + .schedule(&mut container, test_pre_graph_filter, test_pre_lock_filter) + .unwrap(); + assert_eq!(scheduling_summary.num_scheduled, 2); + assert_eq!(scheduling_summary.num_unschedulable, 0); + 
assert_eq!(collect_work(&work_receivers[0]).1, vec![txids!([1, 0])]); + } + + #[test] + fn test_schedule_single_threaded_conflict() { + let (mut scheduler, work_receivers, _finished_work_sender) = create_test_frame(1); + let pubkey = Pubkey::new_unique(); + let mut container = create_container([ + (&Keypair::new(), &[pubkey], 1, 1), + (&Keypair::new(), &[pubkey], 1, 2), + ]); + + let scheduling_summary = scheduler + .schedule(&mut container, test_pre_graph_filter, test_pre_lock_filter) + .unwrap(); + assert_eq!(scheduling_summary.num_scheduled, 2); + assert_eq!(scheduling_summary.num_unschedulable, 0); + assert_eq!( + collect_work(&work_receivers[0]).1, + vec![txids!([1]), txids!([0])] + ); + } + + #[test] + fn test_schedule_consume_single_threaded_multi_batch() { + let (mut scheduler, work_receivers, _finished_work_sender) = create_test_frame(1); + let mut container = create_container( + (0..4 * TARGET_NUM_TRANSACTIONS_PER_BATCH) + .map(|i| (Keypair::new(), [Pubkey::new_unique()], i as u64, 1)), + ); + + // expect 4 full batches to be scheduled + let scheduling_summary = scheduler + .schedule(&mut container, test_pre_graph_filter, test_pre_lock_filter) + .unwrap(); + assert_eq!( + scheduling_summary.num_scheduled, + 4 * TARGET_NUM_TRANSACTIONS_PER_BATCH + ); + assert_eq!(scheduling_summary.num_unschedulable, 0); + + let thread0_work_counts: Vec<_> = work_receivers[0] + .try_iter() + .map(|work| work.ids.len()) + .collect(); + assert_eq!(thread0_work_counts, [TARGET_NUM_TRANSACTIONS_PER_BATCH; 4]); + } + + #[test] + fn test_schedule_simple_thread_selection() { + let (mut scheduler, work_receivers, _finished_work_sender) = create_test_frame(2); + let mut container = + create_container((0..4).map(|i| (Keypair::new(), [Pubkey::new_unique()], 1, i))); + + let scheduling_summary = scheduler + .schedule(&mut container, test_pre_graph_filter, test_pre_lock_filter) + .unwrap(); + assert_eq!(scheduling_summary.num_scheduled, 4); + assert_eq!(scheduling_summary.num_unschedulable, 0); + assert_eq!(collect_work(&work_receivers[0]).1, [txids!([3, 1])]); + assert_eq!(collect_work(&work_receivers[1]).1, [txids!([2, 0])]); + } + + #[test] + fn test_schedule_priority_guard() { + let (mut scheduler, work_receivers, finished_work_sender) = create_test_frame(2); + // intentionally shorten the look-ahead window to cause unschedulable conflicts + scheduler.look_ahead_window_size = 2; + + let accounts = (0..8).map(|_| Keypair::new()).collect_vec(); + let mut container = create_container([ + (&accounts[0], &[accounts[1].pubkey()], 1, 6), + (&accounts[2], &[accounts[3].pubkey()], 1, 5), + (&accounts[4], &[accounts[5].pubkey()], 1, 4), + (&accounts[6], &[accounts[7].pubkey()], 1, 3), + (&accounts[1], &[accounts[2].pubkey()], 1, 2), + (&accounts[2], &[accounts[3].pubkey()], 1, 1), + ]); + + // The look-ahead window is intentionally shortened, high priority transactions + // [0, 1, 2, 3] do not conflict, and are scheduled onto threads in a + // round-robin fashion. This leads to transaction [4] being unschedulable due + // to conflicts with [0] and [1], which were scheduled to different threads. + // Transaction [5] is technically schedulable, onto thread 1 since it only + // conflicts with transaction [1]. However, [5] will not be scheduled because + // it conflicts with a higher-priority transaction [4] that is unschedulable. 
+ // The full prio-graph can be visualized as: + // [0] \ + // -> [4] -> [5] + // [1] / ------/ + // [2] + // [3] + // Because the look-ahead window is shortened to a size of 4, the scheduler does + // not have knowledge of the joining at transaction [4] until after [0] and [1] + // have been scheduled. + let scheduling_summary = scheduler + .schedule(&mut container, test_pre_graph_filter, test_pre_lock_filter) + .unwrap(); + assert_eq!(scheduling_summary.num_scheduled, 4); + assert_eq!(scheduling_summary.num_unschedulable, 2); + let (thread_0_work, thread_0_ids) = collect_work(&work_receivers[0]); + assert_eq!(thread_0_ids, [txids!([0]), txids!([2])]); + assert_eq!( + collect_work(&work_receivers[1]).1, + [txids!([1]), txids!([3])] + ); + + // Cannot schedule even on next pass because of lock conflicts + let scheduling_summary = scheduler + .schedule(&mut container, test_pre_graph_filter, test_pre_lock_filter) + .unwrap(); + assert_eq!(scheduling_summary.num_scheduled, 0); + assert_eq!(scheduling_summary.num_unschedulable, 2); + + // Complete batch on thread 0. Remaining txs can be scheduled onto thread 1 + finished_work_sender + .send(FinishedConsumeWork { + work: thread_0_work.into_iter().next().unwrap(), + retryable_indexes: vec![], + }) + .unwrap(); + scheduler.receive_completed(&mut container).unwrap(); + let scheduling_summary = scheduler + .schedule(&mut container, test_pre_graph_filter, test_pre_lock_filter) + .unwrap(); + assert_eq!(scheduling_summary.num_scheduled, 2); + assert_eq!(scheduling_summary.num_unschedulable, 0); + + assert_eq!( + collect_work(&work_receivers[1]).1, + [txids!([4]), txids!([5])] + ); + } + + #[test] + fn test_schedule_pre_lock_filter() { + let (mut scheduler, work_receivers, _finished_work_sender) = create_test_frame(1); + let pubkey = Pubkey::new_unique(); + let keypair = Keypair::new(); + let mut container = create_container([ + (&Keypair::new(), &[pubkey], 1, 1), + (&keypair, &[pubkey], 1, 2), + (&Keypair::new(), &[pubkey], 1, 3), + ]); + + // 2nd transaction should be filtered out and dropped before locking. + let pre_lock_filter = + |tx: &SanitizedTransaction| tx.message().fee_payer() != &keypair.pubkey(); + let scheduling_summary = scheduler + .schedule(&mut container, test_pre_graph_filter, pre_lock_filter) + .unwrap(); + assert_eq!(scheduling_summary.num_scheduled, 2); + assert_eq!(scheduling_summary.num_unschedulable, 0); + assert_eq!( + collect_work(&work_receivers[0]).1, + vec![txids!([2]), txids!([0])] + ); + } +} diff --git a/prio-graph-scheduler/src/scheduler_controller.rs b/prio-graph-scheduler/src/scheduler_controller.rs new file mode 100644 index 00000000000000..5a04e1eb39d65b --- /dev/null +++ b/prio-graph-scheduler/src/scheduler_controller.rs @@ -0,0 +1,1161 @@ +//! Control flow for BankingStage's transaction scheduler. +//! 
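// ---------------------------------------------------------------------------
// Overview sketch (not part of this patch): the controller below runs a single
// long-lived loop. In simplified form, using the method names defined in this
// file and with metrics and error handling elided:
//
//   loop {
//       let decision = decision_maker.make_consume_or_forward_decision();
//       process_transactions(&decision);      // schedule, forward, or clear
//       receive_completed();                  // unlock accounts, re-queue retryables
//       if !receive_and_buffer_packets(&decision) {
//           break;                            // packet channel disconnected
//       }
//       // report/reset leader, count, and timing metrics
//   }
// ---------------------------------------------------------------------------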
+ +use { + super::{ + prio_graph_scheduler::PrioGraphScheduler, + scheduler_error::SchedulerError, + scheduler_metrics::{ + SchedulerCountMetrics, SchedulerLeaderDetectionMetrics, SchedulerTimingMetrics, + }, + transaction_id_generator::TransactionIdGenerator, + transaction_state::SanitizedTransactionTTL, + transaction_state_container::TransactionStateContainer, + }, + crate::banking_stage::{ + consume_worker::ConsumeWorkerMetrics, + consumer::Consumer, + decision_maker::{BufferedPacketsDecision, DecisionMaker}, + forwarder::Forwarder, + immutable_deserialized_packet::ImmutableDeserializedPacket, + packet_deserializer::PacketDeserializer, + ForwardOption, LikeClusterInfo, TOTAL_BUFFERED_PACKETS, + }, + arrayvec::ArrayVec, + crossbeam_channel::RecvTimeoutError, + solana_accounts_db::account_locks::validate_account_locks, + solana_cost_model::cost_model::CostModel, + solana_measure::measure_us, + solana_runtime::{bank::Bank, bank_forks::BankForks}, + solana_runtime_transaction::instructions_processor::process_compute_budget_instructions, + solana_sdk::{ + self, + clock::{FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, MAX_PROCESSING_AGE}, + fee::FeeBudgetLimits, + saturating_add_assign, + transaction::SanitizedTransaction, + }, + solana_svm::transaction_error_metrics::TransactionErrorMetrics, + solana_svm_transaction::svm_message::SVMMessage, + std::{ + sync::{Arc, RwLock}, + time::{Duration, Instant}, + }, +}; + +/// Controls packet and transaction flow into scheduler, and scheduling execution. +pub(crate) struct SchedulerController { + /// Decision maker for determining what should be done with transactions. + decision_maker: DecisionMaker, + /// Packet/Transaction ingress. + packet_receiver: PacketDeserializer, + bank_forks: Arc>, + /// Generates unique IDs for incoming transactions. + transaction_id_generator: TransactionIdGenerator, + /// Container for transaction state. + /// Shared resource between `packet_receiver` and `scheduler`. + container: TransactionStateContainer, + /// State for scheduling and communicating with worker threads. + scheduler: PrioGraphScheduler, + /// Metrics tracking time for leader bank detection. + leader_detection_metrics: SchedulerLeaderDetectionMetrics, + /// Metrics tracking counts on transactions in different states + /// over an interval and during a leader slot. + count_metrics: SchedulerCountMetrics, + /// Metrics tracking time spent in difference code sections + /// over an interval and during a leader slot. + timing_metrics: SchedulerTimingMetrics, + /// Metric report handles for the worker threads. + worker_metrics: Vec>, + /// State for forwarding packets to the leader, if enabled. + forwarder: Option>, +} + +impl SchedulerController { + pub fn new( + decision_maker: DecisionMaker, + packet_deserializer: PacketDeserializer, + bank_forks: Arc>, + scheduler: PrioGraphScheduler, + worker_metrics: Vec>, + forwarder: Option>, + ) -> Self { + Self { + decision_maker, + packet_receiver: packet_deserializer, + bank_forks, + transaction_id_generator: TransactionIdGenerator::default(), + container: TransactionStateContainer::with_capacity(TOTAL_BUFFERED_PACKETS), + scheduler, + leader_detection_metrics: SchedulerLeaderDetectionMetrics::default(), + count_metrics: SchedulerCountMetrics::default(), + timing_metrics: SchedulerTimingMetrics::default(), + worker_metrics, + forwarder, + } + } + + pub fn run(mut self) -> Result<(), SchedulerError> { + loop { + // BufferedPacketsDecision is shared with legacy BankingStage, which will forward + // packets. 
Initially, not renaming these decision variants but the actions taken + // are different, since new BankingStage will not forward packets. + // For `Forward` and `ForwardAndHold`, we want to receive packets but will not + // forward them to the next leader. In this case, `ForwardAndHold` is + // indistinguishable from `Hold`. + // + // `Forward` will drop packets from the buffer instead of forwarding. + // During receiving, since packets would be dropped from buffer anyway, we can + // bypass sanitization and buffering and immediately drop the packets. + let (decision, decision_time_us) = + measure_us!(self.decision_maker.make_consume_or_forward_decision()); + self.timing_metrics.update(|timing_metrics| { + saturating_add_assign!(timing_metrics.decision_time_us, decision_time_us); + }); + let new_leader_slot = decision.bank_start().map(|b| b.working_bank.slot()); + self.leader_detection_metrics + .update_and_maybe_report(decision.bank_start()); + self.count_metrics + .maybe_report_and_reset_slot(new_leader_slot); + self.timing_metrics + .maybe_report_and_reset_slot(new_leader_slot); + + self.process_transactions(&decision)?; + self.receive_completed()?; + if !self.receive_and_buffer_packets(&decision) { + break; + } + // Report metrics only if there is data. + // Reset intervals when appropriate, regardless of report. + let should_report = self.count_metrics.interval_has_data(); + let priority_min_max = self.container.get_min_max_priority(); + self.count_metrics.update(|count_metrics| { + count_metrics.update_priority_stats(priority_min_max); + }); + self.count_metrics + .maybe_report_and_reset_interval(should_report); + self.timing_metrics + .maybe_report_and_reset_interval(should_report); + self.worker_metrics + .iter() + .for_each(|metrics| metrics.maybe_report_and_reset()); + } + + Ok(()) + } + + /// Process packets based on decision. 
+ fn process_transactions( + &mut self, + decision: &BufferedPacketsDecision, + ) -> Result<(), SchedulerError> { + let forwarding_enabled = self.forwarder.is_some(); + match decision { + BufferedPacketsDecision::Consume(bank_start) => { + let (scheduling_summary, schedule_time_us) = measure_us!(self.scheduler.schedule( + &mut self.container, + |txs, results| { + Self::pre_graph_filter( + txs, + results, + &bank_start.working_bank, + MAX_PROCESSING_AGE, + ) + }, + |_| true // no pre-lock filter for now + )?); + + self.count_metrics.update(|count_metrics| { + saturating_add_assign!( + count_metrics.num_scheduled, + scheduling_summary.num_scheduled + ); + saturating_add_assign!( + count_metrics.num_unschedulable, + scheduling_summary.num_unschedulable + ); + saturating_add_assign!( + count_metrics.num_schedule_filtered_out, + scheduling_summary.num_filtered_out + ); + }); + + self.timing_metrics.update(|timing_metrics| { + saturating_add_assign!( + timing_metrics.schedule_filter_time_us, + scheduling_summary.filter_time_us + ); + saturating_add_assign!(timing_metrics.schedule_time_us, schedule_time_us); + }); + } + BufferedPacketsDecision::Forward => { + if forwarding_enabled { + let (_, forward_time_us) = measure_us!(self.forward_packets(false)); + self.timing_metrics.update(|timing_metrics| { + saturating_add_assign!(timing_metrics.forward_time_us, forward_time_us); + }); + } else { + let (_, clear_time_us) = measure_us!(self.clear_container()); + self.timing_metrics.update(|timing_metrics| { + saturating_add_assign!(timing_metrics.clear_time_us, clear_time_us); + }); + } + } + BufferedPacketsDecision::ForwardAndHold => { + if forwarding_enabled { + let (_, forward_time_us) = measure_us!(self.forward_packets(true)); + self.timing_metrics.update(|timing_metrics| { + saturating_add_assign!(timing_metrics.forward_time_us, forward_time_us); + }); + } else { + let (_, clean_time_us) = measure_us!(self.clean_queue()); + self.timing_metrics.update(|timing_metrics| { + saturating_add_assign!(timing_metrics.clean_time_us, clean_time_us); + }); + } + } + BufferedPacketsDecision::Hold => {} + } + + Ok(()) + } + + fn pre_graph_filter( + transactions: &[&SanitizedTransaction], + results: &mut [bool], + bank: &Bank, + max_age: usize, + ) { + let lock_results = vec![Ok(()); transactions.len()]; + let mut error_counters = TransactionErrorMetrics::default(); + let check_results = + bank.check_transactions(transactions, &lock_results, max_age, &mut error_counters); + + let fee_check_results: Vec<_> = check_results + .into_iter() + .zip(transactions) + .map(|(result, tx)| { + result?; // if there's already error do nothing + Consumer::check_fee_payer_unlocked(bank, tx.message(), &mut error_counters) + }) + .collect(); + + for (fee_check_result, result) in fee_check_results.into_iter().zip(results.iter_mut()) { + *result = fee_check_result.is_ok(); + } + } + + /// Forward packets to the next leader. + fn forward_packets(&mut self, hold: bool) { + const MAX_FORWARDING_DURATION: Duration = Duration::from_millis(100); + let start = Instant::now(); + let bank = self.bank_forks.read().unwrap().working_bank(); + let feature_set = &bank.feature_set; + let forwarder = self.forwarder.as_mut().expect("forwarder must exist"); + + // Pop from the container in chunks, filter using bank checks, then attempt to forward. + // This doubles as a way to clean the queue as well as forwarding transactions. 
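// -----------------------------------------------------------------------
// Sketch (not part of this patch): the loop below drains the queue in
// fixed-size chunks under a hard deadline (`MAX_FORWARDING_DURATION`);
// whatever is still unchecked when the deadline hits is dropped rather
// than carried over. The same pattern in miniature over a plain queue:
fn example_drain_with_deadline(queue: &mut std::collections::VecDeque<u64>) -> Vec<u64> {
    use std::time::{Duration, Instant};
    let deadline = Instant::now() + Duration::from_millis(100);
    let mut checked = Vec::new();
    while !queue.is_empty() {
        let take = queue.len().min(64);
        // Stand-in for the per-chunk bank checks and forwarding done below.
        checked.extend(queue.drain(..take));
        if Instant::now() >= deadline {
            queue.clear(); // drop the unchecked remainder, as the code below does
            break;
        }
    }
    checked
}
// -----------------------------------------------------------------------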
+ const CHUNK_SIZE: usize = 64; + let mut num_forwarded: usize = 0; + let mut ids_to_add_back = Vec::new(); + let mut max_time_reached = false; + while !self.container.is_empty() { + let mut filter_array = [true; CHUNK_SIZE]; + let mut ids = Vec::with_capacity(CHUNK_SIZE); + let mut txs = Vec::with_capacity(CHUNK_SIZE); + + for _ in 0..CHUNK_SIZE { + if let Some(id) = self.container.pop() { + ids.push(id); + } else { + break; + } + } + let chunk_size = ids.len(); + ids.iter().for_each(|id| { + let transaction = self.container.get_transaction_ttl(&id.id).unwrap(); + txs.push(&transaction.transaction); + }); + + // use same filter we use for processing transactions: + // age, already processed, fee-check. + Self::pre_graph_filter( + &txs, + &mut filter_array, + &bank, + MAX_PROCESSING_AGE + .saturating_sub(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET as usize), + ); + + for (id, filter_result) in ids.iter().zip(&filter_array[..chunk_size]) { + if !*filter_result { + self.container.remove_by_id(&id.id); + continue; + } + + ids_to_add_back.push(*id); // add back to the queue at end + let state = self.container.get_mut_transaction_state(&id.id).unwrap(); + let sanitized_transaction = &state.transaction_ttl().transaction; + let immutable_packet = state.packet().clone(); + + // If not already forwarded and can be forwarded, add to forwardable packets. + if state.should_forward() + && forwarder.try_add_packet( + sanitized_transaction, + immutable_packet, + feature_set, + ) + { + saturating_add_assign!(num_forwarded, 1); + state.mark_forwarded(); + } + } + + if start.elapsed() >= MAX_FORWARDING_DURATION { + max_time_reached = true; + break; + } + } + + // Forward each batch of transactions + forwarder.forward_batched_packets(&ForwardOption::ForwardTransaction); + forwarder.clear_batches(); + + // If we hit the time limit. Drop everything that was not checked/processed. + // If we cannot run these simple checks in time, then we cannot run them during + // leader slot. + if max_time_reached { + while let Some(id) = self.container.pop() { + self.container.remove_by_id(&id.id); + } + } + + if hold { + for priority_id in ids_to_add_back { + self.container.push_id_into_queue(priority_id); + } + } else { + for priority_id in ids_to_add_back { + self.container.remove_by_id(&priority_id.id); + } + } + + self.count_metrics.update(|count_metrics| { + saturating_add_assign!(count_metrics.num_forwarded, num_forwarded); + }); + } + + /// Clears the transaction state container. + /// This only clears pending transactions, and does **not** clear in-flight transactions. + fn clear_container(&mut self) { + let mut num_dropped_on_clear: usize = 0; + while let Some(id) = self.container.pop() { + self.container.remove_by_id(&id.id); + saturating_add_assign!(num_dropped_on_clear, 1); + } + + self.count_metrics.update(|count_metrics| { + saturating_add_assign!(count_metrics.num_dropped_on_clear, num_dropped_on_clear); + }); + } + + /// Clean unprocessable transactions from the queue. These will be transactions that are + /// expired, already processed, or are no longer sanitizable. + /// This only clears pending transactions, and does **not** clear in-flight transactions. + fn clean_queue(&mut self) { + // Clean up any transactions that have already been processed, are too old, or do not have + // valid nonce accounts. 
+ const MAX_TRANSACTION_CHECKS: usize = 10_000; + let mut transaction_ids = Vec::with_capacity(MAX_TRANSACTION_CHECKS); + + while let Some(id) = self.container.pop() { + transaction_ids.push(id); + } + + let bank = self.bank_forks.read().unwrap().working_bank(); + + const CHUNK_SIZE: usize = 128; + let mut error_counters = TransactionErrorMetrics::default(); + let mut num_dropped_on_age_and_status: usize = 0; + for chunk in transaction_ids.chunks(CHUNK_SIZE) { + let lock_results = vec![Ok(()); chunk.len()]; + let sanitized_txs: Vec<_> = chunk + .iter() + .map(|id| { + &self + .container + .get_transaction_ttl(&id.id) + .expect("transaction must exist") + .transaction + }) + .collect(); + + let check_results = bank.check_transactions( + &sanitized_txs, + &lock_results, + MAX_PROCESSING_AGE, + &mut error_counters, + ); + + for (result, id) in check_results.into_iter().zip(chunk.iter()) { + if result.is_err() { + saturating_add_assign!(num_dropped_on_age_and_status, 1); + self.container.remove_by_id(&id.id); + } else { + self.container.push_id_into_queue(*id); + } + } + } + + self.count_metrics.update(|count_metrics| { + saturating_add_assign!( + count_metrics.num_dropped_on_age_and_status, + num_dropped_on_age_and_status + ); + }); + } + + /// Receives completed transactions from the workers and updates metrics. + fn receive_completed(&mut self) -> Result<(), SchedulerError> { + let ((num_transactions, num_retryable), receive_completed_time_us) = + measure_us!(self.scheduler.receive_completed(&mut self.container)?); + + self.count_metrics.update(|count_metrics| { + saturating_add_assign!(count_metrics.num_finished, num_transactions); + saturating_add_assign!(count_metrics.num_retryable, num_retryable); + }); + self.timing_metrics.update(|timing_metrics| { + saturating_add_assign!( + timing_metrics.receive_completed_time_us, + receive_completed_time_us + ); + }); + + Ok(()) + } + + /// Returns whether the packet receiver is still connected. 
+ fn receive_and_buffer_packets(&mut self, decision: &BufferedPacketsDecision) -> bool { + let remaining_queue_capacity = self.container.remaining_queue_capacity(); + + const MAX_PACKET_RECEIVE_TIME: Duration = Duration::from_millis(10); + let (recv_timeout, should_buffer) = match decision { + BufferedPacketsDecision::Consume(_) => ( + if self.container.is_empty() { + MAX_PACKET_RECEIVE_TIME + } else { + Duration::ZERO + }, + true, + ), + BufferedPacketsDecision::Forward => (MAX_PACKET_RECEIVE_TIME, self.forwarder.is_some()), + BufferedPacketsDecision::ForwardAndHold | BufferedPacketsDecision::Hold => { + (MAX_PACKET_RECEIVE_TIME, true) + } + }; + + let (received_packet_results, receive_time_us) = measure_us!(self + .packet_receiver + .receive_packets(recv_timeout, remaining_queue_capacity, |packet| { + packet.check_excessive_precompiles()?; + Ok(packet) + })); + + self.timing_metrics.update(|timing_metrics| { + saturating_add_assign!(timing_metrics.receive_time_us, receive_time_us); + }); + + match received_packet_results { + Ok(receive_packet_results) => { + let num_received_packets = receive_packet_results.deserialized_packets.len(); + + self.count_metrics.update(|count_metrics| { + saturating_add_assign!(count_metrics.num_received, num_received_packets); + }); + + if should_buffer { + let (_, buffer_time_us) = measure_us!( + self.buffer_packets(receive_packet_results.deserialized_packets) + ); + self.timing_metrics.update(|timing_metrics| { + saturating_add_assign!(timing_metrics.buffer_time_us, buffer_time_us); + }); + } else { + self.count_metrics.update(|count_metrics| { + saturating_add_assign!( + count_metrics.num_dropped_on_receive, + num_received_packets + ); + }); + } + } + Err(RecvTimeoutError::Timeout) => {} + Err(RecvTimeoutError::Disconnected) => return false, + } + + true + } + + fn buffer_packets(&mut self, packets: Vec) { + // Convert to Arcs + let packets: Vec<_> = packets.into_iter().map(Arc::new).collect(); + // Sanitize packets, generate IDs, and insert into the container. 
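// -----------------------------------------------------------------------
// Pipeline note (not part of this patch): each chunk below passes through
// four successive filters, and drops at each stage feed the count metrics:
//   1. build_sanitized_transaction            -> num_dropped_on_sanitization
//   2. validate_account_locks                 \
//   3. process_compute_budget_instructions    -> num_dropped_on_validate_locks
//   4. bank.check_transactions                -> num_dropped_on_receive_transaction_checks
// Survivors are assigned a priority and cost via `calculate_priority_and_cost`
// and inserted into the container; an insert that causes a drop because the
// container is at capacity is counted in num_dropped_on_capacity.
// -----------------------------------------------------------------------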
+ let bank = self.bank_forks.read().unwrap().working_bank(); + let last_slot_in_epoch = bank.epoch_schedule().get_last_slot_in_epoch(bank.epoch()); + let transaction_account_lock_limit = bank.get_transaction_account_lock_limit(); + let vote_only = bank.vote_only_bank(); + + const CHUNK_SIZE: usize = 128; + let lock_results: [_; CHUNK_SIZE] = core::array::from_fn(|_| Ok(())); + + let mut arc_packets = ArrayVec::<_, CHUNK_SIZE>::new(); + let mut transactions = ArrayVec::<_, CHUNK_SIZE>::new(); + let mut fee_budget_limits_vec = ArrayVec::<_, CHUNK_SIZE>::new(); + + let mut error_counts = TransactionErrorMetrics::default(); + for chunk in packets.chunks(CHUNK_SIZE) { + let mut post_sanitization_count: usize = 0; + chunk + .iter() + .filter_map(|packet| { + packet + .build_sanitized_transaction( + vote_only, + bank.as_ref(), + bank.get_reserved_account_keys(), + ) + .map(|tx| (packet.clone(), tx)) + }) + .inspect(|_| saturating_add_assign!(post_sanitization_count, 1)) + .filter(|(_packet, tx)| { + validate_account_locks( + tx.message().account_keys(), + transaction_account_lock_limit, + ) + .is_ok() + }) + .filter_map(|(packet, tx)| { + process_compute_budget_instructions(SVMMessage::program_instructions_iter(&tx)) + .map(|compute_budget| (packet, tx, compute_budget.into())) + .ok() + }) + .for_each(|(packet, tx, fee_budget_limits)| { + arc_packets.push(packet); + transactions.push(tx); + fee_budget_limits_vec.push(fee_budget_limits); + }); + + let check_results = bank.check_transactions( + &transactions, + &lock_results[..transactions.len()], + MAX_PROCESSING_AGE, + &mut error_counts, + ); + let post_lock_validation_count = transactions.len(); + + let mut post_transaction_check_count: usize = 0; + let mut num_dropped_on_capacity: usize = 0; + let mut num_buffered: usize = 0; + for (((packet, transaction), fee_budget_limits), _check_result) in arc_packets + .drain(..) + .zip(transactions.drain(..)) + .zip(fee_budget_limits_vec.drain(..)) + .zip(check_results) + .filter(|(_, check_result)| check_result.is_ok()) + { + saturating_add_assign!(post_transaction_check_count, 1); + let transaction_id = self.transaction_id_generator.next(); + + let (priority, cost) = + Self::calculate_priority_and_cost(&transaction, &fee_budget_limits, &bank); + let transaction_ttl = SanitizedTransactionTTL { + transaction, + max_age_slot: last_slot_in_epoch, + }; + + if self.container.insert_new_transaction( + transaction_id, + transaction_ttl, + packet, + priority, + cost, + ) { + saturating_add_assign!(num_dropped_on_capacity, 1); + } + saturating_add_assign!(num_buffered, 1); + } + + // Update metrics for transactions that were dropped. 
+ let num_dropped_on_sanitization = chunk.len().saturating_sub(post_sanitization_count); + let num_dropped_on_lock_validation = + post_sanitization_count.saturating_sub(post_lock_validation_count); + let num_dropped_on_transaction_checks = + post_lock_validation_count.saturating_sub(post_transaction_check_count); + + self.count_metrics.update(|count_metrics| { + saturating_add_assign!( + count_metrics.num_dropped_on_capacity, + num_dropped_on_capacity + ); + saturating_add_assign!(count_metrics.num_buffered, num_buffered); + saturating_add_assign!( + count_metrics.num_dropped_on_sanitization, + num_dropped_on_sanitization + ); + saturating_add_assign!( + count_metrics.num_dropped_on_validate_locks, + num_dropped_on_lock_validation + ); + saturating_add_assign!( + count_metrics.num_dropped_on_receive_transaction_checks, + num_dropped_on_transaction_checks + ); + }); + } + } + + /// Calculate priority and cost for a transaction: + /// + /// Cost is calculated through the `CostModel`, + /// and priority is calculated through a formula here that attempts to sell + /// blockspace to the highest bidder. + /// + /// The priority is calculated as: + /// P = R / (1 + C) + /// where P is the priority, R is the reward, + /// and C is the cost towards block-limits. + /// + /// Current minimum costs are on the order of several hundred, + /// so the denominator is effectively C, and the +1 is simply + /// to avoid any division by zero due to a bug - these costs + /// are calculated by the cost-model and are not direct + /// from user input. They should never be zero. + /// Any difference in the prioritization is negligible for + /// the current transaction costs. + fn calculate_priority_and_cost( + transaction: &SanitizedTransaction, + fee_budget_limits: &FeeBudgetLimits, + bank: &Bank, + ) -> (u64, u64) { + let cost = CostModel::calculate_cost(transaction, &bank.feature_set).sum(); + let reward = bank.calculate_reward_for_transaction(transaction, fee_budget_limits); + + // We need a multiplier here to avoid rounding down too aggressively. + // For many transactions, the cost will be greater than the fees in terms of raw lamports. + // For the purposes of calculating prioritization, we multiply the fees by a large number so that + // the cost is a small fraction. + // An offset of 1 is used in the denominator to explicitly avoid division by zero. 
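// -----------------------------------------------------------------------
// Worked example (not part of this patch) of the priority formula used just
// below, priority = reward * MULTIPLIER / (cost + 1):
fn example_priority(reward_lamports: u64, cost_cus: u64) -> u64 {
    reward_lamports
        .saturating_mul(1_000_000)
        .saturating_div(cost_cus.saturating_add(1))
}
// example_priority(5_000, 2_000) == 2_498_750
// example_priority(5_000, 4_000) == 1_249_687
// i.e. for the same reward, the cheaper transaction gets roughly twice the
// priority, so block space goes to transactions paying more per compute unit.
// -----------------------------------------------------------------------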
+ const MULTIPLIER: u64 = 1_000_000; + ( + reward + .saturating_mul(MULTIPLIER) + .saturating_div(cost.saturating_add(1)), + cost, + ) + } +} + +#[cfg(test)] +mod tests { + use { + super::*, + crate::{ + banking_stage::{ + consumer::TARGET_NUM_TRANSACTIONS_PER_BATCH, + scheduler_messages::{ConsumeWork, FinishedConsumeWork, TransactionBatchId}, + tests::create_slow_genesis_config, + }, + banking_trace::BankingPacketBatch, + sigverify::SigverifyTracerPacketStats, + }, + crossbeam_channel::{unbounded, Receiver, Sender}, + itertools::Itertools, + solana_gossip::cluster_info::ClusterInfo, + solana_ledger::{ + blockstore::Blockstore, genesis_utils::GenesisConfigInfo, + get_tmp_ledger_path_auto_delete, leader_schedule_cache::LeaderScheduleCache, + }, + solana_perf::packet::{to_packet_batches, PacketBatch, NUM_PACKETS}, + solana_poh::poh_recorder::{PohRecorder, Record, WorkingBankEntry}, + solana_runtime::bank::Bank, + solana_sdk::{ + compute_budget::ComputeBudgetInstruction, fee_calculator::FeeRateGovernor, hash::Hash, + message::Message, poh_config::PohConfig, pubkey::Pubkey, signature::Keypair, + signer::Signer, system_instruction, system_transaction, transaction::Transaction, + }, + std::sync::{atomic::AtomicBool, Arc, RwLock}, + tempfile::TempDir, + }; + + fn create_channels(num: usize) -> (Vec>, Vec>) { + (0..num).map(|_| unbounded()).unzip() + } + + // Helper struct to create tests that hold channels, files, etc. + // such that our tests can be more easily set up and run. + struct TestFrame { + bank: Arc, + mint_keypair: Keypair, + _ledger_path: TempDir, + _entry_receiver: Receiver, + _record_receiver: Receiver, + poh_recorder: Arc>, + banking_packet_sender: Sender, Option)>>, + + consume_work_receivers: Vec>, + finished_consume_work_sender: Sender, + } + + fn create_test_frame(num_threads: usize) -> (TestFrame, SchedulerController>) { + let GenesisConfigInfo { + mut genesis_config, + mint_keypair, + .. 
+ } = create_slow_genesis_config(u64::MAX); + genesis_config.fee_rate_governor = FeeRateGovernor::new(5000, 0); + let (bank, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); + + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Blockstore::open(ledger_path.path()) + .expect("Expected to be able to open database ledger"); + let (poh_recorder, entry_receiver, record_receiver) = PohRecorder::new( + bank.tick_height(), + bank.last_blockhash(), + bank.clone(), + Some((4, 4)), + bank.ticks_per_slot(), + Arc::new(blockstore), + &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), + &PohConfig::default(), + Arc::new(AtomicBool::default()), + ); + let poh_recorder = Arc::new(RwLock::new(poh_recorder)); + let decision_maker = DecisionMaker::new(Pubkey::new_unique(), poh_recorder.clone()); + + let (banking_packet_sender, banking_packet_receiver) = unbounded(); + let packet_deserializer = PacketDeserializer::new(banking_packet_receiver); + + let (consume_work_senders, consume_work_receivers) = create_channels(num_threads); + let (finished_consume_work_sender, finished_consume_work_receiver) = unbounded(); + + let test_frame = TestFrame { + bank, + mint_keypair, + _ledger_path: ledger_path, + _entry_receiver: entry_receiver, + _record_receiver: record_receiver, + poh_recorder, + banking_packet_sender, + consume_work_receivers, + finished_consume_work_sender, + }; + + let scheduler_controller = SchedulerController::new( + decision_maker, + packet_deserializer, + bank_forks, + PrioGraphScheduler::new(consume_work_senders, finished_consume_work_receiver), + vec![], // no actual workers with metrics to report, this can be empty + None, + ); + + (test_frame, scheduler_controller) + } + + fn create_and_fund_prioritized_transfer( + bank: &Bank, + mint_keypair: &Keypair, + from_keypair: &Keypair, + to_pubkey: &Pubkey, + lamports: u64, + compute_unit_price: u64, + recent_blockhash: Hash, + ) -> Transaction { + // Fund the sending key, so that the transaction does not get filtered by the fee-payer check. + { + let transfer = system_transaction::transfer( + mint_keypair, + &from_keypair.pubkey(), + 500_000, // just some amount that will always be enough + bank.last_blockhash(), + ); + bank.process_transaction(&transfer).unwrap(); + } + + let transfer = system_instruction::transfer(&from_keypair.pubkey(), to_pubkey, lamports); + let prioritization = ComputeBudgetInstruction::set_compute_unit_price(compute_unit_price); + let message = Message::new(&[transfer, prioritization], Some(&from_keypair.pubkey())); + Transaction::new(&vec![from_keypair], message, recent_blockhash) + } + + fn to_banking_packet_batch(txs: &[Transaction]) -> BankingPacketBatch { + let packet_batch = to_packet_batches(txs, NUM_PACKETS); + Arc::new((packet_batch, None)) + } + + // Helper function to let test receive and then schedule packets. + // The order of operations here is convenient for testing, but does not + // match the order of operations in the actual scheduler. + // The actual scheduler will process immediately after the decision, + // in order to keep the decision as recent as possible for processing. + // In the tests, the decision will not become stale, so it is more convenient + // to receive first and then schedule. 
+ fn test_receive_then_schedule( + scheduler_controller: &mut SchedulerController>, + ) { + let decision = scheduler_controller + .decision_maker + .make_consume_or_forward_decision(); + assert!(matches!(decision, BufferedPacketsDecision::Consume(_))); + assert!(scheduler_controller.receive_completed().is_ok()); + assert!(scheduler_controller.receive_and_buffer_packets(&decision)); + assert!(scheduler_controller.process_transactions(&decision).is_ok()); + } + + #[test] + #[should_panic(expected = "batch id 0 is not being tracked")] + fn test_unexpected_batch_id() { + let (test_frame, scheduler_controller) = create_test_frame(1); + let TestFrame { + finished_consume_work_sender, + .. + } = &test_frame; + + finished_consume_work_sender + .send(FinishedConsumeWork { + work: ConsumeWork { + batch_id: TransactionBatchId::new(0), + ids: vec![], + transactions: vec![], + max_age_slots: vec![], + }, + retryable_indexes: vec![], + }) + .unwrap(); + + scheduler_controller.run().unwrap(); + } + + #[test] + fn test_schedule_consume_single_threaded_no_conflicts() { + let (test_frame, mut scheduler_controller) = create_test_frame(1); + let TestFrame { + bank, + mint_keypair, + poh_recorder, + banking_packet_sender, + consume_work_receivers, + .. + } = &test_frame; + + poh_recorder + .write() + .unwrap() + .set_bank_for_test(bank.clone()); + + // Send packet batch to the scheduler - should do nothing until we become the leader. + let tx1 = create_and_fund_prioritized_transfer( + bank, + mint_keypair, + &Keypair::new(), + &Pubkey::new_unique(), + 1, + 1000, + bank.last_blockhash(), + ); + let tx2 = create_and_fund_prioritized_transfer( + bank, + mint_keypair, + &Keypair::new(), + &Pubkey::new_unique(), + 1, + 2000, + bank.last_blockhash(), + ); + let tx1_hash = tx1.message().hash(); + let tx2_hash = tx2.message().hash(); + + let txs = vec![tx1, tx2]; + banking_packet_sender + .send(to_banking_packet_batch(&txs)) + .unwrap(); + + test_receive_then_schedule(&mut scheduler_controller); + let consume_work = consume_work_receivers[0].try_recv().unwrap(); + assert_eq!(consume_work.ids.len(), 2); + assert_eq!(consume_work.transactions.len(), 2); + let message_hashes = consume_work + .transactions + .iter() + .map(|tx| tx.message_hash()) + .collect_vec(); + assert_eq!(message_hashes, vec![&tx2_hash, &tx1_hash]); + } + + #[test] + fn test_schedule_consume_single_threaded_conflict() { + let (test_frame, mut scheduler_controller) = create_test_frame(1); + let TestFrame { + bank, + mint_keypair, + poh_recorder, + banking_packet_sender, + consume_work_receivers, + .. 
+ } = &test_frame; + + poh_recorder + .write() + .unwrap() + .set_bank_for_test(bank.clone()); + + let pk = Pubkey::new_unique(); + let tx1 = create_and_fund_prioritized_transfer( + bank, + mint_keypair, + &Keypair::new(), + &pk, + 1, + 1000, + bank.last_blockhash(), + ); + let tx2 = create_and_fund_prioritized_transfer( + bank, + mint_keypair, + &Keypair::new(), + &pk, + 1, + 2000, + bank.last_blockhash(), + ); + let tx1_hash = tx1.message().hash(); + let tx2_hash = tx2.message().hash(); + + let txs = vec![tx1, tx2]; + banking_packet_sender + .send(to_banking_packet_batch(&txs)) + .unwrap(); + + // We expect 2 batches to be scheduled + test_receive_then_schedule(&mut scheduler_controller); + let consume_works = (0..2) + .map(|_| consume_work_receivers[0].try_recv().unwrap()) + .collect_vec(); + + let num_txs_per_batch = consume_works.iter().map(|cw| cw.ids.len()).collect_vec(); + let message_hashes = consume_works + .iter() + .flat_map(|cw| cw.transactions.iter().map(|tx| tx.message_hash())) + .collect_vec(); + assert_eq!(num_txs_per_batch, vec![1; 2]); + assert_eq!(message_hashes, vec![&tx2_hash, &tx1_hash]); + } + + #[test] + fn test_schedule_consume_single_threaded_multi_batch() { + let (test_frame, mut scheduler_controller) = create_test_frame(1); + let TestFrame { + bank, + mint_keypair, + poh_recorder, + banking_packet_sender, + consume_work_receivers, + .. + } = &test_frame; + + poh_recorder + .write() + .unwrap() + .set_bank_for_test(bank.clone()); + + // Send multiple batches - all get scheduled + let txs1 = (0..2 * TARGET_NUM_TRANSACTIONS_PER_BATCH) + .map(|i| { + create_and_fund_prioritized_transfer( + bank, + mint_keypair, + &Keypair::new(), + &Pubkey::new_unique(), + i as u64, + 1, + bank.last_blockhash(), + ) + }) + .collect_vec(); + let txs2 = (0..2 * TARGET_NUM_TRANSACTIONS_PER_BATCH) + .map(|i| { + create_and_fund_prioritized_transfer( + bank, + mint_keypair, + &Keypair::new(), + &Pubkey::new_unique(), + i as u64, + 2, + bank.last_blockhash(), + ) + }) + .collect_vec(); + + banking_packet_sender + .send(to_banking_packet_batch(&txs1)) + .unwrap(); + banking_packet_sender + .send(to_banking_packet_batch(&txs2)) + .unwrap(); + + // We expect 4 batches to be scheduled + test_receive_then_schedule(&mut scheduler_controller); + let consume_works = (0..4) + .map(|_| consume_work_receivers[0].try_recv().unwrap()) + .collect_vec(); + + assert_eq!( + consume_works.iter().map(|cw| cw.ids.len()).collect_vec(), + vec![TARGET_NUM_TRANSACTIONS_PER_BATCH; 4] + ); + } + + #[test] + fn test_schedule_consume_simple_thread_selection() { + let (test_frame, mut scheduler_controller) = create_test_frame(2); + let TestFrame { + bank, + mint_keypair, + poh_recorder, + banking_packet_sender, + consume_work_receivers, + .. + } = &test_frame; + + poh_recorder + .write() + .unwrap() + .set_bank_for_test(bank.clone()); + + // Send 4 transactions w/o conflicts. 
2 should be scheduled on each thread + let txs = (0..4) + .map(|i| { + create_and_fund_prioritized_transfer( + bank, + mint_keypair, + &Keypair::new(), + &Pubkey::new_unique(), + 1, + i * 10, + bank.last_blockhash(), + ) + }) + .collect_vec(); + banking_packet_sender + .send(to_banking_packet_batch(&txs)) + .unwrap(); + + // Priority Expectation: + // Thread 0: [3, 1] + // Thread 1: [2, 0] + let t0_expected = [3, 1] + .into_iter() + .map(|i| txs[i].message().hash()) + .collect_vec(); + let t1_expected = [2, 0] + .into_iter() + .map(|i| txs[i].message().hash()) + .collect_vec(); + + test_receive_then_schedule(&mut scheduler_controller); + let t0_actual = consume_work_receivers[0] + .try_recv() + .unwrap() + .transactions + .iter() + .map(|tx| *tx.message_hash()) + .collect_vec(); + let t1_actual = consume_work_receivers[1] + .try_recv() + .unwrap() + .transactions + .iter() + .map(|tx| *tx.message_hash()) + .collect_vec(); + + assert_eq!(t0_actual, t0_expected); + assert_eq!(t1_actual, t1_expected); + } + + #[test] + fn test_schedule_consume_retryable() { + let (test_frame, mut scheduler_controller) = create_test_frame(1); + let TestFrame { + bank, + mint_keypair, + poh_recorder, + banking_packet_sender, + consume_work_receivers, + finished_consume_work_sender, + .. + } = &test_frame; + + poh_recorder + .write() + .unwrap() + .set_bank_for_test(bank.clone()); + + // Send packet batch to the scheduler - should do nothing until we become the leader. + let tx1 = create_and_fund_prioritized_transfer( + bank, + mint_keypair, + &Keypair::new(), + &Pubkey::new_unique(), + 1, + 1000, + bank.last_blockhash(), + ); + let tx2 = create_and_fund_prioritized_transfer( + bank, + mint_keypair, + &Keypair::new(), + &Pubkey::new_unique(), + 1, + 2000, + bank.last_blockhash(), + ); + let tx1_hash = tx1.message().hash(); + let tx2_hash = tx2.message().hash(); + + let txs = vec![tx1, tx2]; + banking_packet_sender + .send(to_banking_packet_batch(&txs)) + .unwrap(); + + test_receive_then_schedule(&mut scheduler_controller); + let consume_work = consume_work_receivers[0].try_recv().unwrap(); + assert_eq!(consume_work.ids.len(), 2); + assert_eq!(consume_work.transactions.len(), 2); + let message_hashes = consume_work + .transactions + .iter() + .map(|tx| tx.message_hash()) + .collect_vec(); + assert_eq!(message_hashes, vec![&tx2_hash, &tx1_hash]); + + // Complete the batch - marking the second transaction as retryable + finished_consume_work_sender + .send(FinishedConsumeWork { + work: consume_work, + retryable_indexes: vec![1], + }) + .unwrap(); + + // Transaction should be rescheduled + test_receive_then_schedule(&mut scheduler_controller); + let consume_work = consume_work_receivers[0].try_recv().unwrap(); + assert_eq!(consume_work.ids.len(), 1); + assert_eq!(consume_work.transactions.len(), 1); + let message_hashes = consume_work + .transactions + .iter() + .map(|tx| tx.message_hash()) + .collect_vec(); + assert_eq!(message_hashes, vec![&tx1_hash]); + } +} diff --git a/prio-graph-scheduler/src/scheduler_error.rs b/prio-graph-scheduler/src/scheduler_error.rs new file mode 100644 index 00000000000000..9b8d4015448e57 --- /dev/null +++ b/prio-graph-scheduler/src/scheduler_error.rs @@ -0,0 +1,9 @@ +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum SchedulerError { + #[error("Sending channel disconnected: {0}")] + DisconnectedSendChannel(&'static str), + #[error("Recv channel disconnected: {0}")] + DisconnectedRecvChannel(&'static str), +} diff --git a/prio-graph-scheduler/src/scheduler_metrics.rs 
b/prio-graph-scheduler/src/scheduler_metrics.rs
new file mode 100644
index 00000000000000..922c105acba8c9
--- /dev/null
+++ b/prio-graph-scheduler/src/scheduler_metrics.rs
@@ -0,0 +1,408 @@
+use {
+    itertools::MinMaxResult,
+    solana_poh::poh_recorder::BankStart,
+    solana_sdk::{clock::Slot, timing::AtomicInterval},
+    std::time::Instant,
+};
+
+#[derive(Default)]
+pub struct SchedulerCountMetrics {
+    interval: IntervalSchedulerCountMetrics,
+    slot: SlotSchedulerCountMetrics,
+}
+
+impl SchedulerCountMetrics {
+    pub fn update(&mut self, update: impl Fn(&mut SchedulerCountMetricsInner)) {
+        update(&mut self.interval.metrics);
+        update(&mut self.slot.metrics);
+    }
+
+    pub fn maybe_report_and_reset_slot(&mut self, slot: Option<Slot>) {
+        self.slot.maybe_report_and_reset(slot);
+    }
+
+    pub fn maybe_report_and_reset_interval(&mut self, should_report: bool) {
+        self.interval.maybe_report_and_reset(should_report);
+    }
+
+    pub fn interval_has_data(&self) -> bool {
+        self.interval.metrics.has_data()
+    }
+}
+
+#[derive(Default)]
+struct IntervalSchedulerCountMetrics {
+    interval: AtomicInterval,
+    metrics: SchedulerCountMetricsInner,
+}
+
+#[derive(Default)]
+struct SlotSchedulerCountMetrics {
+    slot: Option<Slot>,
+    metrics: SchedulerCountMetricsInner,
+}
+
+#[derive(Default)]
+pub struct SchedulerCountMetricsInner {
+    /// Number of packets received.
+    pub num_received: usize,
+    /// Number of packets buffered.
+    pub num_buffered: usize,
+
+    /// Number of transactions scheduled.
+    pub num_scheduled: usize,
+    /// Number of transactions that were unschedulable.
+    pub num_unschedulable: usize,
+    /// Number of transactions that were filtered out during scheduling.
+    pub num_schedule_filtered_out: usize,
+    /// Number of completed transactions received from workers.
+    pub num_finished: usize,
+    /// Number of transactions that were retryable.
+    pub num_retryable: usize,
+    /// Number of transactions that were scheduled to be forwarded.
+    pub num_forwarded: usize,
+
+    /// Number of transactions that were immediately dropped on receive.
+    pub num_dropped_on_receive: usize,
+    /// Number of transactions that were dropped due to sanitization failure.
+    pub num_dropped_on_sanitization: usize,
+    /// Number of transactions that were dropped due to failed lock validation.
+    pub num_dropped_on_validate_locks: usize,
+    /// Number of transactions that were dropped due to failed transaction
+    /// checks during receive.
+    pub num_dropped_on_receive_transaction_checks: usize,
+    /// Number of transactions that were dropped due to clearing.
+    pub num_dropped_on_clear: usize,
+    /// Number of transactions that were dropped due to age and status checks.
+    pub num_dropped_on_age_and_status: usize,
+    /// Number of transactions that were dropped due to exceeded capacity.
+    pub num_dropped_on_capacity: usize,
+    /// Min prioritization fees in the transaction container
+    pub min_prioritization_fees: u64,
+    /// Max prioritization fees in the transaction container
+    pub max_prioritization_fees: u64,
+}
+
+impl IntervalSchedulerCountMetrics {
+    fn maybe_report_and_reset(&mut self, should_report: bool) {
+        const REPORT_INTERVAL_MS: u64 = 1000;
+        if self.interval.should_update(REPORT_INTERVAL_MS) {
+            if should_report {
+                self.metrics.report("banking_stage_scheduler_counts", None);
+            }
+            self.metrics.reset();
+        }
+    }
+}
+
+impl SlotSchedulerCountMetrics {
+    fn maybe_report_and_reset(&mut self, slot: Option<Slot>) {
+        if self.slot != slot {
+            // Only report if there was an assigned slot.
+ if self.slot.is_some() { + self.metrics + .report("banking_stage_scheduler_slot_counts", self.slot); + } + self.metrics.reset(); + self.slot = slot; + } + } +} + +impl SchedulerCountMetricsInner { + fn report(&self, name: &'static str, slot: Option) { + let mut datapoint = create_datapoint!( + @point name, + ("num_received", self.num_received, i64), + ("num_buffered", self.num_buffered, i64), + ("num_scheduled", self.num_scheduled, i64), + ("num_unschedulable", self.num_unschedulable, i64), + ( + "num_schedule_filtered_out", + self.num_schedule_filtered_out, + i64 + ), + ("num_finished", self.num_finished, i64), + ("num_retryable", self.num_retryable, i64), + ("num_forwarded", self.num_forwarded, i64), + ("num_dropped_on_receive", self.num_dropped_on_receive, i64), + ( + "num_dropped_on_sanitization", + self.num_dropped_on_sanitization, + i64 + ), + ( + "num_dropped_on_validate_locks", + self.num_dropped_on_validate_locks, + i64 + ), + ( + "num_dropped_on_receive_transaction_checks", + self.num_dropped_on_receive_transaction_checks, + i64 + ), + ("num_dropped_on_clear", self.num_dropped_on_clear, i64), + ( + "num_dropped_on_age_and_status", + self.num_dropped_on_age_and_status, + i64 + ), + ("num_dropped_on_capacity", self.num_dropped_on_capacity, i64), + ("min_priority", self.get_min_priority(), i64), + ("max_priority", self.get_max_priority(), i64) + ); + if let Some(slot) = slot { + datapoint.add_field_i64("slot", slot as i64); + } + solana_metrics::submit(datapoint, log::Level::Info); + } + + pub fn has_data(&self) -> bool { + self.num_received != 0 + || self.num_buffered != 0 + || self.num_scheduled != 0 + || self.num_unschedulable != 0 + || self.num_schedule_filtered_out != 0 + || self.num_finished != 0 + || self.num_retryable != 0 + || self.num_forwarded != 0 + || self.num_dropped_on_receive != 0 + || self.num_dropped_on_sanitization != 0 + || self.num_dropped_on_validate_locks != 0 + || self.num_dropped_on_receive_transaction_checks != 0 + || self.num_dropped_on_clear != 0 + || self.num_dropped_on_age_and_status != 0 + || self.num_dropped_on_capacity != 0 + } + + fn reset(&mut self) { + self.num_received = 0; + self.num_buffered = 0; + self.num_scheduled = 0; + self.num_unschedulable = 0; + self.num_schedule_filtered_out = 0; + self.num_finished = 0; + self.num_retryable = 0; + self.num_forwarded = 0; + self.num_dropped_on_receive = 0; + self.num_dropped_on_sanitization = 0; + self.num_dropped_on_validate_locks = 0; + self.num_dropped_on_receive_transaction_checks = 0; + self.num_dropped_on_clear = 0; + self.num_dropped_on_age_and_status = 0; + self.num_dropped_on_capacity = 0; + self.min_prioritization_fees = u64::MAX; + self.max_prioritization_fees = 0; + } + + pub fn update_priority_stats(&mut self, min_max_fees: MinMaxResult) { + // update min/max priority + match min_max_fees { + itertools::MinMaxResult::NoElements => { + // do nothing + } + itertools::MinMaxResult::OneElement(e) => { + self.min_prioritization_fees = e; + self.max_prioritization_fees = e; + } + itertools::MinMaxResult::MinMax(min, max) => { + self.min_prioritization_fees = min; + self.max_prioritization_fees = max; + } + } + } + + pub fn get_min_priority(&self) -> u64 { + // to avoid getting u64::max recorded by metrics / in case of edge cases + if self.min_prioritization_fees != u64::MAX { + self.min_prioritization_fees + } else { + 0 + } + } + + pub fn get_max_priority(&self) -> u64 { + self.max_prioritization_fees + } +} + +#[derive(Default)] +pub struct SchedulerTimingMetrics { + interval: 
IntervalSchedulerTimingMetrics,
+    slot: SlotSchedulerTimingMetrics,
+}
+
+impl SchedulerTimingMetrics {
+    pub fn update(&mut self, update: impl Fn(&mut SchedulerTimingMetricsInner)) {
+        update(&mut self.interval.metrics);
+        update(&mut self.slot.metrics);
+    }
+
+    pub fn maybe_report_and_reset_slot(&mut self, slot: Option<Slot>) {
+        self.slot.maybe_report_and_reset(slot);
+    }
+
+    pub fn maybe_report_and_reset_interval(&mut self, should_report: bool) {
+        self.interval.maybe_report_and_reset(should_report);
+    }
+}
+
+#[derive(Default)]
+struct IntervalSchedulerTimingMetrics {
+    interval: AtomicInterval,
+    metrics: SchedulerTimingMetricsInner,
+}
+
+#[derive(Default)]
+struct SlotSchedulerTimingMetrics {
+    slot: Option<Slot>,
+    metrics: SchedulerTimingMetricsInner,
+}
+
+#[derive(Default)]
+pub struct SchedulerTimingMetricsInner {
+    /// Time spent making processing decisions.
+    pub decision_time_us: u64,
+    /// Time spent receiving packets.
+    pub receive_time_us: u64,
+    /// Time spent buffering packets.
+    pub buffer_time_us: u64,
+    /// Time spent filtering transactions during scheduling.
+    pub schedule_filter_time_us: u64,
+    /// Time spent scheduling transactions.
+    pub schedule_time_us: u64,
+    /// Time spent clearing transactions from the container.
+    pub clear_time_us: u64,
+    /// Time spent cleaning expired or processed transactions from the container.
+    pub clean_time_us: u64,
+    /// Time spent forwarding transactions.
+    pub forward_time_us: u64,
+    /// Time spent receiving completed transactions.
+    pub receive_completed_time_us: u64,
+}
+
+impl IntervalSchedulerTimingMetrics {
+    fn maybe_report_and_reset(&mut self, should_report: bool) {
+        const REPORT_INTERVAL_MS: u64 = 1000;
+        if self.interval.should_update(REPORT_INTERVAL_MS) {
+            if should_report {
+                self.metrics.report("banking_stage_scheduler_timing", None);
+            }
+            self.metrics.reset();
+        }
+    }
+}
+
+impl SlotSchedulerTimingMetrics {
+    fn maybe_report_and_reset(&mut self, slot: Option<Slot>) {
+        if self.slot != slot {
+            // Only report if there was an assigned slot.
+ if self.slot.is_some() { + self.metrics + .report("banking_stage_scheduler_slot_timing", self.slot); + } + self.metrics.reset(); + self.slot = slot; + } + } +} + +impl SchedulerTimingMetricsInner { + fn report(&self, name: &'static str, slot: Option) { + let mut datapoint = create_datapoint!( + @point name, + ("decision_time_us", self.decision_time_us, i64), + ("receive_time_us", self.receive_time_us, i64), + ("buffer_time_us", self.buffer_time_us, i64), + ("schedule_filter_time_us", self.schedule_filter_time_us, i64), + ("schedule_time_us", self.schedule_time_us, i64), + ("clear_time_us", self.clear_time_us, i64), + ("clean_time_us", self.clean_time_us, i64), + ("forward_time_us", self.forward_time_us, i64), + ( + "receive_completed_time_us", + self.receive_completed_time_us, + i64 + ) + ); + if let Some(slot) = slot { + datapoint.add_field_i64("slot", slot as i64); + } + solana_metrics::submit(datapoint, log::Level::Info); + } + + fn reset(&mut self) { + self.decision_time_us = 0; + self.receive_time_us = 0; + self.buffer_time_us = 0; + self.schedule_filter_time_us = 0; + self.schedule_time_us = 0; + self.clear_time_us = 0; + self.clean_time_us = 0; + self.forward_time_us = 0; + self.receive_completed_time_us = 0; + } +} + +#[derive(Default)] +pub struct SchedulerLeaderDetectionMetrics { + inner: Option, +} + +struct SchedulerLeaderDetectionMetricsInner { + slot: Slot, + bank_creation_time: Instant, + bank_detected_time: Instant, +} + +impl SchedulerLeaderDetectionMetrics { + pub fn update_and_maybe_report(&mut self, bank_start: Option<&BankStart>) { + match (&self.inner, bank_start) { + (None, Some(bank_start)) => self.initialize_inner(bank_start), + (Some(_inner), None) => self.report_and_reset(), + (Some(inner), Some(bank_start)) if inner.slot != bank_start.working_bank.slot() => { + self.report_and_reset(); + self.initialize_inner(bank_start); + } + _ => {} + } + } + + fn initialize_inner(&mut self, bank_start: &BankStart) { + let bank_detected_time = Instant::now(); + self.inner = Some(SchedulerLeaderDetectionMetricsInner { + slot: bank_start.working_bank.slot(), + bank_creation_time: *bank_start.bank_creation_time, + bank_detected_time, + }); + } + + fn report_and_reset(&mut self) { + let SchedulerLeaderDetectionMetricsInner { + slot, + bank_creation_time, + bank_detected_time, + } = self.inner.take().expect("inner must be present"); + + let bank_detected_delay_us = bank_detected_time + .duration_since(bank_creation_time) + .as_micros() + .try_into() + .unwrap_or(i64::MAX); + let bank_detected_to_slot_end_detected_us = bank_detected_time + .elapsed() + .as_micros() + .try_into() + .unwrap_or(i64::MAX); + datapoint_info!( + "banking_stage_scheduler_leader_detection", + ("slot", slot, i64), + ("bank_detected_delay_us", bank_detected_delay_us, i64), + ( + "bank_detected_to_slot_end_detected_us", + bank_detected_to_slot_end_detected_us, + i64 + ), + ); + } +} diff --git a/prio-graph-scheduler/src/transaction_priority_id.rs b/prio-graph-scheduler/src/transaction_priority_id.rs new file mode 100644 index 00000000000000..f39927324e355a --- /dev/null +++ b/prio-graph-scheduler/src/transaction_priority_id.rs @@ -0,0 +1,69 @@ +use { + crate::scheduler_messages::TransactionId, + prio_graph::TopLevelId, + std::hash::{Hash, Hasher}, +}; + +/// A unique identifier tied with priority ordering for a transaction/packet: +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub struct TransactionPriorityId { + pub priority: u64, + pub id: TransactionId, +} + +impl TransactionPriorityId { 
+    pub fn new(priority: u64, id: TransactionId) -> Self {
+        Self { priority, id }
+    }
+}
+
+impl Hash for TransactionPriorityId {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        self.id.hash(state)
+    }
+}
+
+impl TopLevelId<Self> for TransactionPriorityId {
+    fn id(&self) -> Self {
+        *self
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_transaction_priority_id_ordering() {
+        // Higher priority first
+        {
+            let id1 = TransactionPriorityId::new(1, TransactionId::new(1));
+            let id2 = TransactionPriorityId::new(2, TransactionId::new(1));
+            assert!(id1 < id2);
+            assert!(id1 <= id2);
+            assert!(id2 > id1);
+            assert!(id2 >= id1);
+        }
+
+        // Equal priority then compare by id
+        {
+            let id1 = TransactionPriorityId::new(1, TransactionId::new(1));
+            let id2 = TransactionPriorityId::new(1, TransactionId::new(2));
+            assert!(id1 < id2);
+            assert!(id1 <= id2);
+            assert!(id2 > id1);
+            assert!(id2 >= id1);
+        }
+
+        // Equal priority and id
+        {
+            let id1 = TransactionPriorityId::new(1, TransactionId::new(1));
+            let id2 = TransactionPriorityId::new(1, TransactionId::new(1));
+            assert_eq!(id1, id2);
+            assert!(id1 >= id2);
+            assert!(id1 <= id2);
+            assert!(id2 >= id1);
+            assert!(id2 <= id1);
+        }
+    }
+}
diff --git a/prio-graph-scheduler/src/transaction_state_container.rs b/prio-graph-scheduler/src/transaction_state_container.rs
new file mode 100644
index 00000000000000..8e2a51f5f7bb37
--- /dev/null
+++ b/prio-graph-scheduler/src/transaction_state_container.rs
@@ -0,0 +1,259 @@
+use {
+    super::{
+        transaction_priority_id::TransactionPriorityId,
+        transaction_state::{SanitizedTransactionTTL, TransactionState},
+    },
+    crate::scheduler_messages::TransactionId,
+    itertools::MinMaxResult,
+    min_max_heap::MinMaxHeap,
+    solana_core::banking_stage::immutable_deserialized_packet::ImmutableDeserializedPacket,
+    std::{collections::HashMap, sync::Arc},
+};
+
+/// This structure will hold `TransactionState` for the entirety of a
+/// transaction's lifetime in the scheduler and BankingStage as a whole.
+///
+/// Transaction Lifetime:
+/// 1. Received from `SigVerify` by `BankingStage`
+/// 2. Inserted into `TransactionStateContainer` by `BankingStage`
+/// 3. Popped in priority-order by scheduler, and transitioned to `Pending` state
+/// 4. Processed by `ConsumeWorker`
+///    a. If consumed, remove `Pending` state from the `TransactionStateContainer`
+///    b. If retryable, transition back to `Unprocessed` state.
+///       Re-insert to the queue, and return to step 3.
+///
+/// The structure is composed of two main components:
+/// 1. A priority queue of wrapped `TransactionId`s, which are used to
+///    order transactions by priority for selection by the scheduler.
+/// 2. A map of `TransactionId` to `TransactionState`, which is used to
+///    track the state of each transaction.
+///
+/// When `Pending`, the associated `TransactionId` is not in the queue, but
+/// is still in the map.
+/// The entry in the map should exist before insertion into the queue, and be
+/// removed only after the id is removed from the queue.
+///
+/// The container maintains a fixed capacity. If the queue is full when pushing
+/// a new transaction, the lowest priority transaction will be dropped.
+pub struct TransactionStateContainer {
+    priority_queue: MinMaxHeap<TransactionPriorityId>,
+    id_to_transaction_state: HashMap<TransactionId, TransactionState>,
+}
+
+impl TransactionStateContainer {
+    pub fn with_capacity(capacity: usize) -> Self {
+        Self {
+            priority_queue: MinMaxHeap::with_capacity(capacity),
+            id_to_transaction_state: HashMap::with_capacity(capacity),
+        }
+    }
+
+    /// Returns true if the queue is empty.
+    pub fn is_empty(&self) -> bool {
+        self.priority_queue.is_empty()
+    }
+
+    /// Returns the remaining capacity of the queue
+    pub fn remaining_queue_capacity(&self) -> usize {
+        self.priority_queue.capacity() - self.priority_queue.len()
+    }
+
+    /// Get the top transaction id in the priority queue.
+    pub fn pop(&mut self) -> Option<TransactionPriorityId> {
+        self.priority_queue.pop_max()
+    }
+
+    /// Get mutable transaction state by id.
+    pub fn get_mut_transaction_state(
+        &mut self,
+        id: &TransactionId,
+    ) -> Option<&mut TransactionState> {
+        self.id_to_transaction_state.get_mut(id)
+    }
+
+    /// Get reference to `SanitizedTransactionTTL` by id.
+    /// Panics if the transaction does not exist.
+    pub fn get_transaction_ttl(
+        &self,
+        id: &TransactionId,
+    ) -> Option<&SanitizedTransactionTTL> {
+        self.id_to_transaction_state
+            .get(id)
+            .map(|state| state.transaction_ttl())
+    }
+
+    /// Insert a new transaction into the container's queues and maps.
+    /// Returns `true` if a packet was dropped due to capacity limits.
+    pub fn insert_new_transaction(
+        &mut self,
+        transaction_id: TransactionId,
+        transaction_ttl: SanitizedTransactionTTL,
+        packet: Arc<ImmutableDeserializedPacket>,
+        priority: u64,
+        cost: u64,
+    ) -> bool {
+        let priority_id = TransactionPriorityId::new(priority, transaction_id);
+        self.id_to_transaction_state.insert(
+            transaction_id,
+            TransactionState::new(transaction_ttl, packet, priority, cost),
+        );
+        self.push_id_into_queue(priority_id)
+    }
+
+    /// Retries a transaction - inserts transaction back into map (but not packet).
+    /// This transitions the transaction to `Unprocessed` state.
+    pub fn retry_transaction(
+        &mut self,
+        transaction_id: TransactionId,
+        transaction_ttl: SanitizedTransactionTTL,
+    ) {
+        let transaction_state = self
+            .get_mut_transaction_state(&transaction_id)
+            .expect("transaction must exist");
+        let priority_id = TransactionPriorityId::new(transaction_state.priority(), transaction_id);
+        transaction_state.transition_to_unprocessed(transaction_ttl);
+        self.push_id_into_queue(priority_id);
+    }
+
+    /// Pushes a transaction id into the priority queue. If the queue is full, the lowest priority
+    /// transaction will be dropped (removed from the queue and map).
+    /// Returns `true` if a packet was dropped due to capacity limits.
+    pub fn push_id_into_queue(&mut self, priority_id: TransactionPriorityId) -> bool {
+        if self.remaining_queue_capacity() == 0 {
+            let popped_id = self.priority_queue.push_pop_min(priority_id);
+            self.remove_by_id(&popped_id.id);
+            true
+        } else {
+            self.priority_queue.push(priority_id);
+            false
+        }
+    }
+
+    /// Remove transaction by id.
+    pub fn remove_by_id(&mut self, id: &TransactionId) {
+        self.id_to_transaction_state
+            .remove(id)
+            .expect("transaction must exist");
+    }
+
+    pub fn get_min_max_priority(&self) -> MinMaxResult<u64> {
+        match self.priority_queue.peek_min() {
+            Some(min) => match self.priority_queue.peek_max() {
+                Some(max) => MinMaxResult::MinMax(min.priority, max.priority),
+                None => MinMaxResult::OneElement(min.priority),
+            },
+            None => MinMaxResult::NoElements,
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use {
+        super::*,
+        solana_sdk::{
+            compute_budget::ComputeBudgetInstruction,
+            hash::Hash,
+            message::Message,
+            packet::Packet,
+            signature::Keypair,
+            signer::Signer,
+            slot_history::Slot,
+            system_instruction,
+            transaction::{SanitizedTransaction, Transaction},
+        },
+    };
+
+    /// Returns (transaction_ttl, packet, priority, cost)
+    fn test_transaction(
+        priority: u64,
+    ) -> (
+        SanitizedTransactionTTL,
+        Arc<ImmutableDeserializedPacket>,
+        u64,
+        u64,
+    ) {
+        let from_keypair = Keypair::new();
+        let ixs = vec![
+            system_instruction::transfer(
+                &from_keypair.pubkey(),
+                &solana_sdk::pubkey::new_rand(),
+                1,
+            ),
+            ComputeBudgetInstruction::set_compute_unit_price(priority),
+        ];
+        let message = Message::new(&ixs, Some(&from_keypair.pubkey()));
+        let tx = SanitizedTransaction::from_transaction_for_tests(Transaction::new(
+            &[&from_keypair],
+            message,
+            Hash::default(),
+        ));
+        let packet = Arc::new(
+            ImmutableDeserializedPacket::new(
+                Packet::from_data(None, tx.to_versioned_transaction()).unwrap(),
+            )
+            .unwrap(),
+        );
+        let transaction_ttl = SanitizedTransactionTTL {
+            transaction: tx,
+            max_age_slot: Slot::MAX,
+        };
+        const TEST_TRANSACTION_COST: u64 = 5000;
+        (transaction_ttl, packet, priority, TEST_TRANSACTION_COST)
+    }
+
+    fn push_to_container(container: &mut TransactionStateContainer, num: usize) {
+        for id in 0..num as u64 {
+            let priority = id;
+            let (transaction_ttl, packet, priority, cost) = test_transaction(priority);
+            container.insert_new_transaction(
+                TransactionId::new(id),
+                transaction_ttl,
+                packet,
+                priority,
+                cost,
+            );
+        }
+    }
+
+    #[test]
+    fn test_is_empty() {
+        let mut container = TransactionStateContainer::with_capacity(1);
+        assert!(container.is_empty());
+
+        push_to_container(&mut container, 1);
+        assert!(!container.is_empty());
+    }
+
+    #[test]
+    fn test_priority_queue_capacity() {
+        let mut container = TransactionStateContainer::with_capacity(1);
+        push_to_container(&mut container, 5);
+
+        assert_eq!(container.priority_queue.len(), 1);
+        assert_eq!(container.id_to_transaction_state.len(), 1);
+        assert_eq!(
+            container
+                .id_to_transaction_state
+                .iter()
+                .map(|ts| ts.1.priority())
+                .next()
+                .unwrap(),
+            4
+        );
+    }
+
+    #[test]
+    fn test_get_mut_transaction_state() {
+        let mut container = TransactionStateContainer::with_capacity(5);
+        push_to_container(&mut container, 5);
+
+        let existing_id = TransactionId::new(3);
+        let non_existing_id = TransactionId::new(7);
+        assert!(container.get_mut_transaction_state(&existing_id).is_some());
+        assert!(container.get_mut_transaction_state(&existing_id).is_some());
+        assert!(container
+            .get_mut_transaction_state(&non_existing_id)
+            .is_none());
+    }
+}
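
Note: the priority used to order ids in the container comes from the reward/cost ratio computed at the top of this section, using saturating integer arithmetic. A minimal standalone sketch of that arithmetic follows; the function and values are illustrative only and are not part of this patch.

// Sketch of the reward-per-cost priority formula (illustrative, not in the patch).
fn calculate_priority(reward: u64, cost: u64) -> u64 {
    // Scale the reward before dividing so integer division keeps precision,
    // and add 1 to the cost to avoid division by zero.
    const MULTIPLIER: u64 = 1_000_000;
    reward
        .saturating_mul(MULTIPLIER)
        .saturating_div(cost.saturating_add(1))
}

fn main() {
    // Higher reward per unit of cost ranks higher.
    assert!(calculate_priority(2_000, 5_000) > calculate_priority(1_000, 5_000));
    assert!(calculate_priority(1_000, 2_500) > calculate_priority(1_000, 5_000));
    // Saturating multiplication caps instead of overflowing.
    assert_eq!(calculate_priority(u64::MAX, 0), u64::MAX);
}

Scaling by MULTIPLIER before dividing preserves precision under integer division, and the `+ 1` on cost guards against division by zero for zero-cost edge cases.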
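
Note: `TransactionStateContainer::push_id_into_queue` keeps the priority queue bounded by evicting the minimum-priority id via `MinMaxHeap::push_pop_min`. Below is a self-contained sketch of that eviction pattern, assuming only the `min-max-heap` crate added as a dependency in this change; the helper is illustrative and not part of the patch.

use min_max_heap::MinMaxHeap;

/// Push `priority` into a bounded heap; once the heap reaches `capacity`,
/// the lowest-priority element is evicted and returned (mirrors the
/// behavior of `push_id_into_queue`).
fn push_bounded(heap: &mut MinMaxHeap<u64>, capacity: usize, priority: u64) -> Option<u64> {
    if heap.len() >= capacity {
        // push_pop_min inserts the new element and pops the minimum in one
        // step, so the new element itself comes back if it is the smallest.
        Some(heap.push_pop_min(priority))
    } else {
        heap.push(priority);
        None
    }
}

fn main() {
    let mut heap = MinMaxHeap::with_capacity(3);
    for p in [10, 30, 20, 40, 5] {
        if let Some(evicted) = push_bounded(&mut heap, 3, p) {
            println!("evicted lowest priority {evicted}");
        }
    }
    // Highest priorities survive; the scheduler pops from the max end.
    assert_eq!(heap.pop_max(), Some(40));
    assert_eq!(heap.pop_max(), Some(30));
    assert_eq!(heap.pop_max(), Some(20));
}

Unlike the container, which derives its bound from the heap's `capacity()`, the sketch passes the bound explicitly; the eviction behavior is the same: once full, a new id only displaces an existing one if it outranks the current minimum.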