Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: state validator binary #296

Merged
merged 20 commits into from
Mar 1, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ sqlx = { version = "0.7.3", features = ["runtime-tokio", "postgres", "bigdecimal

# test
fake = { version = "2.9.2", features = ["derive"] }
rand = "0.8.5"

[dev-dependencies]
binary_macros = "1.0.0"
Expand Down Expand Up @@ -94,6 +95,10 @@ path = "src/bin/importer/importer-import.rs"
name = "rpc-server-poller"
path = "src/bin/rpc_server_poller.rs"

[[bin]]
name = "state-validator"
path = "src/bin/state-validator.rs"

# ------------------------------------------------------------------------------
# Lints
# ------------------------------------------------------------------------------
Expand Down
100 changes: 100 additions & 0 deletions src/bin/state-validator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use std::sync::Arc;
use std::time::Duration;

use rand::Rng;
use stratus::config::StateValidatorConfig;
use stratus::config::ValidatorMethodConfig;
use stratus::eth::primitives::BlockNumber;
use stratus::eth::storage::StratusStorage;
use stratus::infra::BlockchainClient;
use stratus::init_global_services;
use tokio::task::JoinSet;

const RPC_TIMEOUT: Duration = Duration::from_secs(2);

#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
// init services
let config: StateValidatorConfig = init_global_services();
let storage = config.init_storage().await?;

let interval = BlockNumber::from(config.interval);

let mut latest_compared_block = BlockNumber::ZERO;

let mut futures = JoinSet::new();
loop {
let current_block = storage.read_current_block_number().await?;
if current_block - latest_compared_block >= interval && futures.len() < config.concurrent_tasks as usize {
let future = validate_state(
config.method.clone(),
Arc::clone(&storage),
latest_compared_block,
latest_compared_block + interval,
config.sample_size,
config.seed,
);

futures.spawn(future);

latest_compared_block = latest_compared_block + interval;
} else if let Some(res) = futures.join_next().await {
res??;
}
}
}

async fn validate_state(
method: ValidatorMethodConfig,
storage: Arc<StratusStorage>,
start: BlockNumber,
end: BlockNumber,
max_sample_size: u64,
seed: u64,
) -> anyhow::Result<()> {
match method {
ValidatorMethodConfig::Rpc { url } => {
let chain = BlockchainClient::new(&url, RPC_TIMEOUT)?;
validate_state_rpc(&chain, storage, start, end, max_sample_size, seed).await
}
_ => todo!(),
}
}

async fn validate_state_rpc(
chain: &BlockchainClient,
storage: Arc<StratusStorage>,
start: BlockNumber,
end: BlockNumber,
max_sample_size: u64,
seed: u64,
) -> anyhow::Result<()> {
tracing::debug!("Validating state {:?}, {:?}", start, end);
let seed = match seed {
0 => {
let mut rng = rand::thread_rng();
rng.gen()
}
n => n,
};
let slots = storage.get_slots_sample(start, end, max_sample_size, seed).await?;
for sampled_slot in slots {
let expected_value = chain
.get_storage_at(
&sampled_slot.address,
&sampled_slot.index,
stratus::eth::primitives::StoragePointInTime::Past(sampled_slot.block_number),
)
.await?;

if sampled_slot.value != expected_value {
return Err(anyhow::anyhow!(
"State mismatch on slot {:?}, expected value: {:?}, found: {:?}",
sampled_slot,
expected_value,
sampled_slot.value
));
}
}
Ok(())
}
45 changes: 45 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,34 @@ pub struct RpcPollerConfig {
pub common: CommonConfig,
}

/// Configuration for importer-import binary.
#[derive(Parser, Debug, derive_more::Deref)]
pub struct StateValidatorConfig {
#[deref]
#[clap(flatten)]
pub common: CommonConfig,

/// How many slots to validate per batch. 0 means every slot.
#[arg(long = "max-samples", env = "MAX_SAMPLES", default_value_t = 0)]
pub sample_size: u64,

/// Seed to use when sampling. 0 for random seed.
#[arg(long = "seed", env = "SEED", default_value_t = 0, requires = "sample_size")]
pub seed: u64,

/// Validate in batches of n blocks.
#[arg(short = 'i', long = "inverval", env = "INVERVAL", default_value_t = 1000)]
pub interval: u64,

/// What method to use when validating.
#[arg(short = 'm', long = "method", env = "METHOD")]
pub method: ValidatorMethodConfig,

/// How many concurrent validation tasks to run
#[arg(short = 'c', long = "concurrent-tasks", env = "CONCURRENT_TASKS", default_value_t = 10)]
pub concurrent_tasks: u16,
}

/// Common configuration that can be used by any binary.
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
Expand Down Expand Up @@ -226,6 +254,23 @@ impl FromStr for StorageConfig {
}
}

#[derive(Clone, Debug, strum::Display)]
pub enum ValidatorMethodConfig {
Rpc { url: String },
CompareTables,
}

impl FromStr for ValidatorMethodConfig {
type Err = anyhow::Error;

fn from_str(s: &str) -> anyhow::Result<Self, Self::Err> {
match s {
"compare_tables" => Ok(Self::CompareTables),
s => Ok(Self::Rpc { url: s.to_string() }),
}
}
}

/// Enviroment where the application is running.
#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
pub enum Environment {
Expand Down
1 change: 1 addition & 0 deletions src/eth/primitives/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ pub use log_topic::LogTopic;
pub use nonce::Nonce;
pub use slot::Slot;
pub use slot::SlotIndex;
pub use slot::SlotSample;
pub use slot::SlotValue;
pub use storage_point_in_time::StoragePointInTime;
pub use transaction_input::TransactionInput;
Expand Down
10 changes: 10 additions & 0 deletions src/eth/primitives/slot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use sqlx::error::BoxDynError;
use sqlx::postgres::PgHasArrayType;
use sqlx::Decode;

use super::Address;
use crate::eth::primitives::BlockNumber;
use crate::gen_newtype_from;

#[derive(Debug, Clone, Default, PartialEq, Eq, fake::Dummy, serde::Serialize, serde::Deserialize)]
Expand Down Expand Up @@ -202,3 +204,11 @@ impl PgHasArrayType for SlotValue {
<[u8; 32] as PgHasArrayType>::array_type_info()
}
}

#[derive(Debug, sqlx::Decode)]
pub struct SlotSample {
pub address: Address,
pub block_number: BlockNumber,
pub index: SlotIndex,
carneiro-cw marked this conversation as resolved.
Show resolved Hide resolved
pub value: SlotValue,
}
12 changes: 9 additions & 3 deletions src/eth/storage/inmemory/inmemory_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ use nonempty::NonEmpty;
use crate::eth::primitives::BlockNumber;
use crate::eth::primitives::StoragePointInTime;

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct InMemoryHistory<T>(NonEmpty<InMemoryHistoryValue<T>>)
where
T: Clone + Debug;

#[derive(Clone, Debug, derive_new::new)]
pub struct InMemoryHistoryValue<T> {
block_number: BlockNumber,
value: T,
pub block_number: BlockNumber,
pub value: T,
}

impl<T> InMemoryHistory<T>
Expand Down Expand Up @@ -71,3 +71,9 @@ where
&self.0.last().value
}
}

impl<T: Clone + Debug> From<InMemoryHistory<T>> for Vec<InMemoryHistoryValue<T>> {
fn from(value: InMemoryHistory<T>) -> Self {
value.0.into()
}
}
39 changes: 39 additions & 0 deletions src/eth/storage/inmemory/inmemory_permanent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ use std::sync::Arc;
use async_trait::async_trait;
use indexmap::IndexMap;
use metrics::atomics::AtomicU64;
use rand::rngs::StdRng;
use rand::seq::IteratorRandom;
use rand::SeedableRng;
use tokio::sync::RwLock;
use tokio::sync::RwLockReadGuard;
use tokio::sync::RwLockWriteGuard;
Expand All @@ -26,6 +29,7 @@ use crate::eth::primitives::LogMined;
use crate::eth::primitives::Nonce;
use crate::eth::primitives::Slot;
use crate::eth::primitives::SlotIndex;
use crate::eth::primitives::SlotSample;
use crate::eth::primitives::StoragePointInTime;
use crate::eth::primitives::TransactionMined;
use crate::eth::primitives::Wei;
Expand Down Expand Up @@ -353,6 +357,41 @@ impl PermanentStorage for InMemoryPermanentStorage {

Ok(())
}

async fn get_slots_sample(&self, start: BlockNumber, end: BlockNumber, max_samples: u64, seed: u64) -> anyhow::Result<Vec<SlotSample>> {
let state = self.lock_read().await;

let samples = state
.accounts
.iter()
.filter(|(_, account_info)| account_info.bytecode.get_current().is_some())
carneiro-cw marked this conversation as resolved.
Show resolved Hide resolved
.flat_map(|(_, contract)| {
contract
.slots
.values()
.flat_map(|slot_history| Vec::from((*slot_history).clone()))
.filter_map(|slot| {
if slot.block_number >= start && slot.block_number < end {
Some(SlotSample {
address: contract.address.clone(),
block_number: slot.block_number,
index: slot.value.index,
value: slot.value.value,
})
} else {
None
}
})
});

match max_samples {
0 => Ok(samples.collect()),
n => {
let mut rng = StdRng::seed_from_u64(seed);
Ok(samples.choose_multiple(&mut rng, n as usize))
}
}
}
}

#[derive(Debug)]
Expand Down
3 changes: 3 additions & 0 deletions src/eth/storage/permanent_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::eth::primitives::LogFilter;
use crate::eth::primitives::LogMined;
use crate::eth::primitives::Slot;
use crate::eth::primitives::SlotIndex;
use crate::eth::primitives::SlotSample;
use crate::eth::primitives::StoragePointInTime;
use crate::eth::primitives::TransactionMined;
use crate::eth::storage::StorageError;
Expand Down Expand Up @@ -51,4 +52,6 @@ pub trait PermanentStorage: Send + Sync {

/// Resets all state to a specific block number.
async fn reset_at(&self, number: BlockNumber) -> anyhow::Result<()>;

async fn get_slots_sample(&self, start: BlockNumber, end: BlockNumber, max_samples: u64, seed: u64) -> anyhow::Result<Vec<SlotSample>>;
}
22 changes: 22 additions & 0 deletions src/eth/storage/postgres/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::eth::primitives::LogMined;
use crate::eth::primitives::LogTopic;
use crate::eth::primitives::Slot;
use crate::eth::primitives::SlotIndex;
use crate::eth::primitives::SlotSample;
use crate::eth::primitives::StoragePointInTime;
use crate::eth::primitives::TransactionMined;
use crate::eth::storage::postgres::types::AccountBatch;
Expand Down Expand Up @@ -715,6 +716,27 @@ impl PermanentStorage for Postgres {

Ok(())
}

async fn get_slots_sample(&self, start: BlockNumber, end: BlockNumber, max_samples: u64, seed: u64) -> anyhow::Result<Vec<SlotSample>> {
let seed = (seed as f64 / 100.0).fract();
let max_samples: Option<i64> = match max_samples {
0 => None,
n => Some(n as i64),
};

let slots_sample_rows = sqlx::query_file_as!(
SlotSample,
"src/eth/storage/postgres/queries/select_random_slot_sample.sql",
seed,
start as _,
end as _,
max_samples
)
.fetch_all(&self.connection_pool)
.await?;

Ok(slots_sample_rows)
}
}

fn partition_logs(logs: impl IntoIterator<Item = PostgresLog>) -> HashMap<TransactionHash, Vec<PostgresLog>> {
Expand Down
Loading