Skip to content

Commit

Permalink
v2.0: Marks old storages as dirty and uncleaned in clean_accounts() (…
Browse files Browse the repository at this point in the history
…backport of #3737) (#3747)

* Marks old storages as dirty and uncleaned in clean_accounts() (#3737)

(cherry picked from commit 31742ca)

# Conflicts:
#	accounts-db/src/accounts_db.rs
#	accounts-db/src/accounts_db/tests.rs
#	runtime/src/bank.rs

* fixup merge conflict

---------

Co-authored-by: Brooks <[email protected]>
  • Loading branch information
mergify[bot] and brooksprumo authored Nov 25, 2024
1 parent ef8a9f5 commit f77014d
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 58 deletions.
180 changes: 130 additions & 50 deletions accounts-db/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3050,11 +3050,30 @@ impl AccountsDb {
last_full_snapshot_slot: Option<Slot>,
timings: &mut CleanKeyTimings,
epoch_schedule: &EpochSchedule,
old_storages_policy: OldStoragesPolicy,
) -> (Vec<Pubkey>, Option<Slot>) {
let oldest_non_ancient_slot = self.get_oldest_non_ancient_slot(epoch_schedule);
let mut dirty_store_processing_time = Measure::start("dirty_store_processing");
let max_slot_inclusive =
max_clean_root_inclusive.unwrap_or_else(|| self.accounts_index.max_root_inclusive());
let max_root_inclusive = self.accounts_index.max_root_inclusive();
let max_slot_inclusive = max_clean_root_inclusive.unwrap_or(max_root_inclusive);

if old_storages_policy == OldStoragesPolicy::Clean {
let slot_one_epoch_old =
max_root_inclusive.saturating_sub(epoch_schedule.slots_per_epoch);
// do nothing special for these 100 old storages that will likely get cleaned up shortly
let acceptable_straggler_slot_count = 100;
let old_slot_cutoff =
slot_one_epoch_old.saturating_sub(acceptable_straggler_slot_count);
let (old_storages, old_slots) = self.get_snapshot_storages(..old_slot_cutoff);
let num_old_storages = old_storages.len();
self.accounts_index
.add_uncleaned_roots(old_slots.iter().copied());
for (old_slot, old_storage) in std::iter::zip(old_slots, old_storages) {
self.dirty_stores.entry(old_slot).or_insert(old_storage);
}
info!("Marked {num_old_storages} old storages as dirty");
}

let mut dirty_stores = Vec::with_capacity(self.dirty_stores.len());
// find the oldest dirty slot
// we'll add logging if that append vec cannot be marked dead
Expand Down Expand Up @@ -3160,7 +3179,17 @@ impl AccountsDb {

/// Call clean_accounts() with the common parameters that tests/benches use.
pub fn clean_accounts_for_tests(&self) {
self.clean_accounts(None, false, None, &EpochSchedule::default())
self.clean_accounts(
None,
false,
None,
&EpochSchedule::default(),
if self.ancient_append_vec_offset.is_some() {
OldStoragesPolicy::Leave
} else {
OldStoragesPolicy::Clean
},
)
}

/// called with cli argument to verify refcounts are correct on all accounts
Expand Down Expand Up @@ -3244,6 +3273,7 @@ impl AccountsDb {
is_startup: bool,
last_full_snapshot_slot: Option<Slot>,
epoch_schedule: &EpochSchedule,
old_storages_policy: OldStoragesPolicy,
) {
if self.exhaustively_verify_refcounts {
self.exhaustively_verify_refcounts(max_clean_root_inclusive);
Expand All @@ -3265,6 +3295,7 @@ impl AccountsDb {
last_full_snapshot_slot,
&mut key_timings,
epoch_schedule,
old_storages_policy,
);

let mut sort = Measure::start("sort");
Expand Down Expand Up @@ -4882,6 +4913,10 @@ impl AccountsDb {
let _guard = self.active_stats.activate(ActiveStatItem::Shrink);
const DIRTY_STORES_CLEANING_THRESHOLD: usize = 10_000;
const OUTER_CHUNK_SIZE: usize = 2000;
// Leave any old storages alone for now. Once the validator is running
// normally, calls to clean_accounts() will have the correct policy based
// on whether ancient storages are enabled or not.
let old_storages_policy = OldStoragesPolicy::Leave;
if is_startup {
let slots = self.all_slots_in_storage();
let threads = num_cpus::get();
Expand All @@ -4893,14 +4928,26 @@ impl AccountsDb {
}
});
if self.dirty_stores.len() > DIRTY_STORES_CLEANING_THRESHOLD {
self.clean_accounts(None, is_startup, last_full_snapshot_slot, epoch_schedule);
self.clean_accounts(
None,
is_startup,
last_full_snapshot_slot,
epoch_schedule,
old_storages_policy,
);
}
});
} else {
for slot in self.all_slots_in_storage() {
self.shrink_slot_forced(slot);
if self.dirty_stores.len() > DIRTY_STORES_CLEANING_THRESHOLD {
self.clean_accounts(None, is_startup, last_full_snapshot_slot, epoch_schedule);
self.clean_accounts(
None,
is_startup,
last_full_snapshot_slot,
epoch_schedule,
old_storages_policy,
);
}
}
}
Expand Down Expand Up @@ -7177,40 +7224,6 @@ impl AccountsDb {
.collect()
}

/// Adds every storage older than one epoch (plus a small straggler allowance)
/// to `dirty_stores`, so that clean visits those slots.
///
/// `storages` is sorted by slot and carries range info. The elapsed time and
/// the number of slots marked are recorded in `stats`.
fn mark_old_slots_as_dirty(
    &self,
    storages: &SortedStorages,
    slots_per_epoch: Slot,
    stats: &mut crate::accounts_hash::HashStats,
) {
    // With ancient append vecs enabled there is nothing to do here: the
    // ancient append vec code visits ancient slots and deals with them
    // correctly. Those append vecs are expected to be old and to keep their
    // accounts, and the normal processes keep them cleaned. Marking them here
    // would make clean visit ALL accounts in ALL ancient append vecs each time.
    if self.ancient_append_vec_offset.is_some() {
        return;
    }

    let mut mark_time = Measure::start("mark_time");

    // Old stores within this many slots of the epoch boundary get no special
    // treatment; they will likely be cleaned up shortly anyway.
    let acceptable_straggler_slot_count = 100;
    let old_slot_cutoff = storages
        .max_slot_inclusive()
        .saturating_sub(slots_per_epoch + acceptable_straggler_slot_count);

    let mut num_dirty_slots: usize = 0;
    storages
        .iter_range(&(..old_slot_cutoff))
        // Slots in the range without a storage are skipped.
        .filter_map(|(slot, storage)| Some((slot, storage?)))
        .for_each(|(slot, storage)| {
            self.dirty_stores.insert(slot, storage.clone());
            num_dirty_slots += 1;
        });

    mark_time.stop();
    stats.mark_time_us = mark_time.as_us();
    stats.num_dirty_slots = num_dirty_slots;
}

pub fn calculate_accounts_hash_from(
&self,
data_source: CalcAccountsHashDataSource,
Expand Down Expand Up @@ -7583,8 +7596,6 @@ impl AccountsDb {
let storages_start_slot = storages.range().start;
stats.oldest_root = storages_start_slot;

self.mark_old_slots_as_dirty(storages, config.epoch_schedule.slots_per_epoch, &mut stats);

let slot = storages.max_slot_inclusive();
let use_bg_thread_pool = config.use_bg_thread_pool;
let accounts_hash_cache_path = self.accounts_hash_cache_path.clone();
Expand Down Expand Up @@ -9435,6 +9446,20 @@ pub(crate) enum UpdateIndexThreadSelection {
PoolWithThreshold,
}

/// How should old storages be handled in clean_accounts()?
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum OldStoragesPolicy {
/// Clean all old storages, even if they were not explicitly marked as dirty.
///
/// With this policy, clean marks storages older than roughly one epoch
/// (plus a 100-slot straggler allowance) behind the max root as dirty and
/// adds their slots as uncleaned roots.
///
/// This is the default behavior when not skipping rewrites.
Clean,
/// Leave all old storages.
///
/// When skipping rewrites, we intentionally will have ancient storages.
/// Do not clean them up automatically in clean_accounts().
Leave,
}

// These functions/fields are only usable from a dev context (i.e. tests and benches)
#[cfg(feature = "dev-context-only-utils")]
impl AccountStorageEntry {
Expand Down Expand Up @@ -12048,13 +12073,25 @@ pub mod tests {
// updates in later slots in slot 1
assert_eq!(accounts.alive_account_count_in_slot(0), 1);
assert_eq!(accounts.alive_account_count_in_slot(1), 1);
accounts.clean_accounts(Some(0), false, None, &EpochSchedule::default());
accounts.clean_accounts(
Some(0),
false,
None,
&EpochSchedule::default(),
OldStoragesPolicy::Leave,
);
assert_eq!(accounts.alive_account_count_in_slot(0), 1);
assert_eq!(accounts.alive_account_count_in_slot(1), 1);
assert!(accounts.accounts_index.contains_with(&pubkey, None, None));

// Now the account can be cleaned up
accounts.clean_accounts(Some(1), false, None, &EpochSchedule::default());
accounts.clean_accounts(
Some(1),
false,
None,
&EpochSchedule::default(),
OldStoragesPolicy::Leave,
);
assert_eq!(accounts.alive_account_count_in_slot(0), 0);
assert_eq!(accounts.alive_account_count_in_slot(1), 0);

Expand Down Expand Up @@ -13536,7 +13573,13 @@ pub mod tests {
db.add_root_and_flush_write_cache(1);

// Only clean zero lamport accounts up to slot 0
db.clean_accounts(Some(0), false, None, &EpochSchedule::default());
db.clean_accounts(
Some(0),
false,
None,
&EpochSchedule::default(),
OldStoragesPolicy::Leave,
);

// Should still be able to find zero lamport account in slot 1
assert_eq!(
Expand Down Expand Up @@ -14689,7 +14732,13 @@ pub mod tests {
db.calculate_accounts_delta_hash(1);

// Clean to remove outdated entry from slot 0
db.clean_accounts(Some(1), false, None, &EpochSchedule::default());
db.clean_accounts(
Some(1),
false,
None,
&EpochSchedule::default(),
OldStoragesPolicy::Leave,
);

// Shrink Slot 0
{
Expand All @@ -14708,7 +14757,13 @@ pub mod tests {
// Should be one store before clean for slot 0
db.get_and_assert_single_storage(0);
db.calculate_accounts_delta_hash(2);
db.clean_accounts(Some(2), false, None, &EpochSchedule::default());
db.clean_accounts(
Some(2),
false,
None,
&EpochSchedule::default(),
OldStoragesPolicy::Leave,
);

// No stores should exist for slot 0 after clean
assert_no_storages_at_slot(&db, 0);
Expand Down Expand Up @@ -15587,13 +15642,31 @@ pub mod tests {

assert_eq!(accounts_db.ref_count_for_pubkey(&pubkey), 3);

accounts_db.clean_accounts(Some(slot2), false, Some(slot2), &EpochSchedule::default());
accounts_db.clean_accounts(
Some(slot2),
false,
Some(slot2),
&EpochSchedule::default(),
OldStoragesPolicy::Leave,
);
assert_eq!(accounts_db.ref_count_for_pubkey(&pubkey), 2);

accounts_db.clean_accounts(None, false, Some(slot2), &EpochSchedule::default());
accounts_db.clean_accounts(
None,
false,
Some(slot2),
&EpochSchedule::default(),
OldStoragesPolicy::Leave,
);
assert_eq!(accounts_db.ref_count_for_pubkey(&pubkey), 1);

accounts_db.clean_accounts(None, false, Some(slot3), &EpochSchedule::default());
accounts_db.clean_accounts(
None,
false,
Some(slot3),
&EpochSchedule::default(),
OldStoragesPolicy::Leave,
);
assert_eq!(accounts_db.ref_count_for_pubkey(&pubkey), 0);
}
);
Expand Down Expand Up @@ -17902,7 +17975,13 @@ pub mod tests {

// calculate the full accounts hash
let full_accounts_hash = {
accounts_db.clean_accounts(Some(slot - 1), false, None, &EpochSchedule::default());
accounts_db.clean_accounts(
Some(slot - 1),
false,
None,
&EpochSchedule::default(),
OldStoragesPolicy::Leave,
);
let (storages, _) = accounts_db.get_snapshot_storages(..=slot);
let storages = SortedStorages::new(&storages);
accounts_db.calculate_accounts_hash(
Expand Down Expand Up @@ -17972,6 +18051,7 @@ pub mod tests {
false,
Some(full_accounts_hash_slot),
&EpochSchedule::default(),
OldStoragesPolicy::Leave,
);
let (storages, _) =
accounts_db.get_snapshot_storages(full_accounts_hash_slot + 1..=slot);
Expand Down
33 changes: 25 additions & 8 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ use {
accounts::{AccountAddressFilter, Accounts, PubkeyAccountSlot},
accounts_db::{
AccountShrinkThreshold, AccountStorageEntry, AccountsDb, AccountsDbConfig,
CalcAccountsHashDataSource, PubkeyHashAccount, VerifyAccountsHashAndLamportsConfig,
CalcAccountsHashDataSource, OldStoragesPolicy, PubkeyHashAccount,
VerifyAccountsHashAndLamportsConfig,
},
accounts_hash::{
AccountHash, AccountsHash, CalcAccountsHashConfig, HashStats, IncrementalAccountsHash,
Expand Down Expand Up @@ -5944,6 +5945,7 @@ impl Bank {
true,
Some(last_full_snapshot_slot),
self.epoch_schedule(),
self.clean_accounts_old_storages_policy(),
);
info!("Cleaning... Done.");
} else {
Expand Down Expand Up @@ -6285,6 +6287,7 @@ impl Bank {
false,
last_full_snapshot_slot,
self.epoch_schedule(),
self.clean_accounts_old_storages_policy(),
);
}

Expand All @@ -6300,23 +6303,37 @@ impl Bank {
}

pub(crate) fn shrink_ancient_slots(&self) {
let can_skip_rewrites = self.bank_hash_skips_rent_rewrites();
let test_skip_rewrites_but_include_in_bank_hash = self
.rc
.accounts
.accounts_db
.test_skip_rewrites_but_include_in_bank_hash;
// Invoke ancient slot shrinking only when the validator is
// explicitly configured to do so. This condition may be
// removed when the skip rewrites feature is enabled.
if can_skip_rewrites || test_skip_rewrites_but_include_in_bank_hash {
if self.are_ancient_storages_enabled() {
self.rc
.accounts
.accounts_db
.shrink_ancient_slots(self.epoch_schedule())
}
}

/// Reports whether ancient storages are enabled for this bank.
///
/// They count as enabled when the bank hash skips rent rewrites, or when the
/// accounts db has `test_skip_rewrites_but_include_in_bank_hash` set.
pub fn are_ancient_storages_enabled(&self) -> bool {
    let accounts_db = &self.rc.accounts.accounts_db;
    self.bank_hash_skips_rent_rewrites()
        || accounts_db.test_skip_rewrites_but_include_in_bank_hash
}

/// Picks the old-storages policy to pass to clean_accounts().
///
/// Old storages are left alone when ancient storages are enabled, and are
/// cleaned otherwise.
fn clean_accounts_old_storages_policy(&self) -> OldStoragesPolicy {
    let ancient_storages_enabled = self.are_ancient_storages_enabled();
    if ancient_storages_enabled {
        OldStoragesPolicy::Leave
    } else {
        OldStoragesPolicy::Clean
    }
}

pub fn validate_fee_collector_account(&self) -> bool {
self.feature_set
.is_active(&feature_set::validate_fee_collector_account::id())
Expand Down

0 comments on commit f77014d

Please sign in to comment.