Skip to content

Commit

Permalink
Merge branch 'main' into importer-component
Browse files Browse the repository at this point in the history
  • Loading branch information
gabriel-aranha-cw committed Aug 5, 2024
2 parents 235fef8 + 7a6372c commit c530357
Show file tree
Hide file tree
Showing 14 changed files with 383 additions and 350 deletions.
37 changes: 25 additions & 12 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Application configuration.
use std::env;
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::atomic::AtomicUsize;
Expand All @@ -23,10 +24,12 @@ use crate::eth::storage::ExternalRpcStorageConfig;
use crate::eth::storage::StratusStorageConfig;
use crate::ext::parse_duration;
use crate::infra::build_info;
use crate::infra::metrics::MetricsConfig;
use crate::infra::sentry::SentryConfig;
use crate::infra::tracing::TracingConfig;

/// Loads .env files according to the binary and environment.
pub fn load_dotenv() {
pub fn load_dotenv_file() {
// parse env manually because this is executed before clap
let env = match std::env::var("ENV") {
Ok(env) => Environment::from_str(env.as_str()),
Expand All @@ -49,6 +52,22 @@ pub fn load_dotenv() {
}
}

/// Applies env-var aliases because Clap does not support this feature.
pub fn load_env_aliases() {
fn env_alias(canonical: &'static str, alias: &'static str) {
if let Ok(value) = env::var(alias) {
env::set_var(canonical, value);
}
}
env_alias("EXECUTOR_CHAIN_ID", "CHAIN_ID");
env_alias("EXECUTOR_EVMS", "EVMS");
env_alias("EXECUTOR_EVMS", "NUM_EVMS");
env_alias("EXECUTOR_REJECT_NOT_CONTRACT", "REJECT_NOT_CONTRACT");
env_alias("EXECUTOR_STRATEGY", "STRATEGY");
env_alias("TRACING_LOG_FORMAT", "LOG_FORMAT");
env_alias("TRACING_URL", "TRACING_COLLECTOR_URL");
}

// -----------------------------------------------------------------------------
// Config: Common
// -----------------------------------------------------------------------------
Expand Down Expand Up @@ -76,17 +95,11 @@ pub struct CommonConfig {
#[clap(flatten)]
pub tracing: TracingConfig,

/// Address where Prometheus metrics will be exposed.
#[arg(long = "metrics-exporter-address", env = "METRICS_EXPORTER_ADDRESS", default_value = "0.0.0.0:9000")]
pub metrics_exporter_address: SocketAddr,

// Address where Tokio Console GRPC server will be exposed.
#[arg(long = "tokio-console-address", env = "TRACING_TOKIO_CONSOLE_ADDRESS")]
pub tokio_console_address: Option<SocketAddr>,
#[clap(flatten)]
pub sentry: Option<SentryConfig>,

/// Sentry URL where error events will be pushed.
#[arg(long = "sentry-url", env = "SENTRY_URL")]
pub sentry_url: Option<String>,
#[clap(flatten)]
pub metrics: MetricsConfig,

/// Direct access to peers via IP address, why will be included on data propagation and leader election.
#[arg(long = "candidate-peers", env = "CANDIDATE_PEERS", value_delimiter = ',')]
Expand All @@ -109,7 +122,7 @@ impl WithCommonConfig for CommonConfig {

impl CommonConfig {
/// Initializes Tokio runtime.
pub fn init_runtime(&self) -> anyhow::Result<Runtime> {
pub fn init_tokio_runtime(&self) -> anyhow::Result<Runtime> {
println!(
"creating tokio runtime | async_threads={} blocking_threads={}",
self.num_async_threads, self.num_blocking_threads
Expand Down
12 changes: 6 additions & 6 deletions src/eth/rpc/rpc_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ use display_json::DebugAsJson;

#[derive(Parser, DebugAsJson, Clone, serde::Serialize)]
pub struct RpcServerConfig {
/// JSON-RPC binding address.
/// JSON-RPC server binding address.
#[arg(short = 'a', long = "address", env = "ADDRESS", default_value = "0.0.0.0:3000")]
pub address: SocketAddr,
pub rpc_address: SocketAddr,

/// JSON-RPC max active connections
/// JSON-RPC server max active connections
#[arg(long = "max-connections", env = "MAX_CONNECTIONS", default_value = "200")]
pub max_connections: u32,
pub rpc_max_connections: u32,

/// JSON-RPC max active subscriptions per client.
/// JSON-RPC server max active subscriptions per client.
#[arg(long = "max-subscriptions", env = "MAX_SUBSCRIPTIONS", default_value = "15")]
pub max_subscriptions: u32,
pub rpc_max_subscriptions: u32,
}
8 changes: 4 additions & 4 deletions src/eth/rpc/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ pub async fn serve_rpc(
chain_id: ChainId,
) -> anyhow::Result<()> {
const TASK_NAME: &str = "rpc-server";
tracing::info!(%rpc_config.address, %rpc_config.max_connections, "creating {}", TASK_NAME);
tracing::info!(%rpc_config.rpc_address, %rpc_config.rpc_max_connections, "creating {}", TASK_NAME);

// configure subscriptions
let subs = RpcSubscriptions::spawn(
Expand Down Expand Up @@ -121,8 +121,8 @@ pub async fn serve_rpc(
.set_rpc_middleware(rpc_middleware)
.set_http_middleware(http_middleware)
.set_id_provider(RandomStringIdProvider::new(8))
.max_connections(rpc_config.max_connections)
.build(rpc_config.address)
.max_connections(rpc_config.rpc_max_connections)
.build(rpc_config.rpc_address)
.await?;

let handle_rpc_server = server.start(module);
Expand Down Expand Up @@ -799,7 +799,7 @@ async fn eth_subscribe(params: Params<'_>, pending: PendingSubscriptionSink, ctx
};

// check subscription limits
if let Err(e) = ctx.subs.check_client_subscriptions(ctx.rpc_server.max_subscriptions, &client).await {
if let Err(e) = ctx.subs.check_client_subscriptions(ctx.rpc_server.rpc_max_subscriptions, &client).await {
drop(method_enter);
pending.reject(e).instrument(method_span).await;
return Ok(());
Expand Down
45 changes: 14 additions & 31 deletions src/globals.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::env;
use std::fmt::Debug;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
Expand All @@ -8,10 +7,9 @@ use sentry::ClientInitGuard;
use tokio::runtime::Runtime;
use tokio_util::sync::CancellationToken;

use crate::config::load_dotenv;
use crate::config;
use crate::config::WithCommonConfig;
use crate::ext::spawn_signal_handler;
use crate::infra;
use crate::infra::tracing::warn_task_cancellation;

// -----------------------------------------------------------------------------
Expand All @@ -36,57 +34,42 @@ where
where
T: clap::Parser + WithCommonConfig + Debug,
{
// .dotfile support
load_dotenv();

// apply env-var aliases
env_alias("EXECUTOR_CHAIN_ID", "CHAIN_ID");
env_alias("EXECUTOR_EVMS", "EVMS");
env_alias("EXECUTOR_EVMS", "NUM_EVMS");
env_alias("EXECUTOR_REJECT_NOT_CONTRACT", "REJECT_NOT_CONTRACT");
env_alias("EXECUTOR_STRATEGY", "STRATEGY");
env_alias("TRACING_LOG_FORMAT", "LOG_FORMAT");
env_alias("TRACING_URL", "TRACING_COLLECTOR_URL");
// env-var support
config::load_dotenv_file();
config::load_env_aliases();

// parse configuration
let config = T::parse();
let common = config.common();

// init tokio
let runtime = common.init_runtime().expect("failed to init tokio runtime");
let tokio = common.init_tokio_runtime().expect("failed to init tokio runtime");

// init tracing
runtime
.block_on(infra::init_tracing(&common.tracing, common.sentry_url.as_deref(), common.tokio_console_address))
.expect("failed to init tracing");
tokio.block_on(async {
common.tracing.init(&common.sentry).expect("failed to init tracing");
});

// init metrics
infra::init_metrics(common.metrics_exporter_address).expect("failed to init metrics");
// init observability services
common.metrics.init().expect("failed to init metrics");

// init sentry
let sentry_guard = common
.sentry_url
.sentry
.as_ref()
.map(|sentry_url| infra::init_sentry(sentry_url, common.env).expect("failed to init sentry"));
.map(|sentry_config| sentry_config.init(common.env).expect("failed to init sentry"));

// init signal handler
runtime.block_on(spawn_signal_handler()).expect("failed to init signal handlers");
tokio.block_on(spawn_signal_handler()).expect("failed to init signal handlers");

Self {
config,
runtime,
runtime: tokio,
_sentry_guard: sentry_guard,
}
}
}

/// Translates an aliased environment variable to a canonical one.
fn env_alias(canonical: &'static str, alias: &'static str) {
if let Ok(value) = env::var(alias) {
env::set_var(canonical, value);
}
}

// -----------------------------------------------------------------------------
// Global state
// -----------------------------------------------------------------------------
Expand Down
67 changes: 67 additions & 0 deletions src/infra/metrics/metrics_config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use std::net::SocketAddr;
use std::stringify;

use clap::Parser;
use display_json::DebugAsJson;

use crate::infra::metrics::metrics_for_consensus;
use crate::infra::metrics::metrics_for_evm;
use crate::infra::metrics::metrics_for_executor;
use crate::infra::metrics::metrics_for_importer_online;
use crate::infra::metrics::metrics_for_json_rpc;
use crate::infra::metrics::metrics_for_rocks;
use crate::infra::metrics::metrics_for_storage_read;
use crate::infra::metrics::metrics_for_storage_write;

#[derive(DebugAsJson, Clone, Parser, serde::Serialize)]
pub struct MetricsConfig {
/// Metrics exporter binding address.
#[arg(long = "metrics-exporter-address", env = "METRICS_EXPORTER_ADDRESS", default_value = "0.0.0.0:9000")]
pub metrics_exporter_address: SocketAddr,
}

impl MetricsConfig {
/// Inits application global metrics exporter.
pub fn init(&self) -> anyhow::Result<()> {
tracing::info!(address = %self.metrics_exporter_address, "creating metrics exporter");

// get metric definitions
let mut metrics = Vec::new();
metrics.extend(metrics_for_importer_online());
metrics.extend(metrics_for_json_rpc());
metrics.extend(metrics_for_executor());
metrics.extend(metrics_for_evm());
metrics.extend(metrics_for_storage_read());
metrics.extend(metrics_for_storage_write());
metrics.extend(metrics_for_rocks());
metrics.extend(metrics_for_consensus());

// init metric exporter
init_metrics_exporter(self.metrics_exporter_address);

// init metric description (always after provider started)
for metric in &metrics {
metric.register_description();
}

Ok(())
}
}

#[cfg(feature = "metrics")]
fn init_metrics_exporter(address: SocketAddr) {
tracing::info!(%address, "creating prometheus metrics exporter");
if let Err(e) = metrics_exporter_prometheus::PrometheusBuilder::new()
.add_global_label("service", crate::infra::build_info::service_name())
.add_global_label("version", crate::infra::build_info::version())
.with_http_listener(address)
.install()
{
tracing::error!(reason = ?e, %address, "failed to create metrics exporter");
}
}

#[cfg(not(feature = "metrics"))]
fn init_metrics_exporter(_: SocketAddr) {
tracing::info!("creating noop metrics exporter");
}
57 changes: 0 additions & 57 deletions src/infra/metrics/metrics_init.rs

This file was deleted.

4 changes: 2 additions & 2 deletions src/infra/metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
mod metrics_config;
mod metrics_definitions;
mod metrics_init;
mod metrics_macros;
mod metrics_types;

use std::time::Instant;

pub use metrics_config::MetricsConfig;
pub use metrics_definitions::*;
pub use metrics_init::init_metrics;
pub use metrics_types::*;

/// Track metrics execution starting instant.
Expand Down
3 changes: 0 additions & 3 deletions src/infra/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,3 @@ pub mod sentry;
pub mod tracing;

pub use blockchain_client::BlockchainClient;
pub use metrics::init_metrics;
pub use sentry::init_sentry;
pub use tracing::init_tracing;
24 changes: 0 additions & 24 deletions src/infra/sentry.rs

This file was deleted.

3 changes: 3 additions & 0 deletions src/infra/sentry/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
mod sentry_config;

pub use sentry_config::SentryConfig;
Loading

0 comments on commit c530357

Please sign in to comment.