Skip to content

Commit

Permalink
feat: more logs improvements (#950)
Browse files Browse the repository at this point in the history
  • Loading branch information
dinhani-cw authored May 29, 2024
1 parent bdd56ff commit 2ecd9e6
Show file tree
Hide file tree
Showing 25 changed files with 265 additions and 257 deletions.
3 changes: 2 additions & 1 deletion src/bin/importer_offline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use anyhow::anyhow;
use futures::try_join;
use futures::StreamExt;
use itertools::Itertools;
use stratus::channel_read;
use stratus::config::ImporterOfflineConfig;
use stratus::eth::primitives::Block;
use stratus::eth::primitives::BlockNumber;
Expand Down Expand Up @@ -152,7 +153,7 @@ async fn execute_block_importer(
};

// receive new tasks to execute, or exit
let Some((blocks, receipts)) = backlog_rx.recv().await else {
let Some((blocks, receipts)) = channel_read!(backlog_rx) else {
tracing::info!("{} has no more blocks to process", TASK_NAME);
return Ok(());
};
Expand Down
5 changes: 3 additions & 2 deletions src/bin/importer_online.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::time::Duration;
use futures::try_join;
use futures::StreamExt;
use serde::Deserialize;
use stratus::channel_read;
use stratus::config::ImporterOnlineConfig;
use stratus::eth::primitives::BlockNumber;
use stratus::eth::primitives::ExternalBlock;
Expand Down Expand Up @@ -132,7 +133,7 @@ async fn start_block_executor(
) -> anyhow::Result<()> {
const TASK_NAME: &str = "block-executor";

while let Some((block, receipts)) = backlog_rx.recv().await {
while let Some((block, receipts)) = channel_read!(backlog_rx) {
if GlobalState::warn_if_shutdown(TASK_NAME) {
return Ok(());
}
Expand All @@ -143,7 +144,7 @@ async fn start_block_executor(
// execute and mine
let receipts = ExternalReceipts::from(receipts);
if let Err(e) = executor.reexecute_external(&block, &receipts).await {
let message = GlobalState::shutdown_from(TASK_NAME, "failed to re-execute external block");
let message = GlobalState::shutdown_from(TASK_NAME, "failed to reexecute external block");
return log_and_err!(reason = e, message);
};

Expand Down
2 changes: 1 addition & 1 deletion src/bin/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async fn run(config: ExternalRelayerConfig) -> anyhow::Result<()> {
};

match block_number {
Some(block_number) => tracing::info!(?block_number, "relayed"),
Some(block_number) => tracing::info!(number = %block_number, "relayed"),
None => {
tracing::info!("no pending block found");
tokio::time::sleep(backoff).await;
Expand Down
17 changes: 12 additions & 5 deletions src/eth/block_miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl BlockMiner {

// validate
let Some(external_block) = block.external_block else {
return log_and_err!("failed to mine external block because there is no external block being re-executed");
return log_and_err!("failed to mine external block because there is no external block being reexecuted");
};
if not(local_txs.is_empty()) {
return log_and_err!("failed to mine external block because one of the transactions is a local transaction");
Expand Down Expand Up @@ -159,7 +159,7 @@ impl BlockMiner {

// validate
let Some(external_block) = block.external_block else {
return log_and_err!("failed to mine mixed block because there is no external block being re-executed");
return log_and_err!("failed to mine mixed block because there is no external block being reexecuted");
};

// mine external transactions
Expand Down Expand Up @@ -233,9 +233,15 @@ impl BlockMiner {

// notify
for log in block_logs {
let _ = self.notifier_logs.send(log);
let tx_hash = log.block_hash;
let log_index = log.log_index;
if self.notifier_logs.send(log).is_err() {
tracing::error!(number = %block_number, hash = %tx_hash, index = %log_index, "failed to send transaction log notification");
};
}
let _ = self.notifier_blocks.send(block_header);
if self.notifier_blocks.send(block_header).is_err() {
tracing::error!(number = %block_number, "failed to send new block notification");
};

Ok(())
}
Expand Down Expand Up @@ -345,14 +351,15 @@ mod interval_miner {
use tokio::sync::mpsc;
use tokio::time::Instant;

use crate::channel_read;
use crate::eth::BlockMiner;
use crate::infra::tracing::warn_task_rx_closed;
use crate::GlobalState;

pub async fn run(miner: Arc<BlockMiner>, mut ticks_rx: mpsc::UnboundedReceiver<Instant>) {
const TASK_NAME: &str = "interval-miner-ticker";

while let Some(tick) = ticks_rx.recv().await {
while let Some(tick) = channel_read!(ticks_rx) {
if GlobalState::warn_if_shutdown(TASK_NAME) {
return;
}
Expand Down
6 changes: 4 additions & 2 deletions src/eth/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ pub mod consensus_kube {
use tonic::Response;
use tonic::Status;

use crate::channel_read;

pub mod append_entry {
tonic::include_proto!("append_entry");
}
Expand Down Expand Up @@ -78,7 +80,7 @@ pub mod consensus_kube {
followers.iter().map(|f| f.address.to_string()).collect::<Vec<String>>().join(", ")
);

while let Some(data) = receiver.recv().await {
while let Some(data) = channel_read!(receiver) {
if Self::is_leader(leader_name_clone.clone()) {
//TODO add data to consensus-log-transactions
//TODO at the begining of temp-storage, load the consensus-log-transactions so the index becomes clear
Expand Down Expand Up @@ -107,7 +109,7 @@ pub mod consensus_kube {
let (sender, mut receiver) = mpsc::channel::<Block>(32);

tokio::spawn(async move {
while let Some(data) = receiver.recv().await {
while let Some(data) = channel_read!(receiver) {
tracing::info!(number = data.header.number.as_u64(), "Received block");
}
});
Expand Down
14 changes: 8 additions & 6 deletions src/eth/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,14 @@ impl Executor {
None => tx,
// conflict: reexecute
Some(conflicts) => {
tracing::warn!(?conflicts, "re-executing serially because parallel execution conflicted");
tracing::warn!(?conflicts, "reexecuting serially because parallel execution conflicted");
self.reexecute_external_tx(&tx.tx, &tx.receipt, block).await.map_err(|(_, _, e)| e)?
}
},

// failure: reexecute
Err((tx, receipt, e)) => {
tracing::warn!(reason = ?e, "re-executing serially because parallel execution errored");
tracing::warn!(reason = ?e, "reexecuting serially because parallel execution errored");
self.reexecute_external_tx(tx, receipt, block).await.map_err(|(_, _, e)| e)?
}
}
Expand Down Expand Up @@ -185,6 +185,8 @@ impl Executor {
receipt: &'b ExternalReceipt,
block: &ExternalBlock,
) -> anyhow::Result<ExternalTransactionExecution> {
tracing::info!(number = %block.number(), hash = %tx.hash(), "reexecuting external transaction");

#[cfg(feature = "metrics")]
let start = metrics::now();

Expand Down Expand Up @@ -212,7 +214,7 @@ impl Executor {
Err(e) => {
let json_tx = serde_json::to_string(&tx).expect_infallible();
let json_receipt = serde_json::to_string(&receipt).expect_infallible();
tracing::error!(reason = ?e, %json_tx, %json_receipt, "unexpected error reexecuting transaction");
tracing::error!(reason = ?e, number = %block.number(), hash = %tx.hash(), %json_tx, %json_receipt, "failed to reexecute external transaction");
return Err(e);
}
};
Expand All @@ -232,7 +234,7 @@ impl Executor {
let json_tx = serde_json::to_string(&tx).expect_infallible();
let json_receipt = serde_json::to_string(&receipt).expect_infallible();
let json_execution_logs = serde_json::to_string(&evm_result.execution.logs).expect_infallible();
tracing::error!(%json_tx, %json_receipt, %json_execution_logs, "mismatch reexecuting transaction");
tracing::error!(reason = %"mismatch reexecuting transaction", number = %block.number(), hash = %tx.hash(), %json_tx, %json_receipt, %json_execution_logs, "failed to reexecute external transaction");
return Err(e);
};

Expand All @@ -257,7 +259,7 @@ impl Executor {
to = ?tx_input.to,
data_len = %tx_input.input.len(),
data = %tx_input.input,
"executing transaction"
"executing local transaction"
);

// validate
Expand Down Expand Up @@ -323,7 +325,7 @@ impl Executor {
data_len = input.data.len(),
data = %input.data,
?point_in_time,
"executing read-only transaction"
"executing read-only local transaction"
);

let evm_input = EvmInput::from_eth_call(input, point_in_time);
Expand Down
24 changes: 16 additions & 8 deletions src/eth/primitives/block_number.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
//! operations like validating blockchain continuity and retrieving specific
//! blocks.
use std::fmt::Display;
use std::num::TryFromIntError;
use std::ops::Add;
use std::ops::AddAssign;
Expand All @@ -29,7 +28,22 @@ use sqlx::types::BigDecimal;
use crate::eth::primitives::Hash;
use crate::gen_newtype_from;

#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord, Hash, derive_more::Add, derive_more::Sub, serde::Serialize, serde::Deserialize)]
#[derive(
Debug,
Clone,
derive_more::Display,
Copy,
Default,
PartialEq,
Eq,
PartialOrd,
Ord,
Hash,
derive_more::Add,
derive_more::Sub,
serde::Serialize,
serde::Deserialize,
)]
#[serde(transparent)]
pub struct BlockNumber(U64);

Expand Down Expand Up @@ -77,12 +91,6 @@ impl BlockNumber {
}
}

impl Display for BlockNumber {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}

impl Dummy<Faker> for BlockNumber {
fn dummy_with_rng<R: ethers_core::rand::prelude::Rng + ?Sized>(_: &Faker, rng: &mut R) -> Self {
rng.next_u64().into()
Expand Down
10 changes: 1 addition & 9 deletions src/eth/primitives/chain_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
//! The module enables the specification and verification of the network for
//! which a particular transaction is intended.
use std::fmt::Display;

use anyhow::anyhow;
use ethereum_types::U256;
use ethereum_types::U64;
Expand All @@ -24,7 +22,7 @@ use sqlx::Decode;
use crate::gen_newtype_from;
use crate::gen_newtype_try_from;

#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, serde::Serialize, serde::Deserialize)]
#[derive(Debug, derive_more::Display, Clone, Copy, Default, Eq, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct ChainId(U64);

impl ChainId {
Expand All @@ -37,12 +35,6 @@ impl ChainId {
}
}

impl Display for ChainId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}

impl Dummy<Faker> for ChainId {
fn dummy_with_rng<R: ethers_core::rand::prelude::Rng + ?Sized>(_: &Faker, rng: &mut R) -> Self {
rng.next_u64().into()
Expand Down
9 changes: 1 addition & 8 deletions src/eth/primitives/difficulty.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::fmt::Display;
use std::str::FromStr;

use ethereum_types::U256;
Expand All @@ -13,16 +12,10 @@ use sqlx::types::BigDecimal;

use crate::gen_newtype_from;

#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[derive(Debug, derive_more::Display, Clone, Copy, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(transparent)]
pub struct Difficulty(U256);

impl Display for Difficulty {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}

impl Dummy<Faker> for Difficulty {
fn dummy_with_rng<R: ethers_core::rand::prelude::Rng + ?Sized>(_: &Faker, rng: &mut R) -> Self {
rng.next_u64().into()
Expand Down
10 changes: 1 addition & 9 deletions src/eth/primitives/gas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
//! calculate, track, and limit gas usage in transactions and smart contract
//! execution.
use std::fmt::Display;

use anyhow::anyhow;
use ethereum_types::U256;
use ethereum_types::U64;
Expand All @@ -25,7 +23,7 @@ use sqlx::types::BigDecimal;
use crate::gen_newtype_from;
use crate::gen_newtype_try_from;

#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[derive(Debug, derive_more::Display, Clone, Copy, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(transparent)]
pub struct Gas(U64);

Expand All @@ -38,12 +36,6 @@ impl Gas {
}
}

impl Display for Gas {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}

impl Dummy<Faker> for Gas {
fn dummy_with_rng<R: ethers_core::rand::prelude::Rng + ?Sized>(_: &Faker, rng: &mut R) -> Self {
rng.next_u64().into()
Expand Down
13 changes: 3 additions & 10 deletions src/eth/primitives/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
//! module includes functionality to handle conversions and representations of
//! indexes, aligning with Ethereum's blockchain data structure needs.
use std::fmt::Display;
use std::num::TryFromIntError;
use std::str::FromStr;

Expand All @@ -23,9 +22,9 @@ use crate::gen_newtype_from;
use crate::gen_newtype_try_from;

/// Represents a transaction index or log index.
///
/// TODO: representing it as u16 is probably wrong because external libs uses u64.
#[derive(Debug, Clone, Copy, PartialEq, Eq, fake::Dummy, serde::Serialize, serde::Deserialize, derive_more::Add, Hash, PartialOrd, Ord)]
#[derive(
Debug, derive_more::Display, Clone, Copy, PartialEq, Eq, fake::Dummy, serde::Serialize, serde::Deserialize, derive_more::Add, Hash, PartialOrd, Ord,
)]
pub struct Index(u64);

impl Index {
Expand All @@ -41,12 +40,6 @@ impl Index {
}
}

impl Display for Index {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}

// -----------------------------------------------------------------------------
// Conversions: Other -> Self
// -----------------------------------------------------------------------------
Expand Down
10 changes: 1 addition & 9 deletions src/eth/primitives/miner_nonce.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::fmt::Display;

use ethereum_types::H64;
use fake::Dummy;
use fake::Faker;
Expand All @@ -13,7 +11,7 @@ use sqlx::Decode;
use crate::gen_newtype_from;

/// The nonce of an Ethereum block.
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
#[derive(Debug, derive_more::Display, Clone, Copy, Default, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
pub struct MinerNonce(H64);

impl MinerNonce {
Expand All @@ -29,12 +27,6 @@ impl Dummy<Faker> for MinerNonce {
}
}

impl Display for MinerNonce {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}

// -----------------------------------------------------------------------------
// Conversions: Other -> Self
// -----------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit 2ecd9e6

Please sign in to comment.