Skip to content

Commit

Permalink
stratus storage
Browse files Browse the repository at this point in the history
  • Loading branch information
dinhani-cw committed May 15, 2024
1 parent 28aa49f commit a9ec848
Show file tree
Hide file tree
Showing 10 changed files with 82 additions and 63 deletions.
10 changes: 1 addition & 9 deletions src/bin/importer_offline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ use stratus::eth::storage::StratusStorage;
use stratus::eth::BlockMiner;
use stratus::eth::Executor;
use stratus::ext::not;
#[cfg(feature = "metrics")]
use stratus::infra::metrics;
use stratus::log_and_err;
use stratus::utils::signal_handler;
use stratus::GlobalServices;
Expand Down Expand Up @@ -166,9 +164,6 @@ async fn execute_block_importer(
tracing::info!(%block_start, %block_end, receipts = %receipts.len(), "importing blocks");
for (block_index, block) in blocks.into_iter().enumerate() {
async {
#[cfg(feature = "metrics")]
let start = metrics::now();

// re-execute block
executor.reexecute_external(&block, &receipts).await?;

Expand All @@ -186,9 +181,6 @@ async fn execute_block_importer(
export_snapshot(&block, &receipts, &mined_block)?;
}

#[cfg(feature = "metrics")]
metrics::inc_import_offline(start.elapsed());

anyhow::Ok(())
}
.await?;
Expand Down Expand Up @@ -349,7 +341,7 @@ async fn import_external_to_csv(
// flush
if should_flush {
csv.flush()?;
stratus_storage.flush_temp().await?;
stratus_storage.flush().await?;
}

// chunk
Expand Down
7 changes: 4 additions & 3 deletions src/bin/importer_online.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,10 @@ pub async fn run_importer_online(
miner.mine_external_mixed_and_commit().await?;

#[cfg(feature = "metrics")]
metrics::inc_n_importer_online_transactions_total(receipts.len() as u64);
#[cfg(feature = "metrics")]
metrics::inc_import_online_mined_block(start.elapsed());
{
metrics::inc_n_importer_online_transactions_total(receipts.len() as u64);
metrics::inc_import_online_mined_block(start.elapsed());
}
}

Ok(())
Expand Down
6 changes: 3 additions & 3 deletions src/eth/block_miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl BlockMiner {
///
/// Local transactions are not allowed to be part of the block.
pub async fn mine_external(&self) -> anyhow::Result<Block> {
let block = self.storage.temp.finish_block().await?;
let block = self.storage.finish_block().await?;
let (local_txs, external_txs) = block.split_transactions();

// validate
Expand All @@ -80,7 +80,7 @@ impl BlockMiner {
///
/// Local transactions are allowed to be part of the block if failed, but not succesful ones.
pub async fn mine_external_mixed(&self) -> anyhow::Result<Block> {
let block = self.storage.temp.finish_block().await?;
let block = self.storage.finish_block().await?;
let (local_txs, external_txs) = block.split_transactions();

// validate
Expand Down Expand Up @@ -113,7 +113,7 @@ impl BlockMiner {
///
/// External transactions are not allowed to be part of the block.
pub async fn mine_local(&self) -> anyhow::Result<Block> {
let block = self.storage.temp.finish_block().await?;
let block = self.storage.finish_block().await?;
let (local_txs, external_txs) = block.split_transactions();

// validate
Expand Down
4 changes: 2 additions & 2 deletions src/eth/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl Executor {

// track active block number
let storage = &self.storage;
storage.temp.set_external_block(block.clone()).await?;
storage.set_active_external_block(block.clone()).await?;
storage.set_active_block_number(block.number()).await?;

// determine how to execute each transaction
Expand All @@ -115,7 +115,7 @@ impl Executor {
ParallelExecutionRoute::Parallel(..) => {
match parallel_executions.next().await.unwrap() {
// success: check conflicts
Ok(tx) => match storage.temp.check_conflicts(&tx.result.execution).await? {
Ok(tx) => match storage.check_conflicts(&tx.result.execution).await? {
// no conflict: proceeed
None => tx,
// conflict: reexecute
Expand Down
2 changes: 1 addition & 1 deletion src/eth/storage/inmemory/inmemory_temporary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ impl TemporaryStorage for InMemoryTemporaryStorage {
// External block
// -------------------------------------------------------------------------

async fn set_external_block(&self, block: ExternalBlock) -> anyhow::Result<()> {
async fn set_active_external_block(&self, block: ExternalBlock) -> anyhow::Result<()> {
tracing::debug!(number = %block.number(), "setting re-executed external block");

let mut states = self.lock_write().await;
Expand Down
64 changes: 50 additions & 14 deletions src/eth/storage/stratus_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@ use crate::eth::primitives::Address;
use crate::eth::primitives::Block;
use crate::eth::primitives::BlockNumber;
use crate::eth::primitives::BlockSelection;
use crate::eth::primitives::EvmExecution;
use crate::eth::primitives::ExecutionConflicts;
use crate::eth::primitives::ExternalBlock;
use crate::eth::primitives::Hash;
use crate::eth::primitives::LogFilter;
use crate::eth::primitives::LogMined;
use crate::eth::primitives::PendingBlock;
use crate::eth::primitives::Slot;
use crate::eth::primitives::SlotIndex;
use crate::eth::primitives::SlotIndexes;
Expand All @@ -35,7 +39,7 @@ const DEFAULT_VALUE: &str = "default";
///
/// Additionaly it tracks metrics that are independent of the storage implementation.
pub struct StratusStorage {
pub temp: Arc<dyn TemporaryStorage>,
temp: Arc<dyn TemporaryStorage>,
perm: Arc<dyn PermanentStorage>,
}

Expand Down Expand Up @@ -139,6 +143,19 @@ impl StratusStorage {
// Accounts and slots
// -------------------------------------------------------------------------

pub async fn set_active_external_block(&self, block: ExternalBlock) -> anyhow::Result<()> {
#[cfg(feature = "metrics")]
{
let start = metrics::now();
let result = self.temp.set_active_external_block(block).await;
metrics::inc_storage_set_active_external_block(start.elapsed(), result.is_ok());
result
}

#[cfg(not(feature = "metrics"))]
self.temp.inc_storage_set_external_block(block).await
}

#[tracing::instrument(skip_all)]
pub async fn save_accounts(&self, accounts: Vec<Account>) -> anyhow::Result<()> {
#[cfg(feature = "metrics")]
Expand All @@ -153,6 +170,20 @@ impl StratusStorage {
self.perm.save_accounts(accounts).await
}

#[tracing::instrument(skip_all)]
pub async fn check_conflicts(&self, execution: &EvmExecution) -> anyhow::Result<Option<ExecutionConflicts>> {
#[cfg(feature = "metrics")]
{
let start = metrics::now();
let result = self.temp.check_conflicts(execution).await;
metrics::inc_storage_check_conflicts(start.elapsed(), result.is_ok(), result.as_ref().is_ok_and(|conflicts| conflicts.is_some()));
result
}

#[cfg(not(feature = "metrics"))]
self.temp.check_conflicts(execution).await
}

#[tracing::instrument(skip_all)]
pub async fn read_account(&self, address: &Address, point_in_time: &StoragePointInTime) -> anyhow::Result<Account> {
#[cfg(feature = "metrics")]
Expand Down Expand Up @@ -275,21 +306,26 @@ impl StratusStorage {
}

#[tracing::instrument(skip_all)]
pub async fn save_block(&self, block: Block) -> anyhow::Result<()> {
pub async fn finish_block(&self) -> anyhow::Result<PendingBlock> {
#[cfg(feature = "metrics")]
{
let (start, label_size_by_tx, label_size_by_gas, gas_used) = (
metrics::now(),
block.label_size_by_transactions(),
block.label_size_by_gas(),
block.header.gas_used.as_u64(),
);

let result = self.perm.save_block(block).await;
let start = metrics::now();
let result = self.temp.finish_block().await;
metrics::inc_storage_finish_block(start.elapsed(), result.is_ok());
result
}

metrics::inc_storage_commit(start.elapsed(), label_size_by_tx, label_size_by_gas, result.is_ok());
metrics::inc_n_storage_gas_total(gas_used);
#[cfg(not(feature = "metrics"))]
self.temp.finish_block().await
}

#[tracing::instrument(skip_all)]
pub async fn save_block(&self, block: Block) -> anyhow::Result<()> {
#[cfg(feature = "metrics")]
{
let (start, label_size_by_tx, label_size_by_gas) = (metrics::now(), block.label_size_by_transactions(), block.label_size_by_gas());
let result = self.perm.save_block(block).await;
metrics::inc_storage_save_block(start.elapsed(), label_size_by_tx, label_size_by_gas, result.is_ok());
result
}

Expand Down Expand Up @@ -340,12 +376,12 @@ impl StratusStorage {
}

#[tracing::instrument(skip_all)]
pub async fn flush_temp(&self) -> anyhow::Result<()> {
pub async fn flush(&self) -> anyhow::Result<()> {
#[cfg(feature = "metrics")]
{
let start = metrics::now();
let result = self.temp.flush().await;
metrics::inc_storage_flush_temp(start.elapsed(), result.is_ok());
metrics::inc_storage_flush(start.elapsed(), STORAGE_TEMP, result.is_ok());
result
}

Expand Down
8 changes: 4 additions & 4 deletions src/eth/storage/temporary_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,21 @@ pub trait TemporaryStorage: Send + Sync {
// -------------------------------------------------------------------------

/// Sets the external block being re-executed.
async fn set_external_block(&self, block: ExternalBlock) -> anyhow::Result<()>;
async fn set_active_external_block(&self, block: ExternalBlock) -> anyhow::Result<()>;

/// Saves an re-executed transaction to the active mined block.
async fn save_execution(&self, tx: TransactionExecution) -> anyhow::Result<()>;

/// Finishes the mining of the active block and starts a new block.
async fn finish_block(&self) -> anyhow::Result<PendingBlock>;

/// Checks if an execution conflicts with current storage state.
async fn check_conflicts(&self, execution: &EvmExecution) -> anyhow::Result<Option<ExecutionConflicts>>;

// -------------------------------------------------------------------------
// Accounts and slots
// -------------------------------------------------------------------------

/// Checks if an execution conflicts with current storage state.
async fn check_conflicts(&self, execution: &EvmExecution) -> anyhow::Result<Option<ExecutionConflicts>>;

/// Retrieves an account from the storage. Returns Option when not found.
async fn read_account(&self, address: &Address) -> anyhow::Result<Option<Account>>;

Expand Down
38 changes: 15 additions & 23 deletions src/infra/metrics/metrics_definitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ metrics! {
metrics! {
group: storage_read,

"Time to execute storage check_conflicts operation."
histogram_duration storage_check_conflicts{success, conflicted} [],

"Time to execute storage read_active_block_number operation."
histogram_duration storage_read_active_block_number{success} [],

Expand Down Expand Up @@ -56,41 +59,30 @@ metrics! {
"Time to execute storage save_account_changes operation."
histogram_duration storage_save_execution{success} [],

"Time to execute storage flush_temp operation."
histogram_duration storage_flush_temp{success} [],

"Time to execute storage save_block operation."
histogram_duration storage_save_block{success} [],

"Time to execute storage reset operation."
histogram_duration storage_reset{kind, success} [],
"Time to execute storage flush operation."
histogram_duration storage_flush{kind, success} [],

"Time to execute storage commit operation."
histogram_duration storage_commit{size_by_tx, size_by_gas, success} [],
"Time to execute storage set_active_external_block operation."
histogram_duration storage_set_active_external_block{success} [],

"Ammount of gas in the commited transactions"
counter storage_gas_total{} []
}
"Time to execute storage finish_block operation."
histogram_duration storage_finish_block{success} [],

// Importer offline metrics.
metrics! {
group: importer_offline,
"Time to execute storage save_block operation."
histogram_duration storage_save_block{size_by_tx, size_by_gas, success} [],

"Time to execute import_offline operation."
histogram_duration import_offline{} []
"Time to execute storage reset operation."
histogram_duration storage_reset{kind, success} []
}

// Importer online metrics.
metrics! {
group: importer_online,

"Time to execute import_online operation."
histogram_duration import_online{} [],

"Time to import one mined block."
"Time to import one block."
histogram_duration import_online_mined_block{} [],

"Transactions imported"
"Number of transactions imported."
counter importer_online_transactions_total{} []
}

Expand Down
2 changes: 0 additions & 2 deletions src/infra/metrics/metrics_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use metrics_exporter_prometheus::PrometheusBuilder;
use crate::config::MetricsHistogramKind;
use crate::infra::metrics::metrics_for_evm;
use crate::infra::metrics::metrics_for_executor;
use crate::infra::metrics::metrics_for_importer_offline;
use crate::infra::metrics::metrics_for_importer_online;
use crate::infra::metrics::metrics_for_json_rpc;
use crate::infra::metrics::metrics_for_rocks;
Expand All @@ -33,7 +32,6 @@ pub fn init_metrics(histogram_kind: MetricsHistogramKind) {

// get metric definitions
let mut metrics = Vec::new();
metrics.extend(metrics_for_importer_offline());
metrics.extend(metrics_for_importer_online());
metrics.extend(metrics_for_json_rpc());
metrics.extend(metrics_for_executor());
Expand Down
4 changes: 2 additions & 2 deletions tests/test_import_external_snapshot_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ mod m {
pub use stratus::infra::metrics::METRIC_EVM_EXECUTION;
pub use stratus::infra::metrics::METRIC_EVM_EXECUTION_SLOT_READS_CACHED;
pub use stratus::infra::metrics::METRIC_EXECUTOR_EXTERNAL_BLOCK;
pub use stratus::infra::metrics::METRIC_STORAGE_COMMIT;
pub use stratus::infra::metrics::METRIC_STORAGE_READ_ACCOUNT;
pub use stratus::infra::metrics::METRIC_STORAGE_READ_SLOT;
pub use stratus::infra::metrics::METRIC_STORAGE_READ_SLOTS;
pub use stratus::infra::metrics::METRIC_STORAGE_SAVE_BLOCK;
}

#[cfg(feature = "metrics")]
Expand Down Expand Up @@ -83,7 +83,7 @@ const METRIC_QUERIES: [&str; 48] = [
m::formatcp!("{}{{found_at='permanent', quantile='0.95'}}", m::METRIC_STORAGE_READ_SLOT),
m::formatcp!("{}{{found_at='default', quantile='0.95'}}", m::METRIC_STORAGE_READ_SLOT),
"* COMMIT",
m::formatcp!("{}{{quantile='1'}}", m::METRIC_STORAGE_COMMIT),
m::formatcp!("{}{{quantile='1'}}", m::METRIC_STORAGE_SAVE_BLOCK),
];

#[cfg(not(feature = "metrics"))]
Expand Down

0 comments on commit a9ec848

Please sign in to comment.