diff --git a/.sqlx/query-13a8805112acd60f7a2cda1f468f9cb51ea244b95a376ef962c5c2932e9b15a4.json b/.sqlx/query-13a8805112acd60f7a2cda1f468f9cb51ea244b95a376ef962c5c2932e9b15a4.json new file mode 100644 index 000000000..3526b14fa --- /dev/null +++ b/.sqlx/query-13a8805112acd60f7a2cda1f468f9cb51ea244b95a376ef962c5c2932e9b15a4.json @@ -0,0 +1,43 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT\n idx as \"index!: _\",\n value as \"value!: _\",\n account_address as \"address!: _\",\n block_number as \"block_number!: _\"\nFROM\n (SELECT\n setseed($1),\n null as \"idx\",\n null as \"value\",\n null as \"account_address\",\n null as \"block_number\"\n UNION ALL\n SELECT\n null,\n idx,\n value,\n account_address,\n block_number\n FROM historical_slots\n WHERE block_number >= $2 AND block_number < $3\n OFFSET 1\n )\nORDER BY random()\nLIMIT $4;\n", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "index!: _", + "type_info": "Bytea" + }, + { + "ordinal": 1, + "name": "value!: _", + "type_info": "Bytea" + }, + { + "ordinal": 2, + "name": "address!: _", + "type_info": "Bytea" + }, + { + "ordinal": 3, + "name": "block_number!: _", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Float8", + "Int8", + "Int8", + "Int8" + ] + }, + "nullable": [ + null, + null, + null, + null + ] + }, + "hash": "13a8805112acd60f7a2cda1f468f9cb51ea244b95a376ef962c5c2932e9b15a4" +} diff --git a/Cargo.lock b/Cargo.lock index 3011c0283..bb113de20 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3499,6 +3499,7 @@ dependencies = [ "phf_codegen", "pin-project", "quote", + "rand", "revm", "rlp", "serde", diff --git a/Cargo.toml b/Cargo.toml index 02122a9a7..5687c21fc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,6 +66,7 @@ sqlx = { version = "0.7.3", features = ["runtime-tokio", "postgres", "bigdecimal # test fake = { version = "2.9.2", features = ["derive"] } +rand = "0.8.5" [dev-dependencies] binary_macros = "1.0.0" @@ -94,6 +95,10 @@ path = "src/bin/importer/importer-import.rs" name = "rpc-server-poller" path = "src/bin/rpc_server_poller.rs" +[[bin]] +name = "state-validator" +path = "src/bin/state-validator.rs" + # ------------------------------------------------------------------------------ # Lints # ------------------------------------------------------------------------------ diff --git a/src/bin/state-validator.rs b/src/bin/state-validator.rs new file mode 100644 index 000000000..01e6513fb --- /dev/null +++ b/src/bin/state-validator.rs @@ -0,0 +1,100 @@ +use std::sync::Arc; +use std::time::Duration; + +use rand::Rng; +use stratus::config::StateValidatorConfig; +use stratus::config::ValidatorMethodConfig; +use stratus::eth::primitives::BlockNumber; +use stratus::eth::storage::StratusStorage; +use stratus::infra::BlockchainClient; +use stratus::init_global_services; +use tokio::task::JoinSet; + +const RPC_TIMEOUT: Duration = Duration::from_secs(2); + +#[tokio::main(flavor = "current_thread")] +async fn main() -> anyhow::Result<()> { + // init services + let config: StateValidatorConfig = init_global_services(); + let storage = config.init_storage().await?; + + let interval = BlockNumber::from(config.interval); + + let mut latest_compared_block = BlockNumber::ZERO; + + let mut futures = JoinSet::new(); + loop { + let current_block = storage.read_current_block_number().await?; + if current_block - latest_compared_block >= interval && futures.len() < config.concurrent_tasks as usize { + let future = validate_state( + config.method.clone(), + Arc::clone(&storage), + latest_compared_block, + latest_compared_block + interval, + config.sample_size, + config.seed, + ); + + futures.spawn(future); + + latest_compared_block = latest_compared_block + interval; + } else if let Some(res) = futures.join_next().await { + res??; + } + } +} + +async fn validate_state( + method: ValidatorMethodConfig, + storage: Arc, + start: BlockNumber, + end: BlockNumber, + max_sample_size: u64, + seed: u64, +) -> anyhow::Result<()> { + match method { + ValidatorMethodConfig::Rpc { url } => { + let chain = BlockchainClient::new(&url, RPC_TIMEOUT)?; + validate_state_rpc(&chain, storage, start, end, max_sample_size, seed).await + } + _ => todo!(), + } +} + +async fn validate_state_rpc( + chain: &BlockchainClient, + storage: Arc, + start: BlockNumber, + end: BlockNumber, + max_sample_size: u64, + seed: u64, +) -> anyhow::Result<()> { + tracing::debug!("Validating state {:?}, {:?}", start, end); + let seed = match seed { + 0 => { + let mut rng = rand::thread_rng(); + rng.gen() + } + n => n, + }; + let slots = storage.read_slots_sample(start, end, max_sample_size, seed).await?; + for sampled_slot in slots { + let expected_value = chain + .get_storage_at( + &sampled_slot.address, + &sampled_slot.index, + stratus::eth::primitives::StoragePointInTime::Past(sampled_slot.block_number), + ) + .await?; + + if sampled_slot.value != expected_value { + return Err(anyhow::anyhow!( + "State mismatch on slot {:?}, expected value: {:?}, found: {:?}", + sampled_slot, + expected_value, + sampled_slot.value + )); + } + } + Ok(()) +} diff --git a/src/config.rs b/src/config.rs index 0a985a6e6..1ed10b927 100644 --- a/src/config.rs +++ b/src/config.rs @@ -92,6 +92,34 @@ pub struct RpcPollerConfig { pub common: CommonConfig, } +/// Configuration for importer-import binary. +#[derive(Parser, Debug, derive_more::Deref)] +pub struct StateValidatorConfig { + #[deref] + #[clap(flatten)] + pub common: CommonConfig, + + /// How many slots to validate per batch. 0 means every slot. + #[arg(long = "max-samples", env = "MAX_SAMPLES", default_value_t = 0)] + pub sample_size: u64, + + /// Seed to use when sampling. 0 for random seed. + #[arg(long = "seed", env = "SEED", default_value_t = 0, requires = "sample_size")] + pub seed: u64, + + /// Validate in batches of n blocks. + #[arg(short = 'i', long = "inverval", env = "INVERVAL", default_value_t = 1000)] + pub interval: u64, + + /// What method to use when validating. + #[arg(short = 'm', long = "method", env = "METHOD")] + pub method: ValidatorMethodConfig, + + /// How many concurrent validation tasks to run + #[arg(short = 'c', long = "concurrent-tasks", env = "CONCURRENT_TASKS", default_value_t = 10)] + pub concurrent_tasks: u16, +} + /// Common configuration that can be used by any binary. #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] @@ -222,3 +250,52 @@ impl FromStr for StorageConfig { } } } + +#[derive(Clone, Debug, strum::Display)] +pub enum ValidatorMethodConfig { + Rpc { url: String }, + CompareTables, +} + +impl FromStr for ValidatorMethodConfig { + type Err = anyhow::Error; + + fn from_str(s: &str) -> anyhow::Result { + match s { + "compare_tables" => Ok(Self::CompareTables), + s => Ok(Self::Rpc { url: s.to_string() }), + } + } +} + +/// Enviroment where the application is running. +#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)] +pub enum Environment { + Development, + Production, +} + +impl Environment { + /// Checks if the current environment is production. + pub fn is_production(&self) -> bool { + matches!(self, Self::Production) + } + + /// Checks if the current environment is development. + pub fn is_development(&self) -> bool { + matches!(self, Self::Development) + } +} + +impl FromStr for Environment { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + let s = s.trim().to_lowercase(); + match s.as_str() { + "dev" | "development" => Ok(Self::Development), + "prod" | "production" => Ok(Self::Production), + s => Err(anyhow!("unknown environment: {}", s)), + } + } +} diff --git a/src/eth/primitives/mod.rs b/src/eth/primitives/mod.rs index 50195bb6e..e42ac11f5 100644 --- a/src/eth/primitives/mod.rs +++ b/src/eth/primitives/mod.rs @@ -150,6 +150,7 @@ pub use log_topic::LogTopic; pub use nonce::Nonce; pub use slot::Slot; pub use slot::SlotIndex; +pub use slot::SlotSample; pub use slot::SlotValue; pub use storage_point_in_time::StoragePointInTime; pub use transaction_input::TransactionInput; diff --git a/src/eth/primitives/slot.rs b/src/eth/primitives/slot.rs index 9306c0ab3..a40ffdb9b 100644 --- a/src/eth/primitives/slot.rs +++ b/src/eth/primitives/slot.rs @@ -19,6 +19,8 @@ use sqlx::error::BoxDynError; use sqlx::postgres::PgHasArrayType; use sqlx::Decode; +use super::Address; +use crate::eth::primitives::BlockNumber; use crate::gen_newtype_from; #[derive(Debug, Clone, Default, PartialEq, Eq, fake::Dummy, serde::Serialize, serde::Deserialize)] @@ -202,3 +204,11 @@ impl PgHasArrayType for SlotValue { <[u8; 32] as PgHasArrayType>::array_type_info() } } + +#[derive(Debug, sqlx::Decode)] +pub struct SlotSample { + pub address: Address, + pub block_number: BlockNumber, + pub index: SlotIndex, + pub value: SlotValue, +} diff --git a/src/eth/storage/inmemory/inmemory_history.rs b/src/eth/storage/inmemory/inmemory_history.rs index 8f0357c52..14a6a1223 100644 --- a/src/eth/storage/inmemory/inmemory_history.rs +++ b/src/eth/storage/inmemory/inmemory_history.rs @@ -6,15 +6,15 @@ use nonempty::NonEmpty; use crate::eth::primitives::BlockNumber; use crate::eth::primitives::StoragePointInTime; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct InMemoryHistory(NonEmpty>) where T: Clone + Debug; #[derive(Clone, Debug, derive_new::new)] pub struct InMemoryHistoryValue { - block_number: BlockNumber, - value: T, + pub block_number: BlockNumber, + pub value: T, } impl InMemoryHistory @@ -71,3 +71,9 @@ where &self.0.last().value } } + +impl From> for Vec> { + fn from(value: InMemoryHistory) -> Self { + value.0.into() + } +} diff --git a/src/eth/storage/inmemory/inmemory_permanent.rs b/src/eth/storage/inmemory/inmemory_permanent.rs index 119d5578f..2cbae8879 100644 --- a/src/eth/storage/inmemory/inmemory_permanent.rs +++ b/src/eth/storage/inmemory/inmemory_permanent.rs @@ -7,6 +7,9 @@ use std::sync::Arc; use async_trait::async_trait; use indexmap::IndexMap; use metrics::atomics::AtomicU64; +use rand::rngs::StdRng; +use rand::seq::IteratorRandom; +use rand::SeedableRng; use tokio::sync::RwLock; use tokio::sync::RwLockReadGuard; use tokio::sync::RwLockWriteGuard; @@ -26,6 +29,7 @@ use crate::eth::primitives::LogMined; use crate::eth::primitives::Nonce; use crate::eth::primitives::Slot; use crate::eth::primitives::SlotIndex; +use crate::eth::primitives::SlotSample; use crate::eth::primitives::StoragePointInTime; use crate::eth::primitives::TransactionMined; use crate::eth::primitives::Wei; @@ -353,6 +357,41 @@ impl PermanentStorage for InMemoryPermanentStorage { Ok(()) } + + async fn read_slots_sample(&self, start: BlockNumber, end: BlockNumber, max_samples: u64, seed: u64) -> anyhow::Result> { + let state = self.lock_read().await; + + let samples = state + .accounts + .iter() + .filter(|(_, account_info)| account_info.is_contract()) + .flat_map(|(_, contract)| { + contract + .slots + .values() + .flat_map(|slot_history| Vec::from((*slot_history).clone())) + .filter_map(|slot| { + if slot.block_number >= start && slot.block_number < end { + Some(SlotSample { + address: contract.address.clone(), + block_number: slot.block_number, + index: slot.value.index, + value: slot.value.value, + }) + } else { + None + } + }) + }); + + match max_samples { + 0 => Ok(samples.collect()), + n => { + let mut rng = StdRng::seed_from_u64(seed); + Ok(samples.choose_multiple(&mut rng, n as usize)) + } + } + } } #[derive(Debug)] @@ -398,4 +437,8 @@ impl InMemoryPermanentAccount { } self.slots = new_slots; } + + fn is_contract(&self) -> bool { + self.bytecode.get_current().is_some() + } } diff --git a/src/eth/storage/permanent_storage.rs b/src/eth/storage/permanent_storage.rs index 659dea1da..606525cf0 100644 --- a/src/eth/storage/permanent_storage.rs +++ b/src/eth/storage/permanent_storage.rs @@ -10,6 +10,7 @@ use crate::eth::primitives::LogFilter; use crate::eth::primitives::LogMined; use crate::eth::primitives::Slot; use crate::eth::primitives::SlotIndex; +use crate::eth::primitives::SlotSample; use crate::eth::primitives::StoragePointInTime; use crate::eth::primitives::TransactionMined; use crate::eth::storage::StorageError; @@ -51,4 +52,6 @@ pub trait PermanentStorage: Send + Sync { /// Resets all state to a specific block number. async fn reset_at(&self, number: BlockNumber) -> anyhow::Result<()>; + + async fn read_slots_sample(&self, start: BlockNumber, end: BlockNumber, max_samples: u64, seed: u64) -> anyhow::Result>; } diff --git a/src/eth/storage/postgres/postgres.rs b/src/eth/storage/postgres/postgres.rs index dc364e45e..b635396af 100644 --- a/src/eth/storage/postgres/postgres.rs +++ b/src/eth/storage/postgres/postgres.rs @@ -24,6 +24,7 @@ use crate::eth::primitives::LogMined; use crate::eth::primitives::LogTopic; use crate::eth::primitives::Slot; use crate::eth::primitives::SlotIndex; +use crate::eth::primitives::SlotSample; use crate::eth::primitives::StoragePointInTime; use crate::eth::primitives::TransactionMined; use crate::eth::storage::postgres::types::AccountBatch; @@ -715,6 +716,27 @@ impl PermanentStorage for Postgres { Ok(()) } + + async fn read_slots_sample(&self, start: BlockNumber, end: BlockNumber, max_samples: u64, seed: u64) -> anyhow::Result> { + let seed = (seed as f64 / 100.0).fract(); + let max_samples: Option = match max_samples { + 0 => None, + n => Some(n as i64), + }; + + let slots_sample_rows = sqlx::query_file_as!( + SlotSample, + "src/eth/storage/postgres/queries/select_random_slot_sample.sql", + seed, + start as _, + end as _, + max_samples + ) + .fetch_all(&self.connection_pool) + .await?; + + Ok(slots_sample_rows) + } } fn partition_logs(logs: impl IntoIterator) -> HashMap> { diff --git a/src/eth/storage/postgres/queries/select_random_slot_sample.sql b/src/eth/storage/postgres/queries/select_random_slot_sample.sql new file mode 100644 index 000000000..7fc33cf43 --- /dev/null +++ b/src/eth/storage/postgres/queries/select_random_slot_sample.sql @@ -0,0 +1,25 @@ +SELECT + idx as "index!: _", + value as "value!: _", + account_address as "address!: _", + block_number as "block_number!: _" +FROM + (SELECT + setseed($1), + null as "idx", + null as "value", + null as "account_address", + null as "block_number" + UNION ALL + SELECT + null, + idx, + value, + account_address, + block_number + FROM historical_slots + WHERE block_number >= $2 AND block_number < $3 + OFFSET 1 + ) +ORDER BY random() +LIMIT $4; diff --git a/src/eth/storage/stratus_storage.rs b/src/eth/storage/stratus_storage.rs index a145a726e..48e331e1f 100644 --- a/src/eth/storage/stratus_storage.rs +++ b/src/eth/storage/stratus_storage.rs @@ -14,6 +14,7 @@ use crate::eth::primitives::LogFilter; use crate::eth::primitives::LogMined; use crate::eth::primitives::Slot; use crate::eth::primitives::SlotIndex; +use crate::eth::primitives::SlotSample; use crate::eth::primitives::StoragePointInTime; use crate::eth::primitives::TransactionMined; use crate::eth::storage::PermanentStorage; @@ -220,4 +221,8 @@ impl StratusStorage { }, } } + + pub async fn read_slots_sample(&self, start: BlockNumber, end: BlockNumber, max_samples: u64, seed: u64) -> anyhow::Result> { + self.perm.read_slots_sample(start, end, max_samples, seed).await + } } diff --git a/src/infra/blockchain_client.rs b/src/infra/blockchain_client.rs index 1eb883871..9b4e9098a 100644 --- a/src/infra/blockchain_client.rs +++ b/src/infra/blockchain_client.rs @@ -8,6 +8,9 @@ use serde_json::Value as JsonValue; use crate::eth::primitives::Address; use crate::eth::primitives::BlockNumber; use crate::eth::primitives::Hash; +use crate::eth::primitives::SlotIndex; +use crate::eth::primitives::SlotValue; +use crate::eth::primitives::StoragePointInTime; use crate::eth::primitives::Wei; use crate::log_and_err; @@ -77,4 +80,25 @@ impl BlockchainClient { Err(e) => log_and_err!(reason = e, "failed to retrieve account balance"), } } + + /// Retrieves a slot at some block. + pub async fn get_storage_at(&self, address: &Address, index: &SlotIndex, point_in_time: StoragePointInTime) -> anyhow::Result { + tracing::debug!(%address, ?point_in_time, "retrieving account balance"); + + let address = serde_json::to_value(address)?; + let index = serde_json::to_value(index)?; + let number = match point_in_time { + StoragePointInTime::Present => serde_json::to_value("latest")?, + StoragePointInTime::Past(number) => serde_json::to_value(number)?, + }; + let result = self + .http + .request::>("eth_getStorageAt", vec![address, index, number]) + .await; + + match result { + Ok(value) => Ok(value), + Err(e) => log_and_err!(reason = e, "failed to retrieve account balance"), + } + } }