From 67ec903eed2f5a0b9f078faea79feb681f1b1929 Mon Sep 17 00:00:00 2001 From: Renato Dinhani <101204870+dinhani-cw@users.noreply.github.com> Date: Wed, 29 May 2024 21:12:54 -0300 Subject: [PATCH] feat: opentelemetry config (#952) --- config/stratus.env.local | 2 ++ docker-compose.yaml | 4 +-- justfile | 7 +++-- src/config.rs | 4 +-- src/eth/block_miner.rs | 10 ++----- src/eth/rpc/rpc_server.rs | 55 +++++++++++++++++++++++++++++---------- src/globals.rs | 5 +--- src/infra/tracing.rs | 21 +++++++++------ 8 files changed, 66 insertions(+), 42 deletions(-) diff --git a/config/stratus.env.local b/config/stratus.env.local index 476b92e15..b567252a6 100644 --- a/config/stratus.env.local +++ b/config/stratus.env.local @@ -1,3 +1,5 @@ +RUST_LOG=info,stratus::eth::rpc::rpc_subscriptions::rx=off,stratus::eth::consensus::consensus_kube::rx=off + CHAIN_ID=2008 EVMS=1 diff --git a/docker-compose.yaml b/docker-compose.yaml index 7b1bdab34..206c6f4c7 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -36,7 +36,7 @@ services: - "--log.level=debug" jaeger: - image: jaegertracing/all-in-one:1.56 + image: jaegertracing/all-in-one:1.57 environment: SPAN_STORAGE_TYPE: badger BADGER_EPHEMERAL: false @@ -45,8 +45,8 @@ services: volumes: - /tmp:/badger ports: - - 16686:16686 - 4317:4317 + - 16686:16686 volumes: postgres-data: diff --git a/justfile b/justfile index ab4bf4da6..16c07299a 100644 --- a/justfile +++ b/justfile @@ -3,7 +3,6 @@ import '.justfile_helpers' # _lint, _outdated # Environment variables automatically passed to executed commands. export CARGO_PROFILE_RELEASE_DEBUG := env("CARGO_PROFILE_RELEASE_DEBUG", "1") export RUST_BACKTRACE := "0" -export RUST_LOG := env("RUST_LOG", "stratus=debug,rpc_downloader=info,importer_offline=info,importer_online=info,state_validator=info,relayer=debug") # Global arguments that can be passed to receipts. feature_flags := "dev," + env("FEATURES", "") @@ -52,7 +51,7 @@ check: # Stratus: Check all features individually using cargo hack check-features *args="": - command -v cargo-hack >/dev/null 2>&1 || { cargo install cargo-hack; } + command -v cargo-hack >/dev/null 2>&1 || { cargo install cargo-hack; } cargo hack check --each-feature --keep-going {{args}} # Stratus: Clean build artifacts @@ -376,7 +375,7 @@ e2e-relayer-external-up: cargo run --release --bin stratus --no-default-features --features "dev,rocks,kubernetes" -- --enable-test-accounts --block-mode 1s --perm-storage=rocks --relayer-db-url "postgres://postgres:123@localhost:5432/stratus" --relayer-db-connections 5 --relayer-db-timeout 1s -a 0.0.0.0:3000 > e2e_logs/stratus.log & # Wait for Stratus to start - wait-service --tcp 0.0.0.0:3000 -t {{ wait_service_timeout }} -- echo + wait-service --tcp 0.0.0.0:3000 -t {{ wait_service_timeout }} -- echo # Install npm and start hardhat node in the e2e directory if [ -d e2e ]; then @@ -397,7 +396,7 @@ e2e-relayer-external-up: if [ -d e2e ]; then ( cd e2e - npx hardhat test test/relayer/*.test.ts --network stratus > ../e2e_logs/test.log + npx hardhat test test/relayer/*.test.ts --network stratus > ../e2e_logs/test.log ) fi diff --git a/src/config.rs b/src/config.rs index 052059b5f..4860aabff 100644 --- a/src/config.rs +++ b/src/config.rs @@ -96,8 +96,8 @@ pub struct CommonConfig { pub tracing_url: Option, /// Enables tokio-console. - #[arg(long = "disable-tokio-console", env = "DISABLE_TOKIO_CONSOLE", default_value = "false")] - pub disable_tokio_console: bool, + #[arg(long = "enable-tokio-console", env = "ENABLE_TOKIO_CONSOLE", default_value = "true")] + pub enable_tokio_console: bool, } impl WithCommonConfig for CommonConfig { diff --git a/src/eth/block_miner.rs b/src/eth/block_miner.rs index 0a9d8b8e8..108477102 100644 --- a/src/eth/block_miner.rs +++ b/src/eth/block_miner.rs @@ -233,15 +233,9 @@ impl BlockMiner { // notify for log in block_logs { - let tx_hash = log.block_hash; - let log_index = log.log_index; - if self.notifier_logs.send(log).is_err() { - tracing::error!(number = %block_number, hash = %tx_hash, index = %log_index, "failed to send transaction log notification"); - }; + let _ = self.notifier_logs.send(log); } - if self.notifier_blocks.send(block_header).is_err() { - tracing::error!(number = %block_number, "failed to send new block notification"); - }; + let _ = self.notifier_blocks.send(block_header); Ok(()) } diff --git a/src/eth/rpc/rpc_server.rs b/src/eth/rpc/rpc_server.rs index 1cf3c710c..dd0786bbd 100644 --- a/src/eth/rpc/rpc_server.rs +++ b/src/eth/rpc/rpc_server.rs @@ -219,7 +219,10 @@ async fn debug_read_all_slots(params: Params<'_>, ctx: Arc) -> anyho Ok(serde_json::to_value(ctx.storage.read_all_slots(&address).await?).expect_infallible()) } +// ----------------------------------------------------------------------------- // Status +// ----------------------------------------------------------------------------- + async fn net_listening(params: Params<'_>, arc: Arc) -> anyhow::Result { stratus_readiness(params, arc).await } @@ -236,34 +239,45 @@ async fn stratus_liveness(_: Params<'_>, _: Arc) -> anyhow::Result, ctx: Arc) -> String { ctx.chain_id.to_string() } +#[tracing::instrument(name = "rpc::eth_chainId", parent = None, skip_all)] async fn eth_chain_id(_: Params<'_>, ctx: Arc) -> String { hex_num(ctx.chain_id) } +#[tracing::instrument(name = "rpc::web3_clientVersion", parent = None, skip_all)] async fn web3_client_version(_: Params<'_>, ctx: Arc) -> String { ctx.client_version.to_owned() } +// ----------------------------------------------------------------------------- // Gas +// ----------------------------------------------------------------------------- +#[tracing::instrument(name = "rpc::eth_gasPrice", parent = None, skip_all)] async fn eth_gas_price(_: Params<'_>, _: Arc) -> String { hex_zero() } +// ----------------------------------------------------------------------------- // Block -#[tracing::instrument(skip_all)] +// ----------------------------------------------------------------------------- + +#[tracing::instrument(name = "rpc::eth_blockNumber", parent = None, skip_all)] async fn eth_block_number(_params: Params<'_>, ctx: Arc) -> anyhow::Result { let number = ctx.storage.read_mined_block_number().await?; Ok(serde_json::to_value(number).expect_infallible()) } -#[tracing::instrument(skip_all)] +#[tracing::instrument(name = "rpc::eth_getBlock", parent = None, skip_all)] async fn eth_get_block_by_selector(params: Params<'_>, ctx: Arc) -> anyhow::Result { let (params, block_selection) = next_rpc_param::(params.sequence())?; let (_, full_transactions) = next_rpc_param::(params)?; @@ -277,13 +291,16 @@ async fn eth_get_block_by_selector(params: Params<'_>, ctx: Arc) -> } } +#[tracing::instrument(name = "rpc::eth_getUncleByBlockHashAndIndex", parent = None, skip_all)] async fn eth_get_uncle_by_block_hash_and_index(_params: Params<'_>, _ctx: Arc) -> anyhow::Result { Ok(JsonValue::Null) } +// ----------------------------------------------------------------------------- // Transaction +// ----------------------------------------------------------------------------- -#[tracing::instrument(skip_all)] +#[tracing::instrument(name = "rpc::eth_getTransactionCount", parent = None, skip_all)] async fn eth_get_transaction_count(params: Params<'_>, ctx: Arc) -> anyhow::Result { let (params, address) = next_rpc_param::
(params.sequence())?; let (_, block_selection) = next_rpc_param_or_default::(params)?; @@ -293,7 +310,7 @@ async fn eth_get_transaction_count(params: Params<'_>, ctx: Arc) -> Ok(hex_num(account.nonce)) } -#[tracing::instrument(skip_all)] +#[tracing::instrument(name = "rpc::eth_getTransactionByHash", parent = None, skip_all)] async fn eth_get_transaction_by_hash(params: Params<'_>, ctx: Arc) -> anyhow::Result { let (_, hash) = next_rpc_param::(params.sequence())?; let mined = ctx.storage.read_mined_transaction(&hash).await?; @@ -304,7 +321,7 @@ async fn eth_get_transaction_by_hash(params: Params<'_>, ctx: Arc) - } } -#[tracing::instrument(skip_all)] +#[tracing::instrument(name = "rpc::eth_getTransactionReceipt", parent = None, skip_all)] async fn eth_get_transaction_receipt(params: Params<'_>, ctx: Arc) -> anyhow::Result { let (_, hash) = next_rpc_param::(params.sequence())?; match ctx.storage.read_mined_transaction(&hash).await? { @@ -313,7 +330,7 @@ async fn eth_get_transaction_receipt(params: Params<'_>, ctx: Arc) - } } -#[tracing::instrument(skip_all)] +#[tracing::instrument(name = "rpc::eth_estimateGas", parent = None, skip_all)] async fn eth_estimate_gas(params: Params<'_>, ctx: Arc) -> anyhow::Result { let (_, call) = next_rpc_param::(params.sequence())?; @@ -332,7 +349,7 @@ async fn eth_estimate_gas(params: Params<'_>, ctx: Arc) -> anyhow::R } } -#[tracing::instrument(skip_all)] +#[tracing::instrument(name = "rpc::eth_Call", parent = None, skip_all)] async fn eth_call(params: Params<'_>, ctx: Arc) -> anyhow::Result { let (params, call) = next_rpc_param::(params.sequence())?; let (_, block_selection) = next_rpc_param_or_default::(params)?; @@ -350,7 +367,7 @@ async fn eth_call(params: Params<'_>, ctx: Arc) -> anyhow::Result, ctx: Arc) -> anyhow::Result { let (_, data) = next_rpc_param::(params.sequence())?; let transaction = parse_rpc_rlp::(&data)?; @@ -371,8 +388,11 @@ async fn eth_send_raw_transaction(params: Params<'_>, ctx: Arc) -> a } } +// ----------------------------------------------------------------------------- // Logs -#[tracing::instrument(skip_all)] +// ----------------------------------------------------------------------------- + +#[tracing::instrument(name = "rpc::eth_getLogs", parent = None, skip_all)] async fn eth_get_logs(params: Params<'_>, ctx: Arc) -> anyhow::Result { let (_, filter_input) = next_rpc_param::(params.sequence())?; let filter = filter_input.parse(&ctx.storage).await?; @@ -381,9 +401,11 @@ async fn eth_get_logs(params: Params<'_>, ctx: Arc) -> anyhow::Resul Ok(JsonValue::Array(logs.into_iter().map(|x| x.to_json_rpc_log()).collect())) } +// ----------------------------------------------------------------------------- // Account +// ----------------------------------------------------------------------------- -#[tracing::instrument(skip_all)] +#[tracing::instrument(name = "rpc::eth_getBalance", parent = None, skip_all)] async fn eth_get_balance(params: Params<'_>, ctx: Arc) -> anyhow::Result { let (params, address) = next_rpc_param::
(params.sequence())?; let (_, block_selection) = next_rpc_param_or_default::(params)?; @@ -394,7 +416,7 @@ async fn eth_get_balance(params: Params<'_>, ctx: Arc) -> anyhow::Re Ok(hex_num(account.balance)) } -#[tracing::instrument(skip_all)] +#[tracing::instrument(name = "rpc::eth_getCode", parent = None, skip_all)] async fn eth_get_code(params: Params<'_>, ctx: Arc) -> anyhow::Result { let (params, address) = next_rpc_param::
(params.sequence())?; let (_, block_selection) = next_rpc_param_or_default::(params)?; @@ -405,9 +427,11 @@ async fn eth_get_code(params: Params<'_>, ctx: Arc) -> anyhow::Resul Ok(account.bytecode.map(hex_data).unwrap_or_else(hex_null)) } -// Subscription +// ----------------------------------------------------------------------------- +// Subscriptions +// ----------------------------------------------------------------------------- -#[tracing::instrument(skip_all)] +#[tracing::instrument(name = "rpc::eth_subscribe", parent = None, skip_all)] async fn eth_subscribe(params: Params<'_>, pending: PendingSubscriptionSink, ctx: Arc) -> impl IntoSubscriptionCloseResponse { let (params, kind) = next_rpc_param::(params.sequence())?; match kind.deref() { @@ -436,8 +460,11 @@ async fn eth_subscribe(params: Params<'_>, pending: PendingSubscriptionSink, ctx Ok(()) } +// ----------------------------------------------------------------------------- // Storage -#[tracing::instrument(skip_all)] +// ----------------------------------------------------------------------------- + +#[tracing::instrument(name = "rpc::eth_getStorageAt", parent = None, skip_all)] async fn eth_get_storage_at(params: Params<'_>, ctx: Arc) -> anyhow::Result { let (params, address) = next_rpc_param::
(params.sequence())?; let (params, index) = next_rpc_param::(params)?; diff --git a/src/globals.rs b/src/globals.rs index f6dc5b185..05e29f528 100644 --- a/src/globals.rs +++ b/src/globals.rs @@ -56,10 +56,7 @@ where runtime.block_on(spawn_signal_handler())?; // init tracing - runtime.block_on(infra::init_tracing( - config.common().tracing_url.as_ref(), - !config.common().disable_tokio_console, - )); + runtime.block_on(infra::init_tracing(config.common().tracing_url.as_ref(), config.common().enable_tokio_console)); Ok(Self { config, diff --git a/src/infra/tracing.rs b/src/infra/tracing.rs index 218503d1c..7e1b16a96 100644 --- a/src/infra/tracing.rs +++ b/src/infra/tracing.rs @@ -46,7 +46,7 @@ pub async fn init_tracing(url: Option<&String>, enable_console: bool) { let opentelemetry_layer = match url { Some(url) => { println!("tracing registry enabling opentelemetry exporter | url={}", url); - let tracer_config = trace::config().with_resource(Resource::new(vec![KeyValue::new("service", "stratus")])); + let tracer_config = trace::config().with_resource(Resource::new(vec![KeyValue::new("service.name", "stratus")])); let tracer_exporter = opentelemetry_otlp::new_exporter().tonic().with_endpoint(url); let tracer = opentelemetry_otlp::new_pipeline() @@ -65,14 +65,11 @@ pub async fn init_tracing(url: Option<&String>, enable_console: bool) { } }; - // init registry - let registry = tracing_subscriber::registry().with(stdout_layer).with(opentelemetry_layer); - - if enable_console { + // init tokio console registry + let tokio_console_layer = if enable_console { // configure tokio console layer println!("tracing registry enabling tokio console"); let (console_layer, console_server) = ConsoleLayer::builder().with_default_env().build(); - registry.with(console_layer).init(); // init tokio console server spawn_named("console::grpc-server", async move { @@ -80,9 +77,17 @@ pub async fn init_tracing(url: Option<&String>, enable_console: bool) { tracing::error!(reason = ?e, "failed to create tokio-console server"); }; }); + Some(console_layer) } else { - registry.init(); - } + None + }; + + // init registry + tracing_subscriber::registry() + .with(stdout_layer) + .with(opentelemetry_layer) + .with(tokio_console_layer) + .init(); } /// Emits an info message that a task was spawned to backgroud.