Skip to content

Commit

Permalink
Merge pull request #266 from blockworks-foundation/bootstrap_leader_s…
Browse files Browse the repository at this point in the history
…chedule_from_rpc

Bootstrap leader schedule from rpc
  • Loading branch information
godmodegalactus authored Jan 8, 2024
2 parents e39a2f5 + 21d0cee commit b49e51d
Show file tree
Hide file tree
Showing 8 changed files with 222 additions and 54 deletions.
19 changes: 15 additions & 4 deletions cluster-endpoints/src/grpc_leaders_getter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,26 @@ impl LeaderFetcherInterface for GrpcLeaderGetter {
.current
.as_ref()
.map(|e| e.epoch)
.unwrap_or(to_epoch);
.unwrap_or(to_epoch)
+ 1;
if from > to {
bail!("invalid arguments for get_slot_leaders");
bail!(
"invalid arguments for get_slot_leaders: from:{from} to:{to} from:{from} > to:{to}"
);
}
if from_epoch < current_epoch || from_epoch > next_epoch {
bail!("invalid arguments for get_slot_leaders");
bail!(
"invalid arguments for get_slot_leaders: from:{from} to:{to} \
from_epoch:{from_epoch} < current_epoch:{current_epoch} \
|| from_epoch > next_epoch:{next_epoch}"
);
}
if to_epoch < current_epoch || to_epoch > next_epoch {
bail!("invalid arguments for get_slot_leaders");
bail!(
"invalid arguments for get_slot_leaders: from:{from} to:{to} \
to_epoch:{to_epoch} < current_epoch:{current_epoch} \
|| to_epoch:{to_epoch} > next_epoch:{next_epoch}"
);
}

let limit = to - from;
Expand Down
15 changes: 11 additions & 4 deletions core/src/structures/epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ impl EpochCache {
self.epoch_schedule.get_last_slot_in_epoch(epoch)
}

pub async fn bootstrap_epoch(rpc_client: &RpcClient) -> anyhow::Result<EpochCache> {
pub async fn bootstrap_epoch(
rpc_client: &RpcClient,
) -> anyhow::Result<(EpochCache, EpochInfo)> {
let res_epoch = rpc_client
.get_account(&solana_sdk::sysvar::epoch_schedule::id())
.await?;
Expand All @@ -72,9 +74,14 @@ impl EpochCache {
bail!("Error during bootstrap epoch. SysvarAccountType::EpochSchedule can't be deserilized. Epoch can't be calculated.");
};

Ok(EpochCache {
epoch_schedule: Arc::new(epoch_schedule),
})
let epoch_info = rpc_client.get_epoch_info().await?;

Ok((
EpochCache {
epoch_schedule: Arc::new(epoch_schedule),
},
epoch_info,
))
}
}

Expand Down
23 changes: 10 additions & 13 deletions lite-rpc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
get_latest_block(blocks_notifier.resubscribe(), CommitmentConfig::finalized()).await;
info!("Got finalized block: {:?}", finalized_block.slot);

let epoch_data = EpochCache::bootstrap_epoch(&rpc_client).await?;
let slots_per_epoch = epoch_data.get_epoch_schedule().slots_per_epoch;
let (epoch_data, current_epoch_info) = EpochCache::bootstrap_epoch(&rpc_client).await?;

let block_information_store =
BlockInformationStore::new(BlockInformation::from_block(&finalized_block));
Expand Down Expand Up @@ -212,17 +211,15 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
let (leader_schedule, rpc_stakes_send): (Arc<dyn LeaderFetcherInterface>, Option<_>) =
if use_grpc && calculate_leader_schedule_form_geyser {
//init leader schedule grpc process.
//1) get stored schedule and stakes
if let Some((leader_schedule, vote_stakes)) =
solana_lite_rpc_stakevote::bootstrap_leaderschedule_from_files(slots_per_epoch)
{
data_cache
.identity_stakes
.update_stakes_for_identity(vote_stakes)
.await;
let mut data_schedule = data_cache.leader_schedule.write().await;
*data_schedule = leader_schedule;
}

//1) get stored leader schedule and stakes (or via RPC if not present)
solana_lite_rpc_stakevote::bootstrat_literpc_leader_schedule(
rpc_client.url(),
&data_cache,
current_epoch_info.epoch,
)
.await;

//2) start stake vote and leader schedule.
let (rpc_stakes_send, rpc_stakes_recv) = mpsc::channel(1000);
let stake_vote_jh = solana_lite_rpc_stakevote::start_stakes_and_votes_loop(
Expand Down
140 changes: 119 additions & 21 deletions stake_vote/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,19 @@ use anyhow::bail;
use futures::future::join_all;
use futures_util::stream::FuturesUnordered;
use solana_client::client_error::ClientError;
use solana_client::client_error::ClientErrorKind;
use solana_client::rpc_client::RpcClient;
use solana_client::rpc_response::RpcVoteAccountStatus;
use solana_lite_rpc_core::stores::data_cache::DataCache;
use solana_lite_rpc_core::structures::leaderschedule::CalculatedSchedule;
use solana_lite_rpc_core::structures::leaderschedule::LeaderScheduleData;
use solana_program::slot_history::Slot;
use solana_sdk::account::Account;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::epoch_info::EpochInfo;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::sysvar::epoch_schedule::EpochSchedule;
use std::collections::HashMap;
use std::time::Duration;
use tokio::task::JoinHandle;

Expand Down Expand Up @@ -47,9 +52,10 @@ pub async fn bootstrap_schedule_epoch_data(data_cache: &DataCache) -> ScheduleEp
// Return the current and next epoxh leader schedule and the current epoch stakes of vote accounts
// if the corresponding files exist.
pub fn bootstrap_leaderschedule_from_files(
current_epoch_of_loading: u64,
slots_in_epoch: u64,
) -> Option<(CalculatedSchedule, RpcVoteAccountStatus)> {
bootstrap_current_leader_schedule(slots_in_epoch)
bootstrap_current_leader_schedule(slots_in_epoch, current_epoch_of_loading)
.map(|(leader_schedule, current_epoch_stakes, _)| {
let vote_acccounts = crate::vote::get_rpc_vote_account_info_from_current_epoch_stakes(
&current_epoch_stakes,
Expand All @@ -59,6 +65,63 @@ pub fn bootstrap_leaderschedule_from_files(
.ok()
}

// Return the current or next epoch leader schedule using the RPC calls.
pub fn bootstrap_leaderschedule_from_rpc(
rpc_url: String,
epoch_schedule: &EpochSchedule,
) -> Result<CalculatedSchedule, ClientError> {
let current_epoch = get_rpc_epoch_info(rpc_url.clone())?;
let current_schedule_by_node =
get_rpc_leader_schedule(rpc_url.clone(), None)?.ok_or(ClientError {
request: None,
kind: ClientErrorKind::Custom("RPC return no leader schedule".to_string()),
})?;

//Calculate the slot leaders by from the node schedule because RPC call get_slot_leaders is limited to 5000 slots.
let current_schedule_by_slot =
crate::leader_schedule::calculate_slot_leaders_from_schedule(&current_schedule_by_node)
.map_err(|err| ClientError {
request: None,
kind: ClientErrorKind::Custom(format!(
"Leader schedule from RPC can't generate slot leaders because:{err}"
)),
})?;

//get next epoch rpc schedule
let next_epoch = current_epoch.epoch + 1;
let next_first_epoch_slot = epoch_schedule.get_first_slot_in_epoch(next_epoch);
let next_schedule_by_node =
get_rpc_leader_schedule(rpc_url.clone(), Some(next_first_epoch_slot))?.ok_or(
ClientError {
request: None,
kind: ClientErrorKind::Custom("RPC return no leader schedule".to_string()),
},
)?;

//Calculate the slot leaders by from the node schedule because RPC call get_slot_leaders is limited to 5000 slots.
let next_schedule_by_slot =
crate::leader_schedule::calculate_slot_leaders_from_schedule(&next_schedule_by_node)
.map_err(|err| ClientError {
request: None,
kind: ClientErrorKind::Custom(format!(
"Leader schedule from RPC can't generate slot leaders because:{err}"
)),
})?;

Ok(CalculatedSchedule {
current: Some(LeaderScheduleData {
schedule_by_node: current_schedule_by_node.clone(),
schedule_by_slot: current_schedule_by_slot.clone(),
epoch: current_epoch.epoch,
}),
next: Some(LeaderScheduleData {
schedule_by_node: next_schedule_by_node,
schedule_by_slot: next_schedule_by_slot,
epoch: current_epoch.epoch + 1,
}),
})
}

/*
Bootstrap state changes
Expand Down Expand Up @@ -92,8 +155,15 @@ pub fn run_bootstrap_events(
stakestore: &mut StakeStore,
votestore: &mut VoteStore,
slots_in_epoch: u64,
current_epoch_of_loading: u64,
) -> anyhow::Result<Option<anyhow::Result<(CalculatedSchedule, RpcVoteAccountStatus)>>> {
let result = process_bootstrap_event(event, stakestore, votestore, slots_in_epoch);
let result = process_bootstrap_event(
event,
stakestore,
votestore,
slots_in_epoch,
current_epoch_of_loading,
);
match result {
BootsrapProcessResult::TaskHandle(jh) => {
bootstrap_tasks.push(jh);
Expand All @@ -105,6 +175,7 @@ pub fn run_bootstrap_events(
stakestore,
votestore,
slots_in_epoch,
current_epoch_of_loading,
),
BootsrapProcessResult::End(leader_schedule_result) => Ok(Some(leader_schedule_result)),
BootsrapProcessResult::Error(err) => bail!(err),
Expand Down Expand Up @@ -154,14 +225,14 @@ fn process_bootstrap_event(
stakestore: &mut StakeStore,
votestore: &mut VoteStore,
slots_in_epoch: u64,
current_epoch_of_loading: u64,
) -> BootsrapProcessResult {
match event {
BootstrapEvent::InitBootstrap {
sleep_time,
rpc_url,
} => {
let jh = tokio::task::spawn_blocking(move || {
log::info!("BootstrapEvent::InitBootstrap RECV");
if sleep_time > 0 {
std::thread::sleep(Duration::from_secs(sleep_time));
}
Expand All @@ -180,7 +251,6 @@ fn process_bootstrap_event(
BootsrapProcessResult::TaskHandle(jh)
}
BootstrapEvent::BootstrapAccountsFetched(stakes, votes, history, rpc_url) => {
log::info!("BootstrapEvent::BootstrapAccountsFetched RECV");
match (&mut stakestore.stakes, &mut votestore.votes).take() {
TakeResult::Map((stake_map, (vote_map, epoch_cache))) => {
BootsrapProcessResult::Event(BootstrapEvent::StoreExtracted(
Expand Down Expand Up @@ -220,8 +290,6 @@ fn process_bootstrap_event(
history,
rpc_url,
) => {
log::info!("BootstrapEvent::StoreExtracted RECV");

let stake_history = crate::account::read_historystake_from_account(&history.data);
if stake_history.is_none() {
return BootsrapProcessResult::Error(
Expand All @@ -244,7 +312,10 @@ fn process_bootstrap_event(
0, //with RPC no way to know the slot of the account update. Set to 0.
);

match bootstrap_current_leader_schedule(slots_in_epoch) {
match bootstrap_current_leader_schedule(
current_epoch_of_loading,
slots_in_epoch,
) {
Ok((leader_schedule, current_epoch_stakes, next_epoch_stakes)) => {
let vote_acccounts =
crate::vote::get_rpc_vote_account_info_from_current_epoch_stakes(
Expand Down Expand Up @@ -279,8 +350,6 @@ fn process_bootstrap_event(
rpc_url,
leader_schedule_result,
) => {
log::info!("BootstrapEvent::AccountsMerged RECV");

match (
stakestore.stakes.merge(stake_map),
votestore.votes.merge((vote_map, epoch_cache)),
Expand Down Expand Up @@ -314,39 +383,55 @@ fn bootstrap_accounts(
}

fn get_stake_account(rpc_url: String) -> Result<(Vec<(Pubkey, Account)>, String), ClientError> {
log::info!("TaskToExec RpcGetStakeAccount start");
let rpc_client = RpcClient::new_with_timeout_and_commitment(
rpc_url.clone(),
Duration::from_secs(600),
CommitmentConfig::finalized(),
);
let res_stake = rpc_client.get_program_accounts(&solana_sdk::stake::program::id());
log::info!("TaskToExec RpcGetStakeAccount END");
res_stake.map(|stake| (stake, rpc_url))
rpc_client
.get_program_accounts(&solana_sdk::stake::program::id())
.map(|stake| (stake, rpc_url))
}

fn get_vote_account(rpc_url: String) -> Result<(Vec<(Pubkey, Account)>, String), ClientError> {
log::info!("TaskToExec RpcGetVoteAccount start");
let rpc_client = RpcClient::new_with_timeout_and_commitment(
rpc_url.clone(),
Duration::from_secs(600),
CommitmentConfig::finalized(),
);
let res_vote = rpc_client.get_program_accounts(&solana_sdk::vote::program::id());
log::info!("TaskToExec RpcGetVoteAccount END");
res_vote.map(|votes| (votes, rpc_url))
rpc_client
.get_program_accounts(&solana_sdk::vote::program::id())
.map(|votes| (votes, rpc_url))
}

pub fn get_stakehistory_account(rpc_url: String) -> Result<Account, ClientError> {
log::info!("TaskToExec RpcGetStakeHistory start");
let rpc_client = RpcClient::new_with_timeout_and_commitment(
rpc_url,
Duration::from_secs(600),
CommitmentConfig::finalized(),
);
let res_stake = rpc_client.get_account(&solana_sdk::sysvar::stake_history::id());
log::info!("TaskToExec RpcGetStakeHistory END",);
res_stake
rpc_client.get_account(&solana_sdk::sysvar::stake_history::id())
}

fn get_rpc_epoch_info(rpc_url: String) -> Result<EpochInfo, ClientError> {
let rpc_client = RpcClient::new_with_timeout_and_commitment(
rpc_url.clone(),
Duration::from_secs(600),
CommitmentConfig::finalized(),
);
rpc_client.get_epoch_info()
}

fn get_rpc_leader_schedule(
rpc_url: String,
slot: Option<Slot>,
) -> Result<Option<HashMap<String, Vec<usize>>>, ClientError> {
let rpc_client = RpcClient::new_with_timeout_and_commitment(
rpc_url.clone(),
Duration::from_secs(600),
CommitmentConfig::finalized(),
);
rpc_client.get_leader_schedule(slot)
}

// pub struct BootstrapScheduleResult {
Expand All @@ -355,13 +440,26 @@ pub fn get_stakehistory_account(rpc_url: String) -> Result<Account, ClientError>
// }

pub fn bootstrap_current_leader_schedule(
current_epoch_of_loading: u64,
slots_in_epoch: u64,
) -> anyhow::Result<(CalculatedSchedule, EpochVoteStakes, EpochVoteStakes)> {
let (current_epoch, current_epoch_stakes) =
crate::utils::read_schedule_vote_stakes(CURRENT_EPOCH_VOTE_STAKES_FILE)?;
let (next_epoch, next_epoch_stakes) =
crate::utils::read_schedule_vote_stakes(NEXT_EPOCH_VOTE_STAKES_FILE)?;

//verify that the current loaded epoch correspond to the current epoch slot
if current_epoch_of_loading != current_epoch {
return Err(ClientError {
request: None,
kind: ClientErrorKind::Custom(
"Current epoch bootstrap file doesn't correspond to the validator current epoch."
.to_string(),
),
}
.into());
}

//calcualte leader schedule for all vote stakes.
let current_schedule = crate::leader_schedule::calculate_leader_schedule(
&current_epoch_stakes,
Expand Down
4 changes: 2 additions & 2 deletions stake_vote/src/epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl ScheduleEpochData {
&mut self,
history: StakeHistory,
) -> Option<LeaderScheduleEvent> {
log::info!("set_epoch_stake_history");
log::debug!("set_epoch_stake_history");
self.new_stake_history = Some(history);
self.verify_epoch_change()
}
Expand All @@ -68,7 +68,7 @@ impl ScheduleEpochData {
//to avoid to delay too much the schedule, start the calculus at the end of the epoch.
//the first epoch slot arrive very late cause of the stake account notification from the validator.
if self.current_confirmed_slot >= self.last_slot_in_epoch {
log::info!(
log::debug!(
"manage_change_epoch at slot:{} last_slot_in_epoch:{}",
self.current_confirmed_slot,
self.last_slot_in_epoch
Expand Down
Loading

0 comments on commit b49e51d

Please sign in to comment.