Skip to content

Commit

Permalink
feat: error logs and importer-offline fix
Browse files Browse the repository at this point in the history
  • Loading branch information
dinhani-cw committed May 8, 2024
1 parent cd71a1a commit 8e1385a
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 10 deletions.
16 changes: 11 additions & 5 deletions src/bin/importer_offline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,18 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> {
let _ = storage_thread.spawn(move || {
let _tokio_guard = storage_tokio.enter();

storage_tokio.block_on(execute_external_rpc_storage_loader(
let result = storage_tokio.block_on(execute_external_rpc_storage_loader(
rpc_storage,
storage_cancellation,
config.blocks_by_fetch,
config.paralellism,
block_start,
block_end,
backlog_tx,
))
));
if let Err(e) = result {
tracing::error!(reason = ?e, "storage-loader failed");
}
});

// execute thread: block importer
Expand All @@ -106,15 +109,18 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> {
let importer_cancellation = cancellation.clone();
let importer_join = importer_thread.spawn(move || {
let _tokio_guard = importer_tokio.enter();
importer_tokio.block_on(execute_block_importer(
let result = importer_tokio.block_on(execute_block_importer(
executor,
miner,
stratus_storage,
csv,
importer_cancellation,
backlog_rx,
block_snapshots,
))
));
if let Err(e) = result {
tracing::error!(reason = ?e, "block-importer failed");
}
})?;

let _ = importer_join.join();
Expand All @@ -129,7 +135,6 @@ fn signal_handler(cancellation: CancellationToken) {
tracing::info!("shutting down");
cancellation.cancel();
}

Err(err) => tracing::error!("Unable to listen for shutdown signal: {}", err),
}
});
Expand Down Expand Up @@ -177,6 +182,7 @@ async fn execute_block_importer(
// re-execute and mine
executor.reexecute_external(&block, &receipts).await?;
let mined_block = miner.mine_external(&block).await?;
storage.temp.remove_executions_before(mined_block.transactions.len()).await?;

// export to csv OR permanent storage
match csv {
Expand Down
6 changes: 5 additions & 1 deletion src/bin/importer_online.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ async fn run(config: ImporterOnlineConfig) -> anyhow::Result<()> {
let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner), relayer).await;
let chain = BlockchainClient::new(&config.external_rpc).await?;

run_importer_online(executor, miner, storage, chain).await
let result = run_importer_online(executor, miner, storage, chain).await;
if let Err(ref e) = result {
tracing::error!(reason = ?e, "importer-online failed");
}
result
}

pub async fn run_importer_online(executor: Arc<Executor>, miner: Arc<BlockMiner>, storage: Arc<StratusStorage>, chain: BlockchainClient) -> anyhow::Result<()> {
Expand Down
13 changes: 12 additions & 1 deletion src/eth/primitives/transaction_execution.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use display_json::DebugAsJson;

use crate::eth::primitives::EvmExecution;
use crate::eth::primitives::ExternalReceipt;
use crate::eth::primitives::ExternalTransaction;
use crate::eth::primitives::Hash;
use crate::eth::primitives::TransactionInput;
use crate::eth::primitives::TransactionKind;

#[derive(Debug, Clone)]
#[derive(DebugAsJson, Clone, serde::Serialize)]
pub struct TransactionExecution {
pub kind: TransactionKind,
pub execution: EvmExecution,
Expand All @@ -26,4 +29,12 @@ impl TransactionExecution {
execution,
}
}

/// Returns the transaction hash.
pub fn hash(&self) -> Hash {
match self.kind {
TransactionKind::Local(ref tx) => tx.hash,
TransactionKind::External(ref tx, _) => tx.hash(),
}
}
}
3 changes: 2 additions & 1 deletion src/eth/primitives/transaction_input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//! transactions, providing a comprehensive interface for transaction data.
use anyhow::anyhow;
use display_json::DebugAsJson;
use ethereum_types::U256;
use ethereum_types::U64;
use ethers_core::types::OtherFields;
Expand All @@ -27,7 +28,7 @@ use crate::ext::not;
use crate::ext::OptionExt;
use crate::log_and_err;

#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[derive(DebugAsJson, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct TransactionInput {
/// TODO: Optional for external/older transactions, but it should be required for newer transactions.
///
Expand Down
4 changes: 3 additions & 1 deletion src/eth/primitives/transaction_kind.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
#![allow(clippy::large_enum_variant)]

use display_json::DebugAsJson;

use crate::eth::primitives::ExternalReceipt;
use crate::eth::primitives::ExternalTransaction;
use crate::eth::primitives::TransactionInput;

#[derive(Debug, Clone, derive_new::new)]
#[derive(DebugAsJson, Clone, derive_new::new, serde::Serialize)]
pub enum TransactionKind {
/// Transaction that was sent directly to Stratus.
Local(TransactionInput),
Expand Down
4 changes: 3 additions & 1 deletion src/eth/storage/inmemory/inmemory_temporary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ impl InMemoryTemporaryStorage {
impl TemporaryStorageExecutionOps for InMemoryTemporaryStorage {
async fn save_execution(&self, tx: TransactionExecution) -> anyhow::Result<()> {
let mut state = self.lock_write().await;
tracing::debug!(hash = %tx.hash(), tx_executions_len = %state.tx_executions.len(), "saving execution");

// save account changes
let changes = tx.execution.changes_to_persist();
Expand Down Expand Up @@ -108,6 +109,7 @@ impl TemporaryStorageExecutionOps for InMemoryTemporaryStorage {
}

async fn read_executions(&self) -> anyhow::Result<Vec<TransactionExecution>> {
tracing::debug!("reading executions");
let state = self.lock_read().await;
Ok(state.tx_executions.clone())
}
Expand All @@ -118,7 +120,7 @@ impl TemporaryStorageExecutionOps for InMemoryTemporaryStorage {
}

let mut state = self.lock_write().await;
tracing::debug!(len = state.tx_executions.len(), index = %index, "removing executions");
tracing::debug!(tx_executions_len = %state.tx_executions.len(), index = %index, "removing executions");
let _ = state.tx_executions.drain(..index - 1);

Ok(())
Expand Down

0 comments on commit 8e1385a

Please sign in to comment.