Skip to content

Commit

Permalink
Cluster History account with blocks per epoch tracking (#14)
Browse files Browse the repository at this point in the history
Cluster History Struct:

- Sets up new account type: ClusterHistory
- Reads number of landed blocks from SlotHistory for last epoch
- Adds support for cranking this per epoch
- Backfill instruction + CLI command for filling in data since epoch 500
- Tests

Also includes IDL updated with ClusterHistory types
  • Loading branch information
ebatsell authored Jan 10, 2024
1 parent 56373f9 commit f21e475
Show file tree
Hide file tree
Showing 15 changed files with 1,628 additions and 11 deletions.
29 changes: 29 additions & 0 deletions keepers/validator-keeper/src/cluster_info.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use std::sync::Arc;

use anchor_lang::{InstructionData, ToAccountMetas};
use keeper_core::{submit_instructions, SubmitStats, TransactionExecutionError};
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::{instruction::Instruction, pubkey::Pubkey, signature::Keypair, signer::Signer};
use validator_history::state::ClusterHistory;

/// Submits a `CopyClusterInfo` instruction to refresh the on-chain ClusterHistory account.
///
/// Derives the ClusterHistory PDA from `program_id`, builds the instruction
/// with the SlotHistory sysvar and the keeper's pubkey as signer, and submits
/// it through `keeper_core::submit_instructions`.
///
/// Returns the submission stats on success, or the execution error paired with
/// the stats gathered up to the failure.
pub async fn update_cluster_info(
    client: Arc<RpcClient>,
    keypair: Arc<Keypair>,
    program_id: &Pubkey,
) -> Result<SubmitStats, (TransactionExecutionError, SubmitStats)> {
    // PDA is derived solely from the program-wide ClusterHistory seed.
    let (cluster_history_account, _bump) =
        Pubkey::find_program_address(&[ClusterHistory::SEED], program_id);

    let accounts = validator_history::accounts::CopyClusterInfo {
        cluster_history_account,
        slot_history: solana_program::sysvar::slot_history::id(),
        signer: keypair.pubkey(),
    };

    let instruction = Instruction {
        program_id: *program_id,
        accounts: accounts.to_account_metas(None),
        data: validator_history::instruction::CopyClusterInfo {}.data(),
    };

    submit_instructions(&client, vec![instruction], &keypair).await
}
34 changes: 30 additions & 4 deletions keepers/validator-keeper/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@ use std::{
};

use anchor_lang::{AccountDeserialize, Discriminator};
use keeper_core::CreateUpdateStats;
use keeper_core::{CreateUpdateStats, SubmitStats};
use log::error;
use solana_account_decoder::UiDataSliceConfig;
use solana_client::{
client_error::ClientError,
nonblocking::rpc_client::RpcClient,
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
rpc_filter::{Memcmp, RpcFilterType},
Expand All @@ -26,8 +25,9 @@ use solana_sdk::{
use solana_streamer::socket::SocketAddrSpace;

use jito_tip_distribution::state::TipDistributionAccount;
use validator_history::{ValidatorHistory, ValidatorHistoryEntry};
use validator_history::{ClusterHistory, ValidatorHistory, ValidatorHistoryEntry};

pub mod cluster_info;
pub mod gossip;
pub mod mev_commission;
pub mod stake;
Expand Down Expand Up @@ -91,10 +91,19 @@ pub fn emit_validator_commission_datapoint(stats: CreateUpdateStats, runs_for_ep
);
}

/// Emits a "cluster-history-stats" datapoint recording how many cluster-info
/// submissions succeeded/failed and how many cranks have run this epoch.
pub fn emit_cluster_history_datapoint(stats: SubmitStats, runs_for_epoch: i64) {
    let num_success = stats.successes;
    let num_errors = stats.errors;
    datapoint_info!(
        "cluster-history-stats",
        ("num_success", num_success, i64),
        ("num_errors", num_errors, i64),
        ("runs_for_epoch", runs_for_epoch, i64),
    );
}

pub async fn emit_validator_history_metrics(
client: &Arc<RpcClient>,
program_id: Pubkey,
) -> Result<(), ClientError> {
) -> Result<(), Box<dyn std::error::Error>> {
let epoch = client.get_epoch_info().await?;

// Fetch every ValidatorHistory account
Expand Down Expand Up @@ -161,6 +170,21 @@ pub async fn emit_validator_history_metrics(
}
}

let (cluster_history_address, _) =
Pubkey::find_program_address(&[ClusterHistory::SEED], &program_id);
let cluster_history_account = client.get_account(&cluster_history_address).await?;
let cluster_history =
ClusterHistory::try_deserialize(&mut cluster_history_account.data.as_slice())?;

let mut cluster_history_blocks: i64 = 0;
let cluster_history_entry = cluster_history.history.last();
if let Some(cluster_history) = cluster_history_entry {
// Looking for previous epoch to be updated
if cluster_history.epoch as u64 == epoch.epoch - 1 {
cluster_history_blocks = 1;
}
}

datapoint_info!(
"validator-history-stats",
("num_validator_histories", num_validators, i64),
Expand All @@ -171,8 +195,10 @@ pub async fn emit_validator_history_metrics(
("num_commissions", comms, i64),
("num_epoch_credits", epoch_credits, i64),
("num_stakes", stakes, i64),
("cluster_history_blocks", cluster_history_blocks, i64),
("slot_index", epoch.slot_index, i64),
);

Ok(())
}

Expand Down
64 changes: 61 additions & 3 deletions keepers/validator-keeper/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ It will emits metrics for each data feed, if env var SOLANA_METRICS_CONFIG is se
use std::{collections::HashMap, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};

use clap::{arg, command, Parser};
use keeper_core::{Cluster, CreateUpdateStats};
use keeper_core::{Cluster, CreateUpdateStats, SubmitStats};
use log::*;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_metrics::{datapoint_error, set_host_id};
Expand All @@ -17,8 +17,9 @@ use solana_sdk::{
};
use tokio::time::sleep;
use validator_keeper::{
emit_mev_commission_datapoint, emit_validator_commission_datapoint,
emit_validator_history_metrics,
cluster_info::update_cluster_info,
emit_cluster_history_datapoint, emit_mev_commission_datapoint,
emit_validator_commission_datapoint, emit_validator_history_metrics,
gossip::{emit_gossip_datapoint, upload_gossip_values},
mev_commission::update_mev_commission,
stake::{emit_stake_history_datapoint, update_stake_history},
Expand Down Expand Up @@ -262,6 +263,56 @@ async fn gossip_upload_loop(
}
}

async fn cluster_history_loop(
client: Arc<RpcClient>,
keypair: Arc<Keypair>,
program_id: Pubkey,
interval: u64,
) {
let mut runs_for_epoch = 0;
let mut current_epoch = 0;

loop {
let epoch_info = match client.get_epoch_info().await {
Ok(epoch_info) => epoch_info,
Err(e) => {
error!("Failed to get epoch info: {}", e);
sleep(Duration::from_secs(5)).await;
continue;
}
};
let epoch = epoch_info.epoch;

let mut stats = SubmitStats::default();

if current_epoch != epoch {
runs_for_epoch = 0;
}

// Run at 0.1%, 50% and 90% completion of epoch
let should_run = (epoch_info.slot_index > epoch_info.slots_in_epoch / 1000
&& runs_for_epoch < 1)
|| (epoch_info.slot_index > epoch_info.slots_in_epoch / 2 && runs_for_epoch < 2)
|| (epoch_info.slot_index > epoch_info.slots_in_epoch * 9 / 10 && runs_for_epoch < 3);
if should_run {
stats = match update_cluster_info(client.clone(), keypair.clone(), &program_id).await {
Ok(run_stats) => {
runs_for_epoch += 1;
run_stats
}
Err((e, run_stats)) => {
datapoint_error!("cluster-history-error", ("error", e.to_string(), String),);
run_stats
}
};
}

current_epoch = epoch;
emit_cluster_history_datapoint(stats, runs_for_epoch);
sleep(Duration::from_secs(interval)).await;
}
}

#[tokio::main]
async fn main() {
env_logger::init();
Expand All @@ -284,6 +335,13 @@ async fn main() {
args.interval,
));

tokio::spawn(cluster_history_loop(
Arc::clone(&client),
Arc::clone(&keypair),
args.program_id,
args.interval,
));

tokio::spawn(vote_account_loop(
Arc::clone(&client),
Arc::clone(&keypair),
Expand Down
Loading

0 comments on commit f21e475

Please sign in to comment.