Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce redundant keeper retries, emit keeper balance #31

Merged
merged 11 commits into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ jobs:
uses: baptiste0928/cargo-install@v3
with:
crate: cargo-udeps
version: 0.1.47 # Remove once anchor 0.30 is merged
- run: cargo +nightly-2023-10-05 udeps --all-features --all-targets --tests

verified_build:
Expand Down
5 changes: 5 additions & 0 deletions keepers/keeper-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use solana_sdk::{
};
use thiserror::Error as ThisError;
use tokio::task::{self, JoinError};
use tokio::time::sleep;

#[derive(Debug, Default, Clone)]
pub struct SubmitStats {
Expand Down Expand Up @@ -300,6 +301,10 @@ pub async fn parallel_execute_transactions(
if results[idx].is_ok() {
continue;
}
if idx % 50 == 0 {
// Need to avoid spamming the rpc or lots of transactions will get dropped
sleep(Duration::from_secs(3)).await;
}

// Future optimization: submit these in parallel batches and refresh blockhash for every batch
match client.send_transaction(tx).await {
Expand Down
64 changes: 53 additions & 11 deletions keepers/validator-keeper/src/gossip.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{
collections::HashMap,
net::{IpAddr, SocketAddr},
str::FromStr,
sync::{
Expand All @@ -22,6 +23,7 @@ use solana_gossip::{
};
use solana_metrics::datapoint_info;
use solana_sdk::{
compute_budget::ComputeBudgetInstruction,
instruction::Instruction,
pubkey::Pubkey,
signature::{Keypair, Signable, Signature},
Expand All @@ -31,10 +33,10 @@ use tokio::time::sleep;
use validator_history::{
self,
constants::{MAX_ALLOC_BYTES, MIN_VOTE_EPOCHS},
Config, ValidatorHistory,
Config, ValidatorHistory, ValidatorHistoryEntry,
};

use crate::{get_validator_history_accounts_with_retry, start_spy_server};
use crate::{get_validator_history_accounts_with_retry, start_spy_server, PRIORITY_FEE};

#[derive(Clone, Debug)]
pub struct GossipEntry {
Expand Down Expand Up @@ -115,12 +117,16 @@ impl CreateTransaction for GossipEntry {
}

impl GossipEntry {
pub fn build_update_tx(&self) -> Vec<Instruction> {
let mut ixs = vec![build_verify_signature_ix(
self.signature.as_ref(),
self.identity.to_bytes(),
&self.message,
)];
pub fn build_update_tx(&self, priority_fee: u64) -> Vec<Instruction> {
let mut ixs = vec![
ComputeBudgetInstruction::set_compute_unit_limit(100_000),
ComputeBudgetInstruction::set_compute_unit_price(priority_fee),
build_verify_signature_ix(
self.signature.as_ref(),
self.identity.to_bytes(),
&self.message,
),
];

ixs.push(Instruction {
program_id: self.program_id,
Expand Down Expand Up @@ -293,8 +299,19 @@ pub async fn upload_gossip_values(
let validator_history_accounts =
get_validator_history_accounts_with_retry(&client, *program_id).await?;

let validator_history_map = HashMap::from_iter(validator_history_accounts.iter().map(|vh| {
(
Pubkey::find_program_address(
&[ValidatorHistory::SEED, &vh.vote_account.to_bytes()],
program_id,
)
.0,
vh,
)
}));

// Wait for all active validators to be received
sleep(Duration::from_secs(30)).await;
sleep(Duration::from_secs(150)).await;

let gossip_entries = {
let crds = cluster_info.gossip.crds.read().map_err(|e| e.to_string())?;
Expand All @@ -321,10 +338,19 @@ pub async fn upload_gossip_values(

exit.store(true, Ordering::Relaxed);

let epoch = client.get_epoch_info().await?.epoch;

let addresses = gossip_entries
.iter()
.map(|a| a.address())
.filter_map(|a| {
if gossip_data_uploaded(&validator_history_map, a.address(), epoch) {
None
} else {
Some(a.address())
}
})
.collect::<Vec<Pubkey>>();

let existing_accounts_response = get_multiple_accounts_batched(&addresses, &client).await?;

let create_transactions = existing_accounts_response
Expand All @@ -341,7 +367,7 @@ pub async fn upload_gossip_values(

let update_transactions = gossip_entries
.iter()
.map(|entry| entry.build_update_tx())
.map(|entry| entry.build_update_tx(PRIORITY_FEE))
.collect::<Vec<_>>();

Ok(CreateUpdateStats {
Expand All @@ -350,6 +376,22 @@ pub async fn upload_gossip_values(
})
}

fn gossip_data_uploaded(
validator_history_map: &HashMap<Pubkey, &ValidatorHistory>,
vote_account: Pubkey,
epoch: u64,
) -> bool {
if let Some(validator_history) = validator_history_map.get(&vote_account) {
if let Some(latest_entry) = validator_history.history.last() {
return latest_entry.epoch == epoch as u16
&& latest_entry.ip != ValidatorHistoryEntry::default().ip
&& latest_entry.version.major != ValidatorHistoryEntry::default().version.major
&& latest_entry.client_type != ValidatorHistoryEntry::default().client_type;
}
}
false
}

// CODE BELOW SLIGHTLY MODIFIED FROM
// solana_sdk/src/ed25519_instruction.rs

Expand Down
28 changes: 27 additions & 1 deletion keepers/validator-keeper/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub mod vote_account;

pub type Error = Box<dyn std::error::Error>;

pub const PRIORITY_FEE: u64 = 500_000;
pub const PRIORITY_FEE: u64 = 200_000;

#[derive(ThisError, Debug)]
pub enum KeeperError {
Expand Down Expand Up @@ -134,6 +134,7 @@ pub fn emit_cluster_history_datapoint(stats: SubmitStats, runs_for_epoch: i64) {
pub async fn emit_validator_history_metrics(
client: &Arc<RpcClient>,
program_id: Pubkey,
keeper_address: Pubkey,
) -> Result<(), Box<dyn std::error::Error>> {
let epoch = client.get_epoch_info().await?;

Expand Down Expand Up @@ -199,6 +200,8 @@ pub async fn emit_validator_history_metrics(
.await?
.len();

let keeper_balance = get_balance_with_retry(client, keeper_address).await?;

datapoint_info!(
"validator-history-stats",
("num_validator_histories", num_validators, i64),
Expand All @@ -218,6 +221,11 @@ pub async fn emit_validator_history_metrics(
),
);

datapoint_info!(
"stakenet-keeper-stats",
("balance_lamports", keeper_balance, i64),
);

Ok(())
}

Expand Down Expand Up @@ -262,6 +270,24 @@ pub async fn get_validator_history_accounts_with_retry(
get_validator_history_accounts(client, program_id).await
}

pub async fn get_balance_with_retry(
client: &RpcClient,
account: Pubkey,
) -> Result<u64, ClientError> {
let mut retries = 5;
loop {
match client.get_balance(&account).await {
Ok(balance) => return Ok(balance),
Err(e) => {
if retries == 0 {
return Err(e);
}
retries -= 1;
}
}
}
}

pub fn start_spy_server(
cluster_entrypoint: SocketAddr,
gossip_port: u16,
Expand Down
26 changes: 10 additions & 16 deletions keepers/validator-keeper/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ and the updating of the various data feeds within the accounts.
It will emits metrics for each data feed, if env var SOLANA_METRICS_CONFIG is set to a valid influx server.
*/

use std::{collections::HashMap, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};

use clap::{arg, command, Parser};
use keeper_core::{Cluster, CreateUpdateStats, SubmitStats, TransactionExecutionError};
Expand All @@ -13,7 +13,7 @@ use solana_client::nonblocking::rpc_client::RpcClient;
use solana_metrics::{datapoint_error, set_host_id};
use solana_sdk::{
pubkey::Pubkey,
signature::{read_keypair_file, Keypair},
signature::{read_keypair_file, Keypair, Signer},
};
use tokio::time::sleep;
use validator_keeper::{
Expand Down Expand Up @@ -67,9 +67,14 @@ struct Args {
cluster: Cluster,
}

async fn monitoring_loop(client: Arc<RpcClient>, program_id: Pubkey, interval: u64) {
async fn monitoring_loop(
client: Arc<RpcClient>,
program_id: Pubkey,
keeper_address: Pubkey,
interval: u64,
) {
loop {
match emit_validator_history_metrics(&client, program_id).await {
match emit_validator_history_metrics(&client, program_id, keeper_address).await {
Ok(_) => {}
Err(e) => {
error!("Failed to emit validator history metrics: {}", e);
Expand All @@ -86,10 +91,6 @@ async fn mev_commission_loop(
tip_distribution_program_id: Pubkey,
interval: u64,
) {
let mut prev_epoch = 0;
// {TipDistributionAccount : VoteAccount}
let mut validators_updated: HashMap<Pubkey, Pubkey> = HashMap::new();

loop {
// Continuously runs throughout an epoch, polling for new tip distribution accounts
// and submitting update txs when new accounts are detected
Expand All @@ -98,8 +99,6 @@ async fn mev_commission_loop(
keypair.clone(),
&commission_history_program_id,
&tip_distribution_program_id,
&mut validators_updated,
&mut prev_epoch,
)
.await
{
Expand Down Expand Up @@ -141,10 +140,6 @@ async fn mev_earned_loop(
tip_distribution_program_id: Pubkey,
interval: u64,
) {
let mut curr_epoch = 0;
// {TipDistributionAccount : VoteAccount}
let mut validators_updated: HashMap<Pubkey, Pubkey> = HashMap::new();

loop {
// Continuously runs throughout an epoch, polling for tip distribution accounts from the prev epoch with uploaded merkle roots
// and submitting update_mev_earned (technically update_mev_comission) txs when the uploaded merkle roots are detected
Expand All @@ -153,8 +148,6 @@ async fn mev_earned_loop(
&keypair,
&commission_history_program_id,
&tip_distribution_program_id,
&mut validators_updated,
&mut curr_epoch,
)
.await
{
Expand Down Expand Up @@ -493,6 +486,7 @@ async fn main() {
tokio::spawn(monitoring_loop(
Arc::clone(&client),
args.program_id,
keypair.pubkey(),
args.interval,
));

Expand Down
Loading
Loading