chore: importer-offline: add context-id to logs #812

Merged (1 commit) on May 9, 2024
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -10,7 +10,7 @@ default-run = "stratus"

 [dependencies]

-# stdx
+# general
 anyhow = "=1.0.82"
 async-trait = "=0.1.80"
 byte-unit = "=5.1.4"
@@ -32,6 +32,7 @@ rand = "=0.8.5"
 strum = "=0.26.2"
 thiserror = "=1.0.59"
 url = "=2.5.0"
+uuid = { version = "=1.8.0", features = ["v4", "fast-rng" ] }

 # async
 tokio = { version = "=1.37.0", features = ["rt-multi-thread", "macros", "signal"] }
57 changes: 38 additions & 19 deletions src/bin/importer_offline.rs
@@ -24,10 +24,13 @@ use stratus::ext::not;
 #[cfg(feature = "metrics")]
 use stratus::infra::metrics;
 use stratus::log_and_err;
+use stratus::utils::new_context_id;
 use stratus::GlobalServices;
 use tokio::runtime::Handle;
 use tokio::sync::mpsc;
 use tokio_util::sync::CancellationToken;
+use tracing::info_span;
+use tracing::Instrument;

 /// Number of tasks in the backlog. Each task contains 10_000 blocks and all receipts for them.
 const BACKLOG_SIZE: usize = 50;
@@ -143,6 +146,7 @@ fn signal_handler(cancellation: CancellationToken) {
 // -----------------------------------------------------------------------------
 // Block importer
 // -----------------------------------------------------------------------------
+#[tracing::instrument(name = "[Importer]", skip_all)]
 async fn execute_block_importer(
     // services
     executor: Arc<Executor>,
@@ -176,29 +180,37 @@ async fn execute_block_importer(
         // imports block transactions
         tracing::info!(%block_start, %block_end, receipts = %receipts.len(), "importing blocks");
         for (block_index, block) in blocks.into_iter().enumerate() {
-            #[cfg(feature = "metrics")]
-            let start = metrics::now();
+            let span = info_span!("re-executing block", context_id = new_context_id());

-            // re-execute block
-            executor.reexecute_external(&block, &receipts).await?;
+            async {
+                #[cfg(feature = "metrics")]
+                let start = metrics::now();

-            // mine block
-            let mined_block = miner.mine_external().await?;
-            storage.temp.remove_executions_before(mined_block.transactions.len()).await?;
+                // re-execute block
+                executor.reexecute_external(&block, &receipts).await?;

-            // export to csv OR permanent storage
-            match csv {
-                Some(ref mut csv) => import_external_to_csv(&storage, csv, mined_block.clone(), block_index, block_last_index).await?,
-                None => miner.commit(mined_block.clone()).await?,
-            };
+                // mine block
+                let mined_block = miner.mine_external().await?;
+                storage.temp.remove_executions_before(mined_block.transactions.len()).await?;

-            // export snapshot for tests
-            if blocks_to_export_snapshot.contains(mined_block.number()) {
-                export_snapshot(&block, &receipts, &mined_block)?;
-            }
+                // export to csv OR permanent storage
+                match csv {
+                    Some(ref mut csv) => import_external_to_csv(&storage, csv, mined_block.clone(), block_index, block_last_index).await?,
+                    None => miner.commit(mined_block.clone()).await?,
+                };

+                // export snapshot for tests
+                if blocks_to_export_snapshot.contains(mined_block.number()) {
+                    export_snapshot(&block, &receipts, &mined_block)?;
+                }

-            #[cfg(feature = "metrics")]
-            metrics::inc_import_offline(start.elapsed());
+                #[cfg(feature = "metrics")]
+                metrics::inc_import_offline(start.elapsed());
+
+                anyhow::Ok(())
+            }
+            .instrument(span)
+            .await?;
         }
     };

@@ -209,6 +221,7 @@ async fn execute_block_importer(
 // -----------------------------------------------------------------------------
 // Block loader
 // -----------------------------------------------------------------------------
+#[tracing::instrument(name = "[RPC]", skip_all, fields(start, end, block_by_fetch))]
 async fn execute_external_rpc_storage_loader(
     // services
     rpc_storage: Arc<dyn ExternalRpcStorage>,
@@ -226,7 +239,13 @@ async fn execute_external_rpc_storage_loader(
     let mut tasks = Vec::new();
     while start <= end {
         let end = min(start + (blocks_by_fetch - 1), end);
-        tasks.push(load_blocks_and_receipts(Arc::clone(&rpc_storage), cancellation.clone(), start, end));
+
+        let span = info_span!("fetching block", context_id = new_context_id());
+
+        let task = load_blocks_and_receipts(Arc::clone(&rpc_storage), cancellation.clone(), start, end);
+        let task = task.instrument(span);
+        tasks.push(task);
+
         start += blocks_by_fetch;
     }

1 change: 0 additions & 1 deletion src/infra/tracing.rs
@@ -10,7 +10,6 @@ use tracing_subscriber::layer::SubscriberExt;
 use tracing_subscriber::util::SubscriberInitExt;
 use tracing_subscriber::EnvFilter;
 use tracing_subscriber::Layer;
-//use tracing_subscriber::EnvFilter;

 /// Init application global tracing.
 pub fn init_tracing(url: Option<&String>) {
1 change: 1 addition & 0 deletions src/lib.rs
@@ -11,6 +11,7 @@ pub mod config;
 pub mod eth;
 pub mod ext;
 pub mod infra;
+pub mod utils;

 pub struct GlobalServices<T>
 where
5 changes: 5 additions & 0 deletions src/utils.rs
@@ -0,0 +1,5 @@
+use uuid::Uuid;
+
+pub fn new_context_id() -> String {
+    Uuid::new_v4().to_string()
+}
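
For reference, a minimal self-contained sketch of the logging pattern this PR introduces: each unit of work gets a fresh context_id recorded as a field on a tracing span, and the async work is wrapped with .instrument(span) so every log event emitted inside carries that id. The standalone main, the tokio runtime, and the plain tracing_subscriber fmt subscriber are assumptions made only to keep the sketch runnable; the repository wires tracing through its own init_tracing instead.

use tracing::info_span;
use tracing::Instrument;
use uuid::Uuid;

// Mirrors the helper added in src/utils.rs.
fn new_context_id() -> String {
    Uuid::new_v4().to_string()
}

#[tokio::main]
async fn main() {
    // Stand-in for the repository's init_tracing: print events to stdout,
    // including the fields of the spans they are emitted under.
    tracing_subscriber::fmt().init();

    for block_number in 0..3u64 {
        // One span, and therefore one context_id, per block, as in
        // execute_block_importer and execute_external_rpc_storage_loader.
        let span = info_span!("re-executing block", context_id = new_context_id());
        async {
            tracing::info!(%block_number, "importing block");
        }
        .instrument(span)
        .await;
    }
}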