From 9076348ef40e28975796c2ff25460bf3b25d41e4 Mon Sep 17 00:00:00 2001 From: steviez Date: Fri, 29 Mar 2024 07:34:12 -0500 Subject: [PATCH] Make CostTracker aware of inflight transactions (#437) When a leader is packing a Bank, transactions costs are added to the CostTracker and then later updated or removed, depending on if the tx is committed. However, it is possible for a Bank to be frozen while there are several tx's in flight. CostUpdateService submits a metric with cost information almost immediately after a Bank has been frozen. The result is that we have observed cost details being submitted before some cost removals take place, which causes a massive over-reporting of the block cost compared to actual. This PR adds a field to track the number of transactions that are inflight, and adds a simple mechanism to try to allow that value to settle to zero before submitting the datapoint. The number of inflight tx's is submitted with the datapoint, so even if the value does not settle to zero, we can still detect this case and know the metric is tainted. Co-authored-by: Andrew Fitzgerald --- core/src/banking_stage/qos_service.rs | 7 +++++++ core/src/cost_update_service.rs | 30 ++++++++++++++++++++++++++- cost-model/src/cost_tracker.rs | 27 ++++++++++++++++++++++++ 3 files changed, 63 insertions(+), 1 deletion(-) diff --git a/core/src/banking_stage/qos_service.rs b/core/src/banking_stage/qos_service.rs index 8c1507ae3f..c9e1d98d64 100644 --- a/core/src/banking_stage/qos_service.rs +++ b/core/src/banking_stage/qos_service.rs @@ -120,6 +120,7 @@ impl QosService { } }) .collect(); + cost_tracker.add_transactions_in_flight(num_included); cost_tracking_time.stop(); self.metrics @@ -167,17 +168,20 @@ impl QosService { bank: &Bank, ) { let mut cost_tracker = bank.write_cost_tracker().unwrap(); + let mut num_included = 0; transaction_cost_results .zip(transaction_committed_status) .for_each(|(tx_cost, transaction_committed_details)| { // Only transactions that the qos service included have to be // checked for update if let Ok(tx_cost) = tx_cost { + num_included += 1; if *transaction_committed_details == CommitTransactionDetails::NotCommitted { cost_tracker.remove(tx_cost) } } }); + cost_tracker.sub_transactions_in_flight(num_included); } fn update_committed_transaction_costs<'a>( @@ -206,13 +210,16 @@ impl QosService { bank: &Bank, ) { let mut cost_tracker = bank.write_cost_tracker().unwrap(); + let mut num_included = 0; transaction_cost_results.for_each(|tx_cost| { // Only transactions that the qos service included have to be // removed if let Ok(tx_cost) = tx_cost { + num_included += 1; cost_tracker.remove(tx_cost); } }); + cost_tracker.sub_transactions_in_flight(num_included); } // metrics are reported by bank slot diff --git a/core/src/cost_update_service.rs b/core/src/cost_update_service.rs index 6b49c8fdf1..58ef6c48ed 100644 --- a/core/src/cost_update_service.rs +++ b/core/src/cost_update_service.rs @@ -7,6 +7,7 @@ use { std::{ sync::Arc, thread::{self, Builder, JoinHandle}, + time::Duration, }, }; pub enum CostUpdate { @@ -19,6 +20,12 @@ pub struct CostUpdateService { thread_hdl: JoinHandle<()>, } +// The maximum number of retries to check if CostTracker::in_flight_transaction_count() has settled +// to zero. Bail out after this many retries; the in-flight count is reported so this is ok +const MAX_LOOP_COUNT: usize = 25; +// Throttle checking the count to avoid excessive polling +const LOOP_LIMITER: Duration = Duration::from_millis(10); + impl CostUpdateService { pub fn new(blockstore: Arc, cost_update_receiver: CostUpdateReceiver) -> Self { let thread_hdl = Builder::new() @@ -39,7 +46,28 @@ impl CostUpdateService { for cost_update in cost_update_receiver.iter() { match cost_update { CostUpdate::FrozenBank { bank } => { - bank.read_cost_tracker().unwrap().report_stats(bank.slot()); + for loop_count in 1..=MAX_LOOP_COUNT { + { + // Release the lock so that the thread that will + // update the count is able to obtain a write lock + // + // Use inner scope to avoid sleeping with the lock + let cost_tracker = bank.read_cost_tracker().unwrap(); + let in_flight_transaction_count = + cost_tracker.in_flight_transaction_count(); + + if in_flight_transaction_count == 0 || loop_count == MAX_LOOP_COUNT { + let slot = bank.slot(); + trace!( + "inflight transaction count is {in_flight_transaction_count} \ + for slot {slot} after {loop_count} iteration(s)" + ); + cost_tracker.report_stats(slot); + break; + } + } + std::thread::sleep(LOOP_LIMITER); + } } } } diff --git a/cost-model/src/cost_tracker.rs b/cost-model/src/cost_tracker.rs index b5e3f9f493..64185edb6c 100644 --- a/cost-model/src/cost_tracker.rs +++ b/cost-model/src/cost_tracker.rs @@ -61,6 +61,10 @@ pub struct CostTracker { transaction_signature_count: u64, secp256k1_instruction_signature_count: u64, ed25519_instruction_signature_count: u64, + /// The number of transactions that have had their estimated cost added to + /// the tracker, but are still waiting for an update with actual usage or + /// removal if the transaction does not end up getting committed. + in_flight_transaction_count: usize, } impl Default for CostTracker { @@ -83,6 +87,7 @@ impl Default for CostTracker { transaction_signature_count: 0, secp256k1_instruction_signature_count: 0, ed25519_instruction_signature_count: 0, + in_flight_transaction_count: 0, } } } @@ -100,6 +105,23 @@ impl CostTracker { self.vote_cost_limit = vote_cost_limit; } + pub fn in_flight_transaction_count(&self) -> usize { + self.in_flight_transaction_count + } + + pub fn add_transactions_in_flight(&mut self, in_flight_transaction_count: usize) { + saturating_add_assign!( + self.in_flight_transaction_count, + in_flight_transaction_count + ); + } + + pub fn sub_transactions_in_flight(&mut self, in_flight_transaction_count: usize) { + self.in_flight_transaction_count = self + .in_flight_transaction_count + .saturating_sub(in_flight_transaction_count); + } + pub fn try_add(&mut self, tx_cost: &TransactionCost) -> Result { self.would_fit(tx_cost)?; self.add_transaction_cost(tx_cost); @@ -174,6 +196,11 @@ impl CostTracker { self.ed25519_instruction_signature_count, i64 ), + ( + "inflight_transaction_count", + self.in_flight_transaction_count, + i64 + ), ); }