Skip to content

Commit

Permalink
chore: add metrics and instrumentations (#696)
Browse files Browse the repository at this point in the history
  • Loading branch information
carneiro-cw authored Apr 25, 2024
1 parent 9e28b7d commit a3ead05
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 1 deletion.
9 changes: 9 additions & 0 deletions src/bin/importer_online.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ async fn import(number: BlockNumber, executor: &EthExecutor, chain: &BlockchainC
// fetch block and receipts
let block = fetch_block(chain, number).await?;

#[cfg(feature = "metrics")]
let start = metrics::now();

// fetch receipts in parallel
let mut receipts = Vec::with_capacity(block.transactions.len());
for tx in &block.transactions {
Expand All @@ -67,6 +70,12 @@ async fn import(number: BlockNumber, executor: &EthExecutor, chain: &BlockchainC
// import block
let receipts: ExternalReceipts = receipts.into();
executor.import_external_to_perm(block, &receipts).await?;

#[cfg(feature = "metrics")]
metrics::inc_n_importer_online_transactions_total(receipts.len() as u64);
#[cfg(feature = "metrics")]
metrics::inc_import_online_mined_block(start.elapsed());

Ok(())
}

Expand Down
13 changes: 13 additions & 0 deletions src/eth/rpc/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,13 @@ async fn eth_gas_price(_: Params<'_>, _: Arc<RpcContext>) -> String {
}

// Block
#[tracing::instrument(skip(ctx))]
async fn eth_block_number(_params: Params<'_>, ctx: Arc<RpcContext>) -> anyhow::Result<JsonValue, RpcError> {
let number = ctx.storage.read_mined_block_number().await?;
Ok(serde_json::to_value(number).unwrap())
}

#[tracing::instrument(skip(ctx))]
async fn eth_get_block_by_selector(params: Params<'_>, ctx: Arc<RpcContext>) -> anyhow::Result<JsonValue, RpcError> {
let (params, block_selection) = next_rpc_param::<BlockSelection>(params.sequence())?;
let (_, full_transactions) = next_rpc_param::<bool>(params)?;
Expand All @@ -229,6 +231,7 @@ async fn eth_get_uncle_by_block_hash_and_index(_params: Params<'_>, _ctx: Arc<Rp

// Transaction

#[tracing::instrument(skip(ctx))]
async fn eth_get_transaction_count(params: Params<'_>, ctx: Arc<RpcContext>) -> anyhow::Result<String, RpcError> {
let (params, address) = next_rpc_param::<Address>(params.sequence())?;
let (_, block_selection) = next_rpc_param_or_default::<BlockSelection>(params)?;
Expand All @@ -238,6 +241,7 @@ async fn eth_get_transaction_count(params: Params<'_>, ctx: Arc<RpcContext>) ->
Ok(hex_num(account.nonce))
}

#[tracing::instrument(skip(ctx))]
async fn eth_get_transaction_by_hash(params: Params<'_>, ctx: Arc<RpcContext>) -> anyhow::Result<JsonValue, RpcError> {
let (_, hash) = next_rpc_param::<Hash>(params.sequence())?;
let mined = ctx.storage.read_mined_transaction(&hash).await?;
Expand All @@ -248,6 +252,7 @@ async fn eth_get_transaction_by_hash(params: Params<'_>, ctx: Arc<RpcContext>) -
}
}

#[tracing::instrument(skip(ctx))]
async fn eth_get_transaction_receipt(params: Params<'_>, ctx: Arc<RpcContext>) -> anyhow::Result<JsonValue, RpcError> {
let (_, hash) = next_rpc_param::<Hash>(params.sequence())?;
match ctx.storage.read_mined_transaction(&hash).await? {
Expand All @@ -256,6 +261,7 @@ async fn eth_get_transaction_receipt(params: Params<'_>, ctx: Arc<RpcContext>) -
}
}

#[tracing::instrument(skip(ctx))]
async fn eth_estimate_gas(params: Params<'_>, ctx: Arc<RpcContext>) -> anyhow::Result<String, RpcError> {
let (_, call) = next_rpc_param::<CallInput>(params.sequence())?;

Expand All @@ -274,6 +280,7 @@ async fn eth_estimate_gas(params: Params<'_>, ctx: Arc<RpcContext>) -> anyhow::R
}
}

#[tracing::instrument(skip(ctx))]
async fn eth_call(params: Params<'_>, ctx: Arc<RpcContext>) -> anyhow::Result<String, RpcError> {
let (params, call) = next_rpc_param::<CallInput>(params.sequence())?;
let (_, block_selection) = next_rpc_param_or_default::<BlockSelection>(params)?;
Expand All @@ -291,6 +298,7 @@ async fn eth_call(params: Params<'_>, ctx: Arc<RpcContext>) -> anyhow::Result<St
}
}

#[tracing::instrument(skip(ctx))]
async fn eth_send_raw_transaction(params: Params<'_>, ctx: Arc<RpcContext>) -> anyhow::Result<String, RpcError> {
let (_, data) = next_rpc_param::<Bytes>(params.sequence())?;
let transaction = parse_rpc_rlp::<TransactionInput>(&data)?;
Expand All @@ -312,6 +320,7 @@ async fn eth_send_raw_transaction(params: Params<'_>, ctx: Arc<RpcContext>) -> a
}

// Logs
#[tracing::instrument(skip(ctx))]
async fn eth_get_logs(params: Params<'_>, ctx: Arc<RpcContext>) -> anyhow::Result<JsonValue, RpcError> {
let (_, filter_input) = next_rpc_param::<LogFilterInput>(params.sequence())?;
let filter = filter_input.parse(&ctx.storage).await?;
Expand All @@ -322,6 +331,7 @@ async fn eth_get_logs(params: Params<'_>, ctx: Arc<RpcContext>) -> anyhow::Resul

// Account

#[tracing::instrument(skip(ctx))]
async fn eth_get_balance(params: Params<'_>, ctx: Arc<RpcContext>) -> anyhow::Result<String, RpcError> {
let (params, address) = next_rpc_param::<Address>(params.sequence())?;
let (_, block_selection) = next_rpc_param_or_default::<BlockSelection>(params)?;
Expand All @@ -332,6 +342,7 @@ async fn eth_get_balance(params: Params<'_>, ctx: Arc<RpcContext>) -> anyhow::Re
Ok(hex_num(account.balance))
}

#[tracing::instrument(skip(ctx))]
async fn eth_get_code(params: Params<'_>, ctx: Arc<RpcContext>) -> anyhow::Result<String, RpcError> {
let (params, address) = next_rpc_param::<Address>(params.sequence())?;
let (_, block_selection) = next_rpc_param_or_default::<BlockSelection>(params)?;
Expand All @@ -344,6 +355,7 @@ async fn eth_get_code(params: Params<'_>, ctx: Arc<RpcContext>) -> anyhow::Resul

// Subscription

#[tracing::instrument(skip(ctx))]
async fn eth_subscribe(params: Params<'_>, pending: PendingSubscriptionSink, ctx: Arc<RpcContext>) -> impl IntoSubscriptionCloseResponse {
let (params, kind) = next_rpc_param::<String>(params.sequence())?;
match kind.deref() {
Expand All @@ -369,6 +381,7 @@ async fn eth_subscribe(params: Params<'_>, pending: PendingSubscriptionSink, ctx
}

// Storage
#[tracing::instrument(skip(ctx))]
async fn eth_get_storage_at(params: Params<'_>, ctx: Arc<RpcContext>) -> anyhow::Result<String, RpcError> {
let (params, address) = next_rpc_param::<Address>(params.sequence())?;
let (params, index) = next_rpc_param::<SlotIndex>(params)?;
Expand Down
8 changes: 7 additions & 1 deletion src/infra/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,13 @@ metrics! {
group: importer_online,

"Time to execute import_online operation."
histogram_duration import_online{} []
histogram_duration import_online{} [],

"Time to import one mined block."
histogram_duration import_online_mined_block{} [],

"Transactions imported"
counter importer_online_transactions_total{} []
}

// Execution metrics.
Expand Down

0 comments on commit a3ead05

Please sign in to comment.