Skip to content

Commit

Permalink
Hybrid insert slots (#381)
Browse files Browse the repository at this point in the history
* chore: implement a semaphore on insert

* chore: add a paralel between connections and config

* chore: fine tune hybrid db

* chore: add account slots query

* lint

* fix: add missing file

* chore: simplify types

* lint

* chore: move heavy logic into query executor

* chore: refactore query executor

* fix: proper function calls

* fix: add missing account changes filler

* chore: trim memory and add slots

* chore: add slots into hybrid storage
  • Loading branch information
renancloudwalk authored Mar 18, 2024
1 parent d0dbd7f commit f03f6ec
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 32 deletions.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 1 addition & 12 deletions src/eth/storage/hybrid/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,19 +340,8 @@ impl PermanentStorage for HybridPermanentStorage {
tracing::debug!(number = %block.number(), "saving block");
let block = Arc::new(block);
let number = block.number();
state.blocks_by_hash.truncate(600);
state.blocks_by_number.insert(*number, Arc::clone(&block));
state.blocks_by_hash.insert(block.hash().clone(), Arc::clone(&block));

// save transactions
for transaction in block.transactions.clone() {
tracing::debug!(hash = %transaction.input.hash, "saving transaction");
state.transactions.insert(transaction.input.hash.clone(), transaction.clone());
if transaction.is_success() {
for log in transaction.logs {
state.logs.push(log);
}
}
}

// save block account changes
for changes in account_changes.clone() {
Expand Down
27 changes: 27 additions & 0 deletions src/eth/storage/hybrid/query_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use super::BlockTask;
use crate::eth::primitives::Address;
use crate::eth::primitives::Bytes;
use crate::eth::primitives::Nonce;
use crate::eth::primitives::SlotIndex;
use crate::eth::primitives::SlotValue;
use crate::eth::primitives::Wei;

type BlockNumbers = Vec<i64>;
Expand All @@ -19,6 +21,7 @@ type Weis = Vec<Wei>;
type Nonces = Vec<Nonce>;

type AccountChanges = (BlockNumbers, Addresses, OptionalBytes, Weis, Nonces);
type AccountSlotChanges = (BlockNumbers, Vec<SlotIndex>, Addresses, Vec<SlotValue>);

async fn execute_with_retry<F, Fut>(mut attempt: F, max_attempts: u32, initial_delay: Duration) -> Result<(), sqlx::Error>
where
Expand Down Expand Up @@ -46,6 +49,7 @@ pub async fn commit_eventually(pool: Arc<Pool<Postgres>>, block_task: BlockTask)
let block_data = serde_json::to_value(&block_task.block_data).unwrap();
let account_changes = serde_json::to_value(&block_task.account_changes).unwrap();
let mut accounts_changes: AccountChanges = (Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new());
let mut accounts_slots_changes: AccountSlotChanges = (Vec::new(), Vec::new(), Vec::new(), Vec::new());

for changes in block_task.account_changes.clone() {
let (original_nonce, new_nonce) = changes.nonce.take_both();
Expand All @@ -62,6 +66,15 @@ pub async fn commit_eventually(pool: Arc<Pool<Postgres>>, block_task: BlockTask)
None
});

for (_, slot) in changes.slots {
if let Some(slot) = slot.take_modified() {
accounts_slots_changes.0.push(block_task.block_number.clone().as_i64());
accounts_slots_changes.1.push(slot.index);
accounts_slots_changes.2.push(changes.address.clone());
accounts_slots_changes.3.push(slot.value.clone());
}
}

accounts_changes.0.push(block_task.block_number.clone().as_i64());
accounts_changes.1.push(changes.address.clone());
accounts_changes.2.push(bytecode);
Expand Down Expand Up @@ -100,6 +113,20 @@ pub async fn commit_eventually(pool: Arc<Pool<Postgres>>, block_task: BlockTask)
.await?;
}

if !accounts_slots_changes.0.is_empty() {
sqlx::query!(
"INSERT INTO public.neo_account_slots (block_number, slot_index, account_address, value)
SELECT * FROM UNNEST($1::bigint[], $2::bytea[], $3::bytea[], $4::bytea[])
AS t(block_number, slot_index, account_address, value);",
accounts_slots_changes.0 as _,
accounts_slots_changes.1 as _,
accounts_slots_changes.2 as _,
accounts_slots_changes.3 as _,
)
.execute(&mut *tx)
.await?;
}

tx.commit().await?;
Ok(())
},
Expand Down
4 changes: 2 additions & 2 deletions static/schema/001-init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -743,11 +743,11 @@ CREATE TABLE public.neo_accounts (

CREATE TABLE public.neo_account_slots (
block_number BIGINT NOT NULL,
index INT NOT NULL,
slot_index BYTEA NOT NULL,
account_address BYTEA NOT NULL,
value BYTEA,
created_at TIMESTAMP WITHOUT TIME ZONE DEFAULT now(),
PRIMARY KEY (account_address, index, block_number),
PRIMARY KEY (account_address, slot_index, block_number),
FOREIGN KEY (block_number) REFERENCES public.neo_blocks(block_number)
);

Expand Down

0 comments on commit f03f6ec

Please sign in to comment.