diff --git a/core/src/banking_stage/immutable_deserialized_packet.rs b/core/src/banking_stage/immutable_deserialized_packet.rs
index b03f3d5d64d4e8..7bb259494e4c31 100644
--- a/core/src/banking_stage/immutable_deserialized_packet.rs
+++ b/core/src/banking_stage/immutable_deserialized_packet.rs
@@ -41,12 +41,12 @@ pub enum DeserializedPacketError {
 
 #[derive(Debug, Eq)]
 pub struct ImmutableDeserializedPacket {
-    original_packet: Packet,
-    transaction: SanitizedVersionedTransaction,
-    message_hash: Hash,
-    is_simple_vote: bool,
-    compute_unit_price: u64,
-    compute_unit_limit: u32,
+    pub original_packet: Packet,
+    pub transaction: SanitizedVersionedTransaction,
+    pub message_hash: Hash,
+    pub is_simple_vote: bool,
+    pub compute_unit_price: u64,
+    pub compute_unit_limit: u32,
 }
 
 impl ImmutableDeserializedPacket {
diff --git a/prio-graph-scheduler/src/deserializable_packet.rs b/prio-graph-scheduler/src/deserializable_packet.rs
index f67984e9a7625e..79c2a834666d41 100644
--- a/prio-graph-scheduler/src/deserializable_packet.rs
+++ b/prio-graph-scheduler/src/deserializable_packet.rs
@@ -1,4 +1,5 @@
-use ahash::HashSet;
+use std::collections::HashSet;
+use solana_core::banking_stage::immutable_deserialized_packet::{DeserializedPacketError, ImmutableDeserializedPacket};
 use solana_sdk::hash::Hash;
 use solana_sdk::message::AddressLoader;
 use solana_sdk::packet::Packet;
@@ -37,3 +38,46 @@ pub trait DeserializableTxPacket: PartialEq + PartialOrd + Eq + Sized {
 
     fn compute_unit_limit(&self) -> u64;
 }
+
+
+/// TODO: migrate to solana_core
+impl DeserializableTxPacket for ImmutableDeserializedPacket {
+    type DeserializeError = DeserializedPacketError;
+
+    fn from_packet(packet: Packet) -> Result<Self, Self::DeserializeError> {
+        ImmutableDeserializedPacket::new(packet)
+    }
+
+    fn build_sanitized_transaction(
+        &self,
+        votes_only: bool,
+        address_loader: impl AddressLoader,
+        reserved_account_keys: &HashSet<Pubkey>,
+    ) -> Option<SanitizedTransaction> {
+        self.build_sanitized_transaction(votes_only, address_loader, reserved_account_keys)
+    }
+
+    fn original_packet(&self) -> &Packet {
+        &self.original_packet
+    }
+
+    fn transaction(&self) -> &SanitizedVersionedTransaction {
+        &self.transaction
+    }
+
+    fn message_hash(&self) -> &Hash {
+        &self.message_hash
+    }
+
+    fn is_simple_vote(&self) -> bool {
+        self.is_simple_vote
+    }
+
+    fn compute_unit_price(&self) -> u64 {
+        self.compute_unit_price
+    }
+
+    fn compute_unit_limit(&self) -> u64 {
+        u64::from(self.compute_unit_limit)
+    }
+}
\ No newline at end of file
diff --git a/prio-graph-scheduler/src/prio_graph_scheduler.rs b/prio-graph-scheduler/src/prio_graph_scheduler.rs
index a2f339d3baeaac..4d66a7901d4976 100644
--- a/prio-graph-scheduler/src/prio_graph_scheduler.rs
+++ b/prio-graph-scheduler/src/prio_graph_scheduler.rs
@@ -1,14 +1,12 @@
 use {
-    crate::scheduler_messages::{
-        ConsumeWork, FinishedConsumeWork, TransactionBatchId, TransactionId,
-    },
-    crate::transaction_priority_id::TransactionPriorityId,
-    crate::transaction_state::TransactionState,
     crate::{
+        deserializable_packet::DeserializableTxPacket,
         in_flight_tracker::InFlightTracker,
         scheduler_error::SchedulerError,
+        scheduler_messages::{ConsumeWork, FinishedConsumeWork, TransactionBatchId, TransactionId},
         thread_aware_account_locks::{ThreadAwareAccountLocks, ThreadId, ThreadSet},
-        transaction_state::SanitizedTransactionTTL,
+        transaction_priority_id::TransactionPriorityId,
+        transaction_state::{SanitizedTransactionTTL, TransactionState},
         transaction_state_container::TransactionStateContainer,
     },
     crossbeam_channel::{Receiver, Sender, TryRecvError},
@@ -25,15 +23,16 @@ use {
     },
 };
 
-pub struct PrioGraphScheduler {
+pub struct PrioGraphScheduler<P: DeserializableTxPacket> {
     in_flight_tracker: InFlightTracker,
     account_locks: ThreadAwareAccountLocks,
     consume_work_senders: Vec<Sender<ConsumeWork>>,
     finished_consume_work_receiver: Receiver<FinishedConsumeWork>,
     look_ahead_window_size: usize,
+    phantom: std::marker::PhantomData<P>,
 }
 
-impl PrioGraphScheduler {
+impl<P: DeserializableTxPacket> PrioGraphScheduler<P> {
     pub fn new(
         consume_work_senders: Vec<Sender<ConsumeWork>>,
         finished_consume_work_receiver: Receiver<FinishedConsumeWork>,
@@ -45,6 +44,7 @@ impl PrioGraphScheduler {
             consume_work_senders,
             finished_consume_work_receiver,
             look_ahead_window_size: 2048,
+            phantom: std::marker::PhantomData,
         }
     }
 
@@ -66,7 +66,7 @@ impl PrioGraphScheduler {
     /// not cause conflicts in the near future.
     pub fn schedule(
         &mut self,
-        container: &mut TransactionStateContainer,
+        container: &mut TransactionStateContainer<P>,
         pre_graph_filter: impl Fn(&[&SanitizedTransaction], &mut [bool]),
         pre_lock_filter: impl Fn(&SanitizedTransaction) -> bool,
     ) -> Result<SchedulingSummary, SchedulerError> {
@@ -102,7 +102,7 @@ impl PrioGraphScheduler {
         let mut total_filter_time_us: u64 = 0;
         let mut window_budget = self.look_ahead_window_size;
-        let mut chunked_pops = |container: &mut TransactionStateContainer,
+        let mut chunked_pops = |container: &mut TransactionStateContainer<P>,
                                 prio_graph: &mut PrioGraph<_, _, _, _>,
                                 window_budget: &mut usize| {
             while *window_budget > 0 {
@@ -281,7 +281,7 @@ impl PrioGraphScheduler {
     /// Returns (num_transactions, num_retryable_transactions) on success.
     pub fn receive_completed(
         &mut self,
-        container: &mut TransactionStateContainer,
+        container: &mut TransactionStateContainer<P>,
     ) -> Result<(usize, usize), SchedulerError> {
         let mut total_num_transactions: usize = 0;
         let mut total_num_retryable: usize = 0;
@@ -300,7 +300,7 @@ impl PrioGraphScheduler {
     /// Returns `Ok((num_transactions, num_retryable))` if a batch was received, `Ok((0, 0))` if no batch was received.
     fn try_receive_completed(
         &mut self,
-        container: &mut TransactionStateContainer,
+        container: &mut TransactionStateContainer<P>,
     ) -> Result<(usize, usize), SchedulerError> {
         match self.finished_consume_work_receiver.try_recv() {
             Ok(FinishedConsumeWork {
@@ -535,8 +535,8 @@ enum TransactionSchedulingError {
     UnschedulableConflicts,
 }
 
-fn try_schedule_transaction(
-    transaction_state: &mut TransactionState,
+fn try_schedule_transaction<P: DeserializableTxPacket>(
+    transaction_state: &mut TransactionState<P>
, pre_lock_filter: impl Fn(&SanitizedTransaction) -> bool, blocking_locks: &mut ReadWriteAccountSet, account_locks: &mut ThreadAwareAccountLocks, @@ -621,15 +621,17 @@ mod tests { fn create_test_frame( num_threads: usize, ) -> ( - PrioGraphScheduler, + PrioGraphScheduler, Vec>, Sender, ) { let (consume_work_senders, consume_work_receivers) = (0..num_threads).map(|_| unbounded()).unzip(); let (finished_consume_work_sender, finished_consume_work_receiver) = unbounded(); - let scheduler = - PrioGraphScheduler::new(consume_work_senders, finished_consume_work_receiver); + let scheduler = PrioGraphScheduler::::new( + consume_work_senders, + finished_consume_work_receiver, + ); ( scheduler, consume_work_receivers, @@ -666,8 +668,9 @@ mod tests { u64, ), >, - ) -> TransactionStateContainer { - let mut container = TransactionStateContainer::with_capacity(10 * 1024); + ) -> TransactionStateContainer { + let mut container = + TransactionStateContainer::::with_capacity(10 * 1024); for (index, (from_keypair, to_pubkeys, lamports, compute_unit_price)) in tx_infos.into_iter().enumerate() { diff --git a/prio-graph-scheduler/src/scheduler_controller.rs b/prio-graph-scheduler/src/scheduler_controller.rs deleted file mode 100644 index 5a04e1eb39d65b..00000000000000 --- a/prio-graph-scheduler/src/scheduler_controller.rs +++ /dev/null @@ -1,1161 +0,0 @@ -//! Control flow for BankingStage's transaction scheduler. -//! - -use { - super::{ - prio_graph_scheduler::PrioGraphScheduler, - scheduler_error::SchedulerError, - scheduler_metrics::{ - SchedulerCountMetrics, SchedulerLeaderDetectionMetrics, SchedulerTimingMetrics, - }, - transaction_id_generator::TransactionIdGenerator, - transaction_state::SanitizedTransactionTTL, - transaction_state_container::TransactionStateContainer, - }, - crate::banking_stage::{ - consume_worker::ConsumeWorkerMetrics, - consumer::Consumer, - decision_maker::{BufferedPacketsDecision, DecisionMaker}, - forwarder::Forwarder, - immutable_deserialized_packet::ImmutableDeserializedPacket, - packet_deserializer::PacketDeserializer, - ForwardOption, LikeClusterInfo, TOTAL_BUFFERED_PACKETS, - }, - arrayvec::ArrayVec, - crossbeam_channel::RecvTimeoutError, - solana_accounts_db::account_locks::validate_account_locks, - solana_cost_model::cost_model::CostModel, - solana_measure::measure_us, - solana_runtime::{bank::Bank, bank_forks::BankForks}, - solana_runtime_transaction::instructions_processor::process_compute_budget_instructions, - solana_sdk::{ - self, - clock::{FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, MAX_PROCESSING_AGE}, - fee::FeeBudgetLimits, - saturating_add_assign, - transaction::SanitizedTransaction, - }, - solana_svm::transaction_error_metrics::TransactionErrorMetrics, - solana_svm_transaction::svm_message::SVMMessage, - std::{ - sync::{Arc, RwLock}, - time::{Duration, Instant}, - }, -}; - -/// Controls packet and transaction flow into scheduler, and scheduling execution. -pub(crate) struct SchedulerController { - /// Decision maker for determining what should be done with transactions. - decision_maker: DecisionMaker, - /// Packet/Transaction ingress. - packet_receiver: PacketDeserializer, - bank_forks: Arc>, - /// Generates unique IDs for incoming transactions. - transaction_id_generator: TransactionIdGenerator, - /// Container for transaction state. - /// Shared resource between `packet_receiver` and `scheduler`. - container: TransactionStateContainer, - /// State for scheduling and communicating with worker threads. 
- scheduler: PrioGraphScheduler, - /// Metrics tracking time for leader bank detection. - leader_detection_metrics: SchedulerLeaderDetectionMetrics, - /// Metrics tracking counts on transactions in different states - /// over an interval and during a leader slot. - count_metrics: SchedulerCountMetrics, - /// Metrics tracking time spent in difference code sections - /// over an interval and during a leader slot. - timing_metrics: SchedulerTimingMetrics, - /// Metric report handles for the worker threads. - worker_metrics: Vec>, - /// State for forwarding packets to the leader, if enabled. - forwarder: Option>, -} - -impl SchedulerController { - pub fn new( - decision_maker: DecisionMaker, - packet_deserializer: PacketDeserializer, - bank_forks: Arc>, - scheduler: PrioGraphScheduler, - worker_metrics: Vec>, - forwarder: Option>, - ) -> Self { - Self { - decision_maker, - packet_receiver: packet_deserializer, - bank_forks, - transaction_id_generator: TransactionIdGenerator::default(), - container: TransactionStateContainer::with_capacity(TOTAL_BUFFERED_PACKETS), - scheduler, - leader_detection_metrics: SchedulerLeaderDetectionMetrics::default(), - count_metrics: SchedulerCountMetrics::default(), - timing_metrics: SchedulerTimingMetrics::default(), - worker_metrics, - forwarder, - } - } - - pub fn run(mut self) -> Result<(), SchedulerError> { - loop { - // BufferedPacketsDecision is shared with legacy BankingStage, which will forward - // packets. Initially, not renaming these decision variants but the actions taken - // are different, since new BankingStage will not forward packets. - // For `Forward` and `ForwardAndHold`, we want to receive packets but will not - // forward them to the next leader. In this case, `ForwardAndHold` is - // indistinguishable from `Hold`. - // - // `Forward` will drop packets from the buffer instead of forwarding. - // During receiving, since packets would be dropped from buffer anyway, we can - // bypass sanitization and buffering and immediately drop the packets. - let (decision, decision_time_us) = - measure_us!(self.decision_maker.make_consume_or_forward_decision()); - self.timing_metrics.update(|timing_metrics| { - saturating_add_assign!(timing_metrics.decision_time_us, decision_time_us); - }); - let new_leader_slot = decision.bank_start().map(|b| b.working_bank.slot()); - self.leader_detection_metrics - .update_and_maybe_report(decision.bank_start()); - self.count_metrics - .maybe_report_and_reset_slot(new_leader_slot); - self.timing_metrics - .maybe_report_and_reset_slot(new_leader_slot); - - self.process_transactions(&decision)?; - self.receive_completed()?; - if !self.receive_and_buffer_packets(&decision) { - break; - } - // Report metrics only if there is data. - // Reset intervals when appropriate, regardless of report. - let should_report = self.count_metrics.interval_has_data(); - let priority_min_max = self.container.get_min_max_priority(); - self.count_metrics.update(|count_metrics| { - count_metrics.update_priority_stats(priority_min_max); - }); - self.count_metrics - .maybe_report_and_reset_interval(should_report); - self.timing_metrics - .maybe_report_and_reset_interval(should_report); - self.worker_metrics - .iter() - .for_each(|metrics| metrics.maybe_report_and_reset()); - } - - Ok(()) - } - - /// Process packets based on decision. 
- fn process_transactions( - &mut self, - decision: &BufferedPacketsDecision, - ) -> Result<(), SchedulerError> { - let forwarding_enabled = self.forwarder.is_some(); - match decision { - BufferedPacketsDecision::Consume(bank_start) => { - let (scheduling_summary, schedule_time_us) = measure_us!(self.scheduler.schedule( - &mut self.container, - |txs, results| { - Self::pre_graph_filter( - txs, - results, - &bank_start.working_bank, - MAX_PROCESSING_AGE, - ) - }, - |_| true // no pre-lock filter for now - )?); - - self.count_metrics.update(|count_metrics| { - saturating_add_assign!( - count_metrics.num_scheduled, - scheduling_summary.num_scheduled - ); - saturating_add_assign!( - count_metrics.num_unschedulable, - scheduling_summary.num_unschedulable - ); - saturating_add_assign!( - count_metrics.num_schedule_filtered_out, - scheduling_summary.num_filtered_out - ); - }); - - self.timing_metrics.update(|timing_metrics| { - saturating_add_assign!( - timing_metrics.schedule_filter_time_us, - scheduling_summary.filter_time_us - ); - saturating_add_assign!(timing_metrics.schedule_time_us, schedule_time_us); - }); - } - BufferedPacketsDecision::Forward => { - if forwarding_enabled { - let (_, forward_time_us) = measure_us!(self.forward_packets(false)); - self.timing_metrics.update(|timing_metrics| { - saturating_add_assign!(timing_metrics.forward_time_us, forward_time_us); - }); - } else { - let (_, clear_time_us) = measure_us!(self.clear_container()); - self.timing_metrics.update(|timing_metrics| { - saturating_add_assign!(timing_metrics.clear_time_us, clear_time_us); - }); - } - } - BufferedPacketsDecision::ForwardAndHold => { - if forwarding_enabled { - let (_, forward_time_us) = measure_us!(self.forward_packets(true)); - self.timing_metrics.update(|timing_metrics| { - saturating_add_assign!(timing_metrics.forward_time_us, forward_time_us); - }); - } else { - let (_, clean_time_us) = measure_us!(self.clean_queue()); - self.timing_metrics.update(|timing_metrics| { - saturating_add_assign!(timing_metrics.clean_time_us, clean_time_us); - }); - } - } - BufferedPacketsDecision::Hold => {} - } - - Ok(()) - } - - fn pre_graph_filter( - transactions: &[&SanitizedTransaction], - results: &mut [bool], - bank: &Bank, - max_age: usize, - ) { - let lock_results = vec![Ok(()); transactions.len()]; - let mut error_counters = TransactionErrorMetrics::default(); - let check_results = - bank.check_transactions(transactions, &lock_results, max_age, &mut error_counters); - - let fee_check_results: Vec<_> = check_results - .into_iter() - .zip(transactions) - .map(|(result, tx)| { - result?; // if there's already error do nothing - Consumer::check_fee_payer_unlocked(bank, tx.message(), &mut error_counters) - }) - .collect(); - - for (fee_check_result, result) in fee_check_results.into_iter().zip(results.iter_mut()) { - *result = fee_check_result.is_ok(); - } - } - - /// Forward packets to the next leader. - fn forward_packets(&mut self, hold: bool) { - const MAX_FORWARDING_DURATION: Duration = Duration::from_millis(100); - let start = Instant::now(); - let bank = self.bank_forks.read().unwrap().working_bank(); - let feature_set = &bank.feature_set; - let forwarder = self.forwarder.as_mut().expect("forwarder must exist"); - - // Pop from the container in chunks, filter using bank checks, then attempt to forward. - // This doubles as a way to clean the queue as well as forwarding transactions. 
- const CHUNK_SIZE: usize = 64; - let mut num_forwarded: usize = 0; - let mut ids_to_add_back = Vec::new(); - let mut max_time_reached = false; - while !self.container.is_empty() { - let mut filter_array = [true; CHUNK_SIZE]; - let mut ids = Vec::with_capacity(CHUNK_SIZE); - let mut txs = Vec::with_capacity(CHUNK_SIZE); - - for _ in 0..CHUNK_SIZE { - if let Some(id) = self.container.pop() { - ids.push(id); - } else { - break; - } - } - let chunk_size = ids.len(); - ids.iter().for_each(|id| { - let transaction = self.container.get_transaction_ttl(&id.id).unwrap(); - txs.push(&transaction.transaction); - }); - - // use same filter we use for processing transactions: - // age, already processed, fee-check. - Self::pre_graph_filter( - &txs, - &mut filter_array, - &bank, - MAX_PROCESSING_AGE - .saturating_sub(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET as usize), - ); - - for (id, filter_result) in ids.iter().zip(&filter_array[..chunk_size]) { - if !*filter_result { - self.container.remove_by_id(&id.id); - continue; - } - - ids_to_add_back.push(*id); // add back to the queue at end - let state = self.container.get_mut_transaction_state(&id.id).unwrap(); - let sanitized_transaction = &state.transaction_ttl().transaction; - let immutable_packet = state.packet().clone(); - - // If not already forwarded and can be forwarded, add to forwardable packets. - if state.should_forward() - && forwarder.try_add_packet( - sanitized_transaction, - immutable_packet, - feature_set, - ) - { - saturating_add_assign!(num_forwarded, 1); - state.mark_forwarded(); - } - } - - if start.elapsed() >= MAX_FORWARDING_DURATION { - max_time_reached = true; - break; - } - } - - // Forward each batch of transactions - forwarder.forward_batched_packets(&ForwardOption::ForwardTransaction); - forwarder.clear_batches(); - - // If we hit the time limit. Drop everything that was not checked/processed. - // If we cannot run these simple checks in time, then we cannot run them during - // leader slot. - if max_time_reached { - while let Some(id) = self.container.pop() { - self.container.remove_by_id(&id.id); - } - } - - if hold { - for priority_id in ids_to_add_back { - self.container.push_id_into_queue(priority_id); - } - } else { - for priority_id in ids_to_add_back { - self.container.remove_by_id(&priority_id.id); - } - } - - self.count_metrics.update(|count_metrics| { - saturating_add_assign!(count_metrics.num_forwarded, num_forwarded); - }); - } - - /// Clears the transaction state container. - /// This only clears pending transactions, and does **not** clear in-flight transactions. - fn clear_container(&mut self) { - let mut num_dropped_on_clear: usize = 0; - while let Some(id) = self.container.pop() { - self.container.remove_by_id(&id.id); - saturating_add_assign!(num_dropped_on_clear, 1); - } - - self.count_metrics.update(|count_metrics| { - saturating_add_assign!(count_metrics.num_dropped_on_clear, num_dropped_on_clear); - }); - } - - /// Clean unprocessable transactions from the queue. These will be transactions that are - /// expired, already processed, or are no longer sanitizable. - /// This only clears pending transactions, and does **not** clear in-flight transactions. - fn clean_queue(&mut self) { - // Clean up any transactions that have already been processed, are too old, or do not have - // valid nonce accounts. 
- const MAX_TRANSACTION_CHECKS: usize = 10_000; - let mut transaction_ids = Vec::with_capacity(MAX_TRANSACTION_CHECKS); - - while let Some(id) = self.container.pop() { - transaction_ids.push(id); - } - - let bank = self.bank_forks.read().unwrap().working_bank(); - - const CHUNK_SIZE: usize = 128; - let mut error_counters = TransactionErrorMetrics::default(); - let mut num_dropped_on_age_and_status: usize = 0; - for chunk in transaction_ids.chunks(CHUNK_SIZE) { - let lock_results = vec![Ok(()); chunk.len()]; - let sanitized_txs: Vec<_> = chunk - .iter() - .map(|id| { - &self - .container - .get_transaction_ttl(&id.id) - .expect("transaction must exist") - .transaction - }) - .collect(); - - let check_results = bank.check_transactions( - &sanitized_txs, - &lock_results, - MAX_PROCESSING_AGE, - &mut error_counters, - ); - - for (result, id) in check_results.into_iter().zip(chunk.iter()) { - if result.is_err() { - saturating_add_assign!(num_dropped_on_age_and_status, 1); - self.container.remove_by_id(&id.id); - } else { - self.container.push_id_into_queue(*id); - } - } - } - - self.count_metrics.update(|count_metrics| { - saturating_add_assign!( - count_metrics.num_dropped_on_age_and_status, - num_dropped_on_age_and_status - ); - }); - } - - /// Receives completed transactions from the workers and updates metrics. - fn receive_completed(&mut self) -> Result<(), SchedulerError> { - let ((num_transactions, num_retryable), receive_completed_time_us) = - measure_us!(self.scheduler.receive_completed(&mut self.container)?); - - self.count_metrics.update(|count_metrics| { - saturating_add_assign!(count_metrics.num_finished, num_transactions); - saturating_add_assign!(count_metrics.num_retryable, num_retryable); - }); - self.timing_metrics.update(|timing_metrics| { - saturating_add_assign!( - timing_metrics.receive_completed_time_us, - receive_completed_time_us - ); - }); - - Ok(()) - } - - /// Returns whether the packet receiver is still connected. 
- fn receive_and_buffer_packets(&mut self, decision: &BufferedPacketsDecision) -> bool { - let remaining_queue_capacity = self.container.remaining_queue_capacity(); - - const MAX_PACKET_RECEIVE_TIME: Duration = Duration::from_millis(10); - let (recv_timeout, should_buffer) = match decision { - BufferedPacketsDecision::Consume(_) => ( - if self.container.is_empty() { - MAX_PACKET_RECEIVE_TIME - } else { - Duration::ZERO - }, - true, - ), - BufferedPacketsDecision::Forward => (MAX_PACKET_RECEIVE_TIME, self.forwarder.is_some()), - BufferedPacketsDecision::ForwardAndHold | BufferedPacketsDecision::Hold => { - (MAX_PACKET_RECEIVE_TIME, true) - } - }; - - let (received_packet_results, receive_time_us) = measure_us!(self - .packet_receiver - .receive_packets(recv_timeout, remaining_queue_capacity, |packet| { - packet.check_excessive_precompiles()?; - Ok(packet) - })); - - self.timing_metrics.update(|timing_metrics| { - saturating_add_assign!(timing_metrics.receive_time_us, receive_time_us); - }); - - match received_packet_results { - Ok(receive_packet_results) => { - let num_received_packets = receive_packet_results.deserialized_packets.len(); - - self.count_metrics.update(|count_metrics| { - saturating_add_assign!(count_metrics.num_received, num_received_packets); - }); - - if should_buffer { - let (_, buffer_time_us) = measure_us!( - self.buffer_packets(receive_packet_results.deserialized_packets) - ); - self.timing_metrics.update(|timing_metrics| { - saturating_add_assign!(timing_metrics.buffer_time_us, buffer_time_us); - }); - } else { - self.count_metrics.update(|count_metrics| { - saturating_add_assign!( - count_metrics.num_dropped_on_receive, - num_received_packets - ); - }); - } - } - Err(RecvTimeoutError::Timeout) => {} - Err(RecvTimeoutError::Disconnected) => return false, - } - - true - } - - fn buffer_packets(&mut self, packets: Vec) { - // Convert to Arcs - let packets: Vec<_> = packets.into_iter().map(Arc::new).collect(); - // Sanitize packets, generate IDs, and insert into the container. 
- let bank = self.bank_forks.read().unwrap().working_bank(); - let last_slot_in_epoch = bank.epoch_schedule().get_last_slot_in_epoch(bank.epoch()); - let transaction_account_lock_limit = bank.get_transaction_account_lock_limit(); - let vote_only = bank.vote_only_bank(); - - const CHUNK_SIZE: usize = 128; - let lock_results: [_; CHUNK_SIZE] = core::array::from_fn(|_| Ok(())); - - let mut arc_packets = ArrayVec::<_, CHUNK_SIZE>::new(); - let mut transactions = ArrayVec::<_, CHUNK_SIZE>::new(); - let mut fee_budget_limits_vec = ArrayVec::<_, CHUNK_SIZE>::new(); - - let mut error_counts = TransactionErrorMetrics::default(); - for chunk in packets.chunks(CHUNK_SIZE) { - let mut post_sanitization_count: usize = 0; - chunk - .iter() - .filter_map(|packet| { - packet - .build_sanitized_transaction( - vote_only, - bank.as_ref(), - bank.get_reserved_account_keys(), - ) - .map(|tx| (packet.clone(), tx)) - }) - .inspect(|_| saturating_add_assign!(post_sanitization_count, 1)) - .filter(|(_packet, tx)| { - validate_account_locks( - tx.message().account_keys(), - transaction_account_lock_limit, - ) - .is_ok() - }) - .filter_map(|(packet, tx)| { - process_compute_budget_instructions(SVMMessage::program_instructions_iter(&tx)) - .map(|compute_budget| (packet, tx, compute_budget.into())) - .ok() - }) - .for_each(|(packet, tx, fee_budget_limits)| { - arc_packets.push(packet); - transactions.push(tx); - fee_budget_limits_vec.push(fee_budget_limits); - }); - - let check_results = bank.check_transactions( - &transactions, - &lock_results[..transactions.len()], - MAX_PROCESSING_AGE, - &mut error_counts, - ); - let post_lock_validation_count = transactions.len(); - - let mut post_transaction_check_count: usize = 0; - let mut num_dropped_on_capacity: usize = 0; - let mut num_buffered: usize = 0; - for (((packet, transaction), fee_budget_limits), _check_result) in arc_packets - .drain(..) - .zip(transactions.drain(..)) - .zip(fee_budget_limits_vec.drain(..)) - .zip(check_results) - .filter(|(_, check_result)| check_result.is_ok()) - { - saturating_add_assign!(post_transaction_check_count, 1); - let transaction_id = self.transaction_id_generator.next(); - - let (priority, cost) = - Self::calculate_priority_and_cost(&transaction, &fee_budget_limits, &bank); - let transaction_ttl = SanitizedTransactionTTL { - transaction, - max_age_slot: last_slot_in_epoch, - }; - - if self.container.insert_new_transaction( - transaction_id, - transaction_ttl, - packet, - priority, - cost, - ) { - saturating_add_assign!(num_dropped_on_capacity, 1); - } - saturating_add_assign!(num_buffered, 1); - } - - // Update metrics for transactions that were dropped. 
- let num_dropped_on_sanitization = chunk.len().saturating_sub(post_sanitization_count); - let num_dropped_on_lock_validation = - post_sanitization_count.saturating_sub(post_lock_validation_count); - let num_dropped_on_transaction_checks = - post_lock_validation_count.saturating_sub(post_transaction_check_count); - - self.count_metrics.update(|count_metrics| { - saturating_add_assign!( - count_metrics.num_dropped_on_capacity, - num_dropped_on_capacity - ); - saturating_add_assign!(count_metrics.num_buffered, num_buffered); - saturating_add_assign!( - count_metrics.num_dropped_on_sanitization, - num_dropped_on_sanitization - ); - saturating_add_assign!( - count_metrics.num_dropped_on_validate_locks, - num_dropped_on_lock_validation - ); - saturating_add_assign!( - count_metrics.num_dropped_on_receive_transaction_checks, - num_dropped_on_transaction_checks - ); - }); - } - } - - /// Calculate priority and cost for a transaction: - /// - /// Cost is calculated through the `CostModel`, - /// and priority is calculated through a formula here that attempts to sell - /// blockspace to the highest bidder. - /// - /// The priority is calculated as: - /// P = R / (1 + C) - /// where P is the priority, R is the reward, - /// and C is the cost towards block-limits. - /// - /// Current minimum costs are on the order of several hundred, - /// so the denominator is effectively C, and the +1 is simply - /// to avoid any division by zero due to a bug - these costs - /// are calculated by the cost-model and are not direct - /// from user input. They should never be zero. - /// Any difference in the prioritization is negligible for - /// the current transaction costs. - fn calculate_priority_and_cost( - transaction: &SanitizedTransaction, - fee_budget_limits: &FeeBudgetLimits, - bank: &Bank, - ) -> (u64, u64) { - let cost = CostModel::calculate_cost(transaction, &bank.feature_set).sum(); - let reward = bank.calculate_reward_for_transaction(transaction, fee_budget_limits); - - // We need a multiplier here to avoid rounding down too aggressively. - // For many transactions, the cost will be greater than the fees in terms of raw lamports. - // For the purposes of calculating prioritization, we multiply the fees by a large number so that - // the cost is a small fraction. - // An offset of 1 is used in the denominator to explicitly avoid division by zero. 
- const MULTIPLIER: u64 = 1_000_000; - ( - reward - .saturating_mul(MULTIPLIER) - .saturating_div(cost.saturating_add(1)), - cost, - ) - } -} - -#[cfg(test)] -mod tests { - use { - super::*, - crate::{ - banking_stage::{ - consumer::TARGET_NUM_TRANSACTIONS_PER_BATCH, - scheduler_messages::{ConsumeWork, FinishedConsumeWork, TransactionBatchId}, - tests::create_slow_genesis_config, - }, - banking_trace::BankingPacketBatch, - sigverify::SigverifyTracerPacketStats, - }, - crossbeam_channel::{unbounded, Receiver, Sender}, - itertools::Itertools, - solana_gossip::cluster_info::ClusterInfo, - solana_ledger::{ - blockstore::Blockstore, genesis_utils::GenesisConfigInfo, - get_tmp_ledger_path_auto_delete, leader_schedule_cache::LeaderScheduleCache, - }, - solana_perf::packet::{to_packet_batches, PacketBatch, NUM_PACKETS}, - solana_poh::poh_recorder::{PohRecorder, Record, WorkingBankEntry}, - solana_runtime::bank::Bank, - solana_sdk::{ - compute_budget::ComputeBudgetInstruction, fee_calculator::FeeRateGovernor, hash::Hash, - message::Message, poh_config::PohConfig, pubkey::Pubkey, signature::Keypair, - signer::Signer, system_instruction, system_transaction, transaction::Transaction, - }, - std::sync::{atomic::AtomicBool, Arc, RwLock}, - tempfile::TempDir, - }; - - fn create_channels(num: usize) -> (Vec>, Vec>) { - (0..num).map(|_| unbounded()).unzip() - } - - // Helper struct to create tests that hold channels, files, etc. - // such that our tests can be more easily set up and run. - struct TestFrame { - bank: Arc, - mint_keypair: Keypair, - _ledger_path: TempDir, - _entry_receiver: Receiver, - _record_receiver: Receiver, - poh_recorder: Arc>, - banking_packet_sender: Sender, Option)>>, - - consume_work_receivers: Vec>, - finished_consume_work_sender: Sender, - } - - fn create_test_frame(num_threads: usize) -> (TestFrame, SchedulerController>) { - let GenesisConfigInfo { - mut genesis_config, - mint_keypair, - .. 
- } = create_slow_genesis_config(u64::MAX); - genesis_config.fee_rate_governor = FeeRateGovernor::new(5000, 0); - let (bank, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); - - let ledger_path = get_tmp_ledger_path_auto_delete!(); - let blockstore = Blockstore::open(ledger_path.path()) - .expect("Expected to be able to open database ledger"); - let (poh_recorder, entry_receiver, record_receiver) = PohRecorder::new( - bank.tick_height(), - bank.last_blockhash(), - bank.clone(), - Some((4, 4)), - bank.ticks_per_slot(), - Arc::new(blockstore), - &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), - &PohConfig::default(), - Arc::new(AtomicBool::default()), - ); - let poh_recorder = Arc::new(RwLock::new(poh_recorder)); - let decision_maker = DecisionMaker::new(Pubkey::new_unique(), poh_recorder.clone()); - - let (banking_packet_sender, banking_packet_receiver) = unbounded(); - let packet_deserializer = PacketDeserializer::new(banking_packet_receiver); - - let (consume_work_senders, consume_work_receivers) = create_channels(num_threads); - let (finished_consume_work_sender, finished_consume_work_receiver) = unbounded(); - - let test_frame = TestFrame { - bank, - mint_keypair, - _ledger_path: ledger_path, - _entry_receiver: entry_receiver, - _record_receiver: record_receiver, - poh_recorder, - banking_packet_sender, - consume_work_receivers, - finished_consume_work_sender, - }; - - let scheduler_controller = SchedulerController::new( - decision_maker, - packet_deserializer, - bank_forks, - PrioGraphScheduler::new(consume_work_senders, finished_consume_work_receiver), - vec![], // no actual workers with metrics to report, this can be empty - None, - ); - - (test_frame, scheduler_controller) - } - - fn create_and_fund_prioritized_transfer( - bank: &Bank, - mint_keypair: &Keypair, - from_keypair: &Keypair, - to_pubkey: &Pubkey, - lamports: u64, - compute_unit_price: u64, - recent_blockhash: Hash, - ) -> Transaction { - // Fund the sending key, so that the transaction does not get filtered by the fee-payer check. - { - let transfer = system_transaction::transfer( - mint_keypair, - &from_keypair.pubkey(), - 500_000, // just some amount that will always be enough - bank.last_blockhash(), - ); - bank.process_transaction(&transfer).unwrap(); - } - - let transfer = system_instruction::transfer(&from_keypair.pubkey(), to_pubkey, lamports); - let prioritization = ComputeBudgetInstruction::set_compute_unit_price(compute_unit_price); - let message = Message::new(&[transfer, prioritization], Some(&from_keypair.pubkey())); - Transaction::new(&vec![from_keypair], message, recent_blockhash) - } - - fn to_banking_packet_batch(txs: &[Transaction]) -> BankingPacketBatch { - let packet_batch = to_packet_batches(txs, NUM_PACKETS); - Arc::new((packet_batch, None)) - } - - // Helper function to let test receive and then schedule packets. - // The order of operations here is convenient for testing, but does not - // match the order of operations in the actual scheduler. - // The actual scheduler will process immediately after the decision, - // in order to keep the decision as recent as possible for processing. - // In the tests, the decision will not become stale, so it is more convenient - // to receive first and then schedule. 
- fn test_receive_then_schedule( - scheduler_controller: &mut SchedulerController>, - ) { - let decision = scheduler_controller - .decision_maker - .make_consume_or_forward_decision(); - assert!(matches!(decision, BufferedPacketsDecision::Consume(_))); - assert!(scheduler_controller.receive_completed().is_ok()); - assert!(scheduler_controller.receive_and_buffer_packets(&decision)); - assert!(scheduler_controller.process_transactions(&decision).is_ok()); - } - - #[test] - #[should_panic(expected = "batch id 0 is not being tracked")] - fn test_unexpected_batch_id() { - let (test_frame, scheduler_controller) = create_test_frame(1); - let TestFrame { - finished_consume_work_sender, - .. - } = &test_frame; - - finished_consume_work_sender - .send(FinishedConsumeWork { - work: ConsumeWork { - batch_id: TransactionBatchId::new(0), - ids: vec![], - transactions: vec![], - max_age_slots: vec![], - }, - retryable_indexes: vec![], - }) - .unwrap(); - - scheduler_controller.run().unwrap(); - } - - #[test] - fn test_schedule_consume_single_threaded_no_conflicts() { - let (test_frame, mut scheduler_controller) = create_test_frame(1); - let TestFrame { - bank, - mint_keypair, - poh_recorder, - banking_packet_sender, - consume_work_receivers, - .. - } = &test_frame; - - poh_recorder - .write() - .unwrap() - .set_bank_for_test(bank.clone()); - - // Send packet batch to the scheduler - should do nothing until we become the leader. - let tx1 = create_and_fund_prioritized_transfer( - bank, - mint_keypair, - &Keypair::new(), - &Pubkey::new_unique(), - 1, - 1000, - bank.last_blockhash(), - ); - let tx2 = create_and_fund_prioritized_transfer( - bank, - mint_keypair, - &Keypair::new(), - &Pubkey::new_unique(), - 1, - 2000, - bank.last_blockhash(), - ); - let tx1_hash = tx1.message().hash(); - let tx2_hash = tx2.message().hash(); - - let txs = vec![tx1, tx2]; - banking_packet_sender - .send(to_banking_packet_batch(&txs)) - .unwrap(); - - test_receive_then_schedule(&mut scheduler_controller); - let consume_work = consume_work_receivers[0].try_recv().unwrap(); - assert_eq!(consume_work.ids.len(), 2); - assert_eq!(consume_work.transactions.len(), 2); - let message_hashes = consume_work - .transactions - .iter() - .map(|tx| tx.message_hash()) - .collect_vec(); - assert_eq!(message_hashes, vec![&tx2_hash, &tx1_hash]); - } - - #[test] - fn test_schedule_consume_single_threaded_conflict() { - let (test_frame, mut scheduler_controller) = create_test_frame(1); - let TestFrame { - bank, - mint_keypair, - poh_recorder, - banking_packet_sender, - consume_work_receivers, - .. 
- } = &test_frame; - - poh_recorder - .write() - .unwrap() - .set_bank_for_test(bank.clone()); - - let pk = Pubkey::new_unique(); - let tx1 = create_and_fund_prioritized_transfer( - bank, - mint_keypair, - &Keypair::new(), - &pk, - 1, - 1000, - bank.last_blockhash(), - ); - let tx2 = create_and_fund_prioritized_transfer( - bank, - mint_keypair, - &Keypair::new(), - &pk, - 1, - 2000, - bank.last_blockhash(), - ); - let tx1_hash = tx1.message().hash(); - let tx2_hash = tx2.message().hash(); - - let txs = vec![tx1, tx2]; - banking_packet_sender - .send(to_banking_packet_batch(&txs)) - .unwrap(); - - // We expect 2 batches to be scheduled - test_receive_then_schedule(&mut scheduler_controller); - let consume_works = (0..2) - .map(|_| consume_work_receivers[0].try_recv().unwrap()) - .collect_vec(); - - let num_txs_per_batch = consume_works.iter().map(|cw| cw.ids.len()).collect_vec(); - let message_hashes = consume_works - .iter() - .flat_map(|cw| cw.transactions.iter().map(|tx| tx.message_hash())) - .collect_vec(); - assert_eq!(num_txs_per_batch, vec![1; 2]); - assert_eq!(message_hashes, vec![&tx2_hash, &tx1_hash]); - } - - #[test] - fn test_schedule_consume_single_threaded_multi_batch() { - let (test_frame, mut scheduler_controller) = create_test_frame(1); - let TestFrame { - bank, - mint_keypair, - poh_recorder, - banking_packet_sender, - consume_work_receivers, - .. - } = &test_frame; - - poh_recorder - .write() - .unwrap() - .set_bank_for_test(bank.clone()); - - // Send multiple batches - all get scheduled - let txs1 = (0..2 * TARGET_NUM_TRANSACTIONS_PER_BATCH) - .map(|i| { - create_and_fund_prioritized_transfer( - bank, - mint_keypair, - &Keypair::new(), - &Pubkey::new_unique(), - i as u64, - 1, - bank.last_blockhash(), - ) - }) - .collect_vec(); - let txs2 = (0..2 * TARGET_NUM_TRANSACTIONS_PER_BATCH) - .map(|i| { - create_and_fund_prioritized_transfer( - bank, - mint_keypair, - &Keypair::new(), - &Pubkey::new_unique(), - i as u64, - 2, - bank.last_blockhash(), - ) - }) - .collect_vec(); - - banking_packet_sender - .send(to_banking_packet_batch(&txs1)) - .unwrap(); - banking_packet_sender - .send(to_banking_packet_batch(&txs2)) - .unwrap(); - - // We expect 4 batches to be scheduled - test_receive_then_schedule(&mut scheduler_controller); - let consume_works = (0..4) - .map(|_| consume_work_receivers[0].try_recv().unwrap()) - .collect_vec(); - - assert_eq!( - consume_works.iter().map(|cw| cw.ids.len()).collect_vec(), - vec![TARGET_NUM_TRANSACTIONS_PER_BATCH; 4] - ); - } - - #[test] - fn test_schedule_consume_simple_thread_selection() { - let (test_frame, mut scheduler_controller) = create_test_frame(2); - let TestFrame { - bank, - mint_keypair, - poh_recorder, - banking_packet_sender, - consume_work_receivers, - .. - } = &test_frame; - - poh_recorder - .write() - .unwrap() - .set_bank_for_test(bank.clone()); - - // Send 4 transactions w/o conflicts. 
2 should be scheduled on each thread - let txs = (0..4) - .map(|i| { - create_and_fund_prioritized_transfer( - bank, - mint_keypair, - &Keypair::new(), - &Pubkey::new_unique(), - 1, - i * 10, - bank.last_blockhash(), - ) - }) - .collect_vec(); - banking_packet_sender - .send(to_banking_packet_batch(&txs)) - .unwrap(); - - // Priority Expectation: - // Thread 0: [3, 1] - // Thread 1: [2, 0] - let t0_expected = [3, 1] - .into_iter() - .map(|i| txs[i].message().hash()) - .collect_vec(); - let t1_expected = [2, 0] - .into_iter() - .map(|i| txs[i].message().hash()) - .collect_vec(); - - test_receive_then_schedule(&mut scheduler_controller); - let t0_actual = consume_work_receivers[0] - .try_recv() - .unwrap() - .transactions - .iter() - .map(|tx| *tx.message_hash()) - .collect_vec(); - let t1_actual = consume_work_receivers[1] - .try_recv() - .unwrap() - .transactions - .iter() - .map(|tx| *tx.message_hash()) - .collect_vec(); - - assert_eq!(t0_actual, t0_expected); - assert_eq!(t1_actual, t1_expected); - } - - #[test] - fn test_schedule_consume_retryable() { - let (test_frame, mut scheduler_controller) = create_test_frame(1); - let TestFrame { - bank, - mint_keypair, - poh_recorder, - banking_packet_sender, - consume_work_receivers, - finished_consume_work_sender, - .. - } = &test_frame; - - poh_recorder - .write() - .unwrap() - .set_bank_for_test(bank.clone()); - - // Send packet batch to the scheduler - should do nothing until we become the leader. - let tx1 = create_and_fund_prioritized_transfer( - bank, - mint_keypair, - &Keypair::new(), - &Pubkey::new_unique(), - 1, - 1000, - bank.last_blockhash(), - ); - let tx2 = create_and_fund_prioritized_transfer( - bank, - mint_keypair, - &Keypair::new(), - &Pubkey::new_unique(), - 1, - 2000, - bank.last_blockhash(), - ); - let tx1_hash = tx1.message().hash(); - let tx2_hash = tx2.message().hash(); - - let txs = vec![tx1, tx2]; - banking_packet_sender - .send(to_banking_packet_batch(&txs)) - .unwrap(); - - test_receive_then_schedule(&mut scheduler_controller); - let consume_work = consume_work_receivers[0].try_recv().unwrap(); - assert_eq!(consume_work.ids.len(), 2); - assert_eq!(consume_work.transactions.len(), 2); - let message_hashes = consume_work - .transactions - .iter() - .map(|tx| tx.message_hash()) - .collect_vec(); - assert_eq!(message_hashes, vec![&tx2_hash, &tx1_hash]); - - // Complete the batch - marking the second transaction as retryable - finished_consume_work_sender - .send(FinishedConsumeWork { - work: consume_work, - retryable_indexes: vec![1], - }) - .unwrap(); - - // Transaction should be rescheduled - test_receive_then_schedule(&mut scheduler_controller); - let consume_work = consume_work_receivers[0].try_recv().unwrap(); - assert_eq!(consume_work.ids.len(), 1); - assert_eq!(consume_work.transactions.len(), 1); - let message_hashes = consume_work - .transactions - .iter() - .map(|tx| tx.message_hash()) - .collect_vec(); - assert_eq!(message_hashes, vec![&tx1_hash]); - } -} diff --git a/prio-graph-scheduler/src/scheduler_messages.rs b/prio-graph-scheduler/src/scheduler_messages.rs index b5e11be6ba9d78..466fce7b1e7f54 100644 --- a/prio-graph-scheduler/src/scheduler_messages.rs +++ b/prio-graph-scheduler/src/scheduler_messages.rs @@ -1,5 +1,5 @@ use { - solana_core::banking_stage::immutable_deserialized_packet::ImmutableDeserializedPacket, + crate::deserializable_packet::DeserializableTxPacket, solana_sdk::{clock::Slot, transaction::SanitizedTransaction}, std::{fmt::Display, sync::Arc}, }; @@ -59,8 +59,8 @@ pub struct ConsumeWork 
{
 /// Message: [Scheduler -> Worker]
 /// Transactions to be forwarded to the next leader(s)
-pub struct ForwardWork {
-    pub packets: Vec<Arc<ImmutableDeserializedPacket>>,
+pub struct ForwardWork<P: DeserializableTxPacket> {
+    pub packets: Vec<Arc<P>>,
 }
 
 /// Message: [Worker -> Scheduler]
@@ -72,7 +72,7 @@ pub struct FinishedConsumeWork {
 
 /// Message: [Worker -> Scheduler]
 /// Forwarded transactions.
-pub struct FinishedForwardWork {
-    pub work: ForwardWork,
+pub struct FinishedForwardWork<P: DeserializableTxPacket> {
+    pub work: ForwardWork<P>
, pub successful: bool, } diff --git a/prio-graph-scheduler/src/transaction_state.rs b/prio-graph-scheduler/src/transaction_state.rs index 9c9d783ab15369..56575beeaf79a4 100644 --- a/prio-graph-scheduler/src/transaction_state.rs +++ b/prio-graph-scheduler/src/transaction_state.rs @@ -1,13 +1,13 @@ use { - solana_core::banking_stage::immutable_deserialized_packet::ImmutableDeserializedPacket, - solana_sdk::{clock::Slot, transaction::SanitizedTransaction}, - std::sync::Arc, + crate::deserializable_packet::DeserializableTxPacket, + solana_sdk::{clock::Slot, transaction::SanitizedTransaction}, + std::sync::Arc, }; /// Simple wrapper type to tie a sanitized transaction to max age slot. pub struct SanitizedTransactionTTL { - pub transaction: SanitizedTransaction, - pub max_age_slot: Slot, + pub transaction: SanitizedTransaction, + pub max_age_slot: Slot, } /// TransactionState is used to track the state of a transaction in the transaction scheduler @@ -30,330 +30,333 @@ pub struct SanitizedTransactionTTL { /// to the appropriate thread for processing. This is done to avoid cloning the /// `SanitizedTransaction`. #[allow(clippy::large_enum_variant)] -pub enum TransactionState { - /// The transaction is available for scheduling. - Unprocessed { - transaction_ttl: SanitizedTransactionTTL, - packet: Arc, - priority: u64, - cost: u64, - should_forward: bool, - }, - /// The transaction is currently scheduled or being processed. - Pending { - packet: Arc, - priority: u64, - cost: u64, - should_forward: bool, - }, - /// Only used during transition. - Transitioning, +pub enum TransactionState { + /// The transaction is available for scheduling. + Unprocessed { + transaction_ttl: SanitizedTransactionTTL, + packet: Arc

, + priority: u64, + cost: u64, + should_forward: bool, + }, + /// The transaction is currently scheduled or being processed. + Pending { + packet: Arc

, + priority: u64, + cost: u64, + should_forward: bool, + }, + /// Only used during transition. + Transitioning, } -impl TransactionState { - /// Creates a new `TransactionState` in the `Unprocessed` state. - pub fn new( - transaction_ttl: SanitizedTransactionTTL, - packet: Arc, - priority: u64, - cost: u64, - ) -> Self { - let should_forward = !packet.original_packet().meta().forwarded() - && packet.original_packet().meta().is_from_staked_node(); - Self::Unprocessed { - transaction_ttl, - packet, - priority, - cost, - should_forward, - } - } +impl TransactionState

{ + /// Creates a new `TransactionState` in the `Unprocessed` state. + pub fn new( + transaction_ttl: SanitizedTransactionTTL, + packet: Arc

, + priority: u64, + cost: u64, + ) -> Self { + let should_forward = !packet.original_packet().meta().forwarded() + && packet.original_packet().meta().is_from_staked_node(); + Self::Unprocessed { + transaction_ttl, + packet, + priority, + cost, + should_forward, + } + } - /// Return the priority of the transaction. - /// This is *not* the same as the `compute_unit_price` of the transaction. - /// The priority is used to order transactions for processing. - pub fn priority(&self) -> u64 { - match self { - Self::Unprocessed { priority, .. } => *priority, - Self::Pending { priority, .. } => *priority, - Self::Transitioning => unreachable!(), - } - } + /// Return the priority of the transaction. + /// This is *not* the same as the `compute_unit_price` of the transaction. + /// The priority is used to order transactions for processing. + pub fn priority(&self) -> u64 { + match self { + Self::Unprocessed { priority, .. } => *priority, + Self::Pending { priority, .. } => *priority, + Self::Transitioning => unreachable!(), + } + } - /// Return the cost of the transaction. - pub fn cost(&self) -> u64 { - match self { - Self::Unprocessed { cost, .. } => *cost, - Self::Pending { cost, .. } => *cost, - Self::Transitioning => unreachable!(), - } - } + /// Return the cost of the transaction. + pub fn cost(&self) -> u64 { + match self { + Self::Unprocessed { cost, .. } => *cost, + Self::Pending { cost, .. } => *cost, + Self::Transitioning => unreachable!(), + } + } - /// Return whether packet should be attempted to be forwarded. - pub fn should_forward(&self) -> bool { - match self { - Self::Unprocessed { - should_forward: forwarded, - .. - } => *forwarded, - Self::Pending { - should_forward: forwarded, - .. - } => *forwarded, - Self::Transitioning => unreachable!(), - } - } + /// Return whether packet should be attempted to be forwarded. + pub fn should_forward(&self) -> bool { + match self { + Self::Unprocessed { + should_forward: forwarded, + .. + } => *forwarded, + Self::Pending { + should_forward: forwarded, + .. + } => *forwarded, + Self::Transitioning => unreachable!(), + } + } - /// Mark the packet as forwarded. - /// This is used to prevent the packet from being forwarded multiple times. - pub fn mark_forwarded(&mut self) { - match self { - Self::Unprocessed { should_forward, .. } => *should_forward = false, - Self::Pending { should_forward, .. } => *should_forward = false, - Self::Transitioning => unreachable!(), - } - } + /// Mark the packet as forwarded. + /// This is used to prevent the packet from being forwarded multiple times. + pub fn mark_forwarded(&mut self) { + match self { + Self::Unprocessed { should_forward, .. } => *should_forward = false, + Self::Pending { should_forward, .. } => *should_forward = false, + Self::Transitioning => unreachable!(), + } + } - /// Return the packet of the transaction. - pub fn packet(&self) -> &Arc { - match self { - Self::Unprocessed { packet, .. } => packet, - Self::Pending { packet, .. } => packet, - Self::Transitioning => unreachable!(), - } - } + /// Return the packet of the transaction. + pub fn packet(&self) -> &Arc

{ + match self { + Self::Unprocessed { packet, .. } => packet, + Self::Pending { packet, .. } => packet, + Self::Transitioning => unreachable!(), + } + } - /// Intended to be called when a transaction is scheduled. This method will - /// transition the transaction from `Unprocessed` to `Pending` and return the - /// `SanitizedTransactionTTL` for processing. - /// - /// # Panics - /// This method will panic if the transaction is already in the `Pending` state, - /// as this is an invalid state transition. - pub fn transition_to_pending(&mut self) -> SanitizedTransactionTTL { - match self.take() { - TransactionState::Unprocessed { - transaction_ttl, - packet, - priority, - cost, - should_forward: forwarded, - } => { - *self = TransactionState::Pending { - packet, - priority, - cost, - should_forward: forwarded, - }; - transaction_ttl - } - TransactionState::Pending { .. } => { - panic!("transaction already pending"); - } - Self::Transitioning => unreachable!(), - } - } + /// Intended to be called when a transaction is scheduled. This method will + /// transition the transaction from `Unprocessed` to `Pending` and return the + /// `SanitizedTransactionTTL` for processing. + /// + /// # Panics + /// This method will panic if the transaction is already in the `Pending` state, + /// as this is an invalid state transition. + pub fn transition_to_pending(&mut self) -> SanitizedTransactionTTL { + match self.take() { + TransactionState::Unprocessed { + transaction_ttl, + packet, + priority, + cost, + should_forward: forwarded, + } => { + *self = TransactionState::Pending { + packet, + priority, + cost, + should_forward: forwarded, + }; + transaction_ttl + } + TransactionState::Pending { .. } => { + panic!("transaction already pending"); + } + Self::Transitioning => unreachable!(), + } + } - /// Intended to be called when a transaction is retried. This method will - /// transition the transaction from `Pending` to `Unprocessed`. - /// - /// # Panics - /// This method will panic if the transaction is already in the `Unprocessed` - /// state, as this is an invalid state transition. - pub fn transition_to_unprocessed(&mut self, transaction_ttl: SanitizedTransactionTTL) { - match self.take() { - TransactionState::Unprocessed { .. } => panic!("already unprocessed"), - TransactionState::Pending { - packet, - priority, - cost, - should_forward: forwarded, - } => { - *self = Self::Unprocessed { - transaction_ttl, - packet, - priority, - cost, - should_forward: forwarded, - } - } - Self::Transitioning => unreachable!(), - } - } + /// Intended to be called when a transaction is retried. This method will + /// transition the transaction from `Pending` to `Unprocessed`. + /// + /// # Panics + /// This method will panic if the transaction is already in the `Unprocessed` + /// state, as this is an invalid state transition. + pub fn transition_to_unprocessed(&mut self, transaction_ttl: SanitizedTransactionTTL) { + match self.take() { + TransactionState::Unprocessed { .. } => panic!("already unprocessed"), + TransactionState::Pending { + packet, + priority, + cost, + should_forward: forwarded, + } => { + *self = Self::Unprocessed { + transaction_ttl, + packet, + priority, + cost, + should_forward: forwarded, + } + } + Self::Transitioning => unreachable!(), + } + } - /// Get a reference to the `SanitizedTransactionTTL` for the transaction. - /// - /// # Panics - /// This method will panic if the transaction is in the `Pending` state. 
- pub fn transaction_ttl(&self) -> &SanitizedTransactionTTL { - match self { - Self::Unprocessed { - transaction_ttl, .. - } => transaction_ttl, - Self::Pending { .. } => panic!("transaction is pending"), - Self::Transitioning => unreachable!(), - } - } + /// Get a reference to the `SanitizedTransactionTTL` for the transaction. + /// + /// # Panics + /// This method will panic if the transaction is in the `Pending` state. + pub fn transaction_ttl(&self) -> &SanitizedTransactionTTL { + match self { + Self::Unprocessed { + transaction_ttl, .. + } => transaction_ttl, + Self::Pending { .. } => panic!("transaction is pending"), + Self::Transitioning => unreachable!(), + } + } - /// Internal helper to transitioning between states. - /// Replaces `self` with a dummy state that will immediately be overwritten in transition. - fn take(&mut self) -> Self { - core::mem::replace(self, Self::Transitioning) - } + /// Internal helper to transitioning between states. + /// Replaces `self` with a dummy state that will immediately be overwritten in transition. + fn take(&mut self) -> Self { + core::mem::replace(self, Self::Transitioning) + } } #[cfg(test)] mod tests { - use { - super::*, - solana_sdk::{ - compute_budget::ComputeBudgetInstruction, hash::Hash, message::Message, packet::Packet, - signature::Keypair, signer::Signer, system_instruction, transaction::Transaction, - }, - }; + use { + super::*, + solana_core::banking_stage::immutable_deserialized_packet::ImmutableDeserializedPacket, + solana_sdk::{ + compute_budget::ComputeBudgetInstruction, hash::Hash, message::Message, packet::Packet, + signature::Keypair, signer::Signer, system_instruction, transaction::Transaction, + }, + }; - fn create_transaction_state(compute_unit_price: u64) -> TransactionState { - let from_keypair = Keypair::new(); - let ixs = vec![ - system_instruction::transfer( - &from_keypair.pubkey(), - &solana_sdk::pubkey::new_rand(), - 1, - ), - ComputeBudgetInstruction::set_compute_unit_price(compute_unit_price), - ]; - let message = Message::new(&ixs, Some(&from_keypair.pubkey())); - let tx = Transaction::new(&[&from_keypair], message, Hash::default()); + fn create_transaction_state( + compute_unit_price: u64, + ) -> TransactionState { + let from_keypair = Keypair::new(); + let ixs = vec![ + system_instruction::transfer( + &from_keypair.pubkey(), + &solana_sdk::pubkey::new_rand(), + 1, + ), + ComputeBudgetInstruction::set_compute_unit_price(compute_unit_price), + ]; + let message = Message::new(&ixs, Some(&from_keypair.pubkey())); + let tx = Transaction::new(&[&from_keypair], message, Hash::default()); - let packet = Arc::new( - ImmutableDeserializedPacket::new(Packet::from_data(None, tx.clone()).unwrap()).unwrap(), - ); - let transaction_ttl = SanitizedTransactionTTL { - transaction: SanitizedTransaction::from_transaction_for_tests(tx), - max_age_slot: Slot::MAX, - }; - const TEST_TRANSACTION_COST: u64 = 5000; - TransactionState::new( - transaction_ttl, - packet, - compute_unit_price, - TEST_TRANSACTION_COST, - ) - } + let packet = Arc::new( + ImmutableDeserializedPacket::new(Packet::from_data(None, tx.clone()).unwrap()).unwrap(), + ); + let transaction_ttl = SanitizedTransactionTTL { + transaction: SanitizedTransaction::from_transaction_for_tests(tx), + max_age_slot: Slot::MAX, + }; + const TEST_TRANSACTION_COST: u64 = 5000; + TransactionState::new( + transaction_ttl, + packet, + compute_unit_price, + TEST_TRANSACTION_COST, + ) + } - #[test] - #[should_panic(expected = "already pending")] - fn test_transition_to_pending_panic() 
{ - let mut transaction_state = create_transaction_state(0); - transaction_state.transition_to_pending(); - transaction_state.transition_to_pending(); // invalid transition - } + #[test] + #[should_panic(expected = "already pending")] + fn test_transition_to_pending_panic() { + let mut transaction_state = create_transaction_state(0); + transaction_state.transition_to_pending(); + transaction_state.transition_to_pending(); // invalid transition + } - #[test] - fn test_transition_to_pending() { - let mut transaction_state = create_transaction_state(0); - assert!(matches!( - transaction_state, - TransactionState::Unprocessed { .. } - )); - let _ = transaction_state.transition_to_pending(); - assert!(matches!( - transaction_state, - TransactionState::Pending { .. } - )); - } + #[test] + fn test_transition_to_pending() { + let mut transaction_state = create_transaction_state(0); + assert!(matches!( + transaction_state, + TransactionState::Unprocessed { .. } + )); + let _ = transaction_state.transition_to_pending(); + assert!(matches!( + transaction_state, + TransactionState::Pending { .. } + )); + } - #[test] - #[should_panic(expected = "already unprocessed")] - fn test_transition_to_unprocessed_panic() { - let mut transaction_state = create_transaction_state(0); + #[test] + #[should_panic(expected = "already unprocessed")] + fn test_transition_to_unprocessed_panic() { + let mut transaction_state = create_transaction_state(0); - // Manually clone `SanitizedTransactionTTL` - let SanitizedTransactionTTL { - transaction, - max_age_slot, - } = transaction_state.transaction_ttl(); - let transaction_ttl = SanitizedTransactionTTL { - transaction: transaction.clone(), - max_age_slot: *max_age_slot, - }; - transaction_state.transition_to_unprocessed(transaction_ttl); // invalid transition - } + // Manually clone `SanitizedTransactionTTL` + let SanitizedTransactionTTL { + transaction, + max_age_slot, + } = transaction_state.transaction_ttl(); + let transaction_ttl = SanitizedTransactionTTL { + transaction: transaction.clone(), + max_age_slot: *max_age_slot, + }; + transaction_state.transition_to_unprocessed(transaction_ttl); // invalid transition + } - #[test] - fn test_transition_to_unprocessed() { - let mut transaction_state = create_transaction_state(0); - assert!(matches!( - transaction_state, - TransactionState::Unprocessed { .. } - )); - let transaction_ttl = transaction_state.transition_to_pending(); - assert!(matches!( - transaction_state, - TransactionState::Pending { .. } - )); - transaction_state.transition_to_unprocessed(transaction_ttl); - assert!(matches!( - transaction_state, - TransactionState::Unprocessed { .. } - )); - } + #[test] + fn test_transition_to_unprocessed() { + let mut transaction_state = create_transaction_state(0); + assert!(matches!( + transaction_state, + TransactionState::Unprocessed { .. } + )); + let transaction_ttl = transaction_state.transition_to_pending(); + assert!(matches!( + transaction_state, + TransactionState::Pending { .. } + )); + transaction_state.transition_to_unprocessed(transaction_ttl); + assert!(matches!( + transaction_state, + TransactionState::Unprocessed { .. 
+        ));
+    }

-    #[test]
-    fn test_priority() {
-        let priority = 15;
-        let mut transaction_state = create_transaction_state(priority);
-        assert_eq!(transaction_state.priority(), priority);
+    #[test]
+    fn test_priority() {
+        let priority = 15;
+        let mut transaction_state = create_transaction_state(priority);
+        assert_eq!(transaction_state.priority(), priority);

-        // ensure compute unit price is not lost through state transitions
-        let transaction_ttl = transaction_state.transition_to_pending();
-        assert_eq!(transaction_state.priority(), priority);
-        transaction_state.transition_to_unprocessed(transaction_ttl);
-        assert_eq!(transaction_state.priority(), priority);
-    }
+        // ensure compute unit price is not lost through state transitions
+        let transaction_ttl = transaction_state.transition_to_pending();
+        assert_eq!(transaction_state.priority(), priority);
+        transaction_state.transition_to_unprocessed(transaction_ttl);
+        assert_eq!(transaction_state.priority(), priority);
+    }

-    #[test]
-    #[should_panic(expected = "transaction is pending")]
-    fn test_transaction_ttl_panic() {
-        let mut transaction_state = create_transaction_state(0);
-        let transaction_ttl = transaction_state.transaction_ttl();
-        assert!(matches!(
-            transaction_state,
-            TransactionState::Unprocessed { .. }
-        ));
-        assert_eq!(transaction_ttl.max_age_slot, Slot::MAX);
+    #[test]
+    #[should_panic(expected = "transaction is pending")]
+    fn test_transaction_ttl_panic() {
+        let mut transaction_state = create_transaction_state(0);
+        let transaction_ttl = transaction_state.transaction_ttl();
+        assert!(matches!(
+            transaction_state,
+            TransactionState::Unprocessed { .. }
+        ));
+        assert_eq!(transaction_ttl.max_age_slot, Slot::MAX);

-        let _ = transaction_state.transition_to_pending();
-        assert!(matches!(
-            transaction_state,
-            TransactionState::Pending { .. }
-        ));
-        let _ = transaction_state.transaction_ttl(); // pending state, the transaction ttl is not available
-    }
+        let _ = transaction_state.transition_to_pending();
+        assert!(matches!(
+            transaction_state,
+            TransactionState::Pending { .. }
+        ));
+        let _ = transaction_state.transaction_ttl(); // pending state, the transaction ttl is not available
+    }

-    #[test]
-    fn test_transaction_ttl() {
-        let mut transaction_state = create_transaction_state(0);
-        let transaction_ttl = transaction_state.transaction_ttl();
-        assert!(matches!(
-            transaction_state,
-            TransactionState::Unprocessed { .. }
-        ));
-        assert_eq!(transaction_ttl.max_age_slot, Slot::MAX);
+    #[test]
+    fn test_transaction_ttl() {
+        let mut transaction_state = create_transaction_state(0);
+        let transaction_ttl = transaction_state.transaction_ttl();
+        assert!(matches!(
+            transaction_state,
+            TransactionState::Unprocessed { .. }
+        ));
+        assert_eq!(transaction_ttl.max_age_slot, Slot::MAX);

-        // ensure transaction_ttl is not lost through state transitions
-        let transaction_ttl = transaction_state.transition_to_pending();
-        assert!(matches!(
-            transaction_state,
-            TransactionState::Pending { .. }
-        ));
+        // ensure transaction_ttl is not lost through state transitions
+        let transaction_ttl = transaction_state.transition_to_pending();
+        assert!(matches!(
+            transaction_state,
+            TransactionState::Pending { .. }
+        ));

-        transaction_state.transition_to_unprocessed(transaction_ttl);
-        let transaction_ttl = transaction_state.transaction_ttl();
-        assert!(matches!(
-            transaction_state,
-            TransactionState::Unprocessed { .. }
-        ));
-        assert_eq!(transaction_ttl.max_age_slot, Slot::MAX);
-    }
+        transaction_state.transition_to_unprocessed(transaction_ttl);
+        let transaction_ttl = transaction_state.transaction_ttl();
+        assert!(matches!(
+            transaction_state,
+            TransactionState::Unprocessed { .. }
+        ));
+        assert_eq!(transaction_ttl.max_age_slot, Slot::MAX);
+    }
 }
diff --git a/prio-graph-scheduler/src/transaction_state_container.rs b/prio-graph-scheduler/src/transaction_state_container.rs
index 8e2a51f5f7bb37..c162034f6f890d 100644
--- a/prio-graph-scheduler/src/transaction_state_container.rs
+++ b/prio-graph-scheduler/src/transaction_state_container.rs
@@ -3,10 +3,9 @@ use {
         transaction_priority_id::TransactionPriorityId,
         transaction_state::{SanitizedTransactionTTL, TransactionState},
     },
-    crate::scheduler_messages::TransactionId,
+    crate::{deserializable_packet::DeserializableTxPacket, scheduler_messages::TransactionId},
     itertools::MinMaxResult,
     min_max_heap::MinMaxHeap,
-    solana_core::banking_stage::immutable_deserialized_packet::ImmutableDeserializedPacket,
     std::{collections::HashMap, sync::Arc},
 };

@@ -35,12 +34,12 @@ use {
 ///
 /// The container maintains a fixed capacity. If the queue is full when pushing
 /// a new transaction, the lowest priority transaction will be dropped.
-pub struct TransactionStateContainer {
+pub struct TransactionStateContainer<P: DeserializableTxPacket> {
     priority_queue: MinMaxHeap<TransactionPriorityId>,
-    id_to_transaction_state: HashMap<TransactionId, TransactionState>,
+    id_to_transaction_state: HashMap<TransactionId, TransactionState<P>>,
 }

-impl TransactionStateContainer {
+impl<P: DeserializableTxPacket> TransactionStateContainer<P> {
     pub fn with_capacity(capacity: usize) -> Self {
         Self {
             priority_queue: MinMaxHeap::with_capacity(capacity),
@@ -67,16 +66,13 @@ impl TransactionStateContainer {
     pub fn get_mut_transaction_state(
         &mut self,
         id: &TransactionId,
-    ) -> Option<&mut TransactionState> {
+    ) -> Option<&mut TransactionState<P>> {
         self.id_to_transaction_state.get_mut(id)
     }

     /// Get reference to `SanitizedTransactionTTL` by id.
     /// Panics if the transaction does not exist.
-    pub fn get_transaction_ttl(
-        &self,
-        id: &TransactionId,
-    ) -> Option<&SanitizedTransactionTTL> {
+    pub fn get_transaction_ttl(&self, id: &TransactionId) -> Option<&SanitizedTransactionTTL> {
         self.id_to_transaction_state
             .get(id)
             .map(|state| state.transaction_ttl())
@@ -88,7 +84,7 @@ impl TransactionStateContainer {
         &mut self,
         transaction_id: TransactionId,
         transaction_ttl: SanitizedTransactionTTL,
-        packet: Arc<ImmutableDeserializedPacket>,
+        packet: Arc<P>,
         priority: u64,
         cost: u64,
     ) -> bool {
@@ -150,8 +146,7 @@ impl TransactionStateContainer {
 #[cfg(test)]
 mod tests {
     use {
-        super::*,
-        solana_sdk::{
+        super::*, solana_core::banking_stage::immutable_deserialized_packet::ImmutableDeserializedPacket, solana_sdk::{
             compute_budget::ComputeBudgetInstruction,
             hash::Hash,
             message::Message,
@@ -161,7 +156,7 @@ mod tests {
             slot_history::Slot,
             system_instruction,
             transaction::{SanitizedTransaction, Transaction},
-        },
+        }
     };

     /// Returns (transaction_ttl, priority, cost)
@@ -202,7 +197,10 @@ mod tests {
         (transaction_ttl, packet, priority, TEST_TRANSACTION_COST)
     }

-    fn push_to_container(container: &mut TransactionStateContainer, num: usize) {
+    fn push_to_container(
+        container: &mut TransactionStateContainer<ImmutableDeserializedPacket>,
+        num: usize,
+    ) {
         for id in 0..num as u64 {
             let priority = id;
             let (transaction_ttl, packet, priority, cost) = test_transaction(priority);