From 233a9e2caf906bea81a139d80a40e1bb6c76120e Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Mon, 18 Mar 2024 19:57:04 +0100 Subject: [PATCH] fmt --- benchrunner-service/src/args.rs | 42 +++++++--- benchrunner-service/src/main.rs | 76 +++++++++++++------ .../src/postgres/metrics_dbstore.rs | 11 +-- .../src/prometheus/metrics_prometheus.rs | 19 +++-- benchrunner-service/src/prometheus/mod.rs | 1 - .../src/prometheus/prometheus_sync.rs | 4 +- 6 files changed, 103 insertions(+), 50 deletions(-) diff --git a/benchrunner-service/src/args.rs b/benchrunner-service/src/args.rs index 075dad7a..65f5972d 100644 --- a/benchrunner-service/src/args.rs +++ b/benchrunner-service/src/args.rs @@ -17,29 +17,49 @@ pub fn get_funded_payer_from_env() -> Keypair { } pub fn read_tenant_configs(env_vars: Vec<(String, String)>) -> Vec { - let map = env_vars.iter() + let map = env_vars + .iter() .filter(|(k, _)| k.starts_with("TENANT")) .into_group_map_by(|(k, v)| { let tenant_id = k.split('_').nth(0).unwrap().replace("TENANT", ""); tenant_id.to_string() }); - let values = map.iter().map(|(k, v)| { - TenantConfig { - tenant_id: v.iter().find(|(k, _)| k.ends_with("_ID")).expect("need ID").1.to_string(), - rpc_addr: v.iter().find(|(k, _)| k.ends_with("_RPC_ADDR")).expect("need RPC_ADDR").1.to_string(), - } - }).collect::>(); + let values = map + .iter() + .map(|(k, v)| TenantConfig { + tenant_id: v + .iter() + .find(|(k, _)| k.ends_with("_ID")) + .expect("need ID") + .1 + .to_string(), + rpc_addr: v + .iter() + .find(|(k, _)| k.ends_with("_RPC_ADDR")) + .expect("need RPC_ADDR") + .1 + .to_string(), + }) + .collect::>(); values } #[test] fn test_env_vars() { - let env_vars = vec![(String::from("TENANT1_ID"), String::from("solana-rpc")), - (String::from("TENANT1_RPC_ADDR"), String::from("http://localhost:8899")), - (String::from("TENANT2_ID"), String::from("lite-rpc")), - (String::from("TENANT2_RPC_ADDR"), String::from("http://localhost:8890"))]; + let env_vars = vec![ + (String::from("TENANT1_ID"), String::from("solana-rpc")), + ( + String::from("TENANT1_RPC_ADDR"), + String::from("http://localhost:8899"), + ), + (String::from("TENANT2_ID"), String::from("lite-rpc")), + ( + String::from("TENANT2_RPC_ADDR"), + String::from("http://localhost:8890"), + ), + ]; let tenant_configs = read_tenant_configs(env_vars); assert_eq!(tenant_configs.len(), 2); diff --git a/benchrunner-service/src/main.rs b/benchrunner-service/src/main.rs index d9eeb867..6c032f87 100644 --- a/benchrunner-service/src/main.rs +++ b/benchrunner-service/src/main.rs @@ -4,32 +4,32 @@ mod postgres; mod postgres_session; mod prometheus; -use std::net::SocketAddr; -use std::ops::AddAssign; -use std::str::FromStr; -use std::sync::Arc; -use std::time::{Duration, Instant, SystemTime}; +use crate::args::{get_funded_payer_from_env, read_tenant_configs}; +use crate::cli::Args; +use crate::postgres::metrics_dbstore::{ + save_metrics_to_postgres, upsert_benchrun_status, BenchRunStatus, +}; +use crate::postgres_session::{PostgresSession, PostgresSessionConfig}; +use crate::prometheus::metrics_prometheus::publish_metrics_on_prometheus; +use crate::prometheus::prometheus_sync::PrometheusSync; +use bench::create_memo_tx; +use bench::helpers::BenchHelper; +use bench::metrics::{Metric, TxMetricData}; +use bench::service_adapter::BenchConfig; use clap::Parser; use futures_util::future::join_all; use itertools::Itertools; use log::{debug, error, info, trace, warn}; use solana_sdk::signature::Keypair; +use std::net::SocketAddr; +use std::ops::AddAssign; +use std::str::FromStr; +use std::sync::Arc; +use std::time::{Duration, Instant, SystemTime}; use tokio::join; use tokio::sync::mpsc::Sender; use tokio_postgres::types::ToSql; use tracing_subscriber::filter::FilterExt; -use bench::create_memo_tx; -use bench::helpers::BenchHelper; -use bench::metrics::{Metric, TxMetricData}; -use bench::service_adapter::BenchConfig; -use crate::args::{get_funded_payer_from_env, read_tenant_configs}; -use crate::cli::Args; -use crate::postgres::metrics_dbstore::{BenchRunStatus, save_metrics_to_postgres, upsert_benchrun_status}; -use crate::postgres_session::{PostgresSession, PostgresSessionConfig}; -use crate::prometheus::metrics_prometheus::publish_metrics_on_prometheus; -use crate::prometheus::prometheus_sync::PrometheusSync; - - #[tokio::main] async fn main() { @@ -49,7 +49,10 @@ async fn main() { let tenant_configs = read_tenant_configs(std::env::vars().collect::>()); info!("Start running benchmarks every {:?}", bench_interval); - info!("Found tenants: {}", tenant_configs.iter().map(|tc| &tc.tenant_id).join(", ")); + info!( + "Found tenants: {}", + tenant_configs.iter().map(|tc| &tc.tenant_id).join(", ") + ); if tenant_configs.is_empty() { error!("No tenants found (missing env vars) - exit"); @@ -66,7 +69,10 @@ async fn main() { let jh_runner = tokio::spawn(async move { let mut interval = tokio::time::interval(bench_interval); for run_count in 1.. { - debug!("Invoke bench execution (#{}) on tenant <{}>..", run_count, tenant_config.tenant_id); + debug!( + "Invoke bench execution (#{}) on tenant <{}>..", + run_count, tenant_config.tenant_id + ); let benchrun_at = SystemTime::now(); let bench_config = bench::service_adapter::BenchConfig { @@ -76,22 +82,44 @@ async fn main() { }; if let Ok(postgres_session) = postgres_session.as_ref() { - upsert_benchrun_status(postgres_session, &bench_config, benchrun_at, BenchRunStatus::STARTED).await; + upsert_benchrun_status( + postgres_session, + &bench_config, + benchrun_at, + BenchRunStatus::STARTED, + ) + .await; } let metric = bench::service_adapter::bench_servicerunner( - &bench_config, tenant_config.rpc_addr.clone(), funded_payer.insecure_clone(), size_tx).await; + &bench_config, + tenant_config.rpc_addr.clone(), + funded_payer.insecure_clone(), + size_tx, + ) + .await; if let Ok(postgres_session) = postgres_session.as_ref() { - save_metrics_to_postgres(postgres_session, &bench_config, &metric, benchrun_at).await; + save_metrics_to_postgres(postgres_session, &bench_config, &metric, benchrun_at) + .await; } publish_metrics_on_prometheus(&bench_config, &metric).await; if let Ok(postgres_session) = postgres_session.as_ref() { - upsert_benchrun_status(postgres_session, &bench_config, benchrun_at, BenchRunStatus::FINISHED).await; + upsert_benchrun_status( + postgres_session, + &bench_config, + benchrun_at, + BenchRunStatus::FINISHED, + ) + .await; } - debug!("Bench execution (#{}) done in {:?}", run_count, benchrun_at.elapsed().unwrap()); + debug!( + "Bench execution (#{}) done in {:?}", + run_count, + benchrun_at.elapsed().unwrap() + ); interval.tick().await; } }); diff --git a/benchrunner-service/src/postgres/metrics_dbstore.rs b/benchrunner-service/src/postgres/metrics_dbstore.rs index 5ea10281..05a566d3 100644 --- a/benchrunner-service/src/postgres/metrics_dbstore.rs +++ b/benchrunner-service/src/postgres/metrics_dbstore.rs @@ -26,13 +26,10 @@ pub async fn upsert_benchrun_status( status: BenchRunStatus, ) { let values: &[&(dyn ToSql + Sync)] = - &[ - &bench_config.tenant, - &benchrun_at, - &status.to_db_string(), - ]; - let write_result = postgres_session.execute( - r#" + &[&bench_config.tenant, &benchrun_at, &status.to_db_string()]; + let write_result = postgres_session + .execute( + r#" INSERT INTO benchrunner.bench_runs ( tenant, ts, diff --git a/benchrunner-service/src/prometheus/metrics_prometheus.rs b/benchrunner-service/src/prometheus/metrics_prometheus.rs index 35528262..fc1bc8d5 100644 --- a/benchrunner-service/src/prometheus/metrics_prometheus.rs +++ b/benchrunner-service/src/prometheus/metrics_prometheus.rs @@ -1,8 +1,7 @@ use bench::metrics::Metric; -use prometheus::{GaugeVec, IntGaugeVec, opts, register_gauge_vec, register_int_gauge_vec}; use bench::service_adapter::BenchConfig; - +use prometheus::{opts, register_gauge_vec, register_int_gauge_vec, GaugeVec, IntGaugeVec}; // https://github.com/blockworks-foundation/lite-rpc/blob/production/bench/src/metrics.rs lazy_static::lazy_static! { @@ -17,8 +16,16 @@ lazy_static::lazy_static! { pub async fn publish_metrics_on_prometheus(bench_config: &BenchConfig, metric: &Metric) { let dimensions: &[&str] = &[&bench_config.tenant]; - PROM_TXS_SENT.with_label_values(dimensions).set(metric.txs_sent as i64); - PROM_TXS_CONFIRMED.with_label_values(dimensions).set(metric.txs_confirmed as i64); - PROM_TXS_UN_CONFIRMED.with_label_values(dimensions).set(metric.txs_un_confirmed as i64); - PROM_AVG_CONFIRM.with_label_values(dimensions).set(metric.average_confirmation_time_ms); + PROM_TXS_SENT + .with_label_values(dimensions) + .set(metric.txs_sent as i64); + PROM_TXS_CONFIRMED + .with_label_values(dimensions) + .set(metric.txs_confirmed as i64); + PROM_TXS_UN_CONFIRMED + .with_label_values(dimensions) + .set(metric.txs_un_confirmed as i64); + PROM_AVG_CONFIRM + .with_label_values(dimensions) + .set(metric.average_confirmation_time_ms); } diff --git a/benchrunner-service/src/prometheus/mod.rs b/benchrunner-service/src/prometheus/mod.rs index 3f999bac..906f95af 100644 --- a/benchrunner-service/src/prometheus/mod.rs +++ b/benchrunner-service/src/prometheus/mod.rs @@ -1,3 +1,2 @@ pub mod metrics_prometheus; pub mod prometheus_sync; - diff --git a/benchrunner-service/src/prometheus/prometheus_sync.rs b/benchrunner-service/src/prometheus/prometheus_sync.rs index a79348d7..ac0b47f1 100644 --- a/benchrunner-service/src/prometheus/prometheus_sync.rs +++ b/benchrunner-service/src/prometheus/prometheus_sync.rs @@ -38,7 +38,9 @@ impl PrometheusSync { Ok(()) } - pub fn sync(addr: impl ToSocketAddrs + Send + 'static) -> tokio::task::JoinHandle> { + pub fn sync( + addr: impl ToSocketAddrs + Send + 'static, + ) -> tokio::task::JoinHandle> { tokio::spawn(async move { let listener = TcpListener::bind(addr).await?;