diff --git a/core/src/banking_stage/qos_service.rs b/core/src/banking_stage/qos_service.rs index 8c1507ae3f..c9e1d98d64 100644 --- a/core/src/banking_stage/qos_service.rs +++ b/core/src/banking_stage/qos_service.rs @@ -120,6 +120,7 @@ impl QosService { } }) .collect(); + cost_tracker.add_transactions_in_flight(num_included); cost_tracking_time.stop(); self.metrics @@ -167,17 +168,20 @@ impl QosService { bank: &Bank, ) { let mut cost_tracker = bank.write_cost_tracker().unwrap(); + let mut num_included = 0; transaction_cost_results .zip(transaction_committed_status) .for_each(|(tx_cost, transaction_committed_details)| { // Only transactions that the qos service included have to be // checked for update if let Ok(tx_cost) = tx_cost { + num_included += 1; if *transaction_committed_details == CommitTransactionDetails::NotCommitted { cost_tracker.remove(tx_cost) } } }); + cost_tracker.sub_transactions_in_flight(num_included); } fn update_committed_transaction_costs<'a>( @@ -206,13 +210,16 @@ impl QosService { bank: &Bank, ) { let mut cost_tracker = bank.write_cost_tracker().unwrap(); + let mut num_included = 0; transaction_cost_results.for_each(|tx_cost| { // Only transactions that the qos service included have to be // removed if let Ok(tx_cost) = tx_cost { + num_included += 1; cost_tracker.remove(tx_cost); } }); + cost_tracker.sub_transactions_in_flight(num_included); } // metrics are reported by bank slot diff --git a/core/src/cost_update_service.rs b/core/src/cost_update_service.rs index 6b49c8fdf1..58ef6c48ed 100644 --- a/core/src/cost_update_service.rs +++ b/core/src/cost_update_service.rs @@ -7,6 +7,7 @@ use { std::{ sync::Arc, thread::{self, Builder, JoinHandle}, + time::Duration, }, }; pub enum CostUpdate { @@ -19,6 +20,12 @@ pub struct CostUpdateService { thread_hdl: JoinHandle<()>, } +// The maximum number of retries to check if CostTracker::in_flight_transaction_count() has settled +// to zero. Bail out after this many retries; the in-flight count is reported so this is ok +const MAX_LOOP_COUNT: usize = 25; +// Throttle checking the count to avoid excessive polling +const LOOP_LIMITER: Duration = Duration::from_millis(10); + impl CostUpdateService { pub fn new(blockstore: Arc, cost_update_receiver: CostUpdateReceiver) -> Self { let thread_hdl = Builder::new() @@ -39,7 +46,28 @@ impl CostUpdateService { for cost_update in cost_update_receiver.iter() { match cost_update { CostUpdate::FrozenBank { bank } => { - bank.read_cost_tracker().unwrap().report_stats(bank.slot()); + for loop_count in 1..=MAX_LOOP_COUNT { + { + // Release the lock so that the thread that will + // update the count is able to obtain a write lock + // + // Use inner scope to avoid sleeping with the lock + let cost_tracker = bank.read_cost_tracker().unwrap(); + let in_flight_transaction_count = + cost_tracker.in_flight_transaction_count(); + + if in_flight_transaction_count == 0 || loop_count == MAX_LOOP_COUNT { + let slot = bank.slot(); + trace!( + "inflight transaction count is {in_flight_transaction_count} \ + for slot {slot} after {loop_count} iteration(s)" + ); + cost_tracker.report_stats(slot); + break; + } + } + std::thread::sleep(LOOP_LIMITER); + } } } } diff --git a/cost-model/src/cost_tracker.rs b/cost-model/src/cost_tracker.rs index b5e3f9f493..64185edb6c 100644 --- a/cost-model/src/cost_tracker.rs +++ b/cost-model/src/cost_tracker.rs @@ -61,6 +61,10 @@ pub struct CostTracker { transaction_signature_count: u64, secp256k1_instruction_signature_count: u64, ed25519_instruction_signature_count: u64, + /// The number of transactions that have had their estimated cost added to + /// the tracker, but are still waiting for an update with actual usage or + /// removal if the transaction does not end up getting committed. + in_flight_transaction_count: usize, } impl Default for CostTracker { @@ -83,6 +87,7 @@ impl Default for CostTracker { transaction_signature_count: 0, secp256k1_instruction_signature_count: 0, ed25519_instruction_signature_count: 0, + in_flight_transaction_count: 0, } } } @@ -100,6 +105,23 @@ impl CostTracker { self.vote_cost_limit = vote_cost_limit; } + pub fn in_flight_transaction_count(&self) -> usize { + self.in_flight_transaction_count + } + + pub fn add_transactions_in_flight(&mut self, in_flight_transaction_count: usize) { + saturating_add_assign!( + self.in_flight_transaction_count, + in_flight_transaction_count + ); + } + + pub fn sub_transactions_in_flight(&mut self, in_flight_transaction_count: usize) { + self.in_flight_transaction_count = self + .in_flight_transaction_count + .saturating_sub(in_flight_transaction_count); + } + pub fn try_add(&mut self, tx_cost: &TransactionCost) -> Result { self.would_fit(tx_cost)?; self.add_transaction_cost(tx_cost); @@ -174,6 +196,11 @@ impl CostTracker { self.ed25519_instruction_signature_count, i64 ), + ( + "inflight_transaction_count", + self.in_flight_transaction_count, + i64 + ), ); }