From 5a40ac05bebd033262d72cfc55818fd36d60adcb Mon Sep 17 00:00:00 2001 From: Evan Batsell Date: Mon, 15 Apr 2024 16:19:35 -0400 Subject: [PATCH] Reduce redundant keeper retries, emit keeper balance --- keepers/validator-keeper/src/gossip.rs | 41 ++++++- keepers/validator-keeper/src/lib.rs | 28 ++++- keepers/validator-keeper/src/main.rs | 12 -- .../validator-keeper/src/mev_commission.rs | 111 +++++++++--------- keepers/validator-keeper/src/stake.rs | 48 +++++++- keepers/validator-keeper/src/vote_account.rs | 40 ++++++- 6 files changed, 203 insertions(+), 77 deletions(-) diff --git a/keepers/validator-keeper/src/gossip.rs b/keepers/validator-keeper/src/gossip.rs index c4270704..a5b0e8a0 100644 --- a/keepers/validator-keeper/src/gossip.rs +++ b/keepers/validator-keeper/src/gossip.rs @@ -1,4 +1,5 @@ use std::{ + collections::HashMap, net::{IpAddr, SocketAddr}, str::FromStr, sync::{ @@ -31,7 +32,7 @@ use tokio::time::sleep; use validator_history::{ self, constants::{MAX_ALLOC_BYTES, MIN_VOTE_EPOCHS}, - Config, ValidatorHistory, + Config, ValidatorHistory, ValidatorHistoryEntry, }; use crate::{get_validator_history_accounts_with_retry, start_spy_server}; @@ -293,6 +294,12 @@ pub async fn upload_gossip_values( let validator_history_accounts = get_validator_history_accounts_with_retry(&client, *program_id).await?; + let validator_history_map = HashMap::from_iter( + validator_history_accounts + .iter() + .map(|vh| (vh.vote_account, vh)), + ); + // Wait for all active validators to be received sleep(Duration::from_secs(30)).await; @@ -321,10 +328,19 @@ pub async fn upload_gossip_values( exit.store(true, Ordering::Relaxed); + let epoch = client.get_epoch_info().await?.epoch; + let addresses = gossip_entries .iter() - .map(|a| a.address()) + .filter_map(|a| { + if gossip_data_uploaded(&validator_history_map, a.address(), epoch) { + None + } else { + Some(a.address()) + } + }) .collect::>(); + let existing_accounts_response = get_multiple_accounts_batched(&addresses, &client).await?; let create_transactions = existing_accounts_response @@ -350,6 +366,27 @@ pub async fn upload_gossip_values( }) } +fn gossip_data_uploaded( + validator_history_map: &HashMap, + vote_account: Pubkey, + epoch: u64, +) -> bool { + let validator_history = validator_history_map.get(&vote_account); + if validator_history.is_none() { + return false; + } + + let validator_history = validator_history.unwrap(); + + if let Some(latest_entry) = validator_history.history.last() { + return latest_entry.epoch == epoch as u16 + && latest_entry.ip != ValidatorHistoryEntry::default().ip + && latest_entry.version.major != ValidatorHistoryEntry::default().version.major + && latest_entry.client_type != ValidatorHistoryEntry::default().client_type; + } + false +} + // CODE BELOW SLIGHTLY MODIFIED FROM // solana_sdk/src/ed25519_instruction.rs diff --git a/keepers/validator-keeper/src/lib.rs b/keepers/validator-keeper/src/lib.rs index 1c2d8c43..e156a05a 100644 --- a/keepers/validator-keeper/src/lib.rs +++ b/keepers/validator-keeper/src/lib.rs @@ -42,7 +42,7 @@ pub mod vote_account; pub type Error = Box; -pub const PRIORITY_FEE: u64 = 500_000; +pub const PRIORITY_FEE: u64 = 100_000; #[derive(ThisError, Debug)] pub enum KeeperError { @@ -134,6 +134,7 @@ pub fn emit_cluster_history_datapoint(stats: SubmitStats, runs_for_epoch: i64) { pub async fn emit_validator_history_metrics( client: &Arc, program_id: Pubkey, + keeper_address: Pubkey, ) -> Result<(), Box> { let epoch = client.get_epoch_info().await?; @@ -199,6 +200,8 @@ pub async fn emit_validator_history_metrics( .await? .len(); + let keeper_balance = get_balance_with_retry(client, keeper_address).await?; + datapoint_info!( "validator-history-stats", ("num_validator_histories", num_validators, i64), @@ -218,6 +221,11 @@ pub async fn emit_validator_history_metrics( ), ); + datapoint_info!( + "stakenet-keeper-stats", + ("balance_lamports", keeper_balance, i64), + ); + Ok(()) } @@ -262,6 +270,24 @@ pub async fn get_validator_history_accounts_with_retry( get_validator_history_accounts(client, program_id).await } +pub async fn get_balance_with_retry( + client: &RpcClient, + account: Pubkey, +) -> Result { + let mut retries = 5; + loop { + match client.get_balance(&account).await { + Ok(balance) => return Ok(balance), + Err(e) => { + if retries == 0 { + return Err(e); + } + retries -= 1; + } + } + } +} + pub fn start_spy_server( cluster_entrypoint: SocketAddr, gossip_port: u16, diff --git a/keepers/validator-keeper/src/main.rs b/keepers/validator-keeper/src/main.rs index 8a1e6195..579d23b9 100644 --- a/keepers/validator-keeper/src/main.rs +++ b/keepers/validator-keeper/src/main.rs @@ -86,10 +86,6 @@ async fn mev_commission_loop( tip_distribution_program_id: Pubkey, interval: u64, ) { - let mut prev_epoch = 0; - // {TipDistributionAccount : VoteAccount} - let mut validators_updated: HashMap = HashMap::new(); - loop { // Continuously runs throughout an epoch, polling for new tip distribution accounts // and submitting update txs when new accounts are detected @@ -98,8 +94,6 @@ async fn mev_commission_loop( keypair.clone(), &commission_history_program_id, &tip_distribution_program_id, - &mut validators_updated, - &mut prev_epoch, ) .await { @@ -141,10 +135,6 @@ async fn mev_earned_loop( tip_distribution_program_id: Pubkey, interval: u64, ) { - let mut curr_epoch = 0; - // {TipDistributionAccount : VoteAccount} - let mut validators_updated: HashMap = HashMap::new(); - loop { // Continuously runs throughout an epoch, polling for tip distribution accounts from the prev epoch with uploaded merkle roots // and submitting update_mev_earned (technically update_mev_comission) txs when the uploaded merkle roots are detected @@ -153,8 +143,6 @@ async fn mev_earned_loop( &keypair, &commission_history_program_id, &tip_distribution_program_id, - &mut validators_updated, - &mut curr_epoch, ) .await { diff --git a/keepers/validator-keeper/src/mev_commission.rs b/keepers/validator-keeper/src/mev_commission.rs index 83718084..b48f6e31 100644 --- a/keepers/validator-keeper/src/mev_commission.rs +++ b/keepers/validator-keeper/src/mev_commission.rs @@ -14,9 +14,10 @@ use solana_client::rpc_response::RpcVoteAccountInfo; use solana_program::{instruction::Instruction, pubkey::Pubkey}; use solana_sdk::{signature::Keypair, signer::Signer}; use validator_history::constants::MIN_VOTE_EPOCHS; +use validator_history::ValidatorHistoryEntry; use validator_history::{constants::MAX_ALLOC_BYTES, Config, ValidatorHistory}; -use crate::{KeeperError, PRIORITY_FEE}; +use crate::{get_validator_history_accounts_with_retry, KeeperError, PRIORITY_FEE}; #[derive(Clone)] pub struct ValidatorMevCommissionEntry { @@ -127,16 +128,15 @@ pub async fn update_mev_commission( keypair: Arc, validator_history_program_id: &Pubkey, tip_distribution_program_id: &Pubkey, - validators_updated: &mut HashMap, - prev_epoch: &mut u64, ) -> Result { let epoch = client.get_epoch_info().await?.epoch; - if epoch > *prev_epoch { - validators_updated.clear(); - } - *prev_epoch = epoch; let vote_accounts = get_vote_accounts_with_retry(&client, MIN_VOTE_EPOCHS, None).await?; + let validator_histories = + get_validator_history_accounts_with_retry(&client, *validator_history_program_id).await?; + + let validator_history_map = + HashMap::from_iter(validator_histories.iter().map(|vh| (vh.vote_account, vh))); let entries = vote_accounts .iter() @@ -155,12 +155,12 @@ pub async fn update_mev_commission( let entries_to_update = existing_entries .into_iter() - .filter(|entry| !validators_updated.contains_key(&entry.tip_distribution_account)) + .filter(|entry| !mev_commission_uploaded(&validator_history_map, entry.address(), epoch)) .collect::>(); let (create_transactions, update_instructions) = build_create_and_update_instructions(&client, &entries_to_update).await?; - match submit_create_and_update( + submit_create_and_update( &client, create_transactions, update_instructions, @@ -168,22 +168,7 @@ pub async fn update_mev_commission( PRIORITY_FEE, ) .await - { - Ok(submit_result) => { - if submit_result.creates.errors == 0 && submit_result.updates.errors == 0 { - for ValidatorMevCommissionEntry { - vote_account, - tip_distribution_account, - .. - } in entries_to_update - { - validators_updated.insert(tip_distribution_account, vote_account); - } - } - Ok(submit_result) - } - Err(e) => Err(e.into()), - } + .map_err(|e| e.into()) } pub async fn update_mev_earned( @@ -191,19 +176,15 @@ pub async fn update_mev_earned( keypair: &Arc, validator_history_program_id: &Pubkey, tip_distribution_program_id: &Pubkey, - validators_updated: &mut HashMap, - curr_epoch: &mut u64, ) -> Result { let epoch = client.get_epoch_info().await?.epoch; - if epoch > *curr_epoch { - // new epoch started, we assume here that all the validators with TDAs from curr_epoch-1 have had their merkle roots uploaded/processed by this point - // clear our map of TDAs derived from curr_epoch -1 and start fresh for epoch-1 (or curr_epoch) - validators_updated.clear(); - } - *curr_epoch = epoch; - let vote_accounts = get_vote_accounts_with_retry(client, MIN_VOTE_EPOCHS, None).await?; + let validator_histories = + get_validator_history_accounts_with_retry(&client, *validator_history_program_id).await?; + + let validator_history_map = + HashMap::from_iter(validator_histories.iter().map(|vh| (vh.vote_account, vh))); let entries = vote_accounts .iter() @@ -223,35 +204,21 @@ pub async fn update_mev_earned( let entries_to_update = uploaded_merkleroot_entries .into_iter() - .filter(|entry| !validators_updated.contains_key(&entry.tip_distribution_account)) + .filter(|entry| !mev_earned_uploaded(&validator_history_map, entry.address(), epoch - 1)) .collect::>(); + let (create_transactions, update_instructions) = build_create_and_update_instructions(client, &entries_to_update).await?; - let submit_result = submit_create_and_update( + submit_create_and_update( client, create_transactions, update_instructions, keypair, PRIORITY_FEE, ) - .await; - match submit_result { - Ok(submit_result) => { - if submit_result.creates.errors == 0 && submit_result.updates.errors == 0 { - for ValidatorMevCommissionEntry { - vote_account, - tip_distribution_account, - .. - } in entries_to_update - { - validators_updated.insert(tip_distribution_account, vote_account); - } - } - Ok(submit_result) - } - Err(e) => Err(e.into()), - } + .await + .map_err(|e| e.into()) } async fn get_existing_entries( @@ -309,3 +276,41 @@ async fn get_entries_with_uploaded_merkleroot( // Fetch tip distribution accounts with uploaded merkle roots for this epoch Ok(result) } + +fn mev_commission_uploaded( + validator_history_map: &HashMap, + vote_account: Pubkey, + epoch: u64, +) -> bool { + let validator_history = validator_history_map.get(&vote_account); + if validator_history.is_none() { + return false; + } + + let validator_history = validator_history.unwrap(); + + if let Some(latest_entry) = validator_history.history.last() { + return latest_entry.epoch == epoch as u16 + && latest_entry.mev_commission != ValidatorHistoryEntry::default().mev_commission; + } + false +} + +fn mev_earned_uploaded( + validator_history_map: &HashMap, + vote_account: Pubkey, + epoch: u64, +) -> bool { + let validator_history = validator_history_map.get(&vote_account); + if validator_history.is_none() { + return false; + } + + let validator_history = validator_history.unwrap(); + + if let Some(latest_entry) = validator_history.history.last() { + return latest_entry.epoch == epoch as u16 + && latest_entry.mev_earned != ValidatorHistoryEntry::default().mev_earned; + } + false +} diff --git a/keepers/validator-keeper/src/stake.rs b/keepers/validator-keeper/src/stake.rs index 5296b642..e1e4c3a3 100644 --- a/keepers/validator-keeper/src/stake.rs +++ b/keepers/validator-keeper/src/stake.rs @@ -20,9 +20,10 @@ use solana_sdk::{ use validator_history::{ constants::{MAX_ALLOC_BYTES, MIN_VOTE_EPOCHS}, state::{Config, ValidatorHistory}, + ValidatorHistoryEntry, }; -use crate::{KeeperError, PRIORITY_FEE}; +use crate::{get_validator_history_accounts_with_retry, KeeperError, PRIORITY_FEE}; pub struct StakeHistoryEntry { pub stake: u64, @@ -187,6 +188,12 @@ pub async fn update_stake_history( .max() .unwrap_or(0); + let validator_histories = + get_validator_history_accounts_with_retry(&client, *program_id).await?; + + let validator_history_map: HashMap = + HashMap::from_iter(validator_histories.iter().map(|vh| (vh.vote_account, vh))); + let (stake_rank_map, superminority_threshold) = get_stake_rank_map_and_superminority_count(&vote_accounts); @@ -201,17 +208,22 @@ pub async fn update_stake_history( let stake_history_entries = vote_accounts .iter() - .map(|va| { + .filter_map(|va| { let rank = stake_rank_map[&va.vote_pubkey.clone()]; let is_superminority = rank <= superminority_threshold; - StakeHistoryEntry::new( + + if stake_entry_uploaded(&validator_history_map, va, epoch) { + return None; + } + + Some(StakeHistoryEntry::new( va, program_id, &keypair.pubkey(), epoch, rank, is_superminority, - ) + )) }) .collect::>(); @@ -326,6 +338,34 @@ pub async fn _recompute_superminority_and_rank( Ok(()) } +fn stake_entry_uploaded( + validator_history_map: &HashMap, + vote_account: &RpcVoteAccountInfo, + epoch: u64, +) -> bool { + let vote_account = Pubkey::from_str(&vote_account.vote_pubkey) + .map_err(|e| { + error!("Invalid vote account pubkey"); + e + }) + .expect("Invalid vote account pubkey"); + let validator_history = validator_history_map.get(&vote_account); + if validator_history.is_none() { + return false; + } + + let validator_history = validator_history.unwrap(); + + if let Some(latest_entry) = validator_history.history.last() { + return latest_entry.epoch == epoch as u16 + && latest_entry.is_superminority != ValidatorHistoryEntry::default().is_superminority + && latest_entry.rank != ValidatorHistoryEntry::default().rank + && latest_entry.activated_stake_lamports + != ValidatorHistoryEntry::default().activated_stake_lamports; + } + false +} + pub fn emit_stake_history_datapoint(stats: CreateUpdateStats, runs_for_epoch: i64) { datapoint_info!( "stake-history-stats", diff --git a/keepers/validator-keeper/src/vote_account.rs b/keepers/validator-keeper/src/vote_account.rs index 5d06de41..ba61bde0 100644 --- a/keepers/validator-keeper/src/vote_account.rs +++ b/keepers/validator-keeper/src/vote_account.rs @@ -1,4 +1,4 @@ -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::str::FromStr; use std::sync::Arc; @@ -111,9 +111,12 @@ pub async fn update_vote_accounts( get_validator_history_accounts_with_retry(&rpc_client, validator_history_program_id) .await?; - let vote_account_pubkeys = validator_histories - .iter() - .map(|vh| vh.vote_account) + let validator_history_map = + HashMap::from_iter(validator_histories.iter().map(|vh| (vh.vote_account, vh))); + + let vote_account_pubkeys = validator_history_map + .clone() + .into_keys() .collect::>(); let vote_accounts = get_multiple_accounts_batched(&vote_account_pubkeys, &rpc_client).await?; @@ -146,8 +149,13 @@ pub async fn update_vote_accounts( .chain(validator_histories.iter().map(|vh| vh.vote_account)) .collect::>(); + let slot = rpc_client.get_epoch_info().await?.absolute_slot; // Remove closed vote accounts from all vote accounts - all_vote_accounts.retain(|va| !closed_vote_accounts.contains(va)); + // Remove vote accounts for which this instruction has been called within 50,000 slots + all_vote_accounts.retain(|va| { + !closed_vote_accounts.contains(va) + && !vote_account_uploaded_recently(&validator_history_map, va, slot) + }); let entries = all_vote_accounts .iter() @@ -168,3 +176,25 @@ pub async fn update_vote_accounts( submit_result.map_err(|e| e.into()) } + +fn vote_account_uploaded_recently( + validator_history_map: &HashMap, + vote_account: &Pubkey, + slot: u64, +) -> bool { + let validator_history = validator_history_map.get(vote_account); + if validator_history.is_none() { + return false; + } + let validator_history = validator_history.unwrap(); + + if let Some(last_updated_slot) = validator_history + .history + .vote_account_last_update_slot_latest() + { + if last_updated_slot > slot - 50000 { + return true; + } + } + false +}