Skip to content

Commit

Permalink
Merge pull request eqlabs#2159 from eqlabs/sistemd/store-class-and-st…
Browse files Browse the repository at this point in the history
…orage-trie

p2p: store class and storage trie updates during sync
  • Loading branch information
sistemd authored Aug 9, 2024
2 parents 83d6ade + d8fae45 commit e42e6d1
Show file tree
Hide file tree
Showing 10 changed files with 162 additions and 109 deletions.
5 changes: 5 additions & 0 deletions crates/pathfinder/src/bin/pathfinder/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ Hint: This is usually caused by exceeding the file descriptor limit of your syst
gossiper,
gateway_public_key,
p2p_client,
config.verify_tree_hashes,
)
} else {
tokio::task::spawn(futures::future::pending())
Expand Down Expand Up @@ -496,6 +497,7 @@ fn start_sync(
gossiper: state::Gossiper,
gateway_public_key: pathfinder_common::PublicKey,
p2p_client: Option<p2p::client::peer_agnostic::Client>,
verify_tree_hashes: bool,
) -> tokio::task::JoinHandle<anyhow::Result<()>> {
if config.p2p.proxy {
start_feeder_gateway_sync(
Expand All @@ -518,6 +520,7 @@ fn start_sync(
p2p_client,
gateway_public_key,
config.p2p.l1_checkpoint_override,
verify_tree_hashes,
)
}
}
Expand Down Expand Up @@ -591,6 +594,7 @@ fn start_p2p_sync(
p2p_client: p2p::client::peer_agnostic::Client,
gateway_public_key: pathfinder_common::PublicKey,
l1_checkpoint_override: Option<pathfinder_ethereum::EthereumStateUpdate>,
verify_tree_hashes: bool,
) -> tokio::task::JoinHandle<anyhow::Result<()>> {
let sync = pathfinder_lib::sync::Sync {
storage,
Expand All @@ -602,6 +606,7 @@ fn start_p2p_sync(
chain: pathfinder_context.network,
public_key: gateway_public_key,
l1_checkpoint_override,
verify_tree_hashes,
};
tokio::spawn(sync.run())
}
Expand Down
11 changes: 10 additions & 1 deletion crates/pathfinder/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
pub mod block_hash;
mod sync;

pub use sync::{l1, l2, revert, sync, Gossiper, SyncContext};
pub use sync::{
l1,
l2,
revert,
sync,
update_starknet_state,
Gossiper,
StarknetStateUpdate,
SyncContext,
};
22 changes: 17 additions & 5 deletions crates/pathfinder/src/state/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ pub mod l2;
mod pending;
pub mod revert;

use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::Context;
use pathfinder_common::prelude::*;
use pathfinder_common::state_update::{ContractUpdate, SystemContractUpdate};
use pathfinder_common::{
BlockCommitmentSignature,
Chain,
Expand Down Expand Up @@ -839,7 +841,11 @@ async fn l2_update(
.context("Create database transaction")?;
let (storage_commitment, class_commitment) = update_starknet_state(
&transaction,
&state_update,
StarknetStateUpdate {
contract_updates: &state_update.contract_updates,
system_contract_updates: &state_update.system_contract_updates,
declared_sierra_classes: &state_update.declared_sierra_classes,
},
verify_tree_hashes,
block.block_number,
storage,
Expand Down Expand Up @@ -1043,9 +1049,15 @@ async fn l2_reorg(connection: &mut Connection, reorg_tail: BlockNumber) -> anyho
})
}

fn update_starknet_state(
pub struct StarknetStateUpdate<'a> {
pub contract_updates: &'a HashMap<ContractAddress, ContractUpdate>,
pub system_contract_updates: &'a HashMap<ContractAddress, SystemContractUpdate>,
pub declared_sierra_classes: &'a HashMap<SierraHash, CasmHash>,
}

pub fn update_starknet_state(
transaction: &Transaction<'_>,
state_update: &StateUpdate,
state_update: StarknetStateUpdate<'_>,
verify_hashes: bool,
block: BlockNumber,
// we need this so that we can create extra read-only transactions for
Expand Down Expand Up @@ -1109,7 +1121,7 @@ fn update_starknet_state(
.context("Inserting contract update result")?;
}

for (contract, update) in &state_update.system_contract_updates {
for (contract, update) in state_update.system_contract_updates {
let update_result = update_contract_state(
*contract,
&update.storage,
Expand Down Expand Up @@ -1151,7 +1163,7 @@ fn update_starknet_state(
}
.with_verify_hashes(verify_hashes);

for (sierra, casm) in &state_update.declared_sierra_classes {
for (sierra, casm) in state_update.declared_sierra_classes {
let leaf_hash = pathfinder_common::calculate_class_commitment_leaf_hash(*casm);

transaction
Expand Down
2 changes: 2 additions & 0 deletions crates/pathfinder/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub struct Sync {
pub chain_id: ChainId,
pub public_key: PublicKey,
pub l1_checkpoint_override: Option<EthereumStateUpdate>,
pub verify_tree_hashes: bool,
}

impl Sync {
Expand Down Expand Up @@ -90,6 +91,7 @@ impl Sync {
chain: self.chain,
chain_id: self.chain_id,
public_key: self.public_key,
verify_tree_hashes: self.verify_tree_hashes,
}
.run(checkpoint)
.await;
Expand Down
29 changes: 25 additions & 4 deletions crates/pathfinder/src/sync/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ pub struct Sync {
pub chain: Chain,
pub chain_id: ChainId,
pub public_key: PublicKey,
pub verify_tree_hashes: bool,
}

impl Sync {
Expand All @@ -75,6 +76,7 @@ impl Sync {
chain_id: ChainId,
public_key: PublicKey,
l1_anchor_override: Option<EthereumStateUpdate>,
verify_tree_hashes: bool,
) -> Self {
Self {
storage,
Expand All @@ -85,6 +87,7 @@ impl Sync {
chain,
chain_id,
public_key,
verify_tree_hashes,
}
}

Expand Down Expand Up @@ -121,7 +124,8 @@ impl Sync {

// Sync the rest of the data in chronological order.
self.sync_transactions(head, self.chain_id).await?;
self.sync_state_updates(head).await?;
self.sync_state_updates(head, self.verify_tree_hashes)
.await?;
self.sync_class_definitions(head).await?;
self.sync_events(head).await?;

Expand Down Expand Up @@ -186,7 +190,11 @@ impl Sync {
}

#[tracing::instrument(level = "debug", skip(self))]
async fn sync_state_updates(&self, stop: BlockNumber) -> Result<(), SyncError> {
async fn sync_state_updates(
&self,
stop: BlockNumber,
verify_tree_hashes: bool,
) -> Result<(), SyncError> {
let Some(start) = state_updates::next_missing(self.storage.clone(), stop)
.await
.context("Finding next missing state update")?
Expand All @@ -200,7 +208,7 @@ impl Sync {
state_updates::length_and_commitment_stream(self.storage.clone(), start, stop),
);

handle_state_diff_stream(stream, self.storage.clone(), start).await?;
handle_state_diff_stream(stream, self.storage.clone(), start, verify_tree_hashes).await?;

Ok(())
}
Expand Down Expand Up @@ -319,12 +327,21 @@ async fn handle_state_diff_stream(
+ 'static,
storage: Storage,
start: BlockNumber,
verify_tree_hashes: bool,
) -> Result<(), SyncError> {
Source::from_stream(stream.map_err(|e| e.map(Into::into)))
.spawn()
.pipe(FetchStarknetVersionFromDb::new(storage.connection()?), 10)
.pipe(state_updates::VerifyCommitment, 10)
.pipe(state_updates::Store::new(storage.connection()?, start), 10)
.pipe(
state_updates::UpdateStarknetState {
storage: storage.clone(),
connection: storage.connection()?,
current_block: start,
verify_tree_hashes,
},
10,
)
.into_stream()
.inspect_ok(|x| tracing::info!(tail=%x.data, "State diff synced"))
.try_fold((), |_, _| std::future::ready(Ok(())))
Expand Down Expand Up @@ -1174,6 +1191,7 @@ mod tests {
stream::iter(streamed_state_diffs),
storage.clone(),
BlockNumber::GENESIS,
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -1218,6 +1236,7 @@ mod tests {
stream::iter(streamed_state_diffs),
storage,
BlockNumber::GENESIS,
false,
)
.await,
Err(SyncError::StateDiffCommitmentMismatch(_))
Expand All @@ -1233,6 +1252,7 @@ mod tests {
)))),
StorageBuilder::in_memory().unwrap(),
BlockNumber::GENESIS,
false,
)
.await,
Err(SyncError::Other(_))
Expand All @@ -1250,6 +1270,7 @@ mod tests {
stream::iter(streamed_state_diffs),
StorageBuilder::in_memory().unwrap(),
BlockNumber::GENESIS,
false,
)
.await,
Err(SyncError::Other(_))
Expand Down
2 changes: 2 additions & 0 deletions crates/pathfinder/src/sync/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ pub(super) enum SyncError2 {
ClassDefinitionsDeclarationsMismatch,
#[error("Starknet version not found in db")]
StarknetVersionNotFound,
#[error("State root mismatch")]
StateRootMismatch,
}

impl PartialEq for SyncError2 {
Expand Down
6 changes: 0 additions & 6 deletions crates/pathfinder/src/sync/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,12 +311,6 @@ impl ProcessStage for Persist {
.context("Creating database transaction")?;

for SignedBlockHeader { header, signature } in input {
// TODO update storage and class tries on the header
let header = BlockHeader {
storage_commitment: Default::default(),
class_commitment: Default::default(),
..header
};
tx.insert_block_header(&header)
.context("Persisting block header")?;
tx.insert_signature(header.number, &signature)
Expand Down
53 changes: 37 additions & 16 deletions crates/pathfinder/src/sync/state_updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ use std::num::NonZeroUsize;
use anyhow::Context;
use p2p::client::types::UnverifiedStateUpdateData;
use p2p::PeerData;
use pathfinder_common::state_update::{ContractClassUpdate, ContractUpdate, StateUpdateData};
use pathfinder_common::state_update::{self, ContractClassUpdate, ContractUpdate, StateUpdateData};
use pathfinder_common::{
BlockHash,
BlockHeader,
BlockNumber,
ClassCommitment,
StarknetVersion,
StateCommitment,
StateDiffCommitment,
Expand All @@ -20,6 +21,7 @@ use pathfinder_merkle_tree::StorageCommitmentTree;
use pathfinder_storage::{Storage, TrieUpdate};
use tokio::task::spawn_blocking;

use crate::state::{update_starknet_state, StarknetStateUpdate};
use crate::sync::error::{SyncError, SyncError2};
use crate::sync::stream::ProcessStage;

Expand Down Expand Up @@ -128,33 +130,52 @@ impl ProcessStage for VerifyCommitment {
}
}

pub struct Store {
db: pathfinder_storage::Connection,
current_block: BlockNumber,
pub struct UpdateStarknetState {
pub storage: pathfinder_storage::Storage,
pub connection: pathfinder_storage::Connection,
pub current_block: BlockNumber,
pub verify_tree_hashes: bool,
}

impl Store {
pub fn new(db: pathfinder_storage::Connection, start: BlockNumber) -> Self {
Self {
db,
current_block: start,
}
}
}

impl ProcessStage for Store {
const NAME: &'static str = "StateDiff::Persist";
impl ProcessStage for UpdateStarknetState {
type Input = StateUpdateData;
type Output = BlockNumber;

const NAME: &'static str = "StateDiff::UpdateStarknetState";

fn map(&mut self, state_update: Self::Input) -> Result<Self::Output, SyncError2> {
let mut db = self
.db
.connection
.transaction()
.context("Creating database transaction")?;

let tail = self.current_block;

let (storage_commitment, class_commitment) = update_starknet_state(
&db,
StarknetStateUpdate {
contract_updates: &state_update.contract_updates,
system_contract_updates: &state_update.system_contract_updates,
declared_sierra_classes: &state_update.declared_sierra_classes,
},
self.verify_tree_hashes,
self.current_block,
self.storage.clone(),
)
.context("Updating Starknet state")?;

// Ensure that roots match.
let state_commitment = StateCommitment::calculate(storage_commitment, class_commitment);
let expected_state_commitment = db
.state_commitment(self.current_block.into())
.context("Querying state commitment")?
.context("State commitment not found")?;
if state_commitment != expected_state_commitment {
return Err(SyncError2::StateRootMismatch);
}

db.update_storage_and_class_commitments(tail, storage_commitment, class_commitment)
.context("Updating storage and class commitments")?;
db.insert_state_update_data(self.current_block, &state_update)
.context("Inserting state update data")?;
db.commit().context("Committing db transaction")?;
Expand Down
Loading

0 comments on commit e42e6d1

Please sign in to comment.