Skip to content

Commit

Permalink
tweak: separate modes for the db
Browse files Browse the repository at this point in the history
  • Loading branch information
zeapoz committed Apr 15, 2024
1 parent 08c20f6 commit 5b97788
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 115 deletions.
5 changes: 3 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ async fn main() -> Result<()> {
ReconstructSource::L1 { l1_fetcher_options } => {
let fetcher_options = l1_fetcher_options.into();
let processor = TreeProcessor::new(db_path.clone()).await?;
let fetcher = L1Fetcher::new(fetcher_options, Some(processor.get_snapshot()))?;
let fetcher = L1Fetcher::new(fetcher_options, Some(processor.get_inner_db()))?;
let (tx, rx) = mpsc::channel::<CommitBlock>(5);

let processor_handle = tokio::spawn(async move {
Expand Down Expand Up @@ -160,7 +160,8 @@ async fn main() -> Result<()> {
let processor = SnapshotBuilder::new(db_path);

let mut fetcher_options: L1FetcherOptions = l1_fetcher_options.into();
if let Ok(Some(batch_number)) = processor.get_last_l1_batch_number() {
if let Ok(batch_number) = processor.get_latest_l1_batch_number() {
let batch_number = batch_number.as_u64();
if batch_number > ethereum::GENESIS_BLOCK {
tracing::info!(
"Found a preexisting snapshot db, continuing from L1 block: {batch_number}"
Expand Down
24 changes: 12 additions & 12 deletions src/processor/snapshot/exporter.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
use std::path::{Path, PathBuf};

use ethers::types::{U256, U64};
use ethers::types::U256;
use eyre::Result;
use state_reconstruct_storage::{
snapshot_columns,
types::{
Proto, SnapshotFactoryDependencies, SnapshotFactoryDependency, SnapshotHeader,
SnapshotStorageLogsChunk, SnapshotStorageLogsChunkMetadata,
},
InnerDB, FACTORY_DEPS, INDEX_TO_KEY_MAP,
DBMode, InnerDB, INDEX_TO_KEY_MAP,
};

use super::{DEFAULT_DB_PATH, SNAPSHOT_HEADER_FILE_NAME};
use crate::processor::snapshot::SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX;
use crate::processor::snapshot::{
DEFAULT_DB_PATH, SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX, SNAPSHOT_HEADER_FILE_NAME,
};

pub struct SnapshotExporter {
basedir: PathBuf,
Expand All @@ -25,20 +27,15 @@ impl SnapshotExporter {
None => PathBuf::from(DEFAULT_DB_PATH),
};

let database = InnerDB::new_read_only(db_path)?;
let database = InnerDB::new_read_only(db_path, DBMode::Snapshot)?;

This comment has been minimized.

Copy link
@vbar

vbar Apr 16, 2024

Collaborator

I'm opposed to having usage modes for individual structs; especially when the caller always knows which mode they want, why not keep separate structs for them?

This comment has been minimized.

Copy link
@zeapoz

zeapoz Apr 16, 2024

Author Member

Reverted back to using two structs.

Ok(Self {
basedir: basedir.to_path_buf(),
database,
})
}

pub fn export_snapshot(&self, chunk_size: u64) -> Result<()> {
let l1_batch_number = U64::from(
self.database
.get_last_l1_batch_number()?
.expect("snapshot db contains no L1 batch number"),
);

let l1_batch_number = self.database.get_latest_l1_batch_number()?;
let mut header = SnapshotHeader {
l1_batch_number,
..Default::default()
Expand All @@ -62,7 +59,10 @@ impl SnapshotExporter {
fn export_factory_deps(&self, header: &mut SnapshotHeader) -> Result<()> {
tracing::info!("Exporting factory dependencies...");

let storage_logs = self.database.cf_handle(FACTORY_DEPS).unwrap();
let storage_logs = self
.database
.cf_handle(snapshot_columns::FACTORY_DEPS)
.unwrap();
let mut iterator = self
.database
.iterator_cf(storage_logs, rocksdb::IteratorMode::Start);
Expand Down
4 changes: 2 additions & 2 deletions src/processor/snapshot/importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use eyre::Result;
use state_reconstruct_fetcher::constants::storage::INNER_DB_NAME;
use state_reconstruct_storage::{
types::{Proto, SnapshotFactoryDependencies, SnapshotHeader, SnapshotStorageLogsChunk},
InnerDB,
DBMode, InnerDB,
};
use tokio::sync::Mutex;

Expand All @@ -25,7 +25,7 @@ pub struct SnapshotImporter {
impl SnapshotImporter {
pub async fn new(directory: PathBuf, db_path: &Path) -> Result<Self> {
let inner_db_path = db_path.join(INNER_DB_NAME);
let new_state = InnerDB::new(inner_db_path.clone())?;
let new_state = InnerDB::new(inner_db_path.clone(), DBMode::Reconstruction)?;
let snapshot = Arc::new(Mutex::new(new_state));
let tree = TreeWrapper::new(db_path, snapshot.clone(), true).await?;

Expand Down
14 changes: 7 additions & 7 deletions src/processor/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use state_reconstruct_fetcher::{
use state_reconstruct_storage::{
bytecode,
types::{MiniblockNumber, SnapshotFactoryDependency, SnapshotStorageLog},
InnerDB,
DBMode, InnerDB,
};
use tokio::sync::mpsc;

Expand All @@ -35,7 +35,7 @@ impl SnapshotBuilder {
None => PathBuf::from(DEFAULT_DB_PATH),
};

let mut database = InnerDB::new(db_path).unwrap();
let mut database = InnerDB::new(db_path, DBMode::Snapshot).unwrap();

let idx = database
.get_last_repeated_key_index()
Expand All @@ -51,8 +51,8 @@ impl SnapshotBuilder {
}

    // Gets the next L1 batch number to be processed for use in state recovery.
pub fn get_last_l1_batch_number(&self) -> Result<Option<u64>> {
self.database.get_last_l1_batch_number()
pub fn get_latest_l1_batch_number(&self) -> Result<U64> {
self.database.get_latest_l1_batch_number()
}
}

Expand Down Expand Up @@ -120,7 +120,7 @@ impl Processor for SnapshotBuilder {
}

if let Some(number) = block.l1_block_number {
let _ = self.database.set_last_l1_batch_number(number);
let _ = self.database.set_latest_l1_batch_number(number);
};
}
}
Expand Down Expand Up @@ -236,7 +236,7 @@ mod tests {
use std::fs;

use indexmap::IndexMap;
use state_reconstruct_storage::{InnerDB, PackingType};
use state_reconstruct_storage::{DBMode, InnerDB, PackingType};

use super::*;

Expand Down Expand Up @@ -271,7 +271,7 @@ mod tests {
builder.run(rx).await;
}

let db = InnerDB::new(PathBuf::from(db_dir.clone())).unwrap();
let db = InnerDB::new(PathBuf::from(db_dir.clone()), DBMode::Snapshot).unwrap();

let key = U256::from_dec_str("1234").unwrap();
let Some(log) = db.get_storage_log(&key).unwrap() else {
Expand Down
24 changes: 12 additions & 12 deletions src/processor/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use state_reconstruct_fetcher::{
metrics::{PerfMetric, METRICS_TRACING_TARGET},
types::CommitBlock,
};
use state_reconstruct_storage::InnerDB;
use state_reconstruct_storage::{DBMode::Reconstruction, InnerDB};
use tokio::{
sync::{mpsc, Mutex},
time::Instant,
Expand All @@ -26,7 +26,7 @@ pub struct TreeProcessor {
/// The internal merkle tree.
tree: TreeWrapper,
/// The stored state snapshot.
snapshot: Arc<Mutex<InnerDB>>,
inner_db: Arc<Mutex<InnerDB>>,
}

impl TreeProcessor {
Expand All @@ -46,15 +46,15 @@ impl TreeProcessor {
);
}

let new_state = InnerDB::new(inner_db_path.clone())?;
let snapshot = Arc::new(Mutex::new(new_state));
let tree = TreeWrapper::new(&db_path, snapshot.clone(), init).await?;
let new_state = InnerDB::new(inner_db_path.clone(), Reconstruction)?;
let inner_db = Arc::new(Mutex::new(new_state));
let tree = TreeWrapper::new(&db_path, inner_db.clone(), init).await?;

Ok(Self { tree, snapshot })
Ok(Self { tree, inner_db })
}

pub fn get_snapshot(&self) -> Arc<Mutex<InnerDB>> {
self.snapshot.clone()
pub fn get_inner_db(&self) -> Arc<Mutex<InnerDB>> {
self.inner_db.clone()
}
}

Expand All @@ -66,10 +66,10 @@ impl Processor for TreeProcessor {
while let Some(block) = rx.recv().await {
// Check if we've already processed this block.
let latest_l2 = self
.snapshot
.inner_db
.lock()
.await
.get_latest_l2_block_number()
.get_latest_l2_batch_number()
.expect("value should default to 0");
if latest_l2 >= block.l2_block_number {
tracing::debug!(
Expand All @@ -89,10 +89,10 @@ impl Processor for TreeProcessor {

// Update snapshot values.
before = Instant::now();
self.snapshot
self.inner_db
.lock()
.await
.set_latest_l2_block_number(block.l2_block_number)
.set_latest_l2_batch_number(block.l2_block_number)
.expect("db failed");

if snapshot_metric.add(before.elapsed()) > 10 {
Expand Down
18 changes: 9 additions & 9 deletions src/processor/tree/tree_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ pub struct TreeWrapper {
index_to_key: HashMap<u64, U256>,
key_to_value: HashMap<U256, H256>,
tree: MerkleTree<RocksDBWrapper>,
snapshot: Arc<Mutex<InnerDB>>,
inner_db: Arc<Mutex<InnerDB>>,
}

impl TreeWrapper {
/// Attempts to create a new [`TreeWrapper`].
pub async fn new(
db_path: &Path,
snapshot: Arc<Mutex<InnerDB>>,
inner_db: Arc<Mutex<InnerDB>>,
reconstruct: bool,
) -> Result<Self> {
let db_opt = RocksDBOptions {
Expand All @@ -40,15 +40,15 @@ impl TreeWrapper {
let mut tree = MerkleTree::new(db);

if reconstruct {
let mut guard = snapshot.lock().await;
let mut guard = inner_db.lock().await;
reconstruct_genesis_state(&mut tree, &mut guard, INITAL_STATE_PATH)?;
}

Ok(Self {
index_to_key: HashMap::new(),
key_to_value: HashMap::new(),
tree,
snapshot,
inner_db,
})
}

Expand All @@ -64,7 +64,7 @@ impl TreeWrapper {
let value = self.process_value(*key, *value);

tree_entries.push(TreeEntry::new(*key, index, value));
self.snapshot
self.inner_db
.lock()
.await
.add_key(key)
Expand All @@ -77,7 +77,7 @@ impl TreeWrapper {
let index = *index;
// Index is 1-based so we subtract 1.
let key = self
.snapshot
.inner_db
.lock()
.await
.get_key(index - 1)
Expand Down Expand Up @@ -131,7 +131,7 @@ impl TreeWrapper {

for log in &chunk.storage_logs {
tree_entries.push(TreeEntry::new(log.key, log.enumeration_index, log.value));
self.snapshot
self.inner_db
.lock()
.await
.add_key(&log.key)
Expand All @@ -147,8 +147,8 @@ impl TreeWrapper {

tracing::info!("Succesfully imported snapshot containing {num_tree_entries} storage logs!",);

let snapshot = self.snapshot.lock().await;
snapshot.set_latest_l1_block_number(l1_batch_number.as_u64() + 1)?;
let db = self.inner_db.lock().await;
db.set_latest_l1_batch_number(l1_batch_number.as_u64() + 1)?;

Ok(())
}
Expand Down
10 changes: 5 additions & 5 deletions state-reconstruct-fetcher/src/l1_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ pub struct L1Fetcher {
}

impl L1Fetcher {
pub fn new(config: L1FetcherOptions, snapshot: Option<Arc<Mutex<InnerDB>>>) -> Result<Self> {
pub fn new(config: L1FetcherOptions, inner_db: Option<Arc<Mutex<InnerDB>>>) -> Result<Self> {
let provider = Provider::<Http>::try_from(&config.http_url)
.expect("could not instantiate HTTP Provider");

Expand All @@ -87,7 +87,7 @@ impl L1Fetcher {
provider,
contracts,
config,
inner_db: snapshot,
inner_db,
metrics,
})
}
Expand All @@ -102,7 +102,7 @@ impl L1Fetcher {
if current_l1_block_number == GENESIS_BLOCK.into() {
if let Some(snapshot) = &self.inner_db {
let snapshot_latest_l1_block_number =
snapshot.lock().await.get_latest_l1_block_number()?;
snapshot.lock().await.get_latest_l1_batch_number()?;
if snapshot_latest_l1_block_number > current_l1_block_number {
current_l1_block_number = snapshot_latest_l1_block_number;
tracing::info!(
Expand All @@ -124,7 +124,7 @@ impl L1Fetcher {
metrics.first_l1_block_num = current_l1_block_number.as_u64();
metrics.latest_l1_block_num = current_l1_block_number.as_u64();
if let Some(snapshot) = &self.inner_db {
metrics.latest_l2_block_num = snapshot.lock().await.get_latest_l2_block_number()?;
metrics.latest_l2_block_num = snapshot.lock().await.get_latest_l2_batch_number()?;
metrics.first_l2_block_num = metrics.latest_l2_block_num;
}
}
Expand Down Expand Up @@ -194,7 +194,7 @@ impl L1Fetcher {
snapshot
.lock()
.await
.set_latest_l1_block_number(block_num)?;
.set_latest_l1_batch_number(block_num)?;
}

// Fetching is naturally ahead of parsing, but the data
Expand Down
Loading

0 comments on commit 5b97788

Please sign in to comment.