Skip to content

Commit

Permalink
feat: improving tracing configuration and adding tokio-console (#881)
Browse files Browse the repository at this point in the history
  • Loading branch information
dinhani-cw authored May 21, 2024
1 parent a138da1 commit 842ef05
Show file tree
Hide file tree
Showing 13 changed files with 236 additions and 77 deletions.
2 changes: 2 additions & 0 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[build]
rustflags = ["--cfg", "tokio_unstable"]
119 changes: 113 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,16 @@ reqwest = { version = "=0.12.4", features = ["json"] }
tower = "=0.4.13"

# observability
console-subscriber = "=0.2.0"
metrics = { version = "=0.21.1", optional = true }
metrics-exporter-prometheus = { version = "=0.12.2", optional = true }
tracing = { version = "=0.1.40", features = ["attributes"] }
tracing-subscriber = { version = "=0.3.18", features = ["env-filter", "json"] }
tracing-opentelemetry = "=0.23.0"
sentry = "=0.32.3"
opentelemetry = "=0.22.0"
opentelemetry-otlp = "=0.15.0"
opentelemetry_sdk = { version = "=0.22.1", features = ["rt-tokio"] }
opentelemetry-otlp = "=0.15.0"
sentry = "=0.32.3"
tracing = { version = "=0.1.40", features = ["attributes"] }
tracing-opentelemetry = "=0.23.0"
tracing-subscriber = { version = "=0.3.18", features = ["env-filter", "json"] }


# storage
Expand Down
1 change: 1 addition & 0 deletions docker/Dockerfile.run_stratus
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ RUN apt-get install -y libclang-dev cmake

ENV CARGO_PROFILE_RELEASE_DEBUG=1
ENV JSON_LOGS=1
ENV NO_COLOR=1
RUN cargo build --release --features metrics

# Runtime
Expand Down
1 change: 1 addition & 0 deletions docker/Dockerfile.run_with_importer
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ RUN apt-get install -y libclang-dev cmake

ENV CARGO_PROFILE_RELEASE_DEBUG=1
ENV JSON_LOGS=1
ENV NO_COLOR=1
RUN cargo build --release --bin run-with-importer --features metrics,rocks

# Runtime
Expand Down
1 change: 1 addition & 0 deletions docker/Dockerfile.run_with_importer_postgres
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ RUN apt-get install -y libclang-dev cmake

ENV CARGO_PROFILE_RELEASE_DEBUG=1
ENV JSON_LOGS=1
ENV NO_COLOR=1
RUN cargo build --release --bin run-with-importer --features metrics

# Runtime
Expand Down
10 changes: 6 additions & 4 deletions src/bin/importer_online.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use stratus::eth::primitives::Hash;
use stratus::eth::storage::StratusStorage;
use stratus::eth::BlockMiner;
use stratus::eth::Executor;
use stratus::ext::warn_task_cancellation;
use stratus::ext::warn_task_tx_closed;
#[cfg(feature = "metrics")]
use stratus::infra::metrics;
use stratus::infra::BlockchainClient;
Expand Down Expand Up @@ -125,7 +127,7 @@ async fn start_block_executor(
) {
while let Some((block, receipts)) = backlog_rx.recv().await {
if cancellation.is_cancelled() {
tracing::warn!("exiting block-executor because cancellation");
warn_task_cancellation("block-executor");
break;
}

Expand All @@ -151,7 +153,7 @@ async fn start_block_executor(
metrics::inc_import_online_mined_block(start.elapsed());
}
}
tracing::warn!("exiting block-executor because backlog channel was closed by the other side");
warn_task_tx_closed("block-executor");
}

// -----------------------------------------------------------------------------
Expand Down Expand Up @@ -182,7 +184,7 @@ async fn start_number_fetcher(chain: Arc<BlockchainClient>, cancellation: Cancel
loop {
// check cancellation
if cancellation.is_cancelled() {
tracing::warn!("exiting number-fetcher because cancellation");
warn_task_cancellation("number-fetcher");
break;
}
tracing::info!("fetching current block number");
Expand Down Expand Up @@ -255,7 +257,7 @@ async fn start_block_fetcher(
) {
loop {
if cancellation.is_cancelled() {
tracing::warn!("exiting block-fetcher because cancellation");
warn_task_cancellation("block-fetcher");
break;
}

Expand Down
3 changes: 2 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use crate::eth::Consensus;
use crate::eth::EvmTask;
use crate::eth::Executor;
use crate::eth::TransactionRelayer;
use crate::ext::warn_task_tx_closed;
use crate::infra::BlockchainClient;

/// Loads .env files according to the binary and environment.
Expand Down Expand Up @@ -205,7 +206,7 @@ impl ExecutorConfig {
tracing::error!(reason = ?e, "failed to send evm execution result");
};
}
tracing::warn!("stopping evm thread because tx channel was closed");
warn_task_tx_closed("evm thread");
})
.expect("spawning evm threads should not fail");
}
Expand Down
25 changes: 14 additions & 11 deletions src/eth/rpc/rpc_subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ use crate::eth::primitives::Hash;
use crate::eth::primitives::LogFilter;
use crate::eth::primitives::LogMined;
use crate::ext::not;
use crate::ext::spawn_named;
use crate::ext::warn_task_cancellation;
use crate::ext::warn_task_tx_closed;
use crate::if_else;
use crate::infra::metrics;

Expand Down Expand Up @@ -58,10 +61,10 @@ impl RpcSubscriptions {
fn spawn_subscriptions_cleaner(subs: Arc<RpcSubscriptionsConnected>, cancellation: CancellationToken) -> JoinHandle<anyhow::Result<()>> {
tracing::info!("spawning rpc subscriptions cleaner");

tokio::spawn(async move {
spawn_named("rpc::sub::cleaner", async move {
loop {
if cancellation.is_cancelled() {
tracing::warn!("exiting rpc subscription cleaner because of cancellation");
warn_task_cancellation("rpc subscription cleaner");
return Ok(());
}

Expand Down Expand Up @@ -89,15 +92,15 @@ impl RpcSubscriptions {
) -> JoinHandle<anyhow::Result<()>> {
tracing::info!("spawning rpc newPendingTransactions notifier");

tokio::spawn(async move {
spawn_named("rpc::sub::newPendingTransactions", async move {
loop {
if cancellation.is_cancelled() {
tracing::warn!("exiting rpc newPendingTransactions notifier because of cancellation");
warn_task_cancellation("rpc newPendingTransactions notifier");
return Ok(());
}

let Ok(hash) = rx.recv().await else {
tracing::warn!("stopping newPendingTransactions notifier because tx channel was closed");
warn_task_tx_closed("rpc newPendingTransactions notifier");
break;
};

Expand All @@ -116,15 +119,15 @@ impl RpcSubscriptions {
) -> JoinHandle<anyhow::Result<()>> {
tracing::info!("spawning rpc newHeads notifier");

tokio::spawn(async move {
spawn_named("rpc::sub::newHeads", async move {
loop {
if cancellation.is_cancelled() {
tracing::warn!("exiting rpc newHeads notifier because of cancellation");
warn_task_cancellation("rpc newHeads notifier");
return Ok(());
}

let Ok(header) = rx.recv().await else {
tracing::warn!("stopping newHeads notifier because tx channel was closed");
warn_task_tx_closed("rpc newHeads notifier");
break;
};

Expand All @@ -143,15 +146,15 @@ impl RpcSubscriptions {
) -> JoinHandle<anyhow::Result<()>> {
tracing::info!("spawning rpc logs notifier");

tokio::spawn(async move {
spawn_named("rpc::sub::logs", async move {
loop {
if cancellation.is_cancelled() {
tracing::warn!("exiting rpc logs cleaner because of cancellation");
warn_task_cancellation("rpc logs notifier");
return Ok(());
}

let Ok(log) = rx.recv().await else {
tracing::warn!("stopping logs notifier because tx channel was closed");
warn_task_tx_closed("rpc logs notifier");
break;
};

Expand Down
Loading

0 comments on commit 842ef05

Please sign in to comment.