Skip to content

Commit

Permalink
Rocks types (#649)
Browse files Browse the repository at this point in the history
* chore: use block number as prefix

* chore: add back reset for logs

* chore: implement indexed iter

* reset based on indexes

* fix: add index deletion

* chore: use indexed block numbers on historic accounts and slots

* lint

* fix: add missing index save

* chore: add a limit of backups

* do the reset based on minimal block on all databases

* lint

* define account rocksdb type

* implement slot_value_rocksdb

* lint

* chore: move address into rocksdb

* clippy

* lint
  • Loading branch information
renancloudwalk authored Apr 20, 2024
1 parent 4fb3529 commit 78263c5
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 21 deletions.
8 changes: 8 additions & 0 deletions src/eth/primitives/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ impl Address {
Self(H160(bytes))
}

pub fn new_from_h160(h160: H160) -> Self {
Self(h160)
}

/// Checks if current address is the zero address.
pub fn is_zero(&self) -> bool {
self == &Self::ZERO
Expand All @@ -63,6 +67,10 @@ impl Address {
pub fn is_ignored(&self) -> bool {
self.is_coinbase() || self.is_zero()
}

pub fn inner_value(&self) -> H160 {
self.0
}
}

impl Display for Address {
Expand Down
8 changes: 4 additions & 4 deletions src/eth/storage/rocks/rocks_permanent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl RocksPermanentStorage {
for change in account_changes {
let address = &change.address;

if let Some(account) = state.accounts.get(address) {
if let Some(account) = state.accounts.get(&(*address).clone().into()) {
// check account info conflicts
if let Some(original_nonce) = change.nonce.take_original_ref() {
let account_nonce = &account.nonce;
Expand All @@ -83,7 +83,7 @@ impl RocksPermanentStorage {
}
// check slots conflicts
for (slot_index, slot_change) in &change.slots {
if let Some(value) = state.account_slots.get(&(address.clone(), slot_index.clone())) {
if let Some(value) = state.account_slots.get(&(address.clone().into(), slot_index.clone())) {
if let Some(original_slot) = slot_change.take_original_ref() {
let account_slot_value: SlotValue = value.clone().into();
if original_slot.value != account_slot_value.clone() {
Expand Down Expand Up @@ -231,7 +231,7 @@ impl PermanentStorage for RocksPermanentStorage {

for account in accounts {
self.state.accounts.insert(
account.address.clone(),
account.address.clone().into(),
AccountRocksdb {
balance: account.balance.clone(),
nonce: account.nonce.clone(),
Expand All @@ -241,7 +241,7 @@ impl PermanentStorage for RocksPermanentStorage {
);

self.state.accounts_history.insert(
(account.address.clone(), 0.into()),
(account.address.clone().into(), 0.into()),
AccountRocksdb {
balance: account.balance.clone(),
nonce: account.nonce.clone(),
Expand Down
57 changes: 40 additions & 17 deletions src/eth/storage/rocks/rocks_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::atomic::AtomicU64;
use std::sync::Arc;

use anyhow::anyhow;
use ethereum_types::H160;
use ethereum_types::U256;
use futures::future::join_all;
use itertools::Itertools;
Expand Down Expand Up @@ -80,11 +81,32 @@ impl From<SlotValueRocksdb> for SlotValue {
}
}

#[derive(Debug, Clone, Default, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
pub struct AddressRocksdb(H160);

impl AddressRocksdb {
pub fn inner_value(&self) -> H160 {
self.0.clone()
}
}

impl From<Address> for AddressRocksdb {
fn from(item: Address) -> Self {
AddressRocksdb(item.inner_value())
}
}

impl From<AddressRocksdb> for Address {
fn from(item: AddressRocksdb) -> Self {
Address::new_from_h160(item.inner_value())
}
}

pub struct RocksStorageState {
pub accounts: Arc<RocksDb<Address, AccountRocksdb>>,
pub accounts_history: Arc<RocksDb<(Address, BlockNumber), AccountRocksdb>>,
pub account_slots: Arc<RocksDb<(Address, SlotIndex), SlotValueRocksdb>>,
pub account_slots_history: Arc<RocksDb<(Address, SlotIndex, BlockNumber), SlotValueRocksdb>>,
pub accounts: Arc<RocksDb<AddressRocksdb, AccountRocksdb>>,
pub accounts_history: Arc<RocksDb<(AddressRocksdb, BlockNumber), AccountRocksdb>>,
pub account_slots: Arc<RocksDb<(AddressRocksdb, SlotIndex), SlotValueRocksdb>>,
pub account_slots_history: Arc<RocksDb<(AddressRocksdb, SlotIndex, BlockNumber), SlotValueRocksdb>>,
pub transactions: Arc<RocksDb<Hash, BlockNumber>>,
pub blocks_by_number: Arc<RocksDb<BlockNumber, Block>>,
pub blocks_by_hash: Arc<RocksDb<Hash, BlockNumber>>,
Expand Down Expand Up @@ -120,10 +142,10 @@ impl RocksStorageState {
}

pub fn listen_for_backup_trigger(&self, rx: mpsc::Receiver<()>) -> anyhow::Result<()> {
let accounts = Arc::<RocksDb<Address, AccountRocksdb>>::clone(&self.accounts);
let accounts_history = Arc::<RocksDb<(Address, BlockNumber), AccountRocksdb>>::clone(&self.accounts_history);
let account_slots = Arc::<RocksDb<(Address, SlotIndex), SlotValueRocksdb>>::clone(&self.account_slots);
let account_slots_history = Arc::<RocksDb<(Address, SlotIndex, BlockNumber), SlotValueRocksdb>>::clone(&self.account_slots_history);
let accounts = Arc::<RocksDb<AddressRocksdb, AccountRocksdb>>::clone(&self.accounts);
let accounts_history = Arc::<RocksDb<(AddressRocksdb, BlockNumber), AccountRocksdb>>::clone(&self.accounts_history);
let account_slots = Arc::<RocksDb<(AddressRocksdb, SlotIndex), SlotValueRocksdb>>::clone(&self.account_slots);
let account_slots_history = Arc::<RocksDb<(AddressRocksdb, SlotIndex, BlockNumber), SlotValueRocksdb>>::clone(&self.account_slots_history);
let blocks_by_hash = Arc::<RocksDb<Hash, BlockNumber>>::clone(&self.blocks_by_hash);
let blocks_by_number = Arc::<RocksDb<BlockNumber, Block>>::clone(&self.blocks_by_number);
let transactions = Arc::<RocksDb<Hash, BlockNumber>>::clone(&self.transactions);
Expand Down Expand Up @@ -402,7 +424,7 @@ impl RocksStorageState {

let account_changes_future = tokio::task::spawn_blocking(move || {
for change in changes_clone_for_accounts {
let address = change.address.clone();
let address: AddressRocksdb = change.address.clone().into();
let mut account_info_entry = accounts.entry_or_insert_with(address.clone(), || AccountRocksdb {
balance: Wei::ZERO, // Initialize with default values
nonce: Nonce::ZERO,
Expand Down Expand Up @@ -432,7 +454,7 @@ impl RocksStorageState {

let slot_changes_future = tokio::task::spawn_blocking(move || {
for change in changes_clone_for_slots {
let address = change.address.clone();
let address: AddressRocksdb = change.address.clone().into();
for (slot_index, slot_change) in change.slots.clone() {
if let Some(slot) = slot_change.take_modified() {
slot_changes.push(((address.clone(), slot_index.clone()), slot.value.clone().into()));
Expand Down Expand Up @@ -493,7 +515,7 @@ impl RocksStorageState {

pub fn read_slot(&self, address: &Address, index: &SlotIndex, point_in_time: &StoragePointInTime) -> Option<Slot> {
match point_in_time {
StoragePointInTime::Present => self.account_slots.get(&(address.clone(), index.clone())).map(|account_slot_value| Slot {
StoragePointInTime::Present => self.account_slots.get(&(address.clone().into(), index.clone())).map(|account_slot_value| Slot {
index: index.clone(),
value: account_slot_value.clone().into(),
}),
Expand All @@ -503,7 +525,7 @@ impl RocksStorageState {
.iter_from((address.clone(), index.clone(), *number), rocksdb::Direction::Reverse)
.next()
{
if index == &rocks_index && address == &rocks_address {
if index == &rocks_index && rocks_address == (*address).clone().into() {
return Some(Slot {
index: rocks_index,
value: value.into(),
Expand All @@ -517,7 +539,7 @@ impl RocksStorageState {

pub fn read_account(&self, address: &Address, point_in_time: &StoragePointInTime) -> Option<Account> {
match point_in_time {
StoragePointInTime::Present => match self.accounts.get(address) {
StoragePointInTime::Present => match self.accounts.get(&((*address).clone().into())) {
Some(inner_account) => {
let account = inner_account.to_account(address);
tracing::trace!(%address, ?account, "account found");
Expand All @@ -530,12 +552,13 @@ impl RocksStorageState {
}
},
StoragePointInTime::Past(block_number) => {
let rocks_address: AddressRocksdb = address.clone().into();
if let Some(((addr, _), account_info)) = self
.accounts_history
.iter_from((address.clone(), *block_number), rocksdb::Direction::Reverse)
.iter_from((rocks_address, *block_number), rocksdb::Direction::Reverse)
.next()
{
if address == &addr {
if addr == (*address).clone().into() {
return Some(account_info.to_account(address));
}
}
Expand Down Expand Up @@ -570,7 +593,7 @@ impl RocksStorageState {
let mut account_batch = vec![];
for account in accounts {
account_batch.push((
account.address,
account.address.into(),
AccountRocksdb {
balance: account.balance,
nonce: account.nonce,
Expand All @@ -588,7 +611,7 @@ impl RocksStorageState {
let mut slot_batch = vec![];

for (address, slot) in slots {
slot_batch.push(((address, slot.index), slot.value.into()));
slot_batch.push(((address.into(), slot.index), slot.value.into()));
}
self.account_slots.insert_batch(slot_batch, Some(block_number.into()));
}
Expand Down

0 comments on commit 78263c5

Please sign in to comment.