Skip to content

Commit

Permalink
feat: change rocksdb structure to minimize redundancy (#543)
Browse files Browse the repository at this point in the history
  • Loading branch information
carneiro-cw authored Apr 8, 2024
1 parent e7014d5 commit 1be9dde
Showing 1 changed file with 57 additions and 27 deletions.
84 changes: 57 additions & 27 deletions src/eth/storage/rocks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use anyhow::anyhow;
use anyhow::Context;
use async_trait::async_trait;
use futures::future::join_all;
use itertools::Itertools;
use num_traits::cast::ToPrimitive;
use revm::primitives::KECCAK_EMPTY;
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -37,6 +39,7 @@ use crate::eth::primitives::TransactionMined;
use crate::eth::primitives::Wei;
use crate::eth::storage::PermanentStorage;
use crate::eth::storage::StorageError;
use crate::log_and_err;

#[derive(Debug)]
pub struct RocksPermanentStorage {
Expand Down Expand Up @@ -192,19 +195,12 @@ impl PermanentStorage for RocksPermanentStorage {

async fn read_mined_transaction(&self, hash: &Hash) -> anyhow::Result<Option<TransactionMined>> {
tracing::debug!(%hash, "reading transaction");

match self.state.transactions.get(hash) {
Some(transaction) => {
tracing::trace!(%hash, "transaction found in memory");
Ok(Some(transaction.clone()))
}
None => Ok(None),
}
self.state.read_transaction(hash)
}

async fn read_logs(&self, filter: &LogFilter) -> anyhow::Result<Vec<LogMined>> {
tracing::debug!(?filter, "reading logs");
self.state.read_logs(filter).await
self.state.read_logs(filter)
}

async fn save_block(&self, block: Block) -> anyhow::Result<(), StorageError> {
Expand All @@ -219,9 +215,9 @@ impl PermanentStorage for RocksPermanentStorage {
let mut txs_batch = vec![];
let mut logs_batch = vec![];
for transaction in block.transactions.clone() {
txs_batch.push((transaction.input.hash.clone(), transaction.clone()));
txs_batch.push((transaction.input.hash.clone(), transaction.block_number));
for log in transaction.logs {
logs_batch.push(((transaction.input.hash.clone(), log.log_index), log.clone()));
logs_batch.push(((transaction.input.hash.clone(), log.log_index), transaction.block_number));
}
}

Expand All @@ -236,9 +232,12 @@ impl PermanentStorage for RocksPermanentStorage {

let blocks_by_number = Arc::clone(&self.state.blocks_by_number);
let blocks_by_hash = Arc::clone(&self.state.blocks_by_hash);
let block_clone = block.clone();
let mut block_without_changes = block.clone();
for transaction in &mut block_without_changes.transactions {
transaction.execution.changes = vec![];
}
let hash_clone = hash.clone();
futures.push(tokio::task::spawn_blocking(move || blocks_by_number.insert(number, block_clone)));
futures.push(tokio::task::spawn_blocking(move || blocks_by_number.insert(number, block_without_changes)));
futures.push(tokio::task::spawn_blocking(move || blocks_by_hash.insert(hash_clone, number)));

futures.append(
Expand Down Expand Up @@ -329,10 +328,10 @@ pub struct RocksStorageState {
pub accounts_history: Arc<RocksDb<(Address, BlockNumber), AccountInfo>>,
pub account_slots: Arc<RocksDb<(Address, SlotIndex), SlotValue>>,
pub account_slots_history: Arc<RocksDb<(Address, SlotIndex, BlockNumber), SlotValue>>,
pub transactions: Arc<RocksDb<Hash, TransactionMined>>,
pub transactions: Arc<RocksDb<Hash, BlockNumber>>,
pub blocks_by_number: Arc<RocksDb<BlockNumber, Block>>,
pub blocks_by_hash: Arc<RocksDb<Hash, BlockNumber>>,
pub logs: Arc<RocksDb<(Hash, Index), LogMined>>,
pub logs: Arc<RocksDb<(Hash, Index), BlockNumber>>,
}

impl RocksStorageState {
Expand Down Expand Up @@ -385,15 +384,15 @@ impl RocksStorageState {
}

let transactions = self.transactions.iter_start();
for (hash, transaction) in transactions {
if transaction.block_number > block_number {
for (hash, tx_block_number) in transactions {
if tx_block_number > block_number {
self.transactions.delete(&hash)?;
}
}

let logs = self.logs.iter_start();
for (key, log) in logs {
if log.block_number > block_number {
for (key, log_block_number) in logs {
if log_block_number > block_number {
self.logs.delete(&key)?;
}
}
Expand Down Expand Up @@ -521,17 +520,48 @@ impl RocksStorageState {
Ok(vec![account_changes_future, slot_changes_future])
}

pub async fn read_logs(&self, filter: &LogFilter) -> anyhow::Result<Vec<LogMined>> {
let logs = self
.logs
pub fn read_transaction(&self, tx_hash: &Hash) -> anyhow::Result<Option<TransactionMined>> {
match self.transactions.get(tx_hash) {
Some(transaction) => match self.blocks_by_number.get(&transaction) {
Some(block) => {
tracing::trace!(%tx_hash, "transaction found in memory");
match block.transactions.into_iter().find(|tx| &tx.input.hash == tx_hash) {
Some(tx) => Ok(Some(tx)),
None => log_and_err!("transaction was not found in block"),
}
}
None => {
log_and_err!("the block that the transaction was supposed to be in was not found")
}
},
None => Ok(None),
}
}

pub fn read_logs(&self, filter: &LogFilter) -> anyhow::Result<Vec<LogMined>> {
self.logs
.iter_start()
.skip_while(|(_, log)| log.block_number < filter.from_block)
.take_while(|(_, log)| match filter.to_block {
Some(to_block) => log.block_number <= to_block,
.skip_while(|(_, log_block_number)| log_block_number < &filter.from_block)
.take_while(|(_, log_block_number)| match filter.to_block {
Some(to_block) => log_block_number <= &to_block,
None => true,
})
.filter_map(|(_, log)| if filter.matches(&log) { Some(log) } else { None });
Ok(logs.collect::<Vec<_>>())
.map(|((tx_hash, _), _)| match self.read_transaction(&tx_hash) {
Ok(Some(tx)) => Ok(tx.logs),
Ok(None) => Err(anyhow!("the transaction the log was supposed to be in was not found")),
Err(err) => Err(err),
})
.flatten_ok()
.filter_map(|log_res| match log_res {
Ok(log) =>
if filter.matches(&log) {
Some(Ok(log))
} else {
None
},
err => Some(err),
})
.collect()
}

pub async fn get_slot_at_point(&self, address: &Address, slot_index: &SlotIndex, point_in_time: &StoragePointInTime) -> anyhow::Result<Option<Slot>> {
Expand Down

0 comments on commit 1be9dde

Please sign in to comment.