From 3d342208a1c463f21b2f2464e2a4d56286856478 Mon Sep 17 00:00:00 2001 From: Evan B Date: Wed, 7 Feb 2024 13:08:23 -0500 Subject: [PATCH] Improve Keeper Reliability (#19) Improves keeper reliability for landing transactions by: * no longer skipping transactions that initially fail with a BlockhashNotFound error * Merging vote accounts with all validator history accounts for epoch credit cranking, so offline validators that no longer showing up in getVoteAccounts still get cranked (necessary for Steward program scoring) The first part has already been running on mainnet for a week, and for the last 3 epochs, all vote accounts have been updated each epoch (as measured by: same number of commissions tracked as stakes). --- keepers/keeper-core/src/lib.rs | 381 ++++++++---------- keepers/validator-keeper/src/cluster_info.rs | 2 +- keepers/validator-keeper/src/gossip.rs | 36 +- keepers/validator-keeper/src/lib.rs | 96 +++-- keepers/validator-keeper/src/main.rs | 166 ++++++-- .../validator-keeper/src/mev_commission.rs | 85 ++-- keepers/validator-keeper/src/stake.rs | 40 +- keepers/validator-keeper/src/vote_account.rs | 60 ++- 8 files changed, 449 insertions(+), 417 deletions(-) diff --git a/keepers/keeper-core/src/lib.rs b/keepers/keeper-core/src/lib.rs index a8d2a147..15d7cd45 100644 --- a/keepers/keeper-core/src/lib.rs +++ b/keepers/keeper-core/src/lib.rs @@ -1,5 +1,7 @@ +use std::collections::HashSet; use std::fmt::{Display, Formatter}; use std::mem::size_of; +use std::vec; use std::{collections::HashMap, sync::Arc, time::Duration}; use clap::ValueEnum; @@ -8,6 +10,7 @@ use solana_client::rpc_response::RpcVoteAccountInfo; use solana_client::{client_error::ClientError, nonblocking::rpc_client::RpcClient}; use solana_program::hash::Hash; use solana_sdk::packet::PACKET_DATA_SIZE; +use solana_sdk::transaction::TransactionError; use solana_sdk::{ account::Account, commitment_config::CommitmentConfig, instruction::AccountMeta, instruction::Instruction, packet::Packet, pubkey::Pubkey, signature::Keypair, @@ -16,12 +19,13 @@ use solana_sdk::{ use thiserror::Error as ThisError; use tokio::task::{self, JoinError}; -#[derive(Debug, Default, Clone, Copy)] +#[derive(Debug, Default, Clone)] pub struct SubmitStats { pub successes: u64, pub errors: u64, + pub results: Vec>, } -#[derive(Debug, Default, Clone, Copy)] +#[derive(Debug, Default, Clone)] pub struct CreateUpdateStats { pub creates: SubmitStats, pub updates: SubmitStats, @@ -30,14 +34,20 @@ pub struct CreateUpdateStats { pub type Error = Box; #[derive(ThisError, Debug, Clone)] pub enum TransactionExecutionError { - #[error("Transactions failed to execute after multiple retries")] - RetryError(Vec), - #[error("Transactions failed to execute after multiple retries")] - TransactionRetryError(Vec>), #[error("RPC Client error: {0:?}")] - ClientError(String, Vec), + ClientError(String), #[error("RPC Client error: {0:?}")] - TransactionClientError(String, Vec>), + TransactionClientError(String, Vec>), +} + +#[derive(ThisError, Clone, Debug)] +pub enum SendTransactionError { + #[error("Exceeded retries")] + ExceededRetries, + // Stores ClientError.to_string(), since ClientError does not impl Clone, and we want to track both + // io/reqwest errors as well as transaction errors + #[error("Transaction error: {0}")] + TransactionError(String), } #[derive(ThisError, Debug)] @@ -197,73 +207,14 @@ async fn calculate_instructions_per_tx( Ok(size_max.min(compute_max)) } -async fn parallel_submit_transactions( - client: &RpcClient, - signer: &Arc, - // Each &[Instruction] represents a transaction - transactions: &[&[Instruction]], - // Map of signature to index of tx in `transactions` - executed_signatures: &mut HashMap, -) -> Result<(), TransactionExecutionError> { - // Converts arrays of instructions into transactions and submits them in parallel, in batches of 50 (arbitrary, to avoid spamming RPC) - // Saves signatures associated with the indexes of instructions it contains - - const TX_BATCH_SIZE: usize = 50; - for transaction_batch in transactions.chunks(TX_BATCH_SIZE) { - let recent_blockhash = get_latest_blockhash_with_retry(client).await.map_err(|e| { - TransactionExecutionError::TransactionClientError( - e.to_string(), - transactions.iter().map(|&tx| tx.to_vec()).collect(), - ) - })?; - // Convert instructions to transactions in batches and send them all, saving their signatures - let transactions: Vec = transaction_batch - .iter() - .map(|batch| { - Transaction::new_signed_with_payer( - batch, - Some(&signer.pubkey()), - &[signer.as_ref()], - recent_blockhash, - ) - }) - .collect(); - - let tx_futures = transactions - .iter() - .enumerate() - .map(|(i, tx)| async move { - let res = client.send_transaction(tx).await; - match res { - Ok(signature) => Some((signature, i)), - Err(e) => { - warn!("Send transaction failed: {:?}", e); - None - } - } - }) - .collect::>(); - - let results = futures::future::join_all(tx_futures).await; - for (signature, ix_index) in results.into_iter().flatten() { - executed_signatures.insert(signature, ix_index); - } - debug!( - "Transactions sent: {}, executed_signatures: {}", - transactions.len(), - executed_signatures.len() - ); - } - Ok(()) -} - async fn parallel_confirm_transactions( client: &RpcClient, - executed_signatures: &mut HashMap, -) { - // Confirmes TXs in batches of 256 (max allowed by RPC method) + submitted_signatures: HashSet, +) -> HashSet { + // Confirmes TXs in batches of 256 (max allowed by RPC method). Returns confirmed signatures const SIG_STATUS_BATCH_SIZE: usize = 256; - let signatures_to_confirm = executed_signatures.clone().into_keys().collect::>(); + let num_transactions_submitted = submitted_signatures.len(); + let signatures_to_confirm = submitted_signatures.into_iter().collect::>(); // Imperfect logic here: if a transaction is slow to confirm on first submission, and it can only be called once succesfully, // it will be resubmitted and fail. Ideally on the next loop it will not be included in the instructions list @@ -284,12 +235,12 @@ async fn parallel_confirm_transactions( let results = futures::future::join_all(confirmation_futures).await; - let num_transactions_submitted = executed_signatures.len(); + let mut confirmed_signatures: HashSet = HashSet::new(); for result_batch in results.iter() { for (sig, result) in result_batch { if let Some(status) = result { if status.satisfies_commitment(client.commitment()) && status.err.is_none() { - executed_signatures.remove(sig); + confirmed_signatures.insert(*sig); } } } @@ -298,107 +249,151 @@ async fn parallel_confirm_transactions( info!( "{} transactions submitted, {} confirmed", num_transactions_submitted, - num_transactions_submitted - executed_signatures.len() + confirmed_signatures.len() ); + confirmed_signatures } -pub async fn parallel_execute_instructions( - client: &RpcClient, - mut instructions: Vec, +fn sign_txs( + transactions: &[&[Instruction]], + signer: &Arc, + blockhash: Hash, +) -> Vec { + transactions + .iter() + .map(|instructions| { + Transaction::new_signed_with_payer( + instructions, + Some(&signer.pubkey()), + &[signer.as_ref()], + blockhash, + ) + }) + .collect() +} + +pub async fn parallel_execute_transactions( + client: &Arc, + transactions: &[&[Instruction]], signer: &Arc, retry_count: u16, confirmation_time: u64, -) -> Result<(), TransactionExecutionError> { - /* - Note: Assumes all instructions are equivalent in compute, equivalent in size, and can be executed in any order - - 1) Submits all instructions in parallel - 2) Waits a bit for them to confirm - 3) Checks which ones have confirmed, and keeps the ones that haven't - 4) Repeats retry_count number of times until all have confirmed - - Returns all remaining instructions that haven't executed so application can handle - */ +) -> Result>, TransactionExecutionError> { + let mut results = vec![Err(SendTransactionError::ExceededRetries); transactions.len()]; + let mut retries = 0; - if instructions.is_empty() { - return Ok(()); + if transactions.is_empty() { + return Ok(results); } - let instructions_per_tx = calculate_instructions_per_tx(client, &instructions, signer) + let blockhash = get_latest_blockhash_with_retry(client) .await - .map_err(|e| { - TransactionExecutionError::ClientError(e.to_string(), instructions.to_vec()) - })?; - - for _ in 0..retry_count { - let mut executed_signatures: HashMap = HashMap::new(); - let instruction_batches: Vec<&[Instruction]> = - instructions.chunks(instructions_per_tx).collect(); - parallel_submit_transactions( - client, - signer, - &instruction_batches, - &mut executed_signatures, - ) - .await?; + .map_err(|e| TransactionExecutionError::ClientError(e.to_string()))?; + let mut signed_txs = sign_txs(transactions, signer, blockhash); + + while retries < retry_count { + let mut submitted_signatures = HashMap::new(); + let mut is_blockhash_not_found = false; + + for (idx, tx) in signed_txs.iter().enumerate() { + if results[idx].is_ok() { + continue; + } + + // Future optimization: submit these in parallel batches and refresh blockhash for every batch + match client.send_transaction(tx).await { + Ok(signature) => { + submitted_signatures.insert(signature, idx); + } + Err(e) => match e.get_transaction_error() { + Some(TransactionError::BlockhashNotFound) => { + is_blockhash_not_found = true; + break; + } + Some(TransactionError::AlreadyProcessed) => { + submitted_signatures.insert(tx.signatures[0], idx); + } + Some(_) | None => { + warn!("Transaction error: {}", e.to_string()); + results[idx] = Err(SendTransactionError::TransactionError(e.to_string())) + } + }, + } + } tokio::time::sleep(Duration::from_secs(confirmation_time)).await; - parallel_confirm_transactions(client, &mut executed_signatures).await; + for signature in parallel_confirm_transactions( + client, + submitted_signatures.clone().into_keys().collect(), + ) + .await + { + results[submitted_signatures[&signature]] = Ok(()); + } - // All have been executed - if executed_signatures.is_empty() { - return Ok(()); + if results.iter().all(|r| r.is_ok()) { + break; } - // Update instructions to the ones remaining - instructions = executed_signatures - .into_values() - .flat_map(|i| instruction_batches[i]) - .cloned() - .collect::>(); + if is_blockhash_not_found + || !client + .is_blockhash_valid(&blockhash, CommitmentConfig::processed()) + .await + .map_err(|e| { + TransactionExecutionError::TransactionClientError( + e.to_string(), + results.clone(), + ) + })? + { + // Re-sign transactions with fresh blockhash + let blockhash = get_latest_blockhash_with_retry(client).await.map_err(|e| { + TransactionExecutionError::TransactionClientError(e.to_string(), results.clone()) + })?; + signed_txs = sign_txs(transactions, signer, blockhash); + retries += 1; + } } - Err(TransactionExecutionError::RetryError(instructions)) + + Ok(results) } -pub async fn parallel_execute_transactions( +pub async fn parallel_execute_instructions( client: &Arc, - mut transactions: Vec<&[Instruction]>, + instructions: &[Instruction], signer: &Arc, retry_count: u16, confirmation_time: u64, -) -> Result<(), TransactionExecutionError> { - // Accepts a list of transactions (each represented as &[Instruction]) - // Executes them in parallel, returns the ones that failed to execute - // And repeats up to retry_count number of times until all have executed - if transactions.is_empty() { - return Ok(()); - } - - for _ in 0..retry_count { - let mut executed_signatures: HashMap = HashMap::new(); - parallel_submit_transactions(client, signer, &transactions, &mut executed_signatures) - .await?; - - tokio::time::sleep(Duration::from_secs(confirmation_time)).await; +) -> Result>, TransactionExecutionError> { + /* + Note: Assumes all instructions are equivalent in compute, equivalent in size, and can be executed in any order - parallel_confirm_transactions(client, &mut executed_signatures).await; + 1) Submits all instructions in parallel + 2) Waits a bit for them to confirm + 3) Checks which ones have confirmed, and keeps the ones that haven't + 4) Repeats retry_count number of times until all have confirmed - // All have been executed - if executed_signatures.is_empty() { - return Ok(()); - } + Returns all remaining instructions that haven't executed so application can handle + */ - // Update transactions to the ones remaining - transactions = executed_signatures - .into_values() - .map(|i| transactions[i]) - .collect::>(); + if instructions.is_empty() { + return Ok(vec![]); } - Err(TransactionExecutionError::TransactionRetryError( - transactions.iter().map(|t| t.to_vec()).collect(), - )) + let instructions_per_tx = calculate_instructions_per_tx(client, instructions, signer) + .await + .map_err(|e| TransactionExecutionError::ClientError(e.to_string()))?; + let transactions: Vec<&[Instruction]> = instructions.chunks(instructions_per_tx).collect(); + + parallel_execute_transactions( + client, + &transactions, + signer, + retry_count, + confirmation_time, + ) + .await } pub async fn build_create_and_update_instructions< @@ -439,71 +434,41 @@ pub async fn submit_transactions( client: &Arc, transactions: Vec>, keypair: &Arc, -) -> Result { +) -> Result { let mut stats = SubmitStats::default(); let num_transactions = transactions.len(); - match parallel_execute_transactions( - client, - transactions.iter().map(AsRef::as_ref).collect(), - keypair, - 10, - 60, - ) - .await - { - Ok(_) => { - stats.successes = num_transactions as u64; - } - Err(e) => { - let transactions_len = match e.clone() { - TransactionExecutionError::TransactionRetryError(transactions) => { - transactions.len() - } - TransactionExecutionError::TransactionClientError(_, transactions) => { - transactions.len() - } - _ => { - error!("Hit unreachable statement in submit_transactions"); - unreachable!(); - } - }; - stats.successes = num_transactions as u64 - transactions_len as u64; - stats.errors = transactions_len as u64; - return Err((e, stats)); + let tx_slice = transactions + .iter() + .map(|t| t.as_slice()) + .collect::>(); + + match parallel_execute_transactions(client, &tx_slice, keypair, 10, 30).await { + Ok(results) => { + stats.successes = results.iter().filter(|&tx| tx.is_ok()).count() as u64; + stats.errors = num_transactions as u64 - stats.successes; + stats.results = results; + Ok(stats) } + Err(e) => Err(e), } - Ok(stats) } pub async fn submit_instructions( client: &Arc, instructions: Vec, keypair: &Arc, -) -> Result { +) -> Result { let mut stats = SubmitStats::default(); let num_instructions = instructions.len(); - match parallel_execute_instructions(client, instructions, keypair, 10, 30).await { - Ok(_) => { - stats.successes = num_instructions as u64; - } - Err(e) => { - let instructions_len = match e.clone() { - TransactionExecutionError::RetryError(instructions) => instructions.len(), - TransactionExecutionError::ClientError(_, instructions) => instructions.len(), - TransactionExecutionError::TransactionClientError(_, instructions) => { - instructions.concat().len() - } - _ => { - error!("Hit unreachable statement in submit_instructions"); - unreachable!(); - } - }; - stats.successes = num_instructions as u64 - instructions_len as u64; - stats.errors = instructions_len as u64; - return Err((e, stats)); + match parallel_execute_instructions(client, &instructions, keypair, 10, 30).await { + Ok(results) => { + stats.successes = results.iter().filter(|&tx| tx.is_ok()).count() as u64; + stats.errors = num_instructions as u64 - stats.successes; + stats.results = results; + Ok(stats) } + Err(e) => Err(e), } - Ok(stats) } pub async fn submit_create_and_update( @@ -511,19 +476,9 @@ pub async fn submit_create_and_update( create_transactions: Vec>, update_instructions: Vec, keypair: &Arc, -) -> Result { - let mut stats = CreateUpdateStats::default(); - stats.creates = submit_transactions(client, create_transactions, keypair) - .await - .map_err(|(e, submit_stats)| { - stats.creates = submit_stats; - (e, stats) - })?; - stats.updates = submit_instructions(client, update_instructions, keypair) - .await - .map_err(|(e, submit_stats)| { - stats.updates = submit_stats; - (e, stats) - })?; - Ok(stats) +) -> Result { + Ok(CreateUpdateStats { + creates: submit_transactions(client, create_transactions, keypair).await?, + updates: submit_instructions(client, update_instructions, keypair).await?, + }) } diff --git a/keepers/validator-keeper/src/cluster_info.rs b/keepers/validator-keeper/src/cluster_info.rs index 0f45c5bb..eff70644 100644 --- a/keepers/validator-keeper/src/cluster_info.rs +++ b/keepers/validator-keeper/src/cluster_info.rs @@ -10,7 +10,7 @@ pub async fn update_cluster_info( client: Arc, keypair: Arc, program_id: &Pubkey, -) -> Result { +) -> Result { let (cluster_history_account, _) = Pubkey::find_program_address(&[ClusterHistory::SEED], program_id); diff --git a/keepers/validator-keeper/src/gossip.rs b/keepers/validator-keeper/src/gossip.rs index 3be3cc43..1c21688c 100644 --- a/keepers/validator-keeper/src/gossip.rs +++ b/keepers/validator-keeper/src/gossip.rs @@ -250,7 +250,7 @@ pub async fn upload_gossip_values( keypair: Arc, entrypoint: SocketAddr, program_id: &Pubkey, -) -> Result, CreateUpdateStats)> { +) -> Result> { let gossip_port = 0; let spy_socket_addr = SocketAddr::new( @@ -261,19 +261,13 @@ pub async fn upload_gossip_values( let (_gossip_service, cluster_info) = start_spy_server(entrypoint, gossip_port, spy_socket_addr, &keypair, &exit); - let vote_accounts = get_vote_accounts_with_retry(&client, MIN_VOTE_EPOCHS, None) - .await - .map_err(|e| (e.into(), CreateUpdateStats::default()))?; + let vote_accounts = get_vote_accounts_with_retry(&client, MIN_VOTE_EPOCHS, None).await?; // Wait for all active validators to be received sleep(Duration::from_secs(30)).await; let gossip_entries = { - let crds = cluster_info - .gossip - .crds - .read() - .map_err(|e| (e.to_string().into(), CreateUpdateStats::default()))?; + let crds = cluster_info.gossip.crds.read().map_err(|e| e.to_string())?; vote_accounts .iter() @@ -290,9 +284,7 @@ pub async fn upload_gossip_values( .iter() .map(|a| a.address()) .collect::>(); - let existing_accounts_response = get_multiple_accounts_batched(&addresses, &client) - .await - .map_err(|e| (e.into(), CreateUpdateStats::default()))?; + let existing_accounts_response = get_multiple_accounts_batched(&addresses, &client).await?; let create_transactions = existing_accounts_response .iter() @@ -311,22 +303,10 @@ pub async fn upload_gossip_values( .map(|entry| entry.build_update_tx()) .collect::>(); - let mut stats = CreateUpdateStats::default(); - stats.creates = submit_transactions(&client, create_transactions, &keypair) - .await - .map_err(|(e, submit_stats)| { - stats.creates = submit_stats; - (e.into(), stats) - })?; - - stats.updates = submit_transactions(&client, update_transactions, &keypair) - .await - .map_err(|(e, submit_stats)| { - stats.updates = submit_stats; - (e.into(), stats) - })?; - - Ok(stats) + Ok(CreateUpdateStats { + creates: submit_transactions(&client, create_transactions, &keypair).await?, + updates: submit_transactions(&client, update_transactions, &keypair).await?, + }) } // CODE BELOW SLIGHTLY MODIFIED FROM diff --git a/keepers/validator-keeper/src/lib.rs b/keepers/validator-keeper/src/lib.rs index e6a88561..286be460 100644 --- a/keepers/validator-keeper/src/lib.rs +++ b/keepers/validator-keeper/src/lib.rs @@ -4,10 +4,14 @@ use std::{ }; use anchor_lang::{AccountDeserialize, Discriminator}; -use keeper_core::{CreateUpdateStats, SubmitStats}; +use keeper_core::{ + get_vote_accounts_with_retry, CreateUpdateStats, MultipleAccountsError, SubmitStats, + TransactionExecutionError, +}; use log::error; use solana_account_decoder::UiDataSliceConfig; use solana_client::{ + client_error::ClientError, nonblocking::rpc_client::RpcClient, rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}, rpc_filter::{Memcmp, RpcFilterType}, @@ -25,7 +29,10 @@ use solana_sdk::{ use solana_streamer::socket::SocketAddrSpace; use jito_tip_distribution::state::TipDistributionAccount; -use validator_history::{ClusterHistory, ValidatorHistory, ValidatorHistoryEntry}; +use thiserror::Error as ThisError; +use validator_history::{ + constants::MIN_VOTE_EPOCHS, ClusterHistory, ValidatorHistory, ValidatorHistoryEntry, +}; pub mod cluster_info; pub mod gossip; @@ -35,6 +42,18 @@ pub mod vote_account; pub type Error = Box; +#[derive(ThisError, Debug)] +pub enum KeeperError { + #[error(transparent)] + ClientError(#[from] ClientError), + #[error(transparent)] + TransactionExecutionError(#[from] TransactionExecutionError), + #[error(transparent)] + MultipleAccountsError(#[from] MultipleAccountsError), + #[error("Custom: {0}")] + Custom(String), +} + pub async fn get_tip_distribution_accounts( rpc_client: &RpcClient, tip_distribution_program: &Pubkey, @@ -116,28 +135,7 @@ pub async fn emit_validator_history_metrics( ) -> Result<(), Box> { let epoch = client.get_epoch_info().await?; - // Fetch every ValidatorHistory account - let gpa_config = RpcProgramAccountsConfig { - filters: Some(vec![RpcFilterType::Memcmp(Memcmp::new_raw_bytes( - 0, - ValidatorHistory::discriminator().into(), - ))]), - account_config: RpcAccountInfoConfig { - encoding: Some(solana_account_decoder::UiAccountEncoding::Base64), - ..RpcAccountInfoConfig::default() - }, - ..RpcProgramAccountsConfig::default() - }; - let mut validator_history_accounts = client - .get_program_accounts_with_config(&program_id, gpa_config) - .await?; - - let validator_histories = validator_history_accounts - .iter_mut() - .filter_map(|(_, account)| { - ValidatorHistory::try_deserialize(&mut account.data.as_slice()).ok() - }) - .collect::>(); + let validator_histories = get_validator_history_accounts(client, program_id).await?; let mut ips = 0; let mut versions = 0; @@ -195,6 +193,10 @@ pub async fn emit_validator_history_metrics( } } + let get_vote_accounts_count = get_vote_accounts_with_retry(client, MIN_VOTE_EPOCHS, None) + .await? + .len(); + datapoint_info!( "validator-history-stats", ("num_validator_histories", num_validators, i64), @@ -207,11 +209,57 @@ pub async fn emit_validator_history_metrics( ("num_stakes", stakes, i64), ("cluster_history_blocks", cluster_history_blocks, i64), ("slot_index", epoch.slot_index, i64), + ( + "num_get_vote_accounts_responses", + get_vote_accounts_count, + i64 + ), ); Ok(()) } +pub async fn get_validator_history_accounts( + client: &RpcClient, + program_id: Pubkey, +) -> Result, ClientError> { + let gpa_config = RpcProgramAccountsConfig { + filters: Some(vec![RpcFilterType::Memcmp(Memcmp::new_raw_bytes( + 0, + ValidatorHistory::discriminator().into(), + ))]), + account_config: RpcAccountInfoConfig { + encoding: Some(solana_account_decoder::UiAccountEncoding::Base64), + ..RpcAccountInfoConfig::default() + }, + ..RpcProgramAccountsConfig::default() + }; + let mut validator_history_accounts = client + .get_program_accounts_with_config(&program_id, gpa_config) + .await?; + + let validator_histories = validator_history_accounts + .iter_mut() + .filter_map(|(_, account)| { + ValidatorHistory::try_deserialize(&mut account.data.as_slice()).ok() + }) + .collect::>(); + + Ok(validator_histories) +} + +pub async fn get_validator_history_accounts_with_retry( + client: &RpcClient, + program_id: Pubkey, +) -> Result, ClientError> { + for _ in 0..4 { + if let Ok(validator_histories) = get_validator_history_accounts(client, program_id).await { + return Ok(validator_histories); + } + } + get_validator_history_accounts(client, program_id).await +} + pub fn start_spy_server( cluster_entrypoint: SocketAddr, gossip_port: u16, diff --git a/keepers/validator-keeper/src/main.rs b/keepers/validator-keeper/src/main.rs index f1e2eebb..a06d1e82 100644 --- a/keepers/validator-keeper/src/main.rs +++ b/keepers/validator-keeper/src/main.rs @@ -7,7 +7,7 @@ It will emits metrics for each data feed, if env var SOLANA_METRICS_CONFIG is se use std::{collections::HashMap, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration}; use clap::{arg, command, Parser}; -use keeper_core::{Cluster, CreateUpdateStats, SubmitStats}; +use keeper_core::{Cluster, CreateUpdateStats, SubmitStats, TransactionExecutionError}; use log::*; use solana_client::nonblocking::rpc_client::RpcClient; use solana_metrics::{datapoint_error, set_host_id}; @@ -24,6 +24,7 @@ use validator_keeper::{ mev_commission::{update_mev_commission, update_mev_earned}, stake::{emit_stake_history_datapoint, update_stake_history}, vote_account::update_vote_accounts, + KeeperError, }; #[derive(Parser, Debug)] @@ -92,7 +93,7 @@ async fn mev_commission_loop( loop { // Continuously runs throughout an epoch, polling for new tip distribution accounts // and submitting update txs when new accounts are detected - match update_mev_commission( + let stats = match update_mev_commission( client.clone(), keypair.clone(), &commission_history_program_id, @@ -102,16 +103,22 @@ async fn mev_commission_loop( ) .await { - Ok(stats) => { - emit_mev_commission_datapoint(stats); - sleep(Duration::from_secs(interval)).await; - } - Err((e, stats)) => { - emit_mev_commission_datapoint(stats); + Ok(stats) => stats, + Err(e) => { + let mut stats = CreateUpdateStats::default(); + if let KeeperError::TransactionExecutionError( + TransactionExecutionError::TransactionClientError(_, results), + ) = &e + { + stats.updates.successes = results.iter().filter(|r| r.is_ok()).count() as u64; + stats.updates.errors = results.iter().filter(|r| r.is_err()).count() as u64; + } datapoint_error!("mev-commission-error", ("error", e.to_string(), String),); - sleep(Duration::from_secs(5)).await; + stats } }; + emit_mev_commission_datapoint(stats); + sleep(Duration::from_secs(interval)).await; } } @@ -129,7 +136,7 @@ async fn mev_earned_loop( loop { // Continuously runs throughout an epoch, polling for tip distribution accounts from the prev epoch with uploaded merkle roots // and submitting update_mev_earned (technically update_mev_comission) txs when the uploaded merkle roots are detected - match update_mev_earned( + let stats = match update_mev_earned( &client, &keypair, &commission_history_program_id, @@ -139,16 +146,22 @@ async fn mev_earned_loop( ) .await { - Ok(stats) => { - emit_mev_earned_datapoint(stats); - sleep(Duration::from_secs(interval)).await; - } - Err((e, stats)) => { - emit_mev_earned_datapoint(stats); + Ok(stats) => stats, + Err(e) => { + let mut stats = CreateUpdateStats::default(); + if let KeeperError::TransactionExecutionError( + TransactionExecutionError::TransactionClientError(_, results), + ) = &e + { + stats.updates.successes = results.iter().filter(|r| r.is_ok()).count() as u64; + stats.updates.errors = results.iter().filter(|r| r.is_err()).count() as u64; + } datapoint_error!("mev-earned-error", ("error", e.to_string(), String),); - sleep(Duration::from_secs(5)).await; + stats } }; + emit_mev_earned_datapoint(stats); + sleep(Duration::from_secs(interval)).await; } } @@ -174,27 +187,52 @@ async fn vote_account_loop( runs_for_epoch = 0; } // Run at 10%, 50% and 90% completion of epoch - let should_run = (epoch_info.slot_index > epoch_info.slots_in_epoch / 10 + let should_run = (epoch_info.slot_index > epoch_info.slots_in_epoch / 1000 && runs_for_epoch < 1) || (epoch_info.slot_index > epoch_info.slots_in_epoch / 2 && runs_for_epoch < 2) || (epoch_info.slot_index > epoch_info.slots_in_epoch * 9 / 10 && runs_for_epoch < 3); if should_run { - stats = - match update_vote_accounts(rpc_client.clone(), keypair.clone(), program_id).await { - Ok(stats) => { + stats = match update_vote_accounts(rpc_client.clone(), keypair.clone(), program_id) + .await + { + Ok(stats) => { + for message in stats + .creates + .results + .iter() + .chain(stats.updates.results.iter()) + { + if let Err(e) = message { + datapoint_error!( + "vote-account-error", + ("error", e.to_string(), String), + ); + } + } + if stats.updates.errors == 0 && stats.creates.errors == 0 { runs_for_epoch += 1; - sleep(Duration::from_secs(interval)).await; - stats } - Err((e, stats)) => { - datapoint_error!("vote-account-error", ("error", e.to_string(), String),); - stats + sleep(Duration::from_secs(interval)).await; + stats + } + Err(e) => { + let mut stats = CreateUpdateStats::default(); + if let KeeperError::TransactionExecutionError( + TransactionExecutionError::TransactionClientError(_, results), + ) = &e + { + stats.updates.successes = + results.iter().filter(|r| r.is_ok()).count() as u64; + stats.updates.errors = results.iter().filter(|r| r.is_err()).count() as u64; } - }; + datapoint_error!("vote-account-error", ("error", e.to_string(), String),); + stats + } + }; } current_epoch = epoch_info.epoch; - emit_validator_commission_datapoint(stats, runs_for_epoch); + emit_validator_commission_datapoint(stats.clone(), runs_for_epoch); sleep(Duration::from_secs(interval)).await; } } @@ -231,12 +269,37 @@ async fn stake_upload_loop( if should_run { stats = match update_stake_history(client.clone(), keypair.clone(), &program_id).await { Ok(run_stats) => { - runs_for_epoch += 1; + for message in stats + .creates + .results + .iter() + .chain(stats.updates.results.iter()) + { + if let Err(e) = message { + datapoint_error!( + "stake-history-error", + ("error", e.to_string(), String), + ); + } + } + + if stats.creates.errors == 0 && stats.updates.errors == 0 { + runs_for_epoch += 1; + } run_stats } - Err((e, run_stats)) => { + Err(e) => { + let mut stats = CreateUpdateStats::default(); + if let KeeperError::TransactionExecutionError( + TransactionExecutionError::TransactionClientError(_, results), + ) = &e + { + stats.updates.successes = + results.iter().filter(|r| r.is_ok()).count() as u64; + stats.updates.errors = results.iter().filter(|r| r.is_err()).count() as u64; + } datapoint_error!("stake-history-error", ("error", e.to_string(), String),); - run_stats + stats } }; } @@ -285,10 +348,34 @@ async fn gossip_upload_loop( .await { Ok(stats) => { - runs_for_epoch += 1; + for message in stats + .creates + .results + .iter() + .chain(stats.updates.results.iter()) + { + if let Err(e) = message { + datapoint_error!( + "gossip-upload-error", + ("error", e.to_string(), String), + ); + } + } + if stats.creates.errors == 0 && stats.updates.errors == 0 { + runs_for_epoch += 1; + } stats } - Err((e, stats)) => { + Err(e) => { + let mut stats = CreateUpdateStats::default(); + if let Some(TransactionExecutionError::TransactionClientError(_, results)) = + e.downcast_ref::() + { + stats.updates.successes = + results.iter().filter(|r| r.is_ok()).count() as u64; + stats.updates.errors = results.iter().filter(|r| r.is_err()).count() as u64; + } + datapoint_error!("gossip-upload-error", ("error", e.to_string(), String),); stats } @@ -334,12 +421,19 @@ async fn cluster_history_loop( if should_run { stats = match update_cluster_info(client.clone(), keypair.clone(), &program_id).await { Ok(run_stats) => { - runs_for_epoch += 1; + if run_stats.errors == 0 { + runs_for_epoch += 1; + } run_stats } - Err((e, run_stats)) => { + Err(e) => { + let mut stats = SubmitStats::default(); + if let TransactionExecutionError::TransactionClientError(_, results) = &e { + stats.successes = results.iter().filter(|r| r.is_ok()).count() as u64; + stats.errors = results.iter().filter(|r| r.is_err()).count() as u64; + } datapoint_error!("cluster-history-error", ("error", e.to_string(), String),); - run_stats + stats } }; } diff --git a/keepers/validator-keeper/src/mev_commission.rs b/keepers/validator-keeper/src/mev_commission.rs index eea66619..97a41472 100644 --- a/keepers/validator-keeper/src/mev_commission.rs +++ b/keepers/validator-keeper/src/mev_commission.rs @@ -6,26 +6,17 @@ use jito_tip_distribution::state::TipDistributionAccount; use keeper_core::{ build_create_and_update_instructions, get_multiple_accounts_batched, get_vote_accounts_with_retry, submit_create_and_update, Address, CreateTransaction, - CreateUpdateStats, MultipleAccountsError, TransactionExecutionError, UpdateInstruction, + CreateUpdateStats, MultipleAccountsError, UpdateInstruction, }; use log::error; +use solana_client::nonblocking::rpc_client::RpcClient; use solana_client::rpc_response::RpcVoteAccountInfo; -use solana_client::{client_error::ClientError, nonblocking::rpc_client::RpcClient}; use solana_program::{instruction::Instruction, pubkey::Pubkey}; use solana_sdk::{signature::Keypair, signer::Signer}; -use thiserror::Error as ThisError; use validator_history::constants::MIN_VOTE_EPOCHS; use validator_history::{constants::MAX_ALLOC_BYTES, Config, ValidatorHistory}; -#[derive(ThisError, Debug)] -pub enum MevCommissionError { - #[error(transparent)] - ClientError(#[from] ClientError), - #[error(transparent)] - TransactionExecutionError(#[from] TransactionExecutionError), - #[error(transparent)] - MultipleAccountsError(#[from] MultipleAccountsError), -} +use crate::KeeperError; #[derive(Clone)] pub struct ValidatorMevCommissionEntry { @@ -138,20 +129,14 @@ pub async fn update_mev_commission( tip_distribution_program_id: &Pubkey, validators_updated: &mut HashMap, prev_epoch: &mut u64, -) -> Result { - let epoch = client - .get_epoch_info() - .await - .map_err(|e| (e.into(), CreateUpdateStats::default()))? - .epoch; +) -> Result { + let epoch = client.get_epoch_info().await?.epoch; if epoch > *prev_epoch { validators_updated.clear(); } *prev_epoch = epoch; - let vote_accounts = get_vote_accounts_with_retry(&client, MIN_VOTE_EPOCHS, None) - .await - .map_err(|e| (e.into(), CreateUpdateStats::default()))?; + let vote_accounts = get_vote_accounts_with_retry(&client, MIN_VOTE_EPOCHS, None).await?; let entries = vote_accounts .iter() @@ -166,32 +151,33 @@ pub async fn update_mev_commission( }) .collect::>(); - let existing_entries = get_existing_entries(client.clone(), &entries) - .await - .map_err(|e| (e.into(), CreateUpdateStats::default()))?; + let existing_entries = get_existing_entries(client.clone(), &entries).await?; let entries_to_update = existing_entries .into_iter() .filter(|entry| !validators_updated.contains_key(&entry.tip_distribution_account)) .collect::>(); let (create_transactions, update_instructions) = - build_create_and_update_instructions(&client, &entries_to_update) - .await - .map_err(|e| (e.into(), CreateUpdateStats::default()))?; + build_create_and_update_instructions(&client, &entries_to_update).await?; - let submit_result = - submit_create_and_update(&client, create_transactions, update_instructions, &keypair).await; - if submit_result.is_ok() { - for ValidatorMevCommissionEntry { - vote_account, - tip_distribution_account, - .. - } in entries_to_update - { - validators_updated.insert(tip_distribution_account, vote_account); + match submit_create_and_update(&client, create_transactions, update_instructions, &keypair) + .await + { + Ok(submit_result) => { + if submit_result.creates.errors == 0 && submit_result.updates.errors == 0 { + for ValidatorMevCommissionEntry { + vote_account, + tip_distribution_account, + .. + } in entries_to_update + { + validators_updated.insert(tip_distribution_account, vote_account); + } + } + Ok(submit_result) } + Err(e) => Err(e.into()), } - submit_result.map_err(|(e, stats)| (e.into(), stats)) } pub async fn update_mev_earned( @@ -201,12 +187,8 @@ pub async fn update_mev_earned( tip_distribution_program_id: &Pubkey, validators_updated: &mut HashMap, curr_epoch: &mut u64, -) -> Result { - let epoch = client - .get_epoch_info() - .await - .map_err(|e| (e.into(), CreateUpdateStats::default()))? - .epoch; +) -> Result { + let epoch = client.get_epoch_info().await?.epoch; if epoch > *curr_epoch { // new epoch started, we assume here that all the validators with TDAs from curr_epoch-1 have had their merkle roots uploaded/processed by this point @@ -215,9 +197,7 @@ pub async fn update_mev_earned( } *curr_epoch = epoch; - let vote_accounts = get_vote_accounts_with_retry(client, MIN_VOTE_EPOCHS, None) - .await - .map_err(|e| (e.into(), CreateUpdateStats::default()))?; + let vote_accounts = get_vote_accounts_with_retry(client, MIN_VOTE_EPOCHS, None).await?; let entries = vote_accounts .iter() @@ -232,18 +212,15 @@ pub async fn update_mev_earned( }) .collect::>(); - let uploaded_merkleroot_entries = get_entries_with_uploaded_merkleroot(client, &entries) - .await - .map_err(|e| (e.into(), CreateUpdateStats::default()))?; + let uploaded_merkleroot_entries = + get_entries_with_uploaded_merkleroot(client, &entries).await?; let entries_to_update = uploaded_merkleroot_entries .into_iter() .filter(|entry| !validators_updated.contains_key(&entry.tip_distribution_account)) .collect::>(); let (create_transactions, update_instructions) = - build_create_and_update_instructions(client, &entries_to_update) - .await - .map_err(|e| (e.into(), CreateUpdateStats::default()))?; + build_create_and_update_instructions(client, &entries_to_update).await?; let submit_result = submit_create_and_update(client, create_transactions, update_instructions, keypair).await; @@ -257,7 +234,7 @@ pub async fn update_mev_earned( validators_updated.insert(tip_distribution_account, vote_account); } } - submit_result.map_err(|(e, stats)| (e.into(), stats)) + submit_result.map_err(|e| e.into()) } async fn get_existing_entries( diff --git a/keepers/validator-keeper/src/stake.rs b/keepers/validator-keeper/src/stake.rs index 803039bc..20c44da5 100644 --- a/keepers/validator-keeper/src/stake.rs +++ b/keepers/validator-keeper/src/stake.rs @@ -3,12 +3,10 @@ use std::{collections::HashMap, str::FromStr, sync::Arc}; use anchor_lang::{AccountDeserialize, Discriminator, InstructionData, ToAccountMetas}; use keeper_core::{ build_create_and_update_instructions, get_vote_accounts_with_retry, submit_create_and_update, - submit_instructions, Address, CreateTransaction, CreateUpdateStats, MultipleAccountsError, - SubmitStats, TransactionExecutionError, UpdateInstruction, + submit_instructions, Address, CreateTransaction, CreateUpdateStats, UpdateInstruction, }; use log::error; use solana_client::{ - client_error::ClientError, nonblocking::rpc_client::RpcClient, rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}, rpc_filter::{Memcmp, RpcFilterType}, @@ -19,23 +17,12 @@ use solana_sdk::{ commitment_config::CommitmentConfig, instruction::Instruction, pubkey::Pubkey, signature::Keypair, signer::Signer, }; -use thiserror::Error as ThisError; use validator_history::{ constants::{MAX_ALLOC_BYTES, MIN_VOTE_EPOCHS}, state::{Config, ValidatorHistory}, }; -#[derive(ThisError, Debug)] -pub enum StakeHistoryError { - #[error(transparent)] - ClientError(#[from] ClientError), - #[error(transparent)] - TransactionExecutionError(#[from] TransactionExecutionError), - #[error(transparent)] - MultipleAccountsError(#[from] MultipleAccountsError), - #[error("Epoch mismatch")] - EpochMismatch, -} +use crate::KeeperError; pub struct StakeHistoryEntry { pub stake: u64, @@ -183,14 +170,13 @@ pub async fn update_stake_history( client: Arc, keypair: Arc, program_id: &Pubkey, -) -> Result { +) -> Result { let vote_accounts = get_vote_accounts_with_retry( &client, MIN_VOTE_EPOCHS, Some(CommitmentConfig::finalized()), ) - .await - .map_err(|e| (e.into(), CreateUpdateStats::default()))?; + .await?; // Need to ensure that the response contains update stake amounts for the current epoch, // so we find the largest epoch a validator has voted on to confirm the data is fresh @@ -206,15 +192,11 @@ pub async fn update_stake_history( let epoch = client .get_epoch_info_with_commitment(CommitmentConfig::finalized()) - .await - .map_err(|e| (e.into(), CreateUpdateStats::default()))? + .await? .epoch; if max_vote_account_epoch != epoch { - return Err(( - StakeHistoryError::EpochMismatch, - CreateUpdateStats::default(), - )); + return Err(KeeperError::Custom("EpochMismatch".into())); } let stake_history_entries = vote_accounts @@ -234,13 +216,11 @@ pub async fn update_stake_history( .collect::>(); let (create_transactions, update_instructions) = - build_create_and_update_instructions(&client, &stake_history_entries) - .await - .map_err(|e| (e.into(), CreateUpdateStats::default()))?; + build_create_and_update_instructions(&client, &stake_history_entries).await?; submit_create_and_update(&client, create_transactions, update_instructions, &keypair) .await - .map_err(|(e, stats)| (e.into(), stats)) + .map_err(|e| e.into()) } /* @@ -253,7 +233,7 @@ pub async fn _recompute_superminority_and_rank( program_id: &Pubkey, start_epoch: u64, end_epoch: u64, -) -> Result<(), (StakeHistoryError, SubmitStats)> { +) -> Result<(), KeeperError> { // Fetch every ValidatorHistory account let gpa_config = RpcProgramAccountsConfig { filters: Some(vec![RpcFilterType::Memcmp(Memcmp::new_raw_bytes( @@ -333,7 +313,7 @@ pub async fn _recompute_superminority_and_rank( match submit_instructions(&client, update_instructions, &keypair).await { Ok(_) => println!("completed epoch {}", epoch), - Err((e, stats)) => return Err((e.into(), stats)), + Err(e) => return Err(e.into()), }; } diff --git a/keepers/validator-keeper/src/vote_account.rs b/keepers/validator-keeper/src/vote_account.rs index 49ba0c46..6bea2a32 100644 --- a/keepers/validator-keeper/src/vote_account.rs +++ b/keepers/validator-keeper/src/vote_account.rs @@ -1,31 +1,22 @@ +use std::collections::HashSet; use std::str::FromStr; use std::sync::Arc; use anchor_lang::{InstructionData, ToAccountMetas}; use keeper_core::{ build_create_and_update_instructions, get_vote_accounts_with_retry, submit_create_and_update, - Address, CreateTransaction, CreateUpdateStats, MultipleAccountsError, - TransactionExecutionError, UpdateInstruction, + Address, CreateTransaction, CreateUpdateStats, UpdateInstruction, }; use log::error; -use solana_client::rpc_response::RpcVoteAccountInfo; -use solana_client::{client_error::ClientError, nonblocking::rpc_client::RpcClient}; +use solana_client::nonblocking::rpc_client::RpcClient; use solana_program::{instruction::Instruction, pubkey::Pubkey}; use solana_sdk::{signature::Keypair, signer::Signer}; -use thiserror::Error as ThisError; + use validator_history::constants::{MAX_ALLOC_BYTES, MIN_VOTE_EPOCHS}; use validator_history::state::ValidatorHistory; use validator_history::Config; -#[derive(ThisError, Debug)] -pub enum UpdateCommissionError { - #[error(transparent)] - ClientError(#[from] ClientError), - #[error(transparent)] - TransactionExecutionError(#[from] TransactionExecutionError), - #[error(transparent)] - MultipleAccountsError(#[from] MultipleAccountsError), -} +use crate::{get_validator_history_accounts_with_retry, KeeperError}; pub struct CopyVoteAccountEntry { pub vote_account: Pubkey, @@ -36,20 +27,14 @@ pub struct CopyVoteAccountEntry { } impl CopyVoteAccountEntry { - pub fn new(vote_account: &RpcVoteAccountInfo, program_id: &Pubkey, signer: &Pubkey) -> Self { - let vote_account = Pubkey::from_str(&vote_account.vote_pubkey) - .map_err(|e| { - error!("Invalid vote account pubkey"); - e - }) - .expect("Invalid vote account pubkey"); + pub fn new(vote_account: &Pubkey, program_id: &Pubkey, signer: &Pubkey) -> Self { let (validator_history_account, _) = Pubkey::find_program_address( &[ValidatorHistory::SEED, &vote_account.to_bytes()], program_id, ); let (config_address, _) = Pubkey::find_program_address(&[Config::SEED], program_id); Self { - vote_account, + vote_account: *vote_account, validator_history_account, config_address, program_id: *program_id, @@ -116,12 +101,27 @@ pub async fn update_vote_accounts( rpc_client: Arc, keypair: Arc, validator_history_program_id: Pubkey, -) -> Result { - let stats = CreateUpdateStats::default(); +) -> Result { + let rpc_vote_accounts = + get_vote_accounts_with_retry(&rpc_client, MIN_VOTE_EPOCHS, None).await?; - let vote_accounts = get_vote_accounts_with_retry(&rpc_client, MIN_VOTE_EPOCHS, None) - .await - .map_err(|e| (e.into(), stats))?; + let validator_histories = + get_validator_history_accounts_with_retry(&rpc_client, validator_history_program_id) + .await?; + + // Merges new and active RPC vote accounts with all validator history accounts, and dedupes + let vote_accounts = rpc_vote_accounts + .iter() + .filter_map(|rpc_va| { + Pubkey::from_str(&rpc_va.vote_pubkey) + .map_err(|e| { + error!("Invalid vote account pubkey"); + e + }) + .ok() + }) + .chain(validator_histories.iter().map(|vh| vh.vote_account)) + .collect::>(); let entries = vote_accounts .iter() @@ -129,9 +129,7 @@ pub async fn update_vote_accounts( .collect::>(); let (create_transactions, update_instructions) = - build_create_and_update_instructions(&rpc_client, &entries) - .await - .map_err(|e| (e.into(), stats))?; + build_create_and_update_instructions(&rpc_client, &entries).await?; let submit_result = submit_create_and_update( &rpc_client, @@ -141,5 +139,5 @@ pub async fn update_vote_accounts( ) .await; - submit_result.map_err(|(e, stats)| (e.into(), stats)) + submit_result.map_err(|e| e.into()) }