Skip to content

Commit

Permalink
feat: snapshot improvements (#526)
Browse files Browse the repository at this point in the history
  • Loading branch information
dinhani-cw authored Apr 5, 2024
1 parent a5c0914 commit c2e927b
Show file tree
Hide file tree
Showing 13 changed files with 1,266,877 additions and 182 deletions.
85 changes: 56 additions & 29 deletions src/bin/importer_offline.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::cmp::min;
use std::fs;
use std::sync::Arc;

use anyhow::anyhow;
use futures::try_join;
use futures::StreamExt;
use itertools::Itertools;
use stratus::config::ImporterOfflineConfig;
use stratus::eth::primitives::Block;
use stratus::eth::primitives::BlockNumber;
Expand All @@ -13,6 +15,7 @@ use stratus::eth::primitives::ExternalReceipt;
use stratus::eth::primitives::ExternalReceipts;
use stratus::eth::storage::CsvExporter;
use stratus::eth::storage::ExternalRpcStorage;
use stratus::eth::storage::InMemoryPermanentStorage;
use stratus::eth::storage::StratusStorage;
use stratus::eth::EthExecutor;
use stratus::ext::not;
Expand Down Expand Up @@ -44,10 +47,12 @@ fn main() -> anyhow::Result<()> {
}

async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> {
// init services
let rpc_storage = config.rpc_storage.init().await?;
let stratus_storage = config.stratus_storage.init().await?;
let executor = config.executor.init(Arc::clone(&stratus_storage));

// init block range
let block_start = match config.block_start {
Some(start) => BlockNumber::from(start),
None => block_number_to_start(&stratus_storage).await?,
Expand All @@ -57,28 +62,21 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> {
None => block_number_to_stop(&rpc_storage).await?,
};

// init csv
let mut csv = if config.export_csv { Some(CsvExporter::new(block_start)?) } else { None };

// init shared data between importer and external rpc storage loader
let (backlog_tx, backlog_rx) = mpsc::channel::<BacklogTask>(BACKLOG_SIZE);
let cancellation = CancellationToken::new();

// load genesis accounts
let initial_accounts = rpc_storage.read_initial_accounts().await?;

// initialize CSV and write initial accounts if necessary
let csv = if config.export_csv {
let mut csv = CsvExporter::new(block_start)?;

// write initial accounts once, even between runs
stratus_storage.save_accounts_to_perm(initial_accounts.clone()).await?;
if let Some(ref mut csv) = csv {
if csv.is_accounts_empty() {
csv.export_initial_accounts(initial_accounts.clone())?;
}

Some(csv)
} else {
None
};

stratus_storage.save_accounts_to_perm(initial_accounts).await?;
}

// execute parallel tasks (external rpc storage loader and block importer)
tokio::spawn(execute_external_rpc_storage_loader(
Expand All @@ -89,22 +87,25 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> {
block_end,
backlog_tx,
));
execute_block_importer(executor, Arc::clone(&stratus_storage), csv, cancellation, backlog_rx).await?;

let block_snapshots = config.export_snapshot.into_iter().map_into().collect();
execute_block_importer(executor, &stratus_storage, csv, cancellation, backlog_rx, block_snapshots).await?;

Ok(())
}

// -----------------------------------------------------------------------------
// Block Importer
// Block importer
// -----------------------------------------------------------------------------
async fn execute_block_importer(
// services
executor: EthExecutor,
stratus_storage: Arc<StratusStorage>,
stratus_storage: &StratusStorage,
mut csv: Option<CsvExporter>,
cancellation: CancellationToken,
// data
mut backlog_rx: mpsc::Receiver<BacklogTask>,
blocks_to_export_snapshot: Vec<BlockNumber>,
) -> anyhow::Result<()> {
tracing::info!("block importer starting");

Expand All @@ -116,27 +117,32 @@ async fn execute_block_importer(
break "block loader finished or failed";
};

// imports transactions
let block_start = blocks.first().unwrap().number();
let block_end = blocks.last().unwrap().number();
let mut receipts = ExternalReceipts::from(receipts);
let block_last_index = blocks.len() - 1;
let receipts = ExternalReceipts::from(receipts);

// imports block transactions
tracing::info!(%block_start, %block_end, receipts = %receipts.len(), "importing blocks");
let block_last_index = blocks.len() - 1;
for (block_index, block) in blocks.into_iter().enumerate() {
#[cfg(feature = "metrics")]
let start = metrics::now();

// when exporting to csv, permanent state is written to csv.
// when not exporting to csv, permanent state is written to storage.
match csv {
// import block
// * when exporting to csv, permanent state is written to csv
// * when not exporting to csv, permanent state is written to storage
let mined_block = match csv {
Some(ref mut csv) => {
let block = executor.import_external_to_temp(block, &mut receipts).await?;
import_external_to_csv(csv, &stratus_storage, block, block_index, block_last_index).await?;
}
None => {
executor.import_external_to_perm(block, &mut receipts).await?;
let mined_block = executor.import_external_to_temp(block.clone(), &receipts).await?;
import_external_to_csv(stratus_storage, csv, mined_block.clone(), block_index, block_last_index).await?;
mined_block
}
None => executor.import_external_to_perm(block.clone(), &receipts).await?,
};

// export snapshot for tests
if blocks_to_export_snapshot.contains(&block.number()) {
export_snapshot(&block, &receipts, &mined_block)?;
}

#[cfg(feature = "metrics")]
Expand Down Expand Up @@ -248,11 +254,32 @@ async fn block_number_to_stop(rpc_storage: &Arc<dyn ExternalRpcStorage>) -> anyh
}

// -----------------------------------------------------------------------------
// Csv Exporter
// Snapshot exporter
// -----------------------------------------------------------------------------
fn export_snapshot(external_block: &ExternalBlock, external_receipts: &ExternalReceipts, mined_block: &Block) -> anyhow::Result<()> {
// generate snapshot
let snapshot = InMemoryPermanentStorage::dump_snapshot(mined_block.compact_account_changes());

// create dir
let dir = format!("tests/fixtures/block-{}/", mined_block.number());
fs::create_dir_all(&dir)?;

// write json
fs::write(format!("{}/block.json", dir), serde_json::to_string_pretty(external_block)?)?;
fs::write(format!("{}/receipts.json", dir), serde_json::to_string_pretty(external_receipts)?)?;
fs::write(format!("{}/snapshot.json", dir), serde_json::to_string_pretty(&snapshot)?)?;

Ok(())
}

// -----------------------------------------------------------------------------
// Csv exporter
// -----------------------------------------------------------------------------
async fn import_external_to_csv(
csv: &mut CsvExporter,
// services
stratus_storage: &StratusStorage,
csv: &mut CsvExporter,
// data
block: Block,
block_index: usize,
block_last_index: usize,
Expand Down
4 changes: 2 additions & 2 deletions src/bin/importer_online.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ async fn run(config: ImporterOnlineConfig) -> anyhow::Result<()> {
let receipts = futures::stream::iter(receipts).buffered(RECEIPTS_PARALELLISM).try_collect::<Vec<_>>().await?;

// import block
let mut receipts: ExternalReceipts = receipts.into();
executor.import_external_to_perm(block, &mut receipts).await?;
let receipts: ExternalReceipts = receipts.into();
executor.import_external_to_perm(block, &receipts).await?;

#[cfg(feature = "metrics")]
metrics::inc_import_online(start.elapsed());
Expand Down
3 changes: 3 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,9 @@ pub struct ImporterOfflineConfig {
#[arg(long = "export-csv", env = "EXPORT_CSV", default_value = "false")]
pub export_csv: bool,

#[arg(long = "export-snapshot", env = "EXPORT_SNAPSHOT", value_delimiter = ',')]
pub export_snapshot: Vec<u64>,

#[clap(flatten)]
pub executor: ExecutorConfig,

Expand Down
42 changes: 11 additions & 31 deletions src/eth/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
//! `EthExecutor` is designed to work with the `Evm` trait implementations to execute transactions and calls,
//! while also interfacing with a miner component to handle block mining and a storage component to persist state changes.
use std::io::Write;
use std::sync::Arc;

use anyhow::anyhow;
Expand All @@ -16,7 +15,6 @@ use tokio::sync::Mutex;
use crate::eth::evm::EvmExecutionResult;
use crate::eth::evm::EvmInput;
use crate::eth::primitives::Block;
use crate::eth::primitives::BlockNumber;
use crate::eth::primitives::CallInput;
use crate::eth::primitives::Execution;
use crate::eth::primitives::ExecutionMetrics;
Expand All @@ -26,7 +24,6 @@ use crate::eth::primitives::ExternalTransactionExecution;
use crate::eth::primitives::LogMined;
use crate::eth::primitives::StoragePointInTime;
use crate::eth::primitives::TransactionInput;
use crate::eth::storage::InMemoryPermanentStorage;
use crate::eth::storage::StorageError;
use crate::eth::storage::StratusStorage;
use crate::eth::BlockMiner;
Expand Down Expand Up @@ -73,7 +70,7 @@ impl EthExecutor {
// -------------------------------------------------------------------------

/// Re-executes an external block locally and imports it to the permanent storage.
pub async fn import_external_to_perm(&self, block: ExternalBlock, receipts: &mut ExternalReceipts) -> anyhow::Result<Block> {
pub async fn import_external_to_perm(&self, block: ExternalBlock, receipts: &ExternalReceipts) -> anyhow::Result<Block> {
// import block
let block = self.import_external_to_temp(block, receipts).await?;

Expand All @@ -89,7 +86,7 @@ impl EthExecutor {
}

/// Re-executes an external block locally and imports it to the temporary storage.
pub async fn import_external_to_temp(&self, block: ExternalBlock, receipts: &mut ExternalReceipts) -> anyhow::Result<Block> {
pub async fn import_external_to_temp(&self, block: ExternalBlock, receipts: &ExternalReceipts) -> anyhow::Result<Block> {
#[cfg(feature = "metrics")]
let start = metrics::now();

Expand All @@ -106,25 +103,25 @@ impl EthExecutor {
let tx_start = metrics::now();

// re-execute transaction or create a fake execution the external transaction failed
let receipt = receipts.try_take(&tx.hash())?;
let receipt = receipts.try_get(&tx.hash())?;
let execution = if receipt.is_success() {
let evm_input = EvmInput::from_external_transaction(&block, tx.clone(), &receipt)?;
let evm_input = EvmInput::from_external_transaction(&block, tx.clone(), receipt)?;
self.execute_in_evm(evm_input).await
} else {
let sender = self.storage.read_account(&receipt.from.into(), &StoragePointInTime::Present).await?;
let execution = Execution::from_failed_external_transaction(&block, &receipt, sender)?;
let execution = Execution::from_failed_external_transaction(&block, receipt, sender)?;
Ok((execution, ExecutionMetrics::default()))
};

// handle execution result
match execution {
Ok((mut execution, execution_metrics)) => {
// apply execution costs that were not consided when re-executing the transaction
execution.apply_execution_costs(&receipt)?;
execution.apply_execution_costs(receipt)?;
execution.gas = receipt.gas_used.unwrap_or_default().try_into()?;

// ensure it matches receipt before saving
if let Err(e) = execution.compare_with_receipt(&receipt) {
if let Err(e) = execution.compare_with_receipt(receipt) {
let json_tx = serde_json::to_string(&tx).unwrap();
let json_receipt = serde_json::to_string(&receipt).unwrap();
let json_execution_logs = serde_json::to_string(&execution.logs).unwrap();
Expand All @@ -134,7 +131,7 @@ impl EthExecutor {

// temporarily save state to next transactions from the same block
self.storage.save_account_changes_to_temp(execution.changes.clone()).await?;
executions.push((tx, receipt, execution.clone()));
executions.push((tx, receipt.clone(), execution.clone()));

// track metrics
#[cfg(feature = "metrics")]
Expand All @@ -151,19 +148,6 @@ impl EthExecutor {
}
}

// convert block
let block = Block::from_external(block, executions)?;

// Update block snapshot for integration testing
// Block 292973 from CloudWalk Network Mainnet
if *block.number() == BlockNumber::from(292973) {
let block_changes = block.compact_account_changes();
let state = InMemoryPermanentStorage::dump_snapshot(block_changes).await;
let state_string = serde_json::to_string(&state)?;
let mut file = std::fs::File::create("tests/fixtures/block-292973/snapshot.json")?;
file.write_all(state_string.as_bytes())?;
};

// track metrics
#[cfg(feature = "metrics")]
{
Expand All @@ -172,7 +156,7 @@ impl EthExecutor {
metrics::inc_executor_external_block_slot_reads(block_metrics.slot_reads);
}

Ok(block)
Block::from_external(block, executions)
}

/// Executes a transaction persisting state changes.
Expand Down Expand Up @@ -223,16 +207,12 @@ impl EthExecutor {
};

// notify new blocks
if let Err(e) = self.block_notifier.send(block.clone()) {
tracing::error!(reason = ?e, "failed to send block notification");
};
let _ = self.block_notifier.send(block.clone());

// notify transaction logs
for trx in block.transactions {
for log in trx.logs {
if let Err(e) = self.log_notifier.send(log) {
tracing::error!(reason = ?e, "failed to send log notification");
};
let _ = self.log_notifier.send(log);
}
}

Expand Down
9 changes: 5 additions & 4 deletions src/eth/primitives/external_receipts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ use crate::eth::primitives::ExternalReceipt;
use crate::eth::primitives::Hash;

/// A collection of [`ExternalReceipt`].
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ExternalReceipts(HashMap<Hash, ExternalReceipt>);

impl ExternalReceipts {
/// Tries to take a receipt by its hash.
pub fn try_take(&mut self, hash: &Hash) -> anyhow::Result<ExternalReceipt> {
match self.take(hash) {
pub fn try_get(&self, hash: &Hash) -> anyhow::Result<&ExternalReceipt> {
match self.get(hash) {
Some(receipt) => Ok(receipt),
None => {
tracing::error!(%hash, "receipt is missing for hash");
Expand All @@ -21,8 +22,8 @@ impl ExternalReceipts {
}

/// Takes a receipt by its hash.
pub fn take(&mut self, hash: &Hash) -> Option<ExternalReceipt> {
self.0.remove(hash)
pub fn get(&self, hash: &Hash) -> Option<&ExternalReceipt> {
self.0.get(hash)
}

/// Returns the number of receipts.
Expand Down
2 changes: 1 addition & 1 deletion src/eth/storage/inmemory/inmemory_permanent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl InMemoryPermanentStorage {
// -------------------------------------------------------------------------

/// Dump a snapshot of an execution previous state that can be used in tests.
pub async fn dump_snapshot(changes: Vec<ExecutionAccountChanges>) -> InMemoryPermanentStorageState {
pub fn dump_snapshot(changes: Vec<ExecutionAccountChanges>) -> InMemoryPermanentStorageState {
let mut state = InMemoryPermanentStorageState::default();
for change in changes {
// save account
Expand Down
321 changes: 320 additions & 1 deletion tests/fixtures/block-292973/block.json

Large diffs are not rendered by default.

1,215,884 changes: 1,215,870 additions & 14 deletions tests/fixtures/block-292973/receipts.json

Large diffs are not rendered by default.

50,607 changes: 50,606 additions & 1 deletion tests/fixtures/block-292973/snapshot.json

Large diffs are not rendered by default.

Loading

0 comments on commit c2e927b

Please sign in to comment.