diff --git a/Cargo.lock b/Cargo.lock index 0b149f6198..2c813c5972 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7203,6 +7203,7 @@ name = "solana-tip-distributor" version = "1.16.17" dependencies = [ "anchor-lang", + "async-channel", "clap 4.1.11", "env_logger", "futures 0.3.28", @@ -7212,6 +7213,7 @@ dependencies = [ "jito-tip-payment", "log", "num-traits", + "rand 0.7.3", "serde", "serde_json", "solana-client", @@ -7220,6 +7222,7 @@ dependencies = [ "solana-merkle-tree", "solana-metrics", "solana-program", + "solana-program-runtime", "solana-rpc-client-api", "solana-runtime", "solana-sdk", diff --git a/tip-distributor/Cargo.toml b/tip-distributor/Cargo.toml index 722b99c689..7f8325e599 100644 --- a/tip-distributor/Cargo.toml +++ b/tip-distributor/Cargo.toml @@ -7,29 +7,32 @@ description = "Collection of binaries used to distribute MEV rewards to delegato [dependencies] anchor-lang = { workspace = true } -clap = { version = "=4.1.11", features = ["derive", "env"] } -env_logger = "0.9.0" -futures = "0.3.21" -im = "15.1.0" -itertools = "0.10.3" +async-channel = { workspace = true } +clap = { version = "4.1.11", features = ["derive", "env"] } +env_logger = { workspace = true } +futures = { workspace = true } +im = { workspace = true } +itertools = { workspace = true } jito-tip-distribution = { workspace = true } jito-tip-payment = { workspace = true } -log = "0.4.17" -num-traits = "0.2.15" -serde = "1.0.137" -serde_json = "1.0.81" +log = { workspace = true } +num-traits = { workspace = true } +rand = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } solana-client = { workspace = true } solana-genesis-utils = { workspace = true } solana-ledger = { workspace = true } solana-merkle-tree = { workspace = true } solana-metrics = { workspace = true } solana-program = { workspace = true } +solana-program-runtime = { workspace = true } solana-rpc-client-api = { workspace = true } solana-runtime = { workspace = true } solana-sdk = { workspace 
= true } solana-stake-program = { workspace = true } thiserror = { workspace = true } -tokio = { workspace = true, features = ["rt-multi-thread", "macros", "sync", "time", "full"] } +tokio = { workspace = true, features = ["rt-multi-thread", "macros", "sync", "time"] } [[bin]] name = "solana-stake-meta-generator" diff --git a/tip-distributor/README.md b/tip-distributor/README.md index c100843a41..a5530bd463 100644 --- a/tip-distributor/README.md +++ b/tip-distributor/README.md @@ -27,8 +27,16 @@ out into the PDA until some slot in epoch N + 1. Due to this we cannot rely on t in the PDAs. We use the bank solely to take a snapshot of delegations, but an RPC node to fetch the PDA lamports for more up-to-date data. ### merkle-root-generator -This script accepts a path to the above JSON file as one of its arguments, and generates a merkle-root. It'll optionally upload the root -on-chain if specified. Additionally, it'll spit the generated merkle trees out into a JSON file. +This script accepts a path to the above JSON file as one of its arguments, and generates a merkle-root into a JSON file. + +### merkle-root-uploader +Uploads the root on-chain. + +### claim-mev-tips +This reads the file outputted by `merkle-root-generator` and finds all eligible accounts to receive mev tips. Transactions +are created and sent to the RPC server. +Example usage: + ## How it works? In order to use this library as the merkle root creator one must follow the following steps: @@ -38,6 +46,8 @@ In order to use this library as the merkle root creator one must follow the foll 1. The snapshot created at `${WHERE_TO_CREATE_SNAPSHOT}` will have the highest slot of `${YOUR_SLOT}`, assuming you downloaded the correct snapshot. 4. Run `stake-meta-generator --ledger-path ${WHERE_TO_CREATE_SNAPSHOT} --tip-distribution-program-id ${PUBKEY} --out-path ${JSON_OUT_PATH} --snapshot-slot ${SLOT} --rpc-url ${URL}` 1. Note: `${WHERE_TO_CREATE_SNAPSHOT}` must be the same in steps 3 & 4. -5. 
Run `merkle-root-generator --path-to-my-keypair ${KEYPAIR_PATH} --stake-meta-coll-path ${STAKE_META_COLLECTION_JSON} --rpc-url ${URL} --upload-roots ${BOOL} --force-upload-root ${BOOL}` +5. Run `merkle-root-generator --stake-meta-coll-path ${STAKE_META_COLLECTION_JSON} --rpc-url ${URL} --out-path ${MERKLE_ROOT_PATH}` +6. Run `merkle-root-uploader --out-path ${MERKLE_ROOT_PATH} --keypair-path ${KEYPAIR_PATH} --rpc-url ${URL} --tip-distribution-program-id ${PROGRAM_ID}` +7. Run `solana-claim-mev-tips --merkle-trees-path /solana/ledger/autosnapshot/merkle-tree-221615999.json --rpc-url ${URL} --tip-distribution-program-id ${PROGRAM_ID} --keypair-path ${KEYPAIR_PATH}` Voila! diff --git a/tip-distributor/src/bin/claim-mev-tips.rs b/tip-distributor/src/bin/claim-mev-tips.rs index 4a9a789509..4dee099327 100644 --- a/tip-distributor/src/bin/claim-mev-tips.rs +++ b/tip-distributor/src/bin/claim-mev-tips.rs @@ -3,9 +3,13 @@ use { clap::Parser, log::*, + solana_metrics::{datapoint_error, datapoint_info}, solana_sdk::pubkey::Pubkey, solana_tip_distributor::claim_mev_workflow::claim_mev_tips, - std::{path::PathBuf, str::FromStr}, + std::{ + path::PathBuf, + time::{Duration, Instant}, + }, }; #[derive(Parser, Debug)] @@ -16,37 +20,71 @@ struct Args { merkle_trees_path: PathBuf, /// RPC to send transactions through - #[arg(long, env)] + #[arg(long, env, default_value = "http://localhost:8899")] rpc_url: String, /// Tip distribution program ID #[arg(long, env)] - tip_distribution_program_id: String, + tip_distribution_program_id: Pubkey, /// Path to keypair #[arg(long, env)] keypair_path: PathBuf, + + /// Number of unique connections to the RPC server for sending txns + #[arg(long, env, default_value_t = 100)] + rpc_send_connection_count: u64, + + /// Rate-limits the maximum number of requests per RPC connection + #[arg(long, env, default_value_t = 200)] + max_concurrent_rpc_get_reqs: usize, + + #[arg(long, env, default_value_t = 5)] + max_loop_retries: u64, + + /// Limits how long 
before send loop runs before stopping. Defaults to 10 mins + #[arg(long, env, default_value_t = 10*60)] + max_loop_duration_secs: u64, } -fn main() { +#[tokio::main] +async fn main() { env_logger::init(); - info!("Starting to claim mev tips..."); - let args: Args = Args::parse(); + info!("Starting to claim mev tips..."); + let start = Instant::now(); - let tip_distribution_program_id = Pubkey::from_str(&args.tip_distribution_program_id) - .expect("valid tip_distribution_program_id"); - - if let Err(e) = claim_mev_tips( + match claim_mev_tips( &args.merkle_trees_path, - &args.rpc_url, - &tip_distribution_program_id, + args.rpc_url, + args.rpc_send_connection_count, + args.max_concurrent_rpc_get_reqs, + &args.tip_distribution_program_id, &args.keypair_path, - ) { - panic!("error claiming mev tips: {:?}", e); + args.max_loop_retries, + Duration::from_secs(args.max_loop_duration_secs), + ) + .await + { + Err(e) => datapoint_error!( + "claim_mev_workflow-claim_error", + ("error", 1, i64), + ("err_str", e.to_string(), String), + ( + "merkle_trees_path", + args.merkle_trees_path.to_string_lossy(), + String + ), + ("latency_us", start.elapsed().as_micros(), i64), + ), + Ok(()) => datapoint_info!( + "claim_mev_workflow-claim_completion", + ( + "merkle_trees_path", + args.merkle_trees_path.to_string_lossy(), + String + ), + ("latency_us", start.elapsed().as_micros(), i64), + ), } - info!( - "done claiming mev tips from file {:?}", - args.merkle_trees_path - ); } diff --git a/tip-distributor/src/bin/merkle-root-uploader.rs b/tip-distributor/src/bin/merkle-root-uploader.rs index 9fcd7b8ed7..9000ce66d0 100644 --- a/tip-distributor/src/bin/merkle-root-uploader.rs +++ b/tip-distributor/src/bin/merkle-root-uploader.rs @@ -1,9 +1,6 @@ use { - clap::Parser, - log::info, - solana_sdk::pubkey::Pubkey, - solana_tip_distributor::merkle_root_upload_workflow::upload_merkle_root, - std::{path::PathBuf, str::FromStr}, + clap::Parser, log::info, solana_sdk::pubkey::Pubkey, + 
solana_tip_distributor::merkle_root_upload_workflow::upload_merkle_root, std::path::PathBuf, }; #[derive(Parser, Debug)] @@ -23,7 +20,15 @@ struct Args { /// Tip distribution program ID #[arg(long, env)] - tip_distribution_program_id: String, + tip_distribution_program_id: Pubkey, + + /// Rate-limits the maximum number of requests per RPC connection + #[arg(long, env, default_value_t = 100)] + max_concurrent_rpc_get_reqs: usize, + + /// Number of transactions to send to RPC at a time. + #[arg(long, env, default_value_t = 64)] + txn_send_batch_size: usize, } fn main() { @@ -31,15 +36,14 @@ fn main() { let args: Args = Args::parse(); - let tip_distribution_program_id = Pubkey::from_str(&args.tip_distribution_program_id) - .expect("valid tip_distribution_program_id"); - info!("starting merkle root uploader..."); if let Err(e) = upload_merkle_root( &args.merkle_root_path, &args.keypair_path, &args.rpc_url, - &tip_distribution_program_id, + &args.tip_distribution_program_id, + args.max_concurrent_rpc_get_reqs, + args.txn_send_batch_size, ) { panic!("failed to upload merkle roots: {:?}", e); } diff --git a/tip-distributor/src/bin/reclaim-rent.rs b/tip-distributor/src/bin/reclaim-rent.rs index 5aa372a27a..27d37d73e3 100644 --- a/tip-distributor/src/bin/reclaim-rent.rs +++ b/tip-distributor/src/bin/reclaim-rent.rs @@ -8,7 +8,7 @@ use { commitment_config::CommitmentConfig, pubkey::Pubkey, signature::read_keypair_file, }, solana_tip_distributor::reclaim_rent_workflow::reclaim_rent, - std::{path::PathBuf, str::FromStr, time::Duration}, + std::{path::PathBuf, time::Duration}, tokio::runtime::Runtime, }; @@ -22,7 +22,7 @@ struct Args { rpc_url: String, /// Tip distribution program ID. - #[arg(long, env, value_parser = Pubkey::from_str)] + #[arg(long, env)] tip_distribution_program_id: Pubkey, /// The keypair signing and paying for transactions. 
@@ -33,6 +33,14 @@ struct Args { #[arg(long, env, default_value_t = 180)] rpc_timeout_secs: u64, + /// Rate-limits the maximum number of requests per RPC connection + #[arg(long, env, default_value_t = 100)] + max_concurrent_rpc_get_reqs: usize, + + /// Number of transactions to send to RPC at a time. + #[arg(long, env, default_value_t = 64)] + txn_send_batch_size: usize, + /// Specifies whether to reclaim rent on behalf of validators from respective TDAs. #[arg(long, env)] should_reclaim_tdas: bool, @@ -53,6 +61,8 @@ fn main() { ), args.tip_distribution_program_id, read_keypair_file(&args.keypair_path).expect("read keypair file"), + args.max_concurrent_rpc_get_reqs, + args.txn_send_batch_size, args.should_reclaim_tdas, )) { panic!("error reclaiming rent: {e:?}"); diff --git a/tip-distributor/src/claim_mev_workflow.rs b/tip-distributor/src/claim_mev_workflow.rs index 446011b1ae..63ddd1e808 100644 --- a/tip-distributor/src/claim_mev_workflow.rs +++ b/tip-distributor/src/claim_mev_workflow.rs @@ -1,26 +1,35 @@ use { crate::{ - read_json_from_file, sign_and_send_transactions_with_retries, GeneratedMerkleTreeCollection, + read_json_from_file, sign_and_send_transactions_with_retries_multi_rpc, + GeneratedMerkleTreeCollection, TreeNode, FAIL_DELAY, MAX_RETRIES, }, anchor_lang::{AccountDeserialize, InstructionData, ToAccountMetas}, - jito_tip_distribution::state::*, - log::{debug, info, warn}, - solana_client::{nonblocking::rpc_client::RpcClient, rpc_request::RpcError}, + itertools::Itertools, + jito_tip_distribution::state::{ClaimStatus, Config, TipDistributionAccount}, + log::{debug, info}, + solana_client::nonblocking::rpc_client::RpcClient, + solana_metrics::{datapoint_error, datapoint_info, datapoint_warn}, solana_program::{ fee_calculator::DEFAULT_TARGET_LAMPORTS_PER_SIGNATURE, native_token::LAMPORTS_PER_SOL, stake::state::StakeState, system_program, }, - solana_rpc_client_api::client_error, + solana_rpc_client_api::request::MAX_MULTIPLE_ACCOUNTS, solana_sdk::{ + 
account::Account, commitment_config::CommitmentConfig, instruction::Instruction, pubkey::Pubkey, signature::{read_keypair_file, Signer}, transaction::Transaction, }, - std::{path::PathBuf, time::Duration}, + std::{ + collections::HashMap, + path::PathBuf, + sync::Arc, + time::{Duration, Instant}, + }, thiserror::Error, - tokio::runtime::Builder, + tokio::sync::Semaphore, }; #[derive(Error, Debug)] @@ -32,121 +41,426 @@ pub enum ClaimMevError { JsonError(#[from] serde_json::Error), } -pub fn claim_mev_tips( +pub async fn claim_mev_tips( merkle_root_path: &PathBuf, - rpc_url: &str, + rpc_url: String, + rpc_connection_count: u64, + max_concurrent_rpc_get_reqs: usize, tip_distribution_program_id: &Pubkey, keypair_path: &PathBuf, + max_loop_retries: u64, + max_loop_duration: Duration, ) -> Result<(), ClaimMevError> { - const MAX_RETRY_DURATION: Duration = Duration::from_secs(600); - let merkle_trees: GeneratedMerkleTreeCollection = read_json_from_file(merkle_root_path).expect("read GeneratedMerkleTreeCollection"); - let keypair = read_keypair_file(keypair_path).expect("read keypair file"); + let keypair = Arc::new(read_keypair_file(keypair_path).expect("read keypair file")); + let payer_pubkey = keypair.pubkey(); + let blockhash_rpc_client = Arc::new(RpcClient::new_with_commitment( + rpc_url.clone(), + CommitmentConfig::finalized(), + )); + let rpc_clients = Arc::new( + (0..rpc_connection_count) + .map(|_| { + Arc::new(RpcClient::new_with_commitment( + rpc_url.clone(), + CommitmentConfig::confirmed(), + )) + }) + .collect_vec(), + ); - let tip_distribution_config = - Pubkey::find_program_address(&[Config::SEED], tip_distribution_program_id).0; + let tree_nodes = merkle_trees + .generated_merkle_trees + .iter() + .flat_map(|tree| &tree.tree_nodes) + .collect_vec(); + let stake_acct_min_rent = blockhash_rpc_client + .get_minimum_balance_for_rent_exemption(StakeState::size_of()) + .await + .expect("Failed to calculate min rent"); - let rpc_client = - 
RpcClient::new_with_commitment(rpc_url.to_string(), CommitmentConfig::finalized()); + // fetch all accounts up front + info!("Starting to fetch accounts"); + let account_fetch_start = Instant::now(); + let tdas = get_batched_accounts( + &blockhash_rpc_client, + max_concurrent_rpc_get_reqs, + merkle_trees + .generated_merkle_trees + .iter() + .map(|tree| tree.tip_distribution_account) + .collect_vec(), + ) + .await + .unwrap() + .into_iter() + .filter_map(|(pubkey, maybe_account)| { + let account = match maybe_account { + Some(account) => account, + None => { + datapoint_warn!( + "claim_mev_workflow-account_error", + ("pubkey", pubkey.to_string(), String), + ("account_type", "tip_distribution_account", String), + ("error", 1, i64), + ("err_type", "fetch", String), + ("err_str", "Failed to fetch Account", String) + ); + return None; + } + }; - let runtime = Builder::new_multi_thread() - .worker_threads(16) - .enable_all() - .build() - .unwrap(); + let account = match TipDistributionAccount::try_deserialize(&mut account.data.as_slice()) { + Ok(a) => a, + Err(e) => { + datapoint_warn!( + "claim_mev_workflow-account_error", + ("pubkey", pubkey.to_string(), String), + ("account_type", "tip_distribution_account", String), + ("error", 1, i64), + ("err_type", "deserialize_tip_distribution_account", String), + ("err_str", e.to_string(), String) + ); + return None; + } + }; + Some((pubkey, account)) + }) + .collect::>(); + + // track balances only + let claimants = get_batched_accounts( + &blockhash_rpc_client, + max_concurrent_rpc_get_reqs, + tree_nodes + .iter() + .map(|tree_node| tree_node.claimant) + .collect_vec(), + ) + .await + .unwrap() + .into_iter() + .map(|(pubkey, maybe_account)| { + ( + pubkey, + maybe_account + .map(|account| account.lamports) + .unwrap_or_default(), + ) + }) + .collect::>(); + + let claim_statuses = get_batched_accounts( + &blockhash_rpc_client, + max_concurrent_rpc_get_reqs, + tree_nodes + .iter() + .map(|tree_node| 
tree_node.claim_status_pubkey) + .collect_vec(), + ) + .await + .unwrap(); + let account_fetch_elapsed = account_fetch_start.elapsed(); - let mut instructions = Vec::new(); + // Try sending txns to RPC + let mut retries = 0; + let mut failed_transaction_count = 0; + loop { + let transaction_prepare_start = Instant::now(); + let ( + skipped_merkle_root_count, + zero_lamports_count, + already_claimed_count, + below_min_rent_count, + transactions, + ) = build_transactions( + tip_distribution_program_id, + &merkle_trees, + &payer_pubkey, + &tree_nodes, + stake_acct_min_rent, + &tdas, + &claimants, + &claim_statuses, + ); + datapoint_info!( + "claim_mev_workflow-prepare_transactions", + ("tree_node_count", tree_nodes.len(), i64), + ("tda_count", tdas.len(), i64), + ("claimant_count", claimants.len(), i64), + ("claim_status_count", claim_statuses.len(), i64), + ("skipped_merkle_root_count", skipped_merkle_root_count, i64), + ("zero_lamports_count", zero_lamports_count, i64), + ("already_claimed_count", already_claimed_count, i64), + ("below_min_rent_count", below_min_rent_count, i64), + ("transaction_count", transactions.len(), i64), + ( + "account_fetch_latency_us", + account_fetch_elapsed.as_micros(), + i64 + ), + ( + "transaction_prepare_latency_us", + transaction_prepare_start.elapsed().as_micros(), + i64 + ), + ); + + if transactions.is_empty() { + return Ok(()); + } - runtime.block_on(async move { - let start_balance = rpc_client.get_balance(&keypair.pubkey()).await.expect("failed to get balance"); - // heuristic to make sure we have enough funds to cover the rent costs if epoch has many validators + if let Some((start_balance, desired_balance, sol_to_deposit)) = is_sufficient_balance( + &payer_pubkey, + &blockhash_rpc_client, + transactions.len() as u64, + ) + .await { - // most amounts are for 0 lamports. 
had 1736 non-zero claims out of 164742 - let node_count = merkle_trees.generated_merkle_trees.iter().flat_map(|tree| &tree.tree_nodes).filter(|node| node.amount > 0).count(); - let min_rent_per_claim = rpc_client.get_minimum_balance_for_rent_exemption(ClaimStatus::SIZE).await.expect("Failed to calculate min rent"); - let desired_balance = (node_count as u64).checked_mul(min_rent_per_claim.checked_add(DEFAULT_TARGET_LAMPORTS_PER_SIGNATURE).unwrap()).unwrap(); - if start_balance < desired_balance { - let sol_to_deposit = desired_balance.checked_sub(start_balance).unwrap().checked_add(LAMPORTS_PER_SOL).unwrap().checked_sub(1).unwrap().checked_div(LAMPORTS_PER_SOL).unwrap(); // rounds up to nearest sol - panic!("Expected to have at least {} lamports in {}, current balance is {} lamports, deposit {} SOL to continue.", - desired_balance, &keypair.pubkey(), start_balance, sol_to_deposit) - } + panic!("Expected to have at least {desired_balance} lamports in {payer_pubkey}. Current balance is {start_balance} lamports. Deposit {sol_to_deposit} SOL to continue."); } - let stake_acct_min_rent = rpc_client.get_minimum_balance_for_rent_exemption(StakeState::size_of()).await.expect("Failed to calculate min rent"); - let mut below_min_rent_count: usize = 0; - let mut zero_lamports_count: usize = 0; - for tree in merkle_trees.generated_merkle_trees { - // only claim for ones that have merkle root on-chain - let account = rpc_client.get_account(&tree.tip_distribution_account).await.expect("expected to fetch tip distribution account"); - let fetched_tip_distribution_account = TipDistributionAccount::try_deserialize(&mut account.data.as_slice()).expect("failed to deserialize tip_distribution_account state"); - if fetched_tip_distribution_account.merkle_root.is_none() { - info!( - "not claiming because merkle root isn't uploaded yet. 
skipped {} claimants for tda: {:?}", - tree.tree_nodes.len(), - tree.tip_distribution_account - ); + let transactions_len = transactions.len(); + + info!("Sending {} tip claim transactions. {zero_lamports_count} would transfer zero lamports, {below_min_rent_count} would be below minimum rent", transactions.len()); + let send_start = Instant::now(); + let (remaining_transactions, new_failed_transaction_count) = + sign_and_send_transactions_with_retries_multi_rpc( + &keypair, + &blockhash_rpc_client, + &rpc_clients, + transactions, + max_loop_duration, + ) + .await; + + datapoint_info!( + "claim_mev_workflow-send_transactions", + ("transaction_count", transactions_len, i64), + ( + "successful_transaction_count", + transactions_len - remaining_transactions.len(), + i64 + ), + ( + "remaining_transaction_count", + remaining_transactions.len(), + i64 + ), + ( + "failed_transaction_count", + new_failed_transaction_count, + i64 + ), + ("send_latency_us", send_start.elapsed().as_micros(), i64), + ); + + if remaining_transactions.is_empty() { + info!("Finished claiming tips. {max_loop_retries} retries. {} remaining mev claim transactions, {failed_transaction_count} failed requests.", remaining_transactions.len()); + return Ok(()); + } + + failed_transaction_count += new_failed_transaction_count; + retries += 1; + if retries >= max_loop_retries { + panic!( + "Failed after {max_loop_retries} retries. 
{} remaining mev claim transactions, {failed_transaction_count} failed requests.", + remaining_transactions.len(), + ); + } + } +} + +fn build_transactions( + tip_distribution_program_id: &Pubkey, + merkle_trees: &GeneratedMerkleTreeCollection, + payer_pubkey: &Pubkey, + tree_nodes: &[&TreeNode], + stake_acct_min_rent: u64, + tdas: &HashMap, + claimants: &HashMap, + claim_statuses: &HashMap>, +) -> (usize, usize, usize, usize, Vec) { + let tip_distribution_config = + Pubkey::find_program_address(&[Config::SEED], tip_distribution_program_id).0; + let mut skipped_merkle_root_count: usize = 0; + let mut zero_lamports_count: usize = 0; + let mut already_claimed_count: usize = 0; + let mut below_min_rent_count: usize = 0; + let mut instructions = + Vec::with_capacity(tree_nodes.iter().filter(|node| node.amount > 0).count()); + + // prepare instructions to transfer to all claimants + for tree in &merkle_trees.generated_merkle_trees { + let fetched_tip_distribution_account = match tdas.get(&tree.tip_distribution_account) { + Some(account) => account, + None => panic!( + "TDA not found in cache for account: {:?}", + tree.tip_distribution_account + ), + }; + // only claim for ones that have merkle root on-chain + if fetched_tip_distribution_account.merkle_root.is_none() { + info!( + "Merkle root has not uploaded yet. Skipped {} claimants for TDA: {:?}", + tree.tree_nodes.len(), + tree.tip_distribution_account + ); + skipped_merkle_root_count = skipped_merkle_root_count.checked_add(1).unwrap(); + continue; + } + for node in &tree.tree_nodes { + if node.amount == 0 { + zero_lamports_count = zero_lamports_count.checked_add(1).unwrap(); continue; } - for node in tree.tree_nodes { - if node.amount == 0 { - zero_lamports_count = zero_lamports_count.checked_add(1).unwrap(); + + // make sure not previously claimed + match claim_statuses.get(&node.claim_status_pubkey) { + Some(Some(_account)) => { + debug!( + "Claim status account already exists (already paid out). 
Skipping pubkey: {:?}.", node.claim_status_pubkey, + ); + already_claimed_count = already_claimed_count.checked_add(1).unwrap(); continue; } + None => panic!( + "Account not found in cache for {:?}", + node.claim_status_pubkey + ), + Some(None) => {} // expected to not find ClaimStatus account, don't skip + }; + let current_balance = match claimants.get(&node.claimant) { + Some(balance) => balance, + None => panic!( + "Claimant not found in cache for pubkey: {:?}", + node.claimant + ), + }; - // make sure not previously claimed - match rpc_client.get_account(&node.claim_status_pubkey).await { - Ok(_) => { - debug!("claim status account already exists, skipping pubkey {:?}.", node.claim_status_pubkey); - continue; - } - // expected to not find ClaimStatus account, don't skip - Err(client_error::Error { kind: client_error::ErrorKind::RpcError(RpcError::ForUser(err)), .. }) if err.starts_with("AccountNotFound") => {} - Err(err) => panic!("Unexpected RPC Error: {}", err), + // some older accounts can be rent-paying + // any new transfers will need to make the account rent-exempt (runtime enforced) + let balance_with_tip = current_balance.checked_add(node.amount).unwrap(); + if balance_with_tip < stake_acct_min_rent { + debug!("Current balance + tip claim amount of {balance_with_tip} is less than required rent-exempt of {stake_acct_min_rent} for pubkey: {}. 
Skipping.", node.claimant); + below_min_rent_count = below_min_rent_count.checked_add(1).unwrap(); + continue; + } + instructions.push(Instruction { + program_id: *tip_distribution_program_id, + data: jito_tip_distribution::instruction::Claim { + proof: node.proof.clone().unwrap(), + amount: node.amount, + bump: node.claim_status_bump, } - - let current_balance = rpc_client.get_balance(&node.claimant).await.expect("Failed to get balance"); - // some older accounts can be rent-paying - // any new transfers will need to make the account rent-exempt (runtime enforced) - if current_balance.checked_add(node.amount).unwrap() < stake_acct_min_rent { - warn!("Current balance + tip claim amount of {} is less than required rent-exempt of {} for pubkey: {}. Skipping.", - current_balance.checked_add(node.amount).unwrap(), stake_acct_min_rent, node.claimant); - below_min_rent_count = below_min_rent_count.checked_add(1).unwrap(); - continue; + .data(), + accounts: jito_tip_distribution::accounts::Claim { + config: tip_distribution_config, + tip_distribution_account: tree.tip_distribution_account, + claimant: node.claimant, + claim_status: node.claim_status_pubkey, + payer: *payer_pubkey, + system_program: system_program::id(), } - instructions.push(Instruction { - program_id: *tip_distribution_program_id, - data: jito_tip_distribution::instruction::Claim { - proof: node.proof.unwrap(), - amount: node.amount, - bump: node.claim_status_bump, - }.data(), - accounts: jito_tip_distribution::accounts::Claim { - config: tip_distribution_config, - tip_distribution_account: tree.tip_distribution_account, - claimant: node.claimant, - claim_status: node.claim_status_pubkey, - payer: keypair.pubkey(), - system_program: system_program::id(), - }.to_account_metas(None), - }); - } + .to_account_metas(None), + }); } + } - let transactions = instructions.into_iter().map(|ix|{ - Transaction::new_with_payer( - &[ix], - Some(&keypair.pubkey()), - ) - }).collect::>(); + let transactions = 
instructions + .into_iter() + .map(|ix| Transaction::new_with_payer(&[ix], Some(payer_pubkey))) + .collect::>(); + ( + skipped_merkle_root_count, + zero_lamports_count, + already_claimed_count, + below_min_rent_count, + transactions, + ) +} - info!("Sending {} tip claim transactions. {} tried sending zero lamports, {} would be below minimum rent", - &transactions.len(), zero_lamports_count, below_min_rent_count); +/// heuristic to make sure we have enough funds to cover the rent costs if epoch has many validators +/// If insufficient funds, returns start balance, desired balance, and amount of sol to deposit +async fn is_sufficient_balance( + payer: &Pubkey, + rpc_client: &RpcClient, + instruction_count: u64, +) -> Option<(u64, u64, u64)> { + let start_balance = rpc_client + .get_balance(payer) + .await + .expect("Failed to get starting balance"); + // most amounts are for 0 lamports. had 1736 non-zero claims out of 164742 + let min_rent_per_claim = rpc_client + .get_minimum_balance_for_rent_exemption(ClaimStatus::SIZE) + .await + .expect("Failed to calculate min rent"); + let desired_balance = instruction_count + .checked_mul( + min_rent_per_claim + .checked_add(DEFAULT_TARGET_LAMPORTS_PER_SIGNATURE) + .unwrap(), + ) + .unwrap(); + if start_balance < desired_balance { + let sol_to_deposit = desired_balance + .checked_sub(start_balance) + .unwrap() + .checked_add(LAMPORTS_PER_SOL) + .unwrap() + .checked_sub(1) + .unwrap() + .checked_div(LAMPORTS_PER_SOL) + .unwrap(); // rounds up to nearest sol + Some((start_balance, desired_balance, sol_to_deposit)) + } else { + None + } +} - let failed_transactions = sign_and_send_transactions_with_retries(&keypair, &rpc_client, transactions, MAX_RETRY_DURATION).await; - if !failed_transactions.is_empty() { - panic!("failed to send {} transactions", failed_transactions.len()); +/// Fetch accounts in parallel batches with retries. 
+async fn get_batched_accounts( + rpc_client: &RpcClient, + max_concurrent_rpc_get_reqs: usize, + pubkeys: Vec, +) -> solana_rpc_client_api::client_error::Result>> { + let semaphore = Arc::new(Semaphore::new(max_concurrent_rpc_get_reqs)); + let futs = pubkeys.chunks(MAX_MULTIPLE_ACCOUNTS).map(|pubkeys| { + let semaphore = semaphore.clone(); + + async move { + let _permit = semaphore.acquire_owned().await.unwrap(); // wait until our turn + let mut retries = 0; + loop { + match rpc_client.get_multiple_accounts(pubkeys).await { + Ok(accts) => return Ok(accts), + Err(e) => { + retries += 1; + if retries == MAX_RETRIES { + datapoint_error!( + "claim_mev_workflow-get_batched_accounts_error", + ("pubkeys", format!("{pubkeys:?}"), String), + ("error", 1, i64), + ("err_type", "fetch_account", String), + ("err_str", e.to_string(), String) + ); + return Err(e); + } + tokio::time::sleep(FAIL_DELAY).await; + } + } + } } }); - Ok(()) + let claimant_accounts = futures::future::join_all(futs) + .await + .into_iter() + .collect::>>>>()? 
// fail on single error + .into_iter() + .flatten() + .collect_vec(); + + Ok(pubkeys.into_iter().zip(claimant_accounts).collect()) } diff --git a/tip-distributor/src/lib.rs b/tip-distributor/src/lib.rs index bd8de90230..a1fd8d316e 100644 --- a/tip-distributor/src/lib.rs +++ b/tip-distributor/src/lib.rs @@ -10,6 +10,7 @@ use { stake_meta_generator_workflow::StakeMetaGeneratorError::CheckedMathError, }, anchor_lang::Id, + itertools::Itertools, jito_tip_distribution::{ program::JitoTipDistribution, state::{ClaimStatus, TipDistributionAccount}, @@ -20,13 +21,16 @@ use { TIP_ACCOUNT_SEED_7, }, log::*, + rand::prelude::SliceRandom, serde::{de::DeserializeOwned, Deserialize, Serialize}, solana_client::{nonblocking::rpc_client::RpcClient, rpc_client::RpcClient as SyncRpcClient}, solana_merkle_tree::MerkleTree, solana_metrics::{datapoint_error, datapoint_warn}, + solana_program::instruction::InstructionError, solana_rpc_client_api::{ client_error::{Error, ErrorKind}, - request::RpcRequest, + request::{RpcError, RpcResponseErrorData}, + response::RpcSimulateTransactionResult, }, solana_sdk::{ account::{AccountSharedData, ReadableAccount}, @@ -35,17 +39,23 @@ use { pubkey::Pubkey, signature::{Keypair, Signature}, stake_history::Epoch, - transaction::{Transaction, TransactionError::AlreadyProcessed}, + transaction::{ + Transaction, + TransactionError::{self}, + }, }, std::{ collections::HashMap, fs::File, io::BufReader, path::PathBuf, - sync::Arc, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, time::{Duration, Instant}, }, - tokio::time::sleep, + tokio::sync::{RwLock, Semaphore}, }; #[derive(Deserialize, Serialize, Debug)] @@ -460,104 +470,212 @@ pub fn derive_tip_distribution_account_address( ) } +pub const MAX_RETRIES: usize = 5; +pub const FAIL_DELAY: Duration = Duration::from_millis(100); + +/// Returns unprocessed transactions, along with fail count +pub async fn sign_and_send_transactions_with_retries_multi_rpc( + signer: &Arc<Keypair>, + blockhash_rpc_client: &Arc<RpcClient>, + 
rpc_clients: &Arc<Vec<Arc<RpcClient>>>, + mut transactions: Vec<Transaction>, + max_loop_duration: Duration, +) -> (Vec<Transaction>, u64) { + let error_count = Arc::new(AtomicU64::default()); + let blockhash = Arc::new(RwLock::new( + blockhash_rpc_client + .get_latest_blockhash() + .await + .expect("fetch latest blockhash"), + )); + let mut rng = rand::thread_rng(); + transactions.shuffle(&mut rng); // shuffle to avoid sending same txns as other claim-tip processes + let (tx, rx) = async_channel::bounded::<Transaction>(2 * rpc_clients.len()); + let dispatcher_handle = { + let blockhash_rpc_client = blockhash_rpc_client.clone(); + let blockhash = blockhash.clone(); + tokio::spawn(async move { + let start = Instant::now(); + let mut last_blockhash_update = Instant::now(); + while start.elapsed() < max_loop_duration && !transactions.is_empty() { + // ensure we always have a recent blockhash + if last_blockhash_update.elapsed() > Duration::from_secs(30) { + let hash = blockhash_rpc_client + .get_latest_blockhash() + .await + .expect("fetch latest blockhash"); + info!( + "Got hash {hash:?}. Sending {} transactions to claim mev tips.", + transactions.len() + ); + *blockhash.write().await = hash; + last_blockhash_update = Instant::now(); + } + match transactions.pop() { + Some(txn) => tx.send(txn).await.unwrap(), + None => break, + } + } + + info!( + "Exited dispatcher thread. 
{} transactions remain.", + transactions.len() + ); + drop(tx); + transactions + }) + }; + let send_handles = rpc_clients + .iter() + .map(|rpc_client| { + let signer = signer.clone(); + let rx = rx.clone(); + let rpc_client = rpc_client.clone(); + let error_count = error_count.clone(); + let blockhash = blockhash.clone(); + tokio::spawn(async move { + let mut iterations = 0; + while let Ok(txn) = rx.recv().await { + let mut retries = 0; + while retries < MAX_RETRIES { + iterations += 1; + let (_signed_txn, res) = + signed_send(&signer, &rpc_client, *blockhash.read().await, txn.clone()) + .await; + match res { + Ok(_) => break, + Err(_) => { + retries += 1; + error_count.fetch_add(1, Ordering::Relaxed); + tokio::time::sleep(FAIL_DELAY).await; + } + } + } + } + + info!("Exited send thread. Ran {iterations} times."); + }) + }) + .collect_vec(); + + for handle in send_handles { + if let Err(e) = handle.await { + warn!("Error joining handle: {e:?}") + } + } + let transactions = dispatcher_handle.await.unwrap(); + (transactions, error_count.load(Ordering::Relaxed)) +} + pub async fn sign_and_send_transactions_with_retries( signer: &Keypair, rpc_client: &RpcClient, + max_concurrent_rpc_get_reqs: usize, transactions: Vec<Transaction>, - max_retry_duration: Duration, -) -> HashMap<Signature, Error> { - use tokio::sync::Semaphore; - const MAX_CONCURRENT_RPC_CALLS: usize = 50; - let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_RPC_CALLS)); - + txn_send_batch_size: usize, + max_loop_duration: Duration, +) -> (Vec<Transaction>, HashMap<Signature, Error>) { + let semaphore = Arc::new(Semaphore::new(max_concurrent_rpc_get_reqs)); let mut errors = HashMap::default(); let mut blockhash = rpc_client .get_latest_blockhash() .await .expect("fetch latest blockhash"); - - let mut signatures_to_transactions = transactions + // track unsigned txns + let mut transactions_to_process = transactions .into_iter() - .map(|mut tx| { - tx.sign(&[signer], blockhash); - (tx.signatures[0], tx) - }) - .collect::<HashMap<Signature, Transaction>>(); + .map(|txn| (txn.message_data(), 
txn)) + .collect::<HashMap<Vec<u8>, Transaction>>(); let start = Instant::now(); - while start.elapsed() < max_retry_duration && !signatures_to_transactions.is_empty() { + while start.elapsed() < max_loop_duration && !transactions_to_process.is_empty() { + // ensure we always have a recent blockhash if start.elapsed() > Duration::from_secs(60) { blockhash = rpc_client .get_latest_blockhash() .await .expect("fetch latest blockhash"); - signatures_to_transactions - .iter_mut() - .for_each(|(_sig, tx)| { - *tx = Transaction::new_unsigned(tx.message.clone()); - tx.sign(&[signer], blockhash); - }); } + info!( + "Sending {txn_send_batch_size} of {} transactions to claim mev tips", + transactions_to_process.len() + ); + let send_futs = transactions_to_process + .iter() + .take(txn_send_batch_size) + .map(|(hash, txn)| { + let semaphore = semaphore.clone(); + async move { + let _permit = semaphore.acquire_owned().await.unwrap(); // wait until our turn + let (txn, res) = signed_send(signer, rpc_client, blockhash, txn.clone()).await; + (hash.clone(), txn, res) + } + }); - let futs = signatures_to_transactions.iter().map(|(sig, tx)| { - let semaphore = semaphore.clone(); - async move { - let permit = semaphore.clone().acquire_owned().await.unwrap(); - let res = match rpc_client.send_transaction(tx).await { - Ok(sig) => { - info!("sent transaction: {sig:?}"); - drop(permit); - sleep(Duration::from_secs(10)).await; - - let _permit = semaphore.acquire_owned().await.unwrap(); - match rpc_client.confirm_transaction(&sig).await { - Ok(true) => Ok(()), - Ok(false) => Err(Error::new_with_request( - ErrorKind::Custom("transaction failed to confirm".to_string()), - RpcRequest::SendTransaction, - )), - Err(e) => Err(e), - } - } - Err(e) => Err(e), - }; - - let res = res - .err() - .map(|e| { - if let ErrorKind::TransactionError(AlreadyProcessed) = e.kind { - Ok(()) - } else { - error!("error sending transaction {sig:?} error: {e:?}"); - Err(e) - } - }) - .unwrap_or(Ok(())); - - (*sig, res) - } - }); - 
- errors = futures::future::join_all(futs) - .await + let send_res = futures::future::join_all(send_futs).await; + let new_errors = send_res .into_iter() - .filter(|(sig, result)| { - if result.is_err() { - true - } else { - let _ = signatures_to_transactions.remove(sig); - false + .filter_map(|(hash, txn, result)| match result { + Err(e) => Some((txn.signatures[0], e)), + Ok(..) => { + let _ = transactions_to_process.remove(&hash); + None } }) - .map(|(sig, result)| { - let e = result.err().unwrap(); - warn!("error sending transaction: [error={e}, signature={sig}]"); - (sig, e) - }) .collect::<HashMap<Signature, Error>>(); + + errors.extend(new_errors); } - errors + (transactions_to_process.values().cloned().collect(), errors) +} +async fn signed_send( + signer: &Keypair, + rpc_client: &RpcClient, + blockhash: Hash, + mut txn: Transaction, +) -> (Transaction, solana_rpc_client_api::client_error::Result<()>) { + txn.sign(&[signer], blockhash); // just in time signing + let res = match rpc_client.send_and_confirm_transaction(&txn).await { + Ok(_) => Ok(()), + Err(e) => { + match e.kind { + // Already claimed, skip. + ErrorKind::TransactionError(TransactionError::AlreadyProcessed) + | ErrorKind::TransactionError(TransactionError::InstructionError( + 0, + InstructionError::Custom(0), + )) + | ErrorKind::RpcError(RpcError::RpcResponseError { + data: + RpcResponseErrorData::SendTransactionPreflightFailure( + RpcSimulateTransactionResult { + err: + Some(TransactionError::InstructionError( + 0, + InstructionError::Custom(0), + )), + .. + }, + ), + .. + }) => Ok(()), + + // transaction got held up too long and blockhash expired. retry txn + ErrorKind::TransactionError(TransactionError::BlockhashNotFound) => Err(e), + + _ => { + error!( + "Error sending transaction. 
Signature: {}, Error: {e:?}", + txn.signatures[0] + ); + Err(e) + } + } + } + }; + + (txn, res) } mod pubkey_string_conversion { diff --git a/tip-distributor/src/merkle_root_upload_workflow.rs b/tip-distributor/src/merkle_root_upload_workflow.rs index cc75797f05..e40465581f 100644 --- a/tip-distributor/src/merkle_root_upload_workflow.rs +++ b/tip-distributor/src/merkle_root_upload_workflow.rs @@ -38,6 +38,8 @@ pub fn upload_merkle_root( keypair_path: &PathBuf, rpc_url: &str, tip_distribution_program_id: &Pubkey, + max_concurrent_rpc_get_reqs: usize, + txn_send_batch_size: usize, ) -> Result<(), MerkleRootUploadError> { const MAX_RETRY_DURATION: Duration = Duration::from_secs(600); @@ -124,9 +126,11 @@ pub fn upload_merkle_root( ) }) .collect(); - let failed_transactions = sign_and_send_transactions_with_retries(&keypair, &rpc_client, transactions, MAX_RETRY_DURATION).await; - if !failed_transactions.is_empty() { - panic!("failed to send {} transactions", failed_transactions.len()); + + let (to_process, failed_transactions) = sign_and_send_transactions_with_retries( + &keypair, &rpc_client, max_concurrent_rpc_get_reqs, transactions, txn_send_batch_size, MAX_RETRY_DURATION).await; + if !to_process.is_empty() { + panic!("{} remaining mev claim transactions, {} failed requests.", to_process.len(), failed_transactions.len()); } }); diff --git a/tip-distributor/src/reclaim_rent_workflow.rs b/tip-distributor/src/reclaim_rent_workflow.rs index da8d6c6362..2c08a95e11 100644 --- a/tip-distributor/src/reclaim_rent_workflow.rs +++ b/tip-distributor/src/reclaim_rent_workflow.rs @@ -29,6 +29,8 @@ pub async fn reclaim_rent( rpc_client: RpcClient, tip_distribution_program_id: Pubkey, signer: Keypair, + max_concurrent_rpc_get_reqs: usize, + txn_send_batch_size: usize, // Optionally reclaim TipDistributionAccount rents on behalf of validators. 
should_reclaim_tdas: bool, ) -> Result<(), Box<dyn Error>> { @@ -152,15 +154,21 @@ pub async fn reclaim_rent( } info!("sending {} transactions", transactions.len()); - let failed_txs = sign_and_send_transactions_with_retries( + let (to_process, failed_transactions) = sign_and_send_transactions_with_retries( &signer, &rpc_client, + max_concurrent_rpc_get_reqs, transactions, + txn_send_batch_size, Duration::from_secs(300), ) .await; - if !failed_txs.is_empty() { - panic!("failed to send {} transactions", failed_txs.len()); + if !to_process.is_empty() { + panic!( + "{} remaining mev claim transactions, {} failed requests.", + to_process.len(), + failed_transactions.len() + ); } Ok(())