Skip to content

Commit

Permalink
enha: tidy rocksdb storage (#1906)
Browse files Browse the repository at this point in the history
* enha: tidy rocks - store BlockNumber as u32

* chore: rename LogMinedRockdb to LogMinedRocksdb

* enha: tidy rocksdb - writing txs - don't write block number and hash

those 2 can always be loaded from context, in this case, the header of
the block

* enha: tidying rocksdb - log index and tx index as u32

* enha: rocks tidying - txType u64->u8

* enha: tidying rocksdb - don't store block number and hash for each tx log

* enha: tidying rocksdb - don't store log index

* enha: rocks tidying - don't store tx hash or index in log

* update snapshot tests
  • Loading branch information
marcospb19-cw committed Dec 15, 2024
1 parent 1e9efa2 commit de60e3e
Show file tree
Hide file tree
Showing 16 changed files with 118 additions and 105 deletions.
20 changes: 15 additions & 5 deletions src/bin/historic_events_processor.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::borrow::Cow;
use std::time::Duration;

use anyhow::Context;
Expand All @@ -6,7 +7,10 @@ use chrono::TimeZone;
use chrono::Timelike;
use indicatif::ProgressBar;
use rocksdb::properties::ESTIMATE_NUM_KEYS;
use stratus::eth::primitives::TransactionMined;
use stratus::eth::storage::permanent::rocks::types::BlockNumberRocksdb;
use stratus::eth::storage::permanent::rocks::types::BlockRocksdb;
use stratus::eth::storage::permanent::rocks::types::HashRocksdb;
use stratus::eth::storage::permanent::rocks::types::TransactionMinedRocksdb;
use stratus::eth::storage::permanent::rocks::types::UnixTimeRocksdb;
use stratus::eth::storage::permanent::rocks::RocksStorageState;
Expand All @@ -18,8 +22,14 @@ use stratus::ledger::events::Event;
const TIMEOUT: Duration = Duration::from_secs(5);

/// Converts a mined transaction from RocksDB to account transfer events
fn transaction_mined_rocks_db_to_events(block_timestamp: UnixTimeRocksdb, tx: TransactionMinedRocksdb) -> Vec<AccountTransfers> {
transaction_to_events(block_timestamp.into(), std::borrow::Cow::Owned(tx.into()))
fn transaction_mined_rocks_db_to_events(
block_timestamp: UnixTimeRocksdb,
tx: TransactionMinedRocksdb,
block_number: BlockNumberRocksdb,
block_hash: HashRocksdb,
) -> Vec<AccountTransfers> {
let tx = TransactionMined::from_rocks_primitives(tx, block_number, block_hash);
transaction_to_events(block_timestamp.into(), Cow::Owned(tx))
}

/// Returns total count of blocks and transactions from RocksDB state
Expand Down Expand Up @@ -72,7 +82,7 @@ fn process_block_events(block: BlockRocksdb) -> Vec<String> {
block
.transactions
.into_iter()
.flat_map(|tx| transaction_mined_rocks_db_to_events(timestamp, tx))
.flat_map(|tx| transaction_mined_rocks_db_to_events(timestamp, tx, block.header.number, block.header.hash))
.map(|event| event.event_payload().unwrap())
.collect()
}
Expand All @@ -87,13 +97,13 @@ fn main() -> Result<(), anyhow::Error> {
// Load last processed block number from file
tracing::info!("loading last processed block");
let start_block = std::fs::read_to_string("last_processed_block")
.map(|s| s.trim().parse::<u64>().unwrap())
.map(|s| s.trim().parse::<u32>().unwrap())
.unwrap_or(0);
tracing::info!(?start_block);

tracing::info!("creating rocksdb iterator");
let iter = if start_block > 0 {
b_pb.inc(start_block);
b_pb.inc(start_block.into());
state.blocks_by_number.iter_from(start_block.into(), rocksdb::Direction::Forward)?
} else {
state.blocks_by_number.iter_start()
Expand Down
4 changes: 1 addition & 3 deletions src/eth/follower/importer/importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use crate::ext::traced_sleep;
use crate::ext::DisplayExt;
use crate::ext::SleepReason;
use crate::globals::IMPORTER_ONLINE_TASKS_SEMAPHORE;
use crate::if_else;
use crate::infra::kafka::KafkaConnector;
#[cfg(feature = "metrics")]
use crate::infra::metrics;
Expand Down Expand Up @@ -62,9 +61,8 @@ static EXTERNAL_RPC_CURRENT_BLOCK: AtomicU64 = AtomicU64::new(0);

/// Only sets the external RPC current block number if it is equals or greater than the current one.
fn set_external_rpc_current_block(new_number: BlockNumber) {
let new_number_u64 = new_number.as_u64();
let _ = EXTERNAL_RPC_CURRENT_BLOCK.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current_number| {
if_else!(new_number_u64 >= current_number, Some(new_number_u64), None)
Some(current_number.max(new_number.as_u64()))
});
}

Expand Down
6 changes: 4 additions & 2 deletions src/eth/primitives/block_number.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,17 @@ impl BlockNumber {
}
}

/// Converts itself to i64.
pub fn as_i64(&self) -> i64 {
self.0.as_u64() as i64
}

/// Converts itself to u64.
pub fn as_u64(&self) -> u64 {
self.0.as_u64()
}

pub fn as_u32(&self) -> u32 {
self.0.as_u64() as u32
}
}

impl Dummy<Faker> for BlockNumber {
Expand Down
8 changes: 4 additions & 4 deletions src/eth/storage/permanent/rocks/rocks_permanent.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::path::Path;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering;
use std::time::Duration;

Expand All @@ -23,7 +23,7 @@ use crate::eth::storage::PermanentStorage;
#[derive(Debug)]
pub struct RocksPermanentStorage {
pub state: RocksStorageState,
block_number: AtomicU64,
block_number: AtomicU32,
}

impl RocksPermanentStorage {
Expand Down Expand Up @@ -82,7 +82,7 @@ impl PermanentStorage for RocksPermanentStorage {
}

fn set_mined_block_number(&self, number: BlockNumber) -> anyhow::Result<()> {
self.block_number.store(number.as_u64(), Ordering::SeqCst);
self.block_number.store(number.as_u32(), Ordering::SeqCst);
Ok(())
}

Expand Down Expand Up @@ -150,7 +150,7 @@ impl PermanentStorage for RocksPermanentStorage {

#[cfg(feature = "dev")]
fn reset(&self) -> anyhow::Result<()> {
self.block_number.store(0u64, Ordering::SeqCst);
self.block_number.store(0u32, Ordering::SeqCst);
self.state.reset().inspect_err(|e| {
tracing::error!(reason = ?e, "failed to reset in RocksPermanent");
})
Expand Down
27 changes: 14 additions & 13 deletions src/eth/storage/permanent/rocks/rocks_state.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::HashMap;
use std::fmt;
use std::fmt::Debug;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::AtomicU32;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
Expand Down Expand Up @@ -185,10 +185,10 @@ impl RocksStorageState {
self.db_path.rsplit('/').next().unwrap_or(&self.db_path)
}

pub fn preload_block_number(&self) -> Result<AtomicU64> {
let block_number = self.blocks_by_number.last_key()?.unwrap_or_default();
pub fn preload_block_number(&self) -> Result<AtomicU32> {
let block_number = self.blocks_by_number.last_key()?.unwrap_or_default().0;
tracing::info!(%block_number, "preloaded block_number");
Ok((u64::from(block_number)).into())
Ok(AtomicU32::new(block_number))
}

#[cfg(feature = "dev")]
Expand Down Expand Up @@ -257,13 +257,14 @@ impl RocksStorageState {
return log_and_err!("the block that the transaction was supposed to be in was not found")
.with_context(|| format!("block_number = {:?} tx_hash = {}", block_number, tx_hash));
};
let block = block.into_inner();

let transaction = block.into_inner().transactions.into_iter().find(|tx| Hash::from(tx.input.hash) == tx_hash);
let transaction = block.transactions.into_iter().find(|tx| Hash::from(tx.input.hash) == tx_hash);

match transaction {
Some(tx) => {
tracing::trace!(%tx_hash, "transaction found");
Ok(Some(tx.into()))
Ok(Some(TransactionMined::from_rocks_primitives(tx, block_number.into_inner(), block.header.hash)))
}
None => log_and_err!("rocks error, transaction wasn't found in block where the index pointed at")
.with_context(|| format!("block_number = {:?} tx_hash = {}", block_number, tx_hash)),
Expand All @@ -289,12 +290,12 @@ impl RocksStorageState {
break;
}

let logs = block
.into_inner()
.transactions
.into_iter()
.flat_map(|transaction| transaction.logs)
.map(LogMined::from);
let block = block.into_inner();
let logs = block.transactions.into_iter().enumerate().flat_map(|(tx_index, transaction)| {
transaction.logs.into_iter().enumerate().map(move |(log_index, log)| {
LogMined::from_rocks_primitives(log, block.header.number, block.header.hash, tx_index, transaction.input.hash, log_index)
})
});

let filtered_logs = logs.filter(|log| filter.matches(log));
logs_result.extend(filtered_logs);
Expand Down Expand Up @@ -405,7 +406,7 @@ impl RocksStorageState {
self.accounts_history.prepare_batch_insertion(
accounts.iter().cloned().map(|acc| {
let tup = <(AddressRocksdb, AccountRocksdb)>::from(acc);
((tup.0, 0u64.into()), tup.1.into())
((tup.0, 0u32.into()), tup.1.into())
}),
&mut write_batch,
)?;
Expand Down
49 changes: 26 additions & 23 deletions src/eth/storage/permanent/rocks/types/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,28 +48,31 @@ impl From<Block> for BlockRocksdb {

impl From<BlockRocksdb> for Block {
fn from(item: BlockRocksdb) -> Self {
Block {
header: BlockHeader {
number: BlockNumber::from(item.header.number),
hash: Hash::from(item.header.hash),
transactions_root: Hash::from(item.header.transactions_root),
gas_used: item.header.gas_used.into(),
gas_limit: item.header.gas_limit.into(),
bloom: item.header.bloom.into(),
timestamp: item.header.timestamp.into(),
parent_hash: Hash::from(item.header.parent_hash),
author: Address::from(item.header.author),
extra_data: item.header.extra_data.into(),
miner: Address::from(item.header.miner),
difficulty: item.header.difficulty.into(),
receipts_root: Hash::from(item.header.receipts_root),
uncle_hash: Hash::from(item.header.uncle_hash),
size: item.header.size.into(),
state_root: Hash::from(item.header.state_root),
total_difficulty: item.header.total_difficulty.into(),
nonce: item.header.nonce.into(),
},
transactions: item.transactions.into_iter().map(TransactionMined::from).collect(),
}
let header = BlockHeader {
number: BlockNumber::from(item.header.number),
hash: Hash::from(item.header.hash),
transactions_root: Hash::from(item.header.transactions_root),
gas_used: item.header.gas_used.into(),
gas_limit: item.header.gas_limit.into(),
bloom: item.header.bloom.into(),
timestamp: item.header.timestamp.into(),
parent_hash: Hash::from(item.header.parent_hash),
author: Address::from(item.header.author),
extra_data: item.header.extra_data.into(),
miner: Address::from(item.header.miner),
difficulty: item.header.difficulty.into(),
receipts_root: Hash::from(item.header.receipts_root),
uncle_hash: Hash::from(item.header.uncle_hash),
size: item.header.size.into(),
state_root: Hash::from(item.header.state_root),
total_difficulty: item.header.total_difficulty.into(),
nonce: item.header.nonce.into(),
};
let transactions = item
.transactions
.into_iter()
.map(|tx| TransactionMined::from_rocks_primitives(tx, header.number.into(), header.hash.into()))
.collect();
Block { header, transactions }
}
}
14 changes: 7 additions & 7 deletions src/eth/storage/permanent/rocks/types/block_number.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ use crate::eth::primitives::BlockNumber;
use crate::gen_newtype_from;

#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Ord, PartialOrd, Hash, derive_more::Display, fake::Dummy)]
pub struct BlockNumberRocksdb(pub u64);
pub struct BlockNumberRocksdb(pub u32);

gen_newtype_from!(self = BlockNumberRocksdb, other = u8, u16, u32, u64);
gen_newtype_from!(self = BlockNumberRocksdb, other = u8, u16, u32);

impl From<BlockNumber> for BlockNumberRocksdb {
fn from(item: BlockNumber) -> Self {
item.0.as_u64().into()
Self(item.as_u32())
}
}

Expand All @@ -21,7 +21,7 @@ impl From<BlockNumberRocksdb> for BlockNumber {
}
}

impl From<BlockNumberRocksdb> for u64 {
impl From<BlockNumberRocksdb> for u32 {
fn from(value: BlockNumberRocksdb) -> Self {
value.0
}
Expand All @@ -35,14 +35,14 @@ impl serde::Serialize for BlockNumberRocksdb {

impl<'de> serde::Deserialize<'de> for BlockNumberRocksdb {
fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
u64::deserialize(deserializer).map(|v| Self(u64::from_be(v)))
u32::deserialize(deserializer).map(|v| Self(u32::from_be(v)))
}
}

impl Add<u64> for BlockNumberRocksdb {
impl Add<u32> for BlockNumberRocksdb {
type Output = Self;

fn add(self, other: u64) -> Self {
fn add(self, other: u32) -> Self {
BlockNumberRocksdb(self.0 + other)
}
}
10 changes: 5 additions & 5 deletions src/eth/storage/permanent/rocks/types/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,22 @@ use std::fmt::Debug;
use crate::eth::primitives::Index;

#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, derive_more::Add, Copy, Hash, fake::Dummy)]
pub struct IndexRocksdb(u64);
pub struct IndexRocksdb(pub(self) u32);

impl IndexRocksdb {
pub fn inner_value(&self) -> u64 {
self.0
pub fn as_usize(&self) -> usize {
self.0 as usize
}
}

impl From<Index> for IndexRocksdb {
fn from(item: Index) -> Self {
IndexRocksdb(item.0)
IndexRocksdb(item.0 as u32)
}
}

impl From<IndexRocksdb> for Index {
fn from(item: IndexRocksdb) -> Self {
Index::new(item.inner_value())
Index::new(item.0 as u64)
}
}
43 changes: 19 additions & 24 deletions src/eth/storage/permanent/rocks/types/log_mined.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,37 @@ use std::fmt::Debug;

use super::block_number::BlockNumberRocksdb;
use super::hash::HashRocksdb;
use super::index::IndexRocksdb;
use super::log::LogRocksdb;
use crate::eth::primitives::Index;
use crate::eth::primitives::LogMined;

#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, fake::Dummy)]
pub struct LogMinedRockdb {
pub struct LogMinedRocksdb {
pub log: LogRocksdb,
pub transaction_hash: HashRocksdb,
pub transaction_index: IndexRocksdb,
pub log_index: IndexRocksdb,
pub block_number: BlockNumberRocksdb,
pub block_hash: HashRocksdb,
}

impl From<LogMined> for LogMinedRockdb {
impl From<LogMined> for LogMinedRocksdb {
fn from(item: LogMined) -> Self {
Self {
log: item.log.into(),
transaction_hash: item.transaction_hash.into(),
transaction_index: item.transaction_index.into(),
log_index: item.log_index.into(),
block_number: item.block_number.into(),
block_hash: item.block_hash.into(),
}
Self { log: item.log.into() }
}
}

impl From<LogMinedRockdb> for LogMined {
fn from(item: LogMinedRockdb) -> Self {
impl LogMined {
pub fn from_rocks_primitives(
other: LogMinedRocksdb,
block_number: BlockNumberRocksdb,
block_hash: HashRocksdb,
tx_index: usize,
tx_hash: HashRocksdb,
log_index: usize,
) -> Self {
Self {
log: item.log.into(),
transaction_hash: item.transaction_hash.into(),
transaction_index: item.transaction_index.into(),
log_index: item.log_index.into(),
block_number: item.block_number.into(),
block_hash: item.block_hash.into(),
block_number: block_number.into(),
block_hash: block_hash.into(),
log: other.log.into(),
transaction_hash: tx_hash.into(),
transaction_index: Index::from(tx_index as u64),
log_index: Index::from(log_index as u64),
}
}
}
Loading

0 comments on commit de60e3e

Please sign in to comment.