diff --git a/.gitignore b/.gitignore index 1fcfbf16..8ae27317 100644 --- a/.gitignore +++ b/.gitignore @@ -13,4 +13,5 @@ test-ledger **/credentials **/config **/*.env +**/.vscode /scripts \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index fbcb5ba8..00000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "cSpell.words": [ - "datapoint" - ] -} \ No newline at end of file diff --git a/keepers/keeper-core/src/lib.rs b/keepers/keeper-core/src/lib.rs index 18b2179d..dfd0cd58 100644 --- a/keepers/keeper-core/src/lib.rs +++ b/keepers/keeper-core/src/lib.rs @@ -6,8 +6,9 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use clap::ValueEnum; use log::*; -use solana_client::rpc_response::RpcVoteAccountInfo; +use solana_client::rpc_response::{Response, RpcSimulateTransactionResult, RpcVoteAccountInfo}; use solana_client::{client_error::ClientError, nonblocking::rpc_client::RpcClient}; +use solana_metrics::datapoint_error; use solana_program::hash::Hash; use solana_sdk::compute_budget::ComputeBudgetInstruction; use solana_sdk::packet::PACKET_DATA_SIZE; @@ -21,6 +22,8 @@ use thiserror::Error as ThisError; use tokio::task::{self, JoinError}; use tokio::time::sleep; +const DEFAULT_COMPUTE_LIMIT: usize = 200_000; + #[derive(Debug, Default, Clone)] pub struct SubmitStats { pub successes: u64, @@ -114,6 +117,70 @@ pub async fn get_multiple_accounts_batched( Ok(accounts_result) } +async fn simulate_instruction( + client: &RpcClient, + instruction: &Instruction, + signer: &Keypair, + priority_fee_in_microlamports: u64, + max_cu_per_tx: u32, +) -> Result, ClientError> { + let latest_blockhash = get_latest_blockhash_with_retry(client).await?; + + let test_tx = Transaction::new_signed_with_payer( + &[ + ComputeBudgetInstruction::set_compute_unit_limit(max_cu_per_tx), + ComputeBudgetInstruction::set_compute_unit_price(priority_fee_in_microlamports), + instruction.to_owned(), + ], + 
Some(&signer.pubkey()), + &[signer], + latest_blockhash, + ); + + client.simulate_transaction(&test_tx).await +} + +async fn simulate_instruction_with_retry( + client: &RpcClient, + instruction: &Instruction, + signer: &Keypair, + priority_fee_in_microlamports: u64, + max_cu_per_tx: u32, +) -> Result, ClientError> { + for _ in 0..5 { + match simulate_instruction( + client, + instruction, + signer, + priority_fee_in_microlamports, + max_cu_per_tx, + ) + .await + { + Ok(response) => match response.value.err { + Some(e) => { + if e == TransactionError::BlockhashNotFound { + sleep(Duration::from_secs(3)).await; + } else { + return Err(e.into()); + } + } + None => return Ok(response), + }, + Err(e) => return Err(e), + } + } + + simulate_instruction( + client, + instruction, + signer, + priority_fee_in_microlamports, + max_cu_per_tx, + ) + .await +} + async fn get_latest_blockhash_with_retry(client: &RpcClient) -> Result { for _ in 1..4 { let result = client @@ -174,24 +241,39 @@ pub async fn get_vote_accounts_with_retry( } } -const DEFAULT_COMPUTE_LIMIT: usize = 200_000; - -async fn calculate_instructions_per_tx( - client: &RpcClient, - instructions: &[Instruction], - signer: &Keypair, +async fn find_ix_per_tx( + client: &Arc, + instruction: &Instruction, + signer: &Arc, + priority_fee_in_microlamports: u64, + max_cu_per_tx: u32, ) -> Result { let blockhash = get_latest_blockhash_with_retry(client).await?; let test_tx = Transaction::new_signed_with_payer( - &[instructions[0].to_owned()], + &[instruction.to_owned()], Some(&signer.pubkey()), &[signer], blockhash, ); - let response = client.simulate_transaction(&test_tx).await?; + + let response = simulate_instruction_with_retry( + client, + instruction, + signer, + priority_fee_in_microlamports, + max_cu_per_tx, + ) + .await?; if let Some(err) = response.value.clone().err { - debug!("Simulation error: {:?}", response.value); - return Err(err.into()); + error!("Simulation error: {} {:?}", max_cu_per_tx, response.value); + 
+ datapoint_error!( + "simulation-error", + ("error", err.to_string(), String), + ("instruction", format!("{:?}", instruction), String) + ); + + return Err(err.into()); // Return the error immediately, stopping further execution } let compute = response .value @@ -199,21 +281,24 @@ async fn calculate_instructions_per_tx( .unwrap_or(DEFAULT_COMPUTE_LIMIT as u64); let serialized_size = Packet::from_data(None, &test_tx).unwrap().meta().size; + // additional size per ix let size_per_ix = - instructions[0].accounts.len() * size_of::() + instructions[0].data.len(); + instruction.accounts.len() * size_of::() + instruction.data.len(); let size_max = (PACKET_DATA_SIZE - serialized_size + size_per_ix) / size_per_ix; - let compute_max = DEFAULT_COMPUTE_LIMIT / compute as usize; + let compute_max = max_cu_per_tx as usize / compute as usize; + + let size = size_max.min(compute_max); - Ok(size_max.min(compute_max)) + Ok(size) } async fn parallel_confirm_transactions( client: &RpcClient, submitted_signatures: HashSet, ) -> HashSet { - // Confirmes TXs in batches of 256 (max allowed by RPC method). Returns confirmed signatures + // Confirms TXs in batches of 256 (max allowed by RPC method). 
Returns confirmed signatures const SIG_STATUS_BATCH_SIZE: usize = 256; let num_transactions_submitted = submitted_signatures.len(); let signatures_to_confirm = submitted_signatures.into_iter().collect::>(); @@ -364,13 +449,69 @@ pub async fn parallel_execute_transactions( Ok(results) } +pub async fn pack_instructions( + client: &Arc, + instructions: &[Instruction], + signer: &Arc, + priority_fee_in_microlamports: u64, + max_cu_per_tx: u32, +) -> Result>, Box> { + let mut instructions_with_grouping: Vec<(&Instruction, usize)> = Vec::new(); + + for instruction in instructions.iter() { + let result = find_ix_per_tx( + client, + instruction, + signer, + priority_fee_in_microlamports, + max_cu_per_tx, + ) + .await; + + match result { + Ok(ix_per_tx) => { + instructions_with_grouping.push((instruction, ix_per_tx)); + } + Err(e) => { + error!("Could not simulate instruction: {:?}", e); + // Skip this instruction if there is an error + continue; + } + } + } + + // Group instructions by their grouping size + let mut grouped_instructions: HashMap> = HashMap::new(); + for (instruction, group_size) in instructions_with_grouping { + grouped_instructions + .entry(group_size) + .or_default() + .push(instruction); + } + + // Convert HashMap to Vec>, ensuring each group meets the length requirement + let mut result: Vec> = Vec::new(); + for (group_number, group) in grouped_instructions { + for chunk in group.chunks(group_number) { + let mut tx_instructions = Vec::new(); + for instruction in chunk { + tx_instructions.push((*instruction).clone()); + } + result.push(tx_instructions); + } + } + + Ok(result) +} + pub async fn parallel_execute_instructions( client: &Arc, instructions: &[Instruction], signer: &Arc, retry_count: u16, confirmation_time: u64, - microlamports: u64, + priority_fee_in_microlamports: u64, + max_cu_per_tx: Option, ) -> Result>, TransactionExecutionError> { /* Note: Assumes all instructions are equivalent in compute, equivalent in size, and can be executed in any 
order @@ -387,20 +528,40 @@ pub async fn parallel_execute_instructions( return Ok(vec![]); } - let instructions_per_tx = calculate_instructions_per_tx(client, instructions, signer) - .await - .map_err(|e| TransactionExecutionError::ClientError(e.to_string()))? - - 1; + // let instructions_per_tx = calculate_instructions_per_tx( + // client, + // instructions, + // signer, + // priority_fee_in_microlamports, + // max_cu_per_tx, + // ) + // .await + // .map_err(|e| TransactionExecutionError::ClientError(e.to_string()))? + // - 1; + + let max_cu_per_tx = max_cu_per_tx.unwrap_or(DEFAULT_COMPUTE_LIMIT as u32); + + let mut transactions: Vec> = pack_instructions( + client, + instructions, + signer, + priority_fee_in_microlamports, + max_cu_per_tx, + ) + .await + .map_err(|e| TransactionExecutionError::ClientError(e.to_string()))?; - let mut transactions: Vec> = instructions - .chunks(instructions_per_tx) - .map(|c| c.to_vec()) - .collect(); for tx in transactions.iter_mut() { tx.insert( 0, - ComputeBudgetInstruction::set_compute_unit_price(microlamports), + ComputeBudgetInstruction::set_compute_unit_price(priority_fee_in_microlamports), ); + if max_cu_per_tx != DEFAULT_COMPUTE_LIMIT as u32 { + tx.insert( + 0, + ComputeBudgetInstruction::set_compute_unit_limit(max_cu_per_tx), + ); + } } let transactions: Vec<&[Instruction]> = transactions.iter().map(|c| c.as_slice()).collect(); @@ -474,11 +635,20 @@ pub async fn submit_instructions( client: &Arc, instructions: Vec, keypair: &Arc, - microlamports: u64, + priority_fee_in_microlamports: u64, + max_cu_per_tx: Option, ) -> Result { let mut stats = SubmitStats::default(); - match parallel_execute_instructions(client, &instructions, keypair, 100, 20, microlamports) - .await + match parallel_execute_instructions( + client, + &instructions, + keypair, + 100, + 20, + priority_fee_in_microlamports, + max_cu_per_tx, + ) + .await { Ok(results) => { stats.successes = results.iter().filter(|&tx| tx.is_ok()).count() as u64; @@ -495,10 
+665,18 @@ pub async fn submit_create_and_update( create_transactions: Vec>, update_instructions: Vec, keypair: &Arc, - microlamports: u64, + priority_fee_in_microlamports: u64, + max_cu_per_tx: Option, ) -> Result { Ok(CreateUpdateStats { creates: submit_transactions(client, create_transactions, keypair).await?, - updates: submit_instructions(client, update_instructions, keypair, microlamports).await?, + updates: submit_instructions( + client, + update_instructions, + keypair, + priority_fee_in_microlamports, + max_cu_per_tx, + ) + .await?, }) } diff --git a/keepers/validator-keeper/src/main.rs b/keepers/validator-keeper/src/main.rs index 54193712..f89b36db 100644 --- a/keepers/validator-keeper/src/main.rs +++ b/keepers/validator-keeper/src/main.rs @@ -3,69 +3,25 @@ This program starts several threads to manage the creation of validator history and the updating of the various data feeds within the accounts. It will emits metrics for each data feed, if env var SOLANA_METRICS_CONFIG is set to a valid influx server. 
*/ -use clap::{arg, command, Parser}; -use keeper_core::Cluster; +use clap::Parser; use log::*; use solana_client::nonblocking::rpc_client::RpcClient; use solana_metrics::set_host_id; -use solana_sdk::{ - pubkey::Pubkey, - signature::{read_keypair_file, Keypair}, -}; -use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration}; +use solana_sdk::signature::read_keypair_file; +use std::{sync::Arc, time::Duration}; use tokio::time::sleep; use validator_keeper::{ - operations::{self, keeper_operations::KeeperOperations}, + operations::{ + self, + keeper_operations::{KeeperCreates, KeeperOperations}, + }, state::{ + keeper_config::{Args, KeeperConfig}, keeper_state::KeeperState, update_state::{create_missing_accounts, post_create_update, pre_create_update}, }, }; -#[derive(Parser, Debug)] -#[command(about = "Keeps commission history accounts up to date")] -struct Args { - /// RPC URL for the cluster - #[arg( - short, - long, - env, - default_value = "https://api.mainnet-beta.solana.com" - )] - json_rpc_url: String, - - /// Gossip entrypoint in the form of URL:PORT - #[arg(short, long, env)] - gossip_entrypoint: Option, - - /// Path to keypair used to pay for account creation and execute transactions - #[arg(short, long, env, default_value = "./credentials/keypair.json")] - keypair: PathBuf, - - /// Path to keypair used specifically for submitting permissioned transactions - #[arg(short, long, env)] - oracle_authority_keypair: Option, - - /// Validator history program ID (Pubkey as base58 string) - #[arg(short, long, env)] - program_id: Pubkey, - - /// Tip distribution program ID (Pubkey as base58 string) - #[arg(short, long, env)] - tip_distribution_program_id: Pubkey, - - // Interval to update Validator History Accounts (default 300 sec) - #[arg(short, long, env, default_value = "300")] - validator_history_interval: u64, - - // Interval to emit metrics (default 60 sec) - #[arg(short, long, env, default_value = "60")] - metrics_interval: u64, - - #[arg(short, 
long, env, default_value_t = Cluster::Mainnet)] - cluster: Cluster, -} - fn should_emit(tick: u64, intervals: &[u64]) -> bool { intervals.iter().any(|interval| tick % (interval + 1) == 0) } @@ -87,28 +43,11 @@ async fn sleep_and_tick(tick: &mut u64) { advance_tick(tick); } -struct RunLoopConfig { - client: Arc, - keypair: Arc, - program_id: Pubkey, - tip_distribution_program_id: Pubkey, - oracle_authority_keypair: Option>, - gossip_entrypoint: Option, - validator_history_interval: u64, - metrics_interval: u64, -} +async fn run_keeper(keeper_config: KeeperConfig) { + // Intervals + let metrics_interval = keeper_config.metrics_interval; + let validator_history_interval = keeper_config.validator_history_interval; -async fn run_loop(config: RunLoopConfig) { - let RunLoopConfig { - client, - keypair, - program_id, - tip_distribution_program_id, - oracle_authority_keypair, - gossip_entrypoint, - validator_history_interval, - metrics_interval, - } = config; let intervals = vec![validator_history_interval, metrics_interval]; // Stateful data @@ -121,136 +60,132 @@ async fn run_loop(config: RunLoopConfig) { // Additionally, this function will update the keeper state. If update fails - it will skip the fire functions. 
if should_update(tick, &intervals) { info!("Pre-fetching data for update..."); - match pre_create_update(&client, &keypair, &program_id, &mut keeper_state).await { + match pre_create_update(&keeper_config, &mut keeper_state).await { Ok(_) => { keeper_state.increment_update_run_for_epoch(KeeperOperations::PreCreateUpdate); } Err(e) => { error!("Failed to pre create update: {:?}", e); - advance_tick(&mut tick); + keeper_state .increment_update_error_for_epoch(KeeperOperations::PreCreateUpdate); + + advance_tick(&mut tick); continue; } } info!("Creating missing accounts..."); - match create_missing_accounts(&client, &keypair, &program_id, &keeper_state).await { - Ok(_) => { + match create_missing_accounts(&keeper_config, &keeper_state).await { + Ok(new_accounts_created) => { keeper_state .increment_update_run_for_epoch(KeeperOperations::CreateMissingAccounts); + + let total_txs: usize = new_accounts_created.iter().map(|(_, txs)| txs).sum(); + keeper_state.increment_update_txs_for_epoch( + KeeperOperations::CreateMissingAccounts, + total_txs as u64, + ); + + new_accounts_created + .iter() + .for_each(|(operation, created_accounts)| { + keeper_state.increment_creations_for_epoch(( + operation.clone(), + *created_accounts as u64, + )); + }); } Err(e) => { error!("Failed to create missing accounts: {:?}", e); - advance_tick(&mut tick); + keeper_state .increment_update_error_for_epoch(KeeperOperations::CreateMissingAccounts); + + advance_tick(&mut tick); continue; } } info!("Post-fetching data for update..."); - match post_create_update( - &client, - &program_id, - &tip_distribution_program_id, - &mut keeper_state, - ) - .await - { + match post_create_update(&keeper_config, &mut keeper_state).await { Ok(_) => { keeper_state.increment_update_run_for_epoch(KeeperOperations::PostCreateUpdate); } Err(e) => { error!("Failed to post create update: {:?}", e); - advance_tick(&mut tick); + keeper_state .increment_update_error_for_epoch(KeeperOperations::PostCreateUpdate); + + 
advance_tick(&mut tick); continue; } } } // ---------------------- FIRE ----------------------------------- + + // VALIDATOR HISTORY if should_fire(tick, validator_history_interval) { info!("Firing operations..."); info!("Updating cluster history..."); - keeper_state.set_runs_and_errors_for_epoch( - operations::cluster_history::fire(&client, &keypair, &program_id, &keeper_state) - .await, + keeper_state.set_runs_errors_and_txs_for_epoch( + operations::cluster_history::fire(&keeper_config, &keeper_state).await, ); info!("Updating copy vote accounts..."); - keeper_state.set_runs_and_errors_for_epoch( - operations::vote_account::fire(&client, &keypair, &program_id, &keeper_state).await, + keeper_state.set_runs_errors_and_txs_for_epoch( + operations::vote_account::fire(&keeper_config, &keeper_state).await, ); info!("Updating mev commission..."); - keeper_state.set_runs_and_errors_for_epoch( - operations::mev_commission::fire( - &client, - &keypair, - &program_id, - &tip_distribution_program_id, - &keeper_state, - ) - .await, + keeper_state.set_runs_errors_and_txs_for_epoch( + operations::mev_commission::fire(&keeper_config, &keeper_state).await, ); info!("Updating mev earned..."); - keeper_state.set_runs_and_errors_for_epoch( - operations::mev_earned::fire( - &client, - &keypair, - &program_id, - &tip_distribution_program_id, - &keeper_state, - ) - .await, + keeper_state.set_runs_errors_and_txs_for_epoch( + operations::mev_earned::fire(&keeper_config, &keeper_state).await, ); - if let Some(oracle_authority_keypair) = &oracle_authority_keypair { + if keeper_config.oracle_authority_keypair.is_some() { info!("Updating stake accounts..."); - keeper_state.set_runs_and_errors_for_epoch( - operations::stake_upload::fire( - &client, - oracle_authority_keypair, - &program_id, - &keeper_state, - ) - .await, + keeper_state.set_runs_errors_and_txs_for_epoch( + operations::stake_upload::fire(&keeper_config, &keeper_state).await, ); } - if let (Some(gossip_entrypoint), 
Some(oracle_authority_keypair)) = - (gossip_entrypoint, &oracle_authority_keypair) + if keeper_config.oracle_authority_keypair.is_some() + && keeper_config.gossip_entrypoint.is_some() { info!("Updating gossip accounts..."); - keeper_state.set_runs_and_errors_for_epoch( - operations::gossip_upload::fire( - &client, - oracle_authority_keypair, - &program_id, - &gossip_entrypoint, - &keeper_state, - ) - .await, + keeper_state.set_runs_errors_and_txs_for_epoch( + operations::gossip_upload::fire(&keeper_config, &keeper_state).await, ); } } - // ---------------------- EMIT METRICS ----------------------------------- - + // ON-CHAIN METRICS if should_fire(tick, metrics_interval) { info!("Emitting metrics..."); - keeper_state - .set_runs_and_errors_for_epoch(operations::metrics_emit::fire(&keeper_state).await); + keeper_state.set_runs_errors_and_txs_for_epoch( + operations::metrics_emit::fire(&keeper_state).await, + ); } // ---------------------- EMIT --------------------------------- if should_emit(tick, &intervals) { - KeeperOperations::emit(&keeper_state.runs_for_epoch, &keeper_state.errors_for_epoch) + keeper_state.emit(); + + KeeperOperations::emit( + &keeper_state.runs_for_epoch, + &keeper_state.errors_for_epoch, + &keeper_state.txs_for_epoch, + ); + + KeeperCreates::emit(&keeper_state.created_accounts_for_epoch); } // ---------- SLEEP ---------- @@ -262,6 +197,7 @@ async fn run_loop(config: RunLoopConfig) { async fn main() { env_logger::init(); let args = Args::parse(); + set_host_id(format!("{}", args.cluster)); let client = Arc::new(RpcClient::new_with_timeout( @@ -287,16 +223,17 @@ async fn main() { info!("Starting validator history keeper..."); - let config = RunLoopConfig { + let config = KeeperConfig { client, keypair, program_id: args.program_id, tip_distribution_program_id: args.tip_distribution_program_id, + priority_fee_in_microlamports: args.priority_fees, oracle_authority_keypair, gossip_entrypoint, validator_history_interval: 
args.validator_history_interval, metrics_interval: args.metrics_interval, }; - run_loop(config).await; + run_keeper(config).await; } diff --git a/keepers/validator-keeper/src/mev_commission.rs b/keepers/validator-keeper/src/mev_commission.rs index 84b87179..fb06b773 100644 --- a/keepers/validator-keeper/src/mev_commission.rs +++ b/keepers/validator-keeper/src/mev_commission.rs @@ -129,6 +129,7 @@ pub async fn update_mev_commission( keypair: Arc, validator_history_program_id: &Pubkey, tip_distribution_program_id: &Pubkey, + priority_fee_in_microlamports: u64, validators_updated: &mut HashMap, prev_epoch: &mut u64, ) -> Result { @@ -167,7 +168,7 @@ pub async fn update_mev_commission( create_transactions, update_instructions, &keypair, - PRIORITY_FEE, + priority_fee_in_microlamports, ) .await { @@ -193,6 +194,7 @@ pub async fn update_mev_earned( keypair: &Arc, validator_history_program_id: &Pubkey, tip_distribution_program_id: &Pubkey, + priority_fee_in_microlamports: u64, validators_updated: &mut HashMap, curr_epoch: &mut u64, ) -> Result { @@ -235,7 +237,7 @@ pub async fn update_mev_earned( create_transactions, update_instructions, keypair, - PRIORITY_FEE, + priority_fee_in_microlamports, ) .await; match submit_result { diff --git a/keepers/validator-keeper/src/operations/cluster_history.rs b/keepers/validator-keeper/src/operations/cluster_history.rs index 8f348521..4a7c4fa3 100644 --- a/keepers/validator-keeper/src/operations/cluster_history.rs +++ b/keepers/validator-keeper/src/operations/cluster_history.rs @@ -4,8 +4,9 @@ and the updating of the various data feeds within the accounts. It will emits metrics for each data feed, if env var SOLANA_METRICS_CONFIG is set to a valid influx server. 
*/ +use crate::derive_cluster_history_address; +use crate::state::keeper_config::KeeperConfig; use crate::state::keeper_state::KeeperState; -use crate::{derive_cluster_history_address, PRIORITY_FEE}; use anchor_lang::{InstructionData, ToAccountMetas}; use keeper_core::{submit_transactions, SubmitStats, TransactionExecutionError}; use solana_client::nonblocking::rpc_client::RpcClient; @@ -36,32 +37,39 @@ async fn _process( client: &Arc, keypair: &Arc, program_id: &Pubkey, + priority_fee_in_microlamports: u64, ) -> Result { - update_cluster_info(client, keypair, program_id).await + update_cluster_info(client, keypair, program_id, priority_fee_in_microlamports).await } pub async fn fire( - client: &Arc, - keypair: &Arc, - program_id: &Pubkey, + keeper_config: &KeeperConfig, keeper_state: &KeeperState, -) -> (KeeperOperations, u64, u64) { +) -> (KeeperOperations, u64, u64, u64) { + let client = &keeper_config.client; + let keypair = &keeper_config.keypair; + let program_id = &keeper_config.program_id; + let priority_fee_in_microlamports = keeper_config.priority_fee_in_microlamports; + let operation = _get_operation(); let epoch_info = &keeper_state.epoch_info; - let (mut runs_for_epoch, mut errors_for_epoch) = - keeper_state.copy_runs_and_errors_for_epoch(operation.clone()); + let (mut runs_for_epoch, mut errors_for_epoch, mut txs_for_epoch) = + keeper_state.copy_runs_errors_and_txs_for_epoch(operation.clone()); let should_run = _should_run(epoch_info, runs_for_epoch); if should_run { - match _process(client, keypair, program_id).await { + match _process(client, keypair, program_id, priority_fee_in_microlamports).await { Ok(stats) => { for message in stats.results.iter() { if let Err(e) = message { datapoint_error!("cluster-history-error", ("error", e.to_string(), String),); + } else { + txs_for_epoch += 1; } } + if stats.errors == 0 { runs_for_epoch += 1; } @@ -73,7 +81,7 @@ pub async fn fire( }; } - (operation, runs_for_epoch, errors_for_epoch) + (operation, 
runs_for_epoch, errors_for_epoch, txs_for_epoch) } // ----------------- OPERATION SPECIFIC FUNCTIONS ----------------- @@ -81,11 +89,13 @@ pub async fn fire( pub fn get_update_cluster_info_instructions( program_id: &Pubkey, keypair: &Pubkey, + priority_fee_in_microlamports: u64, ) -> Vec { let cluster_history_account = derive_cluster_history_address(program_id); - let priority_fee_ix = - compute_budget::ComputeBudgetInstruction::set_compute_unit_price(PRIORITY_FEE); + let priority_fee_ix = compute_budget::ComputeBudgetInstruction::set_compute_unit_price( + priority_fee_in_microlamports, + ); let heap_request_ix = compute_budget::ComputeBudgetInstruction::request_heap_frame(256 * 1024); let compute_budget_ix = compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(1_400_000); @@ -112,8 +122,13 @@ pub async fn update_cluster_info( client: &Arc, keypair: &Arc, program_id: &Pubkey, + priority_fee_in_microlamports: u64, ) -> Result { - let ixs = get_update_cluster_info_instructions(program_id, &keypair.pubkey()); + let ixs = get_update_cluster_info_instructions( + program_id, + &keypair.pubkey(), + priority_fee_in_microlamports, + ); submit_transactions(client, vec![ixs], keypair).await } diff --git a/keepers/validator-keeper/src/operations/gossip_upload.rs b/keepers/validator-keeper/src/operations/gossip_upload.rs index 427f1d39..12fc3757 100644 --- a/keepers/validator-keeper/src/operations/gossip_upload.rs +++ b/keepers/validator-keeper/src/operations/gossip_upload.rs @@ -4,8 +4,9 @@ This program starts several threads to manage the creation of validator history and the updating of the various data feeds within the accounts. It will emits metrics for each data feed, if env var SOLANA_METRICS_CONFIG is set to a valid influx server. 
*/ +use crate::start_spy_server; +use crate::state::keeper_config::KeeperConfig; use crate::state::keeper_state::KeeperState; -use crate::{start_spy_server, PRIORITY_FEE}; use bytemuck::{bytes_of, Pod, Zeroable}; use keeper_core::{submit_transactions, SubmitStats}; use log::*; @@ -46,31 +47,57 @@ async fn _process( client: &Arc, keypair: &Arc, program_id: &Pubkey, + priority_fee_in_microlamports: u64, entrypoint: &SocketAddr, keeper_state: &KeeperState, ) -> Result> { - upload_gossip_values(client, keypair, program_id, entrypoint, keeper_state).await + upload_gossip_values( + client, + keypair, + program_id, + priority_fee_in_microlamports, + entrypoint, + keeper_state, + ) + .await } pub async fn fire( - client: &Arc, - keypair: &Arc, - program_id: &Pubkey, - entrypoint: &SocketAddr, + keeper_config: &KeeperConfig, keeper_state: &KeeperState, -) -> (KeeperOperations, u64, u64) { +) -> (KeeperOperations, u64, u64, u64) { + let client = &keeper_config.client; + let keypair = &keeper_config.keypair; + let program_id = &keeper_config.program_id; + let entrypoint = &keeper_config + .gossip_entrypoint + .expect("Entry point not set"); + + let priority_fee_in_microlamports = keeper_config.priority_fee_in_microlamports; + let operation = _get_operation(); - let (mut runs_for_epoch, mut errors_for_epoch) = - keeper_state.copy_runs_and_errors_for_epoch(operation.clone()); + let (mut runs_for_epoch, mut errors_for_epoch, mut txs_for_epoch) = + keeper_state.copy_runs_errors_and_txs_for_epoch(operation.clone()); let should_run = _should_run(&keeper_state.epoch_info, runs_for_epoch); if should_run { - match _process(client, keypair, program_id, entrypoint, keeper_state).await { + match _process( + client, + keypair, + program_id, + priority_fee_in_microlamports, + entrypoint, + keeper_state, + ) + .await + { Ok(stats) => { for message in stats.results.iter().chain(stats.results.iter()) { if let Err(e) = message { datapoint_error!("gossip-upload-error", ("error", e.to_string(), 
String),); + } else { + txs_for_epoch += 1; } } if stats.errors == 0 { @@ -84,7 +111,7 @@ pub async fn fire( } } - (operation, runs_for_epoch, errors_for_epoch) + (operation, runs_for_epoch, errors_for_epoch, txs_for_epoch) } // ----------------- OPERATION SPECIFIC FUNCTIONS ----------------- @@ -217,6 +244,7 @@ pub async fn upload_gossip_values( client: &Arc, keypair: &Arc, program_id: &Pubkey, + priority_fee_in_microlamports: u64, entrypoint: &SocketAddr, keeper_state: &KeeperState, ) -> Result> { @@ -270,7 +298,7 @@ pub async fn upload_gossip_values( let update_transactions = gossip_entries .iter() - .map(|entry| entry.build_update_tx(PRIORITY_FEE)) + .map(|entry| entry.build_update_tx(priority_fee_in_microlamports)) .collect::>(); let submit_result = submit_transactions(client, update_transactions, keypair).await; diff --git a/keepers/validator-keeper/src/operations/keeper_operations.rs b/keepers/validator-keeper/src/operations/keeper_operations.rs index b4f5008c..4d577f21 100644 --- a/keepers/validator-keeper/src/operations/keeper_operations.rs +++ b/keepers/validator-keeper/src/operations/keeper_operations.rs @@ -1,5 +1,30 @@ use solana_metrics::datapoint_info; +#[derive(Clone)] +pub enum KeeperCreates { + CreateValidatorHistory, +} + +impl KeeperCreates { + pub const LEN: usize = 1; + + pub fn emit(created_accounts_for_epoch: &[u64; KeeperCreates::LEN]) { + let aggregate_creates = created_accounts_for_epoch.iter().sum::(); + + datapoint_info!( + "keeper-create-stats", + // AGGREGATE + ("num-aggregate-creates", aggregate_creates, i64), + // CREATE VALIDATOR HISTORY + ( + "num-validator-history-creates", + created_accounts_for_epoch[KeeperCreates::CreateValidatorHistory as usize], + i64 + ), + ); + } +} + #[derive(Clone)] pub enum KeeperOperations { PreCreateUpdate, @@ -20,14 +45,19 @@ impl KeeperOperations { pub fn emit( runs_for_epoch: &[u64; KeeperOperations::LEN], errors_for_epoch: &[u64; KeeperOperations::LEN], + txs_for_epoch: &[u64; 
KeeperOperations::LEN], ) { let aggregate_actions = runs_for_epoch.iter().sum::(); let aggregate_errors = errors_for_epoch.iter().sum::(); + let aggregate_txs = txs_for_epoch.iter().sum::(); datapoint_info!( "keeper-operation-stats", + // AGGREGATE ("num-aggregate-actions", aggregate_actions, i64), ("num-aggregate-errors", aggregate_errors, i64), + ("num-aggregate-txs", aggregate_txs, i64), + // PRE CREATE UPDATE ( "num-pre-create-update-runs", runs_for_epoch[KeeperOperations::PreCreateUpdate as usize], @@ -38,6 +68,12 @@ impl KeeperOperations { errors_for_epoch[KeeperOperations::PreCreateUpdate as usize], i64 ), + ( + "num-pre-create-update-txs", + txs_for_epoch[KeeperOperations::PreCreateUpdate as usize], + i64 + ), + // CREATE MISSING ACCOUNTS ( "num-create-missing-accounts-runs", runs_for_epoch[KeeperOperations::CreateMissingAccounts as usize], @@ -48,6 +84,12 @@ impl KeeperOperations { errors_for_epoch[KeeperOperations::CreateMissingAccounts as usize], i64 ), + ( + "num-create-missing-accounts-txs", + txs_for_epoch[KeeperOperations::CreateMissingAccounts as usize], + i64 + ), + // POST CREATE UPDATE ( "num-post-create-update-runs", runs_for_epoch[KeeperOperations::PostCreateUpdate as usize], @@ -58,6 +100,12 @@ impl KeeperOperations { errors_for_epoch[KeeperOperations::PostCreateUpdate as usize], i64 ), + ( + "num-post-create-update-txs", + txs_for_epoch[KeeperOperations::PostCreateUpdate as usize], + i64 + ), + // CLUSTER HISTORY ( "num-cluster-history-runs", runs_for_epoch[KeeperOperations::ClusterHistory as usize], @@ -68,6 +116,12 @@ impl KeeperOperations { errors_for_epoch[KeeperOperations::ClusterHistory as usize], i64 ), + ( + "num-cluster-history-txs", + txs_for_epoch[KeeperOperations::ClusterHistory as usize], + i64 + ), + // GOSSIP UPLOAD ( "num-gossip-upload-runs", runs_for_epoch[KeeperOperations::GossipUpload as usize], @@ -78,6 +132,12 @@ impl KeeperOperations { errors_for_epoch[KeeperOperations::GossipUpload as usize], i64 ), + ( + 
"num-gossip-upload-txs", + txs_for_epoch[KeeperOperations::GossipUpload as usize], + i64 + ), + // STAKE UPLOAD ( "num-stake-upload-runs", runs_for_epoch[KeeperOperations::StakeUpload as usize], @@ -88,6 +148,12 @@ impl KeeperOperations { errors_for_epoch[KeeperOperations::StakeUpload as usize], i64 ), + ( + "num-stake-upload-txs", + txs_for_epoch[KeeperOperations::StakeUpload as usize], + i64 + ), + // VOTE ACCOUNT ( "num-vote-account-runs", runs_for_epoch[KeeperOperations::VoteAccount as usize], @@ -98,6 +164,12 @@ impl KeeperOperations { errors_for_epoch[KeeperOperations::VoteAccount as usize], i64 ), + ( + "num-vote-account-txs", + txs_for_epoch[KeeperOperations::VoteAccount as usize], + i64 + ), + // MEV EARNED ( "num-mev-earned-runs", runs_for_epoch[KeeperOperations::MevEarned as usize], @@ -108,6 +180,12 @@ impl KeeperOperations { errors_for_epoch[KeeperOperations::MevEarned as usize], i64 ), + ( + "num-mev-earned-txs", + txs_for_epoch[KeeperOperations::MevEarned as usize], + i64 + ), + // MEV COMMISSION ( "num-mev-commission-runs", runs_for_epoch[KeeperOperations::MevCommission as usize], @@ -118,6 +196,12 @@ impl KeeperOperations { errors_for_epoch[KeeperOperations::MevCommission as usize], i64 ), + ( + "num-mev-commission-txs", + txs_for_epoch[KeeperOperations::MevCommission as usize], + i64 + ), + // EMIT METRICS ( "num-emit-metrics-runs", runs_for_epoch[KeeperOperations::EmitMetrics as usize], @@ -127,7 +211,12 @@ impl KeeperOperations { "num-emit-metrics-errors", errors_for_epoch[KeeperOperations::EmitMetrics as usize], i64 - ) + ), + ( + "num-emit-metrics-txs", + txs_for_epoch[KeeperOperations::EmitMetrics as usize], + i64 + ), ); } } diff --git a/keepers/validator-keeper/src/operations/metrics_emit.rs b/keepers/validator-keeper/src/operations/metrics_emit.rs index 6425c837..6b941f06 100644 --- a/keepers/validator-keeper/src/operations/metrics_emit.rs +++ b/keepers/validator-keeper/src/operations/metrics_emit.rs @@ -22,10 +22,10 @@ async fn 
_process(keeper_state: &KeeperState) -> Result<(), Box (KeeperOperations, u64, u64) { +pub async fn fire(keeper_state: &KeeperState) -> (KeeperOperations, u64, u64, u64) { let operation = _get_operation(); - let (mut runs_for_epoch, mut errors_for_epoch) = - keeper_state.copy_runs_and_errors_for_epoch(operation.clone()); + let (mut runs_for_epoch, mut errors_for_epoch, txs_for_epoch) = + keeper_state.copy_runs_errors_and_txs_for_epoch(operation.clone()); let should_run = _should_run(); @@ -41,7 +41,7 @@ pub async fn fire(keeper_state: &KeeperState) -> (KeeperOperations, u64, u64) { } } - (operation, runs_for_epoch, errors_for_epoch) + (operation, runs_for_epoch, errors_for_epoch, txs_for_epoch) } // ----------------- OPERATION SPECIFIC FUNCTIONS ----------------- diff --git a/keepers/validator-keeper/src/operations/mev_commission.rs b/keepers/validator-keeper/src/operations/mev_commission.rs index d7dbcb39..a306755a 100644 --- a/keepers/validator-keeper/src/operations/mev_commission.rs +++ b/keepers/validator-keeper/src/operations/mev_commission.rs @@ -4,9 +4,11 @@ and the updating of the various data feeds within the accounts. It will emits metrics for each data feed, if env var SOLANA_METRICS_CONFIG is set to a valid influx server. 
*/ -use crate::entries::mev_commission_entry::ValidatorMevCommissionEntry; use crate::state::keeper_state::KeeperState; -use crate::{KeeperError, PRIORITY_FEE}; +use crate::KeeperError; +use crate::{ + entries::mev_commission_entry::ValidatorMevCommissionEntry, state::keeper_config::KeeperConfig, +}; use keeper_core::{submit_instructions, SubmitStats, UpdateInstruction}; use solana_client::nonblocking::rpc_client::RpcClient; use solana_metrics::datapoint_error; @@ -34,6 +36,7 @@ async fn _process( program_id: &Pubkey, tip_distribution_program_id: &Pubkey, keeper_state: &KeeperState, + priority_fee_in_microlamports: u64, ) -> Result { update_mev_commission( client, @@ -41,20 +44,24 @@ async fn _process( program_id, tip_distribution_program_id, keeper_state, + priority_fee_in_microlamports, ) .await } pub async fn fire( - client: &Arc, - keypair: &Arc, - program_id: &Pubkey, - tip_distribution_program_id: &Pubkey, + keeper_config: &KeeperConfig, keeper_state: &KeeperState, -) -> (KeeperOperations, u64, u64) { +) -> (KeeperOperations, u64, u64, u64) { + let client = &keeper_config.client; + let keypair = &keeper_config.keypair; + let program_id = &keeper_config.program_id; + let tip_distribution_program_id = &keeper_config.tip_distribution_program_id; + let priority_fee_in_microlamports = keeper_config.priority_fee_in_microlamports; + let operation = _get_operation(); - let (mut runs_for_epoch, mut errors_for_epoch) = - keeper_state.copy_runs_and_errors_for_epoch(operation.clone()); + let (mut runs_for_epoch, mut errors_for_epoch, mut txs_for_epoch) = + keeper_state.copy_runs_errors_and_txs_for_epoch(operation.clone()); let should_run = _should_run(); @@ -65,6 +72,7 @@ pub async fn fire( program_id, tip_distribution_program_id, keeper_state, + priority_fee_in_microlamports, ) .await { @@ -73,6 +81,8 @@ pub async fn fire( if let Err(e) = message { datapoint_error!("vote-account-error", ("error", e.to_string(), String),); errors_for_epoch += 1; + } else { + 
txs_for_epoch += 1; } } if stats.errors == 0 { @@ -86,7 +96,7 @@ pub async fn fire( }; } - (operation, runs_for_epoch, errors_for_epoch) + (operation, runs_for_epoch, errors_for_epoch, txs_for_epoch) } // ----------------- OPERATION SPECIFIC FUNCTIONS ----------------- @@ -97,6 +107,7 @@ pub async fn update_mev_commission( program_id: &Pubkey, tip_distribution_program_id: &Pubkey, keeper_state: &KeeperState, + priority_fee_in_microlamports: u64, ) -> Result { let epoch_info = &keeper_state.epoch_info; let validator_history_map = &keeper_state.validator_history_map; @@ -126,8 +137,14 @@ pub async fn update_mev_commission( }) .collect::>(); - let submit_result = - submit_instructions(client, update_instructions, keypair, PRIORITY_FEE).await; + let submit_result = submit_instructions( + client, + update_instructions, + keypair, + priority_fee_in_microlamports, + None, + ) + .await; submit_result.map_err(|e| e.into()) } diff --git a/keepers/validator-keeper/src/operations/mev_earned.rs b/keepers/validator-keeper/src/operations/mev_earned.rs index 82b0538c..38ebe0f0 100644 --- a/keepers/validator-keeper/src/operations/mev_earned.rs +++ b/keepers/validator-keeper/src/operations/mev_earned.rs @@ -4,9 +4,11 @@ and the updating of the various data feeds within the accounts. It will emits metrics for each data feed, if env var SOLANA_METRICS_CONFIG is set to a valid influx server. 
*/ -use crate::entries::mev_commission_entry::ValidatorMevCommissionEntry; use crate::state::keeper_state::KeeperState; -use crate::{KeeperError, PRIORITY_FEE}; +use crate::KeeperError; +use crate::{ + entries::mev_commission_entry::ValidatorMevCommissionEntry, state::keeper_config::KeeperConfig, +}; use anchor_lang::AccountDeserialize; use jito_tip_distribution::state::TipDistributionAccount; use keeper_core::{submit_instructions, SubmitStats, UpdateInstruction}; @@ -35,12 +37,14 @@ async fn _process( keypair: &Arc, program_id: &Pubkey, tip_distribution_program_id: &Pubkey, + priority_fee_in_microlamports: u64, keeper_state: &KeeperState, ) -> Result { update_mev_earned( client, keypair, program_id, + priority_fee_in_microlamports, tip_distribution_program_id, keeper_state, ) @@ -48,16 +52,18 @@ async fn _process( } pub async fn fire( - client: &Arc, - keypair: &Arc, - program_id: &Pubkey, - tip_distribution_program_id: &Pubkey, + keeper_config: &KeeperConfig, keeper_state: &KeeperState, -) -> (KeeperOperations, u64, u64) { +) -> (KeeperOperations, u64, u64, u64) { + let client = &keeper_config.client; + let keypair = &keeper_config.keypair; + let program_id = &keeper_config.program_id; + let tip_distribution_program_id = &keeper_config.tip_distribution_program_id; + let priority_fee_in_microlamports = keeper_config.priority_fee_in_microlamports; let operation = _get_operation(); - let (mut runs_for_epoch, mut errors_for_epoch) = - keeper_state.copy_runs_and_errors_for_epoch(operation.clone()); + let (mut runs_for_epoch, mut errors_for_epoch, mut txs_for_epoch) = + keeper_state.copy_runs_errors_and_txs_for_epoch(operation.clone()); let should_run = _should_run(); @@ -67,6 +73,7 @@ pub async fn fire( keypair, program_id, tip_distribution_program_id, + priority_fee_in_microlamports, keeper_state, ) .await @@ -76,6 +83,8 @@ pub async fn fire( if let Err(e) = message { datapoint_error!("vote-account-error", ("error", e.to_string(), String),); errors_for_epoch += 1; + 
} else { + txs_for_epoch += 1; } } if stats.errors == 0 { @@ -89,7 +98,7 @@ pub async fn fire( }; } - (operation, runs_for_epoch, errors_for_epoch) + (operation, runs_for_epoch, errors_for_epoch, txs_for_epoch) } // ----------------- OPERATION SPECIFIC FUNCTIONS ----------------- @@ -98,6 +107,7 @@ pub async fn update_mev_earned( client: &Arc, keypair: &Arc, program_id: &Pubkey, + priority_fee_in_microlamports: u64, tip_distribution_program_id: &Pubkey, keeper_state: &KeeperState, ) -> Result { @@ -144,8 +154,14 @@ pub async fn update_mev_earned( }) .collect::>(); - let submit_result = - submit_instructions(client, update_instructions, keypair, PRIORITY_FEE).await; + let submit_result = submit_instructions( + client, + update_instructions, + keypair, + priority_fee_in_microlamports, + None, + ) + .await; submit_result.map_err(|e| e.into()) } diff --git a/keepers/validator-keeper/src/operations/stake_upload.rs b/keepers/validator-keeper/src/operations/stake_upload.rs index 685cb288..381c3939 100644 --- a/keepers/validator-keeper/src/operations/stake_upload.rs +++ b/keepers/validator-keeper/src/operations/stake_upload.rs @@ -1,11 +1,11 @@ -use crate::entries::stake_history_entry::StakeHistoryEntry; +use crate::{entries::stake_history_entry::StakeHistoryEntry, state::keeper_config::KeeperConfig}; /* This program starts several threads to manage the creation of validator history accounts, and the updating of the various data feeds within the accounts. It will emits metrics for each data feed, if env var SOLANA_METRICS_CONFIG is set to a valid influx server. 
*/ use crate::state::keeper_state::KeeperState; -use crate::{KeeperError, PRIORITY_FEE}; +use crate::KeeperError; use keeper_core::{submit_instructions, SubmitStats, UpdateInstruction}; use log::*; use solana_client::nonblocking::rpc_client::RpcClient; @@ -36,29 +36,50 @@ async fn _process( client: &Arc, keypair: &Arc, program_id: &Pubkey, + priority_fee_in_microlamports: u64, keeper_state: &KeeperState, ) -> Result> { - update_stake_history(client, keypair, program_id, keeper_state).await + update_stake_history( + client, + keypair, + program_id, + priority_fee_in_microlamports, + keeper_state, + ) + .await } pub async fn fire( - client: &Arc, - keypair: &Arc, - program_id: &Pubkey, + keeper_config: &KeeperConfig, keeper_state: &KeeperState, -) -> (KeeperOperations, u64, u64) { +) -> (KeeperOperations, u64, u64, u64) { + let client = &keeper_config.client; + let keypair = &keeper_config.keypair; + let program_id = &keeper_config.program_id; + let priority_fee_in_microlamports = keeper_config.priority_fee_in_microlamports; + let operation = _get_operation(); - let (mut runs_for_epoch, mut errors_for_epoch) = - keeper_state.copy_runs_and_errors_for_epoch(operation.clone()); + let (mut runs_for_epoch, mut errors_for_epoch, mut txs_for_epoch) = + keeper_state.copy_runs_errors_and_txs_for_epoch(operation.clone()); let should_run = _should_run(&keeper_state.epoch_info, runs_for_epoch); if should_run { - match _process(client, keypair, program_id, keeper_state).await { + match _process( + client, + keypair, + program_id, + priority_fee_in_microlamports, + keeper_state, + ) + .await + { Ok(stats) => { for message in stats.results.iter().chain(stats.results.iter()) { if let Err(e) = message { datapoint_error!("stake-history-error", ("error", e.to_string(), String),); + } else { + txs_for_epoch += 1; } } @@ -73,7 +94,7 @@ pub async fn fire( }; } - (operation, runs_for_epoch, errors_for_epoch) + (operation, runs_for_epoch, errors_for_epoch, txs_for_epoch) } // 
----------------- OPERATION SPECIFIC FUNCTIONS ----------------- @@ -82,6 +103,7 @@ pub async fn update_stake_history( client: &Arc, keypair: &Arc, program_id: &Pubkey, + priority_fee_in_microlamports: u64, keeper_state: &KeeperState, ) -> Result> { let epoch_info = &keeper_state.epoch_info; @@ -131,8 +153,14 @@ pub async fn update_stake_history( .map(|stake_history_entry| stake_history_entry.update_instruction()) .collect::>(); - let submit_result = - submit_instructions(client, update_instructions, keypair, PRIORITY_FEE).await; + let submit_result = submit_instructions( + client, + update_instructions, + keypair, + priority_fee_in_microlamports, + None, + ) + .await; submit_result.map_err(|e| e.into()) } diff --git a/keepers/validator-keeper/src/operations/vote_account.rs b/keepers/validator-keeper/src/operations/vote_account.rs index e0c962e4..f80196af 100644 --- a/keepers/validator-keeper/src/operations/vote_account.rs +++ b/keepers/validator-keeper/src/operations/vote_account.rs @@ -4,9 +4,11 @@ and the updating of the various data feeds within the accounts. It will emits metrics for each data feed, if env var SOLANA_METRICS_CONFIG is set to a valid influx server. 
*/ -use crate::entries::copy_vote_account_entry::CopyVoteAccountEntry; use crate::state::keeper_state::KeeperState; -use crate::{KeeperError, PRIORITY_FEE}; +use crate::KeeperError; +use crate::{ + entries::copy_vote_account_entry::CopyVoteAccountEntry, state::keeper_config::KeeperConfig, +}; use keeper_core::{submit_instructions, SubmitStats, UpdateInstruction}; use solana_client::nonblocking::rpc_client::RpcClient; use solana_metrics::datapoint_error; @@ -36,30 +38,51 @@ async fn _process( client: &Arc, keypair: &Arc, program_id: &Pubkey, + priority_fee_in_microlamports: u64, keeper_state: &KeeperState, ) -> Result { - update_vote_accounts(client, keypair, program_id, keeper_state).await + update_vote_accounts( + client, + keypair, + program_id, + priority_fee_in_microlamports, + keeper_state, + ) + .await } pub async fn fire( - client: &Arc, - keypair: &Arc, - program_id: &Pubkey, + keeper_config: &KeeperConfig, keeper_state: &KeeperState, -) -> (KeeperOperations, u64, u64) { +) -> (KeeperOperations, u64, u64, u64) { + let client = &keeper_config.client; + let keypair = &keeper_config.keypair; + let program_id = &keeper_config.program_id; + let priority_fee_in_microlamports = keeper_config.priority_fee_in_microlamports; + let operation = _get_operation(); let epoch_info = &keeper_state.epoch_info; - let (mut runs_for_epoch, mut errors_for_epoch) = - keeper_state.copy_runs_and_errors_for_epoch(operation.clone()); + let (mut runs_for_epoch, mut errors_for_epoch, mut txs_for_epoch) = + keeper_state.copy_runs_errors_and_txs_for_epoch(operation.clone()); let should_run = _should_run(epoch_info, runs_for_epoch); if should_run { - match _process(client, keypair, program_id, keeper_state).await { + match _process( + client, + keypair, + program_id, + priority_fee_in_microlamports, + keeper_state, + ) + .await + { Ok(stats) => { for message in stats.results.iter().chain(stats.results.iter()) { if let Err(e) = message { datapoint_error!("vote-account-error", ("error", 
e.to_string(), String),); + } else { + txs_for_epoch += 1; } } if stats.errors == 0 { @@ -73,7 +96,7 @@ pub async fn fire( }; } - (operation, runs_for_epoch, errors_for_epoch) + (operation, runs_for_epoch, errors_for_epoch, txs_for_epoch) } // SPECIFIC TO THIS OPERATION @@ -81,6 +104,7 @@ pub async fn update_vote_accounts( rpc_client: &Arc, keypair: &Arc, program_id: &Pubkey, + priority_fee_in_microlamports: u64, keeper_state: &KeeperState, ) -> Result { let validator_history_map = &keeper_state.validator_history_map; @@ -107,8 +131,14 @@ pub async fn update_vote_accounts( .map(|copy_vote_account_entry| copy_vote_account_entry.update_instruction()) .collect::>(); - let submit_result = - submit_instructions(rpc_client, update_instructions, keypair, PRIORITY_FEE).await; + let submit_result = submit_instructions( + rpc_client, + update_instructions, + keypair, + priority_fee_in_microlamports, + Some(300_000), + ) + .await; submit_result.map_err(|e| e.into()) } diff --git a/keepers/validator-keeper/src/state/keeper_config.rs b/keepers/validator-keeper/src/state/keeper_config.rs new file mode 100644 index 00000000..df8718ab --- /dev/null +++ b/keepers/validator-keeper/src/state/keeper_config.rs @@ -0,0 +1,75 @@ +use clap::{arg, command, Parser}; +use keeper_core::Cluster; +use solana_client::nonblocking::rpc_client::RpcClient; +use solana_sdk::{pubkey::Pubkey, signature::Keypair}; +use std::{net::SocketAddr, path::PathBuf, sync::Arc}; + +pub struct KeeperConfig { + pub client: Arc, + pub keypair: Arc, + pub program_id: Pubkey, + pub tip_distribution_program_id: Pubkey, + pub priority_fee_in_microlamports: u64, + pub oracle_authority_keypair: Option>, + pub gossip_entrypoint: Option, + pub validator_history_interval: u64, + pub metrics_interval: u64, +} + +#[derive(Parser, Debug)] +#[command(about = "Keeps commission history accounts up to date")] +pub struct Args { + /// RPC URL for the cluster + #[arg( + short, + long, + env, + default_value = 
"https://api.mainnet-beta.solana.com" + )] + pub json_rpc_url: String, + + /// Gossip entrypoint in the form of URL:PORT + #[arg(short, long, env)] + pub gossip_entrypoint: Option, + + /// Path to keypair used to pay for account creation and execute transactions + #[arg(short, long, env, default_value = "./credentials/keypair.json")] + pub keypair: PathBuf, + + /// Path to keypair used specifically for submitting permissioned transactions + #[arg(short, long, env)] + pub oracle_authority_keypair: Option, + + /// Validator history program ID (Pubkey as base58 string) + #[arg( + short, + long, + env, + default_value = "HistoryJTGbKQD2mRgLZ3XhqHnN811Qpez8X9kCcGHoa" + )] + pub program_id: Pubkey, + + /// Tip distribution program ID (Pubkey as base58 string) + #[arg( + short, + long, + env, + default_value = "4R3gSG8BpU4t19KYj8CfnbtRpnT8gtk4dvTHxVRwc2r7" + )] + pub tip_distribution_program_id: Pubkey, + + // Interval to update Validator History Accounts (default 300 sec) + #[arg(short, long, env, default_value = "300")] + pub validator_history_interval: u64, + + // Interval to emit metrics (default 60 sec) + #[arg(short, long, env, default_value = "60")] + pub metrics_interval: u64, + + // Priority Fees in microlamports + #[arg(long, env, default_value = "200000")] + pub priority_fees: u64, + + #[arg(short, long, env, default_value_t = Cluster::Mainnet)] + pub cluster: Cluster, +} diff --git a/keepers/validator-keeper/src/state/keeper_state.rs b/keepers/validator-keeper/src/state/keeper_state.rs index 0a3a3dcc..4a2927d2 100644 --- a/keepers/validator-keeper/src/state/keeper_state.rs +++ b/keepers/validator-keeper/src/state/keeper_state.rs @@ -2,13 +2,17 @@ use std::collections::{HashMap, HashSet}; use bytemuck::Zeroable; use solana_client::rpc_response::RpcVoteAccountInfo; +use solana_metrics::datapoint_info; use solana_sdk::{ account::Account, epoch_info::EpochInfo, pubkey::Pubkey, vote::program::id as get_vote_program_id, }; use validator_history::{ClusterHistory, 
ValidatorHistory}; -use crate::{derive_validator_history_address, operations::keeper_operations::KeeperOperations}; +use crate::{ + derive_validator_history_address, + operations::keeper_operations::{KeeperCreates, KeeperOperations}, +}; pub struct KeeperState { pub epoch_info: EpochInfo, @@ -16,6 +20,10 @@ pub struct KeeperState { // Tally array of runs and errors indexed by their respective KeeperOperations pub runs_for_epoch: [u64; KeeperOperations::LEN], pub errors_for_epoch: [u64; KeeperOperations::LEN], + pub txs_for_epoch: [u64; KeeperOperations::LEN], + + // Tally for creates + pub created_accounts_for_epoch: [u64; KeeperCreates::LEN], // All vote account info fetched with get_vote_accounts - key'd by their pubkey pub vote_account_map: HashMap, @@ -50,18 +58,44 @@ impl KeeperState { self.errors_for_epoch[index] += 1; } - pub fn copy_runs_and_errors_for_epoch(&self, operation: KeeperOperations) -> (u64, u64) { + pub fn increment_update_txs_for_epoch(&mut self, operation: KeeperOperations, txs: u64) { + let index = operation as usize; + self.txs_for_epoch[index] += txs; + } + + pub fn copy_runs_errors_and_txs_for_epoch( + &self, + operation: KeeperOperations, + ) -> (u64, u64, u64) { let index = operation as usize; - (self.runs_for_epoch[index], self.errors_for_epoch[index]) + ( + self.runs_for_epoch[index], + self.errors_for_epoch[index], + self.txs_for_epoch[index], + ) } - pub fn set_runs_and_errors_for_epoch( + pub fn set_runs_errors_and_txs_for_epoch( + &mut self, - (operation, runs_for_epoch, errors_for_epoch): (KeeperOperations, u64, u64), + (operation, runs_for_epoch, errors_for_epoch, txs_for_epoch): ( + KeeperOperations, + u64, + u64, + u64, + ), ) { let index = operation as usize; self.runs_for_epoch[index] = runs_for_epoch; self.errors_for_epoch[index] = errors_for_epoch; + self.txs_for_epoch[index] = txs_for_epoch; + } + + pub fn increment_creations_for_epoch( + &mut self, + (operation, created_accounts_for_epoch): (KeeperCreates, u64), + ) { + 
let index = operation as usize; + self.created_accounts_for_epoch[index] += created_accounts_for_epoch; } pub fn get_history_pubkeys(&self, program_id: &Pubkey) -> HashSet { @@ -119,6 +153,50 @@ impl KeeperState { .map(|(pubkey, _)| pubkey) .collect() } + + pub fn emit(&self) { + datapoint_info!( + "keeper-state", + // EPOCH INFO + ("epoch", self.epoch_info.epoch as i64, i64), + ("slot_index", self.epoch_info.slot_index as i64, i64), + ("slots_in_epoch", self.epoch_info.slots_in_epoch as i64, i64), + ("absolute_slot", self.epoch_info.absolute_slot as i64, i64), + ("block_height", self.epoch_info.block_height as i64, i64), + // KEEPER STATE + ("keeper_balance", self.keeper_balance as i64, i64), + ( + "vote_account_map_count", + self.vote_account_map.len() as i64, + i64 + ), + ( + "validator_history_map_count", + self.validator_history_map.len() as i64, + i64 + ), + ( + "all_history_vote_account_map_count", + self.all_history_vote_account_map.len() as i64, + i64 + ), + ( + "all_get_vote_account_map_count", + self.all_get_vote_account_map.len() as i64, + i64 + ), + ( + "previous_epoch_tip_distribution_map_count", + self.previous_epoch_tip_distribution_map.len() as i64, + i64 + ), + ( + "current_epoch_tip_distribution_map_count", + self.current_epoch_tip_distribution_map.len() as i64, + i64 + ), + ) + } } impl Default for KeeperState { @@ -134,6 +212,8 @@ impl Default for KeeperState { }, runs_for_epoch: [0; KeeperOperations::LEN], errors_for_epoch: [0; KeeperOperations::LEN], + txs_for_epoch: [0; KeeperOperations::LEN], + created_accounts_for_epoch: [0; KeeperCreates::LEN], vote_account_map: HashMap::new(), validator_history_map: HashMap::new(), all_history_vote_account_map: HashMap::new(), diff --git a/keepers/validator-keeper/src/state/mod.rs b/keepers/validator-keeper/src/state/mod.rs index 018888fc..c85a7a73 100644 --- a/keepers/validator-keeper/src/state/mod.rs +++ b/keepers/validator-keeper/src/state/mod.rs @@ -1,2 +1,3 @@ +pub mod keeper_config; pub mod 
keeper_state; pub mod update_state; diff --git a/keepers/validator-keeper/src/state/update_state.rs b/keepers/validator-keeper/src/state/update_state.rs index de8fb500..45ecad9c 100644 --- a/keepers/validator-keeper/src/state/update_state.rs +++ b/keepers/validator-keeper/src/state/update_state.rs @@ -7,33 +7,35 @@ use keeper_core::{ }; use solana_client::{nonblocking::rpc_client::RpcClient, rpc_response::RpcVoteAccountInfo}; use solana_sdk::{ - account::Account, - instruction::Instruction, - pubkey::Pubkey, - signature::{Keypair, Signer}, + account::Account, instruction::Instruction, pubkey::Pubkey, signature::Keypair, signer::Signer, }; use validator_history::{constants::MIN_VOTE_EPOCHS, ClusterHistory, ValidatorHistory}; use crate::{ derive_cluster_history_address, derive_validator_history_address, get_balance_with_retry, get_create_validator_history_instructions, get_validator_history_accounts_with_retry, - operations::keeper_operations::KeeperOperations, + operations::keeper_operations::{KeeperCreates, KeeperOperations}, }; -use super::keeper_state::KeeperState; +use super::{keeper_config::KeeperConfig, keeper_state::KeeperState}; pub async fn pre_create_update( - client: &Arc, - keypair: &Arc, - program_id: &Pubkey, + keeper_config: &KeeperConfig, keeper_state: &mut KeeperState, ) -> Result<(), Box> { + let client = &keeper_config.client; + let program_id = &keeper_config.program_id; + let keypair = &keeper_config.keypair; + // Update Epoch match client.get_epoch_info().await { Ok(latest_epoch) => { if latest_epoch.epoch != keeper_state.epoch_info.epoch { keeper_state.runs_for_epoch = [0; KeeperOperations::LEN]; keeper_state.errors_for_epoch = [0; KeeperOperations::LEN]; + keeper_state.txs_for_epoch = [0; KeeperOperations::LEN]; + + keeper_state.created_accounts_for_epoch = [0; KeeperCreates::LEN]; } // Always update the epoch info @@ -62,23 +64,35 @@ pub async fn pre_create_update( // Should be called after `pre_create_update` pub async fn 
create_missing_accounts( - client: &Arc, - keypair: &Arc, - program_id: &Pubkey, + keeper_config: &KeeperConfig, keeper_state: &KeeperState, -) -> Result<(), Box> { - // Create Missing Accounts - create_missing_validator_history_accounts(client, keypair, program_id, keeper_state).await?; +) -> Result, Box> { + let client = &keeper_config.client; + let program_id = &keeper_config.program_id; + let keypair = &keeper_config.keypair; - Ok(()) + let mut created_accounts_for_epoch = vec![]; + + // Create Missing Accounts + let new_validator_history_accounts = + create_missing_validator_history_accounts(client, keypair, program_id, keeper_state) + .await?; + created_accounts_for_epoch.push(( + KeeperCreates::CreateValidatorHistory, + new_validator_history_accounts, + )); + + Ok(created_accounts_for_epoch) } pub async fn post_create_update( - client: &Arc, - program_id: &Pubkey, - tip_distribution_program_id: &Pubkey, + keeper_config: &KeeperConfig, keeper_state: &mut KeeperState, ) -> Result<(), Box> { + let client = &keeper_config.client; + let program_id = &keeper_config.program_id; + let tip_distribution_program_id = &keeper_config.tip_distribution_program_id; + // Update Validator History Accounts keeper_state.validator_history_map = get_validator_history_map(client, program_id).await?; @@ -235,7 +249,7 @@ async fn create_missing_validator_history_accounts( keypair: &Arc, program_id: &Pubkey, keeper_state: &KeeperState, -) -> Result<(), Box> { +) -> Result> { let vote_accounts = &keeper_state .vote_account_map .keys() @@ -266,7 +280,9 @@ async fn create_missing_validator_history_accounts( }) .collect::>>(); + let accounts_created = create_transactions.len(); + submit_transactions(client, create_transactions, keypair).await?; - Ok(()) + Ok(accounts_created) }