Skip to content

Commit

Permalink
feat: state validator binary (#296)
Browse files Browse the repository at this point in the history
* implement state comparison job

* rename to validate_state

* Panic if state is invalid

* extract validator to its own binary

* fix query

* undo imprter changes

* improve config

* fix seed

* Implement get_slots_sample for inmemory

* fix pg seed

* make inmemory get_slots_sample behave like postgres'

* refactor

* validate concurrently

* fmt

* rename get_slots_sample

* is_contract

* fmt
  • Loading branch information
carneiro-cw authored Mar 1, 2024
1 parent 72e53f9 commit 595428d
Show file tree
Hide file tree
Showing 14 changed files with 368 additions and 3 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ sqlx = { version = "0.7.3", features = ["runtime-tokio", "postgres", "bigdecimal

# test
fake = { version = "2.9.2", features = ["derive"] }
rand = "0.8.5"

[dev-dependencies]
binary_macros = "1.0.0"
Expand Down Expand Up @@ -94,6 +95,10 @@ path = "src/bin/importer/importer-import.rs"
name = "rpc-server-poller"
path = "src/bin/rpc_server_poller.rs"

[[bin]]
name = "state-validator"
path = "src/bin/state-validator.rs"

# ------------------------------------------------------------------------------
# Lints
# ------------------------------------------------------------------------------
Expand Down
100 changes: 100 additions & 0 deletions src/bin/state-validator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use std::sync::Arc;
use std::time::Duration;

use rand::Rng;
use stratus::config::StateValidatorConfig;
use stratus::config::ValidatorMethodConfig;
use stratus::eth::primitives::BlockNumber;
use stratus::eth::storage::StratusStorage;
use stratus::infra::BlockchainClient;
use stratus::init_global_services;
use tokio::task::JoinSet;

const RPC_TIMEOUT: Duration = Duration::from_secs(2);

#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
// init services
let config: StateValidatorConfig = init_global_services();
let storage = config.init_storage().await?;

let interval = BlockNumber::from(config.interval);

let mut latest_compared_block = BlockNumber::ZERO;

let mut futures = JoinSet::new();
loop {
let current_block = storage.read_current_block_number().await?;
if current_block - latest_compared_block >= interval && futures.len() < config.concurrent_tasks as usize {
let future = validate_state(
config.method.clone(),
Arc::clone(&storage),
latest_compared_block,
latest_compared_block + interval,
config.sample_size,
config.seed,
);

futures.spawn(future);

latest_compared_block = latest_compared_block + interval;
} else if let Some(res) = futures.join_next().await {
res??;
}
}
}

async fn validate_state(
method: ValidatorMethodConfig,
storage: Arc<StratusStorage>,
start: BlockNumber,
end: BlockNumber,
max_sample_size: u64,
seed: u64,
) -> anyhow::Result<()> {
match method {
ValidatorMethodConfig::Rpc { url } => {
let chain = BlockchainClient::new(&url, RPC_TIMEOUT)?;
validate_state_rpc(&chain, storage, start, end, max_sample_size, seed).await
}
_ => todo!(),
}
}

async fn validate_state_rpc(
chain: &BlockchainClient,
storage: Arc<StratusStorage>,
start: BlockNumber,
end: BlockNumber,
max_sample_size: u64,
seed: u64,
) -> anyhow::Result<()> {
tracing::debug!("Validating state {:?}, {:?}", start, end);
let seed = match seed {
0 => {
let mut rng = rand::thread_rng();
rng.gen()
}
n => n,
};
let slots = storage.read_slots_sample(start, end, max_sample_size, seed).await?;
for sampled_slot in slots {
let expected_value = chain
.get_storage_at(
&sampled_slot.address,
&sampled_slot.index,
stratus::eth::primitives::StoragePointInTime::Past(sampled_slot.block_number),
)
.await?;

if sampled_slot.value != expected_value {
return Err(anyhow::anyhow!(
"State mismatch on slot {:?}, expected value: {:?}, found: {:?}",
sampled_slot,
expected_value,
sampled_slot.value
));
}
}
Ok(())
}
77 changes: 77 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,34 @@ pub struct RpcPollerConfig {
pub common: CommonConfig,
}

/// Configuration for importer-import binary.
#[derive(Parser, Debug, derive_more::Deref)]
pub struct StateValidatorConfig {
#[deref]
#[clap(flatten)]
pub common: CommonConfig,

/// How many slots to validate per batch. 0 means every slot.
#[arg(long = "max-samples", env = "MAX_SAMPLES", default_value_t = 0)]
pub sample_size: u64,

/// Seed to use when sampling. 0 for random seed.
#[arg(long = "seed", env = "SEED", default_value_t = 0, requires = "sample_size")]
pub seed: u64,

/// Validate in batches of n blocks.
#[arg(short = 'i', long = "inverval", env = "INVERVAL", default_value_t = 1000)]
pub interval: u64,

/// What method to use when validating.
#[arg(short = 'm', long = "method", env = "METHOD")]
pub method: ValidatorMethodConfig,

/// How many concurrent validation tasks to run
#[arg(short = 'c', long = "concurrent-tasks", env = "CONCURRENT_TASKS", default_value_t = 10)]
pub concurrent_tasks: u16,
}

/// Common configuration that can be used by any binary.
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
Expand Down Expand Up @@ -222,3 +250,52 @@ impl FromStr for StorageConfig {
}
}
}

#[derive(Clone, Debug, strum::Display)]
pub enum ValidatorMethodConfig {
Rpc { url: String },
CompareTables,
}

impl FromStr for ValidatorMethodConfig {
type Err = anyhow::Error;

fn from_str(s: &str) -> anyhow::Result<Self, Self::Err> {
match s {
"compare_tables" => Ok(Self::CompareTables),
s => Ok(Self::Rpc { url: s.to_string() }),
}
}
}

/// Enviroment where the application is running.
#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
pub enum Environment {
Development,
Production,
}

impl Environment {
/// Checks if the current environment is production.
pub fn is_production(&self) -> bool {
matches!(self, Self::Production)
}

/// Checks if the current environment is development.
pub fn is_development(&self) -> bool {
matches!(self, Self::Development)
}
}

impl FromStr for Environment {
type Err = anyhow::Error;

fn from_str(s: &str) -> Result<Self, Self::Err> {
let s = s.trim().to_lowercase();
match s.as_str() {
"dev" | "development" => Ok(Self::Development),
"prod" | "production" => Ok(Self::Production),
s => Err(anyhow!("unknown environment: {}", s)),
}
}
}
1 change: 1 addition & 0 deletions src/eth/primitives/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ pub use log_topic::LogTopic;
pub use nonce::Nonce;
pub use slot::Slot;
pub use slot::SlotIndex;
pub use slot::SlotSample;
pub use slot::SlotValue;
pub use storage_point_in_time::StoragePointInTime;
pub use transaction_input::TransactionInput;
Expand Down
10 changes: 10 additions & 0 deletions src/eth/primitives/slot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use sqlx::error::BoxDynError;
use sqlx::postgres::PgHasArrayType;
use sqlx::Decode;

use super::Address;
use crate::eth::primitives::BlockNumber;
use crate::gen_newtype_from;

#[derive(Debug, Clone, Default, PartialEq, Eq, fake::Dummy, serde::Serialize, serde::Deserialize)]
Expand Down Expand Up @@ -202,3 +204,11 @@ impl PgHasArrayType for SlotValue {
<[u8; 32] as PgHasArrayType>::array_type_info()
}
}

#[derive(Debug, sqlx::Decode)]
pub struct SlotSample {
pub address: Address,
pub block_number: BlockNumber,
pub index: SlotIndex,
pub value: SlotValue,
}
12 changes: 9 additions & 3 deletions src/eth/storage/inmemory/inmemory_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ use nonempty::NonEmpty;
use crate::eth::primitives::BlockNumber;
use crate::eth::primitives::StoragePointInTime;

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct InMemoryHistory<T>(NonEmpty<InMemoryHistoryValue<T>>)
where
T: Clone + Debug;

#[derive(Clone, Debug, derive_new::new)]
pub struct InMemoryHistoryValue<T> {
block_number: BlockNumber,
value: T,
pub block_number: BlockNumber,
pub value: T,
}

impl<T> InMemoryHistory<T>
Expand Down Expand Up @@ -71,3 +71,9 @@ where
&self.0.last().value
}
}

impl<T: Clone + Debug> From<InMemoryHistory<T>> for Vec<InMemoryHistoryValue<T>> {
fn from(value: InMemoryHistory<T>) -> Self {
value.0.into()
}
}
43 changes: 43 additions & 0 deletions src/eth/storage/inmemory/inmemory_permanent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ use std::sync::Arc;
use async_trait::async_trait;
use indexmap::IndexMap;
use metrics::atomics::AtomicU64;
use rand::rngs::StdRng;
use rand::seq::IteratorRandom;
use rand::SeedableRng;
use tokio::sync::RwLock;
use tokio::sync::RwLockReadGuard;
use tokio::sync::RwLockWriteGuard;
Expand All @@ -26,6 +29,7 @@ use crate::eth::primitives::LogMined;
use crate::eth::primitives::Nonce;
use crate::eth::primitives::Slot;
use crate::eth::primitives::SlotIndex;
use crate::eth::primitives::SlotSample;
use crate::eth::primitives::StoragePointInTime;
use crate::eth::primitives::TransactionMined;
use crate::eth::primitives::Wei;
Expand Down Expand Up @@ -353,6 +357,41 @@ impl PermanentStorage for InMemoryPermanentStorage {

Ok(())
}

async fn read_slots_sample(&self, start: BlockNumber, end: BlockNumber, max_samples: u64, seed: u64) -> anyhow::Result<Vec<SlotSample>> {
let state = self.lock_read().await;

let samples = state
.accounts
.iter()
.filter(|(_, account_info)| account_info.is_contract())
.flat_map(|(_, contract)| {
contract
.slots
.values()
.flat_map(|slot_history| Vec::from((*slot_history).clone()))
.filter_map(|slot| {
if slot.block_number >= start && slot.block_number < end {
Some(SlotSample {
address: contract.address.clone(),
block_number: slot.block_number,
index: slot.value.index,
value: slot.value.value,
})
} else {
None
}
})
});

match max_samples {
0 => Ok(samples.collect()),
n => {
let mut rng = StdRng::seed_from_u64(seed);
Ok(samples.choose_multiple(&mut rng, n as usize))
}
}
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -398,4 +437,8 @@ impl InMemoryPermanentAccount {
}
self.slots = new_slots;
}

fn is_contract(&self) -> bool {
self.bytecode.get_current().is_some()
}
}
Loading

0 comments on commit 595428d

Please sign in to comment.