From 2e35da8ef39075fae74516a8c2de2df687023115 Mon Sep 17 00:00:00 2001 From: Renato Dinhani <101204870+dinhani-cw@users.noreply.github.com> Date: Wed, 28 Feb 2024 12:45:21 -0300 Subject: [PATCH] refactor: move postgres functions to _postgres.rs module to have reuse between importer jobs (#289) --- ...2c321c80d3e77c1b364e101ae96f4d42e2f9.json} | 4 +- src/bin/importer/_postgres.rs | 208 ++++++++++++++++++ src/bin/importer/importer-download.rs | 110 +-------- src/bin/importer/importer-import.rs | 132 +---------- .../sql/select_downloaded_blocks_in_range.sql | 3 + .../select_downloaded_receipts_in_range.sql | 4 +- 6 files changed, 233 insertions(+), 228 deletions(-) rename .sqlx/{query-86a8b3ca23e91ea82eab7f3902355523ee2b91f53da6a36c6fd8851f0f214975.json => query-730e617474d8bb7970702970ba162c321c80d3e77c1b364e101ae96f4d42e2f9.json} (66%) create mode 100644 src/bin/importer/_postgres.rs create mode 100644 src/bin/importer/sql/select_downloaded_blocks_in_range.sql diff --git a/.sqlx/query-86a8b3ca23e91ea82eab7f3902355523ee2b91f53da6a36c6fd8851f0f214975.json b/.sqlx/query-730e617474d8bb7970702970ba162c321c80d3e77c1b364e101ae96f4d42e2f9.json similarity index 66% rename from .sqlx/query-86a8b3ca23e91ea82eab7f3902355523ee2b91f53da6a36c6fd8851f0f214975.json rename to .sqlx/query-730e617474d8bb7970702970ba162c321c80d3e77c1b364e101ae96f4d42e2f9.json index 6889ba1a6..6ab05b5fb 100644 --- a/.sqlx/query-86a8b3ca23e91ea82eab7f3902355523ee2b91f53da6a36c6fd8851f0f214975.json +++ b/.sqlx/query-730e617474d8bb7970702970ba162c321c80d3e77c1b364e101ae96f4d42e2f9.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "select block_number, payload\nfrom external_receipts\nwhere block_number >= $1 and block_number <= $2;", + "query": "select\n block_number,\n payload\nfrom external_receipts\nwhere block_number >= $1 and block_number <= $2;", "describe": { "columns": [ { @@ -25,5 +25,5 @@ false ] }, - "hash": "86a8b3ca23e91ea82eab7f3902355523ee2b91f53da6a36c6fd8851f0f214975" + "hash": "730e617474d8bb7970702970ba162c321c80d3e77c1b364e101ae96f4d42e2f9" } diff --git a/src/bin/importer/_postgres.rs b/src/bin/importer/_postgres.rs new file mode 100644 index 000000000..bd4403849 --- /dev/null +++ b/src/bin/importer/_postgres.rs @@ -0,0 +1,208 @@ +#![allow(dead_code)] + +use serde_json::Value as JsonValue; +use sqlx::types::BigDecimal; +use sqlx::Row; +use stratus::eth::primitives::Address; +use stratus::eth::primitives::BlockNumber; +use stratus::eth::primitives::ExternalBlock; +use stratus::eth::primitives::ExternalReceipt; +use stratus::eth::primitives::Hash; +use stratus::eth::primitives::Wei; +use stratus::infra::postgres::Postgres; +use stratus::log_and_err; + +// ----------------------------------------------------------------------------- +// Queries +// ----------------------------------------------------------------------------- + +// Blocks + +pub async fn pg_retrieve_max_downloaded_block(pg: &Postgres, start: BlockNumber, end: BlockNumber) -> anyhow::Result> { + tracing::debug!(%start, %end, "retrieving max downloaded block"); + + let result = sqlx::query_file_scalar!("src/bin/importer/sql/select_max_downloaded_block_in_range.sql", start.as_i64(), end.as_i64()) + .fetch_one(&pg.connection_pool) + .await; + + match result { + Ok(Some(max)) => Ok(Some(max.into())), + Ok(None) => Ok(None), + Err(e) => log_and_err!(reason = e, "failed to retrieve max block number"), + } +} + +pub async fn pg_retrieve_max_imported_block(pg: &Postgres) -> anyhow::Result> { + tracing::debug!("retrieving max imported block"); + + let result = sqlx::query_file_scalar!("src/bin/importer/sql/select_max_imported_block.sql") + .fetch_one(&pg.connection_pool) + .await; + + let block_number: i64 = match result { + Ok(Some(max)) => max, + Ok(None) => return Ok(None), + Err(e) => return log_and_err!(reason = e, "failed to retrieve max block number"), + }; + + Ok(Some(block_number.into())) +} + +pub async fn pg_init_blocks_cursor(pg: &Postgres) -> anyhow::Result> { + let start = match pg_retrieve_max_imported_block(pg).await? { + Some(number) => number.next(), + None => BlockNumber::ZERO, + }; + tracing::info!(%start, "initing blocks cursor"); + + let mut tx = pg.start_transaction().await?; + let result = sqlx::query_file!("src/bin/importer/sql/cursor_declare_downloaded_blocks.sql", start.as_i64()) + .execute(&mut *tx) + .await; + + match result { + Ok(_) => Ok(tx), + Err(e) => log_and_err!(reason = e, "failed to open postgres cursor"), + } +} + +pub async fn pg_fetch_blocks(tx: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> anyhow::Result> { + tracing::debug!("fetching more blocks"); + + let result = sqlx::query_file_scalar!("src/bin/importer/sql/cursor_fetch_downloaded_blocks.sql") + .fetch_all(&mut **tx) + .await; + + match result { + Ok(rows) => { + let mut parsed_rows: Vec = Vec::with_capacity(rows.len()); + for row in rows { + let parsed = BlockRow { + number: row.get_unchecked::<'_, i64, usize>(0).into(), + payload: row.get_unchecked::<'_, JsonValue, usize>(1).try_into()?, + }; + parsed_rows.push(parsed); + } + Ok(parsed_rows) + } + Err(e) => log_and_err!(reason = e, "failed to fetch blocks from cursor"), + } +} + +// Receipts + +pub async fn pg_retrieve_downloaded_receipts(pg: &Postgres, start: BlockNumber, end: BlockNumber) -> anyhow::Result> { + tracing::debug!(%start, %end, "retrieving receipts in range"); + + let result = sqlx::query_file!("src/bin/importer/sql/select_downloaded_receipts_in_range.sql", start.as_i64(), end.as_i64()) + .fetch_all(&pg.connection_pool) + .await; + + match result { + Ok(rows) => { + let mut parsed_rows: Vec = Vec::with_capacity(rows.len()); + for row in rows { + let parsed = ReceiptRow { + block_number: row.block_number.into(), + payload: row.payload.try_into()?, + }; + + parsed_rows.push(parsed); + } + Ok(parsed_rows) + } + Err(e) => log_and_err!(reason = e, "failed to retrieve receipts"), + } +} + +// Balances + +pub async fn pg_retrieve_downloaded_balances(pg: &Postgres) -> anyhow::Result> { + tracing::debug!("retrieving downloaded balances"); + + let result = sqlx::query_file_as!(BalanceRow, "src/bin/importer/sql/select_downloaded_balances.sql") + .fetch_all(&pg.connection_pool) + .await; + + match result { + Ok(accounts) => Ok(accounts), + Err(e) => log_and_err!(reason = e, "failed to retrieve downloaded balances"), + } +} + +// ----------------------------------------------------------------------------- +// Inserts +// ----------------------------------------------------------------------------- +pub async fn pg_insert_balance(pg: &Postgres, address: Address, balance: Wei) -> anyhow::Result<()> { + tracing::debug!(%address, %balance, "saving external balance"); + + let result = sqlx::query_file!( + "src/bin/importer/sql/insert_external_balance.sql", + address.as_ref(), + TryInto::::try_into(balance)? + ) + .execute(&pg.connection_pool) + .await; + + match result { + Ok(_) => Ok(()), + Err(e) => log_and_err!(reason = e, "failed to insert external balance"), + } +} + +pub async fn pg_insert_block_and_receipts(pg: &Postgres, number: BlockNumber, block: JsonValue, receipts: Vec<(Hash, JsonValue)>) -> anyhow::Result<()> { + tracing::debug!(?block, ?receipts, "saving external block and receipts"); + + let mut tx = pg.start_transaction().await?; + + // insert block + let result = sqlx::query_file!("src/bin/importer/sql/insert_external_block.sql", number.as_i64(), block) + .execute(&mut *tx) + .await; + + match result { + Ok(_) => {} + Err(sqlx::Error::Database(e)) if e.is_unique_violation() => { + tracing::warn!(reason = ?e, "block unique violation, skipping"); + } + Err(e) => return log_and_err!(reason = e, "failed to insert block"), + } + + // insert receipts + for (hash, receipt) in receipts { + let result = sqlx::query_file!("src/bin/importer/sql/insert_external_receipt.sql", hash.as_ref(), number.as_i64(), receipt) + .execute(&mut *tx) + .await; + + match result { + Ok(_) => {} + Err(sqlx::Error::Database(e)) if e.is_unique_violation() => { + tracing::warn!(reason = ?e, "receipt unique violation, skipping"); + } + Err(e) => return log_and_err!(reason = e, "failed to insert receipt"), + } + } + + pg.commit_transaction(tx).await?; + + Ok(()) +} + +// ----------------------------------------------------------------------------- +// Types +// ----------------------------------------------------------------------------- + +pub struct BlockRow { + pub number: BlockNumber, + pub payload: ExternalBlock, +} + +pub struct ReceiptRow { + pub block_number: BlockNumber, + pub payload: ExternalReceipt, +} + +pub struct BalanceRow { + pub address: Address, + pub balance: Wei, +} diff --git a/src/bin/importer/importer-download.rs b/src/bin/importer/importer-download.rs index 8f6b1e3bd..60d0a038e 100644 --- a/src/bin/importer/importer-download.rs +++ b/src/bin/importer/importer-download.rs @@ -1,19 +1,19 @@ +mod _postgres; + use std::cmp::min; use std::sync::Arc; use std::time::Duration; +use _postgres::*; use anyhow::anyhow; use anyhow::Context; use futures::StreamExt; use futures::TryStreamExt; use itertools::Itertools; -use serde_json::Value as JsonValue; -use sqlx::types::BigDecimal; use stratus::config::ImporterDownloadConfig; use stratus::eth::primitives::Address; use stratus::eth::primitives::BlockNumber; use stratus::eth::primitives::Hash; -use stratus::eth::primitives::Wei; use stratus::ext::not; use stratus::infra::postgres::Postgres; use stratus::infra::BlockchainClient; @@ -48,7 +48,7 @@ async fn download_balances(pg: &Postgres, chain: &BlockchainClient, accounts: Ve } // retrieve downloaded balances - let downloaded_balances = db_retrieve_balances(pg).await?; + let downloaded_balances = pg_retrieve_downloaded_balances(pg).await?; let downloaded_balances_addresses = downloaded_balances.iter().map(|balance| &balance.address).collect_vec(); // keep only accounts that must be downloaded @@ -60,7 +60,7 @@ async fn download_balances(pg: &Postgres, chain: &BlockchainClient, accounts: Ve // download missing balances for address in address_to_download { let balance = chain.get_balance(&address, Some(BlockNumber::ZERO)).await?; - db_insert_balance(pg, address, balance).await?; + pg_insert_balance(pg, address, balance).await?; } Ok(()) @@ -95,7 +95,7 @@ async fn download_blocks(pg: Arc, chain: Arc, parale async fn download(pg: Arc, chain: Arc, start: BlockNumber, end_inclusive: BlockNumber) -> anyhow::Result<()> { // calculate current block - let mut current = match db_retrieve_max_downloaded_block(&pg, start, end_inclusive).await? { + let mut current = match pg_retrieve_max_downloaded_block(&pg, start, end_inclusive).await? { Some(number) => number.next(), None => start, }; @@ -147,7 +147,7 @@ async fn download(pg: Arc, chain: Arc, start: BlockN } // save block and receipts - if let Err(e) = db_insert_block_and_receipts(&pg, current, block_json, receipts_json).await { + if let Err(e) = pg_insert_block_and_receipts(&pg, current, block_json, receipts_json).await { tracing::warn!(reason = ?e, "retrying because failed to save block"); continue; } @@ -160,101 +160,7 @@ async fn download(pg: Arc, chain: Arc, start: BlockN } // ----------------------------------------------------------------------------- -// Postgres -// ----------------------------------------------------------------------------- - -struct BalanceRow { - address: Address, - #[allow(dead_code)] // allow it because the SQL returns it, even if it is not used - balance: Wei, -} - -async fn db_retrieve_balances(pg: &Postgres) -> anyhow::Result> { - tracing::debug!("retrieving downloaded balances"); - - let result = sqlx::query_file_as!(BalanceRow, "src/bin/importer/sql/select_downloaded_balances.sql") - .fetch_all(&pg.connection_pool) - .await; - match result { - Ok(accounts) => Ok(accounts), - Err(e) => log_and_err!(reason = e, "failed to retrieve downloaded balances"), - } -} - -async fn db_retrieve_max_downloaded_block(pg: &Postgres, start: BlockNumber, end: BlockNumber) -> anyhow::Result> { - tracing::debug!(%start, %end, "retrieving max downloaded block"); - - let result = sqlx::query_file_scalar!("src/bin/importer/sql/select_max_downloaded_block_in_range.sql", start.as_i64(), end.as_i64()) - .fetch_one(&pg.connection_pool) - .await; - match result { - Ok(Some(max)) => Ok(Some(max.into())), - Ok(None) => Ok(None), - Err(e) => log_and_err!(reason = e, "failed to retrieve max block number"), - } -} - -async fn db_insert_balance(pg: &Postgres, address: Address, balance: Wei) -> anyhow::Result<()> { - tracing::debug!(%address, %balance, "saving external balance"); - - let result = sqlx::query_file!( - "src/bin/importer/sql/insert_external_balance.sql", - address.as_ref(), - TryInto::::try_into(balance)? - ) - .execute(&pg.connection_pool) - .await; - - match result { - Ok(_) => Ok(()), - Err(e) => log_and_err!(reason = e, "failed to insert external balance"), - } -} - -async fn db_insert_block_and_receipts(pg: &Postgres, number: BlockNumber, block: JsonValue, receipts: Vec<(Hash, JsonValue)>) -> anyhow::Result<()> { - tracing::debug!(?block, ?receipts, "saving external block and receipts"); - - let mut tx = pg.start_transaction().await?; - - // insert block - let result = sqlx::query_file!("src/bin/importer/sql/insert_external_block.sql", number.as_i64(), block) - .execute(&mut *tx) - .await; - - match result { - Ok(_) => {} - Err(sqlx::Error::Database(e)) if e.is_unique_violation() => { - tracing::warn!(reason = ?e, "block unique violation, skipping"); - } - Err(e) => { - return log_and_err!(reason = e, "failed to insert block"); - } - } - - // insert receipts - for (hash, receipt) in receipts { - let result = sqlx::query_file!("src/bin/importer/sql/insert_external_receipt.sql", hash.as_ref(), number.as_i64(), receipt) - .execute(&mut *tx) - .await; - - match result { - Ok(_) => {} - Err(sqlx::Error::Database(e)) if e.is_unique_violation() => { - tracing::warn!(reason = ?e, "receipt unique violation, skipping"); - } - Err(e) => { - return log_and_err!(reason = e, "failed to insert receipt"); - } - } - } - - pg.commit_transaction(tx).await?; - - Ok(()) -} - -// ----------------------------------------------------------------------------- -// Minimum RPC structs +// Blockchain RPC structs // ----------------------------------------------------------------------------- #[derive(serde::Deserialize)] diff --git a/src/bin/importer/importer-import.rs b/src/bin/importer/importer-import.rs index f35252779..2bd07d5fe 100644 --- a/src/bin/importer/importer-import.rs +++ b/src/bin/importer/importer-import.rs @@ -1,21 +1,14 @@ -#![allow(dead_code)] +mod _postgres; use std::collections::HashMap; use std::sync::Arc; +use _postgres::*; use itertools::Itertools; -use serde_json::Value as JsonValue; -use sqlx::Row; use stratus::config::ImporterImportConfig; use stratus::eth::primitives::Account; -use stratus::eth::primitives::Address; -use stratus::eth::primitives::BlockNumber; -use stratus::eth::primitives::ExternalBlock; -use stratus::eth::primitives::ExternalReceipt; -use stratus::eth::primitives::Wei; use stratus::infra::postgres::Postgres; use stratus::init_global_services; -use stratus::log_and_err; use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; @@ -36,7 +29,7 @@ async fn main() -> anyhow::Result<()> { let cancellation = CancellationToken::new(); // import genesis accounts - let balances = db_retrieve_balances(&pg).await?; + let balances = pg_retrieve_downloaded_balances(&pg).await?; let accounts = balances .into_iter() .map(|row| Account::new_with_balance(row.address, row.balance)) @@ -44,7 +37,7 @@ async fn main() -> anyhow::Result<()> { storage.save_accounts_to_perm(accounts).await?; // load blocks and receipts in background - tokio::spawn(keep_loading_blocks(pg, cancellation.clone(), backlog_tx.clone())); + tokio::spawn(keep_loading_blocks(pg, cancellation.clone(), backlog_tx)); // import blocks and transactions in foreground let reason = loop { @@ -83,7 +76,7 @@ async fn keep_loading_blocks( // data backlog: mpsc::Sender, ) -> anyhow::Result<()> { - let mut tx = db_init_blocks_cursor(&pg).await?; + let mut tx = pg_init_blocks_cursor(&pg).await?; let reason = loop { if cancellation.is_cancelled() { @@ -91,8 +84,8 @@ async fn keep_loading_blocks( } // find blocks - tracing::info!("retrieving more blocks to process"); - let blocks = match db_fetch_blocks(&mut tx).await { + tracing::info!("retrieving blocks to process"); + let blocks = match pg_fetch_blocks(&mut tx).await { Ok(blocks) => if blocks.is_empty() { cancellation.cancel(); @@ -107,9 +100,10 @@ async fn keep_loading_blocks( }; // find receipts + tracing::info!("retrieving receipts to process"); let block_start = blocks.first().unwrap().number; let block_end = blocks.last().unwrap().number; - let Ok(receipts) = db_retrieve_receipts(&pg, block_start, block_end).await else { + let Ok(receipts) = pg_retrieve_downloaded_receipts(&pg, block_start, block_end).await else { cancellation.cancel(); break "error loading receipts"; }; @@ -124,111 +118,3 @@ async fn keep_loading_blocks( tracing::info!(%reason, "postgres loader finished"); Ok(()) } - -struct BlockRow { - number: i64, - payload: ExternalBlock, -} - -struct ReceiptRow { - block_number: i64, - payload: ExternalReceipt, -} - -struct BalanceRow { - address: Address, - balance: Wei, -} - -async fn db_retrieve_balances(pg: &Postgres) -> anyhow::Result> { - tracing::debug!("retrieving downloaded balances"); - - let result = sqlx::query_file_as!(BalanceRow, "src/bin/importer/sql/select_downloaded_balances.sql") - .fetch_all(&pg.connection_pool) - .await; - match result { - Ok(accounts) => Ok(accounts), - Err(e) => log_and_err!(reason = e, "failed to retrieve downloaded balances"), - } -} - -async fn db_init_blocks_cursor(pg: &Postgres) -> anyhow::Result> { - let start = match db_retrieve_max_imported_block(pg).await? { - Some(number) => number.next(), - None => BlockNumber::ZERO, - }; - tracing::info!(%start, "initing blocks cursor"); - - let mut tx = pg.start_transaction().await?; - let result = sqlx::query_file!("src/bin/importer/sql/cursor_declare_downloaded_blocks.sql", start.as_i64()) - .execute(&mut *tx) - .await; - - match result { - Ok(_) => Ok(tx), - Err(e) => log_and_err!(reason = e, "failed to open postgres cursor"), - } -} - -async fn db_fetch_blocks(tx: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> anyhow::Result> { - tracing::debug!("fetching more blocks"); - - let result = sqlx::query_file_scalar!("src/bin/importer/sql/cursor_fetch_downloaded_blocks.sql") - .fetch_all(&mut **tx) - .await; - - match result { - Ok(rows) => { - let mut parsed_rows: Vec = Vec::with_capacity(rows.len()); - for row in rows { - let parsed = BlockRow { - number: row.get_unchecked::<'_, i64, usize>(0), - payload: row.get_unchecked::<'_, JsonValue, usize>(1).try_into()?, - }; - parsed_rows.push(parsed); - } - Ok(parsed_rows) - } - Err(e) => log_and_err!(reason = e, "failed to fetch blocks from cursor"), - } -} - -async fn db_retrieve_receipts(pg: &Postgres, block_start: i64, block_end: i64) -> anyhow::Result> { - tracing::debug!(start = %block_start, end = %block_end, "findind receipts"); - - let result = sqlx::query_file!("src/bin/importer/sql/select_downloaded_receipts_in_range.sql", block_start, block_end) - .fetch_all(&pg.connection_pool) - .await; - - match result { - Ok(rows) => { - let mut parsed_rows: Vec = Vec::with_capacity(rows.len()); - for row in rows { - let parsed = ReceiptRow { - block_number: row.block_number, - payload: row.payload.try_into()?, - }; - - parsed_rows.push(parsed); - } - Ok(parsed_rows) - } - Err(e) => log_and_err!(reason = e, "failed to retrieve receipts"), - } -} - -async fn db_retrieve_max_imported_block(pg: &Postgres) -> anyhow::Result> { - tracing::debug!("finding max imported block"); - - let result = sqlx::query_file_scalar!("src/bin/importer/sql/select_max_imported_block.sql") - .fetch_one(&pg.connection_pool) - .await; - - let block_number: i64 = match result { - Ok(Some(max)) => max, - Ok(None) => return Ok(None), - Err(e) => return log_and_err!(reason = e, "failed to retrieve max block number"), - }; - - Ok(Some(block_number.into())) -} diff --git a/src/bin/importer/sql/select_downloaded_blocks_in_range.sql b/src/bin/importer/sql/select_downloaded_blocks_in_range.sql new file mode 100644 index 000000000..65ea60e7e --- /dev/null +++ b/src/bin/importer/sql/select_downloaded_blocks_in_range.sql @@ -0,0 +1,3 @@ +select number, payload +from external_blocks +where number >= $1 and number <= $2; \ No newline at end of file diff --git a/src/bin/importer/sql/select_downloaded_receipts_in_range.sql b/src/bin/importer/sql/select_downloaded_receipts_in_range.sql index 84fbfad12..fb6c67e6e 100644 --- a/src/bin/importer/sql/select_downloaded_receipts_in_range.sql +++ b/src/bin/importer/sql/select_downloaded_receipts_in_range.sql @@ -1,3 +1,5 @@ -select block_number, payload +select + block_number, + payload from external_receipts where block_number >= $1 and block_number <= $2; \ No newline at end of file