From 2bf51e2469f111b086c0ee5af20c1347863ae859 Mon Sep 17 00:00:00 2001 From: renancloudwalk <53792026+renancloudwalk@users.noreply.github.com> Date: Tue, 12 Mar 2024 18:02:44 -0300 Subject: [PATCH] Configurable postgres connections (#338) * chore: creates smaller blocks batches this allows us to do smaller quries and less time waiting PG 3x faster this way * chore: add storage connections configurations * fix: postgres connection * fix: test execution --- src/bin/importer_offline.rs | 2 +- src/bin/rpc_downloader.rs | 2 +- src/config.rs | 14 +++++++++++--- src/infra/postgres.rs | 6 +++--- tests/test_execution.rs | 15 ++++++++++++++- tests/test_import_offline_snapshot.rs | 2 +- 6 files changed, 31 insertions(+), 10 deletions(-) diff --git a/src/bin/importer_offline.rs b/src/bin/importer_offline.rs index d68e453e3..a9e00c0c7 100644 --- a/src/bin/importer_offline.rs +++ b/src/bin/importer_offline.rs @@ -38,7 +38,7 @@ type BacklogTask = (Vec, Vec); async fn main() -> anyhow::Result<()> { // init services let config: ImporterOfflineConfig = init_global_services(); - let pg = Arc::new(Postgres::new(&config.postgres_url).await?); + let pg = Arc::new(Postgres::new(&config.postgres_url, 400usize, 20usize).await?); let storage = config.init_storage().await?; let executor = config.init_executor(Arc::clone(&storage)); diff --git a/src/bin/rpc_downloader.rs b/src/bin/rpc_downloader.rs index 1e964d226..2d62c1d94 100644 --- a/src/bin/rpc_downloader.rs +++ b/src/bin/rpc_downloader.rs @@ -26,7 +26,7 @@ const BLOCKS_BY_TASK: usize = 1_000; async fn main() -> anyhow::Result<()> { // init services let config: RpcDownloaderConfig = init_global_services(); - let pg = Arc::new(Postgres::new(&config.postgres_url).await?); + let pg = Arc::new(Postgres::new(&config.postgres_url, 400usize, 20usize).await?); let chain = Arc::new(BlockchainClient::new(&config.external_rpc).await?); // download balances and blocks diff --git a/src/config.rs b/src/config.rs index 924579098..34fce1c00 100644 --- a/src/config.rs +++ b/src/config.rs @@ -190,6 +190,14 @@ pub struct CommonConfig { #[arg(short = 's', long = "storage", env = "STORAGE", default_value_t = StorageConfig::InMemory)] pub storage: StorageConfig, + /// Number of parallel connections to the storage. + #[arg(long = "storage_max_connections", env = "STORAGE_MAX_CONNECTIONS", default_value = "100")] + pub storage_max_connections: usize, + + /// How many seconds spent waiting for connection to be acquired. + #[arg(long = "storage_acquire_timeout", env = "STORAGE_ACQUIRE_TIMEOUT", default_value = "2")] + pub storage_acquire_timeout: usize, + /// Number of EVM instances to run. #[arg(long = "evms", env = "EVMS", default_value = "1")] pub num_evms: usize, @@ -228,7 +236,7 @@ impl WithCommonConfig for CommonConfig { impl CommonConfig { /// Initializes storage. pub async fn init_storage(&self) -> anyhow::Result> { - let storage = self.storage.init().await?; + let storage = self.storage.init(self).await?; if self.enable_genesis { let genesis = storage.read_block(&BlockSelection::Number(BlockNumber::ZERO)).await?; @@ -307,12 +315,12 @@ pub enum StorageConfig { impl StorageConfig { /// Initializes the storage implementation. - pub async fn init(&self) -> anyhow::Result> { + pub async fn init(&self, common_config: &CommonConfig) -> anyhow::Result> { let temp = Arc::new(InMemoryTemporaryStorage::default()); let perm: Arc = match self { Self::InMemory => Arc::new(InMemoryPermanentStorage::default()), - Self::Postgres { url } => Arc::new(Postgres::new(url).await?), + Self::Postgres { url } => Arc::new(Postgres::new(url, common_config.storage_max_connections, common_config.storage_acquire_timeout).await?), }; Ok(Arc::new(StratusStorage::new(temp, perm))) } diff --git a/src/infra/postgres.rs b/src/infra/postgres.rs index 21beb1e64..822a1195a 100644 --- a/src/infra/postgres.rs +++ b/src/infra/postgres.rs @@ -30,13 +30,13 @@ pub struct Postgres { } impl Postgres { - pub async fn new(url: &str) -> anyhow::Result { + pub async fn new(url: &str, max_connections: usize, acquire_timeout: usize) -> anyhow::Result { tracing::info!(%url, "starting postgres client"); let connection_pool = PgPoolOptions::new() .min_connections(1) - .max_connections(100) - .acquire_timeout(Duration::from_secs(2)) + .max_connections(max_connections.try_into().unwrap_or(100)) + .acquire_timeout(Duration::from_secs(acquire_timeout.try_into().unwrap_or(2))) .connect(url) .await .map_err(|e| { diff --git a/tests/test_execution.rs b/tests/test_execution.rs index 24a88624b..5bd4e5a36 100644 --- a/tests/test_execution.rs +++ b/tests/test_execution.rs @@ -8,6 +8,8 @@ mod tests { // Adjust this import to include your Revm and related structs use fake::{Dummy, Faker}; use nonempty::nonempty; + use stratus::config::CommonConfig; + use stratus::config::MetricsHistogramKind; use stratus::config::StorageConfig; use stratus::eth::primitives::test_accounts; use stratus::eth::primitives::TransactionInput; @@ -19,7 +21,18 @@ mod tests { #[tokio::test] async fn test_execution() { let storage: StorageConfig = "inmemory".parse().unwrap(); //XXX we need to use a real storage - let storage = storage.init().await.unwrap(); + let config = CommonConfig { + storage: storage.clone(), + storage_max_connections: 1usize, + storage_acquire_timeout: 100usize, + num_evms: 1usize, + num_async_threads: 1usize, + num_blocking_threads: 1usize, + metrics_histogram_kind: MetricsHistogramKind::Summary, + enable_genesis: false, + nocapture: false, + }; + let storage = storage.init(&config).await.unwrap(); let mut rng = thread_rng(); let mut fake_transaction_input = TransactionInput::dummy_with_rng(&Faker, &mut rng); fake_transaction_input.nonce = 0u64.into(); diff --git a/tests/test_import_offline_snapshot.rs b/tests/test_import_offline_snapshot.rs index 71d739740..e8618a5c2 100644 --- a/tests/test_import_offline_snapshot.rs +++ b/tests/test_import_offline_snapshot.rs @@ -80,7 +80,7 @@ async fn test_import_offline_snapshot() { // init snapshot data let snapshot_json = include_str!("fixtures/block-292973/snapshot.json"); let snapshot: InMemoryPermanentStorageState = serde_json::from_str(snapshot_json).unwrap(); - let pg = Postgres::new(docker.postgres_connection_url()).await.unwrap(); + let pg = Postgres::new(docker.postgres_connection_url(), 100usize, 2usize).await.unwrap(); populate_postgres(&pg, snapshot).await; // init executor and execute