Refactor hybrid storage consistency (#378)
* chore: implement a semaphore on insert

* chore: add a parallel between connections and config

* chore: fine tune hybrid db

* chore: add account slots query

* lint

* fix: add missing file

* chore: simplify types

* lint

* chore: move heavy logic into query executor

* chore: refactor query executor

* fix: proper function calls

* fix: add missing account changes filler

* lint
renancloudwalk authored Mar 18, 2024
1 parent 180db84 commit d0dbd7f
Showing 3 changed files with 131 additions and 97 deletions.

Some generated files are not rendered by default.

99 changes: 2 additions & 97 deletions src/eth/storage/hybrid/mod.rs
@@ -1,4 +1,4 @@
//! In-memory storage implementations.
mod query_executor;

use std::collections::HashMap;
use std::sync::atomic::Ordering;
@@ -20,7 +20,6 @@ use tokio::sync::RwLock;
use tokio::sync::RwLockReadGuard;
use tokio::sync::RwLockWriteGuard;
use tokio::sync::Semaphore;
use tokio::time::sleep;

use crate::eth::primitives::Account;
use crate::eth::primitives::Address;
@@ -45,14 +44,6 @@ use crate::eth::storage::inmemory::InMemoryHistory;
use crate::eth::storage::PermanentStorage;
use crate::eth::storage::StorageError;

type BlockNumbers = Vec<i64>;
type Addresses = Vec<Address>;
type OptionalBytes = Vec<Option<Bytes>>;
type Weis = Vec<Wei>;
type Nonces = Vec<Nonce>;

type AccountChanges = (BlockNumbers, Addresses, OptionalBytes, Weis, Nonces);

#[derive(Debug)]
struct BlockTask {
block_number: BlockNumber,
@@ -138,95 +129,9 @@ impl HybridPermanentStorage {
let pool_clone = Arc::clone(&pool);
let semaphore_clone = Arc::clone(&semaphore);

// Here we attempt to insert the block data into the database.
// Adjust the SQL query according to your table schema.
let block_data = serde_json::to_value(&block_task.block_data).unwrap();
let account_changes = serde_json::to_value(&block_task.account_changes).unwrap();
let max_attempts = 7;

tokio::spawn(async move {
let permit = semaphore_clone.acquire_owned().await.expect("Failed to acquire semaphore permit");
let mut attempts = 0;

loop {
let mut accounts_changes: AccountChanges = (Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new());

for changes in block_task.account_changes.clone() {
let (original_nonce, new_nonce) = changes.nonce.take_both();
let (original_balance, new_balance) = changes.balance.take_both();

let original_nonce = original_nonce.unwrap_or_default();
let original_balance = original_balance.unwrap_or_default();

let nonce = new_nonce.clone().unwrap_or(original_nonce.clone());
let balance = new_balance.clone().unwrap_or(original_balance.clone());

let bytecode = changes.bytecode.take().unwrap_or_else(|| {
tracing::debug!("bytecode not set, defaulting to None");
None
});

accounts_changes.0.push(block_task.block_number.clone().as_i64());
accounts_changes.1.push(changes.address.clone());
accounts_changes.2.push(bytecode);
accounts_changes.3.push(balance);
accounts_changes.4.push(nonce);
}

if !accounts_changes.0.is_empty() {
dbg!(&accounts_changes);
let result = sqlx::query!(
"INSERT INTO public.neo_accounts (block_number, address, bytecode, balance, nonce)
SELECT * FROM UNNEST($1::bigint[], $2::bytea[], $3::bytea[], $4::numeric[], $5::numeric[])
AS t(block_number, address, bytecode, balance, nonce);",
accounts_changes.0 as _,
accounts_changes.1 as _,
accounts_changes.2 as _,
accounts_changes.3 as _,
accounts_changes.4 as _,
)
.execute(&*pool_clone)
.await;

match result {
Ok(_) => println!("Accounts inserted successfully."),
Err(e) => println!("Failed to insert accounts: {}", e),
}
}

let result = sqlx::query!(
"INSERT INTO neo_blocks (block_number, block_hash, block, account_changes, created_at) VALUES ($1, $2, $3, $4, NOW());",
block_task.block_number as _,
block_task.block_hash as _,
block_data as _,
account_changes as _,
)
.execute(&*pool_clone)
.await;

match result {
Ok(_) => {
tracing::info!("Block {} inserted successfully.", block_task.block_number);
break;
}
Err(e) => {
if let sqlx::Error::PoolTimedOut = e {
attempts += 1;
if attempts >= max_attempts {
// Set a maximum number of retries
tracing::error!("Failed to insert block {} after {} attempts: {}", block_task.block_number, attempts, e);
break;
}
tracing::warn!("PoolTimedOut error occurred, retrying. Attempt: {}", attempts);
sleep(Duration::from_secs(1)).await; // Wait before retrying
} else {
tracing::error!("Failed to insert block {}: {}", block_task.block_number, e);
break;
}
}
}
}

query_executor::commit_eventually(pool_clone, block_task).await;
drop(permit);
});
}
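After this change, the worker task only acquires a semaphore permit and hands the database work to `query_executor::commit_eventually`, releasing the permit when the commit finishes. Below is a minimal, self-contained sketch of that semaphore-gated spawn pattern; the permit count, block numbers, and the `println!` stand-in for the real commit call are illustrative assumptions, not code from this commit.

```rust
// Sketch of the semaphore-gated spawn pattern used by the worker: a bounded
// number of concurrent tasks, each holding a permit while it commits.
use std::sync::Arc;

use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // Allow at most 4 commit tasks to run against the database at once (illustrative limit).
    let semaphore = Arc::new(Semaphore::new(4));
    let mut handles = Vec::new();

    for block_number in 0..10u64 {
        let semaphore_clone = Arc::clone(&semaphore);
        handles.push(tokio::spawn(async move {
            // The owned permit bounds concurrency; it is released when dropped.
            let permit = semaphore_clone
                .acquire_owned()
                .await
                .expect("Failed to acquire semaphore permit");

            // Stand-in for `query_executor::commit_eventually(pool_clone, block_task).await`.
            println!("committing block {block_number}");

            drop(permit);
        }));
    }

    for handle in handles {
        handle.await.expect("task panicked");
    }
}
```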
111 changes: 111 additions & 0 deletions src/eth/storage/hybrid/query_executor.rs
@@ -0,0 +1,111 @@
use std::sync::Arc;

use sqlx::Acquire;
use sqlx::Pool;
use sqlx::Postgres;
use tokio::time::sleep;
use tokio::time::Duration;

use super::BlockTask;
use crate::eth::primitives::Address;
use crate::eth::primitives::Bytes;
use crate::eth::primitives::Nonce;
use crate::eth::primitives::Wei;

type BlockNumbers = Vec<i64>;
type Addresses = Vec<Address>;
type OptionalBytes = Vec<Option<Bytes>>;
type Weis = Vec<Wei>;
type Nonces = Vec<Nonce>;

type AccountChanges = (BlockNumbers, Addresses, OptionalBytes, Weis, Nonces);

async fn execute_with_retry<F, Fut>(mut attempt: F, max_attempts: u32, initial_delay: Duration) -> Result<(), sqlx::Error>
where
F: FnMut() -> Fut,
Fut: std::future::Future<Output = Result<(), sqlx::Error>>,
{
let mut attempts = 0;
let mut delay = initial_delay;

loop {
match attempt().await {
Ok(_) => return Ok(()),
Err(e) if attempts < max_attempts => {
attempts += 1;
tracing::warn!("Attempt {} failed, retrying in {:?}: {}", attempts, delay, e);
sleep(delay).await;
delay *= 2; // Exponential backoff
}
Err(e) => return Err(e),
}
}
}

pub async fn commit_eventually(pool: Arc<Pool<Postgres>>, block_task: BlockTask) {
let block_data = serde_json::to_value(&block_task.block_data).unwrap();
let account_changes = serde_json::to_value(&block_task.account_changes).unwrap();
let mut accounts_changes: AccountChanges = (Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new());

for changes in block_task.account_changes.clone() {
let (original_nonce, new_nonce) = changes.nonce.take_both();
let (original_balance, new_balance) = changes.balance.take_both();

let original_nonce = original_nonce.unwrap_or_default();
let original_balance = original_balance.unwrap_or_default();

let nonce = new_nonce.clone().unwrap_or(original_nonce.clone());
let balance = new_balance.clone().unwrap_or(original_balance.clone());

let bytecode = changes.bytecode.take().unwrap_or_else(|| {
tracing::debug!("bytecode not set, defaulting to None");
None
});

accounts_changes.0.push(block_task.block_number.clone().as_i64());
accounts_changes.1.push(changes.address.clone());
accounts_changes.2.push(bytecode);
accounts_changes.3.push(balance);
accounts_changes.4.push(nonce);
}

let pool_clone = Arc::<sqlx::Pool<sqlx::Postgres>>::clone(&pool);
execute_with_retry(
|| async {
let mut conn = pool_clone.acquire().await?;
let mut tx = conn.begin().await?;

sqlx::query!(
"INSERT INTO neo_blocks (block_number, block_hash, block, account_changes, created_at) VALUES ($1, $2, $3, $4, NOW());",
block_task.block_number as _,
block_task.block_hash as _,
block_data as _,
account_changes as _,
)
.execute(&mut *tx)
.await?;

if !accounts_changes.0.is_empty() {
sqlx::query!(
"INSERT INTO public.neo_accounts (block_number, address, bytecode, balance, nonce)
SELECT * FROM UNNEST($1::bigint[], $2::bytea[], $3::bytea[], $4::numeric[], $5::numeric[])
AS t(block_number, address, bytecode, balance, nonce);",
accounts_changes.0 as _,
accounts_changes.1 as _,
accounts_changes.2 as _,
accounts_changes.3 as _,
accounts_changes.4 as _,
)
.execute(&mut *tx)
.await?;
}

tx.commit().await?;
Ok(())
},
3,
Duration::from_millis(2),
)
.await
.expect("Failed to commit after multiple attempts.");
}
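For context, the committed `execute_with_retry` is specialized to `sqlx::Error` and invoked once from `commit_eventually`. Below is a self-contained sketch of the same retry-with-exponential-backoff pattern, exercised against a simulated operation instead of a database; the helper name, error string, attempt counter, and delays are illustrative assumptions, not part of the commit.

```rust
// Sketch of the retry-with-exponential-backoff pattern, generalized over the
// error type and driven by a simulated operation that fails twice before succeeding.
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

use tokio::time::{sleep, Duration};

async fn retry_with_backoff<F, Fut, E>(mut attempt: F, max_attempts: u32, initial_delay: Duration) -> Result<(), E>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<(), E>>,
    E: std::fmt::Display,
{
    let mut attempts = 0;
    let mut delay = initial_delay;
    loop {
        match attempt().await {
            Ok(()) => return Ok(()),
            Err(e) if attempts < max_attempts => {
                attempts += 1;
                eprintln!("attempt {attempts} failed, retrying in {delay:?}: {e}");
                sleep(delay).await;
                delay *= 2; // exponential backoff, as in the committed helper
            }
            Err(e) => return Err(e),
        }
    }
}

#[tokio::main]
async fn main() {
    // Simulated fallible operation: the first two calls fail, the third succeeds.
    let calls = Arc::new(AtomicU32::new(0));
    let calls_for_closure = Arc::clone(&calls);

    let result = retry_with_backoff(
        move || {
            let calls = Arc::clone(&calls_for_closure);
            async move {
                if calls.fetch_add(1, Ordering::SeqCst) < 2 {
                    Err("transient failure")
                } else {
                    Ok(())
                }
            }
        },
        3,
        Duration::from_millis(2),
    )
    .await;

    assert!(result.is_ok());
    println!("succeeded after {} attempts", calls.load(Ordering::SeqCst));
}
```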
