Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX: Keeper now works with new metrics #44

Merged
merged 17 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,6 @@ test-ledger
**/targets
**/credentials
**/config
**/*.env
**/*.env
**/.vscode
/scripts
5 changes: 0 additions & 5 deletions .vscode/settings.json

This file was deleted.

238 changes: 208 additions & 30 deletions keepers/keeper-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use std::{collections::HashMap, sync::Arc, time::Duration};

use clap::ValueEnum;
use log::*;
use solana_client::rpc_response::RpcVoteAccountInfo;
use solana_client::rpc_response::{Response, RpcSimulateTransactionResult, RpcVoteAccountInfo};
use solana_client::{client_error::ClientError, nonblocking::rpc_client::RpcClient};
use solana_metrics::datapoint_error;
use solana_program::hash::Hash;
use solana_sdk::compute_budget::ComputeBudgetInstruction;
use solana_sdk::packet::PACKET_DATA_SIZE;
Expand All @@ -21,6 +22,8 @@ use thiserror::Error as ThisError;
use tokio::task::{self, JoinError};
use tokio::time::sleep;

const DEFAULT_COMPUTE_LIMIT: usize = 200_000;

#[derive(Debug, Default, Clone)]
pub struct SubmitStats {
pub successes: u64,
Expand Down Expand Up @@ -114,6 +117,70 @@ pub async fn get_multiple_accounts_batched(
Ok(accounts_result)
}

/// Signs and simulates a single instruction against the RPC node, wrapped
/// with the given compute-unit limit and priority-fee instructions.
///
/// Returns the raw simulation response so the caller can inspect both the
/// simulation error (if any) and the compute units consumed.
async fn simulate_instruction(
    client: &RpcClient,
    instruction: &Instruction,
    signer: &Keypair,
    priority_fee_in_microlamports: u64,
    max_cu_per_tx: u32,
) -> Result<Response<RpcSimulateTransactionResult>, ClientError> {
    // A recent blockhash is required for the transaction to pass sanitization.
    let recent_blockhash = get_latest_blockhash_with_retry(client).await?;

    // Mirror the shape of the real transaction: compute budget first, then
    // the instruction under test.
    let simulated_instructions = vec![
        ComputeBudgetInstruction::set_compute_unit_limit(max_cu_per_tx),
        ComputeBudgetInstruction::set_compute_unit_price(priority_fee_in_microlamports),
        instruction.clone(),
    ];

    let transaction = Transaction::new_signed_with_payer(
        &simulated_instructions,
        Some(&signer.pubkey()),
        &[signer],
        recent_blockhash,
    );

    client.simulate_transaction(&transaction).await
}

/// Simulates `instruction`, retrying when the only failure is a stale
/// blockhash (`TransactionError::BlockhashNotFound`).
///
/// Up to 5 retry attempts are made, sleeping 3 seconds between them. Any
/// other simulation error, or an RPC client error, is returned immediately.
/// If every retry hits a stale blockhash, the result of one final
/// simulation attempt is returned as-is.
async fn simulate_instruction_with_retry(
    client: &RpcClient,
    instruction: &Instruction,
    signer: &Keypair,
    priority_fee_in_microlamports: u64,
    max_cu_per_tx: u32,
) -> Result<Response<RpcSimulateTransactionResult>, ClientError> {
    const RETRIES: usize = 5;

    for _attempt in 0..RETRIES {
        // RPC-level failures propagate immediately via `?`.
        let response = simulate_instruction(
            client,
            instruction,
            signer,
            priority_fee_in_microlamports,
            max_cu_per_tx,
        )
        .await?;

        match response.value.err {
            // Stale blockhash: wait for a fresh one and try again.
            Some(TransactionError::BlockhashNotFound) => {
                sleep(Duration::from_secs(3)).await;
            }
            // Any other simulation failure is not retryable here.
            Some(e) => return Err(e.into()),
            None => return Ok(response),
        }
    }

    // Out of retries: return whatever the node says, success or error.
    simulate_instruction(
        client,
        instruction,
        signer,
        priority_fee_in_microlamports,
        max_cu_per_tx,
    )
    .await
}

async fn get_latest_blockhash_with_retry(client: &RpcClient) -> Result<Hash, ClientError> {
for _ in 1..4 {
let result = client
Expand Down Expand Up @@ -174,46 +241,64 @@ pub async fn get_vote_accounts_with_retry(
}
}

const DEFAULT_COMPUTE_LIMIT: usize = 200_000;

async fn calculate_instructions_per_tx(
client: &RpcClient,
instructions: &[Instruction],
signer: &Keypair,
async fn find_ix_per_tx(
client: &Arc<RpcClient>,
instruction: &Instruction,
signer: &Arc<Keypair>,
priority_fee_in_microlamports: u64,
max_cu_per_tx: u32,
) -> Result<usize, ClientError> {
let blockhash = get_latest_blockhash_with_retry(client).await?;
let test_tx = Transaction::new_signed_with_payer(
&[instructions[0].to_owned()],
&[instruction.to_owned()],
Some(&signer.pubkey()),
&[signer],
blockhash,
);
let response = client.simulate_transaction(&test_tx).await?;

let response = simulate_instruction_with_retry(
client,
instruction,
signer,
priority_fee_in_microlamports,
max_cu_per_tx,
)
.await?;
if let Some(err) = response.value.clone().err {
debug!("Simulation error: {:?}", response.value);
return Err(err.into());
error!("Simulation error: {} {:?}", max_cu_per_tx, response.value);

datapoint_error!(
"simulation-error",
("error", err.to_string(), String),
("instruction", format!("{:?}", instruction), String)
);

return Err(err.into()); // Return the error immediately, stopping further execution
}
let compute = response
.value
.units_consumed
.unwrap_or(DEFAULT_COMPUTE_LIMIT as u64);

let serialized_size = Packet::from_data(None, &test_tx).unwrap().meta().size;

// additional size per ix
let size_per_ix =
instructions[0].accounts.len() * size_of::<AccountMeta>() + instructions[0].data.len();
instruction.accounts.len() * size_of::<AccountMeta>() + instruction.data.len();
let size_max = (PACKET_DATA_SIZE - serialized_size + size_per_ix) / size_per_ix;

let compute_max = DEFAULT_COMPUTE_LIMIT / compute as usize;
let compute_max = max_cu_per_tx as usize / compute as usize;

let size = size_max.min(compute_max);

Ok(size_max.min(compute_max))
Ok(size)
}

async fn parallel_confirm_transactions(
client: &RpcClient,
submitted_signatures: HashSet<Signature>,
) -> HashSet<Signature> {
// Confirmes TXs in batches of 256 (max allowed by RPC method). Returns confirmed signatures
// Confirms TXs in batches of 256 (max allowed by RPC method). Returns confirmed signatures
const SIG_STATUS_BATCH_SIZE: usize = 256;
let num_transactions_submitted = submitted_signatures.len();
let signatures_to_confirm = submitted_signatures.into_iter().collect::<Vec<_>>();
Expand Down Expand Up @@ -364,13 +449,69 @@ pub async fn parallel_execute_transactions(
Ok(results)
}

/// Simulates each instruction to learn how many copies of it fit in a single
/// transaction (bounded by compute and packet size), then packs the
/// instructions into transaction-sized batches.
///
/// Instructions whose simulation fails are logged and skipped rather than
/// aborting the whole batch; the caller therefore may receive fewer
/// instructions back than it passed in.
pub async fn pack_instructions(
    client: &Arc<RpcClient>,
    instructions: &[Instruction],
    signer: &Arc<Keypair>,
    priority_fee_in_microlamports: u64,
    max_cu_per_tx: u32,
) -> Result<Vec<Vec<Instruction>>, Box<dyn std::error::Error>> {
    // (instruction, how many instructions of this cost/size fit per tx)
    let mut instructions_with_grouping: Vec<(&Instruction, usize)> = Vec::new();

    for instruction in instructions.iter() {
        let result = find_ix_per_tx(
            client,
            instruction,
            signer,
            priority_fee_in_microlamports,
            max_cu_per_tx,
        )
        .await;

        match result {
            Ok(ix_per_tx) => {
                // Clamp to at least 1: a returned 0 (instruction barely
                // exceeds the compute/size budget) would make `chunks(0)`
                // below panic. Such an instruction gets its own tx and the
                // send path reports any resulting failure.
                instructions_with_grouping.push((instruction, ix_per_tx.max(1)));
            }
            Err(e) => {
                error!("Could not simulate instruction: {:?}", e);
                // Skip this instruction if there is an error
                continue;
            }
        }
    }

    // Bucket instructions by how many of them fit per transaction.
    let mut grouped_instructions: HashMap<usize, Vec<&Instruction>> = HashMap::new();
    for (instruction, group_size) in instructions_with_grouping {
        grouped_instructions
            .entry(group_size)
            .or_default()
            .push(instruction);
    }

    // Emit one Vec<Instruction> per future transaction. Note: HashMap
    // iteration order is unspecified, so batch order is not deterministic;
    // callers must not depend on transaction ordering.
    let mut result: Vec<Vec<Instruction>> = Vec::new();
    for (group_size, group) in grouped_instructions {
        for chunk in group.chunks(group_size) {
            let tx_instructions: Vec<Instruction> =
                chunk.iter().map(|ix| (*ix).clone()).collect();
            result.push(tx_instructions);
        }
    }

    Ok(result)
}

pub async fn parallel_execute_instructions(
client: &Arc<RpcClient>,
instructions: &[Instruction],
signer: &Arc<Keypair>,
retry_count: u16,
confirmation_time: u64,
microlamports: u64,
priority_fee_in_microlamports: u64,
max_cu_per_tx: Option<u32>,
) -> Result<Vec<Result<(), SendTransactionError>>, TransactionExecutionError> {
/*
Note: Assumes all instructions are equivalent in compute, equivalent in size, and can be executed in any order
Expand All @@ -387,20 +528,40 @@ pub async fn parallel_execute_instructions(
return Ok(vec![]);
}

let instructions_per_tx = calculate_instructions_per_tx(client, instructions, signer)
.await
.map_err(|e| TransactionExecutionError::ClientError(e.to_string()))?
- 1;
// let instructions_per_tx = calculate_instructions_per_tx(
// client,
// instructions,
// signer,
// priority_fee_in_microlamports,
// max_cu_per_tx,
// )
// .await
// .map_err(|e| TransactionExecutionError::ClientError(e.to_string()))?
// - 1;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can delete this


let max_cu_per_tx = max_cu_per_tx.unwrap_or(DEFAULT_COMPUTE_LIMIT as u32);

let mut transactions: Vec<Vec<Instruction>> = pack_instructions(
client,
instructions,
signer,
priority_fee_in_microlamports,
max_cu_per_tx,
)
.await
.map_err(|e| TransactionExecutionError::ClientError(e.to_string()))?;

let mut transactions: Vec<Vec<Instruction>> = instructions
.chunks(instructions_per_tx)
.map(|c| c.to_vec())
.collect();
for tx in transactions.iter_mut() {
tx.insert(
0,
ComputeBudgetInstruction::set_compute_unit_price(microlamports),
ComputeBudgetInstruction::set_compute_unit_price(priority_fee_in_microlamports),
);
if max_cu_per_tx != DEFAULT_COMPUTE_LIMIT as u32 {
tx.insert(
0,
ComputeBudgetInstruction::set_compute_unit_limit(max_cu_per_tx),
);
}
}
let transactions: Vec<&[Instruction]> = transactions.iter().map(|c| c.as_slice()).collect();

Expand Down Expand Up @@ -474,11 +635,20 @@ pub async fn submit_instructions(
client: &Arc<RpcClient>,
instructions: Vec<Instruction>,
keypair: &Arc<Keypair>,
microlamports: u64,
priority_fee_in_microlamports: u64,
max_cu_per_tx: Option<u32>,
) -> Result<SubmitStats, TransactionExecutionError> {
let mut stats = SubmitStats::default();
match parallel_execute_instructions(client, &instructions, keypair, 100, 20, microlamports)
.await
match parallel_execute_instructions(
client,
&instructions,
keypair,
100,
20,
priority_fee_in_microlamports,
max_cu_per_tx,
)
.await
{
Ok(results) => {
stats.successes = results.iter().filter(|&tx| tx.is_ok()).count() as u64;
Expand All @@ -495,10 +665,18 @@ pub async fn submit_create_and_update(
create_transactions: Vec<Vec<Instruction>>,
update_instructions: Vec<Instruction>,
keypair: &Arc<Keypair>,
microlamports: u64,
priority_fee_in_microlamports: u64,
max_cu_per_tx: Option<u32>,
) -> Result<CreateUpdateStats, TransactionExecutionError> {
Ok(CreateUpdateStats {
creates: submit_transactions(client, create_transactions, keypair).await?,
updates: submit_instructions(client, update_instructions, keypair, microlamports).await?,
updates: submit_instructions(
client,
update_instructions,
keypair,
priority_fee_in_microlamports,
max_cu_per_tx,
)
.await?,
})
}
Loading