From d0dbd7fd4dc341a6ee9003b699b74facad9f0c64 Mon Sep 17 00:00:00 2001 From: renancloudwalk <53792026+renancloudwalk@users.noreply.github.com> Date: Mon, 18 Mar 2024 07:49:55 -0300 Subject: [PATCH] Refactor hybrid storage consistency (#378) * chore: implement a semaphore on insert * chore: add a paralel between connections and config * chore: fine tune hybrid db * chore: add account slots query * lint * fix: add missing file * chore: simplify types * lint * chore: move heavy logic into query executor * chore: refactore query executor * fix: proper function calls * fix: add missing account changes filler * lint --- ...585c0333dc2de6412fe5cff8594e5dc4b183f.json | 18 +++ src/eth/storage/hybrid/mod.rs | 99 +--------------- src/eth/storage/hybrid/query_executor.rs | 111 ++++++++++++++++++ 3 files changed, 131 insertions(+), 97 deletions(-) create mode 100644 .sqlx/query-94064f50a8d7041651be00c2892585c0333dc2de6412fe5cff8594e5dc4b183f.json create mode 100644 src/eth/storage/hybrid/query_executor.rs diff --git a/.sqlx/query-94064f50a8d7041651be00c2892585c0333dc2de6412fe5cff8594e5dc4b183f.json b/.sqlx/query-94064f50a8d7041651be00c2892585c0333dc2de6412fe5cff8594e5dc4b183f.json new file mode 100644 index 000000000..7b0e14c6a --- /dev/null +++ b/.sqlx/query-94064f50a8d7041651be00c2892585c0333dc2de6412fe5cff8594e5dc4b183f.json @@ -0,0 +1,18 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO public.neo_accounts (block_number, address, bytecode, balance, nonce)\n SELECT * FROM UNNEST($1::bigint[], $2::bytea[], $3::bytea[], $4::numeric[], $5::numeric[])\n AS t(block_number, address, bytecode, balance, nonce);", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8Array", + "ByteaArray", + "ByteaArray", + "NumericArray", + "NumericArray" + ] + }, + "nullable": [] + }, + "hash": "94064f50a8d7041651be00c2892585c0333dc2de6412fe5cff8594e5dc4b183f" +} diff --git a/src/eth/storage/hybrid/mod.rs b/src/eth/storage/hybrid/mod.rs index 4b67d44e4..6867dc63f 100644 --- a/src/eth/storage/hybrid/mod.rs +++ b/src/eth/storage/hybrid/mod.rs @@ -1,4 +1,4 @@ -//! In-memory storage implementations. +mod query_executor; use std::collections::HashMap; use std::sync::atomic::Ordering; @@ -20,7 +20,6 @@ use tokio::sync::RwLock; use tokio::sync::RwLockReadGuard; use tokio::sync::RwLockWriteGuard; use tokio::sync::Semaphore; -use tokio::time::sleep; use crate::eth::primitives::Account; use crate::eth::primitives::Address; @@ -45,14 +44,6 @@ use crate::eth::storage::inmemory::InMemoryHistory; use crate::eth::storage::PermanentStorage; use crate::eth::storage::StorageError; -type BlockNumbers = Vec; -type Addresses = Vec
; -type OptionalBytes = Vec>; -type Weis = Vec; -type Nonces = Vec; - -type AccountChanges = (BlockNumbers, Addresses, OptionalBytes, Weis, Nonces); - #[derive(Debug)] struct BlockTask { block_number: BlockNumber, @@ -138,95 +129,9 @@ impl HybridPermanentStorage { let pool_clone = Arc::clone(&pool); let semaphore_clone = Arc::clone(&semaphore); - // Here we attempt to insert the block data into the database. - // Adjust the SQL query according to your table schema. - let block_data = serde_json::to_value(&block_task.block_data).unwrap(); - let account_changes = serde_json::to_value(&block_task.account_changes).unwrap(); - let max_attempts = 7; - tokio::spawn(async move { let permit = semaphore_clone.acquire_owned().await.expect("Failed to acquire semaphore permit"); - let mut attempts = 0; - - loop { - let mut accounts_changes: AccountChanges = (Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new()); - - for changes in block_task.account_changes.clone() { - let (original_nonce, new_nonce) = changes.nonce.take_both(); - let (original_balance, new_balance) = changes.balance.take_both(); - - let original_nonce = original_nonce.unwrap_or_default(); - let original_balance = original_balance.unwrap_or_default(); - - let nonce = new_nonce.clone().unwrap_or(original_nonce.clone()); - let balance = new_balance.clone().unwrap_or(original_balance.clone()); - - let bytecode = changes.bytecode.take().unwrap_or_else(|| { - tracing::debug!("bytecode not set, defaulting to None"); - None - }); - - accounts_changes.0.push(block_task.block_number.clone().as_i64()); - accounts_changes.1.push(changes.address.clone()); - accounts_changes.2.push(bytecode); - accounts_changes.3.push(balance); - accounts_changes.4.push(nonce); - } - - if !accounts_changes.0.is_empty() { - dbg!(&accounts_changes); - let result = sqlx::query!( - "INSERT INTO public.neo_accounts (block_number, address, bytecode, balance, nonce) - SELECT * FROM UNNEST($1::bigint[], $2::bytea[], $3::bytea[], $4::numeric[], $5::numeric[]) - AS t(block_number, address, bytecode, balance, nonce);", - accounts_changes.0 as _, - accounts_changes.1 as _, - accounts_changes.2 as _, - accounts_changes.3 as _, - accounts_changes.4 as _, - ) - .execute(&*pool_clone) - .await; - - match result { - Ok(_) => println!("Accounts inserted successfully."), - Err(e) => println!("Failed to insert accounts: {}", e), - } - } - - let result = sqlx::query!( - "INSERT INTO neo_blocks (block_number, block_hash, block, account_changes, created_at) VALUES ($1, $2, $3, $4, NOW());", - block_task.block_number as _, - block_task.block_hash as _, - block_data as _, - account_changes as _, - ) - .execute(&*pool_clone) - .await; - - match result { - Ok(_) => { - tracing::info!("Block {} inserted successfully.", block_task.block_number); - break; - } - Err(e) => { - if let sqlx::Error::PoolTimedOut = e { - attempts += 1; - if attempts >= max_attempts { - // Set a maximum number of retries - tracing::error!("Failed to insert block {} after {} attempts: {}", block_task.block_number, attempts, e); - break; - } - tracing::warn!("PoolTimedOut error occurred, retrying. Attempt: {}", attempts); - sleep(Duration::from_secs(1)).await; // Wait before retrying - } else { - tracing::error!("Failed to insert block {}: {}", block_task.block_number, e); - break; - } - } - } - } - + query_executor::commit_eventually(pool_clone, block_task).await; drop(permit); }); } diff --git a/src/eth/storage/hybrid/query_executor.rs b/src/eth/storage/hybrid/query_executor.rs new file mode 100644 index 000000000..d630b154f --- /dev/null +++ b/src/eth/storage/hybrid/query_executor.rs @@ -0,0 +1,111 @@ +use std::sync::Arc; + +use sqlx::Acquire; +use sqlx::Pool; +use sqlx::Postgres; +use tokio::time::sleep; +use tokio::time::Duration; + +use super::BlockTask; +use crate::eth::primitives::Address; +use crate::eth::primitives::Bytes; +use crate::eth::primitives::Nonce; +use crate::eth::primitives::Wei; + +type BlockNumbers = Vec; +type Addresses = Vec
; +type OptionalBytes = Vec>; +type Weis = Vec; +type Nonces = Vec; + +type AccountChanges = (BlockNumbers, Addresses, OptionalBytes, Weis, Nonces); + +async fn execute_with_retry(mut attempt: F, max_attempts: u32, initial_delay: Duration) -> Result<(), sqlx::Error> +where + F: FnMut() -> Fut, + Fut: std::future::Future>, +{ + let mut attempts = 0; + let mut delay = initial_delay; + + loop { + match attempt().await { + Ok(_) => return Ok(()), + Err(e) if attempts < max_attempts => { + attempts += 1; + tracing::warn!("Attempt {} failed, retrying in {:?}: {}", attempts, delay, e); + sleep(delay).await; + delay *= 2; // Exponential backoff + } + Err(e) => return Err(e), + } + } +} + +pub async fn commit_eventually(pool: Arc>, block_task: BlockTask) { + let block_data = serde_json::to_value(&block_task.block_data).unwrap(); + let account_changes = serde_json::to_value(&block_task.account_changes).unwrap(); + let mut accounts_changes: AccountChanges = (Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new()); + + for changes in block_task.account_changes.clone() { + let (original_nonce, new_nonce) = changes.nonce.take_both(); + let (original_balance, new_balance) = changes.balance.take_both(); + + let original_nonce = original_nonce.unwrap_or_default(); + let original_balance = original_balance.unwrap_or_default(); + + let nonce = new_nonce.clone().unwrap_or(original_nonce.clone()); + let balance = new_balance.clone().unwrap_or(original_balance.clone()); + + let bytecode = changes.bytecode.take().unwrap_or_else(|| { + tracing::debug!("bytecode not set, defaulting to None"); + None + }); + + accounts_changes.0.push(block_task.block_number.clone().as_i64()); + accounts_changes.1.push(changes.address.clone()); + accounts_changes.2.push(bytecode); + accounts_changes.3.push(balance); + accounts_changes.4.push(nonce); + } + + let pool_clone = Arc::>::clone(&pool); + execute_with_retry( + || async { + let mut conn = pool_clone.acquire().await?; + let mut tx = conn.begin().await?; + + sqlx::query!( + "INSERT INTO neo_blocks (block_number, block_hash, block, account_changes, created_at) VALUES ($1, $2, $3, $4, NOW());", + block_task.block_number as _, + block_task.block_hash as _, + block_data as _, + account_changes as _, + ) + .execute(&mut *tx) + .await?; + + if !accounts_changes.0.is_empty() { + sqlx::query!( + "INSERT INTO public.neo_accounts (block_number, address, bytecode, balance, nonce) + SELECT * FROM UNNEST($1::bigint[], $2::bytea[], $3::bytea[], $4::numeric[], $5::numeric[]) + AS t(block_number, address, bytecode, balance, nonce);", + accounts_changes.0 as _, + accounts_changes.1 as _, + accounts_changes.2 as _, + accounts_changes.3 as _, + accounts_changes.4 as _, + ) + .execute(&mut *tx) + .await?; + } + + tx.commit().await?; + Ok(()) + }, + 3, + Duration::from_millis(2), + ) + .await + .expect("Failed to commit after multiple attempts."); +}