refactor: move postgres functions to _postgres.rs module to have reuse between importer jobs (#289)
dinhani-cw authored Feb 28, 2024
1 parent d54b28c commit 2e35da8
Showing 6 changed files with 233 additions and 228 deletions.
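
Because Cargo binaries cannot import items from one another, sharing the moved functions means each importer binary declares the shared file as its own module, as importer-download.rs does below. A minimal sketch of how another importer job would pick the helpers up (the sibling file name is hypothetical and not part of this commit):

// Hypothetical sibling binary, e.g. src/bin/importer/importer-import.rs,
// placed next to _postgres.rs so the `mod` declaration resolves to it.
mod _postgres;

use _postgres::*; // brings the pg_* query and insert helpers into scope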


208 changes: 208 additions & 0 deletions src/bin/importer/_postgres.rs
@@ -0,0 +1,208 @@
#![allow(dead_code)]

use serde_json::Value as JsonValue;
use sqlx::types::BigDecimal;
use sqlx::Row;
use stratus::eth::primitives::Address;
use stratus::eth::primitives::BlockNumber;
use stratus::eth::primitives::ExternalBlock;
use stratus::eth::primitives::ExternalReceipt;
use stratus::eth::primitives::Hash;
use stratus::eth::primitives::Wei;
use stratus::infra::postgres::Postgres;
use stratus::log_and_err;

// -----------------------------------------------------------------------------
// Queries
// -----------------------------------------------------------------------------

// Blocks

pub async fn pg_retrieve_max_downloaded_block(pg: &Postgres, start: BlockNumber, end: BlockNumber) -> anyhow::Result<Option<BlockNumber>> {
    tracing::debug!(%start, %end, "retrieving max downloaded block");

    let result = sqlx::query_file_scalar!("src/bin/importer/sql/select_max_downloaded_block_in_range.sql", start.as_i64(), end.as_i64())
        .fetch_one(&pg.connection_pool)
        .await;

    match result {
        Ok(Some(max)) => Ok(Some(max.into())),
        Ok(None) => Ok(None),
        Err(e) => log_and_err!(reason = e, "failed to retrieve max block number"),
    }
}

pub async fn pg_retrieve_max_imported_block(pg: &Postgres) -> anyhow::Result<Option<BlockNumber>> {
tracing::debug!("retrieving max imported block");

let result = sqlx::query_file_scalar!("src/bin/importer/sql/select_max_imported_block.sql")
.fetch_one(&pg.connection_pool)
.await;

let block_number: i64 = match result {
Ok(Some(max)) => max,
Ok(None) => return Ok(None),
Err(e) => return log_and_err!(reason = e, "failed to retrieve max block number"),
};

Ok(Some(block_number.into()))
}

pub async fn pg_init_blocks_cursor(pg: &Postgres) -> anyhow::Result<sqlx::Transaction<'_, sqlx::Postgres>> {
    let start = match pg_retrieve_max_imported_block(pg).await? {
        Some(number) => number.next(),
        None => BlockNumber::ZERO,
    };
    tracing::info!(%start, "initing blocks cursor");

    let mut tx = pg.start_transaction().await?;
    let result = sqlx::query_file!("src/bin/importer/sql/cursor_declare_downloaded_blocks.sql", start.as_i64())
        .execute(&mut *tx)
        .await;

    match result {
        Ok(_) => Ok(tx),
        Err(e) => log_and_err!(reason = e, "failed to open postgres cursor"),
    }
}

pub async fn pg_fetch_blocks(tx: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> anyhow::Result<Vec<BlockRow>> {
tracing::debug!("fetching more blocks");

let result = sqlx::query_file_scalar!("src/bin/importer/sql/cursor_fetch_downloaded_blocks.sql")
.fetch_all(&mut **tx)
.await;

match result {
Ok(rows) => {
let mut parsed_rows: Vec<BlockRow> = Vec::with_capacity(rows.len());
for row in rows {
let parsed = BlockRow {
number: row.get_unchecked::<'_, i64, usize>(0).into(),
payload: row.get_unchecked::<'_, JsonValue, usize>(1).try_into()?,
};
parsed_rows.push(parsed);
}
Ok(parsed_rows)
}
Err(e) => log_and_err!(reason = e, "failed to fetch blocks from cursor"),
}
}

// Receipts

pub async fn pg_retrieve_downloaded_receipts(pg: &Postgres, start: BlockNumber, end: BlockNumber) -> anyhow::Result<Vec<ReceiptRow>> {
tracing::debug!(%start, %end, "retrieving receipts in range");

let result = sqlx::query_file!("src/bin/importer/sql/select_downloaded_receipts_in_range.sql", start.as_i64(), end.as_i64())
.fetch_all(&pg.connection_pool)
.await;

match result {
Ok(rows) => {
let mut parsed_rows: Vec<ReceiptRow> = Vec::with_capacity(rows.len());
for row in rows {
let parsed = ReceiptRow {
block_number: row.block_number.into(),
payload: row.payload.try_into()?,
};

parsed_rows.push(parsed);
}
Ok(parsed_rows)
}
Err(e) => log_and_err!(reason = e, "failed to retrieve receipts"),
}
}

// Balances

pub async fn pg_retrieve_downloaded_balances(pg: &Postgres) -> anyhow::Result<Vec<BalanceRow>> {
tracing::debug!("retrieving downloaded balances");

let result = sqlx::query_file_as!(BalanceRow, "src/bin/importer/sql/select_downloaded_balances.sql")
.fetch_all(&pg.connection_pool)
.await;

match result {
Ok(accounts) => Ok(accounts),
Err(e) => log_and_err!(reason = e, "failed to retrieve downloaded balances"),
}
}

// -----------------------------------------------------------------------------
// Inserts
// -----------------------------------------------------------------------------
pub async fn pg_insert_balance(pg: &Postgres, address: Address, balance: Wei) -> anyhow::Result<()> {
    tracing::debug!(%address, %balance, "saving external balance");

    let result = sqlx::query_file!(
        "src/bin/importer/sql/insert_external_balance.sql",
        address.as_ref(),
        TryInto::<BigDecimal>::try_into(balance)?
    )
    .execute(&pg.connection_pool)
    .await;

    match result {
        Ok(_) => Ok(()),
        Err(e) => log_and_err!(reason = e, "failed to insert external balance"),
    }
}

pub async fn pg_insert_block_and_receipts(pg: &Postgres, number: BlockNumber, block: JsonValue, receipts: Vec<(Hash, JsonValue)>) -> anyhow::Result<()> {
    tracing::debug!(?block, ?receipts, "saving external block and receipts");

    let mut tx = pg.start_transaction().await?;

    // insert block
    let result = sqlx::query_file!("src/bin/importer/sql/insert_external_block.sql", number.as_i64(), block)
        .execute(&mut *tx)
        .await;

    match result {
        Ok(_) => {}
        Err(sqlx::Error::Database(e)) if e.is_unique_violation() => {
            tracing::warn!(reason = ?e, "block unique violation, skipping");
        }
        Err(e) => return log_and_err!(reason = e, "failed to insert block"),
    }

    // insert receipts
    for (hash, receipt) in receipts {
        let result = sqlx::query_file!("src/bin/importer/sql/insert_external_receipt.sql", hash.as_ref(), number.as_i64(), receipt)
            .execute(&mut *tx)
            .await;

        match result {
            Ok(_) => {}
            Err(sqlx::Error::Database(e)) if e.is_unique_violation() => {
                tracing::warn!(reason = ?e, "receipt unique violation, skipping");
            }
            Err(e) => return log_and_err!(reason = e, "failed to insert receipt"),
        }
    }

    pg.commit_transaction(tx).await?;

    Ok(())
}

// -----------------------------------------------------------------------------
// Types
// -----------------------------------------------------------------------------

pub struct BlockRow {
    pub number: BlockNumber,
    pub payload: ExternalBlock,
}

pub struct ReceiptRow {
    pub block_number: BlockNumber,
    pub payload: ExternalReceipt,
}

pub struct BalanceRow {
    pub address: Address,
    pub balance: Wei,
}
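
As a usage note, the cursor functions above are presumably meant to be drained in a loop by an importer job: declare the cursor once, then fetch batches until nothing is returned. A minimal sketch under that assumption (the function name and the empty-batch termination check are illustrative, not part of this commit):

async fn consume_downloaded_blocks(pg: &Postgres) -> anyhow::Result<()> {
    // open a transaction with a server-side cursor positioned after the last imported block
    let mut tx = pg_init_blocks_cursor(pg).await?;

    loop {
        // fetch the next batch of BlockRow values from the cursor
        let blocks = pg_fetch_blocks(&mut tx).await?;
        if blocks.is_empty() {
            break;
        }
        for block in blocks {
            // block.number and block.payload (ExternalBlock) are ready to be imported here
            tracing::debug!(number = %block.number, "fetched downloaded block");
        }
    }

    Ok(())
}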
110 changes: 8 additions & 102 deletions src/bin/importer/importer-download.rs
@@ -1,19 +1,19 @@
mod _postgres;

use std::cmp::min;
use std::sync::Arc;
use std::time::Duration;

use _postgres::*;
use anyhow::anyhow;
use anyhow::Context;
use futures::StreamExt;
use futures::TryStreamExt;
use itertools::Itertools;
use serde_json::Value as JsonValue;
use sqlx::types::BigDecimal;
use stratus::config::ImporterDownloadConfig;
use stratus::eth::primitives::Address;
use stratus::eth::primitives::BlockNumber;
use stratus::eth::primitives::Hash;
use stratus::eth::primitives::Wei;
use stratus::ext::not;
use stratus::infra::postgres::Postgres;
use stratus::infra::BlockchainClient;
@@ -48,7 +48,7 @@ async fn download_balances(pg: &Postgres, chain: &BlockchainClient, accounts: Ve
    }

    // retrieve downloaded balances
-    let downloaded_balances = db_retrieve_balances(pg).await?;
+    let downloaded_balances = pg_retrieve_downloaded_balances(pg).await?;
    let downloaded_balances_addresses = downloaded_balances.iter().map(|balance| &balance.address).collect_vec();

    // keep only accounts that must be downloaded
@@ -60,7 +60,7 @@ async fn download_balances(pg: &Postgres, chain: &BlockchainClient, accounts: Ve
    // download missing balances
    for address in address_to_download {
        let balance = chain.get_balance(&address, Some(BlockNumber::ZERO)).await?;
-        db_insert_balance(pg, address, balance).await?;
+        pg_insert_balance(pg, address, balance).await?;
    }

    Ok(())
@@ -95,7 +95,7 @@ async fn download_blocks(pg: Arc<Postgres>, chain: Arc<BlockchainClient>, parale

async fn download(pg: Arc<Postgres>, chain: Arc<BlockchainClient>, start: BlockNumber, end_inclusive: BlockNumber) -> anyhow::Result<()> {
    // calculate current block
-    let mut current = match db_retrieve_max_downloaded_block(&pg, start, end_inclusive).await? {
+    let mut current = match pg_retrieve_max_downloaded_block(&pg, start, end_inclusive).await? {
        Some(number) => number.next(),
        None => start,
    };
@@ -147,7 +147,7 @@ async fn download(pg: Arc<Postgres>, chain: Arc<BlockchainClient>, start: BlockN
        }

        // save block and receipts
-        if let Err(e) = db_insert_block_and_receipts(&pg, current, block_json, receipts_json).await {
+        if let Err(e) = pg_insert_block_and_receipts(&pg, current, block_json, receipts_json).await {
            tracing::warn!(reason = ?e, "retrying because failed to save block");
            continue;
        }
Expand All @@ -160,101 +160,7 @@ async fn download(pg: Arc<Postgres>, chain: Arc<BlockchainClient>, start: BlockN
}

// -----------------------------------------------------------------------------
-// Postgres
-// -----------------------------------------------------------------------------
-
-struct BalanceRow {
-    address: Address,
-    #[allow(dead_code)] // allow it because the SQL returns it, even if it is not used
-    balance: Wei,
-}
-
-async fn db_retrieve_balances(pg: &Postgres) -> anyhow::Result<Vec<BalanceRow>> {
-    tracing::debug!("retrieving downloaded balances");
-
-    let result = sqlx::query_file_as!(BalanceRow, "src/bin/importer/sql/select_downloaded_balances.sql")
-        .fetch_all(&pg.connection_pool)
-        .await;
-    match result {
-        Ok(accounts) => Ok(accounts),
-        Err(e) => log_and_err!(reason = e, "failed to retrieve downloaded balances"),
-    }
-}
-
-async fn db_retrieve_max_downloaded_block(pg: &Postgres, start: BlockNumber, end: BlockNumber) -> anyhow::Result<Option<BlockNumber>> {
-    tracing::debug!(%start, %end, "retrieving max downloaded block");
-
-    let result = sqlx::query_file_scalar!("src/bin/importer/sql/select_max_downloaded_block_in_range.sql", start.as_i64(), end.as_i64())
-        .fetch_one(&pg.connection_pool)
-        .await;
-    match result {
-        Ok(Some(max)) => Ok(Some(max.into())),
-        Ok(None) => Ok(None),
-        Err(e) => log_and_err!(reason = e, "failed to retrieve max block number"),
-    }
-}
-
-async fn db_insert_balance(pg: &Postgres, address: Address, balance: Wei) -> anyhow::Result<()> {
-    tracing::debug!(%address, %balance, "saving external balance");
-
-    let result = sqlx::query_file!(
-        "src/bin/importer/sql/insert_external_balance.sql",
-        address.as_ref(),
-        TryInto::<BigDecimal>::try_into(balance)?
-    )
-    .execute(&pg.connection_pool)
-    .await;
-
-    match result {
-        Ok(_) => Ok(()),
-        Err(e) => log_and_err!(reason = e, "failed to insert external balance"),
-    }
-}
-
-async fn db_insert_block_and_receipts(pg: &Postgres, number: BlockNumber, block: JsonValue, receipts: Vec<(Hash, JsonValue)>) -> anyhow::Result<()> {
-    tracing::debug!(?block, ?receipts, "saving external block and receipts");
-
-    let mut tx = pg.start_transaction().await?;
-
-    // insert block
-    let result = sqlx::query_file!("src/bin/importer/sql/insert_external_block.sql", number.as_i64(), block)
-        .execute(&mut *tx)
-        .await;
-
-    match result {
-        Ok(_) => {}
-        Err(sqlx::Error::Database(e)) if e.is_unique_violation() => {
-            tracing::warn!(reason = ?e, "block unique violation, skipping");
-        }
-        Err(e) => {
-            return log_and_err!(reason = e, "failed to insert block");
-        }
-    }
-
-    // insert receipts
-    for (hash, receipt) in receipts {
-        let result = sqlx::query_file!("src/bin/importer/sql/insert_external_receipt.sql", hash.as_ref(), number.as_i64(), receipt)
-            .execute(&mut *tx)
-            .await;
-
-        match result {
-            Ok(_) => {}
-            Err(sqlx::Error::Database(e)) if e.is_unique_violation() => {
-                tracing::warn!(reason = ?e, "receipt unique violation, skipping");
-            }
-            Err(e) => {
-                return log_and_err!(reason = e, "failed to insert receipt");
-            }
-        }
-    }
-
-    pg.commit_transaction(tx).await?;
-
-    Ok(())
-}
-
-// -----------------------------------------------------------------------------
-// Minimum RPC structs
+// Blockchain RPC structs
// -----------------------------------------------------------------------------

#[derive(serde::Deserialize)]