Skip to content

Commit

Permalink
Create missing partitions for logs and transacitons (#437)
Browse files Browse the repository at this point in the history
* chore: create partitions for block number

* chore: add partitions for logs and transactions
  • Loading branch information
renancloudwalk authored Mar 25, 2024
1 parent 021323d commit 14e59bb
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 47 deletions.

This file was deleted.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions src/eth/primitives/hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ impl Hash {
pub fn zero() -> Self {
Self(H256::zero())
}

pub fn into_u8(self) -> u8 {
let n = self.0.to_low_u64_ne() % 10;
n as u8
}
}

impl Display for Hash {
Expand Down
22 changes: 14 additions & 8 deletions src/eth/storage/hybrid/query_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,19 +82,23 @@ pub async fn commit_eventually(pool: Arc<Pool<Postgres>>, block_task: BlockTask)
accounts_changes.4.push(nonce);
}

let mut transaction_batch = (Vec::new(), Vec::new(), Vec::new());
let mut logs_batch = (Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new());
let mut transaction_batch = (Vec::new(), Vec::new(), Vec::new(), Vec::new());
let mut logs_batch = (Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new());
for transaction in block_task.block_data.transactions {
let last_char_vec: u8 = transaction.input.hash.clone().into_u8();

for log in transaction.logs.iter() {
logs_batch.0.push(transaction.block_number);
logs_batch.1.push(transaction.input.hash.clone());
logs_batch.2.push(log.log.address.clone());
logs_batch.3.push(log.log_index);
logs_batch.4.push(serde_json::to_value(log).unwrap());
logs_batch.5.push(last_char_vec);
}
transaction_batch.0.push(transaction.block_number);
transaction_batch.1.push(transaction.input.hash.clone());
transaction_batch.2.push(serde_json::to_value(transaction).unwrap());
transaction_batch.3.push(last_char_vec);
}

let pool_clone = Arc::<sqlx::Pool<sqlx::Postgres>>::clone(&pool);
Expand Down Expand Up @@ -144,27 +148,29 @@ pub async fn commit_eventually(pool: Arc<Pool<Postgres>>, block_task: BlockTask)

if !transaction_batch.0.is_empty() {
sqlx::query!(
"INSERT INTO public.neo_transactions (block_number, hash, transaction_data)
SELECT * FROM UNNEST($1::bigint[], $2::bytea[], $3::jsonb[])
AS t(block_number, hash, transaction_data);",
"INSERT INTO public.neo_transactions (block_number, hash, transaction_data, hash_partition)
SELECT * FROM UNNEST($1::bigint[], $2::bytea[], $3::jsonb[], $4::smallint[])
AS t(block_number, hash, transaction_data, hash_partition);",
transaction_batch.0 as _,
transaction_batch.1 as _,
transaction_batch.2 as _,
transaction_batch.3 as _,
)
.execute(&mut *tx)
.await?;
}

if !logs_batch.0.is_empty() {
sqlx::query!(
"INSERT INTO public.neo_logs (block_number, hash, address, log_idx, log_data)
SELECT * FROM UNNEST($1::bigint[], $2::bytea[], $3::bytea[], $4::numeric[], $5::jsonb[])
AS t(block_number, hash, address, log_idx, log_data);",
"INSERT INTO public.neo_logs (block_number, hash, address, log_idx, log_data, hash_partition)
SELECT * FROM UNNEST($1::bigint[], $2::bytea[], $3::bytea[], $4::numeric[], $5::jsonb[], $6::smallint[])
AS t(block_number, hash, address, log_idx, log_data, hash_partition);",
logs_batch.0 as _,
logs_batch.1 as _,
logs_batch.2 as _,
logs_batch.3 as _,
logs_batch.4 as _,
logs_batch.5 as _,
)
.execute(&mut *tx)
.await?;
Expand Down
31 changes: 26 additions & 5 deletions static/schema/001-init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -786,21 +786,42 @@ END$$;

CREATE TABLE public.neo_transactions (
hash BYTEA NOT NULL,
hash_partition SMALLINT NOT NULL,
block_number BIGINT NOT NULL,
transaction_data JSONB NOT NULL,
created_at TIMESTAMP WITHOUT TIME ZONE DEFAULT now(),
PRIMARY KEY (hash)
);
PRIMARY KEY (hash, hash_partition)
) PARTITION BY LIST (hash_partition);
CREATE INDEX idx_neo_transactions_hash ON public.neo_transactions (hash);

CREATE TABLE public.neo_logs (
hash BYTEA NOT NULL,
hash_partition SMALLINT NOT NULL,
block_number BIGINT NOT NULL,
log_idx numeric NOT NULL,
log_idx NUMERIC NOT NULL,
address BYTEA NOT NULL,
log_data JSONB NOT NULL,
created_at TIMESTAMP WITHOUT TIME ZONE DEFAULT now(),
PRIMARY KEY (hash, block_number, log_idx)
);
PRIMARY KEY (hash, block_number, log_idx, hash_partition)
) PARTITION BY LIST (hash_partition);
CREATE INDEX idx_neo_logs_hash ON public.neo_logs (hash);

DO $$
DECLARE
partition_id INT;
BEGIN
FOR partition_id IN 0..9 LOOP
-- Create partitions for public.neo_transactions
EXECUTE format('CREATE TABLE public.neo_transactions_%s PARTITION OF public.neo_transactions FOR VALUES IN (%L);', partition_id, partition_id);
EXECUTE format('CREATE INDEX ON public.neo_transactions_%s (hash_partition);', partition_id);
EXECUTE format('CREATE INDEX ON public.neo_transactions_%s (hash);', partition_id);

-- Create partitions for public.neo_logs
EXECUTE format('CREATE TABLE public.neo_logs_%s PARTITION OF public.neo_logs FOR VALUES IN (%L);', partition_id, partition_id);
EXECUTE format('CREATE INDEX ON public.neo_logs_%s (hash_partition);', partition_id);
EXECUTE format('CREATE INDEX ON public.neo_logs_%s (hash);', partition_id);
END LOOP;
END$$;

-- XXX END

Expand Down

0 comments on commit 14e59bb

Please sign in to comment.