Skip to content

Commit

Permalink
Make CostTracker aware of inflight transactions (#437)
Browse files Browse the repository at this point in the history
When a leader is packing a Bank, transactions costs are added to the
CostTracker and then later updated or removed, depending on if the
tx is committed. However, it is possible for a Bank to be frozen while
there are several tx's in flight.

CostUpdateService submits a metric with cost information almost
immediately after a Bank has been frozen. The result is that we have
observed cost details being submitted before some cost removals take
place, which causes a massive over-reporting of the block cost
compared to actual.

This PR adds a field to track the number of transactions that are
inflight, and adds a simple mechanism to try to allow that value to
settle to zero before submitting the datapoint. The number of inflight
tx's is submitted with the datapoint, so even if the value does not
settle to zero, we can still detect this case and know the metric is
tainted.

Co-authored-by: Andrew Fitzgerald <[email protected]>
  • Loading branch information
steviez and apfitzge authored Mar 29, 2024
1 parent d140cdb commit 9076348
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 1 deletion.
7 changes: 7 additions & 0 deletions core/src/banking_stage/qos_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ impl QosService {
}
})
.collect();
cost_tracker.add_transactions_in_flight(num_included);

cost_tracking_time.stop();
self.metrics
Expand Down Expand Up @@ -167,17 +168,20 @@ impl QosService {
bank: &Bank,
) {
let mut cost_tracker = bank.write_cost_tracker().unwrap();
let mut num_included = 0;
transaction_cost_results
.zip(transaction_committed_status)
.for_each(|(tx_cost, transaction_committed_details)| {
// Only transactions that the qos service included have to be
// checked for update
if let Ok(tx_cost) = tx_cost {
num_included += 1;
if *transaction_committed_details == CommitTransactionDetails::NotCommitted {
cost_tracker.remove(tx_cost)
}
}
});
cost_tracker.sub_transactions_in_flight(num_included);
}

fn update_committed_transaction_costs<'a>(
Expand Down Expand Up @@ -206,13 +210,16 @@ impl QosService {
bank: &Bank,
) {
let mut cost_tracker = bank.write_cost_tracker().unwrap();
let mut num_included = 0;
transaction_cost_results.for_each(|tx_cost| {
// Only transactions that the qos service included have to be
// removed
if let Ok(tx_cost) = tx_cost {
num_included += 1;
cost_tracker.remove(tx_cost);
}
});
cost_tracker.sub_transactions_in_flight(num_included);
}

// metrics are reported by bank slot
Expand Down
30 changes: 29 additions & 1 deletion core/src/cost_update_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use {
std::{
sync::Arc,
thread::{self, Builder, JoinHandle},
time::Duration,
},
};
pub enum CostUpdate {
Expand All @@ -19,6 +20,12 @@ pub struct CostUpdateService {
thread_hdl: JoinHandle<()>,
}

// The maximum number of retries to check if CostTracker::in_flight_transaction_count() has settled
// to zero. Bail out after this many retries; the in-flight count is reported so this is ok
const MAX_LOOP_COUNT: usize = 25;
// Throttle checking the count to avoid excessive polling
const LOOP_LIMITER: Duration = Duration::from_millis(10);

impl CostUpdateService {
pub fn new(blockstore: Arc<Blockstore>, cost_update_receiver: CostUpdateReceiver) -> Self {
let thread_hdl = Builder::new()
Expand All @@ -39,7 +46,28 @@ impl CostUpdateService {
for cost_update in cost_update_receiver.iter() {
match cost_update {
CostUpdate::FrozenBank { bank } => {
bank.read_cost_tracker().unwrap().report_stats(bank.slot());
for loop_count in 1..=MAX_LOOP_COUNT {
{
// Release the lock so that the thread that will
// update the count is able to obtain a write lock
//
// Use inner scope to avoid sleeping with the lock
let cost_tracker = bank.read_cost_tracker().unwrap();
let in_flight_transaction_count =
cost_tracker.in_flight_transaction_count();

if in_flight_transaction_count == 0 || loop_count == MAX_LOOP_COUNT {
let slot = bank.slot();
trace!(
"inflight transaction count is {in_flight_transaction_count} \
for slot {slot} after {loop_count} iteration(s)"
);
cost_tracker.report_stats(slot);
break;
}
}
std::thread::sleep(LOOP_LIMITER);
}
}
}
}
Expand Down
27 changes: 27 additions & 0 deletions cost-model/src/cost_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ pub struct CostTracker {
transaction_signature_count: u64,
secp256k1_instruction_signature_count: u64,
ed25519_instruction_signature_count: u64,
/// The number of transactions that have had their estimated cost added to
/// the tracker, but are still waiting for an update with actual usage or
/// removal if the transaction does not end up getting committed.
in_flight_transaction_count: usize,
}

impl Default for CostTracker {
Expand All @@ -83,6 +87,7 @@ impl Default for CostTracker {
transaction_signature_count: 0,
secp256k1_instruction_signature_count: 0,
ed25519_instruction_signature_count: 0,
in_flight_transaction_count: 0,
}
}
}
Expand All @@ -100,6 +105,23 @@ impl CostTracker {
self.vote_cost_limit = vote_cost_limit;
}

pub fn in_flight_transaction_count(&self) -> usize {
self.in_flight_transaction_count
}

pub fn add_transactions_in_flight(&mut self, in_flight_transaction_count: usize) {
saturating_add_assign!(
self.in_flight_transaction_count,
in_flight_transaction_count
);
}

pub fn sub_transactions_in_flight(&mut self, in_flight_transaction_count: usize) {
self.in_flight_transaction_count = self
.in_flight_transaction_count
.saturating_sub(in_flight_transaction_count);
}

pub fn try_add(&mut self, tx_cost: &TransactionCost) -> Result<u64, CostTrackerError> {
self.would_fit(tx_cost)?;
self.add_transaction_cost(tx_cost);
Expand Down Expand Up @@ -174,6 +196,11 @@ impl CostTracker {
self.ed25519_instruction_signature_count,
i64
),
(
"inflight_transaction_count",
self.in_flight_transaction_count,
i64
),
);
}

Expand Down

0 comments on commit 9076348

Please sign in to comment.