Skip to content

Commit

Permalink
Fixes + cleanup for updated keeper (#21)
Browse files Browse the repository at this point in the history
New keeper will retry failed transactions forever, so we need to prevent
sending txs that are doomed from the start. Also some nit improvements.

* ~Comments out cluster history cranking until that is fixed~
* Filtering out closed vote accounts for now
* Filtering out gossip values that are expired
  • Loading branch information
ebatsell authored Feb 26, 2024
1 parent c8a5c36 commit a5d1f41
Show file tree
Hide file tree
Showing 7 changed files with 191 additions and 42 deletions.
8 changes: 3 additions & 5 deletions keepers/keeper-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ pub async fn parallel_execute_transactions(
submitted_signatures.insert(tx.signatures[0], idx);
}
Some(_) | None => {
warn!("Transaction error: {}", e.to_string());
warn!("Transaction error: {:?}", e);
results[idx] = Err(SendTransactionError::TransactionError(e.to_string()))
}
},
Expand Down Expand Up @@ -436,7 +436,6 @@ pub async fn submit_transactions(
keypair: &Arc<Keypair>,
) -> Result<SubmitStats, TransactionExecutionError> {
let mut stats = SubmitStats::default();
let num_transactions = transactions.len();
let tx_slice = transactions
.iter()
.map(|t| t.as_slice())
Expand All @@ -445,7 +444,7 @@ pub async fn submit_transactions(
match parallel_execute_transactions(client, &tx_slice, keypair, 10, 30).await {
Ok(results) => {
stats.successes = results.iter().filter(|&tx| tx.is_ok()).count() as u64;
stats.errors = num_transactions as u64 - stats.successes;
stats.errors = results.len() as u64 - stats.successes;
stats.results = results;
Ok(stats)
}
Expand All @@ -459,11 +458,10 @@ pub async fn submit_instructions(
keypair: &Arc<Keypair>,
) -> Result<SubmitStats, TransactionExecutionError> {
let mut stats = SubmitStats::default();
let num_instructions = instructions.len();
match parallel_execute_instructions(client, &instructions, keypair, 10, 30).await {
Ok(results) => {
stats.successes = results.iter().filter(|&tx| tx.is_ok()).count() as u64;
stats.errors = num_instructions as u64 - stats.successes;
stats.errors = results.len() as u64 - stats.successes;
stats.results = results;
Ok(stats)
}
Expand Down
62 changes: 51 additions & 11 deletions keepers/validator-keeper/src/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use validator_history::{
Config, ValidatorHistory,
};

use crate::start_spy_server;
use crate::{get_validator_history_accounts_with_retry, start_spy_server};

#[derive(Clone, Debug)]
pub struct GossipEntry {
Expand Down Expand Up @@ -148,12 +148,38 @@ pub fn emit_gossip_datapoint(stats: CreateUpdateStats, runs_for_epoch: i64) {
);
}

fn check_entry_valid(entry: &CrdsValue, validator_identity: Pubkey) -> bool {
fn check_entry_valid(
entry: &CrdsValue,
validator_history: &ValidatorHistory,
validator_identity: Pubkey,
) -> bool {
// Filters out invalid gossip entries that would fail transaction submission. Checks for:
// 0. Entry belongs to one of the expected types
// 1. Entry timestamp is not too old
// 2. Entry is for the correct validator
match &entry.data {
CrdsData::LegacyContactInfo(_) => {}
CrdsData::LegacyVersion(_) => {}
CrdsData::Version(_) => {}
CrdsData::ContactInfo(_) => {}
CrdsData::LegacyContactInfo(legacy_contact_info) => {
if legacy_contact_info.wallclock() < validator_history.last_ip_timestamp {
return false;
}
}
CrdsData::LegacyVersion(legacy_version) => {
if legacy_version.wallclock < validator_history.last_version_timestamp {
return false;
}
}
CrdsData::Version(version) => {
if version.wallclock < validator_history.last_version_timestamp {
return false;
}
}
CrdsData::ContactInfo(contact_info) => {
if contact_info.wallclock() < validator_history.last_ip_timestamp
|| contact_info.wallclock() < validator_history.last_version_timestamp
{
return false;
}
}
_ => {
return false;
}
Expand All @@ -173,6 +199,7 @@ fn check_entry_valid(entry: &CrdsValue, validator_identity: Pubkey) -> bool {

fn build_gossip_entry(
vote_account: &RpcVoteAccountInfo,
validator_history: &ValidatorHistory,
crds: &RwLockReadGuard<'_, Crds>,
program_id: Pubkey,
keypair: &Arc<Keypair>,
Expand All @@ -189,7 +216,7 @@ fn build_gossip_entry(
// Current ContactInfo has both IP and Version, but LegacyContactInfo has only IP.
// So if there is not ContactInfo, we need to submit tx for LegacyContactInfo + one of (Version, LegacyVersion)
if let Some(entry) = crds.get::<&CrdsValue>(&contact_info_key) {
if !check_entry_valid(entry, validator_identity) {
if !check_entry_valid(entry, validator_history, validator_identity) {
return None;
}
Some(vec![GossipEntry::new(
Expand All @@ -203,7 +230,7 @@ fn build_gossip_entry(
} else {
let mut entries = vec![];
if let Some(entry) = crds.get::<&CrdsValue>(&legacy_contact_info_key) {
if !check_entry_valid(entry, validator_identity) {
if !check_entry_valid(entry, validator_history, validator_identity) {
return None;
}
entries.push(GossipEntry::new(
Expand All @@ -217,7 +244,7 @@ fn build_gossip_entry(
}

if let Some(entry) = crds.get::<&CrdsValue>(&version_key) {
if !check_entry_valid(entry, validator_identity) {
if !check_entry_valid(entry, validator_history, validator_identity) {
return None;
}
entries.push(GossipEntry::new(
Expand All @@ -229,7 +256,7 @@ fn build_gossip_entry(
&keypair.pubkey(),
))
} else if let Some(entry) = crds.get::<&CrdsValue>(&legacy_version_key) {
if !check_entry_valid(entry, validator_identity) {
if !check_entry_valid(entry, validator_history, validator_identity) {
return None;
}
entries.push(GossipEntry::new(
Expand Down Expand Up @@ -262,6 +289,8 @@ pub async fn upload_gossip_values(
start_spy_server(entrypoint, gossip_port, spy_socket_addr, &keypair, &exit);

let vote_accounts = get_vote_accounts_with_retry(&client, MIN_VOTE_EPOCHS, None).await?;
let validator_history_accounts =
get_validator_history_accounts_with_retry(&client, *program_id).await?;

// Wait for all active validators to be received
sleep(Duration::from_secs(30)).await;
Expand All @@ -272,7 +301,18 @@ pub async fn upload_gossip_values(
vote_accounts
.iter()
.filter_map(|vote_account| {
build_gossip_entry(vote_account, &crds, *program_id, &keypair)
let vote_account_pubkey = Pubkey::from_str(&vote_account.vote_pubkey).ok()?;
let validator_history_account = validator_history_accounts
.iter()
.find(|account| account.vote_account == vote_account_pubkey)?;

build_gossip_entry(
vote_account,
validator_history_account,
&crds,
*program_id,
&keypair,
)
})
.flatten()
.collect::<Vec<_>>()
Expand Down
36 changes: 34 additions & 2 deletions keepers/validator-keeper/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,19 @@ async fn mev_commission_loop(
)
.await
{
Ok(stats) => stats,
Ok(stats) => {
for message in stats
.creates
.results
.iter()
.chain(stats.updates.results.iter())
{
if let Err(e) = message {
datapoint_error!("vote-account-error", ("error", e.to_string(), String),);
}
}
stats
}
Err(e) => {
let mut stats = CreateUpdateStats::default();
if let KeeperError::TransactionExecutionError(
Expand Down Expand Up @@ -146,7 +158,19 @@ async fn mev_earned_loop(
)
.await
{
Ok(stats) => stats,
Ok(stats) => {
for message in stats
.creates
.results
.iter()
.chain(stats.updates.results.iter())
{
if let Err(e) = message {
datapoint_error!("vote-account-error", ("error", e.to_string(), String),);
}
}
stats
}
Err(e) => {
let mut stats = CreateUpdateStats::default();
if let KeeperError::TransactionExecutionError(
Expand Down Expand Up @@ -421,6 +445,14 @@ async fn cluster_history_loop(
if should_run {
stats = match update_cluster_info(client.clone(), keypair.clone(), &program_id).await {
Ok(run_stats) => {
for message in run_stats.results.iter() {
if let Err(e) = message {
datapoint_error!(
"cluster-history-error",
("error", e.to_string(), String),
);
}
}
if run_stats.errors == 0 {
runs_for_epoch += 1;
}
Expand Down
23 changes: 14 additions & 9 deletions keepers/validator-keeper/src/mev_commission.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,17 +224,22 @@ pub async fn update_mev_earned(

let submit_result =
submit_create_and_update(client, create_transactions, update_instructions, keypair).await;
if submit_result.is_ok() {
for ValidatorMevCommissionEntry {
vote_account,
tip_distribution_account,
..
} in entries_to_update
{
validators_updated.insert(tip_distribution_account, vote_account);
match submit_result {
Ok(submit_result) => {
if submit_result.creates.errors == 0 && submit_result.updates.errors == 0 {
for ValidatorMevCommissionEntry {
vote_account,
tip_distribution_account,
..
} in entries_to_update
{
validators_updated.insert(tip_distribution_account, vote_account);
}
}
Ok(submit_result)
}
Err(e) => Err(e.into()),
}
submit_result.map_err(|e| e.into())
}

async fn get_existing_entries(
Expand Down
34 changes: 30 additions & 4 deletions keepers/validator-keeper/src/vote_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ use std::sync::Arc;

use anchor_lang::{InstructionData, ToAccountMetas};
use keeper_core::{
build_create_and_update_instructions, get_vote_accounts_with_retry, submit_create_and_update,
Address, CreateTransaction, CreateUpdateStats, UpdateInstruction,
build_create_and_update_instructions, get_multiple_accounts_batched,
get_vote_accounts_with_retry, submit_create_and_update, Address, CreateTransaction,
CreateUpdateStats, UpdateInstruction,
};
use log::error;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_program::vote;
use solana_program::{instruction::Instruction, pubkey::Pubkey};
use solana_sdk::{signature::Keypair, signer::Signer};

Expand Down Expand Up @@ -109,8 +111,29 @@ pub async fn update_vote_accounts(
get_validator_history_accounts_with_retry(&rpc_client, validator_history_program_id)
.await?;

let vote_account_pubkeys = validator_histories
.iter()
.map(|vh| vh.vote_account)
.collect::<Vec<_>>();

let vote_accounts = get_multiple_accounts_batched(&vote_account_pubkeys, &rpc_client).await?;
let closed_vote_accounts: HashSet<Pubkey> = vote_accounts
.iter()
.enumerate()
.filter_map(|(i, account)| match account {
Some(account) => {
if account.owner != vote::program::id() {
Some(vote_account_pubkeys[i])
} else {
None
}
}
None => Some(vote_account_pubkeys[i]),
})
.collect();

// Merges new and active RPC vote accounts with all validator history accounts, and dedupes
let vote_accounts = rpc_vote_accounts
let mut all_vote_accounts = rpc_vote_accounts
.iter()
.filter_map(|rpc_va| {
Pubkey::from_str(&rpc_va.vote_pubkey)
Expand All @@ -123,7 +146,10 @@ pub async fn update_vote_accounts(
.chain(validator_histories.iter().map(|vh| vh.vote_account))
.collect::<HashSet<_>>();

let entries = vote_accounts
// Remove closed vote accounts from all vote accounts
all_vote_accounts.retain(|va| !closed_vote_accounts.contains(va));

let entries = all_vote_accounts
.iter()
.map(|va| CopyVoteAccountEntry::new(va, &validator_history_program_id, &keypair.pubkey()))
.collect::<Vec<_>>();
Expand Down
6 changes: 3 additions & 3 deletions programs/validator-history/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ mod tests {

#[test]
fn test_fixed_point_sol() {
assert_eq!(fixed_point_sol(1000000000), 100);
assert_eq!(fixed_point_sol(4294967295000000000), 4294967295);
assert_eq!(fixed_point_sol(1_000_000_000), 100);
assert_eq!(fixed_point_sol(4_294_967_295_000_000_000), 4294967295);

assert_eq!(fixed_point_sol(429496729600000000), 4294967295)
assert_eq!(fixed_point_sol(429_496_729_600_000_000), 4294967295)
}
}
Loading

0 comments on commit a5d1f41

Please sign in to comment.