diff --git a/cluster-endpoints/src/grpc_leaders_getter.rs b/cluster-endpoints/src/grpc_leaders_getter.rs index 2bb3afc2..da317985 100644 --- a/cluster-endpoints/src/grpc_leaders_getter.rs +++ b/cluster-endpoints/src/grpc_leaders_getter.rs @@ -44,15 +44,26 @@ impl LeaderFetcherInterface for GrpcLeaderGetter { .current .as_ref() .map(|e| e.epoch) - .unwrap_or(to_epoch); + .unwrap_or(to_epoch) + + 1; if from > to { - bail!("invalid arguments for get_slot_leaders"); + bail!( + "invalid arguments for get_slot_leaders: from:{from} to:{to} from:{from} > to:{to}" + ); } if from_epoch < current_epoch || from_epoch > next_epoch { - bail!("invalid arguments for get_slot_leaders"); + bail!( + "invalid arguments for get_slot_leaders: from:{from} to:{to} \ + from_epoch:{from_epoch} < current_epoch:{current_epoch} \ + || from_epoch > next_epoch:{next_epoch}" + ); } if to_epoch < current_epoch || to_epoch > next_epoch { - bail!("invalid arguments for get_slot_leaders"); + bail!( + "invalid arguments for get_slot_leaders: from:{from} to:{to} \ + to_epoch:{to_epoch} < current_epoch:{current_epoch} \ + || to_epoch:{to_epoch} > next_epoch:{next_epoch}" + ); } let limit = to - from; diff --git a/core/src/structures/epoch.rs b/core/src/structures/epoch.rs index 15cb3a1d..c0ea1572 100644 --- a/core/src/structures/epoch.rs +++ b/core/src/structures/epoch.rs @@ -60,7 +60,9 @@ impl EpochCache { self.epoch_schedule.get_last_slot_in_epoch(epoch) } - pub async fn bootstrap_epoch(rpc_client: &RpcClient) -> anyhow::Result { + pub async fn bootstrap_epoch( + rpc_client: &RpcClient, + ) -> anyhow::Result<(EpochCache, EpochInfo)> { let res_epoch = rpc_client .get_account(&solana_sdk::sysvar::epoch_schedule::id()) .await?; @@ -72,9 +74,14 @@ impl EpochCache { bail!("Error during bootstrap epoch. SysvarAccountType::EpochSchedule can't be deserilized. Epoch can't be calculated."); }; - Ok(EpochCache { - epoch_schedule: Arc::new(epoch_schedule), - }) + let epoch_info = rpc_client.get_epoch_info().await?; + + Ok(( + EpochCache { + epoch_schedule: Arc::new(epoch_schedule), + }, + epoch_info, + )) } } diff --git a/lite-rpc/src/main.rs b/lite-rpc/src/main.rs index 979b9231..8d4c3d8c 100644 --- a/lite-rpc/src/main.rs +++ b/lite-rpc/src/main.rs @@ -154,8 +154,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc) -> anyhow: get_latest_block(blocks_notifier.resubscribe(), CommitmentConfig::finalized()).await; info!("Got finalized block: {:?}", finalized_block.slot); - let epoch_data = EpochCache::bootstrap_epoch(&rpc_client).await?; - let slots_per_epoch = epoch_data.get_epoch_schedule().slots_per_epoch; + let (epoch_data, current_epoch_info) = EpochCache::bootstrap_epoch(&rpc_client).await?; let block_information_store = BlockInformationStore::new(BlockInformation::from_block(&finalized_block)); @@ -212,17 +211,15 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc) -> anyhow: let (leader_schedule, rpc_stakes_send): (Arc, Option<_>) = if use_grpc && calculate_leader_schedule_form_geyser { //init leader schedule grpc process. - //1) get stored schedule and stakes - if let Some((leader_schedule, vote_stakes)) = - solana_lite_rpc_stakevote::bootstrap_leaderschedule_from_files(slots_per_epoch) - { - data_cache - .identity_stakes - .update_stakes_for_identity(vote_stakes) - .await; - let mut data_schedule = data_cache.leader_schedule.write().await; - *data_schedule = leader_schedule; - } + + //1) get stored leader schedule and stakes (or via RPC if not present) + solana_lite_rpc_stakevote::bootstrat_literpc_leader_schedule( + rpc_client.url(), + &data_cache, + current_epoch_info.epoch, + ) + .await; + //2) start stake vote and leader schedule. let (rpc_stakes_send, rpc_stakes_recv) = mpsc::channel(1000); let stake_vote_jh = solana_lite_rpc_stakevote::start_stakes_and_votes_loop( diff --git a/stake_vote/src/bootstrap.rs b/stake_vote/src/bootstrap.rs index 6a426ca4..3d73894b 100644 --- a/stake_vote/src/bootstrap.rs +++ b/stake_vote/src/bootstrap.rs @@ -11,14 +11,19 @@ use anyhow::bail; use futures::future::join_all; use futures_util::stream::FuturesUnordered; use solana_client::client_error::ClientError; +use solana_client::client_error::ClientErrorKind; use solana_client::rpc_client::RpcClient; use solana_client::rpc_response::RpcVoteAccountStatus; use solana_lite_rpc_core::stores::data_cache::DataCache; use solana_lite_rpc_core::structures::leaderschedule::CalculatedSchedule; use solana_lite_rpc_core::structures::leaderschedule::LeaderScheduleData; +use solana_program::slot_history::Slot; use solana_sdk::account::Account; use solana_sdk::commitment_config::CommitmentConfig; +use solana_sdk::epoch_info::EpochInfo; use solana_sdk::pubkey::Pubkey; +use solana_sdk::sysvar::epoch_schedule::EpochSchedule; +use std::collections::HashMap; use std::time::Duration; use tokio::task::JoinHandle; @@ -47,9 +52,10 @@ pub async fn bootstrap_schedule_epoch_data(data_cache: &DataCache) -> ScheduleEp // Return the current and next epoxh leader schedule and the current epoch stakes of vote accounts // if the corresponding files exist. pub fn bootstrap_leaderschedule_from_files( + current_epoch_of_loading: u64, slots_in_epoch: u64, ) -> Option<(CalculatedSchedule, RpcVoteAccountStatus)> { - bootstrap_current_leader_schedule(slots_in_epoch) + bootstrap_current_leader_schedule(slots_in_epoch, current_epoch_of_loading) .map(|(leader_schedule, current_epoch_stakes, _)| { let vote_acccounts = crate::vote::get_rpc_vote_account_info_from_current_epoch_stakes( ¤t_epoch_stakes, @@ -59,6 +65,63 @@ pub fn bootstrap_leaderschedule_from_files( .ok() } +// Return the current or next epoch leader schedule using the RPC calls. +pub fn bootstrap_leaderschedule_from_rpc( + rpc_url: String, + epoch_schedule: &EpochSchedule, +) -> Result { + let current_epoch = get_rpc_epoch_info(rpc_url.clone())?; + let current_schedule_by_node = + get_rpc_leader_schedule(rpc_url.clone(), None)?.ok_or(ClientError { + request: None, + kind: ClientErrorKind::Custom("RPC return no leader schedule".to_string()), + })?; + + //Calculate the slot leaders by from the node schedule because RPC call get_slot_leaders is limited to 5000 slots. + let current_schedule_by_slot = + crate::leader_schedule::calculate_slot_leaders_from_schedule(¤t_schedule_by_node) + .map_err(|err| ClientError { + request: None, + kind: ClientErrorKind::Custom(format!( + "Leader schedule from RPC can't generate slot leaders because:{err}" + )), + })?; + + //get next epoch rpc schedule + let next_epoch = current_epoch.epoch + 1; + let next_first_epoch_slot = epoch_schedule.get_first_slot_in_epoch(next_epoch); + let next_schedule_by_node = + get_rpc_leader_schedule(rpc_url.clone(), Some(next_first_epoch_slot))?.ok_or( + ClientError { + request: None, + kind: ClientErrorKind::Custom("RPC return no leader schedule".to_string()), + }, + )?; + + //Calculate the slot leaders by from the node schedule because RPC call get_slot_leaders is limited to 5000 slots. + let next_schedule_by_slot = + crate::leader_schedule::calculate_slot_leaders_from_schedule(&next_schedule_by_node) + .map_err(|err| ClientError { + request: None, + kind: ClientErrorKind::Custom(format!( + "Leader schedule from RPC can't generate slot leaders because:{err}" + )), + })?; + + Ok(CalculatedSchedule { + current: Some(LeaderScheduleData { + schedule_by_node: current_schedule_by_node.clone(), + schedule_by_slot: current_schedule_by_slot.clone(), + epoch: current_epoch.epoch, + }), + next: Some(LeaderScheduleData { + schedule_by_node: next_schedule_by_node, + schedule_by_slot: next_schedule_by_slot, + epoch: current_epoch.epoch + 1, + }), + }) +} + /* Bootstrap state changes @@ -92,8 +155,15 @@ pub fn run_bootstrap_events( stakestore: &mut StakeStore, votestore: &mut VoteStore, slots_in_epoch: u64, + current_epoch_of_loading: u64, ) -> anyhow::Result>> { - let result = process_bootstrap_event(event, stakestore, votestore, slots_in_epoch); + let result = process_bootstrap_event( + event, + stakestore, + votestore, + slots_in_epoch, + current_epoch_of_loading, + ); match result { BootsrapProcessResult::TaskHandle(jh) => { bootstrap_tasks.push(jh); @@ -105,6 +175,7 @@ pub fn run_bootstrap_events( stakestore, votestore, slots_in_epoch, + current_epoch_of_loading, ), BootsrapProcessResult::End(leader_schedule_result) => Ok(Some(leader_schedule_result)), BootsrapProcessResult::Error(err) => bail!(err), @@ -154,6 +225,7 @@ fn process_bootstrap_event( stakestore: &mut StakeStore, votestore: &mut VoteStore, slots_in_epoch: u64, + current_epoch_of_loading: u64, ) -> BootsrapProcessResult { match event { BootstrapEvent::InitBootstrap { @@ -161,7 +233,6 @@ fn process_bootstrap_event( rpc_url, } => { let jh = tokio::task::spawn_blocking(move || { - log::info!("BootstrapEvent::InitBootstrap RECV"); if sleep_time > 0 { std::thread::sleep(Duration::from_secs(sleep_time)); } @@ -180,7 +251,6 @@ fn process_bootstrap_event( BootsrapProcessResult::TaskHandle(jh) } BootstrapEvent::BootstrapAccountsFetched(stakes, votes, history, rpc_url) => { - log::info!("BootstrapEvent::BootstrapAccountsFetched RECV"); match (&mut stakestore.stakes, &mut votestore.votes).take() { TakeResult::Map((stake_map, (vote_map, epoch_cache))) => { BootsrapProcessResult::Event(BootstrapEvent::StoreExtracted( @@ -220,8 +290,6 @@ fn process_bootstrap_event( history, rpc_url, ) => { - log::info!("BootstrapEvent::StoreExtracted RECV"); - let stake_history = crate::account::read_historystake_from_account(&history.data); if stake_history.is_none() { return BootsrapProcessResult::Error( @@ -244,7 +312,10 @@ fn process_bootstrap_event( 0, //with RPC no way to know the slot of the account update. Set to 0. ); - match bootstrap_current_leader_schedule(slots_in_epoch) { + match bootstrap_current_leader_schedule( + current_epoch_of_loading, + slots_in_epoch, + ) { Ok((leader_schedule, current_epoch_stakes, next_epoch_stakes)) => { let vote_acccounts = crate::vote::get_rpc_vote_account_info_from_current_epoch_stakes( @@ -279,8 +350,6 @@ fn process_bootstrap_event( rpc_url, leader_schedule_result, ) => { - log::info!("BootstrapEvent::AccountsMerged RECV"); - match ( stakestore.stakes.merge(stake_map), votestore.votes.merge((vote_map, epoch_cache)), @@ -314,39 +383,55 @@ fn bootstrap_accounts( } fn get_stake_account(rpc_url: String) -> Result<(Vec<(Pubkey, Account)>, String), ClientError> { - log::info!("TaskToExec RpcGetStakeAccount start"); let rpc_client = RpcClient::new_with_timeout_and_commitment( rpc_url.clone(), Duration::from_secs(600), CommitmentConfig::finalized(), ); - let res_stake = rpc_client.get_program_accounts(&solana_sdk::stake::program::id()); - log::info!("TaskToExec RpcGetStakeAccount END"); - res_stake.map(|stake| (stake, rpc_url)) + rpc_client + .get_program_accounts(&solana_sdk::stake::program::id()) + .map(|stake| (stake, rpc_url)) } fn get_vote_account(rpc_url: String) -> Result<(Vec<(Pubkey, Account)>, String), ClientError> { - log::info!("TaskToExec RpcGetVoteAccount start"); let rpc_client = RpcClient::new_with_timeout_and_commitment( rpc_url.clone(), Duration::from_secs(600), CommitmentConfig::finalized(), ); - let res_vote = rpc_client.get_program_accounts(&solana_sdk::vote::program::id()); - log::info!("TaskToExec RpcGetVoteAccount END"); - res_vote.map(|votes| (votes, rpc_url)) + rpc_client + .get_program_accounts(&solana_sdk::vote::program::id()) + .map(|votes| (votes, rpc_url)) } pub fn get_stakehistory_account(rpc_url: String) -> Result { - log::info!("TaskToExec RpcGetStakeHistory start"); let rpc_client = RpcClient::new_with_timeout_and_commitment( rpc_url, Duration::from_secs(600), CommitmentConfig::finalized(), ); - let res_stake = rpc_client.get_account(&solana_sdk::sysvar::stake_history::id()); - log::info!("TaskToExec RpcGetStakeHistory END",); - res_stake + rpc_client.get_account(&solana_sdk::sysvar::stake_history::id()) +} + +fn get_rpc_epoch_info(rpc_url: String) -> Result { + let rpc_client = RpcClient::new_with_timeout_and_commitment( + rpc_url.clone(), + Duration::from_secs(600), + CommitmentConfig::finalized(), + ); + rpc_client.get_epoch_info() +} + +fn get_rpc_leader_schedule( + rpc_url: String, + slot: Option, +) -> Result>>, ClientError> { + let rpc_client = RpcClient::new_with_timeout_and_commitment( + rpc_url.clone(), + Duration::from_secs(600), + CommitmentConfig::finalized(), + ); + rpc_client.get_leader_schedule(slot) } // pub struct BootstrapScheduleResult { @@ -355,6 +440,7 @@ pub fn get_stakehistory_account(rpc_url: String) -> Result // } pub fn bootstrap_current_leader_schedule( + current_epoch_of_loading: u64, slots_in_epoch: u64, ) -> anyhow::Result<(CalculatedSchedule, EpochVoteStakes, EpochVoteStakes)> { let (current_epoch, current_epoch_stakes) = @@ -362,6 +448,18 @@ pub fn bootstrap_current_leader_schedule( let (next_epoch, next_epoch_stakes) = crate::utils::read_schedule_vote_stakes(NEXT_EPOCH_VOTE_STAKES_FILE)?; + //verify that the current loaded epoch correspond to the current epoch slot + if current_epoch_of_loading != current_epoch { + return Err(ClientError { + request: None, + kind: ClientErrorKind::Custom( + "Current epoch bootstrap file doesn't correspond to the validator current epoch." + .to_string(), + ), + } + .into()); + } + //calcualte leader schedule for all vote stakes. let current_schedule = crate::leader_schedule::calculate_leader_schedule( ¤t_epoch_stakes, diff --git a/stake_vote/src/epoch.rs b/stake_vote/src/epoch.rs index ac185bfb..0ee271d8 100644 --- a/stake_vote/src/epoch.rs +++ b/stake_vote/src/epoch.rs @@ -56,7 +56,7 @@ impl ScheduleEpochData { &mut self, history: StakeHistory, ) -> Option { - log::info!("set_epoch_stake_history"); + log::debug!("set_epoch_stake_history"); self.new_stake_history = Some(history); self.verify_epoch_change() } @@ -68,7 +68,7 @@ impl ScheduleEpochData { //to avoid to delay too much the schedule, start the calculus at the end of the epoch. //the first epoch slot arrive very late cause of the stake account notification from the validator. if self.current_confirmed_slot >= self.last_slot_in_epoch { - log::info!( + log::debug!( "manage_change_epoch at slot:{} last_slot_in_epoch:{}", self.current_confirmed_slot, self.last_slot_in_epoch diff --git a/stake_vote/src/leader_schedule.rs b/stake_vote/src/leader_schedule.rs index 3fe756dc..7d217c24 100644 --- a/stake_vote/src/leader_schedule.rs +++ b/stake_vote/src/leader_schedule.rs @@ -13,7 +13,9 @@ use solana_lite_rpc_core::structures::leaderschedule::LeaderScheduleData; use solana_sdk::clock::NUM_CONSECUTIVE_LEADER_SLOTS; use solana_sdk::pubkey::Pubkey; use solana_sdk::stake_history::StakeHistory; +use std::collections::BTreeMap; use std::collections::HashMap; +use std::str::FromStr; use std::sync::Arc; use tokio::task::JoinHandle; @@ -117,7 +119,7 @@ fn process_leadershedule_event( ) => { match (&mut stakestore.stakes, &mut votestore.votes).take() { TakeResult::Map((stake_map, (vote_map, mut epoch_cache))) => { - log::info!("LeaderScheduleEvent::CalculateScedule"); + log::info!("Start calculate leader schedule"); //do the calculus in a blocking task. let jh = tokio::task::spawn_blocking({ move || { @@ -221,7 +223,6 @@ fn process_leadershedule_event( (new_epoch, slots_in_epoch, epoch_schedule), stake_history, ) => { - log::info!("LeaderScheduleEvent::MergeStoreAndSaveSchedule RECV"); match ( stakestore.stakes.merge(stake_map), votestore.votes.merge((vote_map, epoch_cache)), @@ -304,10 +305,24 @@ pub fn calculate_leader_schedule( let mut seed = [0u8; 32]; seed[0..8].copy_from_slice(&epoch.to_le_bytes()); sort_stakes(&mut stakes); - log::info!("calculate_leader_schedule stakes:{stakes:?} epoch:{epoch}"); + //log::info!("calculate_leader_schedule stakes:{stakes:?} epoch:{epoch}"); LeaderSchedule::new(&stakes, seed, slots_in_epoch, NUM_CONSECUTIVE_LEADER_SLOTS) } +pub fn calculate_slot_leaders_from_schedule( + leader_scheudle: &HashMap>, +) -> Result, String> { + let mut slot_leaders_map = BTreeMap::new(); + for (pk, index_list) in leader_scheudle { + for index in index_list { + let pubkey = Pubkey::from_str(pk) + .map_err(|err| format!("Pubkey from leader schedule not a plublic key:{err}"))?; + slot_leaders_map.insert(index, pubkey); + } + } + Ok(slot_leaders_map.into_values().collect()) +} + // Cribbed from leader_schedule_utils fn sort_stakes(stakes: &mut Vec<(Pubkey, u64)>) { // Sort first by stake. If stakes are the same, sort by pubkey to ensure a diff --git a/stake_vote/src/lib.rs b/stake_vote/src/lib.rs index 912c6250..b73eaf22 100644 --- a/stake_vote/src/lib.rs +++ b/stake_vote/src/lib.rs @@ -30,13 +30,55 @@ mod stake; mod utils; mod vote; -pub use bootstrap::bootstrap_leaderschedule_from_files; +// pub use bootstrap::{bootstrap_leaderschedule_from_files, bootstrap_leaderschedule_from_rpc}; const STAKESTORE_INITIAL_CAPACITY: usize = 600000; const VOTESTORE_INITIAL_CAPACITY: usize = 600000; type Slot = u64; +pub async fn bootstrat_literpc_leader_schedule( + rpc_url: String, + data_cache: &DataCache, + current_epoch_of_loading: u64, +) { + //init leader schedule grpc process. + //1) get stored schedule and stakes + let slots_per_epoch = data_cache.epoch_data.get_epoch_schedule().slots_per_epoch; + match crate::bootstrap::bootstrap_leaderschedule_from_files( + current_epoch_of_loading, + slots_per_epoch, + ) { + Some((leader_schedule, vote_stakes)) => { + data_cache + .identity_stakes + .update_stakes_for_identity(vote_stakes) + .await; + let mut data_schedule = data_cache.leader_schedule.write().await; + *data_schedule = leader_schedule; + } + None => { + log::info!("Leader schedule bootstrap file not found. Try to boot from rpc."); + match crate::bootstrap::bootstrap_leaderschedule_from_rpc( + rpc_url, + data_cache.epoch_data.get_epoch_schedule(), + ) { + Ok(leader_schedule) => { + log::info!("Leader schedule bootstrap from rpc done.",); + let mut data_schedule = data_cache.leader_schedule.write().await; + *data_schedule = leader_schedule; + } + Err(err) => { + log::warn!( + "An error occurs during bootstrap of the leader schedule using rpc:{err}" + ); + log::warn!("No schedule has been loaded"); + } + } + } + } +} + pub async fn start_stakes_and_votes_loop( data_cache: DataCache, mut slot_notification: SlotStream, @@ -143,7 +185,7 @@ pub async fn start_stakes_and_votes_loop( if let Some(account) = account.account { let acc_id = Pubkey::try_from(account.pubkey).expect("valid pubkey"); if acc_id == solana_sdk::sysvar::stake_history::ID { - log::info!("Geyser notifstake_history"); + log::debug!("Geyser notifstake_history"); match crate::account::read_historystake_from_account(account.data.as_slice()) { Some(stake_history) => { let schedule_event = current_schedule_epoch.set_epoch_stake_history(stake_history); @@ -244,7 +286,7 @@ pub async fn start_stakes_and_votes_loop( } //manage bootstrap event Some(Ok(event)) = spawned_bootstrap_task.next() => { - match crate::bootstrap::run_bootstrap_events(event, &mut spawned_bootstrap_task, &mut stakestore, &mut votestore, current_schedule_epoch.slots_in_epoch) { + match crate::bootstrap::run_bootstrap_events(event, &mut spawned_bootstrap_task, &mut stakestore, &mut votestore, current_schedule_epoch.slots_in_epoch, current_schedule_epoch.current_epoch) { Ok(Some(boot_res))=> { match boot_res { Ok((current_schedule_data, vote_stakes)) => { diff --git a/stake_vote/src/vote.rs b/stake_vote/src/vote.rs index b9d305aa..a837cd61 100644 --- a/stake_vote/src/vote.rs +++ b/stake_vote/src/vote.rs @@ -36,10 +36,8 @@ impl EpochVoteStakesCache { } pub fn add_stakes_for_epoch(&mut self, vote_stakes: EpochVoteStakes) { - log::info!("add_stakes_for_epoch :{}", vote_stakes.epoch); - if self.cache.insert(vote_stakes.epoch, vote_stakes).is_some() { - log::warn!("Override existing vote stake epoch cache for epoch:"); - } + log::debug!("add_stakes_for_epoch :{}", vote_stakes.epoch); + self.cache.insert(vote_stakes.epoch, vote_stakes); } }