Skip to content

Commit

Permalink
feat: look for eah in file (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
vovkman authored Nov 13, 2024
1 parent 5351e83 commit c4946a7
Showing 1 changed file with 123 additions and 25 deletions.
148 changes: 123 additions & 25 deletions core/src/accounts_hash_verifier.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,23 @@
//! Service to calculate accounts hashes
use {
crate::snapshot_packager_service::PendingSnapshotPackages,
crossbeam_channel::{Receiver, Sender},
solana_accounts_db::{
crate::snapshot_packager_service::PendingSnapshotPackages, bs58, crossbeam_channel::{Receiver, Sender}, solana_accounts_db::{
accounts_db::CalcAccountsHashKind,
accounts_hash::{
AccountsHash, AccountsHashKind, CalcAccountsHashConfig, HashStats,
IncrementalAccountsHash,
},
sorted_storages::SortedStorages,
},
solana_measure::measure_us,
solana_runtime::{
serde_snapshot::BankIncrementalSnapshotPersistence,
snapshot_config::SnapshotConfig,
snapshot_package::{
}, solana_measure::measure_us, solana_runtime::{
serde_snapshot::BankIncrementalSnapshotPersistence, snapshot_bank_utils::DISABLED_SNAPSHOT_ARCHIVE_INTERVAL, snapshot_config::SnapshotConfig, snapshot_package::{
self, AccountsPackage, AccountsPackageKind, SnapshotKind, SnapshotPackage,
},
snapshot_utils,
},
solana_sdk::clock::{Slot, DEFAULT_MS_PER_SLOT},
std::{
io::Result as IoResult,
sync::{
}, snapshot_utils
}, solana_sdk::{clock::{Slot, DEFAULT_MS_PER_SLOT}, hash::Hash}, std::{
fs::{read_to_string, File}, io::{Result as IoResult, Write}, path::Path, sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
thread::{self, Builder, JoinHandle},
time::Duration,
},
}, thread::{self, Builder, JoinHandle}, time::Duration
}
};

pub struct AccountsHashVerifier {
Expand Down Expand Up @@ -209,16 +197,71 @@ impl AccountsHashVerifier {
}
}

fn read_epoch_accounts_hash_from_file(epoch: u64, incremental_snapshot_archive_interval_slots: u64) -> Option<AccountsHash> {
// skip waiting for eah file if node is a distributor
if !Self::snapshot_disabled(incremental_snapshot_archive_interval_slots) {
return None;
}
info!("Waiting for epoch accounts hash file");
let eah_path = std::env::var("SOLANA_EAH_PATH").unwrap_or_else(|_| "/home/solana/eah".to_string());
let file_path = Path::new(&eah_path).join(format!("{}.txt", epoch));
let max_attempts = 60; // retry for 1 hour
let wait_duration = Duration::from_secs(60); // 1 min

for _ in 0..max_attempts {
if file_path.exists() {
let content = match read_to_string(&file_path) {
Ok(content) => content,
Err(e) => {
warn!("Failed to read epoch accounts hash file: {}", e);
continue;
}
};
let bytes = match bs58::decode(content.trim()).into_vec() {
Ok(bytes) if bytes.len() == 32 => bytes,
_ => {
warn!("Invalid hash: must be a valid 32-byte Base58 encoded string");
continue;
}
};
let accounts_hash = AccountsHash(Hash::new(&bytes));
return Some(accounts_hash);
}
thread::sleep(wait_duration);
}

warn!("Epoch accounts hash file not found after {} attempts", max_attempts);
None
}

fn snapshot_disabled(incremental_snapshot_archive_interval_slots: u64) -> bool {
incremental_snapshot_archive_interval_slots == DISABLED_SNAPSHOT_ARCHIVE_INTERVAL
}

#[allow(clippy::too_many_arguments)]
fn process_accounts_package(
accounts_package: AccountsPackage,
pending_snapshot_packages: &Mutex<PendingSnapshotPackages>,
snapshot_config: &SnapshotConfig,
) -> IoResult<()> {
let (accounts_hash_kind, bank_incremental_snapshot_persistence) =
Self::calculate_and_verify_accounts_hash(&accounts_package, snapshot_config)?;

Self::save_epoch_accounts_hash(&accounts_package, accounts_hash_kind);
let epoch_schedule = &accounts_package.epoch_schedule;
let current_epoch = epoch_schedule.get_epoch(accounts_package.slot);

let incremental_snapshot_archive_interval_slots = snapshot_config.incremental_snapshot_archive_interval_slots;
let (accounts_hash_kind, bank_incremental_snapshot_persistence) = match accounts_package.package_kind {
AccountsPackageKind::EpochAccountsHash => {
if let Some(accounts_hash) = Self::read_epoch_accounts_hash_from_file(current_epoch, incremental_snapshot_archive_interval_slots) {
info!("Using epoch accounts hash file instead of calculating it");
(accounts_hash.into(), None)
} else {
// If no file is found, calculate the hash
info!("Calculating epoch accounts hash");
Self::calculate_and_verify_accounts_hash(&accounts_package, snapshot_config)?
}
},
_ => Self::calculate_and_verify_accounts_hash(&accounts_package, snapshot_config)?
};
Self::save_epoch_accounts_hash(&accounts_package, accounts_hash_kind, current_epoch, incremental_snapshot_archive_interval_slots);

Self::purge_old_accounts_hashes(&accounts_package, snapshot_config);

Expand Down Expand Up @@ -413,6 +456,8 @@ impl AccountsHashVerifier {
fn save_epoch_accounts_hash(
accounts_package: &AccountsPackage,
accounts_hash: AccountsHashKind,
current_epoch: u64,
incremental_snapshot_archive_interval_slots: u64,
) {
if accounts_package.package_kind == AccountsPackageKind::EpochAccountsHash {
let AccountsHashKind::Full(accounts_hash) = accounts_hash else {
Expand All @@ -422,6 +467,41 @@ impl AccountsHashVerifier {
"saving epoch accounts hash, slot: {}, hash: {}",
accounts_package.slot, accounts_hash.0,
);
// Save the accounts hash to file
if !Self::snapshot_disabled(incremental_snapshot_archive_interval_slots) {
let accounts_hash_string = format!("{}", accounts_hash.0);
let file_name = format!("{}.txt", current_epoch);
let eah_path = std::env::var("SOLANA_EAH_PATH").unwrap_or_else(|_| "/home/solana/eah".to_string());
let eah_dir = Path::new(&eah_path);

// Attempt to wipe the content of the folder before saving the file
if eah_dir.exists() {
if let Err(e) = std::fs::remove_dir_all(eah_dir) {
warn!("Failed to remove existing directory: {}", e);
}
}
if let Err(e) = std::fs::create_dir_all(eah_dir) {
warn!("Failed to create directory: {}", e);
}

let file_path = eah_dir.join(file_name);
match File::create(&file_path) {
Ok(mut file) => {
if let Err(e) = file.write_all(accounts_hash_string.as_bytes()) {
warn!("Failed to write epoch accounts hash to file: {}", e);
} else {
info!(
"Saved epoch accounts hash to file: {}",
file_path.display()
);
}
}
Err(e) => {
warn!("Failed to create file for epoch accounts hash: {}", e);
}
}
}

accounts_package
.accounts
.accounts_db
Expand Down Expand Up @@ -750,4 +830,22 @@ mod tests {
)
.is_none());
}
}

#[test]
fn test_snapshot_disabled() {
let test_cases = vec![
(DISABLED_SNAPSHOT_ARCHIVE_INTERVAL, true),
(30, false),
(0, false),
];

for (input, expected) in test_cases {
assert_eq!(
AccountsHashVerifier::snapshot_disabled(input),
expected,
"Failed for input: {}",
input
);
}
}
}

0 comments on commit c4946a7

Please sign in to comment.