Skip to content

Commit

Permalink
fix: logs on rocksdb (#532)
Browse files Browse the repository at this point in the history
  • Loading branch information
renancloudwalk authored Apr 6, 2024
1 parent de002c8 commit d409e38
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 70 deletions.
2 changes: 1 addition & 1 deletion src/eth/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ mod inmemory;
mod permanent_storage;
mod postgres_external_rpc;
mod postgres_permanent;
pub mod rocks_db;
//XXX mod rocks;
pub mod rocks_db;
mod sled;
mod storage_error;
mod stratus_storage;
Expand Down
92 changes: 23 additions & 69 deletions src/eth/storage/rocks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,7 @@ impl RocksPermanentStorage {
let state = RocksStorageState::new();
let block_number = Self::preload_block_number(&state).await?;
state.load_latest_data().await?;
Ok(Self {
state,
block_number,
})
Ok(Self { state, block_number })
}

async fn preload_block_number(state: &RocksStorageState) -> anyhow::Result<AtomicU64> {
Expand Down Expand Up @@ -148,28 +145,30 @@ impl PermanentStorage for RocksPermanentStorage {

async fn maybe_read_account(&self, address: &Address, point_in_time: &StoragePointInTime) -> anyhow::Result<Option<Account>> {
let account = match point_in_time {
StoragePointInTime::Present => {
match self.state.accounts.get(address) {
Some(inner_account) => {
let account = inner_account.to_account(address).await;
tracing::trace!(%address, ?account, "account found");
Some(account)
}
StoragePointInTime::Present => match self.state.accounts.get(address) {
Some(inner_account) => {
let account = inner_account.to_account(address).await;
tracing::trace!(%address, ?account, "account found");
Some(account)
}

None => {
tracing::trace!(%address, "account not found");
None
}
None => {
tracing::trace!(%address, "account not found");
None
}
}
},
StoragePointInTime::Past(block_number) => {
if let Some(((addr, _), account_info)) = self.state.accounts_history.iter_from((address.clone(), *block_number), rocksdb::Direction::Reverse).next() {
if let Some(((addr, _), account_info)) = self
.state
.accounts_history
.iter_from((address.clone(), *block_number), rocksdb::Direction::Reverse)
.next()
{
if address == &addr {
return Ok(Some(account_info.to_account(address).await));
}
}
return Ok(None)

return Ok(None);
}
};
Ok(account)
Expand Down Expand Up @@ -198,9 +197,7 @@ impl PermanentStorage for RocksPermanentStorage {
tracing::trace!(?selection, ?block, "block found");
Ok(Some(block))
}
None => {
Ok(None)
}
None => Ok(None),
}
}

Expand All @@ -212,9 +209,7 @@ impl PermanentStorage for RocksPermanentStorage {
tracing::trace!(%hash, "transaction found in memory");
Ok(Some(transaction.clone()))
}
None => {
Ok(None)
}
None => Ok(None),
}
}

Expand Down Expand Up @@ -337,7 +332,6 @@ impl PermanentStorage for RocksPermanentStorage {
}
}


#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
pub struct AccountInfo {
pub balance: Wei,
Expand Down Expand Up @@ -463,49 +457,10 @@ impl RocksStorageState {
None => true,
})
.filter_map(|(_, log)| if filter.matches(&log) { Some(log) } else { None });
Ok(vec![])

//XXX let log_query_builder = &mut QueryBuilder::new(
//XXX r#"
//XXX SELECT log_data
//XXX FROM neo_logs
//XXX "#,
//XXX );
//XXX log_query_builder.push(" WHERE block_number >= ").push_bind(filter.from_block);

//XXX // verifies if to_block exists
//XXX if let Some(block_number) = filter.to_block {
//XXX log_query_builder.push(" AND block_number <= ").push_bind(block_number);
//XXX }

//XXX for address in filter.addresses.iter() {
//XXX log_query_builder.push(" AND address = ").push_bind(address);
//XXX }

//XXX let log_query = log_query_builder.build();

//XXX let query_result = log_query.fetch_all(pool).await?;

//XXX let pg_logs = query_result
//XXX .into_iter()
//XXX .map(|row| {
//XXX let json: Json<LogMined> = row.get("log_data");
//XXX json.0
//XXX })
//XXX .filter(|log| filter.matches(log))
//XXX .chain(logs) // we chain the iterators because it might be the case that some logs are yet to be written to pg
//XXX .unique_by(|log| (log.block_number, log.log_index, log.transaction_hash.clone()))
//XXX .collect();

//XXX Ok(pg_logs)
Ok(logs.collect::<Vec<_>>())
}

pub async fn get_slot_at_point(
&self,
address: &Address,
slot_index: &SlotIndex,
point_in_time: &StoragePointInTime,
) -> anyhow::Result<Option<Slot>> {
pub async fn get_slot_at_point(&self, address: &Address, slot_index: &SlotIndex, point_in_time: &StoragePointInTime) -> anyhow::Result<Option<Slot>> {
let slot = match point_in_time {
StoragePointInTime::Present => self.account_slots.get(&(address.clone(), slot_index.clone())).map(|account_slot_value| Slot {
index: slot_index.clone(),
Expand All @@ -520,9 +475,8 @@ impl RocksStorageState {
if slot_index == &index && address == &addr {
return Ok(Some(Slot { index, value }));
}
}
}
return Ok(None);

}
};
Ok(slot)
Expand Down

0 comments on commit d409e38

Please sign in to comment.