Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
grooviegermanikus committed Mar 18, 2024
1 parent a672d39 commit 233a9e2
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 50 deletions.
42 changes: 31 additions & 11 deletions benchrunner-service/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,49 @@ pub fn get_funded_payer_from_env() -> Keypair {
}

pub fn read_tenant_configs(env_vars: Vec<(String, String)>) -> Vec<TenantConfig> {
let map = env_vars.iter()
let map = env_vars
.iter()
.filter(|(k, _)| k.starts_with("TENANT"))
.into_group_map_by(|(k, v)| {
let tenant_id = k.split('_').nth(0).unwrap().replace("TENANT", "");
tenant_id.to_string()
});

let values = map.iter().map(|(k, v)| {
TenantConfig {
tenant_id: v.iter().find(|(k, _)| k.ends_with("_ID")).expect("need ID").1.to_string(),
rpc_addr: v.iter().find(|(k, _)| k.ends_with("_RPC_ADDR")).expect("need RPC_ADDR").1.to_string(),
}
}).collect::<Vec<TenantConfig>>();
let values = map
.iter()
.map(|(k, v)| TenantConfig {
tenant_id: v
.iter()
.find(|(k, _)| k.ends_with("_ID"))
.expect("need ID")
.1
.to_string(),
rpc_addr: v
.iter()
.find(|(k, _)| k.ends_with("_RPC_ADDR"))
.expect("need RPC_ADDR")
.1
.to_string(),
})
.collect::<Vec<TenantConfig>>();

values
}

#[test]
fn test_env_vars() {
let env_vars = vec![(String::from("TENANT1_ID"), String::from("solana-rpc")),
(String::from("TENANT1_RPC_ADDR"), String::from("http://localhost:8899")),
(String::from("TENANT2_ID"), String::from("lite-rpc")),
(String::from("TENANT2_RPC_ADDR"), String::from("http://localhost:8890"))];
let env_vars = vec![
(String::from("TENANT1_ID"), String::from("solana-rpc")),
(
String::from("TENANT1_RPC_ADDR"),
String::from("http://localhost:8899"),
),
(String::from("TENANT2_ID"), String::from("lite-rpc")),
(
String::from("TENANT2_RPC_ADDR"),
String::from("http://localhost:8890"),
),
];
let tenant_configs = read_tenant_configs(env_vars);

assert_eq!(tenant_configs.len(), 2);
Expand Down
76 changes: 52 additions & 24 deletions benchrunner-service/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,32 @@ mod postgres;
mod postgres_session;
mod prometheus;

use std::net::SocketAddr;
use std::ops::AddAssign;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use crate::args::{get_funded_payer_from_env, read_tenant_configs};
use crate::cli::Args;
use crate::postgres::metrics_dbstore::{
save_metrics_to_postgres, upsert_benchrun_status, BenchRunStatus,
};
use crate::postgres_session::{PostgresSession, PostgresSessionConfig};
use crate::prometheus::metrics_prometheus::publish_metrics_on_prometheus;
use crate::prometheus::prometheus_sync::PrometheusSync;
use bench::create_memo_tx;
use bench::helpers::BenchHelper;
use bench::metrics::{Metric, TxMetricData};
use bench::service_adapter::BenchConfig;
use clap::Parser;
use futures_util::future::join_all;
use itertools::Itertools;
use log::{debug, error, info, trace, warn};
use solana_sdk::signature::Keypair;
use std::net::SocketAddr;
use std::ops::AddAssign;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use tokio::join;
use tokio::sync::mpsc::Sender;
use tokio_postgres::types::ToSql;
use tracing_subscriber::filter::FilterExt;
use bench::create_memo_tx;
use bench::helpers::BenchHelper;
use bench::metrics::{Metric, TxMetricData};
use bench::service_adapter::BenchConfig;
use crate::args::{get_funded_payer_from_env, read_tenant_configs};
use crate::cli::Args;
use crate::postgres::metrics_dbstore::{BenchRunStatus, save_metrics_to_postgres, upsert_benchrun_status};
use crate::postgres_session::{PostgresSession, PostgresSessionConfig};
use crate::prometheus::metrics_prometheus::publish_metrics_on_prometheus;
use crate::prometheus::prometheus_sync::PrometheusSync;



#[tokio::main]
async fn main() {
Expand All @@ -49,7 +49,10 @@ async fn main() {
let tenant_configs = read_tenant_configs(std::env::vars().collect::<Vec<(String, String)>>());

info!("Start running benchmarks every {:?}", bench_interval);
info!("Found tenants: {}", tenant_configs.iter().map(|tc| &tc.tenant_id).join(", "));
info!(
"Found tenants: {}",
tenant_configs.iter().map(|tc| &tc.tenant_id).join(", ")
);

if tenant_configs.is_empty() {
error!("No tenants found (missing env vars) - exit");
Expand All @@ -66,7 +69,10 @@ async fn main() {
let jh_runner = tokio::spawn(async move {
let mut interval = tokio::time::interval(bench_interval);
for run_count in 1.. {
debug!("Invoke bench execution (#{}) on tenant <{}>..", run_count, tenant_config.tenant_id);
debug!(
"Invoke bench execution (#{}) on tenant <{}>..",
run_count, tenant_config.tenant_id
);
let benchrun_at = SystemTime::now();

let bench_config = bench::service_adapter::BenchConfig {
Expand All @@ -76,22 +82,44 @@ async fn main() {
};

if let Ok(postgres_session) = postgres_session.as_ref() {
upsert_benchrun_status(postgres_session, &bench_config, benchrun_at, BenchRunStatus::STARTED).await;
upsert_benchrun_status(
postgres_session,
&bench_config,
benchrun_at,
BenchRunStatus::STARTED,
)
.await;
}

let metric = bench::service_adapter::bench_servicerunner(
&bench_config, tenant_config.rpc_addr.clone(), funded_payer.insecure_clone(), size_tx).await;
&bench_config,
tenant_config.rpc_addr.clone(),
funded_payer.insecure_clone(),
size_tx,
)
.await;

if let Ok(postgres_session) = postgres_session.as_ref() {
save_metrics_to_postgres(postgres_session, &bench_config, &metric, benchrun_at).await;
save_metrics_to_postgres(postgres_session, &bench_config, &metric, benchrun_at)
.await;
}

publish_metrics_on_prometheus(&bench_config, &metric).await;

if let Ok(postgres_session) = postgres_session.as_ref() {
upsert_benchrun_status(postgres_session, &bench_config, benchrun_at, BenchRunStatus::FINISHED).await;
upsert_benchrun_status(
postgres_session,
&bench_config,
benchrun_at,
BenchRunStatus::FINISHED,
)
.await;
}
debug!("Bench execution (#{}) done in {:?}", run_count, benchrun_at.elapsed().unwrap());
debug!(
"Bench execution (#{}) done in {:?}",
run_count,
benchrun_at.elapsed().unwrap()
);
interval.tick().await;
}
});
Expand Down
11 changes: 4 additions & 7 deletions benchrunner-service/src/postgres/metrics_dbstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,10 @@ pub async fn upsert_benchrun_status(
status: BenchRunStatus,
) {
let values: &[&(dyn ToSql + Sync)] =
&[
&bench_config.tenant,
&benchrun_at,
&status.to_db_string(),
];
let write_result = postgres_session.execute(
r#"
&[&bench_config.tenant, &benchrun_at, &status.to_db_string()];
let write_result = postgres_session
.execute(
r#"
INSERT INTO benchrunner.bench_runs (
tenant,
ts,
Expand Down
19 changes: 13 additions & 6 deletions benchrunner-service/src/prometheus/metrics_prometheus.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use bench::metrics::Metric;

use prometheus::{GaugeVec, IntGaugeVec, opts, register_gauge_vec, register_int_gauge_vec};
use bench::service_adapter::BenchConfig;

use prometheus::{opts, register_gauge_vec, register_int_gauge_vec, GaugeVec, IntGaugeVec};

// https://github.com/blockworks-foundation/lite-rpc/blob/production/bench/src/metrics.rs
lazy_static::lazy_static! {
Expand All @@ -17,8 +16,16 @@ lazy_static::lazy_static! {
pub async fn publish_metrics_on_prometheus(bench_config: &BenchConfig, metric: &Metric) {
let dimensions: &[&str] = &[&bench_config.tenant];

PROM_TXS_SENT.with_label_values(dimensions).set(metric.txs_sent as i64);
PROM_TXS_CONFIRMED.with_label_values(dimensions).set(metric.txs_confirmed as i64);
PROM_TXS_UN_CONFIRMED.with_label_values(dimensions).set(metric.txs_un_confirmed as i64);
PROM_AVG_CONFIRM.with_label_values(dimensions).set(metric.average_confirmation_time_ms);
PROM_TXS_SENT
.with_label_values(dimensions)
.set(metric.txs_sent as i64);
PROM_TXS_CONFIRMED
.with_label_values(dimensions)
.set(metric.txs_confirmed as i64);
PROM_TXS_UN_CONFIRMED
.with_label_values(dimensions)
.set(metric.txs_un_confirmed as i64);
PROM_AVG_CONFIRM
.with_label_values(dimensions)
.set(metric.average_confirmation_time_ms);
}
1 change: 0 additions & 1 deletion benchrunner-service/src/prometheus/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
pub mod metrics_prometheus;
pub mod prometheus_sync;

4 changes: 3 additions & 1 deletion benchrunner-service/src/prometheus/prometheus_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ impl PrometheusSync {
Ok(())
}

pub fn sync(addr: impl ToSocketAddrs + Send + 'static) -> tokio::task::JoinHandle<anyhow::Result<()>> {
pub fn sync(
addr: impl ToSocketAddrs + Send + 'static,
) -> tokio::task::JoinHandle<anyhow::Result<()>> {
tokio::spawn(async move {
let listener = TcpListener::bind(addr).await?;

Expand Down

0 comments on commit 233a9e2

Please sign in to comment.