Skip to content

Commit

Permalink
Configurable postgres connections (#338)
Browse files Browse the repository at this point in the history
* chore: creates smaller blocks batches

this allows us to do smaller quries and less time waiting PG
3x faster this way

* chore: add storage connections configurations

* fix: postgres connection

* fix: test execution
  • Loading branch information
renancloudwalk authored Mar 12, 2024
1 parent 26dec16 commit 2bf51e2
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 10 deletions.
2 changes: 1 addition & 1 deletion src/bin/importer_offline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type BacklogTask = (Vec<BlockRow>, Vec<ReceiptRow>);
async fn main() -> anyhow::Result<()> {
// init services
let config: ImporterOfflineConfig = init_global_services();
let pg = Arc::new(Postgres::new(&config.postgres_url).await?);
let pg = Arc::new(Postgres::new(&config.postgres_url, 400usize, 20usize).await?);
let storage = config.init_storage().await?;
let executor = config.init_executor(Arc::clone(&storage));

Expand Down
2 changes: 1 addition & 1 deletion src/bin/rpc_downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const BLOCKS_BY_TASK: usize = 1_000;
async fn main() -> anyhow::Result<()> {
// init services
let config: RpcDownloaderConfig = init_global_services();
let pg = Arc::new(Postgres::new(&config.postgres_url).await?);
let pg = Arc::new(Postgres::new(&config.postgres_url, 400usize, 20usize).await?);
let chain = Arc::new(BlockchainClient::new(&config.external_rpc).await?);

// download balances and blocks
Expand Down
14 changes: 11 additions & 3 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,14 @@ pub struct CommonConfig {
#[arg(short = 's', long = "storage", env = "STORAGE", default_value_t = StorageConfig::InMemory)]
pub storage: StorageConfig,

/// Number of parallel connections to the storage.
#[arg(long = "storage_max_connections", env = "STORAGE_MAX_CONNECTIONS", default_value = "100")]
pub storage_max_connections: usize,

/// How many seconds spent waiting for connection to be acquired.
#[arg(long = "storage_acquire_timeout", env = "STORAGE_ACQUIRE_TIMEOUT", default_value = "2")]
pub storage_acquire_timeout: usize,

/// Number of EVM instances to run.
#[arg(long = "evms", env = "EVMS", default_value = "1")]
pub num_evms: usize,
Expand Down Expand Up @@ -228,7 +236,7 @@ impl WithCommonConfig for CommonConfig {
impl CommonConfig {
/// Initializes storage.
pub async fn init_storage(&self) -> anyhow::Result<Arc<StratusStorage>> {
let storage = self.storage.init().await?;
let storage = self.storage.init(self).await?;

if self.enable_genesis {
let genesis = storage.read_block(&BlockSelection::Number(BlockNumber::ZERO)).await?;
Expand Down Expand Up @@ -307,12 +315,12 @@ pub enum StorageConfig {

impl StorageConfig {
/// Initializes the storage implementation.
pub async fn init(&self) -> anyhow::Result<Arc<StratusStorage>> {
pub async fn init(&self, common_config: &CommonConfig) -> anyhow::Result<Arc<StratusStorage>> {
let temp = Arc::new(InMemoryTemporaryStorage::default());

let perm: Arc<dyn PermanentStorage> = match self {
Self::InMemory => Arc::new(InMemoryPermanentStorage::default()),
Self::Postgres { url } => Arc::new(Postgres::new(url).await?),
Self::Postgres { url } => Arc::new(Postgres::new(url, common_config.storage_max_connections, common_config.storage_acquire_timeout).await?),
};
Ok(Arc::new(StratusStorage::new(temp, perm)))
}
Expand Down
6 changes: 3 additions & 3 deletions src/infra/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ pub struct Postgres {
}

impl Postgres {
pub async fn new(url: &str) -> anyhow::Result<Self> {
pub async fn new(url: &str, max_connections: usize, acquire_timeout: usize) -> anyhow::Result<Self> {
tracing::info!(%url, "starting postgres client");

let connection_pool = PgPoolOptions::new()
.min_connections(1)
.max_connections(100)
.acquire_timeout(Duration::from_secs(2))
.max_connections(max_connections.try_into().unwrap_or(100))
.acquire_timeout(Duration::from_secs(acquire_timeout.try_into().unwrap_or(2)))
.connect(url)
.await
.map_err(|e| {
Expand Down
15 changes: 14 additions & 1 deletion tests/test_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ mod tests {
// Adjust this import to include your Revm and related structs
use fake::{Dummy, Faker};
use nonempty::nonempty;
use stratus::config::CommonConfig;
use stratus::config::MetricsHistogramKind;
use stratus::config::StorageConfig;
use stratus::eth::primitives::test_accounts;
use stratus::eth::primitives::TransactionInput;
Expand All @@ -19,7 +21,18 @@ mod tests {
#[tokio::test]
async fn test_execution() {
let storage: StorageConfig = "inmemory".parse().unwrap(); //XXX we need to use a real storage
let storage = storage.init().await.unwrap();
let config = CommonConfig {
storage: storage.clone(),
storage_max_connections: 1usize,
storage_acquire_timeout: 100usize,
num_evms: 1usize,
num_async_threads: 1usize,
num_blocking_threads: 1usize,
metrics_histogram_kind: MetricsHistogramKind::Summary,
enable_genesis: false,
nocapture: false,
};
let storage = storage.init(&config).await.unwrap();
let mut rng = thread_rng();
let mut fake_transaction_input = TransactionInput::dummy_with_rng(&Faker, &mut rng);
fake_transaction_input.nonce = 0u64.into();
Expand Down
2 changes: 1 addition & 1 deletion tests/test_import_offline_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ async fn test_import_offline_snapshot() {
// init snapshot data
let snapshot_json = include_str!("fixtures/block-292973/snapshot.json");
let snapshot: InMemoryPermanentStorageState = serde_json::from_str(snapshot_json).unwrap();
let pg = Postgres::new(docker.postgres_connection_url()).await.unwrap();
let pg = Postgres::new(docker.postgres_connection_url(), 100usize, 2usize).await.unwrap();
populate_postgres(&pg, snapshot).await;

// init executor and execute
Expand Down

0 comments on commit 2bf51e2

Please sign in to comment.