From d409e38e669926e769bd21eb16f96a048693ee92 Mon Sep 17 00:00:00 2001 From: renancloudwalk <53792026+renancloudwalk@users.noreply.github.com> Date: Sat, 6 Apr 2024 18:22:29 -0300 Subject: [PATCH] fix: logs on rocksdb (#532) --- src/eth/storage/mod.rs | 2 +- src/eth/storage/rocks/mod.rs | 92 +++++++++--------------------------- 2 files changed, 24 insertions(+), 70 deletions(-) diff --git a/src/eth/storage/mod.rs b/src/eth/storage/mod.rs index 9feb85029..a4d79b6d4 100644 --- a/src/eth/storage/mod.rs +++ b/src/eth/storage/mod.rs @@ -7,8 +7,8 @@ mod inmemory; mod permanent_storage; mod postgres_external_rpc; mod postgres_permanent; -pub mod rocks_db; //XXX mod rocks; +pub mod rocks_db; mod sled; mod storage_error; mod stratus_storage; diff --git a/src/eth/storage/rocks/mod.rs b/src/eth/storage/rocks/mod.rs index ddda60499..641b9291f 100644 --- a/src/eth/storage/rocks/mod.rs +++ b/src/eth/storage/rocks/mod.rs @@ -52,10 +52,7 @@ impl RocksPermanentStorage { let state = RocksStorageState::new(); let block_number = Self::preload_block_number(&state).await?; state.load_latest_data().await?; - Ok(Self { - state, - block_number, - }) + Ok(Self { state, block_number }) } async fn preload_block_number(state: &RocksStorageState) -> anyhow::Result { @@ -148,28 +145,30 @@ impl PermanentStorage for RocksPermanentStorage { async fn maybe_read_account(&self, address: &Address, point_in_time: &StoragePointInTime) -> anyhow::Result> { let account = match point_in_time { - StoragePointInTime::Present => { - match self.state.accounts.get(address) { - Some(inner_account) => { - let account = inner_account.to_account(address).await; - tracing::trace!(%address, ?account, "account found"); - Some(account) - } + StoragePointInTime::Present => match self.state.accounts.get(address) { + Some(inner_account) => { + let account = inner_account.to_account(address).await; + tracing::trace!(%address, ?account, "account found"); + Some(account) + } - None => { - tracing::trace!(%address, "account not found"); - None - } + None => { + tracing::trace!(%address, "account not found"); + None } - } + }, StoragePointInTime::Past(block_number) => { - if let Some(((addr, _), account_info)) = self.state.accounts_history.iter_from((address.clone(), *block_number), rocksdb::Direction::Reverse).next() { + if let Some(((addr, _), account_info)) = self + .state + .accounts_history + .iter_from((address.clone(), *block_number), rocksdb::Direction::Reverse) + .next() + { if address == &addr { return Ok(Some(account_info.to_account(address).await)); } } - return Ok(None) - + return Ok(None); } }; Ok(account) @@ -198,9 +197,7 @@ impl PermanentStorage for RocksPermanentStorage { tracing::trace!(?selection, ?block, "block found"); Ok(Some(block)) } - None => { - Ok(None) - } + None => Ok(None), } } @@ -212,9 +209,7 @@ impl PermanentStorage for RocksPermanentStorage { tracing::trace!(%hash, "transaction found in memory"); Ok(Some(transaction.clone())) } - None => { - Ok(None) - } + None => Ok(None), } } @@ -337,7 +332,6 @@ impl PermanentStorage for RocksPermanentStorage { } } - #[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] pub struct AccountInfo { pub balance: Wei, @@ -463,49 +457,10 @@ impl RocksStorageState { None => true, }) .filter_map(|(_, log)| if filter.matches(&log) { Some(log) } else { None }); - Ok(vec![]) - - //XXX let log_query_builder = &mut QueryBuilder::new( - //XXX r#" - //XXX SELECT log_data - //XXX FROM neo_logs - //XXX "#, - //XXX ); - //XXX log_query_builder.push(" WHERE block_number >= ").push_bind(filter.from_block); - - //XXX // verifies if to_block exists - //XXX if let Some(block_number) = filter.to_block { - //XXX log_query_builder.push(" AND block_number <= ").push_bind(block_number); - //XXX } - - //XXX for address in filter.addresses.iter() { - //XXX log_query_builder.push(" AND address = ").push_bind(address); - //XXX } - - //XXX let log_query = log_query_builder.build(); - - //XXX let query_result = log_query.fetch_all(pool).await?; - - //XXX let pg_logs = query_result - //XXX .into_iter() - //XXX .map(|row| { - //XXX let json: Json = row.get("log_data"); - //XXX json.0 - //XXX }) - //XXX .filter(|log| filter.matches(log)) - //XXX .chain(logs) // we chain the iterators because it might be the case that some logs are yet to be written to pg - //XXX .unique_by(|log| (log.block_number, log.log_index, log.transaction_hash.clone())) - //XXX .collect(); - - //XXX Ok(pg_logs) + Ok(logs.collect::>()) } - pub async fn get_slot_at_point( - &self, - address: &Address, - slot_index: &SlotIndex, - point_in_time: &StoragePointInTime, - ) -> anyhow::Result> { + pub async fn get_slot_at_point(&self, address: &Address, slot_index: &SlotIndex, point_in_time: &StoragePointInTime) -> anyhow::Result> { let slot = match point_in_time { StoragePointInTime::Present => self.account_slots.get(&(address.clone(), slot_index.clone())).map(|account_slot_value| Slot { index: slot_index.clone(), @@ -520,9 +475,8 @@ impl RocksStorageState { if slot_index == &index && address == &addr { return Ok(Some(Slot { index, value })); } - } + } return Ok(None); - } }; Ok(slot)