Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: renaming and reorganizing binaries #326

Merged
merged 2 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,20 +90,20 @@ phf_codegen = "0.11.2"
# ------------------------------------------------------------------------------

[[bin]]
name = "importer-download"
path = "src/bin/importer/importer-download.rs"
name = "rpc-downloader"
path = "src/bin/rpc_downloader.rs"

[[bin]]
name = "importer-import"
path = "src/bin/importer/importer-import.rs"
name = "importer-offline"
path = "src/bin/importer_offline.rs"

[[bin]]
name = "rpc-server-poller"
path = "src/bin/rpc_server_poller.rs"
name = "importer-online"
path = "src/bin/importer_online.rs"

[[bin]]
name = "state-validator"
path = "src/bin/state-validator.rs"
path = "src/bin/state_validator.rs"

# ------------------------------------------------------------------------------
# Lints
Expand Down
26 changes: 17 additions & 9 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import '.justfile_helpers' # _lint, _outdated

# Environment variables (automatically set in all actions).
export RUST_BACKTRACE := "1"
export RUST_LOG := env("RUST_LOG", "stratus=info,importer-download=info,importer-importer=info")
export RUST_LOG := env("RUST_LOG", "stratus=info,rpc-downloader=info,importer-offline=info,importer-online=info,state-validator=info")

# Default URLs that can be passed as argument.
postgres_url := env("POSTGRES_URL", "postgres://postgres:[email protected]:5432/stratus")
Expand Down Expand Up @@ -86,15 +86,23 @@ update:
cargo update stratus

# ------------------------------------------------------------------------------
# Importer tasks
# Jobs
# ------------------------------------------------------------------------------
# Importer: Download external RPC blocks to temporary storage
importer-download *args="":
cargo run --bin importer-download --features dev --release -- --postgres {{postgres_url}} --external-rpc {{testnet_url}} {{args}}
# Job: Download external RPC blocks and receipts to temporary storage
rpc-downloader *args="":
cargo run --bin rpc-downloader --features dev --release -- --postgres {{postgres_url}} --external-rpc {{testnet_url}} {{args}}

# Importer: Import downloaded external RPC blocks to Stratus storage
importer-import *args="":
cargo run --bin importer-import --features dev --release -- --postgres {{postgres_url}} {{args}}
# Job: Import external RPC blocks from temporary storage to Stratus storage
importer-offline *args="":
cargo run --bin importer-offline --features dev --release -- --postgres {{postgres_url}} {{args}}

# Job: Import external RPC blocks from external RPC endpoint to Stratus storage
importer-online *args="":
cargo run --bin importer-online --features dev --release -- --external-rpc {{testnet_url}} {{args}}

# Job: Validate Stratus storage slots matches reference slots
state-validator *args="":
cargo run --bin state-validator --features dev --release -- --method {{testnet_url}} {{args}}

# ------------------------------------------------------------------------------
# Test tasks
Expand Down Expand Up @@ -262,7 +270,7 @@ e2e-flamegraph:

# Run cargo flamegraph with necessary environment variables
echo "Running cargo flamegraph..."
CARGO_PROFILE_RELEASE_DEBUG=true cargo flamegraph --bin rpc-server-poller --deterministic -- --external-rpc=http://localhost:3003/rpc --storage={{postgres_url}}
CARGO_PROFILE_RELEASE_DEBUG=true cargo flamegraph --bin importer-online --deterministic -- --external-rpc=http://localhost:3003/rpc --storage={{postgres_url}}


# ------------------------------------------------------------------------------
Expand Down
2 changes: 2 additions & 0 deletions src/bin/helpers/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
mod postgres;
pub use postgres::*;
14 changes: 7 additions & 7 deletions src/bin/importer/_postgres.rs → src/bin/helpers/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use stratus::log_and_err;
pub async fn pg_retrieve_max_external_block(pg: &Postgres, start: BlockNumber, end: BlockNumber) -> anyhow::Result<Option<BlockNumber>> {
tracing::debug!(%start, %end, "retrieving max external block");

let result = sqlx::query_file_scalar!("src/bin/importer/sql/select_max_external_block_in_range.sql", start.as_i64(), end.as_i64())
let result = sqlx::query_file_scalar!("src/bin/helpers/sql/select_max_external_block_in_range.sql", start.as_i64(), end.as_i64())
.fetch_one(&pg.connection_pool)
.await;

Expand All @@ -33,7 +33,7 @@ pub async fn pg_retrieve_max_external_block(pg: &Postgres, start: BlockNumber, e

pub async fn pg_retrieve_external_blocks_in_range(pg: &Postgres, start: BlockNumber, end: BlockNumber) -> anyhow::Result<Vec<BlockRow>> {
tracing::debug!(%start, %end, "retrieving external blocks in range");
let result = sqlx::query_file!("src/bin/importer/sql/select_external_blocks_in_range.sql", start.as_i64(), end.as_i64())
let result = sqlx::query_file!("src/bin/helpers/sql/select_external_blocks_in_range.sql", start.as_i64(), end.as_i64())
.fetch_all(&pg.connection_pool)
.await;

Expand All @@ -58,7 +58,7 @@ pub async fn pg_retrieve_external_blocks_in_range(pg: &Postgres, start: BlockNum
pub async fn pg_retrieve_external_receipts_in_range(pg: &Postgres, start: BlockNumber, end: BlockNumber) -> anyhow::Result<Vec<ReceiptRow>> {
tracing::debug!(%start, %end, "retrieving external receipts in range");

let result = sqlx::query_file!("src/bin/importer/sql/select_external_receipts_in_range.sql", start.as_i64(), end.as_i64())
let result = sqlx::query_file!("src/bin/helpers/sql/select_external_receipts_in_range.sql", start.as_i64(), end.as_i64())
.fetch_all(&pg.connection_pool)
.await;

Expand All @@ -83,7 +83,7 @@ pub async fn pg_retrieve_external_receipts_in_range(pg: &Postgres, start: BlockN
pub async fn pg_retrieve_external_balances(pg: &Postgres) -> anyhow::Result<Vec<BalanceRow>> {
tracing::debug!("retrieving external balances");

let result = sqlx::query_file_as!(BalanceRow, "src/bin/importer/sql/select_external_balances.sql")
let result = sqlx::query_file_as!(BalanceRow, "src/bin/helpers/sql/select_external_balances.sql")
.fetch_all(&pg.connection_pool)
.await;

Expand All @@ -100,7 +100,7 @@ pub async fn pg_insert_external_balance(pg: &Postgres, address: Address, balance
tracing::debug!(%address, %balance, "saving external balance");

let result = sqlx::query_file!(
"src/bin/importer/sql/insert_external_balance.sql",
"src/bin/helpers/sql/insert_external_balance.sql",
address.as_ref(),
TryInto::<BigDecimal>::try_into(balance)?
)
Expand All @@ -124,7 +124,7 @@ pub async fn pg_insert_external_block_and_receipts(
let mut tx = pg.start_transaction().await?;

// insert block
let result = sqlx::query_file!("src/bin/importer/sql/insert_external_block.sql", number.as_i64(), block)
let result = sqlx::query_file!("src/bin/helpers/sql/insert_external_block.sql", number.as_i64(), block)
.execute(&mut *tx)
.await;

Expand All @@ -138,7 +138,7 @@ pub async fn pg_insert_external_block_and_receipts(

// insert receipts
for (hash, receipt) in receipts {
let result = sqlx::query_file!("src/bin/importer/sql/insert_external_receipt.sql", hash.as_ref(), number.as_i64(), receipt)
let result = sqlx::query_file!("src/bin/helpers/sql/insert_external_receipt.sql", hash.as_ref(), number.as_i64(), receipt)
.execute(&mut *tx)
.await;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
mod _postgres;
mod helpers;

use std::cmp::min;
use std::collections::HashMap;
use std::sync::Arc;

use _postgres::*;
use anyhow::anyhow;
use futures::try_join;
use futures::StreamExt;
use helpers::*;
use itertools::Itertools;
use stratus::config::ImporterImportConfig;
use stratus::config::ImporterOfflineConfig;
use stratus::eth::primitives::Account;
use stratus::eth::primitives::BlockNumber;
use stratus::eth::primitives::BlockSelection;
Expand All @@ -35,7 +35,7 @@ type BacklogTask = (Vec<BlockRow>, Vec<ReceiptRow>);
#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
// init services
let config: ImporterImportConfig = init_global_services();
let config: ImporterOfflineConfig = init_global_services();
let pg = Arc::new(Postgres::new(&config.postgres_url).await?);
let storage = config.init_storage().await?;
let executor = config.init_executor(Arc::clone(&storage));
Expand Down
4 changes: 2 additions & 2 deletions src/bin/rpc_server_poller.rs → src/bin/importer_online.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use stratus::config::RpcPollerConfig;
use stratus::config::ImporterOnlineConfig;
use stratus::eth::primitives::BlockNumber;
use stratus::eth::primitives::Hash;
use stratus::infra::BlockchainClient;
Expand All @@ -13,7 +13,7 @@ const POLL_LATENCY: Duration = Duration::from_secs(1);

#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
let config: RpcPollerConfig = init_global_services();
let config: ImporterOnlineConfig = init_global_services();

let chain = Arc::new(BlockchainClient::new(&config.external_rpc, Duration::from_secs(1))?);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
mod _postgres;
mod helpers;

use std::cmp::min;
use std::sync::Arc;
use std::time::Duration;

use _postgres::*;
use anyhow::anyhow;
use anyhow::Context;
use futures::StreamExt;
use futures::TryStreamExt;
use helpers::*;
use itertools::Itertools;
use stratus::config::ImporterDownloadConfig;
use stratus::config::RpcDownloaderConfig;
use stratus::eth::primitives::Address;
use stratus::eth::primitives::BlockNumber;
use stratus::eth::primitives::Hash;
Expand All @@ -29,7 +29,7 @@ const NETWORK_TIMEOUT: Duration = Duration::from_secs(2);
#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
// init services
let config: ImporterDownloadConfig = init_global_services();
let config: RpcDownloaderConfig = init_global_services();
let pg = Arc::new(Postgres::new(&config.postgres_url).await?);
let chain = Arc::new(BlockchainClient::new(&config.external_rpc, NETWORK_TIMEOUT)?);

Expand Down
File renamed without changes.
24 changes: 12 additions & 12 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ impl WithCommonConfig for StratusConfig {
}

// -----------------------------------------------------------------------------
// Config: ImporterDownload
// Config: RpcDownloader
// -----------------------------------------------------------------------------

/// Configuration for importer-download binary.
/// Configuration for `rpc-downlaoder` binary.
#[derive(Parser, Debug, derive_more::Deref)]
pub struct ImporterDownloadConfig {
pub struct RpcDownloaderConfig {
/// External RPC endpoint to sync blocks with Stratus.
#[arg(short = 'r', long = "external-rpc", env = "EXTERNAL_RPC")]
pub external_rpc: String,
Expand All @@ -86,7 +86,7 @@ pub struct ImporterDownloadConfig {
pub common: CommonConfig,
}

impl WithCommonConfig for ImporterDownloadConfig {
impl WithCommonConfig for RpcDownloaderConfig {
fn common(&self) -> &CommonConfig {
&self.common
}
Expand All @@ -96,9 +96,9 @@ impl WithCommonConfig for ImporterDownloadConfig {
// Config: ImporterImport
// -----------------------------------------------------------------------------

/// Configuration for importer-import binary.
/// Configuration for `importer-offline` binary.
#[derive(Parser, Debug, derive_more::Deref)]
pub struct ImporterImportConfig {
pub struct ImporterOfflineConfig {
/// Postgres connection URL.
#[arg(short = 'd', long = "postgres", env = "POSTGRES_URL")]
pub postgres_url: String,
Expand All @@ -112,19 +112,19 @@ pub struct ImporterImportConfig {
pub common: CommonConfig,
}

impl WithCommonConfig for ImporterImportConfig {
impl WithCommonConfig for ImporterOfflineConfig {
fn common(&self) -> &CommonConfig {
&self.common
}
}

// -----------------------------------------------------------------------------
// Config: RpcPoller
// Config: ImporterOnline
// -----------------------------------------------------------------------------

/// Configuration for rpc-poller binary.
/// Configuration for `importer-online` binary.
#[derive(Parser, Debug, derive_more::Deref)]
pub struct RpcPollerConfig {
pub struct ImporterOnlineConfig {
/// External RPC endpoint to sync blocks with Stratus.
#[arg(short = 'r', long = "external-rpc", env = "EXTERNAL_RPC")]
pub external_rpc: String,
Expand All @@ -134,7 +134,7 @@ pub struct RpcPollerConfig {
pub common: CommonConfig,
}

impl WithCommonConfig for RpcPollerConfig {
impl WithCommonConfig for ImporterOnlineConfig {
fn common(&self) -> &CommonConfig {
&self.common
}
Expand All @@ -144,7 +144,7 @@ impl WithCommonConfig for RpcPollerConfig {
// Config: StateValidator
// -----------------------------------------------------------------------------

/// Configuration for importer-import binary.
/// Configuration for `state-validator` binary.
#[derive(Parser, Debug, derive_more::Deref)]
pub struct StateValidatorConfig {
#[deref]
Expand Down
Loading