Skip to content

Commit

Permalink
emit is now at the end
Browse files Browse the repository at this point in the history
  • Loading branch information
coachchucksol committed May 16, 2024
1 parent a1673c9 commit 1b9ac08
Show file tree
Hide file tree
Showing 11 changed files with 193 additions and 250 deletions.
8 changes: 5 additions & 3 deletions keepers/validator-keeper/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ Note: All maps are key'd by the `vote_account`
Gather all needed arguments, and initialize the global `KeeperState`.

### Loop
The forever loop consists of two parts: **Fetch** and **Fire**. There is only ever one **Fetch** section, and there can be several **Fire** sections.
The forever loop consists of three parts: **Fetch**, **Fire** and **Emit**. There is only ever one **Fetch** and **Emit** section, and there can be several **Fire** sections.

The **Fire** sections can run on independent cadences - say we want the Validator History functions to run every 300sec and we want to emit metrics every 60sec.

The **Fetch** section is run _before_ and **Fire** section.
The **Emit** section is *one tick* after any **Fire** section.

#### Fetch
The **Fetch** section is in charge of three operations:
Expand All @@ -38,14 +39,15 @@ Notes:
#### Fire
There are several **Fire** sections running at their own cadence. Before any **Fire** section is run, the **Fetch** section will be called.

Each **Fire** is a call to `operations::operation_name::fire_and_emit` which will fire off the operation, emit the data points and return the new count of runs and errors for that operation to be saved in the `KeeperState`
Each **Fire** is a call to `operations::operation_name::fire` which will fire off the operation and return the new count of runs and errors for that operation to be saved in the `KeeperState`

Notes:
- Each **Fire** is self contained, one should not be dependant on another.
- No **Fire* will fetch any accounts, if there are needs for them, they should be added to the `KeeperState`



#### Emit
This section emits the state of the Keeper one tick after any operation has been called. This is because we want to emit a failure of any **Fetch** operation, which on failure advances the tick.



39 changes: 10 additions & 29 deletions keepers/validator-keeper/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,12 @@ struct Args {
#[arg(short, long, env, default_value = "60")]
metrics_interval: u64,

// Priority Fees (mLamports)
#[arg(short, long, env, default_value = "50_000")]
priority_fees: u64,

#[arg(short, long, env, default_value_t = Cluster::Mainnet)]
cluster: Cluster,
}

fn should_emit(tick: u64, intervals: &[u64]) -> bool {
intervals.iter().any(|interval| (tick + 1) % interval == 0)
intervals.iter().any(|interval| tick % (interval + 1) == 0)
}

fn should_update(tick: u64, intervals: &[u64]) -> bool {
Expand Down Expand Up @@ -100,7 +96,6 @@ struct RunLoopConfig {
gossip_entrypoint: Option<SocketAddr>,
validator_history_interval: u64,
metrics_interval: u64,
priority_fees: u64,
}

async fn run_loop(config: RunLoopConfig) {
Expand All @@ -113,7 +108,6 @@ async fn run_loop(config: RunLoopConfig) {
gossip_entrypoint,
validator_history_interval,
metrics_interval,
priority_fees,
} = config;
let intervals = vec![validator_history_interval, metrics_interval];

Expand Down Expand Up @@ -183,29 +177,18 @@ async fn run_loop(config: RunLoopConfig) {

info!("Updating cluster history...");
keeper_state.set_runs_and_errors_for_epoch(
operations::cluster_history::fire_and_emit(
&client,
&keypair,
&program_id,
&keeper_state,
)
.await,
operations::cluster_history::fire(&client, &keypair, &program_id, &keeper_state)
.await,
);

info!("Updating copy vote accounts...");
keeper_state.set_runs_and_errors_for_epoch(
operations::vote_account::fire_and_emit(
&client,
&keypair,
&program_id,
&keeper_state,
)
.await,
operations::vote_account::fire(&client, &keypair, &program_id, &keeper_state).await,
);

info!("Updating mev commission...");
keeper_state.set_runs_and_errors_for_epoch(
operations::mev_commission::fire_and_emit(
operations::mev_commission::fire(
&client,
&keypair,
&program_id,
Expand All @@ -217,7 +200,7 @@ async fn run_loop(config: RunLoopConfig) {

info!("Updating mev earned...");
keeper_state.set_runs_and_errors_for_epoch(
operations::mev_earned::fire_and_emit(
operations::mev_earned::fire(
&client,
&keypair,
&program_id,
Expand All @@ -230,7 +213,7 @@ async fn run_loop(config: RunLoopConfig) {
if let Some(oracle_authority_keypair) = &oracle_authority_keypair {
info!("Updating stake accounts...");
keeper_state.set_runs_and_errors_for_epoch(
operations::stake_upload::fire_and_emit(
operations::stake_upload::fire(
&client,
oracle_authority_keypair,
&program_id,
Expand All @@ -245,7 +228,7 @@ async fn run_loop(config: RunLoopConfig) {
{
info!("Updating gossip accounts...");
keeper_state.set_runs_and_errors_for_epoch(
operations::gossip_upload::fire_and_emit(
operations::gossip_upload::fire(
&client,
oracle_authority_keypair,
&program_id,
Expand All @@ -261,9 +244,8 @@ async fn run_loop(config: RunLoopConfig) {

if should_fire(tick, metrics_interval) {
info!("Emitting metrics...");
keeper_state.set_runs_and_errors_for_epoch(
operations::metrics_emit::fire_and_emit(&keeper_state).await,
);
keeper_state
.set_runs_and_errors_for_epoch(operations::metrics_emit::fire(&keeper_state).await);
}

// ---------------------- EMIT ---------------------------------
Expand Down Expand Up @@ -314,7 +296,6 @@ async fn main() {
gossip_entrypoint,
validator_history_interval: args.validator_history_interval,
metrics_interval: args.metrics_interval,
priority_fees: args.priority_fees,
};

run_loop(config).await;
Expand Down
26 changes: 3 additions & 23 deletions keepers/validator-keeper/src/operations/cluster_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{derive_cluster_history_address, PRIORITY_FEE};
use anchor_lang::{InstructionData, ToAccountMetas};
use keeper_core::{submit_transactions, SubmitStats, TransactionExecutionError};
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_metrics::{datapoint_error, datapoint_info};
use solana_metrics::datapoint_error;
use solana_sdk::{
compute_budget,
epoch_info::EpochInfo,
Expand Down Expand Up @@ -40,17 +40,7 @@ async fn _process(
update_cluster_info(client, keypair, program_id).await
}

fn _emit(stats: &SubmitStats, runs_for_epoch: i64, errors_for_epoch: i64) {
datapoint_info!(
"cluster-history-stats",
("num_success", stats.successes, i64),
("num_errors", stats.errors, i64),
("runs_for_epoch", runs_for_epoch, i64),
("errors_for_epoch", errors_for_epoch, i64)
);
}

pub async fn fire_and_emit(
pub async fn fire(
client: &Arc<RpcClient>,
keypair: &Arc<Keypair>,
program_id: &Pubkey,
Expand All @@ -64,9 +54,8 @@ pub async fn fire_and_emit(

let should_run = _should_run(epoch_info, runs_for_epoch);

let mut stats = SubmitStats::default();
if should_run {
stats = match _process(client, keypair, program_id).await {
match _process(client, keypair, program_id).await {
Ok(stats) => {
for message in stats.results.iter() {
if let Err(e) = message {
Expand All @@ -76,23 +65,14 @@ pub async fn fire_and_emit(
if stats.errors == 0 {
runs_for_epoch += 1;
}
stats
}
Err(e) => {
let mut stats = SubmitStats::default();
if let TransactionExecutionError::TransactionClientError(_, results) = &e {
stats.successes = results.iter().filter(|r| r.is_ok()).count() as u64;
stats.errors = results.iter().filter(|r| r.is_err()).count() as u64;
}
datapoint_error!("cluster-history-error", ("error", e.to_string(), String),);
errors_for_epoch += 1;
stats
}
};
}

_emit(&stats, runs_for_epoch as i64, errors_for_epoch as i64);

(operation, runs_for_epoch, errors_for_epoch)
}

Expand Down
33 changes: 5 additions & 28 deletions keepers/validator-keeper/src/operations/gossip_upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ It will emits metrics for each data feed, if env var SOLANA_METRICS_CONFIG is se
use crate::state::keeper_state::KeeperState;
use crate::{start_spy_server, PRIORITY_FEE};
use bytemuck::{bytes_of, Pod, Zeroable};
use keeper_core::{submit_transactions, SubmitStats, TransactionExecutionError};
use keeper_core::{submit_transactions, SubmitStats};
use log::*;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_client::rpc_response::RpcVoteAccountInfo;
use solana_gossip::crds::Crds;
use solana_gossip::crds_value::{CrdsData, CrdsValue, CrdsValueLabel};
use solana_metrics::{datapoint_error, datapoint_info};
use solana_metrics::datapoint_error;
use solana_sdk::signature::Signable;
use solana_sdk::{
epoch_info::EpochInfo,
Expand Down Expand Up @@ -52,17 +52,7 @@ async fn _process(
upload_gossip_values(client, keypair, program_id, entrypoint, keeper_state).await
}

fn _emit(stats: &SubmitStats, runs_for_epoch: i64, errors_for_epoch: i64) {
datapoint_info!(
"gossip-upload-stats",
("num_updates_success", stats.successes, i64),
("num_updates_error", stats.errors, i64),
("runs_for_epoch", runs_for_epoch, i64),
("errors_for_epoch", errors_for_epoch, i64),
);
}

pub async fn fire_and_emit(
pub async fn fire(
client: &Arc<RpcClient>,
keypair: &Arc<Keypair>,
program_id: &Pubkey,
Expand All @@ -75,9 +65,8 @@ pub async fn fire_and_emit(

let should_run = _should_run(&keeper_state.epoch_info, runs_for_epoch);

let mut stats = SubmitStats::default();
if should_run {
stats = match _process(client, keypair, program_id, entrypoint, keeper_state).await {
match _process(client, keypair, program_id, entrypoint, keeper_state).await {
Ok(stats) => {
for message in stats.results.iter().chain(stats.results.iter()) {
if let Err(e) = message {
Expand All @@ -87,26 +76,14 @@ pub async fn fire_and_emit(
if stats.errors == 0 {
runs_for_epoch += 1;
}
stats
}
Err(e) => {
let mut stats = SubmitStats::default();
if let Some(TransactionExecutionError::TransactionClientError(_, results)) =
e.downcast_ref::<TransactionExecutionError>()
{
stats.successes = results.iter().filter(|r| r.is_ok()).count() as u64;
stats.errors = results.iter().filter(|r| r.is_err()).count() as u64;
}

datapoint_error!("gossip-upload-error", ("error", e.to_string(), String),);
errors_for_epoch += 1;
stats
}
};
}
}

_emit(&stats, runs_for_epoch as i64, errors_for_epoch as i64);

(operation, runs_for_epoch, errors_for_epoch)
}

Expand Down
91 changes: 91 additions & 0 deletions keepers/validator-keeper/src/operations/keeper_operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub enum KeeperOperations {
MevCommission,
EmitMetrics,
}

impl KeeperOperations {
pub const LEN: usize = 10;

Expand All @@ -31,6 +32,96 @@ impl KeeperOperations {
"num-pre-create-update-errors",
errors_for_epoch[KeeperOperations::PreCreateUpdate as usize],
i64
),
(
"num-create-missing-accounts-runs",
runs_for_epoch[KeeperOperations::CreateMissingAccounts as usize],
i64
),
(
"num-create-missing-accounts-errors",
errors_for_epoch[KeeperOperations::CreateMissingAccounts as usize],
i64
),
(
"num-post-create-update-runs",
runs_for_epoch[KeeperOperations::PostCreateUpdate as usize],
i64
),
(
"num-post-create-update-errors",
errors_for_epoch[KeeperOperations::PostCreateUpdate as usize],
i64
),
(
"num-cluster-history-runs",
runs_for_epoch[KeeperOperations::ClusterHistory as usize],
i64
),
(
"num-cluster-history-errors",
errors_for_epoch[KeeperOperations::ClusterHistory as usize],
i64
),
(
"num-gossip-upload-runs",
runs_for_epoch[KeeperOperations::GossipUpload as usize],
i64
),
(
"num-gossip-upload-errors",
errors_for_epoch[KeeperOperations::GossipUpload as usize],
i64
),
(
"num-stake-upload-runs",
runs_for_epoch[KeeperOperations::StakeUpload as usize],
i64
),
(
"num-stake-upload-errors",
errors_for_epoch[KeeperOperations::StakeUpload as usize],
i64
),
(
"num-vote-account-runs",
runs_for_epoch[KeeperOperations::VoteAccount as usize],
i64
),
(
"num-vote-account-errors",
errors_for_epoch[KeeperOperations::VoteAccount as usize],
i64
),
(
"num-mev-earned-runs",
runs_for_epoch[KeeperOperations::MevEarned as usize],
i64
),
(
"num-mev-earned-errors",
errors_for_epoch[KeeperOperations::MevEarned as usize],
i64
),
(
"num-mev-commission-runs",
runs_for_epoch[KeeperOperations::MevCommission as usize],
i64
),
(
"num-mev-commission-errors",
errors_for_epoch[KeeperOperations::MevCommission as usize],
i64
),
(
"num-emit-metrics-runs",
runs_for_epoch[KeeperOperations::EmitMetrics as usize],
i64
),
(
"num-emit-metrics-errors",
errors_for_epoch[KeeperOperations::EmitMetrics as usize],
i64
)
);
}
Expand Down
Loading

0 comments on commit 1b9ac08

Please sign in to comment.