Skip to content

Commit

Permalink
Rocks types (#648)
Browse files Browse the repository at this point in the history
* chore: use block number as prefix

* chore: add back reset for logs

* chore: implement indexed iter

* reset based on indexes

* fix: add index deletion

* chore: use indexed block numbers on historic accounts and slots

* lint

* fix: add missing index save

* chore: add a limit of backups

* do the reset based on minimal block on all databases

* lint

* define account rocksdb type

* implement slot_value_rocksdb
  • Loading branch information
renancloudwalk authored Apr 20, 2024
1 parent 70594ae commit 4fb3529
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 23 deletions.
8 changes: 8 additions & 0 deletions src/eth/primitives/slot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,14 @@ impl SlotValue {
pub fn as_u256(&self) -> U256 {
self.0
}

pub fn new(value: U256) -> Self {
SlotValue(value)
}

pub fn inner_value(&self) -> U256 {
self.0
}
}

impl FromStr for SlotValue {
Expand Down
12 changes: 6 additions & 6 deletions src/eth/storage/rocks/rocks_permanent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::eth::primitives::SlotSample;
use crate::eth::primitives::SlotValue;
use crate::eth::primitives::StoragePointInTime;
use crate::eth::primitives::TransactionMined;
use crate::eth::storage::rocks::rocks_state::AccountInfo;
use crate::eth::storage::rocks::rocks_state::AccountRocksdb;
use crate::eth::storage::PermanentStorage;
use crate::eth::storage::StorageError;

Expand Down Expand Up @@ -85,9 +85,9 @@ impl RocksPermanentStorage {
for (slot_index, slot_change) in &change.slots {
if let Some(value) = state.account_slots.get(&(address.clone(), slot_index.clone())) {
if let Some(original_slot) = slot_change.take_original_ref() {
let account_slot_value = value.clone();
if original_slot.value != account_slot_value {
conflicts.add_slot(address.clone(), slot_index.clone(), account_slot_value, original_slot.value.clone());
let account_slot_value: SlotValue = value.clone().into();
if original_slot.value != account_slot_value.clone() {
conflicts.add_slot(address.clone(), slot_index.clone(), account_slot_value.clone(), original_slot.value.clone());
}
}
}
Expand Down Expand Up @@ -232,7 +232,7 @@ impl PermanentStorage for RocksPermanentStorage {
for account in accounts {
self.state.accounts.insert(
account.address.clone(),
AccountInfo {
AccountRocksdb {
balance: account.balance.clone(),
nonce: account.nonce.clone(),
bytecode: account.bytecode.clone(),
Expand All @@ -242,7 +242,7 @@ impl PermanentStorage for RocksPermanentStorage {

self.state.accounts_history.insert(
(account.address.clone(), 0.into()),
AccountInfo {
AccountRocksdb {
balance: account.balance.clone(),
nonce: account.nonce.clone(),
bytecode: account.bytecode.clone(),
Expand Down
59 changes: 42 additions & 17 deletions src/eth/storage/rocks/rocks_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::atomic::AtomicU64;
use std::sync::Arc;

use anyhow::anyhow;
use ethereum_types::U256;
use futures::future::join_all;
use itertools::Itertools;
use num_traits::cast::ToPrimitive;
Expand Down Expand Up @@ -37,14 +38,14 @@ use crate::eth::storage::rocks_db::RocksDb;
use crate::log_and_err;

#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
pub struct AccountInfo {
pub struct AccountRocksdb {
pub balance: Wei,
pub nonce: Nonce,
pub bytecode: Option<Bytes>,
pub code_hash: CodeHash,
}

impl AccountInfo {
impl AccountRocksdb {
pub fn to_account(&self, address: &Address) -> Account {
Account {
address: address.clone(),
Expand All @@ -58,11 +59,32 @@ impl AccountInfo {
}
}

#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct SlotValueRocksdb(U256);

impl SlotValueRocksdb {
pub fn inner_value(&self) -> U256 {
self.0.clone()
}
}

impl From<SlotValue> for SlotValueRocksdb {
fn from(item: SlotValue) -> Self {
SlotValueRocksdb(item.inner_value())
}
}

impl From<SlotValueRocksdb> for SlotValue {
fn from(item: SlotValueRocksdb) -> Self {
SlotValue::new(item.inner_value())
}
}

pub struct RocksStorageState {
pub accounts: Arc<RocksDb<Address, AccountInfo>>,
pub accounts_history: Arc<RocksDb<(Address, BlockNumber), AccountInfo>>,
pub account_slots: Arc<RocksDb<(Address, SlotIndex), SlotValue>>,
pub account_slots_history: Arc<RocksDb<(Address, SlotIndex, BlockNumber), SlotValue>>,
pub accounts: Arc<RocksDb<Address, AccountRocksdb>>,
pub accounts_history: Arc<RocksDb<(Address, BlockNumber), AccountRocksdb>>,
pub account_slots: Arc<RocksDb<(Address, SlotIndex), SlotValueRocksdb>>,
pub account_slots_history: Arc<RocksDb<(Address, SlotIndex, BlockNumber), SlotValueRocksdb>>,
pub transactions: Arc<RocksDb<Hash, BlockNumber>>,
pub blocks_by_number: Arc<RocksDb<BlockNumber, Block>>,
pub blocks_by_hash: Arc<RocksDb<Hash, BlockNumber>>,
Expand Down Expand Up @@ -98,10 +120,10 @@ impl RocksStorageState {
}

pub fn listen_for_backup_trigger(&self, rx: mpsc::Receiver<()>) -> anyhow::Result<()> {
let accounts = Arc::<RocksDb<Address, AccountInfo>>::clone(&self.accounts);
let accounts_history = Arc::<RocksDb<(Address, BlockNumber), AccountInfo>>::clone(&self.accounts_history);
let account_slots = Arc::<RocksDb<(Address, SlotIndex), SlotValue>>::clone(&self.account_slots);
let account_slots_history = Arc::<RocksDb<(Address, SlotIndex, BlockNumber), SlotValue>>::clone(&self.account_slots_history);
let accounts = Arc::<RocksDb<Address, AccountRocksdb>>::clone(&self.accounts);
let accounts_history = Arc::<RocksDb<(Address, BlockNumber), AccountRocksdb>>::clone(&self.accounts_history);
let account_slots = Arc::<RocksDb<(Address, SlotIndex), SlotValueRocksdb>>::clone(&self.account_slots);
let account_slots_history = Arc::<RocksDb<(Address, SlotIndex, BlockNumber), SlotValueRocksdb>>::clone(&self.account_slots_history);
let blocks_by_hash = Arc::<RocksDb<Hash, BlockNumber>>::clone(&self.blocks_by_hash);
let blocks_by_number = Arc::<RocksDb<BlockNumber, Block>>::clone(&self.blocks_by_number);
let transactions = Arc::<RocksDb<Hash, BlockNumber>>::clone(&self.transactions);
Expand Down Expand Up @@ -381,7 +403,7 @@ impl RocksStorageState {
let account_changes_future = tokio::task::spawn_blocking(move || {
for change in changes_clone_for_accounts {
let address = change.address.clone();
let mut account_info_entry = accounts.entry_or_insert_with(address.clone(), || AccountInfo {
let mut account_info_entry = accounts.entry_or_insert_with(address.clone(), || AccountRocksdb {
balance: Wei::ZERO, // Initialize with default values
nonce: Nonce::ZERO,
bytecode: None,
Expand Down Expand Up @@ -413,8 +435,8 @@ impl RocksStorageState {
let address = change.address.clone();
for (slot_index, slot_change) in change.slots.clone() {
if let Some(slot) = slot_change.take_modified() {
slot_changes.push(((address.clone(), slot_index.clone()), slot.value.clone()));
slot_history_changes.push(((address.clone(), slot_index, block_number), slot.value));
slot_changes.push(((address.clone(), slot_index.clone()), slot.value.clone().into()));
slot_history_changes.push(((address.clone(), slot_index, block_number), slot.value.into()));
}
}
}
Expand Down Expand Up @@ -473,7 +495,7 @@ impl RocksStorageState {
match point_in_time {
StoragePointInTime::Present => self.account_slots.get(&(address.clone(), index.clone())).map(|account_slot_value| Slot {
index: index.clone(),
value: account_slot_value.clone(),
value: account_slot_value.clone().into(),
}),
StoragePointInTime::Past(number) => {
if let Some(((rocks_address, rocks_index, _), value)) = self
Expand All @@ -482,7 +504,10 @@ impl RocksStorageState {
.next()
{
if index == &rocks_index && address == &rocks_address {
return Some(Slot { index: rocks_index, value });
return Some(Slot {
index: rocks_index,
value: value.into(),
});
}
}
None
Expand Down Expand Up @@ -546,7 +571,7 @@ impl RocksStorageState {
for account in accounts {
account_batch.push((
account.address,
AccountInfo {
AccountRocksdb {
balance: account.balance,
nonce: account.nonce,
bytecode: account.bytecode,
Expand All @@ -563,7 +588,7 @@ impl RocksStorageState {
let mut slot_batch = vec![];

for (address, slot) in slots {
slot_batch.push(((address, slot.index), slot.value));
slot_batch.push(((address, slot.index), slot.value.into()));
}
self.account_slots.insert_batch(slot_batch, Some(block_number.into()));
}
Expand Down

0 comments on commit 4fb3529

Please sign in to comment.