From 513bdbdbb191417775c3137f0537f950eb4fc50b Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Wed, 28 Feb 2024 13:36:16 -0300 Subject: [PATCH 01/17] implement state comparison job --- src/bin/importer/importer-import.rs | 57 +++++++++++++++++-- src/config.rs | 12 ++++ src/eth/primitives/mod.rs | 1 + src/eth/primitives/slot.rs | 14 ++++- .../storage/inmemory/inmemory_permanent.rs | 5 ++ src/eth/storage/permanent_storage.rs | 3 + src/eth/storage/postgres/postgres.rs | 23 ++++++-- .../queries/select_random_slot_sample.sql | 25 ++++++++ src/eth/storage/stratus_storage.rs | 6 ++ src/infra/blockchain_client.rs | 21 +++++++ 10 files changed, 158 insertions(+), 9 deletions(-) create mode 100644 src/eth/storage/postgres/queries/select_random_slot_sample.sql diff --git a/src/bin/importer/importer-import.rs b/src/bin/importer/importer-import.rs index f35252779..f307c176f 100644 --- a/src/bin/importer/importer-import.rs +++ b/src/bin/importer/importer-import.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use std::sync::Arc; +use std::time::Duration; use itertools::Itertools; use serde_json::Value as JsonValue; @@ -13,7 +14,9 @@ use stratus::eth::primitives::BlockNumber; use stratus::eth::primitives::ExternalBlock; use stratus::eth::primitives::ExternalReceipt; use stratus::eth::primitives::Wei; +use stratus::eth::storage::StratusStorage; use stratus::infra::postgres::Postgres; +use stratus::infra::BlockchainClient; use stratus::init_global_services; use stratus::log_and_err; use tokio::sync::mpsc; @@ -21,6 +24,7 @@ use tokio_util::sync::CancellationToken; /// Number of tasks in backlog: (BACKLOG_SIZE * BacklogTask) const BACKLOG_SIZE: usize = 10; +const RPC_TIMEOUT: Duration = Duration::from_secs(2); type BacklogTask = (Vec, Vec); #[tokio::main(flavor = "current_thread")] @@ -45,6 +49,14 @@ async fn main() -> anyhow::Result<()> { // load blocks and receipts in background tokio::spawn(keep_loading_blocks(pg, cancellation.clone(), backlog_tx.clone())); + if config.validate_state { + tokio::spawn(keep_comparing_state( + Arc::clone(&storage), + config.external_rpc, + config.max_samples, + BlockNumber::from(config.comparison_period), + )); + } // import blocks and transactions in foreground let reason = loop { @@ -73,6 +85,43 @@ async fn main() -> anyhow::Result<()> { Ok(()) } +async fn keep_comparing_state(storage: Arc, external_rpc: String, max_sample_size: u64, compare_after: BlockNumber) -> anyhow::Result<()> { + let chain = BlockchainClient::new(&external_rpc, RPC_TIMEOUT)?; + let mut latest_compared_block = storage.read_current_block_number().await?; + loop { + let current_imported_block = storage.read_current_block_number().await?; + if current_imported_block - latest_compared_block >= compare_after { + compare_state(&chain, Arc::clone(&storage), latest_compared_block, current_imported_block, max_sample_size).await?; + latest_compared_block = current_imported_block; + } + tokio::time::sleep(Duration::from_secs(1)).await; + } +} + +async fn compare_state( + chain: &BlockchainClient, + storage: Arc, + start: BlockNumber, + end: BlockNumber, + max_sample_size: u64, +) -> anyhow::Result<()> { + let slots = storage.get_slots_sample(start, end, max_sample_size, Some(1)).await?; + for sampled_slot in slots { + if chain + .get_storage_at( + &sampled_slot.address, + &sampled_slot.slot.index, + stratus::eth::primitives::StoragePointInTime::Past(sampled_slot.block_number), + ) + .await? + != sampled_slot.slot.value + { + return Err(anyhow::anyhow!("State mismatch on slot {:?}", sampled_slot)); + } + } + Ok(()) +} + // ----------------------------------------------------------------------------- // Postgres block loader // ----------------------------------------------------------------------------- @@ -93,13 +142,13 @@ async fn keep_loading_blocks( // find blocks tracing::info!("retrieving more blocks to process"); let blocks = match db_fetch_blocks(&mut tx).await { - Ok(blocks) => + Ok(blocks) => { if blocks.is_empty() { cancellation.cancel(); break "no more blocks to process"; - } else { - blocks - }, + } + blocks + } Err(_) => { cancellation.cancel(); break "error loading blocks"; diff --git a/src/config.rs b/src/config.rs index b6d8153ca..e8deb295b 100644 --- a/src/config.rs +++ b/src/config.rs @@ -71,6 +71,18 @@ pub struct ImporterImportConfig { #[deref] #[clap(flatten)] pub common: CommonConfig, + + #[arg(short = 'v', long = "validate-state", env = "VALIDATE-STATE", default_value_t = false, requires = "external_rpc")] + pub validate_state: bool, + + #[arg(short = 'r', long = "external-rpc", env = "EXTERNAL_RPC", requires = "validate_state")] + pub external_rpc: String, + + #[arg(short = 'm', long = "max-samples", env = "MAX_SAMPLES", default_value_t = 1000, requires = "validate_state")] + pub max_samples: u64, + + #[arg(short = 'p', long = "comparison-period", env = "COMPARISON_PERIOD", default_value_t = 1000, requires = "validate_state")] + pub comparison_period: u64, } /// Configuration for rpc-poller binary. diff --git a/src/eth/primitives/mod.rs b/src/eth/primitives/mod.rs index 50195bb6e..4fac65343 100644 --- a/src/eth/primitives/mod.rs +++ b/src/eth/primitives/mod.rs @@ -149,6 +149,7 @@ pub use log_mined::LogMined; pub use log_topic::LogTopic; pub use nonce::Nonce; pub use slot::Slot; +pub use slot::SlotSample; pub use slot::SlotIndex; pub use slot::SlotValue; pub use storage_point_in_time::StoragePointInTime; diff --git a/src/eth/primitives/slot.rs b/src/eth/primitives/slot.rs index 9306c0ab3..a781c4921 100644 --- a/src/eth/primitives/slot.rs +++ b/src/eth/primitives/slot.rs @@ -20,8 +20,12 @@ use sqlx::postgres::PgHasArrayType; use sqlx::Decode; use crate::gen_newtype_from; +use crate::eth::primitives::BlockNumber; -#[derive(Debug, Clone, Default, PartialEq, Eq, fake::Dummy, serde::Serialize, serde::Deserialize)] +use super::Address; + + +#[derive(Debug, Clone, Default, PartialEq, Eq, fake::Dummy, serde::Serialize, serde::Deserialize, sqlx::FromRow)] pub struct Slot { pub index: SlotIndex, pub value: SlotValue, @@ -202,3 +206,11 @@ impl PgHasArrayType for SlotValue { <[u8; 32] as PgHasArrayType>::array_type_info() } } + +#[derive(Debug, sqlx::FromRow)] +pub struct SlotSample { + pub address: Address, + pub block_number: BlockNumber, + #[sqlx(flatten)] + pub slot: Slot +} diff --git a/src/eth/storage/inmemory/inmemory_permanent.rs b/src/eth/storage/inmemory/inmemory_permanent.rs index 119d5578f..d6333651b 100644 --- a/src/eth/storage/inmemory/inmemory_permanent.rs +++ b/src/eth/storage/inmemory/inmemory_permanent.rs @@ -26,6 +26,7 @@ use crate::eth::primitives::LogMined; use crate::eth::primitives::Nonce; use crate::eth::primitives::Slot; use crate::eth::primitives::SlotIndex; +use crate::eth::primitives::SlotSample; use crate::eth::primitives::StoragePointInTime; use crate::eth::primitives::TransactionMined; use crate::eth::primitives::Wei; @@ -353,6 +354,10 @@ impl PermanentStorage for InMemoryPermanentStorage { Ok(()) } + + async fn get_slots_sample(&self, start: BlockNumber, end: BlockNumber, max_samples: u64, seed: Option) -> anyhow::Result> { + todo!(); + } } #[derive(Debug)] diff --git a/src/eth/storage/permanent_storage.rs b/src/eth/storage/permanent_storage.rs index 659dea1da..c68bddf3d 100644 --- a/src/eth/storage/permanent_storage.rs +++ b/src/eth/storage/permanent_storage.rs @@ -10,6 +10,7 @@ use crate::eth::primitives::LogFilter; use crate::eth::primitives::LogMined; use crate::eth::primitives::Slot; use crate::eth::primitives::SlotIndex; +use crate::eth::primitives::SlotSample; use crate::eth::primitives::StoragePointInTime; use crate::eth::primitives::TransactionMined; use crate::eth::storage::StorageError; @@ -51,4 +52,6 @@ pub trait PermanentStorage: Send + Sync { /// Resets all state to a specific block number. async fn reset_at(&self, number: BlockNumber) -> anyhow::Result<()>; + + async fn get_slots_sample(&self, start: BlockNumber, end: BlockNumber, max_samples: u64, seed: Option) -> anyhow::Result>; } diff --git a/src/eth/storage/postgres/postgres.rs b/src/eth/storage/postgres/postgres.rs index dc364e45e..239f8c704 100644 --- a/src/eth/storage/postgres/postgres.rs +++ b/src/eth/storage/postgres/postgres.rs @@ -24,6 +24,7 @@ use crate::eth::primitives::LogMined; use crate::eth::primitives::LogTopic; use crate::eth::primitives::Slot; use crate::eth::primitives::SlotIndex; +use crate::eth::primitives::SlotSample; use crate::eth::primitives::StoragePointInTime; use crate::eth::primitives::TransactionMined; use crate::eth::storage::postgres::types::AccountBatch; @@ -106,10 +107,11 @@ impl PermanentStorage for Postgres { let slot_index: [u8; 32] = slot_index.clone().into(); let slot = match point_in_time { - StoragePointInTime::Present => + StoragePointInTime::Present => { sqlx::query_file_as!(Slot, "src/eth/storage/postgres/queries/select_slot.sql", address.as_ref(), slot_index.as_ref()) .fetch_optional(&self.connection_pool) - .await?, + .await? + } StoragePointInTime::Past(number) => { let block_number: i64 = (*number).try_into()?; sqlx::query_file_as!( @@ -715,6 +717,18 @@ impl PermanentStorage for Postgres { Ok(()) } + + async fn get_slots_sample(&self, start: BlockNumber, end: BlockNumber, max_samples: u64, seed: Option) -> anyhow::Result> { + let seed = seed.unwrap_or(1); + let slots_sample = sqlx::query_as::<_, SlotSample>(include_str!("queries/select_random_slot_sample.sql")) + .bind(seed as i64) + .bind(start) + .bind(end) + .bind(max_samples as i64) + .fetch_all(&self.connection_pool) + .await?; + Ok(slots_sample) + } } fn partition_logs(logs: impl IntoIterator) -> HashMap> { @@ -733,12 +747,13 @@ fn partition_topics(topics: impl IntoIterator) -> HashMap< let mut partitions: HashMap>> = HashMap::new(); for topic in topics { match partitions.get_mut(&topic.transaction_hash) { - Some(transaction_logs) => + Some(transaction_logs) => { if let Some(part) = transaction_logs.get_mut(&topic.log_idx) { part.push(topic); } else { transaction_logs.insert(topic.log_idx, vec![topic]); - }, + } + } None => { partitions.insert(topic.transaction_hash.clone(), [(topic.log_idx, vec![topic])].into_iter().collect()); } diff --git a/src/eth/storage/postgres/queries/select_random_slot_sample.sql b/src/eth/storage/postgres/queries/select_random_slot_sample.sql new file mode 100644 index 000000000..721a4e594 --- /dev/null +++ b/src/eth/storage/postgres/queries/select_random_slot_sample.sql @@ -0,0 +1,25 @@ +SELECT + idx as "index: _", + value as "value: _", + account_address as "address: _", + block_number as "block_number: _" +FROM + (SELECT + setseed($1), + null as "idx", + null as "value", + null as "account_address", + null as "block_number" + UNION ALL + SELECT + null, + idx, + value, + account_address, + block_number + FROM historical_slots + WHERE block_number >= $2 AND block_number <= $3 + OFFSET 1 + ) +ORDER BY random() +LIMIT $4; diff --git a/src/eth/storage/stratus_storage.rs b/src/eth/storage/stratus_storage.rs index a145a726e..d0d934716 100644 --- a/src/eth/storage/stratus_storage.rs +++ b/src/eth/storage/stratus_storage.rs @@ -14,6 +14,7 @@ use crate::eth::primitives::LogFilter; use crate::eth::primitives::LogMined; use crate::eth::primitives::Slot; use crate::eth::primitives::SlotIndex; +use crate::eth::primitives::SlotSample; use crate::eth::primitives::StoragePointInTime; use crate::eth::primitives::TransactionMined; use crate::eth::storage::PermanentStorage; @@ -220,4 +221,9 @@ impl StratusStorage { }, } } + + pub async fn get_slots_sample(&self, start: BlockNumber, end: BlockNumber, max_samples: u64, seed: Option) -> anyhow::Result>{ + self.perm.get_slots_sample(start, end, max_samples, seed).await + } + } diff --git a/src/infra/blockchain_client.rs b/src/infra/blockchain_client.rs index 1eb883871..25dc51b81 100644 --- a/src/infra/blockchain_client.rs +++ b/src/infra/blockchain_client.rs @@ -8,6 +8,9 @@ use serde_json::Value as JsonValue; use crate::eth::primitives::Address; use crate::eth::primitives::BlockNumber; use crate::eth::primitives::Hash; +use crate::eth::primitives::SlotIndex; +use crate::eth::primitives::SlotValue; +use crate::eth::primitives::StoragePointInTime; use crate::eth::primitives::Wei; use crate::log_and_err; @@ -77,4 +80,22 @@ impl BlockchainClient { Err(e) => log_and_err!(reason = e, "failed to retrieve account balance"), } } + + /// Retrieves a slot at some block. + pub async fn get_storage_at(&self, address: &Address, index: &SlotIndex, point_in_time: StoragePointInTime) -> anyhow::Result { + tracing::debug!(%address, ?point_in_time, "retrieving account balance"); + + let address = serde_json::to_value(address)?; + let index = serde_json::to_value(index)?; + let number = match point_in_time { + StoragePointInTime::Present => serde_json::to_value("latest")?, + StoragePointInTime::Past(number) => serde_json::to_value(number)? + }; + let result = self.http.request::>("eth_getStorageAt", vec![address, index, number]).await; + + match result { + Ok(value) => Ok(value), + Err(e) => log_and_err!(reason = e, "failed to retrieve account balance"), + } + } } From e43cf351712390935f2c3794427caf5a3cede003 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Wed, 28 Feb 2024 13:36:45 -0300 Subject: [PATCH 02/17] rename to validate_state --- src/bin/importer/importer-import.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/bin/importer/importer-import.rs b/src/bin/importer/importer-import.rs index f307c176f..7b9141030 100644 --- a/src/bin/importer/importer-import.rs +++ b/src/bin/importer/importer-import.rs @@ -50,7 +50,7 @@ async fn main() -> anyhow::Result<()> { // load blocks and receipts in background tokio::spawn(keep_loading_blocks(pg, cancellation.clone(), backlog_tx.clone())); if config.validate_state { - tokio::spawn(keep_comparing_state( + tokio::spawn(keep_validating_state( Arc::clone(&storage), config.external_rpc, config.max_samples, @@ -85,20 +85,20 @@ async fn main() -> anyhow::Result<()> { Ok(()) } -async fn keep_comparing_state(storage: Arc, external_rpc: String, max_sample_size: u64, compare_after: BlockNumber) -> anyhow::Result<()> { +async fn keep_validating_state(storage: Arc, external_rpc: String, max_sample_size: u64, compare_after: BlockNumber) -> anyhow::Result<()> { let chain = BlockchainClient::new(&external_rpc, RPC_TIMEOUT)?; let mut latest_compared_block = storage.read_current_block_number().await?; loop { let current_imported_block = storage.read_current_block_number().await?; if current_imported_block - latest_compared_block >= compare_after { - compare_state(&chain, Arc::clone(&storage), latest_compared_block, current_imported_block, max_sample_size).await?; + validate_state(&chain, Arc::clone(&storage), latest_compared_block, current_imported_block, max_sample_size).await?; latest_compared_block = current_imported_block; } tokio::time::sleep(Duration::from_secs(1)).await; } } -async fn compare_state( +async fn validate_state( chain: &BlockchainClient, storage: Arc, start: BlockNumber, From 8f8706519f847da5954b02d151aa01e908f617d2 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Wed, 28 Feb 2024 13:49:53 -0300 Subject: [PATCH 03/17] Panic if state is invalid --- src/bin/importer/importer-import.rs | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/src/bin/importer/importer-import.rs b/src/bin/importer/importer-import.rs index 7b9141030..e58478136 100644 --- a/src/bin/importer/importer-import.rs +++ b/src/bin/importer/importer-import.rs @@ -49,6 +49,8 @@ async fn main() -> anyhow::Result<()> { // load blocks and receipts in background tokio::spawn(keep_loading_blocks(pg, cancellation.clone(), backlog_tx.clone())); + + // validate state in background if config.validate_state { tokio::spawn(keep_validating_state( Arc::clone(&storage), @@ -91,7 +93,11 @@ async fn keep_validating_state(storage: Arc, external_rpc: Strin loop { let current_imported_block = storage.read_current_block_number().await?; if current_imported_block - latest_compared_block >= compare_after { - validate_state(&chain, Arc::clone(&storage), latest_compared_block, current_imported_block, max_sample_size).await?; + let result = validate_state(&chain, Arc::clone(&storage), latest_compared_block, current_imported_block, max_sample_size) + .await; + if let Err(err) = result { + panic!("{}", err.to_string()); + } latest_compared_block = current_imported_block; } tokio::time::sleep(Duration::from_secs(1)).await; @@ -107,16 +113,20 @@ async fn validate_state( ) -> anyhow::Result<()> { let slots = storage.get_slots_sample(start, end, max_sample_size, Some(1)).await?; for sampled_slot in slots { - if chain + let expected_value = chain .get_storage_at( &sampled_slot.address, &sampled_slot.slot.index, stratus::eth::primitives::StoragePointInTime::Past(sampled_slot.block_number), ) - .await? - != sampled_slot.slot.value - { - return Err(anyhow::anyhow!("State mismatch on slot {:?}", sampled_slot)); + .await?; + if sampled_slot.slot.value != expected_value { + return Err(anyhow::anyhow!( + "State mismatch on slot {:?}, expected: {:?}, found: {:?}", + sampled_slot, + expected_value, + sampled_slot.slot.value + )); } } Ok(()) From b8019231cf172e31a587654e749c1f42d28cecf7 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Thu, 29 Feb 2024 10:27:13 -0300 Subject: [PATCH 04/17] extract validator to its own binary --- ...a401438a319d0b9acad44dcef28bde23d65fd.json | 43 ++++++++ Cargo.toml | 4 + src/bin/importer/state_validator.rs | 101 ++++++++++++++++++ src/config.rs | 29 ++--- src/eth/primitives/slot.rs | 8 +- src/eth/storage/postgres/postgres.rs | 20 ++-- 6 files changed, 181 insertions(+), 24 deletions(-) create mode 100644 .sqlx/query-39d0b94d07738b1f644a2239c22a401438a319d0b9acad44dcef28bde23d65fd.json create mode 100644 src/bin/importer/state_validator.rs diff --git a/.sqlx/query-39d0b94d07738b1f644a2239c22a401438a319d0b9acad44dcef28bde23d65fd.json b/.sqlx/query-39d0b94d07738b1f644a2239c22a401438a319d0b9acad44dcef28bde23d65fd.json new file mode 100644 index 000000000..95c35ee76 --- /dev/null +++ b/.sqlx/query-39d0b94d07738b1f644a2239c22a401438a319d0b9acad44dcef28bde23d65fd.json @@ -0,0 +1,43 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT\n idx as \"index!: _\",\n value as \"value!: _\",\n account_address as \"address!: _\",\n block_number as \"block_number!: _\"\nFROM\n (SELECT\n setseed($1),\n null as \"idx\",\n null as \"value\",\n null as \"account_address\",\n null as \"block_number\"\n UNION ALL\n SELECT\n null,\n idx,\n value,\n account_address,\n block_number\n FROM historical_slots\n WHERE block_number >= $2 AND block_number <= $3\n OFFSET 1\n )\nORDER BY random()\nLIMIT $4;\n", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "index!: _", + "type_info": "Bytea" + }, + { + "ordinal": 1, + "name": "value!: _", + "type_info": "Bytea" + }, + { + "ordinal": 2, + "name": "address!: _", + "type_info": "Bytea" + }, + { + "ordinal": 3, + "name": "block_number!: _", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Float8", + "Int8", + "Int8", + "Int8" + ] + }, + "nullable": [ + null, + null, + null, + null + ] + }, + "hash": "39d0b94d07738b1f644a2239c22a401438a319d0b9acad44dcef28bde23d65fd" +} diff --git a/Cargo.toml b/Cargo.toml index 287bc300a..e0b2ad596 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -94,6 +94,10 @@ path = "src/bin/importer/importer-import.rs" name = "rpc-server-poller" path = "src/bin/rpc_server_poller.rs" +[[bin]] +name = "state-validator" +path = "src/bin/importer/state-validator.rs" + # ------------------------------------------------------------------------------ # Lints # ------------------------------------------------------------------------------ diff --git a/src/bin/importer/state_validator.rs b/src/bin/importer/state_validator.rs new file mode 100644 index 000000000..460cbb30e --- /dev/null +++ b/src/bin/importer/state_validator.rs @@ -0,0 +1,101 @@ +#![allow(dead_code)] + +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; + +use itertools::Itertools; +use serde_json::Value as JsonValue; +use sqlx::Row; +use stratus::config::ImporterImportConfig; +use stratus::eth::primitives::Account; +use stratus::eth::primitives::Address; +use stratus::eth::primitives::BlockNumber; +use stratus::eth::primitives::ExternalBlock; +use stratus::eth::primitives::ExternalReceipt; +use stratus::eth::primitives::Wei; +use stratus::eth::storage::StratusStorage; +use stratus::infra::postgres::Postgres; +use stratus::infra::BlockchainClient; +use stratus::init_global_services; +use stratus::log_and_err; +use tokio::sync::mpsc; +use tokio_util::sync::CancellationToken; + +/// Number of tasks in backlog: (BACKLOG_SIZE * BacklogTask) +const BACKLOG_SIZE: usize = 10; +const RPC_TIMEOUT: Duration = Duration::from_secs(2); +type BacklogTask = (Vec, Vec); + +#[tokio::main()] +async fn main() -> anyhow::Result<()> { + // init services + let config: ImporterImportConfig = init_global_services(); + let pg = Arc::new(Postgres::new(&config.postgres_url).await?); + let storage = config.init_storage().await?; + let executor = config.init_executor(Arc::clone(&storage)); + + // init shared data between importer and postgres loader + let (backlog_tx, mut backlog_rx) = mpsc::channel::(BACKLOG_SIZE); + let cancellation = CancellationToken::new(); + + // import genesis accounts + let balances = db_retrieve_balances(&pg).await?; + let accounts = balances + .into_iter() + .map(|row| Account::new_with_balance(row.address, row.balance)) + .collect_vec(); + storage.save_accounts_to_perm(accounts).await?; + + let chain = BlockchainClient::new(&config.external_rpc, RPC_TIMEOUT)?; + let mut latest_compared_block = storage.read_current_block_number().await?; + loop { + let current_imported_block = storage.read_current_block_number().await?; + if current_imported_block - latest_compared_block >= interval { + let result = validate_state(&chain, Arc::clone(&storage), latest_compared_block, latest_compared_block + interval, max_sample_size) + .await; + if let Err(err) = result { + cancellation.cancel(); + panic!("{}", err); + } + latest_compared_block = current_imported_block; + } + tokio::time::sleep(Duration::from_secs(1)).await; + } + Ok(()) +} + +async fn keep_validating_state(storage: Arc, external_rpc: String, max_sample_size: u64, interval: BlockNumber, cancellation: CancellationToken) -> anyhow::Result<()> { + println!("STARTING STATE VALIDATION"); + +} + +async fn validate_state( + chain: &BlockchainClient, + storage: Arc, + start: BlockNumber, + end: BlockNumber, + max_sample_size: u64, +) -> anyhow::Result<()> { + println!("Validating state {:?}, {:?}", start, end); + let slots = storage.get_slots_sample(start, end, max_sample_size, Some(1)).await?; + for sampled_slot in slots { + let expected_value = chain + .get_storage_at( + &sampled_slot.address, + &sampled_slot.index, + stratus::eth::primitives::StoragePointInTime::Past(sampled_slot.block_number), + ) + .await?; + println!("Comparing {:?} {:?}", sampled_slot.value, expected_value); + if sampled_slot.value != expected_value { + return Err(anyhow::anyhow!( + "State mismatch on slot {:?}, expected value: {:?}, found: {:?}", + sampled_slot, + expected_value, + sampled_slot.value + )); + } + } + Ok(()) +} diff --git a/src/config.rs b/src/config.rs index e8deb295b..330eb530f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -71,18 +71,6 @@ pub struct ImporterImportConfig { #[deref] #[clap(flatten)] pub common: CommonConfig, - - #[arg(short = 'v', long = "validate-state", env = "VALIDATE-STATE", default_value_t = false, requires = "external_rpc")] - pub validate_state: bool, - - #[arg(short = 'r', long = "external-rpc", env = "EXTERNAL_RPC", requires = "validate_state")] - pub external_rpc: String, - - #[arg(short = 'm', long = "max-samples", env = "MAX_SAMPLES", default_value_t = 1000, requires = "validate_state")] - pub max_samples: u64, - - #[arg(short = 'p', long = "comparison-period", env = "COMPARISON_PERIOD", default_value_t = 1000, requires = "validate_state")] - pub comparison_period: u64, } /// Configuration for rpc-poller binary. @@ -97,6 +85,23 @@ pub struct RpcPollerConfig { pub common: CommonConfig, } +/// Configuration for importer-import binary. +#[derive(Parser, Debug, derive_more::Deref)] +pub struct StateValidatorConfig { + #[deref] + #[clap(flatten)] + pub common: CommonConfig, + + #[arg(short = 'r', long = "external-rpc", env = "EXTERNAL_RPC", requires = "validate_state")] + pub external_rpc: String, + + #[arg(short = 'm', long = "max-samples", env = "MAX_SAMPLES", default_value_t = 0)] + pub sample_size: u64, + + #[arg(short = 'i', long = "inverval", env = "INVERVAL", default_value_t = 1000)] + pub interval: u64, +} + /// Common configuration that can be used by any binary. #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] diff --git a/src/eth/primitives/slot.rs b/src/eth/primitives/slot.rs index a781c4921..686af0b74 100644 --- a/src/eth/primitives/slot.rs +++ b/src/eth/primitives/slot.rs @@ -25,7 +25,7 @@ use crate::eth::primitives::BlockNumber; use super::Address; -#[derive(Debug, Clone, Default, PartialEq, Eq, fake::Dummy, serde::Serialize, serde::Deserialize, sqlx::FromRow)] +#[derive(Debug, Clone, Default, PartialEq, Eq, fake::Dummy, serde::Serialize, serde::Deserialize)] pub struct Slot { pub index: SlotIndex, pub value: SlotValue, @@ -207,10 +207,10 @@ impl PgHasArrayType for SlotValue { } } -#[derive(Debug, sqlx::FromRow)] +#[derive(Debug, sqlx::Decode)] pub struct SlotSample { pub address: Address, pub block_number: BlockNumber, - #[sqlx(flatten)] - pub slot: Slot + pub index: SlotIndex, + pub value: SlotValue } diff --git a/src/eth/storage/postgres/postgres.rs b/src/eth/storage/postgres/postgres.rs index 239f8c704..d6980f368 100644 --- a/src/eth/storage/postgres/postgres.rs +++ b/src/eth/storage/postgres/postgres.rs @@ -720,14 +720,18 @@ impl PermanentStorage for Postgres { async fn get_slots_sample(&self, start: BlockNumber, end: BlockNumber, max_samples: u64, seed: Option) -> anyhow::Result> { let seed = seed.unwrap_or(1); - let slots_sample = sqlx::query_as::<_, SlotSample>(include_str!("queries/select_random_slot_sample.sql")) - .bind(seed as i64) - .bind(start) - .bind(end) - .bind(max_samples as i64) - .fetch_all(&self.connection_pool) - .await?; - Ok(slots_sample) + let slots_sample_rows = sqlx::query_file_as!( + SlotSample, + "src/eth/storage/postgres/queries/select_random_slot_sample.sql", + seed as i64, + start as _, + end as _, + max_samples as i64 + ) + .fetch_all(&self.connection_pool) + .await?; + + Ok(slots_sample_rows) } } From b3166e2a37f76bcb9a388fdb76e2740e52dea32f Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Thu, 29 Feb 2024 10:27:25 -0300 Subject: [PATCH 05/17] fix query --- .../postgres/queries/select_random_slot_sample.sql | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/eth/storage/postgres/queries/select_random_slot_sample.sql b/src/eth/storage/postgres/queries/select_random_slot_sample.sql index 721a4e594..a4196298f 100644 --- a/src/eth/storage/postgres/queries/select_random_slot_sample.sql +++ b/src/eth/storage/postgres/queries/select_random_slot_sample.sql @@ -1,8 +1,8 @@ SELECT - idx as "index: _", - value as "value: _", - account_address as "address: _", - block_number as "block_number: _" + idx as "index!: _", + value as "value!: _", + account_address as "address!: _", + block_number as "block_number!: _" FROM (SELECT setseed($1), From b385d49af5a5a176307bfe023c8d68214be95124 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Thu, 29 Feb 2024 10:29:06 -0300 Subject: [PATCH 06/17] undo imprter changes --- src/bin/importer/importer-import.rs | 67 ++--------------------------- 1 file changed, 4 insertions(+), 63 deletions(-) diff --git a/src/bin/importer/importer-import.rs b/src/bin/importer/importer-import.rs index e58478136..f35252779 100644 --- a/src/bin/importer/importer-import.rs +++ b/src/bin/importer/importer-import.rs @@ -2,7 +2,6 @@ use std::collections::HashMap; use std::sync::Arc; -use std::time::Duration; use itertools::Itertools; use serde_json::Value as JsonValue; @@ -14,9 +13,7 @@ use stratus::eth::primitives::BlockNumber; use stratus::eth::primitives::ExternalBlock; use stratus::eth::primitives::ExternalReceipt; use stratus::eth::primitives::Wei; -use stratus::eth::storage::StratusStorage; use stratus::infra::postgres::Postgres; -use stratus::infra::BlockchainClient; use stratus::init_global_services; use stratus::log_and_err; use tokio::sync::mpsc; @@ -24,7 +21,6 @@ use tokio_util::sync::CancellationToken; /// Number of tasks in backlog: (BACKLOG_SIZE * BacklogTask) const BACKLOG_SIZE: usize = 10; -const RPC_TIMEOUT: Duration = Duration::from_secs(2); type BacklogTask = (Vec, Vec); #[tokio::main(flavor = "current_thread")] @@ -50,16 +46,6 @@ async fn main() -> anyhow::Result<()> { // load blocks and receipts in background tokio::spawn(keep_loading_blocks(pg, cancellation.clone(), backlog_tx.clone())); - // validate state in background - if config.validate_state { - tokio::spawn(keep_validating_state( - Arc::clone(&storage), - config.external_rpc, - config.max_samples, - BlockNumber::from(config.comparison_period), - )); - } - // import blocks and transactions in foreground let reason = loop { // retrieve new tasks to execute @@ -87,51 +73,6 @@ async fn main() -> anyhow::Result<()> { Ok(()) } -async fn keep_validating_state(storage: Arc, external_rpc: String, max_sample_size: u64, compare_after: BlockNumber) -> anyhow::Result<()> { - let chain = BlockchainClient::new(&external_rpc, RPC_TIMEOUT)?; - let mut latest_compared_block = storage.read_current_block_number().await?; - loop { - let current_imported_block = storage.read_current_block_number().await?; - if current_imported_block - latest_compared_block >= compare_after { - let result = validate_state(&chain, Arc::clone(&storage), latest_compared_block, current_imported_block, max_sample_size) - .await; - if let Err(err) = result { - panic!("{}", err.to_string()); - } - latest_compared_block = current_imported_block; - } - tokio::time::sleep(Duration::from_secs(1)).await; - } -} - -async fn validate_state( - chain: &BlockchainClient, - storage: Arc, - start: BlockNumber, - end: BlockNumber, - max_sample_size: u64, -) -> anyhow::Result<()> { - let slots = storage.get_slots_sample(start, end, max_sample_size, Some(1)).await?; - for sampled_slot in slots { - let expected_value = chain - .get_storage_at( - &sampled_slot.address, - &sampled_slot.slot.index, - stratus::eth::primitives::StoragePointInTime::Past(sampled_slot.block_number), - ) - .await?; - if sampled_slot.slot.value != expected_value { - return Err(anyhow::anyhow!( - "State mismatch on slot {:?}, expected: {:?}, found: {:?}", - sampled_slot, - expected_value, - sampled_slot.slot.value - )); - } - } - Ok(()) -} - // ----------------------------------------------------------------------------- // Postgres block loader // ----------------------------------------------------------------------------- @@ -152,13 +93,13 @@ async fn keep_loading_blocks( // find blocks tracing::info!("retrieving more blocks to process"); let blocks = match db_fetch_blocks(&mut tx).await { - Ok(blocks) => { + Ok(blocks) => if blocks.is_empty() { cancellation.cancel(); break "no more blocks to process"; - } - blocks - } + } else { + blocks + }, Err(_) => { cancellation.cancel(); break "error loading blocks"; From ba054ebfb73e42ed9f4d9aefe2b0274f775f365a Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Thu, 29 Feb 2024 11:57:53 -0300 Subject: [PATCH 07/17] improve config --- ...9cb51ea244b95a376ef962c5c2932e9b15a4.json} | 4 +- src/bin/importer/state-validator.rs | 91 ++++++++++++++++ src/bin/importer/state_validator.rs | 101 ------------------ src/config.rs | 30 +++++- .../queries/select_random_slot_sample.sql | 2 +- 5 files changed, 121 insertions(+), 107 deletions(-) rename .sqlx/{query-39d0b94d07738b1f644a2239c22a401438a319d0b9acad44dcef28bde23d65fd.json => query-13a8805112acd60f7a2cda1f468f9cb51ea244b95a376ef962c5c2932e9b15a4.json} (88%) create mode 100644 src/bin/importer/state-validator.rs delete mode 100644 src/bin/importer/state_validator.rs diff --git a/.sqlx/query-39d0b94d07738b1f644a2239c22a401438a319d0b9acad44dcef28bde23d65fd.json b/.sqlx/query-13a8805112acd60f7a2cda1f468f9cb51ea244b95a376ef962c5c2932e9b15a4.json similarity index 88% rename from .sqlx/query-39d0b94d07738b1f644a2239c22a401438a319d0b9acad44dcef28bde23d65fd.json rename to .sqlx/query-13a8805112acd60f7a2cda1f468f9cb51ea244b95a376ef962c5c2932e9b15a4.json index 95c35ee76..3526b14fa 100644 --- a/.sqlx/query-39d0b94d07738b1f644a2239c22a401438a319d0b9acad44dcef28bde23d65fd.json +++ b/.sqlx/query-13a8805112acd60f7a2cda1f468f9cb51ea244b95a376ef962c5c2932e9b15a4.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "SELECT\n idx as \"index!: _\",\n value as \"value!: _\",\n account_address as \"address!: _\",\n block_number as \"block_number!: _\"\nFROM\n (SELECT\n setseed($1),\n null as \"idx\",\n null as \"value\",\n null as \"account_address\",\n null as \"block_number\"\n UNION ALL\n SELECT\n null,\n idx,\n value,\n account_address,\n block_number\n FROM historical_slots\n WHERE block_number >= $2 AND block_number <= $3\n OFFSET 1\n )\nORDER BY random()\nLIMIT $4;\n", + "query": "SELECT\n idx as \"index!: _\",\n value as \"value!: _\",\n account_address as \"address!: _\",\n block_number as \"block_number!: _\"\nFROM\n (SELECT\n setseed($1),\n null as \"idx\",\n null as \"value\",\n null as \"account_address\",\n null as \"block_number\"\n UNION ALL\n SELECT\n null,\n idx,\n value,\n account_address,\n block_number\n FROM historical_slots\n WHERE block_number >= $2 AND block_number < $3\n OFFSET 1\n )\nORDER BY random()\nLIMIT $4;\n", "describe": { "columns": [ { @@ -39,5 +39,5 @@ null ] }, - "hash": "39d0b94d07738b1f644a2239c22a401438a319d0b9acad44dcef28bde23d65fd" + "hash": "13a8805112acd60f7a2cda1f468f9cb51ea244b95a376ef962c5c2932e9b15a4" } diff --git a/src/bin/importer/state-validator.rs b/src/bin/importer/state-validator.rs new file mode 100644 index 000000000..78958deb1 --- /dev/null +++ b/src/bin/importer/state-validator.rs @@ -0,0 +1,91 @@ +use std::sync::Arc; +use std::time::Duration; + +use stratus::config::{StateValidatorConfig, ValidatorMethodConfig}; +use stratus::eth::primitives::BlockNumber; +use stratus::eth::storage::StratusStorage; +use stratus::infra::BlockchainClient; +use stratus::init_global_services; + +const RPC_TIMEOUT: Duration = Duration::from_secs(2); + +#[tokio::main()] +async fn main() -> anyhow::Result<()> { + // init services + let config: StateValidatorConfig = init_global_services(); + let storage = config.init_storage().await?; + + let interval = BlockNumber::from(config.interval); + + let mut latest_compared_block = BlockNumber::ZERO; + loop { + let current_block = storage.read_current_block_number().await?; + if current_block - latest_compared_block >= interval { + let result = validate_state( + &config.method, + Arc::clone(&storage), + latest_compared_block, + latest_compared_block + interval, + config.sample_size, + config.seed, + ) + .await; + if let Err(err) = result { + panic!("{}", err); + } + latest_compared_block = latest_compared_block + interval; + } + tokio::time::sleep(Duration::from_secs(1)).await; + } +} + +async fn validate_state( + method: &ValidatorMethodConfig, + storage: Arc, + start: BlockNumber, + end: BlockNumber, + max_sample_size: u64, + seed: u64, +) -> anyhow::Result<()> { + match method { + ValidatorMethodConfig::Rpc { url } => { + let chain = BlockchainClient::new(url, RPC_TIMEOUT)?; + validate_state_rpc(&chain, storage, start, end, max_sample_size, seed).await + } + _ => todo!(), + } +} + +async fn validate_state_rpc( + chain: &BlockchainClient, + storage: Arc, + start: BlockNumber, + end: BlockNumber, + max_sample_size: u64, + seed: u64, +) -> anyhow::Result<()> { + println!("Validating state {:?}, {:?}", start, end); + let seed = match seed { + 0 => None, + n => Some(n), + }; + let slots = storage.get_slots_sample(start, end, max_sample_size, seed).await?; + for sampled_slot in slots { + let expected_value = chain + .get_storage_at( + &sampled_slot.address, + &sampled_slot.index, + stratus::eth::primitives::StoragePointInTime::Past(sampled_slot.block_number), + ) + .await?; + if sampled_slot.value != expected_value { + return Err(anyhow::anyhow!( + "State mismatch on slot {:?}, expected value: {:?}, found: {:?}", + sampled_slot, + expected_value, + sampled_slot.value + )); + } + } + Ok(()) +} diff --git a/src/bin/importer/state_validator.rs b/src/bin/importer/state_validator.rs deleted file mode 100644 index 460cbb30e..000000000 --- a/src/bin/importer/state_validator.rs +++ /dev/null @@ -1,101 +0,0 @@ -#![allow(dead_code)] - -use std::collections::HashMap; -use std::sync::Arc; -use std::time::Duration; - -use itertools::Itertools; -use serde_json::Value as JsonValue; -use sqlx::Row; -use stratus::config::ImporterImportConfig; -use stratus::eth::primitives::Account; -use stratus::eth::primitives::Address; -use stratus::eth::primitives::BlockNumber; -use stratus::eth::primitives::ExternalBlock; -use stratus::eth::primitives::ExternalReceipt; -use stratus::eth::primitives::Wei; -use stratus::eth::storage::StratusStorage; -use stratus::infra::postgres::Postgres; -use stratus::infra::BlockchainClient; -use stratus::init_global_services; -use stratus::log_and_err; -use tokio::sync::mpsc; -use tokio_util::sync::CancellationToken; - -/// Number of tasks in backlog: (BACKLOG_SIZE * BacklogTask) -const BACKLOG_SIZE: usize = 10; -const RPC_TIMEOUT: Duration = Duration::from_secs(2); -type BacklogTask = (Vec, Vec); - -#[tokio::main()] -async fn main() -> anyhow::Result<()> { - // init services - let config: ImporterImportConfig = init_global_services(); - let pg = Arc::new(Postgres::new(&config.postgres_url).await?); - let storage = config.init_storage().await?; - let executor = config.init_executor(Arc::clone(&storage)); - - // init shared data between importer and postgres loader - let (backlog_tx, mut backlog_rx) = mpsc::channel::(BACKLOG_SIZE); - let cancellation = CancellationToken::new(); - - // import genesis accounts - let balances = db_retrieve_balances(&pg).await?; - let accounts = balances - .into_iter() - .map(|row| Account::new_with_balance(row.address, row.balance)) - .collect_vec(); - storage.save_accounts_to_perm(accounts).await?; - - let chain = BlockchainClient::new(&config.external_rpc, RPC_TIMEOUT)?; - let mut latest_compared_block = storage.read_current_block_number().await?; - loop { - let current_imported_block = storage.read_current_block_number().await?; - if current_imported_block - latest_compared_block >= interval { - let result = validate_state(&chain, Arc::clone(&storage), latest_compared_block, latest_compared_block + interval, max_sample_size) - .await; - if let Err(err) = result { - cancellation.cancel(); - panic!("{}", err); - } - latest_compared_block = current_imported_block; - } - tokio::time::sleep(Duration::from_secs(1)).await; - } - Ok(()) -} - -async fn keep_validating_state(storage: Arc, external_rpc: String, max_sample_size: u64, interval: BlockNumber, cancellation: CancellationToken) -> anyhow::Result<()> { - println!("STARTING STATE VALIDATION"); - -} - -async fn validate_state( - chain: &BlockchainClient, - storage: Arc, - start: BlockNumber, - end: BlockNumber, - max_sample_size: u64, -) -> anyhow::Result<()> { - println!("Validating state {:?}, {:?}", start, end); - let slots = storage.get_slots_sample(start, end, max_sample_size, Some(1)).await?; - for sampled_slot in slots { - let expected_value = chain - .get_storage_at( - &sampled_slot.address, - &sampled_slot.index, - stratus::eth::primitives::StoragePointInTime::Past(sampled_slot.block_number), - ) - .await?; - println!("Comparing {:?} {:?}", sampled_slot.value, expected_value); - if sampled_slot.value != expected_value { - return Err(anyhow::anyhow!( - "State mismatch on slot {:?}, expected value: {:?}, found: {:?}", - sampled_slot, - expected_value, - sampled_slot.value - )); - } - } - Ok(()) -} diff --git a/src/config.rs b/src/config.rs index 8a6c51138..da993c641 100644 --- a/src/config.rs +++ b/src/config.rs @@ -96,14 +96,21 @@ pub struct StateValidatorConfig { #[clap(flatten)] pub common: CommonConfig, - #[arg(short = 'r', long = "external-rpc", env = "EXTERNAL_RPC", requires = "validate_state")] - pub external_rpc: String, - + /// How many slots to validate per batch. 0 means every slot. #[arg(short = 'm', long = "max-samples", env = "MAX_SAMPLES", default_value_t = 0)] pub sample_size: u64, + /// Seed to use when sampling. 0 for random seed. + #[arg(long = "seed", env = "SEED", default_value_t = 0, requires = "sample_size")] + pub seed: u64, + + /// Validate in batches of n blocks. #[arg(short = 'i', long = "inverval", env = "INVERVAL", default_value_t = 1000)] pub interval: u64, + + /// What method to use when validating. + #[arg(long = "method", env = "METHOD")] + pub method: ValidatorMethodConfig, } /// Common configuration that can be used by any binary. @@ -243,6 +250,23 @@ impl FromStr for StorageConfig { } } +#[derive(Clone, Debug, strum::Display)] +pub enum ValidatorMethodConfig { + Rpc { url: String }, + CompareTables +} + +impl FromStr for ValidatorMethodConfig { + type Err = anyhow::Error; + + fn from_str(s: &str) -> anyhow::Result { + match s { + "compare_tables" => Ok(Self::CompareTables), + s => Ok(Self::Rpc { url: s.to_string() }) + } + } +} + /// Enviroment where the application is running. #[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)] pub enum Environment { diff --git a/src/eth/storage/postgres/queries/select_random_slot_sample.sql b/src/eth/storage/postgres/queries/select_random_slot_sample.sql index a4196298f..7fc33cf43 100644 --- a/src/eth/storage/postgres/queries/select_random_slot_sample.sql +++ b/src/eth/storage/postgres/queries/select_random_slot_sample.sql @@ -18,7 +18,7 @@ FROM account_address, block_number FROM historical_slots - WHERE block_number >= $2 AND block_number <= $3 + WHERE block_number >= $2 AND block_number < $3 OFFSET 1 ) ORDER BY random() From 60f2ea61dc047b65d0edafd15850ae55f085175f Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Thu, 29 Feb 2024 12:03:54 -0300 Subject: [PATCH 08/17] fix seed --- src/eth/storage/postgres/postgres.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/eth/storage/postgres/postgres.rs b/src/eth/storage/postgres/postgres.rs index d6980f368..7264f1023 100644 --- a/src/eth/storage/postgres/postgres.rs +++ b/src/eth/storage/postgres/postgres.rs @@ -719,11 +719,11 @@ impl PermanentStorage for Postgres { } async fn get_slots_sample(&self, start: BlockNumber, end: BlockNumber, max_samples: u64, seed: Option) -> anyhow::Result> { - let seed = seed.unwrap_or(1); + let seed = seed.unwrap_or(0) as f64 / 100.0; let slots_sample_rows = sqlx::query_file_as!( SlotSample, "src/eth/storage/postgres/queries/select_random_slot_sample.sql", - seed as i64, + seed, start as _, end as _, max_samples as i64 From ab22b2b497070907064dd32f96eb78568c0f1c7d Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Thu, 29 Feb 2024 13:26:02 -0300 Subject: [PATCH 09/17] Implement get_slots_sample for inmemory --- Cargo.lock | 1 + Cargo.toml | 1 + src/eth/storage/inmemory/inmemory_history.rs | 12 ++++-- .../storage/inmemory/inmemory_permanent.rs | 37 ++++++++++++++++++- 4 files changed, 47 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3011c0283..bb113de20 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3499,6 +3499,7 @@ dependencies = [ "phf_codegen", "pin-project", "quote", + "rand", "revm", "rlp", "serde", diff --git a/Cargo.toml b/Cargo.toml index e0b2ad596..89412e0a5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,6 +66,7 @@ sqlx = { version = "0.7.3", features = ["runtime-tokio", "postgres", "bigdecimal # test fake = { version = "2.9.2", features = ["derive"] } +rand = "0.8.5" [dev-dependencies] binary_macros = "1.0.0" diff --git a/src/eth/storage/inmemory/inmemory_history.rs b/src/eth/storage/inmemory/inmemory_history.rs index 8f0357c52..14a6a1223 100644 --- a/src/eth/storage/inmemory/inmemory_history.rs +++ b/src/eth/storage/inmemory/inmemory_history.rs @@ -6,15 +6,15 @@ use nonempty::NonEmpty; use crate::eth::primitives::BlockNumber; use crate::eth::primitives::StoragePointInTime; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct InMemoryHistory(NonEmpty>) where T: Clone + Debug; #[derive(Clone, Debug, derive_new::new)] pub struct InMemoryHistoryValue { - block_number: BlockNumber, - value: T, + pub block_number: BlockNumber, + pub value: T, } impl InMemoryHistory @@ -71,3 +71,9 @@ where &self.0.last().value } } + +impl From> for Vec> { + fn from(value: InMemoryHistory) -> Self { + value.0.into() + } +} diff --git a/src/eth/storage/inmemory/inmemory_permanent.rs b/src/eth/storage/inmemory/inmemory_permanent.rs index d6333651b..9e1b37d9e 100644 --- a/src/eth/storage/inmemory/inmemory_permanent.rs +++ b/src/eth/storage/inmemory/inmemory_permanent.rs @@ -7,6 +7,9 @@ use std::sync::Arc; use async_trait::async_trait; use indexmap::IndexMap; use metrics::atomics::AtomicU64; +use rand::rngs::StdRng; +use rand::seq::IteratorRandom; +use rand::SeedableRng; use tokio::sync::RwLock; use tokio::sync::RwLockReadGuard; use tokio::sync::RwLockWriteGuard; @@ -356,7 +359,39 @@ impl PermanentStorage for InMemoryPermanentStorage { } async fn get_slots_sample(&self, start: BlockNumber, end: BlockNumber, max_samples: u64, seed: Option) -> anyhow::Result> { - todo!(); + let state = self.lock_read().await; + let contracts: Vec<_> = state + .accounts + .iter() + .filter(|(_, account_info)| account_info.bytecode.get_current().is_some()) + .collect(); + + let sample_per_contract = max_samples / contracts.len() as u64; + + let mut rng = StdRng::seed_from_u64(seed.unwrap_or(0)); + let mut slot_sample = vec![]; + for (_, contract_info) in contracts { + slot_sample.append( + &mut contract_info + .slots + .values() + .flat_map(|slot_history| Vec::from((*slot_history).clone())) + .filter_map(|slot| { + if slot.block_number >= start && slot.block_number < end { + Some(SlotSample { + address: contract_info.address.clone(), + block_number: slot.block_number, + index: slot.value.index, + value: slot.value.value, + }) + } else { + None + } + }) + .choose_multiple(&mut rng, sample_per_contract as usize), + ); + } + Ok(slot_sample) } } From 1ba41c605e9a78072b992c8afa1a668bddd3c3e9 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Thu, 29 Feb 2024 13:27:44 -0300 Subject: [PATCH 10/17] fix pg seed --- src/eth/storage/postgres/postgres.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/eth/storage/postgres/postgres.rs b/src/eth/storage/postgres/postgres.rs index 7264f1023..398ab4f3a 100644 --- a/src/eth/storage/postgres/postgres.rs +++ b/src/eth/storage/postgres/postgres.rs @@ -719,7 +719,7 @@ impl PermanentStorage for Postgres { } async fn get_slots_sample(&self, start: BlockNumber, end: BlockNumber, max_samples: u64, seed: Option) -> anyhow::Result> { - let seed = seed.unwrap_or(0) as f64 / 100.0; + let seed = (seed.unwrap_or(0) as f64 / 100.0).fract(); let slots_sample_rows = sqlx::query_file_as!( SlotSample, "src/eth/storage/postgres/queries/select_random_slot_sample.sql", From 80c632efbfda7a75d8605170de9f253f55492b9d Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Thu, 29 Feb 2024 13:49:33 -0300 Subject: [PATCH 11/17] make inmemory get_slots_sample behave like postgres' --- .../storage/inmemory/inmemory_permanent.rs | 28 ++++++++----------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/src/eth/storage/inmemory/inmemory_permanent.rs b/src/eth/storage/inmemory/inmemory_permanent.rs index 9e1b37d9e..adce14b38 100644 --- a/src/eth/storage/inmemory/inmemory_permanent.rs +++ b/src/eth/storage/inmemory/inmemory_permanent.rs @@ -359,27 +359,23 @@ impl PermanentStorage for InMemoryPermanentStorage { } async fn get_slots_sample(&self, start: BlockNumber, end: BlockNumber, max_samples: u64, seed: Option) -> anyhow::Result> { - let state = self.lock_read().await; - let contracts: Vec<_> = state + let mut rng = StdRng::seed_from_u64(seed.unwrap_or(0)); + + let samples = self + .lock_read() + .await .accounts .iter() .filter(|(_, account_info)| account_info.bytecode.get_current().is_some()) - .collect(); - - let sample_per_contract = max_samples / contracts.len() as u64; - - let mut rng = StdRng::seed_from_u64(seed.unwrap_or(0)); - let mut slot_sample = vec![]; - for (_, contract_info) in contracts { - slot_sample.append( - &mut contract_info + .flat_map(|(_, contract)| { + contract .slots .values() .flat_map(|slot_history| Vec::from((*slot_history).clone())) .filter_map(|slot| { if slot.block_number >= start && slot.block_number < end { Some(SlotSample { - address: contract_info.address.clone(), + address: contract.address.clone(), block_number: slot.block_number, index: slot.value.index, value: slot.value.value, @@ -388,10 +384,10 @@ impl PermanentStorage for InMemoryPermanentStorage { None } }) - .choose_multiple(&mut rng, sample_per_contract as usize), - ); - } - Ok(slot_sample) + }) + .choose_multiple(&mut rng, max_samples as usize); + + Ok(samples) } } From f1ac3ea2e3d6fdcbfc4c489e5ebca425d772df41 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Thu, 29 Feb 2024 14:32:45 -0300 Subject: [PATCH 12/17] refactor --- Cargo.toml | 2 +- src/bin/{importer => }/state-validator.rs | 9 +++++++-- src/config.rs | 4 ++-- .../storage/inmemory/inmemory_permanent.rs | 19 +++++++++++-------- src/eth/storage/permanent_storage.rs | 2 +- src/eth/storage/postgres/postgres.rs | 11 ++++++++--- src/eth/storage/stratus_storage.rs | 2 +- 7 files changed, 31 insertions(+), 18 deletions(-) rename src/bin/{importer => }/state-validator.rs (95%) diff --git a/Cargo.toml b/Cargo.toml index 89412e0a5..1b0235d2b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -97,7 +97,7 @@ path = "src/bin/rpc_server_poller.rs" [[bin]] name = "state-validator" -path = "src/bin/importer/state-validator.rs" +path = "src/bin/state-validator.rs" # ------------------------------------------------------------------------------ # Lints diff --git a/src/bin/importer/state-validator.rs b/src/bin/state-validator.rs similarity index 95% rename from src/bin/importer/state-validator.rs rename to src/bin/state-validator.rs index 78958deb1..d29866cc2 100644 --- a/src/bin/importer/state-validator.rs +++ b/src/bin/state-validator.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use std::time::Duration; +use rand::Rng; use stratus::config::{StateValidatorConfig, ValidatorMethodConfig}; use stratus::eth::primitives::BlockNumber; use stratus::eth::storage::StratusStorage; @@ -15,6 +16,7 @@ async fn main() -> anyhow::Result<()> { let config: StateValidatorConfig = init_global_services(); let storage = config.init_storage().await?; + let interval = BlockNumber::from(config.interval); let mut latest_compared_block = BlockNumber::ZERO; @@ -66,8 +68,11 @@ async fn validate_state_rpc( ) -> anyhow::Result<()> { println!("Validating state {:?}, {:?}", start, end); let seed = match seed { - 0 => None, - n => Some(n), + 0 => { + let mut rng = rand::thread_rng(); + rng.gen() + }, + n => n, }; let slots = storage.get_slots_sample(start, end, max_sample_size, seed).await?; for sampled_slot in slots { diff --git a/src/config.rs b/src/config.rs index da993c641..d60ccebff 100644 --- a/src/config.rs +++ b/src/config.rs @@ -97,7 +97,7 @@ pub struct StateValidatorConfig { pub common: CommonConfig, /// How many slots to validate per batch. 0 means every slot. - #[arg(short = 'm', long = "max-samples", env = "MAX_SAMPLES", default_value_t = 0)] + #[arg(long = "max-samples", env = "MAX_SAMPLES", default_value_t = 0)] pub sample_size: u64, /// Seed to use when sampling. 0 for random seed. @@ -109,7 +109,7 @@ pub struct StateValidatorConfig { pub interval: u64, /// What method to use when validating. - #[arg(long = "method", env = "METHOD")] + #[arg(short = 'm', long = "method", env = "METHOD")] pub method: ValidatorMethodConfig, } diff --git a/src/eth/storage/inmemory/inmemory_permanent.rs b/src/eth/storage/inmemory/inmemory_permanent.rs index adce14b38..f70014238 100644 --- a/src/eth/storage/inmemory/inmemory_permanent.rs +++ b/src/eth/storage/inmemory/inmemory_permanent.rs @@ -358,12 +358,10 @@ impl PermanentStorage for InMemoryPermanentStorage { Ok(()) } - async fn get_slots_sample(&self, start: BlockNumber, end: BlockNumber, max_samples: u64, seed: Option) -> anyhow::Result> { - let mut rng = StdRng::seed_from_u64(seed.unwrap_or(0)); + async fn get_slots_sample(&self, start: BlockNumber, end: BlockNumber, max_samples: u64, seed: u64) -> anyhow::Result> { + let state = self.lock_read().await; - let samples = self - .lock_read() - .await + let samples = state .accounts .iter() .filter(|(_, account_info)| account_info.bytecode.get_current().is_some()) @@ -384,10 +382,15 @@ impl PermanentStorage for InMemoryPermanentStorage { None } }) - }) - .choose_multiple(&mut rng, max_samples as usize); + }); - Ok(samples) + match max_samples { + 0 => Ok(samples.collect()), + n => { + let mut rng = StdRng::seed_from_u64(seed); + Ok(samples.choose_multiple(&mut rng, n as usize)) + } + } } } diff --git a/src/eth/storage/permanent_storage.rs b/src/eth/storage/permanent_storage.rs index c68bddf3d..5d85c54f2 100644 --- a/src/eth/storage/permanent_storage.rs +++ b/src/eth/storage/permanent_storage.rs @@ -53,5 +53,5 @@ pub trait PermanentStorage: Send + Sync { /// Resets all state to a specific block number. async fn reset_at(&self, number: BlockNumber) -> anyhow::Result<()>; - async fn get_slots_sample(&self, start: BlockNumber, end: BlockNumber, max_samples: u64, seed: Option) -> anyhow::Result>; + async fn get_slots_sample(&self, start: BlockNumber, end: BlockNumber, max_samples: u64, seed: u64) -> anyhow::Result>; } diff --git a/src/eth/storage/postgres/postgres.rs b/src/eth/storage/postgres/postgres.rs index 398ab4f3a..f8fd7b12c 100644 --- a/src/eth/storage/postgres/postgres.rs +++ b/src/eth/storage/postgres/postgres.rs @@ -718,15 +718,20 @@ impl PermanentStorage for Postgres { Ok(()) } - async fn get_slots_sample(&self, start: BlockNumber, end: BlockNumber, max_samples: u64, seed: Option) -> anyhow::Result> { - let seed = (seed.unwrap_or(0) as f64 / 100.0).fract(); + async fn get_slots_sample(&self, start: BlockNumber, end: BlockNumber, max_samples: u64, seed: u64) -> anyhow::Result> { + let seed = (seed as f64 / 100.0).fract(); + let max_samples: Option = match max_samples { + 0 => None, + n => Some(n as i64) + }; + let slots_sample_rows = sqlx::query_file_as!( SlotSample, "src/eth/storage/postgres/queries/select_random_slot_sample.sql", seed, start as _, end as _, - max_samples as i64 + max_samples ) .fetch_all(&self.connection_pool) .await?; diff --git a/src/eth/storage/stratus_storage.rs b/src/eth/storage/stratus_storage.rs index d0d934716..6f5f933af 100644 --- a/src/eth/storage/stratus_storage.rs +++ b/src/eth/storage/stratus_storage.rs @@ -222,7 +222,7 @@ impl StratusStorage { } } - pub async fn get_slots_sample(&self, start: BlockNumber, end: BlockNumber, max_samples: u64, seed: Option) -> anyhow::Result>{ + pub async fn get_slots_sample(&self, start: BlockNumber, end: BlockNumber, max_samples: u64, seed: u64) -> anyhow::Result>{ self.perm.get_slots_sample(start, end, max_samples, seed).await } From 24ef265695d2590b6b54fb48aec72e38e1627ad8 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Thu, 29 Feb 2024 15:00:13 -0300 Subject: [PATCH 13/17] validate concurrently --- src/bin/state-validator.rs | 32 +++++++++++++++++++------------- src/config.rs | 4 ++++ 2 files changed, 23 insertions(+), 13 deletions(-) diff --git a/src/bin/state-validator.rs b/src/bin/state-validator.rs index d29866cc2..09c91d4a0 100644 --- a/src/bin/state-validator.rs +++ b/src/bin/state-validator.rs @@ -7,10 +7,11 @@ use stratus::eth::primitives::BlockNumber; use stratus::eth::storage::StratusStorage; use stratus::infra::BlockchainClient; use stratus::init_global_services; +use tokio::task::JoinSet; const RPC_TIMEOUT: Duration = Duration::from_secs(2); -#[tokio::main()] +#[tokio::main(flavor = "current_thread")] async fn main() -> anyhow::Result<()> { // init services let config: StateValidatorConfig = init_global_services(); @@ -20,29 +21,33 @@ async fn main() -> anyhow::Result<()> { let interval = BlockNumber::from(config.interval); let mut latest_compared_block = BlockNumber::ZERO; + + let mut futures = JoinSet::new(); loop { let current_block = storage.read_current_block_number().await?; - if current_block - latest_compared_block >= interval { - let result = validate_state( - &config.method, + if current_block - latest_compared_block >= interval && futures.len() < config.concurrent_tasks as usize { + let future = validate_state( + config.method.clone(), Arc::clone(&storage), latest_compared_block, latest_compared_block + interval, config.sample_size, config.seed, - ) - .await; - if let Err(err) = result { - panic!("{}", err); - } + ); + + futures.spawn(future); + latest_compared_block = latest_compared_block + interval; + } else { + if let Some(res) = futures.join_next().await { + res?? + } } - tokio::time::sleep(Duration::from_secs(1)).await; } } async fn validate_state( - method: &ValidatorMethodConfig, + method: ValidatorMethodConfig, storage: Arc, start: BlockNumber, end: BlockNumber, @@ -51,7 +56,7 @@ async fn validate_state( ) -> anyhow::Result<()> { match method { ValidatorMethodConfig::Rpc { url } => { - let chain = BlockchainClient::new(url, RPC_TIMEOUT)?; + let chain = BlockchainClient::new(&url, RPC_TIMEOUT)?; validate_state_rpc(&chain, storage, start, end, max_sample_size, seed).await } _ => todo!(), @@ -66,7 +71,7 @@ async fn validate_state_rpc( max_sample_size: u64, seed: u64, ) -> anyhow::Result<()> { - println!("Validating state {:?}, {:?}", start, end); + tracing::debug!("Validating state {:?}, {:?}", start, end); let seed = match seed { 0 => { let mut rng = rand::thread_rng(); @@ -83,6 +88,7 @@ async fn validate_state_rpc( stratus::eth::primitives::StoragePointInTime::Past(sampled_slot.block_number), ) .await?; + if sampled_slot.value != expected_value { return Err(anyhow::anyhow!( "State mismatch on slot {:?}, expected value: {:?}, found: {:?}", diff --git a/src/config.rs b/src/config.rs index d60ccebff..137e6bcec 100644 --- a/src/config.rs +++ b/src/config.rs @@ -111,6 +111,10 @@ pub struct StateValidatorConfig { /// What method to use when validating. #[arg(short = 'm', long = "method", env = "METHOD")] pub method: ValidatorMethodConfig, + + /// How many concurrent validation tasks to run + #[arg(short = 'c', long = "concurrent-tasks", env = "CONCURRENT_TASKS", default_value_t = 10)] + pub concurrent_tasks: u16, } /// Common configuration that can be used by any binary. From f89cb8deec955629188c6e006e14b74f7d703380 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Thu, 29 Feb 2024 15:21:04 -0300 Subject: [PATCH 14/17] fmt --- src/bin/state-validator.rs | 12 +++++------- src/config.rs | 4 ++-- src/eth/primitives/mod.rs | 2 +- src/eth/primitives/slot.rs | 8 +++----- src/eth/storage/postgres/postgres.rs | 12 +++++------- src/eth/storage/stratus_storage.rs | 3 +-- src/infra/blockchain_client.rs | 7 +++++-- 7 files changed, 22 insertions(+), 26 deletions(-) diff --git a/src/bin/state-validator.rs b/src/bin/state-validator.rs index 09c91d4a0..069d8d147 100644 --- a/src/bin/state-validator.rs +++ b/src/bin/state-validator.rs @@ -2,7 +2,8 @@ use std::sync::Arc; use std::time::Duration; use rand::Rng; -use stratus::config::{StateValidatorConfig, ValidatorMethodConfig}; +use stratus::config::StateValidatorConfig; +use stratus::config::ValidatorMethodConfig; use stratus::eth::primitives::BlockNumber; use stratus::eth::storage::StratusStorage; use stratus::infra::BlockchainClient; @@ -17,7 +18,6 @@ async fn main() -> anyhow::Result<()> { let config: StateValidatorConfig = init_global_services(); let storage = config.init_storage().await?; - let interval = BlockNumber::from(config.interval); let mut latest_compared_block = BlockNumber::ZERO; @@ -38,10 +38,8 @@ async fn main() -> anyhow::Result<()> { futures.spawn(future); latest_compared_block = latest_compared_block + interval; - } else { - if let Some(res) = futures.join_next().await { - res?? - } + } else if let Some(res) = futures.join_next().await { + res??; } } } @@ -76,7 +74,7 @@ async fn validate_state_rpc( 0 => { let mut rng = rand::thread_rng(); rng.gen() - }, + } n => n, }; let slots = storage.get_slots_sample(start, end, max_sample_size, seed).await?; diff --git a/src/config.rs b/src/config.rs index 137e6bcec..f103f3b0e 100644 --- a/src/config.rs +++ b/src/config.rs @@ -257,7 +257,7 @@ impl FromStr for StorageConfig { #[derive(Clone, Debug, strum::Display)] pub enum ValidatorMethodConfig { Rpc { url: String }, - CompareTables + CompareTables, } impl FromStr for ValidatorMethodConfig { @@ -266,7 +266,7 @@ impl FromStr for ValidatorMethodConfig { fn from_str(s: &str) -> anyhow::Result { match s { "compare_tables" => Ok(Self::CompareTables), - s => Ok(Self::Rpc { url: s.to_string() }) + s => Ok(Self::Rpc { url: s.to_string() }), } } } diff --git a/src/eth/primitives/mod.rs b/src/eth/primitives/mod.rs index 4fac65343..e42ac11f5 100644 --- a/src/eth/primitives/mod.rs +++ b/src/eth/primitives/mod.rs @@ -149,8 +149,8 @@ pub use log_mined::LogMined; pub use log_topic::LogTopic; pub use nonce::Nonce; pub use slot::Slot; -pub use slot::SlotSample; pub use slot::SlotIndex; +pub use slot::SlotSample; pub use slot::SlotValue; pub use storage_point_in_time::StoragePointInTime; pub use transaction_input::TransactionInput; diff --git a/src/eth/primitives/slot.rs b/src/eth/primitives/slot.rs index 686af0b74..a40ffdb9b 100644 --- a/src/eth/primitives/slot.rs +++ b/src/eth/primitives/slot.rs @@ -19,11 +19,9 @@ use sqlx::error::BoxDynError; use sqlx::postgres::PgHasArrayType; use sqlx::Decode; -use crate::gen_newtype_from; -use crate::eth::primitives::BlockNumber; - use super::Address; - +use crate::eth::primitives::BlockNumber; +use crate::gen_newtype_from; #[derive(Debug, Clone, Default, PartialEq, Eq, fake::Dummy, serde::Serialize, serde::Deserialize)] pub struct Slot { @@ -212,5 +210,5 @@ pub struct SlotSample { pub address: Address, pub block_number: BlockNumber, pub index: SlotIndex, - pub value: SlotValue + pub value: SlotValue, } diff --git a/src/eth/storage/postgres/postgres.rs b/src/eth/storage/postgres/postgres.rs index f8fd7b12c..4d7b3c644 100644 --- a/src/eth/storage/postgres/postgres.rs +++ b/src/eth/storage/postgres/postgres.rs @@ -107,11 +107,10 @@ impl PermanentStorage for Postgres { let slot_index: [u8; 32] = slot_index.clone().into(); let slot = match point_in_time { - StoragePointInTime::Present => { + StoragePointInTime::Present => sqlx::query_file_as!(Slot, "src/eth/storage/postgres/queries/select_slot.sql", address.as_ref(), slot_index.as_ref()) .fetch_optional(&self.connection_pool) - .await? - } + .await?, StoragePointInTime::Past(number) => { let block_number: i64 = (*number).try_into()?; sqlx::query_file_as!( @@ -722,7 +721,7 @@ impl PermanentStorage for Postgres { let seed = (seed as f64 / 100.0).fract(); let max_samples: Option = match max_samples { 0 => None, - n => Some(n as i64) + n => Some(n as i64), }; let slots_sample_rows = sqlx::query_file_as!( @@ -756,13 +755,12 @@ fn partition_topics(topics: impl IntoIterator) -> HashMap< let mut partitions: HashMap>> = HashMap::new(); for topic in topics { match partitions.get_mut(&topic.transaction_hash) { - Some(transaction_logs) => { + Some(transaction_logs) => if let Some(part) = transaction_logs.get_mut(&topic.log_idx) { part.push(topic); } else { transaction_logs.insert(topic.log_idx, vec![topic]); - } - } + }, None => { partitions.insert(topic.transaction_hash.clone(), [(topic.log_idx, vec![topic])].into_iter().collect()); } diff --git a/src/eth/storage/stratus_storage.rs b/src/eth/storage/stratus_storage.rs index 6f5f933af..47f9d2233 100644 --- a/src/eth/storage/stratus_storage.rs +++ b/src/eth/storage/stratus_storage.rs @@ -222,8 +222,7 @@ impl StratusStorage { } } - pub async fn get_slots_sample(&self, start: BlockNumber, end: BlockNumber, max_samples: u64, seed: u64) -> anyhow::Result>{ + pub async fn get_slots_sample(&self, start: BlockNumber, end: BlockNumber, max_samples: u64, seed: u64) -> anyhow::Result> { self.perm.get_slots_sample(start, end, max_samples, seed).await } - } diff --git a/src/infra/blockchain_client.rs b/src/infra/blockchain_client.rs index 25dc51b81..9b4e9098a 100644 --- a/src/infra/blockchain_client.rs +++ b/src/infra/blockchain_client.rs @@ -89,9 +89,12 @@ impl BlockchainClient { let index = serde_json::to_value(index)?; let number = match point_in_time { StoragePointInTime::Present => serde_json::to_value("latest")?, - StoragePointInTime::Past(number) => serde_json::to_value(number)? + StoragePointInTime::Past(number) => serde_json::to_value(number)?, }; - let result = self.http.request::>("eth_getStorageAt", vec![address, index, number]).await; + let result = self + .http + .request::>("eth_getStorageAt", vec![address, index, number]) + .await; match result { Ok(value) => Ok(value), From d44c690820864ed3f53465a761dd9272e8f4d946 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Thu, 29 Feb 2024 15:43:00 -0300 Subject: [PATCH 15/17] rename get_slots_sample --- src/bin/state-validator.rs | 2 +- src/eth/storage/inmemory/inmemory_permanent.rs | 2 +- src/eth/storage/permanent_storage.rs | 2 +- src/eth/storage/postgres/postgres.rs | 2 +- src/eth/storage/stratus_storage.rs | 4 ++-- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/bin/state-validator.rs b/src/bin/state-validator.rs index 069d8d147..01e6513fb 100644 --- a/src/bin/state-validator.rs +++ b/src/bin/state-validator.rs @@ -77,7 +77,7 @@ async fn validate_state_rpc( } n => n, }; - let slots = storage.get_slots_sample(start, end, max_sample_size, seed).await?; + let slots = storage.read_slots_sample(start, end, max_sample_size, seed).await?; for sampled_slot in slots { let expected_value = chain .get_storage_at( diff --git a/src/eth/storage/inmemory/inmemory_permanent.rs b/src/eth/storage/inmemory/inmemory_permanent.rs index f70014238..38a39b669 100644 --- a/src/eth/storage/inmemory/inmemory_permanent.rs +++ b/src/eth/storage/inmemory/inmemory_permanent.rs @@ -358,7 +358,7 @@ impl PermanentStorage for InMemoryPermanentStorage { Ok(()) } - async fn get_slots_sample(&self, start: BlockNumber, end: BlockNumber, max_samples: u64, seed: u64) -> anyhow::Result> { + async fn read_slots_sample(&self, start: BlockNumber, end: BlockNumber, max_samples: u64, seed: u64) -> anyhow::Result> { let state = self.lock_read().await; let samples = state diff --git a/src/eth/storage/permanent_storage.rs b/src/eth/storage/permanent_storage.rs index 5d85c54f2..606525cf0 100644 --- a/src/eth/storage/permanent_storage.rs +++ b/src/eth/storage/permanent_storage.rs @@ -53,5 +53,5 @@ pub trait PermanentStorage: Send + Sync { /// Resets all state to a specific block number. async fn reset_at(&self, number: BlockNumber) -> anyhow::Result<()>; - async fn get_slots_sample(&self, start: BlockNumber, end: BlockNumber, max_samples: u64, seed: u64) -> anyhow::Result>; + async fn read_slots_sample(&self, start: BlockNumber, end: BlockNumber, max_samples: u64, seed: u64) -> anyhow::Result>; } diff --git a/src/eth/storage/postgres/postgres.rs b/src/eth/storage/postgres/postgres.rs index 4d7b3c644..b635396af 100644 --- a/src/eth/storage/postgres/postgres.rs +++ b/src/eth/storage/postgres/postgres.rs @@ -717,7 +717,7 @@ impl PermanentStorage for Postgres { Ok(()) } - async fn get_slots_sample(&self, start: BlockNumber, end: BlockNumber, max_samples: u64, seed: u64) -> anyhow::Result> { + async fn read_slots_sample(&self, start: BlockNumber, end: BlockNumber, max_samples: u64, seed: u64) -> anyhow::Result> { let seed = (seed as f64 / 100.0).fract(); let max_samples: Option = match max_samples { 0 => None, diff --git a/src/eth/storage/stratus_storage.rs b/src/eth/storage/stratus_storage.rs index 47f9d2233..48e331e1f 100644 --- a/src/eth/storage/stratus_storage.rs +++ b/src/eth/storage/stratus_storage.rs @@ -222,7 +222,7 @@ impl StratusStorage { } } - pub async fn get_slots_sample(&self, start: BlockNumber, end: BlockNumber, max_samples: u64, seed: u64) -> anyhow::Result> { - self.perm.get_slots_sample(start, end, max_samples, seed).await + pub async fn read_slots_sample(&self, start: BlockNumber, end: BlockNumber, max_samples: u64, seed: u64) -> anyhow::Result> { + self.perm.read_slots_sample(start, end, max_samples, seed).await } } From 762e8d5fb90c0670f62ffab56ed90f6c3b8caeca Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Thu, 29 Feb 2024 16:32:11 -0300 Subject: [PATCH 16/17] is_contract --- src/eth/storage/inmemory/inmemory_permanent.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/eth/storage/inmemory/inmemory_permanent.rs b/src/eth/storage/inmemory/inmemory_permanent.rs index 38a39b669..2cbae8879 100644 --- a/src/eth/storage/inmemory/inmemory_permanent.rs +++ b/src/eth/storage/inmemory/inmemory_permanent.rs @@ -364,7 +364,7 @@ impl PermanentStorage for InMemoryPermanentStorage { let samples = state .accounts .iter() - .filter(|(_, account_info)| account_info.bytecode.get_current().is_some()) + .filter(|(_, account_info)| account_info.is_contract()) .flat_map(|(_, contract)| { contract .slots @@ -437,4 +437,8 @@ impl InMemoryPermanentAccount { } self.slots = new_slots; } + + fn is_contract(&self) -> bool { + self.bytecode.get_current().is_some() + } } From c042428a1d0d8e734a648253a06f5321575a1d76 Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Thu, 29 Feb 2024 16:33:46 -0300 Subject: [PATCH 17/17] fmt --- src/config.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/config.rs b/src/config.rs index 4a0e62a3a..1ed10b927 100644 --- a/src/config.rs +++ b/src/config.rs @@ -299,4 +299,3 @@ impl FromStr for Environment { } } } -