Skip to content

Commit

Permalink
Use std::num::Saturating over saturating_add_assign!() (#523)
Browse files Browse the repository at this point in the history
std::num::Saturating allows us to create integers that will override
the standard arithmetic operators to use saturating math. This removes
the need for a custom macro as well as reduces mental load as someone
only needs to remember that they want saturating math once.

This PR introduces std::num::Saturating integers to replace all
use of saturating_add_assign!() in the accounts-db crate
  • Loading branch information
steviez authored Apr 4, 2024
1 parent 527cad2 commit a088364
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 95 deletions.
93 changes: 40 additions & 53 deletions accounts-db/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ use {
hash::Hash,
pubkey::Pubkey,
rent_collector::RentCollector,
saturating_add_assign,
timing::AtomicInterval,
transaction::SanitizedTransaction,
},
Expand All @@ -100,6 +99,7 @@ use {
fs,
hash::{Hash as StdHash, Hasher as StdHasher},
io::Result as IoResult,
num::Saturating,
ops::{Range, RangeBounds},
path::{Path, PathBuf},
sync::{
Expand Down Expand Up @@ -1742,21 +1742,21 @@ impl SplitAncientStorages {

#[derive(Debug, Default)]
struct FlushStats {
num_flushed: usize,
num_purged: usize,
total_size: u64,
num_flushed: Saturating<usize>,
num_purged: Saturating<usize>,
total_size: Saturating<u64>,
store_accounts_timing: StoreAccountsTiming,
store_accounts_total_us: u64,
store_accounts_total_us: Saturating<u64>,
}

impl FlushStats {
fn accumulate(&mut self, other: &Self) {
saturating_add_assign!(self.num_flushed, other.num_flushed);
saturating_add_assign!(self.num_purged, other.num_purged);
saturating_add_assign!(self.total_size, other.total_size);
self.num_flushed += other.num_flushed;
self.num_purged += other.num_purged;
self.total_size += other.total_size;
self.store_accounts_timing
.accumulate(&other.store_accounts_timing);
saturating_add_assign!(self.store_accounts_total_us, other.store_accounts_total_us);
self.store_accounts_total_us += other.store_accounts_total_us;
}
}

Expand Down Expand Up @@ -1884,26 +1884,20 @@ pub(crate) struct ShrinkAncientStats {
#[derive(Debug, Default)]
pub(crate) struct ShrinkStatsSub {
pub(crate) store_accounts_timing: StoreAccountsTiming,
pub(crate) rewrite_elapsed_us: u64,
pub(crate) create_and_insert_store_elapsed_us: u64,
pub(crate) unpackable_slots_count: usize,
pub(crate) newest_alive_packed_count: usize,
pub(crate) rewrite_elapsed_us: Saturating<u64>,
pub(crate) create_and_insert_store_elapsed_us: Saturating<u64>,
pub(crate) unpackable_slots_count: Saturating<usize>,
pub(crate) newest_alive_packed_count: Saturating<usize>,
}

impl ShrinkStatsSub {
pub(crate) fn accumulate(&mut self, other: &Self) {
self.store_accounts_timing
.accumulate(&other.store_accounts_timing);
saturating_add_assign!(self.rewrite_elapsed_us, other.rewrite_elapsed_us);
saturating_add_assign!(
self.create_and_insert_store_elapsed_us,
other.create_and_insert_store_elapsed_us
);
saturating_add_assign!(self.unpackable_slots_count, other.unpackable_slots_count);
saturating_add_assign!(
self.newest_alive_packed_count,
other.newest_alive_packed_count
);
self.rewrite_elapsed_us += other.rewrite_elapsed_us;
self.create_and_insert_store_elapsed_us += other.create_and_insert_store_elapsed_us;
self.unpackable_slots_count += other.unpackable_slots_count;
self.newest_alive_packed_count += other.newest_alive_packed_count;
}
}
#[derive(Debug, Default)]
Expand Down Expand Up @@ -3979,7 +3973,7 @@ impl AccountsDb {
let (shrink_in_progress, time_us) = measure_us!(
self.get_store_for_shrink(slot, shrink_collect.alive_total_bytes as u64)
);
stats_sub.create_and_insert_store_elapsed_us = time_us;
stats_sub.create_and_insert_store_elapsed_us = Saturating(time_us);

// here, we're writing back alive_accounts. That should be an atomic operation
// without use of rather wide locks in this whole function, because we're
Expand All @@ -3992,7 +3986,7 @@ impl AccountsDb {
);

rewrite_elapsed.stop();
stats_sub.rewrite_elapsed_us = rewrite_elapsed.as_us();
stats_sub.rewrite_elapsed_us = Saturating(rewrite_elapsed.as_us());

// `store_accounts_frozen()` above may have purged accounts from some
// other storage entries (the ones that were just overwritten by this
Expand Down Expand Up @@ -4024,7 +4018,7 @@ impl AccountsDb {
.fetch_add(1, Ordering::Relaxed);
}
shrink_stats.create_and_insert_store_elapsed.fetch_add(
stats_sub.create_and_insert_store_elapsed_us,
stats_sub.create_and_insert_store_elapsed_us.0,
Ordering::Relaxed,
);
shrink_stats.store_accounts_elapsed.fetch_add(
Expand All @@ -4041,12 +4035,12 @@ impl AccountsDb {
);
shrink_stats
.rewrite_elapsed
.fetch_add(stats_sub.rewrite_elapsed_us, Ordering::Relaxed);
.fetch_add(stats_sub.rewrite_elapsed_us.0, Ordering::Relaxed);
shrink_stats
.unpackable_slots_count
.fetch_add(stats_sub.unpackable_slots_count as u64, Ordering::Relaxed);
.fetch_add(stats_sub.unpackable_slots_count.0 as u64, Ordering::Relaxed);
shrink_stats.newest_alive_packed_count.fetch_add(
stats_sub.newest_alive_packed_count as u64,
stats_sub.newest_alive_packed_count.0 as u64,
Ordering::Relaxed,
);
}
Expand Down Expand Up @@ -4454,7 +4448,8 @@ impl AccountsDb {
let (mut shrink_in_progress, create_and_insert_store_elapsed_us) = measure_us!(
current_ancient.create_if_necessary(slot, self, shrink_collect.alive_total_bytes)
);
stats_sub.create_and_insert_store_elapsed_us = create_and_insert_store_elapsed_us;
stats_sub.create_and_insert_store_elapsed_us =
Saturating(create_and_insert_store_elapsed_us);
let available_bytes = current_ancient.accounts_file().accounts.remaining_bytes();
// split accounts in 'slot' into:
// 'Primary', which can fit in 'current_ancient'
Expand Down Expand Up @@ -4516,7 +4511,7 @@ impl AccountsDb {
}
assert_eq!(bytes_remaining_to_write, 0);
rewrite_elapsed.stop();
stats_sub.rewrite_elapsed_us = rewrite_elapsed.as_us();
stats_sub.rewrite_elapsed_us = Saturating(rewrite_elapsed.as_us());

if slot != current_ancient.slot() {
// all append vecs in this slot have been combined into an ancient append vec
Expand Down Expand Up @@ -6116,9 +6111,9 @@ impl AccountsDb {
});
datapoint_info!(
"accounts_db-flush_accounts_cache_aggressively",
("num_flushed", flush_stats.num_flushed, i64),
("num_purged", flush_stats.num_purged, i64),
("total_flush_size", flush_stats.total_size, i64),
("num_flushed", flush_stats.num_flushed.0, i64),
("num_purged", flush_stats.num_purged.0, i64),
("total_flush_size", flush_stats.total_size.0, i64),
("total_cache_size", self.accounts_cache.size(), i64),
("total_frozen_slots", excess_slot_count, i64),
("total_slots", self.accounts_cache.num_slots(), i64),
Expand Down Expand Up @@ -6146,7 +6141,7 @@ impl AccountsDb {
("num_accounts_saved", num_accounts_saved, i64),
(
"store_accounts_total_us",
flush_stats.store_accounts_total_us,
flush_stats.store_accounts_total_us.0,
i64
),
(
Expand Down Expand Up @@ -6235,9 +6230,7 @@ impl AccountsDb {
mut should_flush_f: Option<&mut impl FnMut(&Pubkey, &AccountSharedData) -> bool>,
max_clean_root: Option<Slot>,
) -> FlushStats {
let mut num_purged = 0;
let mut total_size = 0;
let mut num_flushed = 0;
let mut flush_stats = FlushStats::default();
let iter_items: Vec<_> = slot_cache.iter().collect();
let mut purged_slot_pubkeys: HashSet<(Slot, Pubkey)> = HashSet::new();
let mut pubkey_to_slot_set: Vec<(Pubkey, Slot)> = vec![];
Expand All @@ -6263,15 +6256,15 @@ impl AccountsDb {
.unwrap_or(true);
if should_flush {
let hash = iter_item.value().hash();
total_size += aligned_stored_size(account.data().len()) as u64;
num_flushed += 1;
flush_stats.total_size += aligned_stored_size(account.data().len()) as u64;
flush_stats.num_flushed += 1;
Some(((key, account), hash))
} else {
// If we don't flush, we have to remove the entry from the
// index, since it's equivalent to purging
purged_slot_pubkeys.insert((slot, *key));
pubkey_to_slot_set.push((*key, slot));
num_purged += 1;
flush_stats.num_purged += 1;
None
}
})
Expand All @@ -6288,22 +6281,21 @@ impl AccountsDb {
&HashSet::default(),
);

let mut store_accounts_timing = StoreAccountsTiming::default();
let mut store_accounts_total_us = 0;
if !is_dead_slot {
// This ensures that all updates are written to an AppendVec, before any
// updates to the index happen, so anybody that sees a real entry in the index,
// will be able to find the account in storage
let flushed_store = self.create_and_insert_store(slot, total_size, "flush_slot_cache");
let flushed_store =
self.create_and_insert_store(slot, flush_stats.total_size.0, "flush_slot_cache");
let (store_accounts_timing_inner, store_accounts_total_inner_us) = measure_us!(self
.store_accounts_frozen(
(slot, &accounts[..]),
Some(hashes),
&flushed_store,
StoreReclaims::Default,
));
store_accounts_timing = store_accounts_timing_inner;
store_accounts_total_us = store_accounts_total_inner_us;
flush_stats.store_accounts_timing = store_accounts_timing_inner;
flush_stats.store_accounts_total_us = Saturating(store_accounts_total_inner_us);

// If the above sizing function is correct, just one AppendVec is enough to hold
// all the data for the slot
Expand All @@ -6315,13 +6307,8 @@ impl AccountsDb {
// There is some racy condition for existing readers who just has read exactly while
// flushing. That case is handled by retry_to_get_account_accessor()
assert!(self.accounts_cache.remove_slot(slot).is_some());
FlushStats {
num_flushed,
num_purged,
total_size,
store_accounts_timing,
store_accounts_total_us,
}

flush_stats
}

/// flush all accounts in this slot
Expand Down
Loading

0 comments on commit a088364

Please sign in to comment.