Skip to content

Commit

Permalink
chore: importer-offline: add context-id to logs
Browse files Browse the repository at this point in the history
  • Loading branch information
marcospb19-cw committed May 9, 2024
1 parent da19374 commit f79762e
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 21 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ default-run = "stratus"

[dependencies]

# stdx
# general
anyhow = "=1.0.82"
async-trait = "=0.1.80"
byte-unit = "=5.1.4"
Expand All @@ -32,6 +32,7 @@ rand = "=0.8.5"
strum = "=0.26.2"
thiserror = "=1.0.59"
url = "=2.5.0"
uuid = { version = "=1.8.0", features = ["v4", "fast-rng" ] }

# async
tokio = { version = "=1.37.0", features = ["rt-multi-thread", "macros", "signal"] }
Expand Down
57 changes: 38 additions & 19 deletions src/bin/importer_offline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@ use stratus::ext::not;
#[cfg(feature = "metrics")]
use stratus::infra::metrics;
use stratus::log_and_err;
use stratus::utils::new_context_id;
use stratus::GlobalServices;
use tokio::runtime::Handle;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tracing::info_span;
use tracing::Instrument;

/// Number of tasks in the backlog. Each task contains 10_000 blocks and all receipts for them.
const BACKLOG_SIZE: usize = 50;
Expand Down Expand Up @@ -143,6 +146,7 @@ fn signal_handler(cancellation: CancellationToken) {
// -----------------------------------------------------------------------------
// Block importer
// -----------------------------------------------------------------------------
#[tracing::instrument(name = "[Importer]", skip_all)]
async fn execute_block_importer(
// services
executor: Arc<Executor>,
Expand Down Expand Up @@ -176,29 +180,37 @@ async fn execute_block_importer(
// imports block transactions
tracing::info!(%block_start, %block_end, receipts = %receipts.len(), "importing blocks");
for (block_index, block) in blocks.into_iter().enumerate() {
#[cfg(feature = "metrics")]
let start = metrics::now();
let span = info_span!("re-executing block", context_id = new_context_id());

// re-execute block
executor.reexecute_external(&block, &receipts).await?;
async {
#[cfg(feature = "metrics")]
let start = metrics::now();

// mine block
let mined_block = miner.mine_external().await?;
storage.temp.remove_executions_before(mined_block.transactions.len()).await?;
// re-execute block
executor.reexecute_external(&block, &receipts).await?;

// export to csv OR permanent storage
match csv {
Some(ref mut csv) => import_external_to_csv(&storage, csv, mined_block.clone(), block_index, block_last_index).await?,
None => miner.commit(mined_block.clone()).await?,
};
// mine block
let mined_block = miner.mine_external().await?;
storage.temp.remove_executions_before(mined_block.transactions.len()).await?;

// export snapshot for tests
if blocks_to_export_snapshot.contains(mined_block.number()) {
export_snapshot(&block, &receipts, &mined_block)?;
}
// export to csv OR permanent storage
match csv {
Some(ref mut csv) => import_external_to_csv(&storage, csv, mined_block.clone(), block_index, block_last_index).await?,
None => miner.commit(mined_block.clone()).await?,
};

// export snapshot for tests
if blocks_to_export_snapshot.contains(mined_block.number()) {
export_snapshot(&block, &receipts, &mined_block)?;
}

#[cfg(feature = "metrics")]
metrics::inc_import_offline(start.elapsed());
#[cfg(feature = "metrics")]
metrics::inc_import_offline(start.elapsed());

anyhow::Ok(())
}
.instrument(span)
.await?;
}
};

Expand All @@ -209,6 +221,7 @@ async fn execute_block_importer(
// -----------------------------------------------------------------------------
// Block loader
// -----------------------------------------------------------------------------
#[tracing::instrument(name = "[RPC]", skip_all, fields(start, end, block_by_fetch))]
async fn execute_external_rpc_storage_loader(
// services
rpc_storage: Arc<dyn ExternalRpcStorage>,
Expand All @@ -226,7 +239,13 @@ async fn execute_external_rpc_storage_loader(
let mut tasks = Vec::new();
while start <= end {
let end = min(start + (blocks_by_fetch - 1), end);
tasks.push(load_blocks_and_receipts(Arc::clone(&rpc_storage), cancellation.clone(), start, end));

let span = info_span!("fetching block", context_id = new_context_id());

let task = load_blocks_and_receipts(Arc::clone(&rpc_storage), cancellation.clone(), start, end);
let task = task.instrument(span);
tasks.push(task);

start += blocks_by_fetch;
}

Expand Down
1 change: 0 additions & 1 deletion src/infra/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::EnvFilter;
use tracing_subscriber::Layer;
//use tracing_subscriber::EnvFilter;

/// Init application global tracing.
pub fn init_tracing(url: Option<&String>) {
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub mod config;
pub mod eth;
pub mod ext;
pub mod infra;
pub mod utils;

pub struct GlobalServices<T>
where
Expand Down
5 changes: 5 additions & 0 deletions src/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
use uuid::Uuid;

pub fn new_context_id() -> String {
Uuid::new_v4().to_string()
}

0 comments on commit f79762e

Please sign in to comment.