Skip to content

Commit

Permalink
Reduce redundant keeper retries, emit keeper balance
Browse files Browse the repository at this point in the history
  • Loading branch information
ebatsell committed Apr 15, 2024
1 parent d813d4f commit 5a40ac0
Show file tree
Hide file tree
Showing 6 changed files with 203 additions and 77 deletions.
41 changes: 39 additions & 2 deletions keepers/validator-keeper/src/gossip.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{
collections::HashMap,
net::{IpAddr, SocketAddr},
str::FromStr,
sync::{
Expand Down Expand Up @@ -31,7 +32,7 @@ use tokio::time::sleep;
use validator_history::{
self,
constants::{MAX_ALLOC_BYTES, MIN_VOTE_EPOCHS},
Config, ValidatorHistory,
Config, ValidatorHistory, ValidatorHistoryEntry,
};

use crate::{get_validator_history_accounts_with_retry, start_spy_server};
Expand Down Expand Up @@ -293,6 +294,12 @@ pub async fn upload_gossip_values(
let validator_history_accounts =
get_validator_history_accounts_with_retry(&client, *program_id).await?;

let validator_history_map = HashMap::from_iter(
validator_history_accounts
.iter()
.map(|vh| (vh.vote_account, vh)),
);

// Wait for all active validators to be received
sleep(Duration::from_secs(30)).await;

Expand Down Expand Up @@ -321,10 +328,19 @@ pub async fn upload_gossip_values(

exit.store(true, Ordering::Relaxed);

let epoch = client.get_epoch_info().await?.epoch;

let addresses = gossip_entries
.iter()
.map(|a| a.address())
.filter_map(|a| {
if gossip_data_uploaded(&validator_history_map, a.address(), epoch) {
None
} else {
Some(a.address())
}
})
.collect::<Vec<Pubkey>>();

let existing_accounts_response = get_multiple_accounts_batched(&addresses, &client).await?;

let create_transactions = existing_accounts_response
Expand All @@ -350,6 +366,27 @@ pub async fn upload_gossip_values(
})
}

fn gossip_data_uploaded(
validator_history_map: &HashMap<Pubkey, &ValidatorHistory>,
vote_account: Pubkey,
epoch: u64,
) -> bool {
let validator_history = validator_history_map.get(&vote_account);
if validator_history.is_none() {
return false;
}

let validator_history = validator_history.unwrap();

if let Some(latest_entry) = validator_history.history.last() {
return latest_entry.epoch == epoch as u16
&& latest_entry.ip != ValidatorHistoryEntry::default().ip
&& latest_entry.version.major != ValidatorHistoryEntry::default().version.major
&& latest_entry.client_type != ValidatorHistoryEntry::default().client_type;
}
false
}

// CODE BELOW SLIGHTLY MODIFIED FROM
// solana_sdk/src/ed25519_instruction.rs

Expand Down
28 changes: 27 additions & 1 deletion keepers/validator-keeper/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub mod vote_account;

pub type Error = Box<dyn std::error::Error>;

pub const PRIORITY_FEE: u64 = 500_000;
pub const PRIORITY_FEE: u64 = 100_000;

#[derive(ThisError, Debug)]
pub enum KeeperError {
Expand Down Expand Up @@ -134,6 +134,7 @@ pub fn emit_cluster_history_datapoint(stats: SubmitStats, runs_for_epoch: i64) {
pub async fn emit_validator_history_metrics(
client: &Arc<RpcClient>,
program_id: Pubkey,
keeper_address: Pubkey,
) -> Result<(), Box<dyn std::error::Error>> {
let epoch = client.get_epoch_info().await?;

Expand Down Expand Up @@ -199,6 +200,8 @@ pub async fn emit_validator_history_metrics(
.await?
.len();

let keeper_balance = get_balance_with_retry(client, keeper_address).await?;

datapoint_info!(
"validator-history-stats",
("num_validator_histories", num_validators, i64),
Expand All @@ -218,6 +221,11 @@ pub async fn emit_validator_history_metrics(
),
);

datapoint_info!(
"stakenet-keeper-stats",
("balance_lamports", keeper_balance, i64),
);

Ok(())
}

Expand Down Expand Up @@ -262,6 +270,24 @@ pub async fn get_validator_history_accounts_with_retry(
get_validator_history_accounts(client, program_id).await
}

pub async fn get_balance_with_retry(
client: &RpcClient,
account: Pubkey,
) -> Result<u64, ClientError> {
let mut retries = 5;
loop {
match client.get_balance(&account).await {
Ok(balance) => return Ok(balance),
Err(e) => {
if retries == 0 {
return Err(e);
}
retries -= 1;
}
}
}
}

pub fn start_spy_server(
cluster_entrypoint: SocketAddr,
gossip_port: u16,
Expand Down
12 changes: 0 additions & 12 deletions keepers/validator-keeper/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,6 @@ async fn mev_commission_loop(
tip_distribution_program_id: Pubkey,
interval: u64,
) {
let mut prev_epoch = 0;
// {TipDistributionAccount : VoteAccount}
let mut validators_updated: HashMap<Pubkey, Pubkey> = HashMap::new();

loop {
// Continuously runs throughout an epoch, polling for new tip distribution accounts
// and submitting update txs when new accounts are detected
Expand All @@ -98,8 +94,6 @@ async fn mev_commission_loop(
keypair.clone(),
&commission_history_program_id,
&tip_distribution_program_id,
&mut validators_updated,
&mut prev_epoch,
)
.await
{
Expand Down Expand Up @@ -141,10 +135,6 @@ async fn mev_earned_loop(
tip_distribution_program_id: Pubkey,
interval: u64,
) {
let mut curr_epoch = 0;
// {TipDistributionAccount : VoteAccount}
let mut validators_updated: HashMap<Pubkey, Pubkey> = HashMap::new();

loop {
// Continuously runs throughout an epoch, polling for tip distribution accounts from the prev epoch with uploaded merkle roots
// and submitting update_mev_earned (technically update_mev_comission) txs when the uploaded merkle roots are detected
Expand All @@ -153,8 +143,6 @@ async fn mev_earned_loop(
&keypair,
&commission_history_program_id,
&tip_distribution_program_id,
&mut validators_updated,
&mut curr_epoch,
)
.await
{
Expand Down
111 changes: 58 additions & 53 deletions keepers/validator-keeper/src/mev_commission.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ use solana_client::rpc_response::RpcVoteAccountInfo;
use solana_program::{instruction::Instruction, pubkey::Pubkey};
use solana_sdk::{signature::Keypair, signer::Signer};
use validator_history::constants::MIN_VOTE_EPOCHS;
use validator_history::ValidatorHistoryEntry;
use validator_history::{constants::MAX_ALLOC_BYTES, Config, ValidatorHistory};

use crate::{KeeperError, PRIORITY_FEE};
use crate::{get_validator_history_accounts_with_retry, KeeperError, PRIORITY_FEE};

#[derive(Clone)]
pub struct ValidatorMevCommissionEntry {
Expand Down Expand Up @@ -127,16 +128,15 @@ pub async fn update_mev_commission(
keypair: Arc<Keypair>,
validator_history_program_id: &Pubkey,
tip_distribution_program_id: &Pubkey,
validators_updated: &mut HashMap<Pubkey, Pubkey>,
prev_epoch: &mut u64,
) -> Result<CreateUpdateStats, KeeperError> {
let epoch = client.get_epoch_info().await?.epoch;
if epoch > *prev_epoch {
validators_updated.clear();
}
*prev_epoch = epoch;

let vote_accounts = get_vote_accounts_with_retry(&client, MIN_VOTE_EPOCHS, None).await?;
let validator_histories =
get_validator_history_accounts_with_retry(&client, *validator_history_program_id).await?;

let validator_history_map =
HashMap::from_iter(validator_histories.iter().map(|vh| (vh.vote_account, vh)));

let entries = vote_accounts
.iter()
Expand All @@ -155,55 +155,36 @@ pub async fn update_mev_commission(

let entries_to_update = existing_entries
.into_iter()
.filter(|entry| !validators_updated.contains_key(&entry.tip_distribution_account))
.filter(|entry| !mev_commission_uploaded(&validator_history_map, entry.address(), epoch))
.collect::<Vec<ValidatorMevCommissionEntry>>();
let (create_transactions, update_instructions) =
build_create_and_update_instructions(&client, &entries_to_update).await?;

match submit_create_and_update(
submit_create_and_update(
&client,
create_transactions,
update_instructions,
&keypair,
PRIORITY_FEE,
)
.await
{
Ok(submit_result) => {
if submit_result.creates.errors == 0 && submit_result.updates.errors == 0 {
for ValidatorMevCommissionEntry {
vote_account,
tip_distribution_account,
..
} in entries_to_update
{
validators_updated.insert(tip_distribution_account, vote_account);
}
}
Ok(submit_result)
}
Err(e) => Err(e.into()),
}
.map_err(|e| e.into())
}

pub async fn update_mev_earned(
client: &Arc<RpcClient>,
keypair: &Arc<Keypair>,
validator_history_program_id: &Pubkey,
tip_distribution_program_id: &Pubkey,
validators_updated: &mut HashMap<Pubkey, Pubkey>,
curr_epoch: &mut u64,
) -> Result<CreateUpdateStats, KeeperError> {
let epoch = client.get_epoch_info().await?.epoch;

if epoch > *curr_epoch {
// new epoch started, we assume here that all the validators with TDAs from curr_epoch-1 have had their merkle roots uploaded/processed by this point
// clear our map of TDAs derived from curr_epoch -1 and start fresh for epoch-1 (or curr_epoch)
validators_updated.clear();
}
*curr_epoch = epoch;

let vote_accounts = get_vote_accounts_with_retry(client, MIN_VOTE_EPOCHS, None).await?;
let validator_histories =
get_validator_history_accounts_with_retry(&client, *validator_history_program_id).await?;

Check failure on line 184 in keepers/validator-keeper/src/mev_commission.rs

View workflow job for this annotation

GitHub Actions / lint

this expression creates a reference which is immediately dereferenced by the compiler

let validator_history_map =
HashMap::from_iter(validator_histories.iter().map(|vh| (vh.vote_account, vh)));

let entries = vote_accounts
.iter()
Expand All @@ -223,35 +204,21 @@ pub async fn update_mev_earned(

let entries_to_update = uploaded_merkleroot_entries
.into_iter()
.filter(|entry| !validators_updated.contains_key(&entry.tip_distribution_account))
.filter(|entry| !mev_earned_uploaded(&validator_history_map, entry.address(), epoch - 1))
.collect::<Vec<ValidatorMevCommissionEntry>>();

let (create_transactions, update_instructions) =
build_create_and_update_instructions(client, &entries_to_update).await?;

let submit_result = submit_create_and_update(
submit_create_and_update(
client,
create_transactions,
update_instructions,
keypair,
PRIORITY_FEE,
)
.await;
match submit_result {
Ok(submit_result) => {
if submit_result.creates.errors == 0 && submit_result.updates.errors == 0 {
for ValidatorMevCommissionEntry {
vote_account,
tip_distribution_account,
..
} in entries_to_update
{
validators_updated.insert(tip_distribution_account, vote_account);
}
}
Ok(submit_result)
}
Err(e) => Err(e.into()),
}
.await
.map_err(|e| e.into())
}

async fn get_existing_entries(
Expand Down Expand Up @@ -309,3 +276,41 @@ async fn get_entries_with_uploaded_merkleroot(
// Fetch tip distribution accounts with uploaded merkle roots for this epoch
Ok(result)
}

fn mev_commission_uploaded(
validator_history_map: &HashMap<Pubkey, &ValidatorHistory>,
vote_account: Pubkey,
epoch: u64,
) -> bool {
let validator_history = validator_history_map.get(&vote_account);
if validator_history.is_none() {
return false;
}

let validator_history = validator_history.unwrap();

if let Some(latest_entry) = validator_history.history.last() {
return latest_entry.epoch == epoch as u16
&& latest_entry.mev_commission != ValidatorHistoryEntry::default().mev_commission;
}
false
}

fn mev_earned_uploaded(
validator_history_map: &HashMap<Pubkey, &ValidatorHistory>,
vote_account: Pubkey,
epoch: u64,
) -> bool {
let validator_history = validator_history_map.get(&vote_account);
if validator_history.is_none() {
return false;
}

let validator_history = validator_history.unwrap();

if let Some(latest_entry) = validator_history.history.last() {
return latest_entry.epoch == epoch as u16
&& latest_entry.mev_earned != ValidatorHistoryEntry::default().mev_earned;
}
false
}
Loading

0 comments on commit 5a40ac0

Please sign in to comment.