Skip to content

Commit

Permalink
Faster boot (#508)
Browse files Browse the repository at this point in the history
* chore: use concurrency to load data

* lint

* more lint
  • Loading branch information
renancloudwalk authored Apr 2, 2024
1 parent 9b1aa52 commit b66490c
Showing 1 changed file with 56 additions and 31 deletions.
87 changes: 56 additions & 31 deletions src/eth/storage/hybrid/hybrid_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use core::fmt;
use std::sync::Arc;

use anyhow::Context;
use futures::future::join_all;
use itertools::Itertools;
use revm::primitives::KECCAK_EMPTY;
use sqlx::types::BigDecimal;
Expand Down Expand Up @@ -88,9 +89,14 @@ impl HybridStorageState {
//XXX TODO use a fixed block_number during load, in order to avoid sync problem
// e.g other instance moving forward and this query getting incongruous data
pub async fn load_latest_data(&self, pool: &Pool<Postgres>) -> anyhow::Result<()> {
let account_rows = sqlx::query_as!(
AccountRow,
"
let pool_clone_accounts = pool.clone();
let self_clone_accounts = Arc::<RocksDb<Address, AccountInfo>>::clone(&self.accounts);

// Spawn the first blocking task for accounts
let accounts_task = tokio::spawn(async move {
let account_rows = sqlx::query_as!(
AccountRow,
"
SELECT DISTINCT ON (address)
address,
nonce,
Expand All @@ -103,27 +109,33 @@ impl HybridStorageState {
address,
block_number DESC
"
)
.fetch_all(pool)
.await?;

for account_row in account_rows {
let addr: Address = account_row.address.try_into().unwrap_or_default(); //XXX add alert
self.accounts.insert(
addr,
AccountInfo {
balance: account_row.balance.map(|b| b.try_into().unwrap_or_default()).unwrap_or_default(),
nonce: account_row.nonce.map(|n| n.try_into().unwrap_or_default()).unwrap_or_default(),
bytecode: account_row.bytecode.map(Bytes::from),
code_hash: account_row.code_hash,
},
);
}
)
.fetch_all(&pool_clone_accounts)
.await?;

for account_row in account_rows {
let addr: Address = account_row.address.try_into().unwrap_or_default();
self_clone_accounts.insert(
addr,
AccountInfo {
balance: account_row.balance.map(|b| b.try_into().unwrap_or_default()).unwrap_or_default(),
nonce: account_row.nonce.map(|n| n.try_into().unwrap_or_default()).unwrap_or_default(),
bytecode: account_row.bytecode.map(Bytes::from),
code_hash: account_row.code_hash,
},
);
}
Result::<(), anyhow::Error>::Ok(())
});

// Load slots
let slot_rows = sqlx::query_as!(
SlotRow,
"
let pool_clone_slots = pool.clone();
let self_clone_slots = Arc::<RocksDb<(Address, SlotIndex), SlotValue>>::clone(&self.account_slots);

// Spawn the second blocking task for slots
let slots_task = tokio::spawn(async move {
let slot_rows = sqlx::query_as!(
SlotRow,
"
SELECT DISTINCT ON (account_address, slot_index)
account_address,
slot_index,
Expand All @@ -135,14 +147,27 @@ impl HybridStorageState {
slot_index,
block_number DESC
"
)
.fetch_all(pool)
.await?;

for slot_row in slot_rows {
let addr: Address = slot_row.account_address.try_into().unwrap_or_default(); //XXX add alert
self.account_slots
.insert((addr, slot_row.slot_index), slot_row.value.unwrap_or_default().into());
)
.fetch_all(&pool_clone_slots)
.await?;

for slot_row in slot_rows {
let addr: Address = slot_row.account_address.try_into().unwrap_or_default();
self_clone_slots.insert((addr, slot_row.slot_index), slot_row.value.unwrap_or_default().into());
}
Result::<(), anyhow::Error>::Ok(())
});

// Await both tasks concurrently
let results = join_all(vec![accounts_task, slots_task]).await;

// Check the results of both tasks
for result in results {
match result {
Ok(Ok(())) => continue, // Successfully completed task
Ok(Err(e)) => return Err(e), // Task completed with an error
Err(e) => return Err(anyhow::Error::new(e)), // Task panicked
}
}

Ok(())
Expand Down

0 comments on commit b66490c

Please sign in to comment.