diff --git a/.gitignore b/.gitignore index b631d85e..dbe4b7e2 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,6 @@ test-ledger */test-ledger .idea/ **/targets +**/credentials +**/config +**/*.env \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 00000000..fbcb5ba8 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,5 @@ +{ + "cSpell.words": [ + "datapoint" + ] +} \ No newline at end of file diff --git a/keepers/validator-keeper/README.md b/keepers/validator-keeper/README.md new file mode 100644 index 00000000..879fe431 --- /dev/null +++ b/keepers/validator-keeper/README.md @@ -0,0 +1,53 @@ +## Keeper State +The `KeeperState` keeps track of: +- current epoch data +- running tally of all operations successes and failures for the given epoch +- all accounts fetched from the RPC that are needed downstream + +Note: All maps are key'd by the `vote_account` + +## Program + +### Initialize +Gather all needed arguments, and initialize the global `KeeperState`. + +### Loop +The forever loop consists of three parts: **Fetch**, **Fire** and **Emit**. There is only ever one **Fetch** and **Emit** section, and there can be several **Fire** sections. + +The **Fire** sections can run on independent cadences - say we want the Validator History functions to run every 300sec and we want to emit metrics every 60sec. + +The **Fetch** section is run _before_ and **Fire** section. +The **Emit** section is *one tick* after any **Fire** section. 
+ +#### Fetch +The **Fetch** section is in charge of three operations: +- Keeping track of the current epoch and resetting the runs and error counts for each operation +- Creating any missing accounts needed for the downstream **Fires** to run +- Fetching and updating all of the needed accounts needed downstream + +This is accomplished is by running three functions within the **Fetch** section +- `pre_create_update` - Updates epoch, and fetches all needed accounts that are not dependant on any missing accounts. +- `create_missing_accounts` - Creates the missing accounts, which can be determined by the accounts fetched in the previous step +- `post_create_update` - Fetches any last accounts that needed the missing accounts + +Since this is run before every **FIRE** section, some accounts will be fetched that are not needed. This may seem wasteful but the simplicity of having a synchronized global is worth the cost. + +Notes: +- The **Fetch** section is the only section that will mutate the `KeeperState`. +- If anything in the **Fetch** section fails, no **Fires** will run + +#### Fire +There are several **Fire** sections running at their own cadence. Before any **Fire** section is run, the **Fetch** section will be called. + +Each **Fire** is a call to `operations::operation_name::fire` which will fire off the operation and return the new count of runs and errors for that operation to be saved in the `KeeperState` + +Notes: +- Each **Fire** is self contained, one should not be dependant on another. +- No **Fire* will fetch any accounts, if there are needs for them, they should be added to the `KeeperState` + + +#### Emit +This section emits the state of the Keeper one tick after any operation has been called. This is because we want to emit a failure of any **Fetch** operation, which on failure advances the tick. 
+ + + diff --git a/keepers/validator-keeper/src/cluster_info.rs b/keepers/validator-keeper/src/cluster_info.rs deleted file mode 100644 index b0a2b183..00000000 --- a/keepers/validator-keeper/src/cluster_info.rs +++ /dev/null @@ -1,48 +0,0 @@ -use std::sync::Arc; - -use anchor_lang::{InstructionData, ToAccountMetas}; -use keeper_core::{submit_transactions, SubmitStats, TransactionExecutionError}; -use solana_client::nonblocking::rpc_client::RpcClient; -use solana_sdk::{ - compute_budget, instruction::Instruction, pubkey::Pubkey, signature::Keypair, signer::Signer, -}; -use validator_history::state::ClusterHistory; - -use crate::PRIORITY_FEE; - -pub async fn update_cluster_info( - client: Arc, - keypair: Arc, - program_id: &Pubkey, -) -> Result { - let (cluster_history_account, _) = - Pubkey::find_program_address(&[ClusterHistory::SEED], program_id); - - let priority_fee_ix = - compute_budget::ComputeBudgetInstruction::set_compute_unit_price(PRIORITY_FEE); - let heap_request_ix = compute_budget::ComputeBudgetInstruction::request_heap_frame(256 * 1024); - let compute_budget_ix = - compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(1_400_000); - let update_instruction = Instruction { - program_id: *program_id, - accounts: validator_history::accounts::CopyClusterInfo { - cluster_history_account, - slot_history: solana_program::sysvar::slot_history::id(), - signer: keypair.pubkey(), - } - .to_account_metas(None), - data: validator_history::instruction::CopyClusterInfo {}.data(), - }; - - submit_transactions( - &client, - vec![vec![ - priority_fee_ix, - heap_request_ix, - compute_budget_ix, - update_instruction, - ]], - &keypair, - ) - .await -} diff --git a/keepers/validator-keeper/src/entries/copy_vote_account_entry.rs b/keepers/validator-keeper/src/entries/copy_vote_account_entry.rs new file mode 100644 index 00000000..83f9deba --- /dev/null +++ b/keepers/validator-keeper/src/entries/copy_vote_account_entry.rs @@ -0,0 +1,52 @@ +use 
anchor_lang::{InstructionData, ToAccountMetas}; +use keeper_core::{Address, UpdateInstruction}; +use solana_sdk::instruction::Instruction; +use solana_sdk::pubkey::Pubkey; +use validator_history::Config; +use validator_history::ValidatorHistory; + +pub struct CopyVoteAccountEntry { + pub vote_account: Pubkey, + pub validator_history_account: Pubkey, + pub config_address: Pubkey, + pub program_id: Pubkey, + pub signer: Pubkey, +} + +impl CopyVoteAccountEntry { + pub fn new(vote_account: &Pubkey, program_id: &Pubkey, signer: &Pubkey) -> Self { + let (validator_history_account, _) = Pubkey::find_program_address( + &[ValidatorHistory::SEED, &vote_account.to_bytes()], + program_id, + ); + let (config_address, _) = Pubkey::find_program_address(&[Config::SEED], program_id); + Self { + vote_account: *vote_account, + validator_history_account, + config_address, + program_id: *program_id, + signer: *signer, + } + } +} + +impl Address for CopyVoteAccountEntry { + fn address(&self) -> Pubkey { + self.validator_history_account + } +} + +impl UpdateInstruction for CopyVoteAccountEntry { + fn update_instruction(&self) -> Instruction { + Instruction { + program_id: self.program_id, + accounts: validator_history::accounts::CopyVoteAccount { + validator_history_account: self.validator_history_account, + vote_account: self.vote_account, + signer: self.signer, + } + .to_account_metas(None), + data: validator_history::instruction::CopyVoteAccount {}.data(), + } + } +} diff --git a/keepers/validator-keeper/src/entries/gossip_entry.rs b/keepers/validator-keeper/src/entries/gossip_entry.rs new file mode 100644 index 00000000..58ffcfab --- /dev/null +++ b/keepers/validator-keeper/src/entries/gossip_entry.rs @@ -0,0 +1,159 @@ +use anchor_lang::InstructionData; +use anchor_lang::ToAccountMetas; +use bytemuck::{bytes_of, Pod, Zeroable}; +use keeper_core::Address; +use solana_sdk::{ + compute_budget::ComputeBudgetInstruction, instruction::Instruction, pubkey::Pubkey, + signature::Signature, 
+}; + +use crate::{derive_validator_history_address, derive_validator_history_config_address}; + +#[derive(Clone, Debug)] +pub struct GossipEntry { + pub vote_account: Pubkey, + pub validator_history_account: Pubkey, + pub config: Pubkey, + pub signature: Signature, + pub message: Vec, + pub program_id: Pubkey, + pub identity: Pubkey, + pub signer: Pubkey, +} + +impl GossipEntry { + pub fn new( + vote_account: &Pubkey, + signature: &Signature, + message: &[u8], + program_id: &Pubkey, + identity: &Pubkey, + signer: &Pubkey, + ) -> Self { + let validator_history_account = derive_validator_history_address(vote_account, program_id); + let config = derive_validator_history_config_address(program_id); + Self { + vote_account: *vote_account, + validator_history_account, + config, + signature: *signature, + message: message.to_vec(), + program_id: *program_id, + identity: *identity, + signer: *signer, + } + } +} + +impl Address for GossipEntry { + fn address(&self) -> Pubkey { + self.validator_history_account + } +} + +impl GossipEntry { + pub fn build_update_tx(&self, priority_fee: u64) -> Vec { + let mut ixs = vec![ + ComputeBudgetInstruction::set_compute_unit_limit(100_000), + ComputeBudgetInstruction::set_compute_unit_price(priority_fee), + build_verify_signature_ix( + self.signature.as_ref(), + self.identity.to_bytes(), + &self.message, + ), + ]; + + ixs.push(Instruction { + program_id: self.program_id, + accounts: validator_history::accounts::CopyGossipContactInfo { + validator_history_account: self.validator_history_account, + vote_account: self.vote_account, + instructions: solana_program::sysvar::instructions::id(), + config: self.config, + oracle_authority: self.signer, + } + .to_account_metas(None), + data: validator_history::instruction::CopyGossipContactInfo {}.data(), + }); + ixs + } +} + +// CODE BELOW SLIGHTLY MODIFIED FROM +// solana_sdk/src/ed25519_instruction.rs + +pub const PUBKEY_SERIALIZED_SIZE: usize = 32; +pub const SIGNATURE_SERIALIZED_SIZE: usize 
= 64; +pub const SIGNATURE_OFFSETS_SERIALIZED_SIZE: usize = 14; +// bytemuck requires structures to be aligned +pub const SIGNATURE_OFFSETS_START: usize = 2; +pub const DATA_START: usize = SIGNATURE_OFFSETS_SERIALIZED_SIZE + SIGNATURE_OFFSETS_START; + +#[derive(Default, Debug, Copy, Clone, Zeroable, Pod, Eq, PartialEq)] +#[repr(C)] +pub struct Ed25519SignatureOffsets { + signature_offset: u16, // offset to ed25519 signature of 64 bytes + signature_instruction_index: u16, // instruction index to find signature + public_key_offset: u16, // offset to public key of 32 bytes + public_key_instruction_index: u16, // instruction index to find public key + message_data_offset: u16, // offset to start of message data + message_data_size: u16, // size of message data + message_instruction_index: u16, // index of instruction data to get message data +} + +// This code is modified from solana_sdk/src/ed25519_instruction.rs +// due to that function requiring a keypair, and generating the signature within the function. +// In our case we don't have the keypair, we just have the signature and pubkey. 
+pub fn build_verify_signature_ix( + signature: &[u8], + pubkey: [u8; 32], + message: &[u8], +) -> Instruction { + assert_eq!(pubkey.len(), PUBKEY_SERIALIZED_SIZE); + assert_eq!(signature.len(), SIGNATURE_SERIALIZED_SIZE); + + let mut instruction_data = Vec::with_capacity( + DATA_START + .saturating_add(SIGNATURE_SERIALIZED_SIZE) + .saturating_add(PUBKEY_SERIALIZED_SIZE) + .saturating_add(message.len()), + ); + + let num_signatures: u8 = 1; + let public_key_offset = DATA_START; + let signature_offset = public_key_offset.saturating_add(PUBKEY_SERIALIZED_SIZE); + let message_data_offset = signature_offset.saturating_add(SIGNATURE_SERIALIZED_SIZE); + + // add padding byte so that offset structure is aligned + instruction_data.extend_from_slice(bytes_of(&[num_signatures, 0])); + + let offsets = Ed25519SignatureOffsets { + signature_offset: signature_offset as u16, + signature_instruction_index: u16::MAX, + public_key_offset: public_key_offset as u16, + public_key_instruction_index: u16::MAX, + message_data_offset: message_data_offset as u16, + message_data_size: message.len() as u16, + message_instruction_index: u16::MAX, + }; + + instruction_data.extend_from_slice(bytes_of(&offsets)); + + debug_assert_eq!(instruction_data.len(), public_key_offset); + + instruction_data.extend_from_slice(&pubkey); + + debug_assert_eq!(instruction_data.len(), signature_offset); + + instruction_data.extend_from_slice(signature); + + debug_assert_eq!(instruction_data.len(), message_data_offset); + + instruction_data.extend_from_slice(message); + + Instruction { + program_id: solana_program::ed25519_program::id(), + accounts: vec![], + data: instruction_data, + } +} diff --git a/keepers/validator-keeper/src/entries/mev_commission_entry.rs b/keepers/validator-keeper/src/entries/mev_commission_entry.rs new file mode 100644 index 00000000..ea885834 --- /dev/null +++ b/keepers/validator-keeper/src/entries/mev_commission_entry.rs @@ -0,0 +1,69 @@ +use anchor_lang::{InstructionData, 
ToAccountMetas}; +use jito_tip_distribution::sdk::derive_tip_distribution_account_address; +use keeper_core::{Address, UpdateInstruction}; +use solana_program::{instruction::Instruction, pubkey::Pubkey}; + +use crate::{derive_validator_history_address, derive_validator_history_config_address}; + +#[derive(Clone)] +pub struct ValidatorMevCommissionEntry { + pub vote_account: Pubkey, + pub tip_distribution_account: Pubkey, + pub validator_history_account: Pubkey, + pub config: Pubkey, + pub program_id: Pubkey, + pub signer: Pubkey, + pub epoch: u64, +} + +impl ValidatorMevCommissionEntry { + pub fn new( + vote_account: &Pubkey, + epoch: u64, + program_id: &Pubkey, + tip_distribution_program_id: &Pubkey, + signer: &Pubkey, + ) -> Self { + let validator_history_account = derive_validator_history_address(vote_account, program_id); + let (tip_distribution_account, _) = derive_tip_distribution_account_address( + tip_distribution_program_id, + vote_account, + epoch, + ); + let config = derive_validator_history_config_address(program_id); + + Self { + vote_account: *vote_account, + tip_distribution_account, + validator_history_account, + config, + program_id: *program_id, + signer: *signer, + epoch, + } + } +} + +impl Address for ValidatorMevCommissionEntry { + fn address(&self) -> Pubkey { + self.validator_history_account + } +} + +impl UpdateInstruction for ValidatorMevCommissionEntry { + fn update_instruction(&self) -> Instruction { + Instruction { + program_id: self.program_id, + accounts: validator_history::accounts::CopyTipDistributionAccount { + validator_history_account: self.validator_history_account, + vote_account: self.vote_account, + tip_distribution_account: self.tip_distribution_account, + config: self.config, + signer: self.signer, + } + .to_account_metas(None), + data: validator_history::instruction::CopyTipDistributionAccount { epoch: self.epoch } + .data(), + } + } +} diff --git a/keepers/validator-keeper/src/entries/mod.rs 
b/keepers/validator-keeper/src/entries/mod.rs new file mode 100644 index 00000000..05723c57 --- /dev/null +++ b/keepers/validator-keeper/src/entries/mod.rs @@ -0,0 +1,4 @@ +pub mod copy_vote_account_entry; +pub mod gossip_entry; +pub mod mev_commission_entry; +pub mod stake_history_entry; diff --git a/keepers/validator-keeper/src/entries/stake_history_entry.rs b/keepers/validator-keeper/src/entries/stake_history_entry.rs new file mode 100644 index 00000000..602fb3d0 --- /dev/null +++ b/keepers/validator-keeper/src/entries/stake_history_entry.rs @@ -0,0 +1,79 @@ +use std::str::FromStr; + +use anchor_lang::InstructionData; +use anchor_lang::ToAccountMetas; +use keeper_core::Address; +use keeper_core::UpdateInstruction; +use solana_client::rpc_response::RpcVoteAccountInfo; +use solana_sdk::{instruction::Instruction, pubkey::Pubkey}; + +use crate::derive_validator_history_address; +use crate::derive_validator_history_config_address; + +pub struct StakeHistoryEntry { + pub stake: u64, + pub rank: u32, + pub is_superminority: bool, + pub vote_account: Pubkey, + pub address: Pubkey, + pub config: Pubkey, + pub signer: Pubkey, + pub program_id: Pubkey, + pub epoch: u64, +} + +impl StakeHistoryEntry { + pub fn new( + vote_account: &RpcVoteAccountInfo, + program_id: &Pubkey, + signer: &Pubkey, + epoch: u64, + rank: u32, + is_superminority: bool, + ) -> StakeHistoryEntry { + let vote_pubkey = + Pubkey::from_str(&vote_account.vote_pubkey).expect("Invalid vote account pubkey"); + let address = derive_validator_history_address(&vote_pubkey, program_id); + let config = derive_validator_history_config_address(program_id); + + StakeHistoryEntry { + stake: vote_account.activated_stake, + rank, + is_superminority, + vote_account: vote_pubkey, + address, + config, + signer: *signer, + program_id: *program_id, + epoch, + } + } +} + +impl Address for StakeHistoryEntry { + fn address(&self) -> Pubkey { + self.address + } +} + +impl UpdateInstruction for StakeHistoryEntry { + fn 
update_instruction(&self) -> Instruction { + Instruction { + program_id: self.program_id, + accounts: validator_history::accounts::UpdateStakeHistory { + validator_history_account: self.address, + vote_account: self.vote_account, + config: self.config, + oracle_authority: self.signer, + } + .to_account_metas(None), + data: validator_history::instruction::UpdateStakeHistory { + lamports: self.stake, + epoch: self.epoch, + rank: self.rank, + is_superminority: self.is_superminority, + } + .data(), + } + } +} diff --git a/keepers/validator-keeper/src/lib.rs b/keepers/validator-keeper/src/lib.rs index 96e988bc..89ce1adc 100644 --- a/keepers/validator-keeper/src/lib.rs +++ b/keepers/validator-keeper/src/lib.rs @@ -3,11 +3,8 @@ use std::{ sync::{atomic::AtomicBool, Arc}, }; -use anchor_lang::{AccountDeserialize, Discriminator}; -use keeper_core::{ - get_multiple_accounts_batched, get_vote_accounts_with_retry, CreateUpdateStats, - MultipleAccountsError, SubmitStats, TransactionExecutionError, -}; +use anchor_lang::{AccountDeserialize, Discriminator, InstructionData, ToAccountMetas}; +use keeper_core::{MultipleAccountsError, TransactionExecutionError}; use log::error; use solana_account_decoder::UiDataSliceConfig; use solana_client::{ @@ -20,26 +17,20 @@ use solana_gossip::{ cluster_info::ClusterInfo, gossip_service::GossipService, legacy_contact_info::LegacyContactInfo, }; -use solana_metrics::datapoint_info; use solana_net_utils::bind_in_range; use solana_sdk::{ + instruction::Instruction, pubkey::Pubkey, signature::{Keypair, Signer}, - vote::program::id as get_vote_program_id, }; use solana_streamer::socket::SocketAddrSpace; use jito_tip_distribution::state::TipDistributionAccount; use thiserror::Error as ThisError; -use validator_history::{ - constants::MIN_VOTE_EPOCHS, ClusterHistory, ValidatorHistory, ValidatorHistoryEntry, -}; - -pub mod cluster_info; -pub mod gossip; -pub mod mev_commission; -pub mod stake; -pub mod vote_account; +use 
validator_history::{constants::MAX_ALLOC_BYTES, ClusterHistory, Config, ValidatorHistory}; +pub mod entries; +pub mod operations; +pub mod state; pub type Error = Box; @@ -92,176 +83,64 @@ pub async fn get_tip_distribution_accounts( Ok(res.into_iter().map(|x| x.0).collect::>()) } -pub fn emit_mev_commission_datapoint(stats: CreateUpdateStats) { - datapoint_info!( - "mev-commission-stats", - ("num_creates_success", stats.creates.successes, i64), - ("num_creates_error", stats.creates.errors, i64), - ("num_updates_success", stats.updates.successes, i64), - ("num_updates_error", stats.updates.errors, i64), - ); -} - -pub fn emit_mev_earned_datapoint(stats: CreateUpdateStats) { - datapoint_info!( - "mev-earned-stats", - ("num_creates_success", stats.creates.successes, i64), - ("num_creates_error", stats.creates.errors, i64), - ("num_updates_success", stats.updates.successes, i64), - ("num_updates_error", stats.updates.errors, i64), - ); +pub fn derive_cluster_history_address(program_id: &Pubkey) -> Pubkey { + let (address, _) = Pubkey::find_program_address(&[ClusterHistory::SEED], program_id); + address } -pub fn emit_validator_commission_datapoint(stats: CreateUpdateStats, runs_for_epoch: i64) { - datapoint_info!( - "vote-account-stats", - ("num_creates_success", stats.creates.successes, i64), - ("num_creates_error", stats.creates.errors, i64), - ("num_updates_success", stats.updates.successes, i64), - ("num_updates_error", stats.updates.errors, i64), - ("runs_for_epoch", runs_for_epoch, i64), +pub fn derive_validator_history_address(vote_account: &Pubkey, program_id: &Pubkey) -> Pubkey { + let (address, _) = Pubkey::find_program_address( + &[ValidatorHistory::SEED, &vote_account.to_bytes()], + program_id, ); -} -pub fn emit_cluster_history_datapoint(stats: SubmitStats, runs_for_epoch: i64) { - datapoint_info!( - "cluster-history-stats", - ("num_success", stats.successes, i64), - ("num_errors", stats.errors, i64), - ("runs_for_epoch", runs_for_epoch, i64), - ); + 
address } -pub async fn emit_validator_history_metrics( - client: &Arc, - program_id: Pubkey, - keeper_address: Pubkey, -) -> Result<(), Box> { - let epoch = client.get_epoch_info().await?; +pub fn derive_validator_history_config_address(program_id: &Pubkey) -> Pubkey { + let (address, _) = Pubkey::find_program_address(&[Config::SEED], program_id); - let validator_histories = get_validator_history_accounts(client, program_id).await?; - - let mut ips = 0; - let mut versions = 0; - let mut types = 0; - let mut mev_comms = 0; - let mut comms = 0; - let mut epoch_credits = 0; - let mut stakes = 0; - let num_validators = validator_histories.len(); - let default = ValidatorHistoryEntry::default(); - - let mut all_history_vote_accounts = Vec::new(); - for validator_history in validator_histories { - if let Some(entry) = validator_history.history.last() { - if entry.epoch as u64 != epoch.epoch { - continue; - } - if entry.ip != default.ip { - ips += 1; - } - if !(entry.version.major == default.version.major - && entry.version.minor == default.version.minor - && entry.version.patch == default.version.patch) - { - versions += 1; - } - if entry.client_type != default.client_type { - types += 1; - } - if entry.mev_commission != default.mev_commission { - mev_comms += 1; - } - if entry.commission != default.commission { - comms += 1; - } - if entry.epoch_credits != default.epoch_credits { - epoch_credits += 1; - } - if entry.activated_stake_lamports != default.activated_stake_lamports { - stakes += 1; - } - } - - all_history_vote_accounts.push(validator_history.vote_account); - } - - let (cluster_history_address, _) = - Pubkey::find_program_address(&[ClusterHistory::SEED], &program_id); - let cluster_history_account = client.get_account(&cluster_history_address).await?; - let cluster_history = - ClusterHistory::try_deserialize(&mut cluster_history_account.data.as_slice())?; + address +} - let mut cluster_history_blocks: i64 = 0; - let cluster_history_entry = 
cluster_history.history.last(); - if let Some(cluster_history) = cluster_history_entry { - // Looking for previous epoch to be updated - if cluster_history.epoch as u64 == epoch.epoch - 1 { - cluster_history_blocks = 1; +pub fn get_create_validator_history_instructions( + vote_account: &Pubkey, + program_id: &Pubkey, + signer: &Keypair, +) -> Vec { + let validator_history_account = derive_validator_history_address(vote_account, program_id); + let config_account = derive_validator_history_config_address(program_id); + + let mut ixs = vec![Instruction { + program_id: *program_id, + accounts: validator_history::accounts::InitializeValidatorHistoryAccount { + validator_history_account, + vote_account: *vote_account, + system_program: solana_program::system_program::id(), + signer: signer.pubkey(), } - } - - let get_vote_accounts = get_vote_accounts_with_retry(client, MIN_VOTE_EPOCHS, None).await?; - - let get_vote_accounts_count = get_vote_accounts.len() as i64; - - let vote_program_id = get_vote_program_id(); - let live_validator_histories_count = - get_multiple_accounts_batched(&all_history_vote_accounts, client) - .await? 
- .iter() - .filter(|&account| { - account - .as_ref() - .map_or(false, |acc| acc.owner == vote_program_id) - }) - .count(); - - let get_vote_accounts_voting = get_vote_accounts - .iter() - .filter(|x| { - // Check if the last epoch credit ( most recent ) is the current epoch - x.epoch_credits.last().unwrap().0 == epoch.epoch - }) - .count(); - - let keeper_balance = get_balance_with_retry(client, keeper_address).await?; - - datapoint_info!( - "validator-history-stats", - ("num_validator_histories", num_validators, i64), - ( - "num_live_validator_histories", - live_validator_histories_count, - i64 - ), - ("num_ips", ips, i64), - ("num_versions", versions, i64), - ("num_client_types", types, i64), - ("num_mev_commissions", mev_comms, i64), - ("num_commissions", comms, i64), - ("num_epoch_credits", epoch_credits, i64), - ("num_stakes", stakes, i64), - ("cluster_history_blocks", cluster_history_blocks, i64), - ("slot_index", epoch.slot_index, i64), - ( - "num_get_vote_accounts_responses", - get_vote_accounts_count, - i64 - ), - ( - "num_get_vote_accounts_voting", - get_vote_accounts_voting, - i64 - ), - ); - - datapoint_info!( - "stakenet-keeper-stats", - ("balance_lamports", keeper_balance, i64), - ); + .to_account_metas(None), + data: validator_history::instruction::InitializeValidatorHistoryAccount {}.data(), + }]; + + let num_reallocs = (ValidatorHistory::SIZE - MAX_ALLOC_BYTES) / MAX_ALLOC_BYTES + 1; + ixs.extend(vec![ + Instruction { + program_id: *program_id, + accounts: validator_history::accounts::ReallocValidatorHistoryAccount { + validator_history_account, + vote_account: *vote_account, + config: config_account, + system_program: solana_program::system_program::id(), + signer: signer.pubkey(), + } + .to_account_metas(None), + data: validator_history::instruction::ReallocValidatorHistoryAccount {}.data(), + }; + num_reallocs + ]); - Ok(()) + ixs } pub async fn get_validator_history_accounts( diff --git a/keepers/validator-keeper/src/main.rs 
b/keepers/validator-keeper/src/main.rs index 0d1ee85b..54193712 100644 --- a/keepers/validator-keeper/src/main.rs +++ b/keepers/validator-keeper/src/main.rs @@ -3,28 +3,23 @@ This program starts several threads to manage the creation of validator history and the updating of the various data feeds within the accounts. It will emits metrics for each data feed, if env var SOLANA_METRICS_CONFIG is set to a valid influx server. */ - -use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration}; - use clap::{arg, command, Parser}; -use keeper_core::{Cluster, CreateUpdateStats, SubmitStats, TransactionExecutionError}; +use keeper_core::Cluster; use log::*; use solana_client::nonblocking::rpc_client::RpcClient; -use solana_metrics::{datapoint_error, set_host_id}; +use solana_metrics::set_host_id; use solana_sdk::{ pubkey::Pubkey, - signature::{read_keypair_file, Keypair, Signer}, + signature::{read_keypair_file, Keypair}, }; +use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration}; use tokio::time::sleep; use validator_keeper::{ - cluster_info::update_cluster_info, - emit_cluster_history_datapoint, emit_mev_commission_datapoint, emit_mev_earned_datapoint, - emit_validator_commission_datapoint, emit_validator_history_metrics, - gossip::{emit_gossip_datapoint, upload_gossip_values}, - mev_commission::{update_mev_commission, update_mev_earned}, - stake::{emit_stake_history_datapoint, update_stake_history}, - vote_account::update_vote_accounts, - KeeperError, + operations::{self, keeper_operations::KeeperOperations}, + state::{ + keeper_state::KeeperState, + update_state::{create_missing_accounts, post_create_update, pre_create_update}, + }, }; #[derive(Parser, Debug)] @@ -44,7 +39,7 @@ struct Args { gossip_entrypoint: Option, /// Path to keypair used to pay for account creation and execute transactions - #[arg(short, long, env, default_value = "~/.config/solana/id.json")] + #[arg(short, long, env, default_value = "./credentials/keypair.json")] keypair: 
PathBuf, /// Path to keypair used specifically for submitting permissioned transactions @@ -59,413 +54,207 @@ struct Args { #[arg(short, long, env)] tip_distribution_program_id: Pubkey, - // Loop interval time (default 300 sec) + // Interval to update Validator History Accounts (default 300 sec) #[arg(short, long, env, default_value = "300")] - interval: u64, + validator_history_interval: u64, + + // Interval to emit metrics (default 60 sec) + #[arg(short, long, env, default_value = "60")] + metrics_interval: u64, #[arg(short, long, env, default_value_t = Cluster::Mainnet)] cluster: Cluster, } -async fn monitoring_loop( - client: Arc, - program_id: Pubkey, - keeper_address: Pubkey, - interval: u64, -) { - loop { - match emit_validator_history_metrics(&client, program_id, keeper_address).await { - Ok(_) => {} - Err(e) => { - error!("Failed to emit validator history metrics: {}", e); - } - } - sleep(Duration::from_secs(interval)).await; - } +fn should_emit(tick: u64, intervals: &[u64]) -> bool { + intervals.iter().any(|interval| tick % (interval + 1) == 0) } -async fn mev_commission_loop( - client: Arc, - keypair: Arc, - commission_history_program_id: Pubkey, - tip_distribution_program_id: Pubkey, - interval: u64, -) { - loop { - // Continuously runs throughout an epoch, polling for new tip distribution accounts - // and submitting update txs when new accounts are detected - let stats = match update_mev_commission( - client.clone(), - keypair.clone(), - &commission_history_program_id, - &tip_distribution_program_id, - ) - .await - { - Ok(stats) => { - for message in stats - .creates - .results - .iter() - .chain(stats.updates.results.iter()) - { - if let Err(e) = message { - datapoint_error!("vote-account-error", ("error", e.to_string(), String),); - } - } - stats - } - Err(e) => { - let mut stats = CreateUpdateStats::default(); - if let KeeperError::TransactionExecutionError( - TransactionExecutionError::TransactionClientError(_, results), - ) = &e - { - 
stats.updates.successes = results.iter().filter(|r| r.is_ok()).count() as u64; - stats.updates.errors = results.iter().filter(|r| r.is_err()).count() as u64; - } - datapoint_error!("mev-commission-error", ("error", e.to_string(), String),); - stats - } - }; - emit_mev_commission_datapoint(stats); - sleep(Duration::from_secs(interval)).await; - } +fn should_update(tick: u64, intervals: &[u64]) -> bool { + intervals.iter().any(|interval| tick % interval == 0) } -async fn mev_earned_loop( - client: Arc, - keypair: Arc, - commission_history_program_id: Pubkey, - tip_distribution_program_id: Pubkey, - interval: u64, -) { - loop { - // Continuously runs throughout an epoch, polling for tip distribution accounts from the prev epoch with uploaded merkle roots - // and submitting update_mev_earned (technically update_mev_comission) txs when the uploaded merkle roots are detected - let stats = match update_mev_earned( - &client, - &keypair, - &commission_history_program_id, - &tip_distribution_program_id, - ) - .await - { - Ok(stats) => { - for message in stats - .creates - .results - .iter() - .chain(stats.updates.results.iter()) - { - if let Err(e) = message { - datapoint_error!("vote-account-error", ("error", e.to_string(), String),); - } - } - stats - } - Err(e) => { - let mut stats = CreateUpdateStats::default(); - if let KeeperError::TransactionExecutionError( - TransactionExecutionError::TransactionClientError(_, results), - ) = &e - { - stats.updates.successes = results.iter().filter(|r| r.is_ok()).count() as u64; - stats.updates.errors = results.iter().filter(|r| r.is_err()).count() as u64; - } - datapoint_error!("mev-earned-error", ("error", e.to_string(), String),); - stats - } - }; - emit_mev_earned_datapoint(stats); - sleep(Duration::from_secs(interval)).await; - } +fn should_fire(tick: u64, interval: u64) -> bool { + tick % interval == 0 } -async fn vote_account_loop( - rpc_client: Arc, - keypair: Arc, - program_id: Pubkey, - interval: u64, -) { - let mut 
runs_for_epoch = 0; - let mut current_epoch = 0; - let mut stats = CreateUpdateStats::default(); - loop { - let epoch_info = match rpc_client.get_epoch_info().await { - Ok(epoch_info) => epoch_info, - Err(e) => { - error!("Failed to get epoch info: {}", e); - sleep(Duration::from_secs(5)).await; - continue; - } - }; - if current_epoch != epoch_info.epoch { - runs_for_epoch = 0; - } - // Run at 10%, 50% and 90% completion of epoch - let should_run = (epoch_info.slot_index > epoch_info.slots_in_epoch / 1000 - && runs_for_epoch < 1) - || (epoch_info.slot_index > epoch_info.slots_in_epoch / 2 && runs_for_epoch < 2) - || (epoch_info.slot_index > epoch_info.slots_in_epoch * 9 / 10 && runs_for_epoch < 3); - - if should_run { - stats = match update_vote_accounts(rpc_client.clone(), keypair.clone(), program_id) - .await - { - Ok(stats) => { - for message in stats - .creates - .results - .iter() - .chain(stats.updates.results.iter()) - { - if let Err(e) = message { - datapoint_error!( - "vote-account-error", - ("error", e.to_string(), String), - ); - } - } - if stats.updates.errors == 0 && stats.creates.errors == 0 { - runs_for_epoch += 1; - } - sleep(Duration::from_secs(interval)).await; - stats - } - Err(e) => { - let mut stats = CreateUpdateStats::default(); - if let KeeperError::TransactionExecutionError( - TransactionExecutionError::TransactionClientError(_, results), - ) = &e - { - stats.updates.successes = - results.iter().filter(|r| r.is_ok()).count() as u64; - stats.updates.errors = results.iter().filter(|r| r.is_err()).count() as u64; - } - datapoint_error!("vote-account-error", ("error", e.to_string(), String),); - stats - } - }; - } - current_epoch = epoch_info.epoch; - emit_validator_commission_datapoint(stats.clone(), runs_for_epoch); - sleep(Duration::from_secs(interval)).await; - } +fn advance_tick(tick: &mut u64) { + *tick += 1; } -async fn stake_upload_loop( +async fn sleep_and_tick(tick: &mut u64) { + sleep(Duration::from_secs(1)).await; + 
advance_tick(tick); +} + +struct RunLoopConfig { client: Arc, keypair: Arc, program_id: Pubkey, - interval: u64, -) { - let mut runs_for_epoch = 0; - let mut current_epoch = 0; + tip_distribution_program_id: Pubkey, + oracle_authority_keypair: Option>, + gossip_entrypoint: Option, + validator_history_interval: u64, + metrics_interval: u64, +} + +async fn run_loop(config: RunLoopConfig) { + let RunLoopConfig { + client, + keypair, + program_id, + tip_distribution_program_id, + oracle_authority_keypair, + gossip_entrypoint, + validator_history_interval, + metrics_interval, + } = config; + let intervals = vec![validator_history_interval, metrics_interval]; + + // Stateful data + let mut keeper_state = KeeperState::new(); + let mut tick: u64 = 0; // 1 second ticks loop { - let epoch_info = match client.get_epoch_info().await { - Ok(epoch_info) => epoch_info, - Err(e) => { - error!("Failed to get epoch info: {}", e); - sleep(Duration::from_secs(5)).await; - continue; + // ---------------------- FETCH ----------------------------------- + // The fetch ( update ) functions fetch everything we need for the operations from the blockchain + // Additionally, this function will update the keeper state. If update fails - it will skip the fire functions. 
+ if should_update(tick, &intervals) { + info!("Pre-fetching data for update..."); + match pre_create_update(&client, &keypair, &program_id, &mut keeper_state).await { + Ok(_) => { + keeper_state.increment_update_run_for_epoch(KeeperOperations::PreCreateUpdate); + } + Err(e) => { + error!("Failed to pre create update: {:?}", e); + advance_tick(&mut tick); + keeper_state + .increment_update_error_for_epoch(KeeperOperations::PreCreateUpdate); + continue; + } } - }; - let epoch = epoch_info.epoch; - let mut stats = CreateUpdateStats::default(); - if current_epoch != epoch { - runs_for_epoch = 0; - } - // Run at 0.1%, 50% and 90% completion of epoch - let should_run = (epoch_info.slot_index > epoch_info.slots_in_epoch / 1000 - && runs_for_epoch < 1) - || (epoch_info.slot_index > epoch_info.slots_in_epoch / 2 && runs_for_epoch < 2) - || (epoch_info.slot_index > epoch_info.slots_in_epoch * 9 / 10 && runs_for_epoch < 3); - if should_run { - stats = match update_stake_history(client.clone(), keypair.clone(), &program_id).await { - Ok(run_stats) => { - for message in stats - .creates - .results - .iter() - .chain(stats.updates.results.iter()) - { - if let Err(e) = message { - datapoint_error!( - "stake-history-error", - ("error", e.to_string(), String), - ); - } - } - - if stats.creates.errors == 0 && stats.updates.errors == 0 { - runs_for_epoch += 1; - } - run_stats + info!("Creating missing accounts..."); + match create_missing_accounts(&client, &keypair, &program_id, &keeper_state).await { + Ok(_) => { + keeper_state + .increment_update_run_for_epoch(KeeperOperations::CreateMissingAccounts); } Err(e) => { - let mut stats = CreateUpdateStats::default(); - if let KeeperError::TransactionExecutionError( - TransactionExecutionError::TransactionClientError(_, results), - ) = &e - { - stats.updates.successes = - results.iter().filter(|r| r.is_ok()).count() as u64; - stats.updates.errors = results.iter().filter(|r| r.is_err()).count() as u64; - } - 
datapoint_error!("stake-history-error", ("error", e.to_string(), String),); - stats + error!("Failed to create missing accounts: {:?}", e); + advance_tick(&mut tick); + keeper_state + .increment_update_error_for_epoch(KeeperOperations::CreateMissingAccounts); + continue; } - }; - } - - current_epoch = epoch; - emit_stake_history_datapoint(stats, runs_for_epoch); - sleep(Duration::from_secs(interval)).await; - } -} - -async fn gossip_upload_loop( - client: Arc, - keypair: Arc, - program_id: Pubkey, - entrypoint: SocketAddr, - interval: u64, -) { - let mut runs_for_epoch = 0; - let mut current_epoch = 0; - loop { - let epoch_info = match client.get_epoch_info().await { - Ok(epoch_info) => epoch_info, - Err(e) => { - error!("Failed to get epoch info: {}", e); - sleep(Duration::from_secs(5)).await; - continue; } - }; - let epoch = epoch_info.epoch; - if current_epoch != epoch { - runs_for_epoch = 0; - } - // Run at 0%, 50% and 90% completion of epoch - let should_run = runs_for_epoch < 1 - || (epoch_info.slot_index > epoch_info.slots_in_epoch / 2 && runs_for_epoch < 2) - || (epoch_info.slot_index > epoch_info.slots_in_epoch * 9 / 10 && runs_for_epoch < 3); - - let mut stats = CreateUpdateStats::default(); - if should_run { - stats = match upload_gossip_values( - client.clone(), - keypair.clone(), - entrypoint, + + info!("Post-fetching data for update..."); + match post_create_update( + &client, &program_id, + &tip_distribution_program_id, + &mut keeper_state, ) .await { - Ok(stats) => { - for message in stats - .creates - .results - .iter() - .chain(stats.updates.results.iter()) - { - if let Err(e) = message { - datapoint_error!( - "gossip-upload-error", - ("error", e.to_string(), String), - ); - } - } - if stats.creates.errors == 0 && stats.updates.errors == 0 { - runs_for_epoch += 1; - } - stats + Ok(_) => { + keeper_state.increment_update_run_for_epoch(KeeperOperations::PostCreateUpdate); } Err(e) => { - let mut stats = CreateUpdateStats::default(); - if let 
Some(TransactionExecutionError::TransactionClientError(_, results)) = - e.downcast_ref::() - { - stats.updates.successes = - results.iter().filter(|r| r.is_ok()).count() as u64; - stats.updates.errors = results.iter().filter(|r| r.is_err()).count() as u64; - } - - datapoint_error!("gossip-upload-error", ("error", e.to_string(), String),); - stats + error!("Failed to post create update: {:?}", e); + advance_tick(&mut tick); + keeper_state + .increment_update_error_for_epoch(KeeperOperations::PostCreateUpdate); + continue; } - }; + } } - current_epoch = epoch; - emit_gossip_datapoint(stats, runs_for_epoch); - sleep(Duration::from_secs(interval)).await; - } -} -async fn cluster_history_loop( - client: Arc, - keypair: Arc, - program_id: Pubkey, - interval: u64, -) { - let mut runs_for_epoch = 0; - let mut current_epoch = 0; + // ---------------------- FIRE ----------------------------------- + if should_fire(tick, validator_history_interval) { + info!("Firing operations..."); + + info!("Updating cluster history..."); + keeper_state.set_runs_and_errors_for_epoch( + operations::cluster_history::fire(&client, &keypair, &program_id, &keeper_state) + .await, + ); + + info!("Updating copy vote accounts..."); + keeper_state.set_runs_and_errors_for_epoch( + operations::vote_account::fire(&client, &keypair, &program_id, &keeper_state).await, + ); + + info!("Updating mev commission..."); + keeper_state.set_runs_and_errors_for_epoch( + operations::mev_commission::fire( + &client, + &keypair, + &program_id, + &tip_distribution_program_id, + &keeper_state, + ) + .await, + ); + + info!("Updating mev earned..."); + keeper_state.set_runs_and_errors_for_epoch( + operations::mev_earned::fire( + &client, + &keypair, + &program_id, + &tip_distribution_program_id, + &keeper_state, + ) + .await, + ); + + if let Some(oracle_authority_keypair) = &oracle_authority_keypair { + info!("Updating stake accounts..."); + keeper_state.set_runs_and_errors_for_epoch( + operations::stake_upload::fire( + 
&client, + oracle_authority_keypair, + &program_id, + &keeper_state, + ) + .await, + ); + } - loop { - let epoch_info = match client.get_epoch_info().await { - Ok(epoch_info) => epoch_info, - Err(e) => { - error!("Failed to get epoch info: {}", e); - sleep(Duration::from_secs(5)).await; - continue; + if let (Some(gossip_entrypoint), Some(oracle_authority_keypair)) = + (gossip_entrypoint, &oracle_authority_keypair) + { + info!("Updating gossip accounts..."); + keeper_state.set_runs_and_errors_for_epoch( + operations::gossip_upload::fire( + &client, + oracle_authority_keypair, + &program_id, + &gossip_entrypoint, + &keeper_state, + ) + .await, + ); } - }; - let epoch = epoch_info.epoch; + } - let mut stats = SubmitStats::default(); + // ---------------------- EMIT METRICS ----------------------------------- - if current_epoch != epoch { - runs_for_epoch = 0; + if should_fire(tick, metrics_interval) { + info!("Emitting metrics..."); + keeper_state + .set_runs_and_errors_for_epoch(operations::metrics_emit::fire(&keeper_state).await); } - // Run at 0.1%, 50% and 90% completion of epoch - let should_run = (epoch_info.slot_index > epoch_info.slots_in_epoch / 1000 - && runs_for_epoch < 1) - || (epoch_info.slot_index > epoch_info.slots_in_epoch / 2 && runs_for_epoch < 2) - || (epoch_info.slot_index > epoch_info.slots_in_epoch * 9 / 10 && runs_for_epoch < 3); - if should_run { - stats = match update_cluster_info(client.clone(), keypair.clone(), &program_id).await { - Ok(run_stats) => { - for message in run_stats.results.iter() { - if let Err(e) = message { - datapoint_error!( - "cluster-history-error", - ("error", e.to_string(), String), - ); - } - } - if run_stats.errors == 0 { - runs_for_epoch += 1; - } - run_stats - } - Err(e) => { - let mut stats = SubmitStats::default(); - if let TransactionExecutionError::TransactionClientError(_, results) = &e { - stats.successes = results.iter().filter(|r| r.is_ok()).count() as u64; - stats.errors = results.iter().filter(|r| 
r.is_err()).count() as u64; - } - datapoint_error!("cluster-history-error", ("error", e.to_string(), String),); - stats - } - }; + // ---------------------- EMIT --------------------------------- + if should_emit(tick, &intervals) { + KeeperOperations::emit(&keeper_state.runs_for_epoch, &keeper_state.errors_for_epoch) } - current_epoch = epoch; - emit_cluster_history_datapoint(stats, runs_for_epoch); - sleep(Duration::from_secs(interval)).await; + // ---------- SLEEP ---------- + sleep_and_tick(&mut tick).await; } } @@ -479,74 +268,35 @@ async fn main() { args.json_rpc_url.clone(), Duration::from_secs(60), )); - let keypair = Arc::new(read_keypair_file(args.keypair).expect("Failed reading keypair file")); - info!("Starting validator history keeper..."); - - tokio::spawn(monitoring_loop( - Arc::clone(&client), - args.program_id, - keypair.pubkey(), - args.interval, - )); - - tokio::spawn(cluster_history_loop( - Arc::clone(&client), - Arc::clone(&keypair), - args.program_id, - args.interval, - )); + let keypair = Arc::new(read_keypair_file(args.keypair).expect("Failed reading keypair file")); - tokio::spawn(vote_account_loop( - Arc::clone(&client), - Arc::clone(&keypair), - args.program_id, - args.interval, - )); + let oracle_authority_keypair = args + .oracle_authority_keypair + .map(|oracle_authority_keypair| { + Arc::new( + read_keypair_file(oracle_authority_keypair) + .expect("Failed reading stake keypair file"), + ) + }); - tokio::spawn(mev_commission_loop( - client.clone(), - keypair.clone(), - args.program_id, - args.tip_distribution_program_id, - args.interval, - )); + let gossip_entrypoint = args.gossip_entrypoint.map(|gossip_entrypoint| { + solana_net_utils::parse_host_port(&gossip_entrypoint) + .expect("Failed to parse host and port from gossip entrypoint") + }); - tokio::spawn(mev_earned_loop( - client.clone(), - keypair.clone(), - args.program_id, - args.tip_distribution_program_id, - args.interval, - )); + info!("Starting validator history keeper..."); 
- if let Some(oracle_authority_keypair) = args.oracle_authority_keypair { - let oracle_authority_keypair = Arc::new( - read_keypair_file(oracle_authority_keypair).expect("Failed reading stake keypair file"), - ); - tokio::spawn(stake_upload_loop( - Arc::clone(&client), - Arc::clone(&oracle_authority_keypair), - args.program_id, - args.interval, - )); - - if let Some(gossip_entrypoint) = args.gossip_entrypoint { - let entrypoint = solana_net_utils::parse_host_port(&gossip_entrypoint) - .expect("Failed to parse host and port from gossip entrypoint"); - // Cannot be sent to thread because there's a Box inside - gossip_upload_loop( - client.clone(), - oracle_authority_keypair, - args.program_id, - entrypoint, - args.interval, - ) - .await; - } - } - // Need final infinite loop to keep all threads alive - loop { - sleep(Duration::from_secs(60)).await; - } + let config = RunLoopConfig { + client, + keypair, + program_id: args.program_id, + tip_distribution_program_id: args.tip_distribution_program_id, + oracle_authority_keypair, + gossip_entrypoint, + validator_history_interval: args.validator_history_interval, + metrics_interval: args.metrics_interval, + }; + + run_loop(config).await; } diff --git a/keepers/validator-keeper/src/mev_commission.rs b/keepers/validator-keeper/src/mev_commission.rs deleted file mode 100644 index 94aa8dd6..00000000 --- a/keepers/validator-keeper/src/mev_commission.rs +++ /dev/null @@ -1,323 +0,0 @@ -use std::{collections::HashMap, str::FromStr, sync::Arc}; - -use anchor_lang::{AccountDeserialize, InstructionData, ToAccountMetas}; -use jito_tip_distribution::sdk::derive_tip_distribution_account_address; -use jito_tip_distribution::state::TipDistributionAccount; -use keeper_core::{ - build_create_and_update_instructions, get_multiple_accounts_batched, - get_vote_accounts_with_retry, submit_create_and_update, Address, CreateTransaction, - CreateUpdateStats, MultipleAccountsError, UpdateInstruction, -}; -use log::error; -use 
solana_client::nonblocking::rpc_client::RpcClient; -use solana_client::rpc_response::RpcVoteAccountInfo; -use solana_program::{instruction::Instruction, pubkey::Pubkey}; -use solana_sdk::{signature::Keypair, signer::Signer}; -use validator_history::constants::MIN_VOTE_EPOCHS; -use validator_history::ValidatorHistoryEntry; -use validator_history::{constants::MAX_ALLOC_BYTES, Config, ValidatorHistory}; - -use crate::{get_validator_history_accounts_with_retry, KeeperError, PRIORITY_FEE}; - -#[derive(Clone)] -pub struct ValidatorMevCommissionEntry { - pub vote_account: Pubkey, - pub tip_distribution_account: Pubkey, - pub validator_history_account: Pubkey, - pub config: Pubkey, - pub program_id: Pubkey, - pub signer: Pubkey, - pub epoch: u64, -} - -impl ValidatorMevCommissionEntry { - pub fn new( - vote_account: &RpcVoteAccountInfo, - epoch: u64, - program_id: &Pubkey, - tip_distribution_program_id: &Pubkey, - signer: &Pubkey, - ) -> Self { - let vote_account = Pubkey::from_str(&vote_account.vote_pubkey) - .map_err(|e| { - error!("Invalid vote account pubkey"); - e - }) - .expect("Invalid vote account pubkey"); - let (validator_history_account, _) = Pubkey::find_program_address( - &[ValidatorHistory::SEED, &vote_account.to_bytes()], - program_id, - ); - let (tip_distribution_account, _) = derive_tip_distribution_account_address( - tip_distribution_program_id, - &vote_account, - epoch, - ); - let (config, _) = Pubkey::find_program_address(&[Config::SEED], program_id); - Self { - vote_account, - tip_distribution_account, - validator_history_account, - config, - program_id: *program_id, - signer: *signer, - epoch, - } - } -} - -impl Address for ValidatorMevCommissionEntry { - fn address(&self) -> Pubkey { - self.validator_history_account - } -} - -impl CreateTransaction for ValidatorMevCommissionEntry { - fn create_transaction(&self) -> Vec { - let mut ixs = vec![Instruction { - program_id: self.program_id, - accounts: 
validator_history::accounts::InitializeValidatorHistoryAccount { - validator_history_account: self.validator_history_account, - vote_account: self.vote_account, - system_program: solana_program::system_program::id(), - signer: self.signer, - } - .to_account_metas(None), - data: validator_history::instruction::InitializeValidatorHistoryAccount {}.data(), - }]; - let num_reallocs = (ValidatorHistory::SIZE - MAX_ALLOC_BYTES) / MAX_ALLOC_BYTES + 1; - ixs.extend(vec![ - Instruction { - program_id: self.program_id, - accounts: validator_history::accounts::ReallocValidatorHistoryAccount { - validator_history_account: self.validator_history_account, - vote_account: self.vote_account, - config: self.config, - system_program: solana_program::system_program::id(), - signer: self.signer, - } - .to_account_metas(None), - data: validator_history::instruction::ReallocValidatorHistoryAccount {}.data(), - }; - num_reallocs - ]); - ixs - } -} - -impl UpdateInstruction for ValidatorMevCommissionEntry { - fn update_instruction(&self) -> Instruction { - Instruction { - program_id: self.program_id, - accounts: validator_history::accounts::CopyTipDistributionAccount { - validator_history_account: self.validator_history_account, - vote_account: self.vote_account, - tip_distribution_account: self.tip_distribution_account, - config: self.config, - signer: self.signer, - } - .to_account_metas(None), - data: validator_history::instruction::CopyTipDistributionAccount { epoch: self.epoch } - .data(), - } - } -} - -pub async fn update_mev_commission( - client: Arc, - keypair: Arc, - validator_history_program_id: &Pubkey, - tip_distribution_program_id: &Pubkey, -) -> Result { - let epoch = client.get_epoch_info().await?.epoch; - - let vote_accounts = get_vote_accounts_with_retry(&client, MIN_VOTE_EPOCHS, None).await?; - let validator_histories = - get_validator_history_accounts_with_retry(&client, *validator_history_program_id).await?; - - let validator_history_map = 
HashMap::from_iter(validator_histories.iter().map(|vh| { - ( - Pubkey::find_program_address( - &[ValidatorHistory::SEED, &vh.vote_account.to_bytes()], - validator_history_program_id, - ) - .0, - vh, - ) - })); - let entries = vote_accounts - .iter() - .map(|vote_account| { - ValidatorMevCommissionEntry::new( - vote_account, - epoch, - validator_history_program_id, - tip_distribution_program_id, - &keypair.pubkey(), - ) - }) - .collect::>(); - - let existing_entries = get_existing_entries(client.clone(), &entries).await?; - - let entries_to_update = existing_entries - .into_iter() - .filter(|entry| !mev_commission_uploaded(&validator_history_map, entry.address(), epoch)) - .collect::>(); - let (create_transactions, update_instructions) = - build_create_and_update_instructions(&client, &entries_to_update).await?; - - submit_create_and_update( - &client, - create_transactions, - update_instructions, - &keypair, - PRIORITY_FEE, - ) - .await - .map_err(|e| e.into()) -} - -pub async fn update_mev_earned( - client: &Arc, - keypair: &Arc, - validator_history_program_id: &Pubkey, - tip_distribution_program_id: &Pubkey, -) -> Result { - let epoch = client.get_epoch_info().await?.epoch; - - let vote_accounts = get_vote_accounts_with_retry(client, MIN_VOTE_EPOCHS, None).await?; - let validator_histories = - get_validator_history_accounts_with_retry(client, *validator_history_program_id).await?; - - let validator_history_map = HashMap::from_iter(validator_histories.iter().map(|vh| { - ( - Pubkey::find_program_address( - &[ValidatorHistory::SEED, &vh.vote_account.to_bytes()], - validator_history_program_id, - ) - .0, - vh, - ) - })); - let entries = vote_accounts - .iter() - .map(|vote_account| { - ValidatorMevCommissionEntry::new( - vote_account, - epoch.saturating_sub(1), // TDA derived from the prev epoch since the merkle roots are uploaded shortly after rollover - validator_history_program_id, - tip_distribution_program_id, - &keypair.pubkey(), - ) - }) - .collect::>(); - - 
let uploaded_merkleroot_entries = - get_entries_with_uploaded_merkleroot(client, &entries).await?; - - let entries_to_update = uploaded_merkleroot_entries - .into_iter() - .filter(|entry| !mev_earned_uploaded(&validator_history_map, entry.address(), epoch - 1)) - .collect::>(); - - let (create_transactions, update_instructions) = - build_create_and_update_instructions(client, &entries_to_update).await?; - - submit_create_and_update( - client, - create_transactions, - update_instructions, - keypair, - PRIORITY_FEE, - ) - .await - .map_err(|e| e.into()) -} - -async fn get_existing_entries( - client: Arc, - entries: &[ValidatorMevCommissionEntry], -) -> Result, MultipleAccountsError> { - /* Filters tip distribution tuples to the addresses, then fetches accounts to see which ones exist */ - let tip_distribution_addresses = entries - .iter() - .map(|entry| entry.tip_distribution_account) - .collect::>(); - - let accounts = get_multiple_accounts_batched(&tip_distribution_addresses, &client).await?; - let result = accounts - .iter() - .enumerate() - .filter_map(|(i, account_data)| { - if account_data.is_some() { - Some(entries[i].clone()) - } else { - None - } - }) - .collect::>(); - // Fetch existing tip distribution accounts for this epoch - Ok(result) -} - -async fn get_entries_with_uploaded_merkleroot( - client: &Arc, - entries: &[ValidatorMevCommissionEntry], -) -> Result, MultipleAccountsError> { - /* Filters tip distribution tuples to the addresses, then fetches accounts to see which ones have an uploaded merkle root */ - let tip_distribution_addresses = entries - .iter() - .map(|entry| entry.tip_distribution_account) - .collect::>(); - - let accounts = get_multiple_accounts_batched(&tip_distribution_addresses, client).await?; - let result = accounts - .iter() - .enumerate() - .filter_map(|(i, account_data)| { - if let Some(account_data) = account_data { - let mut data: &[u8] = &account_data.data; - if let Ok(tda) = TipDistributionAccount::try_deserialize(&mut 
data) { - if tda.merkle_root.is_some() { - return Some(entries[i].clone()); - } - } - } - None - }) - .collect::>(); - // Fetch tip distribution accounts with uploaded merkle roots for this epoch - Ok(result) -} - -fn mev_commission_uploaded( - validator_history_map: &HashMap, - vote_account: Pubkey, - epoch: u64, -) -> bool { - if let Some(validator_history) = validator_history_map.get(&vote_account) { - if let Some(latest_entry) = validator_history.history.last() { - return latest_entry.epoch == epoch as u16 - && latest_entry.mev_commission != ValidatorHistoryEntry::default().mev_commission; - } - } - false -} - -fn mev_earned_uploaded( - validator_history_map: &HashMap, - vote_account: Pubkey, - epoch: u64, -) -> bool { - if let Some(validator_history) = validator_history_map.get(&vote_account) { - if let Some(latest_entry) = validator_history - .history - .epoch_range(epoch as u16, epoch as u16)[0] - { - return latest_entry.epoch == epoch as u16 - && latest_entry.mev_earned != ValidatorHistoryEntry::default().mev_earned; - } - }; - false -} diff --git a/keepers/validator-keeper/src/operations/cluster_history.rs b/keepers/validator-keeper/src/operations/cluster_history.rs new file mode 100644 index 00000000..74d4f9bd --- /dev/null +++ b/keepers/validator-keeper/src/operations/cluster_history.rs @@ -0,0 +1,119 @@ +/* +This program starts several threads to manage the creation of validator history accounts, +and the updating of the various data feeds within the accounts. +It will emits metrics for each data feed, if env var SOLANA_METRICS_CONFIG is set to a valid influx server. 
+*/ + +use crate::state::keeper_state::KeeperState; +use crate::{derive_cluster_history_address, PRIORITY_FEE}; +use anchor_lang::{InstructionData, ToAccountMetas}; +use keeper_core::{submit_transactions, SubmitStats, TransactionExecutionError}; +use solana_client::nonblocking::rpc_client::RpcClient; +use solana_metrics::datapoint_error; +use solana_sdk::{ + compute_budget, + epoch_info::EpochInfo, + instruction::Instruction, + pubkey::Pubkey, + signature::{Keypair, Signer}, +}; +use std::sync::Arc; + +use super::keeper_operations::KeeperOperations; + +fn _get_operation() -> KeeperOperations { + KeeperOperations::ClusterHistory +} + +fn _should_run(epoch_info: &EpochInfo, runs_for_epoch: u64) -> bool { + // Run at 0.1%, 50% and 90% completion of epoch + (epoch_info.slot_index > epoch_info.slots_in_epoch / 1000 && runs_for_epoch < 1) + || (epoch_info.slot_index > epoch_info.slots_in_epoch / 2 && runs_for_epoch < 2) + || (epoch_info.slot_index > epoch_info.slots_in_epoch * 9 / 10 && runs_for_epoch < 3) +} + +async fn _process( + client: &Arc, + keypair: &Arc, + program_id: &Pubkey, +) -> Result { + update_cluster_info(client, keypair, program_id).await +} + +pub async fn fire( + client: &Arc, + keypair: &Arc, + program_id: &Pubkey, + keeper_state: &KeeperState, +) -> (KeeperOperations, u64, u64) { + let operation = _get_operation(); + let epoch_info = &keeper_state.epoch_info; + + let (mut runs_for_epoch, mut errors_for_epoch) = + keeper_state.copy_runs_and_errors_for_epoch(operation.clone()); + + let should_run = _should_run(epoch_info, runs_for_epoch); + + if should_run { + match _process(client, keypair, program_id).await { + Ok(stats) => { + for message in stats.results.iter() { + if let Err(e) = message { + datapoint_error!("cluster-history-error", ("error", e.to_string(), String),); + } + } + if stats.errors == 0 { + runs_for_epoch += 1; + } + } + Err(e) => { + datapoint_error!("cluster-history-error", ("error", e.to_string(), String),); + errors_for_epoch += 
1; + } + }; + } + + (operation, runs_for_epoch, errors_for_epoch) +} + +// ----------------- OPERATION SPECIFIC FUNCTIONS ----------------- + +pub fn get_update_cluster_info_instructions( + program_id: &Pubkey, + keypair: &Pubkey, +) -> Vec { + let cluster_history_account = derive_cluster_history_address(program_id); + + let priority_fee_ix = + compute_budget::ComputeBudgetInstruction::set_compute_unit_price(PRIORITY_FEE); + let heap_request_ix = compute_budget::ComputeBudgetInstruction::request_heap_frame(256 * 1024); + let compute_budget_ix = + compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(1_400_000); + let update_instruction = Instruction { + program_id: *program_id, + accounts: validator_history::accounts::CopyClusterInfo { + cluster_history_account, + slot_history: solana_program::sysvar::slot_history::id(), + signer: *keypair, + } + .to_account_metas(None), + data: validator_history::instruction::CopyClusterInfo {}.data(), + }; + + vec![ + priority_fee_ix, + heap_request_ix, + compute_budget_ix, + update_instruction, + ] +} + +pub async fn update_cluster_info( + client: &Arc, + keypair: &Arc, + program_id: &Pubkey, +) -> Result { + let ixs = get_update_cluster_info_instructions(program_id, &keypair.pubkey()); + + submit_transactions(client, vec![ixs], keypair).await +} diff --git a/keepers/validator-keeper/src/gossip.rs b/keepers/validator-keeper/src/operations/gossip_upload.rs similarity index 57% rename from keepers/validator-keeper/src/gossip.rs rename to keepers/validator-keeper/src/operations/gossip_upload.rs index ed48f67f..d976a598 100644 --- a/keepers/validator-keeper/src/gossip.rs +++ b/keepers/validator-keeper/src/operations/gossip_upload.rs @@ -1,160 +1,94 @@ -use std::{ - collections::HashMap, - net::{IpAddr, SocketAddr}, - str::FromStr, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, RwLockReadGuard, - }, - time::Duration, -}; - -use anchor_lang::{InstructionData, ToAccountMetas}; +use 
crate::entries::gossip_entry::GossipEntry; +/* +This program starts several threads to manage the creation of validator history accounts, +and the updating of the various data feeds within the accounts. +It will emits metrics for each data feed, if env var SOLANA_METRICS_CONFIG is set to a valid influx server. +*/ +use crate::state::keeper_state::KeeperState; +use crate::{start_spy_server, PRIORITY_FEE}; use bytemuck::{bytes_of, Pod, Zeroable}; -use keeper_core::{ - get_multiple_accounts_batched, get_vote_accounts_with_retry, submit_transactions, Address, - CreateTransaction, CreateUpdateStats, -}; -use log::error; -use solana_client::{nonblocking::rpc_client::RpcClient, rpc_response::RpcVoteAccountInfo}; -use solana_gossip::{ - crds::Crds, - crds_value::{CrdsData, CrdsValue, CrdsValueLabel}, -}; -use solana_metrics::datapoint_info; +use keeper_core::{submit_transactions, SubmitStats}; +use log::*; +use solana_client::nonblocking::rpc_client::RpcClient; +use solana_client::rpc_response::RpcVoteAccountInfo; +use solana_gossip::crds::Crds; +use solana_gossip::crds_value::{CrdsData, CrdsValue, CrdsValueLabel}; +use solana_metrics::datapoint_error; +use solana_sdk::signature::Signable; use solana_sdk::{ - compute_budget::ComputeBudgetInstruction, + epoch_info::EpochInfo, instruction::Instruction, pubkey::Pubkey, - signature::{Keypair, Signable, Signature}, - signer::Signer, + signature::{Keypair, Signer}, }; +use std::net::IpAddr; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::RwLockReadGuard; +use std::{collections::HashMap, net::SocketAddr, str::FromStr, sync::Arc, time::Duration}; use tokio::time::sleep; -use validator_history::{ - self, - constants::{MAX_ALLOC_BYTES, MIN_VOTE_EPOCHS}, - Config, ValidatorHistory, ValidatorHistoryEntry, -}; +use validator_history::ValidatorHistory; +use validator_history::ValidatorHistoryEntry; -use crate::{get_validator_history_accounts_with_retry, start_spy_server, PRIORITY_FEE}; - -#[derive(Clone, Debug)] -pub 
struct GossipEntry { - pub vote_account: Pubkey, - pub validator_history_account: Pubkey, - pub config: Pubkey, - pub signature: Signature, - pub message: Vec, - pub program_id: Pubkey, - pub identity: Pubkey, - pub signer: Pubkey, -} +use super::keeper_operations::KeeperOperations; -impl GossipEntry { - pub fn new( - vote_account: &Pubkey, - signature: &Signature, - message: &[u8], - program_id: &Pubkey, - identity: &Pubkey, - signer: &Pubkey, - ) -> Self { - let (validator_history_account, _) = Pubkey::find_program_address( - &[ValidatorHistory::SEED, &vote_account.to_bytes()], - program_id, - ); - let (config, _) = Pubkey::find_program_address(&[Config::SEED], program_id); - Self { - vote_account: *vote_account, - validator_history_account, - config, - signature: *signature, - message: message.to_vec(), - program_id: *program_id, - identity: *identity, - signer: *signer, - } - } +fn _get_operation() -> KeeperOperations { + KeeperOperations::GossipUpload } -impl Address for GossipEntry { - fn address(&self) -> Pubkey { - self.validator_history_account - } +fn _should_run(epoch_info: &EpochInfo, runs_for_epoch: u64) -> bool { + // Run at 0%, 50% and 90% completion of epoch + runs_for_epoch < 1 + || (epoch_info.slot_index > epoch_info.slots_in_epoch / 2 && runs_for_epoch < 2) + || (epoch_info.slot_index > epoch_info.slots_in_epoch * 9 / 10 && runs_for_epoch < 3) } -impl CreateTransaction for GossipEntry { - fn create_transaction(&self) -> Vec { - let mut ixs = vec![Instruction { - program_id: self.program_id, - accounts: validator_history::accounts::InitializeValidatorHistoryAccount { - validator_history_account: self.validator_history_account, - vote_account: self.vote_account, - system_program: solana_program::system_program::id(), - signer: self.signer, - } - .to_account_metas(None), - data: validator_history::instruction::InitializeValidatorHistoryAccount {}.data(), - }]; - let num_reallocs = (ValidatorHistory::SIZE - MAX_ALLOC_BYTES) / MAX_ALLOC_BYTES + 1; - 
ixs.extend(vec![ - Instruction { - program_id: self.program_id, - accounts: validator_history::accounts::ReallocValidatorHistoryAccount { - validator_history_account: self.validator_history_account, - vote_account: self.vote_account, - config: self.config, - system_program: solana_program::system_program::id(), - signer: self.signer, - } - .to_account_metas(None), - data: validator_history::instruction::ReallocValidatorHistoryAccount {}.data(), - }; - num_reallocs - ]); - ixs - } +async fn _process( + client: &Arc, + keypair: &Arc, + program_id: &Pubkey, + entrypoint: &SocketAddr, + keeper_state: &KeeperState, +) -> Result> { + upload_gossip_values(client, keypair, program_id, entrypoint, keeper_state).await } -impl GossipEntry { - pub fn build_update_tx(&self, priority_fee: u64) -> Vec { - let mut ixs = vec![ - ComputeBudgetInstruction::set_compute_unit_limit(100_000), - ComputeBudgetInstruction::set_compute_unit_price(priority_fee), - build_verify_signature_ix( - self.signature.as_ref(), - self.identity.to_bytes(), - &self.message, - ), - ]; - - ixs.push(Instruction { - program_id: self.program_id, - accounts: validator_history::accounts::CopyGossipContactInfo { - validator_history_account: self.validator_history_account, - vote_account: self.vote_account, - instructions: solana_program::sysvar::instructions::id(), - config: self.config, - oracle_authority: self.signer, +pub async fn fire( + client: &Arc, + keypair: &Arc, + program_id: &Pubkey, + entrypoint: &SocketAddr, + keeper_state: &KeeperState, +) -> (KeeperOperations, u64, u64) { + let operation = _get_operation(); + let (mut runs_for_epoch, mut errors_for_epoch) = + keeper_state.copy_runs_and_errors_for_epoch(operation.clone()); + + let should_run = _should_run(&keeper_state.epoch_info, runs_for_epoch); + + if should_run { + match _process(client, keypair, program_id, entrypoint, keeper_state).await { + Ok(stats) => { + for message in stats.results.iter().chain(stats.results.iter()) { + if let Err(e) = 
message { + datapoint_error!("gossip-upload-error", ("error", e.to_string(), String),); + } + } + if stats.errors == 0 { + runs_for_epoch += 1; + } + } + Err(e) => { + datapoint_error!("gossip-upload-error", ("error", e.to_string(), String),); + errors_for_epoch += 1; } - .to_account_metas(None), - data: validator_history::instruction::CopyGossipContactInfo {}.data(), - }); - ixs + } } -} -pub fn emit_gossip_datapoint(stats: CreateUpdateStats, runs_for_epoch: i64) { - datapoint_info!( - "gossip-upload-stats", - ("num_creates_success", stats.creates.successes, i64), - ("num_creates_error", stats.creates.errors, i64), - ("num_updates_success", stats.updates.successes, i64), - ("num_updates_error", stats.updates.errors, i64), - ("runs_for_epoch", runs_for_epoch, i64), - ); + (operation, runs_for_epoch, errors_for_epoch) } +// ----------------- OPERATION SPECIFIC FUNCTIONS ----------------- + fn check_entry_valid( entry: &CrdsValue, validator_history: &ValidatorHistory, @@ -280,11 +214,15 @@ fn build_gossip_entry( } pub async fn upload_gossip_values( - client: Arc, - keypair: Arc, - entrypoint: SocketAddr, + client: &Arc, + keypair: &Arc, program_id: &Pubkey, -) -> Result> { + entrypoint: &SocketAddr, + keeper_state: &KeeperState, +) -> Result> { + let vote_accounts = keeper_state.vote_account_map.values().collect::>(); + let validator_history_map = &keeper_state.validator_history_map; + let gossip_port = 0; let spy_socket_addr = SocketAddr::new( @@ -293,43 +231,30 @@ pub async fn upload_gossip_values( ); let exit: Arc = Arc::new(AtomicBool::new(false)); let (_gossip_service, cluster_info) = - start_spy_server(entrypoint, gossip_port, spy_socket_addr, &keypair, &exit); - - let vote_accounts = get_vote_accounts_with_retry(&client, MIN_VOTE_EPOCHS, None).await?; - let validator_history_accounts = - get_validator_history_accounts_with_retry(&client, *program_id).await?; - - let validator_history_map = HashMap::from_iter(validator_history_accounts.iter().map(|vh| { - ( - 
Pubkey::find_program_address( - &[ValidatorHistory::SEED, &vh.vote_account.to_bytes()], - program_id, - ) - .0, - vh, - ) - })); + start_spy_server(*entrypoint, gossip_port, spy_socket_addr, keypair, &exit); // Wait for all active validators to be received sleep(Duration::from_secs(150)).await; let gossip_entries = { - let crds = cluster_info.gossip.crds.read().map_err(|e| e.to_string())?; + let crds = cluster_info + .gossip + .crds + .read() + .map_err(|e: std::sync::PoisonError>| e.to_string())?; vote_accounts .iter() .filter_map(|vote_account| { let vote_account_pubkey = Pubkey::from_str(&vote_account.vote_pubkey).ok()?; - let validator_history_account = validator_history_accounts - .iter() - .find(|account| account.vote_account == vote_account_pubkey)?; + let validator_history_account = validator_history_map.get(&vote_account_pubkey)?; build_gossip_entry( vote_account, validator_history_account, &crds, *program_id, - &keypair, + keypair, ) }) .flatten() @@ -338,46 +263,18 @@ pub async fn upload_gossip_values( exit.store(true, Ordering::Relaxed); - let epoch = client.get_epoch_info().await?.epoch; - - let addresses = gossip_entries - .iter() - .filter_map(|a| { - if gossip_data_uploaded(&validator_history_map, a.address(), epoch) { - None - } else { - Some(a.address()) - } - }) - .collect::>(); - - let existing_accounts_response = get_multiple_accounts_batched(&addresses, &client).await?; - - let create_transactions = existing_accounts_response - .iter() - .zip(gossip_entries.iter()) - .filter_map(|(existing_account, entry)| { - if existing_account.is_none() { - Some(entry.create_transaction()) - } else { - None - } - }) - .collect::>(); - let update_transactions = gossip_entries .iter() .map(|entry| entry.build_update_tx(PRIORITY_FEE)) .collect::>(); - Ok(CreateUpdateStats { - creates: submit_transactions(&client, create_transactions, &keypair).await?, - updates: submit_transactions(&client, update_transactions, &keypair).await?, - }) + let submit_result = 
submit_transactions(client, update_transactions, keypair).await; + + submit_result.map_err(|e| e.into()) } -fn gossip_data_uploaded( - validator_history_map: &HashMap, +fn _gossip_data_uploaded( + validator_history_map: &HashMap, vote_account: Pubkey, epoch: u64, ) -> bool { diff --git a/keepers/validator-keeper/src/operations/keeper_operations.rs b/keepers/validator-keeper/src/operations/keeper_operations.rs new file mode 100644 index 00000000..67e7622a --- /dev/null +++ b/keepers/validator-keeper/src/operations/keeper_operations.rs @@ -0,0 +1,128 @@ +use solana_metrics::datapoint_info; + +#[derive(Clone)] +pub enum KeeperOperations { + PreCreateUpdate, + CreateMissingAccounts, + PostCreateUpdate, + ClusterHistory, + GossipUpload, + StakeUpload, + VoteAccount, + MevEarned, + MevCommission, + EmitMetrics, +} + +impl KeeperOperations { + pub const LEN: usize = 10; + + pub fn emit( + runs_for_epoch: &[u64; KeeperOperations::LEN], + errors_for_epoch: &[u64; KeeperOperations::LEN], + ) { + datapoint_info!( + "keeper-operation-stats", + ( + "num-pre-create-update-runs", + runs_for_epoch[KeeperOperations::PreCreateUpdate as usize], + i64 + ), + ( + "num-pre-create-update-errors", + errors_for_epoch[KeeperOperations::PreCreateUpdate as usize], + i64 + ), + ( + "num-create-missing-accounts-runs", + runs_for_epoch[KeeperOperations::CreateMissingAccounts as usize], + i64 + ), + ( + "num-create-missing-accounts-errors", + errors_for_epoch[KeeperOperations::CreateMissingAccounts as usize], + i64 + ), + ( + "num-post-create-update-runs", + runs_for_epoch[KeeperOperations::PostCreateUpdate as usize], + i64 + ), + ( + "num-post-create-update-errors", + errors_for_epoch[KeeperOperations::PostCreateUpdate as usize], + i64 + ), + ( + "num-cluster-history-runs", + runs_for_epoch[KeeperOperations::ClusterHistory as usize], + i64 + ), + ( + "num-cluster-history-errors", + errors_for_epoch[KeeperOperations::ClusterHistory as usize], + i64 + ), + ( + "num-gossip-upload-runs", + 
runs_for_epoch[KeeperOperations::GossipUpload as usize], + i64 + ), + ( + "num-gossip-upload-errors", + errors_for_epoch[KeeperOperations::GossipUpload as usize], + i64 + ), + ( + "num-stake-upload-runs", + runs_for_epoch[KeeperOperations::StakeUpload as usize], + i64 + ), + ( + "num-stake-upload-errors", + errors_for_epoch[KeeperOperations::StakeUpload as usize], + i64 + ), + ( + "num-vote-account-runs", + runs_for_epoch[KeeperOperations::VoteAccount as usize], + i64 + ), + ( + "num-vote-account-errors", + errors_for_epoch[KeeperOperations::VoteAccount as usize], + i64 + ), + ( + "num-mev-earned-runs", + runs_for_epoch[KeeperOperations::MevEarned as usize], + i64 + ), + ( + "num-mev-earned-errors", + errors_for_epoch[KeeperOperations::MevEarned as usize], + i64 + ), + ( + "num-mev-commission-runs", + runs_for_epoch[KeeperOperations::MevCommission as usize], + i64 + ), + ( + "num-mev-commission-errors", + errors_for_epoch[KeeperOperations::MevCommission as usize], + i64 + ), + ( + "num-emit-metrics-runs", + runs_for_epoch[KeeperOperations::EmitMetrics as usize], + i64 + ), + ( + "num-emit-metrics-errors", + errors_for_epoch[KeeperOperations::EmitMetrics as usize], + i64 + ) + ); + } +} diff --git a/keepers/validator-keeper/src/operations/metrics_emit.rs b/keepers/validator-keeper/src/operations/metrics_emit.rs new file mode 100644 index 00000000..6425c837 --- /dev/null +++ b/keepers/validator-keeper/src/operations/metrics_emit.rs @@ -0,0 +1,161 @@ +/* +This program starts several threads to manage the creation of validator history accounts, +and the updating of the various data feeds within the accounts. +It will emits metrics for each data feed, if env var SOLANA_METRICS_CONFIG is set to a valid influx server. 
+*/ +use crate::state::keeper_state::KeeperState; +use log::*; +use solana_metrics::datapoint_info; +use validator_history::ValidatorHistoryEntry; + +use super::keeper_operations::KeeperOperations; + +fn _get_operation() -> KeeperOperations { + KeeperOperations::EmitMetrics +} + +fn _should_run() -> bool { + true +} + +async fn _process(keeper_state: &KeeperState) -> Result<(), Box> { + emit_validator_history_metrics(keeper_state).await +} + +pub async fn fire(keeper_state: &KeeperState) -> (KeeperOperations, u64, u64) { + let operation = _get_operation(); + let (mut runs_for_epoch, mut errors_for_epoch) = + keeper_state.copy_runs_and_errors_for_epoch(operation.clone()); + + let should_run = _should_run(); + + if should_run { + match _process(keeper_state).await { + Ok(_) => { + runs_for_epoch += 1; + } + Err(e) => { + errors_for_epoch += 1; + error!("Failed to emit validator history metrics: {}", e); + } + } + } + + (operation, runs_for_epoch, errors_for_epoch) +} + +// ----------------- OPERATION SPECIFIC FUNCTIONS ----------------- +pub async fn emit_validator_history_metrics( + keeper_state: &KeeperState, +) -> Result<(), Box> { + let epoch_info = &keeper_state.epoch_info; + let keeper_balance = keeper_state.keeper_balance; + let get_vote_accounts = keeper_state.vote_account_map.values().collect::>(); + let validator_histories = &keeper_state + .validator_history_map + .values() + .collect::>(); + let cluster_history = &keeper_state.cluster_history; + + let mut ips = 0; + let mut versions = 0; + let mut types = 0; + let mut mev_comms = 0; + let mut comms = 0; + let mut epoch_credits = 0; + let mut stakes = 0; + let num_validators = validator_histories.len(); + let default = ValidatorHistoryEntry::default(); + + let mut all_history_vote_accounts = Vec::new(); + for validator_history in validator_histories { + if let Some(entry) = validator_history.history.last() { + if entry.epoch as u64 != epoch_info.epoch { + continue; + } + if entry.ip != default.ip { + ips 
+= 1; + } + if !(entry.version.major == default.version.major + && entry.version.minor == default.version.minor + && entry.version.patch == default.version.patch) + { + versions += 1; + } + if entry.client_type != default.client_type { + types += 1; + } + if entry.mev_commission != default.mev_commission { + mev_comms += 1; + } + if entry.commission != default.commission { + comms += 1; + } + if entry.epoch_credits != default.epoch_credits { + epoch_credits += 1; + } + if entry.activated_stake_lamports != default.activated_stake_lamports { + stakes += 1; + } + } + + all_history_vote_accounts.push(validator_history.vote_account); + } + + let mut cluster_history_blocks: i64 = 0; + let cluster_history_entry = cluster_history.history.last(); + if let Some(cluster_history) = cluster_history_entry { + // Looking for previous epoch to be updated + if cluster_history.epoch as u64 == epoch_info.epoch - 1 { + cluster_history_blocks = 1; + } + } + + let get_vote_accounts_count = get_vote_accounts.len() as i64; + + let live_validator_histories_count = keeper_state.get_live_vote_accounts().len(); + + let get_vote_accounts_voting = get_vote_accounts + .iter() + .filter(|x| { + // Check if the last epoch credit ( most recent ) is the current epoch + x.epoch_credits.last().unwrap().0 == epoch_info.epoch + }) + .count(); + + datapoint_info!( + "validator-history-stats", + ("num_validator_histories", num_validators, i64), + ( + "num_live_validator_histories", + live_validator_histories_count, + i64 + ), + ("num_ips", ips, i64), + ("num_versions", versions, i64), + ("num_client_types", types, i64), + ("num_mev_commissions", mev_comms, i64), + ("num_commissions", comms, i64), + ("num_epoch_credits", epoch_credits, i64), + ("num_stakes", stakes, i64), + ("cluster_history_blocks", cluster_history_blocks, i64), + ("slot_index", epoch_info.slot_index, i64), + ( + "num_get_vote_accounts_responses", + get_vote_accounts_count, + i64 + ), + ( + "num_get_vote_accounts_voting", + 
get_vote_accounts_voting, + i64 + ), + ); + + datapoint_info!( + "stakenet-keeper-stats", + ("balance_lamports", keeper_balance, i64), + ); + + Ok(()) +} diff --git a/keepers/validator-keeper/src/operations/mev_commission.rs b/keepers/validator-keeper/src/operations/mev_commission.rs new file mode 100644 index 00000000..d7dbcb39 --- /dev/null +++ b/keepers/validator-keeper/src/operations/mev_commission.rs @@ -0,0 +1,147 @@ +/* +This program starts several threads to manage the creation of validator history accounts, +and the updating of the various data feeds within the accounts. +It will emits metrics for each data feed, if env var SOLANA_METRICS_CONFIG is set to a valid influx server. +*/ + +use crate::entries::mev_commission_entry::ValidatorMevCommissionEntry; +use crate::state::keeper_state::KeeperState; +use crate::{KeeperError, PRIORITY_FEE}; +use keeper_core::{submit_instructions, SubmitStats, UpdateInstruction}; +use solana_client::nonblocking::rpc_client::RpcClient; +use solana_metrics::datapoint_error; +use solana_sdk::{ + pubkey::Pubkey, + signature::{Keypair, Signer}, +}; +use std::{collections::HashMap, sync::Arc}; +use validator_history::ValidatorHistory; +use validator_history::ValidatorHistoryEntry; + +use super::keeper_operations::KeeperOperations; + +fn _get_operation() -> KeeperOperations { + KeeperOperations::MevCommission +} + +fn _should_run() -> bool { + true +} + +async fn _process( + client: &Arc, + keypair: &Arc, + program_id: &Pubkey, + tip_distribution_program_id: &Pubkey, + keeper_state: &KeeperState, +) -> Result { + update_mev_commission( + client, + keypair, + program_id, + tip_distribution_program_id, + keeper_state, + ) + .await +} + +pub async fn fire( + client: &Arc, + keypair: &Arc, + program_id: &Pubkey, + tip_distribution_program_id: &Pubkey, + keeper_state: &KeeperState, +) -> (KeeperOperations, u64, u64) { + let operation = _get_operation(); + let (mut runs_for_epoch, mut errors_for_epoch) = + 
keeper_state.copy_runs_and_errors_for_epoch(operation.clone()); + + let should_run = _should_run(); + + if should_run { + match _process( + client, + keypair, + program_id, + tip_distribution_program_id, + keeper_state, + ) + .await + { + Ok(stats) => { + for message in stats.results.iter().chain(stats.results.iter()) { + if let Err(e) = message { + datapoint_error!("vote-account-error", ("error", e.to_string(), String),); + errors_for_epoch += 1; + } + } + if stats.errors == 0 { + runs_for_epoch += 1; + } + } + Err(e) => { + datapoint_error!("mev-earned-error", ("error", e.to_string(), String),); + errors_for_epoch += 1; + } + }; + } + + (operation, runs_for_epoch, errors_for_epoch) +} + +// ----------------- OPERATION SPECIFIC FUNCTIONS ----------------- + +pub async fn update_mev_commission( + client: &Arc, + keypair: &Arc, + program_id: &Pubkey, + tip_distribution_program_id: &Pubkey, + keeper_state: &KeeperState, +) -> Result { + let epoch_info = &keeper_state.epoch_info; + let validator_history_map = &keeper_state.validator_history_map; + let current_epoch_tip_distribution_map = &keeper_state.current_epoch_tip_distribution_map; + + let existing_entries = current_epoch_tip_distribution_map + .iter() + .filter_map(|(pubkey, account)| account.as_ref().map(|_| *pubkey)) + .collect::>(); + + let entries_to_update = existing_entries + .into_iter() + .filter(|entry| !mev_commission_uploaded(validator_history_map, entry, epoch_info.epoch)) + .collect::>(); + + let update_instructions = entries_to_update + .iter() + .map(|vote_account| { + ValidatorMevCommissionEntry::new( + vote_account, + epoch_info.epoch, + program_id, + tip_distribution_program_id, + &keypair.pubkey(), + ) + .update_instruction() + }) + .collect::>(); + + let submit_result = + submit_instructions(client, update_instructions, keypair, PRIORITY_FEE).await; + + submit_result.map_err(|e| e.into()) +} + +fn mev_commission_uploaded( + validator_history_map: &HashMap, + vote_account: &Pubkey, + epoch: 
u64, +) -> bool { + if let Some(validator_history) = validator_history_map.get(vote_account) { + if let Some(latest_entry) = validator_history.history.last() { + return latest_entry.epoch == epoch as u16 + && latest_entry.mev_commission != ValidatorHistoryEntry::default().mev_commission; + } + } + false +} diff --git a/keepers/validator-keeper/src/operations/mev_earned.rs b/keepers/validator-keeper/src/operations/mev_earned.rs new file mode 100644 index 00000000..82b0538c --- /dev/null +++ b/keepers/validator-keeper/src/operations/mev_earned.rs @@ -0,0 +1,168 @@ +/* +This program starts several threads to manage the creation of validator history accounts, +and the updating of the various data feeds within the accounts. +It will emits metrics for each data feed, if env var SOLANA_METRICS_CONFIG is set to a valid influx server. +*/ + +use crate::entries::mev_commission_entry::ValidatorMevCommissionEntry; +use crate::state::keeper_state::KeeperState; +use crate::{KeeperError, PRIORITY_FEE}; +use anchor_lang::AccountDeserialize; +use jito_tip_distribution::state::TipDistributionAccount; +use keeper_core::{submit_instructions, SubmitStats, UpdateInstruction}; +use solana_client::nonblocking::rpc_client::RpcClient; +use solana_metrics::datapoint_error; +use solana_sdk::{ + pubkey::Pubkey, + signature::{Keypair, Signer}, +}; +use std::{collections::HashMap, sync::Arc}; +use validator_history::ValidatorHistory; +use validator_history::ValidatorHistoryEntry; + +use super::keeper_operations::KeeperOperations; + +fn _get_operation() -> KeeperOperations { + KeeperOperations::MevEarned +} + +fn _should_run() -> bool { + true +} + +async fn _process( + client: &Arc, + keypair: &Arc, + program_id: &Pubkey, + tip_distribution_program_id: &Pubkey, + keeper_state: &KeeperState, +) -> Result { + update_mev_earned( + client, + keypair, + program_id, + tip_distribution_program_id, + keeper_state, + ) + .await +} + +pub async fn fire( + client: &Arc, + keypair: &Arc, + program_id: 
&Pubkey, + tip_distribution_program_id: &Pubkey, + keeper_state: &KeeperState, +) -> (KeeperOperations, u64, u64) { + let operation = _get_operation(); + + let (mut runs_for_epoch, mut errors_for_epoch) = + keeper_state.copy_runs_and_errors_for_epoch(operation.clone()); + + let should_run = _should_run(); + + if should_run { + match _process( + client, + keypair, + program_id, + tip_distribution_program_id, + keeper_state, + ) + .await + { + Ok(stats) => { + for message in stats.results.iter().chain(stats.results.iter()) { + if let Err(e) = message { + datapoint_error!("vote-account-error", ("error", e.to_string(), String),); + errors_for_epoch += 1; + } + } + if stats.errors == 0 { + runs_for_epoch += 1; + } + } + Err(e) => { + datapoint_error!("mev-earned-error", ("error", e.to_string(), String),); + errors_for_epoch += 1; + } + }; + } + + (operation, runs_for_epoch, errors_for_epoch) +} + +// ----------------- OPERATION SPECIFIC FUNCTIONS ----------------- + +pub async fn update_mev_earned( + client: &Arc, + keypair: &Arc, + program_id: &Pubkey, + tip_distribution_program_id: &Pubkey, + keeper_state: &KeeperState, +) -> Result { + let epoch_info = &keeper_state.epoch_info; + let validator_history_map = &keeper_state.validator_history_map; + let previous_epoch_tip_distribution_map = &keeper_state.previous_epoch_tip_distribution_map; + + let uploaded_merkleroot_entries = previous_epoch_tip_distribution_map + .iter() + .filter_map(|(address, account)| { + let account_data = account.as_ref()?; + let mut data: &[u8] = &account_data.data; + let tda = TipDistributionAccount::try_deserialize(&mut data).ok()?; + if tda.merkle_root.is_some() { + Some(*address) + } else { + None + } + }) + .collect::>(); + + let entries_to_update = uploaded_merkleroot_entries + .into_iter() + .filter(|entry| { + !mev_earned_uploaded( + validator_history_map, + entry, + epoch_info.epoch.saturating_sub(1), + ) + }) + .collect::>(); + + let update_instructions = entries_to_update + .iter() + 
.map(|vote_account| { + ValidatorMevCommissionEntry::new( + vote_account, + epoch_info.epoch.saturating_sub(1), + program_id, + tip_distribution_program_id, + &keypair.pubkey(), + ) + .update_instruction() + }) + .collect::>(); + + let submit_result = + submit_instructions(client, update_instructions, keypair, PRIORITY_FEE).await; + + submit_result.map_err(|e| e.into()) +} + +fn mev_earned_uploaded( + validator_history_map: &HashMap, + vote_account: &Pubkey, + epoch: u64, +) -> bool { + if let Some(validator_history) = validator_history_map.get(vote_account) { + if let Some(latest_entry) = validator_history + .history + .epoch_range(epoch as u16, epoch as u16)[0] + { + return latest_entry.epoch == epoch as u16 + && latest_entry.mev_earned != ValidatorHistoryEntry::default().mev_earned; + } + }; + false +} diff --git a/keepers/validator-keeper/src/operations/mod.rs b/keepers/validator-keeper/src/operations/mod.rs new file mode 100644 index 00000000..6d5773c5 --- /dev/null +++ b/keepers/validator-keeper/src/operations/mod.rs @@ -0,0 +1,8 @@ +pub mod cluster_history; +pub mod gossip_upload; +pub mod keeper_operations; +pub mod metrics_emit; +pub mod mev_commission; +pub mod mev_earned; +pub mod stake_upload; +pub mod vote_account; diff --git a/keepers/validator-keeper/src/operations/stake_upload.rs b/keepers/validator-keeper/src/operations/stake_upload.rs new file mode 100644 index 00000000..8155eb5e --- /dev/null +++ b/keepers/validator-keeper/src/operations/stake_upload.rs @@ -0,0 +1,295 @@ +use crate::entries::stake_history_entry::StakeHistoryEntry; +/* +This program starts several threads to manage the creation of validator history accounts, +and the updating of the various data feeds within the accounts. +It will emits metrics for each data feed, if env var SOLANA_METRICS_CONFIG is set to a valid influx server. 
+*/ +use crate::state::keeper_state::KeeperState; +use crate::{KeeperError, PRIORITY_FEE}; +use keeper_core::{submit_instructions, SubmitStats, UpdateInstruction}; +use log::*; +use solana_client::nonblocking::rpc_client::RpcClient; +use solana_client::rpc_response::RpcVoteAccountInfo; +use solana_metrics::datapoint_error; +use solana_sdk::{ + epoch_info::EpochInfo, + pubkey::Pubkey, + signature::{Keypair, Signer}, +}; +use std::{collections::HashMap, str::FromStr, sync::Arc}; +use validator_history::{ValidatorHistory, ValidatorHistoryEntry}; + +use super::keeper_operations::KeeperOperations; + +fn _get_operation() -> KeeperOperations { + KeeperOperations::StakeUpload +} + +fn _should_run(epoch_info: &EpochInfo, runs_for_epoch: u64) -> bool { + // Run at 0.1%, 50% and 90% completion of epoch + (epoch_info.slot_index > epoch_info.slots_in_epoch / 1000 && runs_for_epoch < 1) + || (epoch_info.slot_index > epoch_info.slots_in_epoch / 2 && runs_for_epoch < 2) + || (epoch_info.slot_index > epoch_info.slots_in_epoch * 9 / 10 && runs_for_epoch < 3) +} + +async fn _process( + client: &Arc, + keypair: &Arc, + program_id: &Pubkey, + keeper_state: &KeeperState, +) -> Result> { + update_stake_history(client, keypair, program_id, keeper_state).await +} + +pub async fn fire( + client: &Arc, + keypair: &Arc, + program_id: &Pubkey, + keeper_state: &KeeperState, +) -> (KeeperOperations, u64, u64) { + let operation = _get_operation(); + let (mut runs_for_epoch, mut errors_for_epoch) = + keeper_state.copy_runs_and_errors_for_epoch(operation.clone()); + + let should_run = _should_run(&keeper_state.epoch_info, runs_for_epoch); + + if should_run { + match _process(client, keypair, program_id, keeper_state).await { + Ok(stats) => { + for message in stats.results.iter().chain(stats.results.iter()) { + if let Err(e) = message { + datapoint_error!("stake-history-error", ("error", e.to_string(), String),); + } + } + + if stats.errors == 0 { + runs_for_epoch += 1; + } + } + Err(e) => { + 
datapoint_error!("stake-history-error", ("error", e.to_string(), String),); + errors_for_epoch += 1; + } + }; + } + + (operation, runs_for_epoch, errors_for_epoch) +} + +// ----------------- OPERATION SPECIFIC FUNCTIONS ----------------- + +pub async fn update_stake_history( + client: &Arc, + keypair: &Arc, + program_id: &Pubkey, + keeper_state: &KeeperState, +) -> Result> { + let epoch_info = &keeper_state.epoch_info; + let vote_accounts = &keeper_state.vote_account_map.values().collect::>(); + let validator_history_map = &keeper_state.validator_history_map; + + // Need to ensure that the response contains update stake amounts for the current epoch, + // so we find the largest epoch a validator has voted on to confirm the data is fresh + let max_vote_account_epoch = vote_accounts + .iter() + .flat_map(|vote_account| vote_account.epoch_credits.clone()) + .map(|(epoch, _, _)| epoch) + .max() + .unwrap_or(0); + + let (stake_rank_map, superminority_threshold) = + get_stake_rank_map_and_superminority_count(vote_accounts); + + if max_vote_account_epoch != epoch_info.epoch { + //TODO Go through with custom errors + return Err(Box::new(KeeperError::Custom("EpochMismatch".into()))); + } + + let entries_to_update = vote_accounts + .iter() + .filter_map(|vote_account| { + let rank = stake_rank_map[&vote_account.vote_pubkey.clone()]; + let is_superminority = rank <= superminority_threshold; + + if stake_entry_uploaded(validator_history_map, vote_account, epoch_info.epoch) { + return None; + } + + Some(StakeHistoryEntry::new( + vote_account, + program_id, + &keypair.pubkey(), + epoch_info.epoch, + rank, + is_superminority, + )) + }) + .collect::>(); + + let update_instructions = entries_to_update + .iter() + .map(|stake_history_entry| stake_history_entry.update_instruction()) + .collect::>(); + + let submit_result = + submit_instructions(client, update_instructions, keypair, PRIORITY_FEE).await; + + submit_result.map_err(|e| e.into()) +} + +fn stake_entry_uploaded( + 
validator_history_map: &HashMap, + vote_account: &RpcVoteAccountInfo, + epoch: u64, +) -> bool { + let vote_account = Pubkey::from_str(&vote_account.vote_pubkey) + .map_err(|e| { + error!("Invalid vote account pubkey"); + e + }) + .expect("Invalid vote account pubkey"); + if let Some(validator_history) = validator_history_map.get(&vote_account) { + if let Some(latest_entry) = validator_history.history.last() { + return latest_entry.epoch == epoch as u16 + && latest_entry.is_superminority + != ValidatorHistoryEntry::default().is_superminority + && latest_entry.rank != ValidatorHistoryEntry::default().rank + && latest_entry.activated_stake_lamports + != ValidatorHistoryEntry::default().activated_stake_lamports; + } + } + false +} + +/* +Calculates ordering of validators by stake, assigning a 0..N rank (validator 0 has the most stake), +and returns the index at which all validators before are in the superminority. 0-indexed. +*/ +fn get_stake_rank_map_and_superminority_count( + vote_accounts: &[&RpcVoteAccountInfo], +) -> (HashMap, u32) { + let mut stake_vec = vote_accounts + .iter() + .map(|va| (va.vote_pubkey.clone(), va.activated_stake)) + .collect::>(); + + let total_stake = stake_vec.iter().map(|(_, stake)| *stake).sum::(); + stake_vec.sort_by(|a, b| b.1.cmp(&a.1)); + + let mut cumulative_stake = 0; + let mut superminority_threshold = 0; + for (i, (_, stake)) in stake_vec.iter().enumerate() { + cumulative_stake += stake; + if cumulative_stake > total_stake / 3 { + superminority_threshold = i as u32; + break; + } + } + let stake_rank_map = HashMap::from_iter( + stake_vec + .into_iter() + .enumerate() + .map(|(i, (vote_pubkey, _))| (vote_pubkey, i as u32)), + ); + + (stake_rank_map, superminority_threshold) +} + +// /* +// Utility to recompute the superminority and rank fields for all validators from start_epoch to end_epoch. +// Will over-write the on-chain data, so should only be used when the on-chain data is corrupted. 
+// */ +// pub async fn _recompute_superminority_and_rank( +// client: Arc, +// keypair: Arc, +// program_id: &Pubkey, +// start_epoch: u64, +// end_epoch: u64, +// ) -> Result<(), KeeperError> { +// // Fetch every ValidatorHistory account +// let gpa_config = RpcProgramAccountsConfig { +// filters: Some(vec![RpcFilterType::Memcmp(Memcmp::new_raw_bytes( +// 0, +// ValidatorHistory::discriminator().into(), +// ))]), +// account_config: RpcAccountInfoConfig { +// encoding: Some(solana_account_decoder::UiAccountEncoding::Base64), +// ..RpcAccountInfoConfig::default() +// }, +// ..RpcProgramAccountsConfig::default() +// }; +// let validator_history_accounts = client +// .get_program_accounts_with_config(&validator_history::id(), gpa_config) +// .await +// .expect("Failed to get validator history accounts"); + +// let validator_histories = validator_history_accounts +// .iter() +// .map(|(_, account)| { +// let validator_history = ValidatorHistory::try_deserialize(&mut account.data.as_slice()) +// .expect("Failed to deserialize validator history account"); +// validator_history +// }) +// .collect::>(); + +// for epoch in start_epoch..=end_epoch { +// // Get entry for each validator for this epoch +// let vote_accounts: Vec = validator_histories +// .iter() +// .filter_map(|validator| { +// validator +// .history +// .arr +// .iter() +// .find(|entry| { +// entry.epoch == epoch as u16 && entry.activated_stake_lamports != u64::MAX +// }) +// .map(|entry| { +// // All values except vote_pubkey and activated_stake are unused +// RpcVoteAccountInfo { +// vote_pubkey: validator.vote_account.to_string(), +// activated_stake: entry.activated_stake_lamports, +// epoch_credits: vec![], +// commission: 0, +// root_slot: 0, +// node_pubkey: "".to_string(), +// epoch_vote_account: false, +// last_vote: 0, +// } +// }) +// .into() +// }) +// .collect(); + +// let (stake_rank_map, superminority_threshold) = +// get_stake_rank_map_and_superminority_count(&vote_accounts); + +// let 
stake_history_entries = vote_accounts +// .iter() +// .map(|va| { +// let rank = stake_rank_map[&va.vote_pubkey.clone()]; +// let is_superminority = rank <= superminority_threshold; +// StakeHistoryEntry::new( +// va, +// program_id, +// &keypair.pubkey(), +// epoch, +// rank, +// is_superminority, +// ) +// }) +// .collect::>(); + +// let update_instructions = stake_history_entries +// .iter() +// .map(|entry| entry.update_instruction()) +// .collect::>(); + +// match submit_instructions(&client, update_instructions, &keypair, PRIORITY_FEE).await { +// Ok(_) => println!("completed epoch {}", epoch), +// Err(e) => return Err(e.into()), +// }; +// } + +// Ok(()) +// } diff --git a/keepers/validator-keeper/src/operations/vote_account.rs b/keepers/validator-keeper/src/operations/vote_account.rs new file mode 100644 index 00000000..fa09fc97 --- /dev/null +++ b/keepers/validator-keeper/src/operations/vote_account.rs @@ -0,0 +1,138 @@ +/* +This program starts several threads to manage the creation of validator history accounts, +and the updating of the various data feeds within the accounts. +It will emit metrics for each data feed, if env var SOLANA_METRICS_CONFIG is set to a valid influx server.
+*/ + +use crate::entries::copy_vote_account_entry::CopyVoteAccountEntry; +use crate::state::keeper_state::KeeperState; +use crate::{KeeperError, PRIORITY_FEE}; +use keeper_core::{submit_instructions, SubmitStats, UpdateInstruction}; +use solana_client::nonblocking::rpc_client::RpcClient; +use solana_metrics::datapoint_error; +use solana_sdk::{ + epoch_info::EpochInfo, + pubkey::Pubkey, + signature::{Keypair, Signer}, +}; +use std::{collections::HashMap, sync::Arc}; +use validator_history::ValidatorHistory; +use validator_history::ValidatorHistoryEntry; + +use super::keeper_operations::KeeperOperations; + +fn _get_operation() -> KeeperOperations { + KeeperOperations::VoteAccount +} + +fn _should_run(epoch_info: &EpochInfo, runs_for_epoch: u64) -> bool { + // Run at 10%, 50% and 90% completion of epoch + (epoch_info.slot_index > epoch_info.slots_in_epoch / 1000 && runs_for_epoch < 1) + || (epoch_info.slot_index > epoch_info.slots_in_epoch / 2 && runs_for_epoch < 2) + || (epoch_info.slot_index > epoch_info.slots_in_epoch * 9 / 10 && runs_for_epoch < 3) +} + +async fn _process( + client: &Arc, + keypair: &Arc, + program_id: &Pubkey, + keeper_state: &KeeperState, +) -> Result { + update_vote_accounts(client, keypair, program_id, keeper_state).await +} + +pub async fn fire( + client: &Arc, + keypair: &Arc, + program_id: &Pubkey, + keeper_state: &KeeperState, +) -> (KeeperOperations, u64, u64) { + let operation = _get_operation(); + let epoch_info = &keeper_state.epoch_info; + let (mut runs_for_epoch, mut errors_for_epoch) = + keeper_state.copy_runs_and_errors_for_epoch(operation.clone()); + + let should_run = _should_run(epoch_info, runs_for_epoch); + + if should_run { + match _process(client, keypair, program_id, keeper_state).await { + Ok(stats) => { + for message in stats.results.iter().chain(stats.results.iter()) { + if let Err(e) = message { + datapoint_error!("vote-account-error", ("error", e.to_string(), String),); + } + } + if stats.errors == 0 { + 
runs_for_epoch += 1; + } + } + Err(e) => { + datapoint_error!("vote-account-error", ("error", e.to_string(), String),); + errors_for_epoch += 1; + } + }; + } + + (operation, runs_for_epoch, errors_for_epoch) +} + +// SPECIFIC TO THIS OPERATION +pub async fn update_vote_accounts( + rpc_client: &Arc, + keypair: &Arc, + program_id: &Pubkey, + keeper_state: &KeeperState, +) -> Result { + let validator_history_map = &keeper_state.validator_history_map; + let closed_vote_accounts = &keeper_state.get_closed_vote_accounts(); + let epoch_info = &keeper_state.epoch_info; + + // Remove closed vote accounts from all vote accounts + // Remove vote accounts for which this instruction has been called within 50,000 slots + let mut vote_accounts_to_update = keeper_state.vote_account_map.keys().collect::>(); + + vote_accounts_to_update.retain(|vote_account| { + !closed_vote_accounts.contains(vote_account) + && !vote_account_uploaded_recently( + validator_history_map, + vote_account, + epoch_info.epoch, + epoch_info.absolute_slot, + ) + }); + + let entries = vote_accounts_to_update + .iter() + .map(|vote_account| CopyVoteAccountEntry::new(vote_account, program_id, &keypair.pubkey())) + .collect::>(); + + let update_instructions = entries + .iter() + .map(|copy_vote_account_entry| copy_vote_account_entry.update_instruction()) + .collect::>(); + + let submit_result = + submit_instructions(rpc_client, update_instructions, keypair, PRIORITY_FEE).await; + + submit_result.map_err(|e| e.into()) +} + +fn vote_account_uploaded_recently( + validator_history_map: &HashMap, + vote_account: &Pubkey, + epoch: u64, + slot: u64, +) -> bool { + if let Some(validator_history) = validator_history_map.get(vote_account) { + if let Some(entry) = validator_history.history.last() { + if entry.epoch == epoch as u16 + && entry.vote_account_last_update_slot + != ValidatorHistoryEntry::default().vote_account_last_update_slot + && entry.vote_account_last_update_slot > slot - 50000 + { + return true; + } + } + } 
+ false +} diff --git a/keepers/validator-keeper/src/stake.rs b/keepers/validator-keeper/src/stake.rs deleted file mode 100644 index 2d11c4d1..00000000 --- a/keepers/validator-keeper/src/stake.rs +++ /dev/null @@ -1,373 +0,0 @@ -use std::{collections::HashMap, str::FromStr, sync::Arc}; - -use anchor_lang::{AccountDeserialize, Discriminator, InstructionData, ToAccountMetas}; -use keeper_core::{ - build_create_and_update_instructions, get_vote_accounts_with_retry, submit_create_and_update, - submit_instructions, Address, CreateTransaction, CreateUpdateStats, UpdateInstruction, -}; -use log::error; -use solana_client::{ - nonblocking::rpc_client::RpcClient, - rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}, - rpc_filter::{Memcmp, RpcFilterType}, - rpc_response::RpcVoteAccountInfo, -}; -use solana_metrics::datapoint_info; -use solana_sdk::{ - commitment_config::CommitmentConfig, instruction::Instruction, pubkey::Pubkey, - signature::Keypair, signer::Signer, -}; -use validator_history::{ - constants::{MAX_ALLOC_BYTES, MIN_VOTE_EPOCHS}, - state::{Config, ValidatorHistory}, - ValidatorHistoryEntry, -}; - -use crate::{get_validator_history_accounts_with_retry, KeeperError, PRIORITY_FEE}; - -pub struct StakeHistoryEntry { - pub stake: u64, - pub rank: u32, - pub is_superminority: bool, - pub vote_account: Pubkey, - pub address: Pubkey, - pub config_address: Pubkey, - pub signer: Pubkey, - pub program_id: Pubkey, - pub epoch: u64, -} - -impl StakeHistoryEntry { - pub fn new( - vote_account: &RpcVoteAccountInfo, - program_id: &Pubkey, - signer: &Pubkey, - epoch: u64, - rank: u32, - is_superminority: bool, - ) -> StakeHistoryEntry { - let vote_pubkey = Pubkey::from_str(&vote_account.vote_pubkey) - .map_err(|e| { - error!("Invalid vote account pubkey"); - e - }) - .expect("Invalid vote account pubkey"); - let (address, _) = Pubkey::find_program_address( - &[ValidatorHistory::SEED, &vote_pubkey.to_bytes()], - program_id, - ); - let (config_address, _) = 
Pubkey::find_program_address(&[Config::SEED], program_id); - - StakeHistoryEntry { - stake: vote_account.activated_stake, - rank, - is_superminority, - vote_account: vote_pubkey, - address, - config_address, - signer: *signer, - program_id: *program_id, - epoch, - } - } -} - -impl CreateTransaction for StakeHistoryEntry { - fn create_transaction(&self) -> Vec { - let mut ixs = vec![Instruction { - program_id: self.program_id, - accounts: validator_history::accounts::InitializeValidatorHistoryAccount { - validator_history_account: self.address, - vote_account: self.vote_account, - system_program: solana_program::system_program::id(), - signer: self.signer, - } - .to_account_metas(None), - data: validator_history::instruction::InitializeValidatorHistoryAccount {}.data(), - }]; - let num_reallocs = (ValidatorHistory::SIZE - MAX_ALLOC_BYTES) / MAX_ALLOC_BYTES + 1; - ixs.extend(vec![ - Instruction { - program_id: self.program_id, - accounts: validator_history::accounts::ReallocValidatorHistoryAccount { - validator_history_account: self.address, - vote_account: self.vote_account, - config: self.config_address, - system_program: solana_program::system_program::id(), - signer: self.signer, - } - .to_account_metas(None), - data: validator_history::instruction::ReallocValidatorHistoryAccount {}.data(), - }; - num_reallocs - ]); - ixs - } -} - -impl Address for StakeHistoryEntry { - fn address(&self) -> Pubkey { - self.address - } -} - -impl UpdateInstruction for StakeHistoryEntry { - fn update_instruction(&self) -> Instruction { - Instruction { - program_id: self.program_id, - accounts: validator_history::accounts::UpdateStakeHistory { - validator_history_account: self.address, - vote_account: self.vote_account, - config: self.config_address, - oracle_authority: self.signer, - } - .to_account_metas(None), - data: validator_history::instruction::UpdateStakeHistory { - lamports: self.stake, - epoch: self.epoch, - rank: self.rank, - is_superminority: self.is_superminority, - } 
- .data(), - } - } -} - -/* -Calculates ordering of validators by stake, assigning a 0..N rank (validator 0 has the most stake), -and returns the index at which all validators before are in the superminority. 0-indexed. -*/ -fn get_stake_rank_map_and_superminority_count( - vote_accounts: &[RpcVoteAccountInfo], -) -> (HashMap, u32) { - let mut stake_vec = vote_accounts - .iter() - .map(|va| (va.vote_pubkey.clone(), va.activated_stake)) - .collect::>(); - - let total_stake = stake_vec.iter().map(|(_, stake)| *stake).sum::(); - stake_vec.sort_by(|a, b| b.1.cmp(&a.1)); - - let mut cumulative_stake = 0; - let mut superminority_threshold = 0; - for (i, (_, stake)) in stake_vec.iter().enumerate() { - cumulative_stake += stake; - if cumulative_stake > total_stake / 3 { - superminority_threshold = i as u32; - break; - } - } - let stake_rank_map = HashMap::from_iter( - stake_vec - .into_iter() - .enumerate() - .map(|(i, (vote_pubkey, _))| (vote_pubkey, i as u32)), - ); - - (stake_rank_map, superminority_threshold) -} - -pub async fn update_stake_history( - client: Arc, - keypair: Arc, - program_id: &Pubkey, -) -> Result { - let vote_accounts = get_vote_accounts_with_retry( - &client, - MIN_VOTE_EPOCHS, - Some(CommitmentConfig::finalized()), - ) - .await?; - - // Need to ensure that the response contains update stake amounts for the current epoch, - // so we find the largest epoch a validator has voted on to confirm the data is fresh - let max_vote_account_epoch = vote_accounts - .iter() - .flat_map(|va| va.epoch_credits.clone()) - .map(|(epoch, _, _)| epoch) - .max() - .unwrap_or(0); - - let validator_histories = - get_validator_history_accounts_with_retry(&client, *program_id).await?; - - let validator_history_map = - HashMap::from_iter(validator_histories.iter().map(|vh| (vh.vote_account, vh))); - let (stake_rank_map, superminority_threshold) = - get_stake_rank_map_and_superminority_count(&vote_accounts); - - let epoch = client - 
.get_epoch_info_with_commitment(CommitmentConfig::finalized()) - .await? - .epoch; - - if max_vote_account_epoch != epoch { - return Err(KeeperError::Custom("EpochMismatch".into())); - } - - let stake_history_entries = vote_accounts - .iter() - .filter_map(|va| { - let rank = stake_rank_map[&va.vote_pubkey.clone()]; - let is_superminority = rank <= superminority_threshold; - - if stake_entry_uploaded(&validator_history_map, va, epoch) { - return None; - } - - Some(StakeHistoryEntry::new( - va, - program_id, - &keypair.pubkey(), - epoch, - rank, - is_superminority, - )) - }) - .collect::>(); - - let (create_transactions, update_instructions) = - build_create_and_update_instructions(&client, &stake_history_entries).await?; - - submit_create_and_update( - &client, - create_transactions, - update_instructions, - &keypair, - PRIORITY_FEE, - ) - .await - .map_err(|e| e.into()) -} - -/* - Utility to recompute the superminority and rank fields for all validators from start_epoch to end_epoch. - Will over-write the on-chain data, so should only be used when the on-chain data is corrupted. 
-*/ -pub async fn _recompute_superminority_and_rank( - client: Arc, - keypair: Arc, - program_id: &Pubkey, - start_epoch: u64, - end_epoch: u64, -) -> Result<(), KeeperError> { - // Fetch every ValidatorHistory account - let gpa_config = RpcProgramAccountsConfig { - filters: Some(vec![RpcFilterType::Memcmp(Memcmp::new_raw_bytes( - 0, - ValidatorHistory::discriminator().into(), - ))]), - account_config: RpcAccountInfoConfig { - encoding: Some(solana_account_decoder::UiAccountEncoding::Base64), - ..RpcAccountInfoConfig::default() - }, - ..RpcProgramAccountsConfig::default() - }; - let validator_history_accounts = client - .get_program_accounts_with_config(&validator_history::id(), gpa_config) - .await - .expect("Failed to get validator history accounts"); - - let validator_histories = validator_history_accounts - .iter() - .map(|(_, account)| { - let validator_history = ValidatorHistory::try_deserialize(&mut account.data.as_slice()) - .expect("Failed to deserialize validator history account"); - validator_history - }) - .collect::>(); - - for epoch in start_epoch..=end_epoch { - // Get entry for each validator for this epoch - let vote_accounts: Vec = validator_histories - .iter() - .filter_map(|validator| { - validator - .history - .arr - .iter() - .find(|entry| { - entry.epoch == epoch as u16 && entry.activated_stake_lamports != u64::MAX - }) - .map(|entry| { - // All values except vote_pubkey and activated_stake are unused - RpcVoteAccountInfo { - vote_pubkey: validator.vote_account.to_string(), - activated_stake: entry.activated_stake_lamports, - epoch_credits: vec![], - commission: 0, - root_slot: 0, - node_pubkey: "".to_string(), - epoch_vote_account: false, - last_vote: 0, - } - }) - }) - .collect(); - let (stake_rank_map, superminority_threshold) = - get_stake_rank_map_and_superminority_count(&vote_accounts); - - let stake_history_entries = vote_accounts - .iter() - .map(|va| { - let rank = stake_rank_map[&va.vote_pubkey.clone()]; - let is_superminority = 
rank <= superminority_threshold; - StakeHistoryEntry::new( - va, - program_id, - &keypair.pubkey(), - epoch, - rank, - is_superminority, - ) - }) - .collect::>(); - - let update_instructions = stake_history_entries - .iter() - .map(|entry| entry.update_instruction()) - .collect::>(); - - match submit_instructions(&client, update_instructions, &keypair, PRIORITY_FEE).await { - Ok(_) => println!("completed epoch {}", epoch), - Err(e) => return Err(e.into()), - }; - } - - Ok(()) -} - -fn stake_entry_uploaded( - validator_history_map: &HashMap, - vote_account: &RpcVoteAccountInfo, - epoch: u64, -) -> bool { - let vote_account = Pubkey::from_str(&vote_account.vote_pubkey) - .map_err(|e| { - error!("Invalid vote account pubkey"); - e - }) - .expect("Invalid vote account pubkey"); - if let Some(validator_history) = validator_history_map.get(&vote_account) { - if let Some(latest_entry) = validator_history.history.last() { - return latest_entry.epoch == epoch as u16 - && latest_entry.is_superminority - != ValidatorHistoryEntry::default().is_superminority - && latest_entry.rank != ValidatorHistoryEntry::default().rank - && latest_entry.activated_stake_lamports - != ValidatorHistoryEntry::default().activated_stake_lamports; - } - } - false -} - -pub fn emit_stake_history_datapoint(stats: CreateUpdateStats, runs_for_epoch: i64) { - datapoint_info!( - "stake-history-stats", - ("num_creates_success", stats.creates.successes, i64), - ("num_creates_error", stats.creates.errors, i64), - ("num_updates_success", stats.updates.successes, i64), - ("num_updates_error", stats.updates.errors, i64), - ("runs_for_epoch", runs_for_epoch, i64), - ); -} diff --git a/keepers/validator-keeper/src/state/keeper_state.rs b/keepers/validator-keeper/src/state/keeper_state.rs new file mode 100644 index 00000000..45bcc7e5 --- /dev/null +++ b/keepers/validator-keeper/src/state/keeper_state.rs @@ -0,0 +1,159 @@ +use std::collections::{HashMap, HashSet}; + +use bytemuck::Zeroable; +use 
solana_client::rpc_response::RpcVoteAccountInfo;
use solana_sdk::{
    account::Account, epoch_info::EpochInfo, pubkey::Pubkey,
    vote::program::id as get_vote_program_id,
};
use validator_history::{ClusterHistory, ValidatorHistory};

use crate::{derive_validator_history_address, operations::keeper_operations::KeeperOperations};

/// Global snapshot shared by every keeper operation for the current epoch.
/// All maps are keyed by the validator's vote-account pubkey.
pub struct KeeperState {
    pub epoch_info: EpochInfo,

    // Per-epoch tallies of runs and errors, indexed by KeeperOperations discriminant.
    pub runs_for_epoch: [u64; KeeperOperations::LEN],
    pub errors_for_epoch: [u64; KeeperOperations::LEN],

    // Vote account info returned by get_vote_accounts.
    pub vote_account_map: HashMap<Pubkey, RpcVoteAccountInfo>,
    // Deserialized ValidatorHistory program accounts.
    pub validator_history_map: HashMap<Pubkey, ValidatorHistory>,

    // Raw vote accounts for every key of validator_history_map
    // (None when the account no longer exists on chain).
    pub all_history_vote_account_map: HashMap<Pubkey, Option<Account>>,
    // Raw vote accounts for every key of vote_account_map.
    pub all_get_vote_account_map: HashMap<Pubkey, Option<Account>>,

    // Tip distribution accounts from the previous epoch (current_epoch - 1).
    pub previous_epoch_tip_distribution_map: HashMap<Pubkey, Option<Account>>,
    // Tip distribution accounts from the current epoch.
    pub current_epoch_tip_distribution_map: HashMap<Pubkey, Option<Account>>,

    pub cluster_history: ClusterHistory,
    pub keeper_balance: u64,
}
impl KeeperState {
    pub fn new() -> Self {
        Self::default()
    }

    /// Bumps the run counter for `operation` this epoch.
    pub fn increment_update_run_for_epoch(&mut self, operation: KeeperOperations) {
        self.runs_for_epoch[operation as usize] += 1;
    }

    /// Bumps the error counter for `operation` this epoch.
    pub fn increment_update_error_for_epoch(&mut self, operation: KeeperOperations) {
        self.errors_for_epoch[operation as usize] += 1;
    }

    /// Returns the (runs, errors) recorded for `operation` this epoch.
    pub fn copy_runs_and_errors_for_epoch(&self, operation: KeeperOperations) -> (u64, u64) {
        let index = operation as usize;
        (self.runs_for_epoch[index], self.errors_for_epoch[index])
    }

    /// Stores the tallies returned by an operation's `fire` call.
    pub fn set_runs_and_errors_for_epoch(
        &mut self,
        (operation, runs_for_epoch, errors_for_epoch): (KeeperOperations, u64, u64),
    ) {
        let index = operation as usize;
        self.runs_for_epoch[index] = runs_for_epoch;
        self.errors_for_epoch[index] = errors_for_epoch;
    }

    /// Derives the ValidatorHistory PDA for every known history vote account.
    pub fn get_history_pubkeys(&self, program_id: &Pubkey) -> HashSet<Pubkey> {
        self.all_history_vote_account_map
            .keys()
            .map(|vote_pubkey| derive_validator_history_address(vote_pubkey, program_id))
            .collect()
    }

    /// Vote accounts that are missing on chain or no longer owned by the vote program.
    pub fn get_closed_vote_accounts(&self) -> HashSet<&Pubkey> {
        self.all_history_vote_account_map
            .iter()
            .filter(|(_, maybe_account)| {
                maybe_account
                    .as_ref()
                    .map_or(true, |account| account.owner != get_vote_program_id())
            })
            .map(|(pubkey, _)| pubkey)
            .collect()
    }

    /// Vote accounts that exist on chain and are owned by the vote program.
    pub fn get_live_vote_accounts(&self) -> HashSet<&Pubkey> {
        self.all_get_vote_account_map
            .iter()
            .filter(|(_, maybe_account)| {
                maybe_account
                    .as_ref()
                    .map_or(false, |account| account.owner == get_vote_program_id())
            })
            .map(|(pubkey, _)| pubkey)
            .collect()
    }
}

impl Default for KeeperState {
    fn default() -> Self {
        Self {
            epoch_info: EpochInfo {
                epoch: 0,
                slot_index: 0,
                slots_in_epoch: 0,
                absolute_slot: 0,
                block_height: 0,
                transaction_count: None,
            },
            runs_for_epoch: [0; KeeperOperations::LEN],
            errors_for_epoch: [0; KeeperOperations::LEN],
            vote_account_map: HashMap::new(),
            validator_history_map: HashMap::new(),
            all_history_vote_account_map: HashMap::new(),
            all_get_vote_account_map: HashMap::new(),
            previous_epoch_tip_distribution_map: HashMap::new(),
            current_epoch_tip_distribution_map: HashMap::new(),
            cluster_history: ClusterHistory::zeroed(),
            keeper_balance: 0,
        }
    }
}

impl std::fmt::Debug for KeeperState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("KeeperState") + .field("epoch_info", &self.epoch_info) + .field("runs_for_epoch", &self.runs_for_epoch) + .field("errors_for_epoch", &self.errors_for_epoch) + .field("vote_account_map_count", &self.vote_account_map.len()) + .field( + "validator_history_map_count", + &self.validator_history_map.len(), + ) + .field( + "all_history_vote_account_map_count", + &self.all_history_vote_account_map.len(), + ) + .field( + "all_get_vote_account_map_count", + &self.all_get_vote_account_map.len(), + ) + .field( + "previous_epoch_tip_distribution_map_count", + &self.previous_epoch_tip_distribution_map.len(), + ) + .field( + "current_epoch_tip_distribution_map_count", + &self.current_epoch_tip_distribution_map.len(), + ) + // .field("cluster_history", &self.cluster_history) + .field("keeper_balance", &self.keeper_balance) + .finish() + } +} diff --git a/keepers/validator-keeper/src/state/mod.rs b/keepers/validator-keeper/src/state/mod.rs new file mode 100644 index 00000000..018888fc --- /dev/null +++ b/keepers/validator-keeper/src/state/mod.rs @@ -0,0 +1,2 @@ +pub mod keeper_state; +pub mod update_state; diff --git a/keepers/validator-keeper/src/state/update_state.rs b/keepers/validator-keeper/src/state/update_state.rs new file mode 100644 index 00000000..45caa381 --- /dev/null +++ b/keepers/validator-keeper/src/state/update_state.rs @@ -0,0 +1,270 @@ +use std::{collections::HashMap, error::Error, str::FromStr, sync::Arc}; + +use anchor_lang::AccountDeserialize; +use jito_tip_distribution::sdk::derive_tip_distribution_account_address; +use keeper_core::{ + get_multiple_accounts_batched, get_vote_accounts_with_retry, submit_transactions, +}; +use solana_client::{nonblocking::rpc_client::RpcClient, rpc_response::RpcVoteAccountInfo}; +use solana_sdk::{ + account::Account, + instruction::Instruction, + pubkey::Pubkey, + signature::{Keypair, Signer}, +}; +use validator_history::{constants::MIN_VOTE_EPOCHS, ClusterHistory, ValidatorHistory}; + +use crate::{ + 
derive_cluster_history_address, derive_validator_history_address, get_balance_with_retry, + get_create_validator_history_instructions, get_validator_history_accounts_with_retry, + operations::keeper_operations::KeeperOperations, +}; + +use super::keeper_state::KeeperState; + +pub async fn pre_create_update( + client: &Arc, + keypair: &Arc, + program_id: &Pubkey, + keeper_state: &mut KeeperState, +) -> Result<(), Box> { + // Update Epoch + match client.get_epoch_info().await { + Ok(current_epoch) => { + if current_epoch.epoch != keeper_state.epoch_info.epoch { + keeper_state.runs_for_epoch = [0; KeeperOperations::LEN]; + keeper_state.errors_for_epoch = [0; KeeperOperations::LEN]; + keeper_state.epoch_info = current_epoch.clone(); + } + } + Err(e) => { + return Err(Box::new(e)); + } + } + + // Fetch Vote Accounts + keeper_state.vote_account_map = get_vote_account_map(client).await?; + + // Get all get vote accounts + keeper_state.all_get_vote_account_map = + get_all_get_vote_account_map(client, keeper_state).await?; + + // Update Cluster History + keeper_state.cluster_history = get_cluster_history(client, program_id).await?; + + // Update Keeper Balance + keeper_state.keeper_balance = get_balance_with_retry(client, keypair.pubkey()).await?; + + Ok(()) +} + +// Should be called after `pre_create_update` +pub async fn create_missing_accounts( + client: &Arc, + keypair: &Arc, + program_id: &Pubkey, + keeper_state: &KeeperState, +) -> Result<(), Box> { + // Create Missing Accounts + create_missing_validator_history_accounts(client, keypair, program_id, keeper_state).await?; + + Ok(()) +} + +pub async fn post_create_update( + client: &Arc, + program_id: &Pubkey, + tip_distribution_program_id: &Pubkey, + keeper_state: &mut KeeperState, +) -> Result<(), Box> { + // Update Validator History Accounts + keeper_state.validator_history_map = get_validator_history_map(client, program_id).await?; + + // Get all history vote accounts + keeper_state.all_history_vote_account_map = + 
get_all_history_vote_account_map(client, keeper_state).await?; + + // Update previous tip distribution map + keeper_state.previous_epoch_tip_distribution_map = get_tip_distribution_accounts( + client, + tip_distribution_program_id, + keeper_state, + keeper_state.epoch_info.epoch.saturating_sub(1), + ) + .await?; + + // Update current tip distribution map + keeper_state.current_epoch_tip_distribution_map = get_tip_distribution_accounts( + client, + tip_distribution_program_id, + keeper_state, + keeper_state.epoch_info.epoch, + ) + .await?; + + Ok(()) +} + +async fn get_vote_account_map( + client: &Arc, +) -> Result, Box> { + let active_vote_accounts = HashMap::from_iter( + get_vote_accounts_with_retry(client, MIN_VOTE_EPOCHS, None) + .await? + .iter() + .map(|vote_account_info| { + ( + Pubkey::from_str(vote_account_info.vote_pubkey.as_str()) + .expect("Could not parse vote pubkey"), + vote_account_info.clone(), + ) + }), + ); + + Ok(active_vote_accounts) +} + +async fn get_cluster_history( + client: &Arc, + program_id: &Pubkey, +) -> Result> { + let cluster_history_address = derive_cluster_history_address(program_id); + let cluster_history_account = client.get_account(&cluster_history_address).await?; + let cluster_history = + ClusterHistory::try_deserialize(&mut cluster_history_account.data.as_slice())?; + + Ok(cluster_history) +} + +async fn get_validator_history_map( + client: &Arc, + program_id: &Pubkey, +) -> Result, Box> { + let validator_histories = + get_validator_history_accounts_with_retry(client, *program_id).await?; + + let validator_history_map = HashMap::from_iter( + validator_histories + .iter() + .map(|vote_history| (vote_history.vote_account, *vote_history)), + ); + + Ok(validator_history_map) +} + +async fn get_all_history_vote_account_map( + client: &Arc, + keeper_state: &KeeperState, +) -> Result>, Box> { + let validator_history_map = &keeper_state.validator_history_map; + + let all_history_vote_account_pubkeys: Vec = + 
validator_history_map.keys().cloned().collect(); + + let all_history_vote_accounts = + get_multiple_accounts_batched(all_history_vote_account_pubkeys.as_slice(), client).await?; + + let history_vote_accounts_map = all_history_vote_account_pubkeys + .into_iter() + .zip(all_history_vote_accounts) + .collect::>>(); + + Ok(history_vote_accounts_map) +} + +async fn get_all_get_vote_account_map( + client: &Arc, + keeper_state: &KeeperState, +) -> Result>, Box> { + let vote_account_map = &keeper_state.vote_account_map; + + // Convert the keys to a vector of Pubkey values + let all_get_vote_account_pubkeys: Vec = vote_account_map.keys().cloned().collect(); + + let all_get_vote_accounts = + get_multiple_accounts_batched(all_get_vote_account_pubkeys.as_slice(), client).await?; + + let get_vote_accounts_map = all_get_vote_account_pubkeys + .into_iter() + .zip(all_get_vote_accounts) + .collect::>>(); + + Ok(get_vote_accounts_map) +} + +async fn get_tip_distribution_accounts( + client: &Arc, + tip_distribution_program_id: &Pubkey, + keeper_state: &KeeperState, + epoch: u64, +) -> Result>, Box> { + let vote_accounts = keeper_state + .all_history_vote_account_map + .keys() + .collect::>(); + + /* Filters tip distribution tuples to the addresses, then fetches accounts to see which ones exist */ + let tip_distribution_addresses = vote_accounts + .iter() + .map(|vote_pubkey| { + let (pubkey, _) = derive_tip_distribution_account_address( + tip_distribution_program_id, + vote_pubkey, + epoch, + ); + pubkey + }) + .collect::>(); + + let tip_distribution_accounts = + get_multiple_accounts_batched(&tip_distribution_addresses, client).await?; + + let result = vote_accounts + .into_iter() + .zip(tip_distribution_accounts) + .map(|(vote_pubkey, account)| (*vote_pubkey, account)) // Dereference vote_pubkey here + .collect::>>(); + + Ok(result) +} + +async fn create_missing_validator_history_accounts( + client: &Arc, + keypair: &Arc, + program_id: &Pubkey, + keeper_state: &KeeperState, +) -> 
Result<(), Box> { + let vote_accounts = &keeper_state + .vote_account_map + .keys() + .collect::>(); + + let all_history_addresses = &vote_accounts + .iter() + .map(|vote_pubkey| derive_validator_history_address(vote_pubkey, program_id)) + .collect::>(); + + let history_accounts = get_multiple_accounts_batched(all_history_addresses, client).await?; + + assert!(vote_accounts.len() == history_accounts.len()); + + let create_transactions = vote_accounts + .iter() + .zip(history_accounts) + .filter_map(|(vote_pubkey, history_account)| { + match history_account { + Some(_) => None, + None => { + // Create accounts that don't exist + let ix = + get_create_validator_history_instructions(vote_pubkey, program_id, keypair); + Some(ix) + } + } + }) + .collect::>>(); + + submit_transactions(client, create_transactions, keypair).await?; + + Ok(()) +} diff --git a/keepers/validator-keeper/src/vote_account.rs b/keepers/validator-keeper/src/vote_account.rs deleted file mode 100644 index dd9eab71..00000000 --- a/keepers/validator-keeper/src/vote_account.rs +++ /dev/null @@ -1,203 +0,0 @@ -use std::collections::{HashMap, HashSet}; -use std::str::FromStr; -use std::sync::Arc; - -use anchor_lang::{InstructionData, ToAccountMetas}; -use keeper_core::{ - build_create_and_update_instructions, get_multiple_accounts_batched, - get_vote_accounts_with_retry, submit_create_and_update, Address, CreateTransaction, - CreateUpdateStats, UpdateInstruction, -}; -use log::error; -use solana_client::nonblocking::rpc_client::RpcClient; -use solana_program::vote; -use solana_program::{instruction::Instruction, pubkey::Pubkey}; -use solana_sdk::{signature::Keypair, signer::Signer}; - -use validator_history::constants::{MAX_ALLOC_BYTES, MIN_VOTE_EPOCHS}; -use validator_history::state::ValidatorHistory; -use validator_history::{Config, ValidatorHistoryEntry}; - -use crate::{get_validator_history_accounts_with_retry, KeeperError, PRIORITY_FEE}; - -pub struct CopyVoteAccountEntry { - pub vote_account: 
Pubkey,
    pub validator_history_account: Pubkey,
    pub config_address: Pubkey,
    pub program_id: Pubkey,
    pub signer: Pubkey,
}

impl CopyVoteAccountEntry {
    /// Derives the ValidatorHistory and Config PDAs for `vote_account` under
    /// `program_id` and bundles them with the payer `signer`.
    pub fn new(vote_account: &Pubkey, program_id: &Pubkey, signer: &Pubkey) -> Self {
        let (validator_history_account, _) = Pubkey::find_program_address(
            &[ValidatorHistory::SEED, &vote_account.to_bytes()],
            program_id,
        );
        let (config_address, _) = Pubkey::find_program_address(&[Config::SEED], program_id);
        Self {
            vote_account: *vote_account,
            validator_history_account,
            config_address,
            program_id: *program_id,
            signer: *signer,
        }
    }
}

impl Address for CopyVoteAccountEntry {
    /// The account this entry creates/updates: the ValidatorHistory PDA.
    fn address(&self) -> Pubkey {
        self.validator_history_account
    }
}

impl CreateTransaction for CopyVoteAccountEntry {
    /// Builds the instruction sequence that initializes a ValidatorHistory
    /// account: one `InitializeValidatorHistoryAccount` followed by enough
    /// `ReallocValidatorHistoryAccount` calls to grow the account from
    /// MAX_ALLOC_BYTES up to ValidatorHistory::SIZE (realloc is capped per
    /// instruction, hence the repeated calls).
    fn create_transaction(&self) -> Vec<Instruction> {
        let mut ixs = vec![Instruction {
            program_id: self.program_id,
            accounts: validator_history::accounts::InitializeValidatorHistoryAccount {
                validator_history_account: self.validator_history_account,
                vote_account: self.vote_account,
                system_program: solana_program::system_program::id(),
                signer: self.signer,
            }
            .to_account_metas(None),
            data: validator_history::instruction::InitializeValidatorHistoryAccount {}.data(),
        }];
        // Ceiling division: number of additional MAX_ALLOC_BYTES-sized grows
        // needed to reach the full account size.
        let num_reallocs = (ValidatorHistory::SIZE - MAX_ALLOC_BYTES) / MAX_ALLOC_BYTES + 1;
        ixs.extend(vec![
            Instruction {
                program_id: self.program_id,
                accounts: validator_history::accounts::ReallocValidatorHistoryAccount {
                    validator_history_account: self.validator_history_account,
                    vote_account: self.vote_account,
                    config: self.config_address,
                    system_program: solana_program::system_program::id(),
                    signer: self.signer,
                }
                .to_account_metas(None),
                data: validator_history::instruction::ReallocValidatorHistoryAccount {}.data(),
            };
            num_reallocs
        ]);
        ixs
    }
}

impl UpdateInstruction for CopyVoteAccountEntry {
    /// Builds the `CopyVoteAccount` instruction that copies the on-chain vote
    /// account state into the ValidatorHistory account.
    fn update_instruction(&self) -> Instruction {
        Instruction {
            program_id: self.program_id,
            accounts: validator_history::accounts::CopyVoteAccount {
                validator_history_account: self.validator_history_account,
                vote_account: self.vote_account,
                signer: self.signer,
            }
            .to_account_metas(None),
            data: validator_history::instruction::CopyVoteAccount {}.data(),
        }
    }
}

/// Creates any missing ValidatorHistory accounts and submits CopyVoteAccount
/// updates for every known validator, skipping closed vote accounts and those
/// already updated within the last 50,000 slots of the current epoch.
///
/// NOTE(review): the generic parameters of the signature were lost in
/// extraction; `Arc<RpcClient>` / `Arc<Keypair>` follow from the `.await`
/// RPC calls and `keypair.pubkey()` usage, but the exact `Result` Ok/Err
/// types should be confirmed against the callers.
pub async fn update_vote_accounts(
    rpc_client: Arc<RpcClient>,
    keypair: Arc<Keypair>,
    validator_history_program_id: Pubkey,
) -> Result<SubmitStats, KeeperError> {
    // Active validators per RPC (voted within MIN_VOTE_EPOCHS).
    let rpc_vote_accounts =
        get_vote_accounts_with_retry(&rpc_client, MIN_VOTE_EPOCHS, None).await?;

    // Every ValidatorHistory account already on chain.
    let validator_histories =
        get_validator_history_accounts_with_retry(&rpc_client, validator_history_program_id)
            .await?;

    let validator_history_map: HashMap<Pubkey, &ValidatorHistory> =
        HashMap::from_iter(validator_histories.iter().map(|vh| (vh.vote_account, vh)));
    // Keys are Copy; iterate by reference instead of cloning the whole map
    // (original cloned the map just to call into_keys()).
    let vote_account_pubkeys = validator_history_map
        .keys()
        .copied()
        .collect::<Vec<Pubkey>>();

    // A vote account is "closed" if it no longer exists or is no longer owned
    // by the vote program.
    let vote_accounts = get_multiple_accounts_batched(&vote_account_pubkeys, &rpc_client).await?;
    let closed_vote_accounts: HashSet<Pubkey> = vote_accounts
        .iter()
        .enumerate()
        .filter_map(|(i, account)| match account {
            Some(account) => {
                if account.owner != vote::program::id() {
                    Some(vote_account_pubkeys[i])
                } else {
                    None
                }
            }
            None => Some(vote_account_pubkeys[i]),
        })
        .collect();

    // Merges new and active RPC vote accounts with all validator history
    // accounts; collecting into a HashSet dedupes the union.
    let mut all_vote_accounts = rpc_vote_accounts
        .iter()
        .filter_map(|rpc_va| {
            Pubkey::from_str(&rpc_va.vote_pubkey)
                .map_err(|e| {
                    error!("Invalid vote account pubkey");
                    e
                })
                .ok()
        })
        .chain(validator_histories.iter().map(|vh| vh.vote_account))
        .collect::<HashSet<Pubkey>>();

    let epoch_info = rpc_client.get_epoch_info().await?;

    // Drop closed vote accounts, and accounts whose history was already
    // copied within the last 50,000 slots this epoch.
    all_vote_accounts.retain(|va| {
        !closed_vote_accounts.contains(va)
            && !vote_account_uploaded_recently(
                &validator_history_map,
                va,
                epoch_info.epoch,
                epoch_info.absolute_slot,
            )
    });

    let entries = all_vote_accounts
        .iter()
        .map(|va| CopyVoteAccountEntry::new(va, &validator_history_program_id, &keypair.pubkey()))
        .collect::<Vec<_>>();

    let (create_transactions, update_instructions) =
        build_create_and_update_instructions(&rpc_client, &entries).await?;

    let submit_result = submit_create_and_update(
        &rpc_client,
        create_transactions,
        update_instructions,
        &keypair,
        PRIORITY_FEE,
    )
    .await;

    submit_result.map_err(|e| e.into())
}

/// Returns true when `vote_account`'s latest history entry belongs to the
/// current `epoch`, has a real (non-default-sentinel) last-update slot, and
/// that slot is within 50,000 slots of `slot`.
fn vote_account_uploaded_recently(
    validator_history_map: &HashMap<Pubkey, &ValidatorHistory>,
    vote_account: &Pubkey,
    epoch: u64,
    slot: u64,
) -> bool {
    if let Some(validator_history) = validator_history_map.get(vote_account) {
        if let Some(entry) = validator_history.history.last() {
            if entry.epoch == epoch as u16
                // Default-valued slot means "never updated" — not recent.
                && entry.vote_account_last_update_slot
                    != ValidatorHistoryEntry::default().vote_account_last_update_slot
                // saturating_sub: `slot - 50000` underflows u64 (panic in
                // debug builds) when the cluster is under 50,000 slots old.
                && entry.vote_account_last_update_slot > slot.saturating_sub(50_000)
            {
                return true;
            }
        }
    }
    false
}