Skip to content

Commit

Permalink
feat: clap parsing durations (#841)
Browse files Browse the repository at this point in the history
  • Loading branch information
dinhani-cw authored May 14, 2024
1 parent 55f5de6 commit fb01706
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 20 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ csv = "=1.3.0"
derive_more = "=0.99.17"
derive-new = "=0.6.0"
hex-literal = "=0.4.1"
humantime = "=2.1.0"
indexmap = { version = "=2.2.6", features = ["serde"] }
itertools = "=0.12.1"
nonempty = { version = "=0.10.0", features = ["serialize"] }
Expand Down
8 changes: 4 additions & 4 deletions src/bin/importer_online.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub async fn run_importer_online(
storage: Arc<StratusStorage>,
chain: BlockchainClient,
cancellation: CancellationToken,
sync_interval: u64,
sync_interval: Duration,
) -> anyhow::Result<()> {
// start from last imported block
let mut number = storage.read_mined_block_number().await?;
Expand Down Expand Up @@ -93,7 +93,7 @@ async fn prefetch_blocks_and_receipts(
chain: BlockchainClient,
data_tx: mpsc::Sender<(ExternalBlock, ExternalReceipts)>,
cancellation: CancellationToken,
sync_interval: u64,
sync_interval: Duration,
) {
let buffered_data = Arc::new(RwLock::new(HashMap::new()));
let chain_clone = chain.clone();
Expand All @@ -114,11 +114,11 @@ async fn prefetch_blocks_and_receipts(
match chain_clone.get_current_block_number().await {
Ok(current_block_number) =>
if current_block_number < next_block_number.next() {
sleep(Duration::from_millis(sync_interval)).await;
sleep(sync_interval).await;
},
Err(e) => {
tracing::error!("failed to get current block number {:?}", e);
sleep(Duration::from_millis(sync_interval)).await;
sleep(sync_interval).await;
}
}

Expand Down
4 changes: 1 addition & 3 deletions src/bin/run_with_importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ fn main() -> anyhow::Result<()> {
}

async fn run(config: RunWithImporterConfig) -> anyhow::Result<()> {
//XXX #[cfg(feature = "rocks")]
//XXX stratus::eth::storage::rocks::consensus::gather_clients().await.unwrap();
// init services
let storage = config.stratus_storage.init().await?;
let relayer = config.relayer.init(Arc::clone(&storage)).await?;
Expand Down Expand Up @@ -48,7 +46,7 @@ async fn run(config: RunWithImporterConfig) -> anyhow::Result<()> {
};

let importer_task = async move {
let res = run_importer_online(executor, miner, storage, chain, cancellation.clone(), 600).await;
let res = run_importer_online(executor, miner, storage, chain, cancellation.clone(), config.sync_interval).await;
tracing::warn!("run_importer_online finished, cancelling tasks");
cancellation.cancel();
res
Expand Down
42 changes: 32 additions & 10 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,8 +421,8 @@ pub struct ImporterOnlineConfig {
#[arg(short = 'r', long = "external-rpc", env = "EXTERNAL_RPC")]
pub external_rpc: String,

#[arg(long = "sync-interval", env = "SYNC_INTERVAL", default_value = "700")]
pub sync_interval: u64,
#[arg(long = "sync-interval", value_parser=parse_duration, env = "SYNC_INTERVAL", default_value = "600ms")]
pub sync_interval: Duration,

#[clap(flatten)]
pub executor: ExecutorConfig,
Expand Down Expand Up @@ -453,6 +453,9 @@ pub struct RunWithImporterConfig {
#[arg(short = 'a', long = "address", env = "ADDRESS", default_value = "0.0.0.0:3000")]
pub address: SocketAddr,

#[arg(long = "sync-interval", value_parser=parse_duration, env = "SYNC_INTERVAL", default_value = "600ms")]
pub sync_interval: Duration,

#[clap(flatten)]
pub stratus_storage: StratusStorageConfig,

Expand Down Expand Up @@ -496,7 +499,7 @@ pub struct StateValidatorConfig {
pub seed: u64,

/// Validate in batches of n blocks.
#[arg(short = 'i', long = "inverval", env = "INVERVAL", default_value_t = 1000)]
#[arg(short = 'i', long = "interval", env = "INTERVAL", default_value = "1000")]
pub interval: u64,

/// What method to use when validating.
Expand Down Expand Up @@ -569,9 +572,9 @@ pub struct ExternalRpcStorageConfig {
#[arg(long = "external-rpc-storage-connections", env = "EXTERNAL_RPC_STORAGE_CONNECTIONS")]
pub external_rpc_storage_connections: u32,

/// External RPC storage timeout when opening a connection (in millis).
#[arg(long = "external-rpc-storage-timeout", env = "EXTERNAL_RPC_STORAGE_TIMEOUT")]
pub external_rpc_storage_timeout_millis: u64,
/// External RPC storage timeout when opening a connection.
#[arg(long = "external-rpc-storage-timeout", value_parser=parse_duration, env = "EXTERNAL_RPC_STORAGE_TIMEOUT")]
pub external_rpc_storage_timeout: Duration,
}

#[derive(DebugAsJson, Clone, serde::Serialize)]
Expand All @@ -589,7 +592,7 @@ impl ExternalRpcStorageConfig {
let config = PostgresExternalRpcStorageConfig {
url: url.to_owned(),
connections: self.external_rpc_storage_connections,
acquire_timeout: Duration::from_millis(self.external_rpc_storage_timeout_millis),
acquire_timeout: self.external_rpc_storage_timeout,
};
Ok(Arc::new(PostgresExternalRpcStorage::new(config).await?))
}
Expand Down Expand Up @@ -663,8 +666,8 @@ pub struct PermanentStorageConfig {
pub perm_storage_connections: u32,

/// Permamenent storage timeout when opening a connection (in millis).
#[arg(long = "perm-storage-timeout", env = "PERM_STORAGE_TIMEOUT")]
pub perm_storage_timeout_millis: u64,
#[arg(long = "perm-storage-timeout", value_parser=parse_duration, env = "PERM_STORAGE_TIMEOUT")]
pub perm_storage_timeout: Duration,
}

#[derive(DebugAsJson, Clone, serde::Serialize)]
Expand All @@ -690,7 +693,7 @@ impl PermanentStorageConfig {
let config = PostgresPermanentStorageConfig {
url: url.to_owned(),
connections: self.perm_storage_connections,
acquire_timeout: Duration::from_millis(self.perm_storage_timeout_millis),
acquire_timeout: self.perm_storage_timeout,
};
Arc::new(PostgresPermanentStorage::new(config).await?)
}
Expand Down Expand Up @@ -763,3 +766,22 @@ impl FromStr for ValidatorMethodConfig {
}
}
}

// -----------------------------------------------------------------------------
// Parsers
// -----------------------------------------------------------------------------
fn parse_duration(s: &str) -> anyhow::Result<Duration> {
// try millis
let millis: Result<u64, _> = s.parse();
if let Ok(millis) = millis {
return Ok(Duration::from_millis(millis));
}

// try humantime
if let Ok(parsed) = humantime::parse_duration(s) {
return Ok(parsed);
}

// error
Err(anyhow!("invalid duration format: {}", s))
}
4 changes: 1 addition & 3 deletions tests/test_import_external_snapshot_postgres.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
mod test_import_external_snapshot_common;

use std::time::Duration;

use stratus::eth::storage::PermanentStorage;
use stratus::eth::storage::PostgresPermanentStorage;
use stratus::eth::storage::PostgresPermanentStorageConfig;
Expand All @@ -21,7 +19,7 @@ fn test_import_external_snapshot_with_postgres() {
let pg = PostgresPermanentStorage::new(PostgresPermanentStorageConfig {
url: docker.postgres_connection_url().to_string(),
connections: global_services.config.stratus_storage.perm_storage.perm_storage_connections,
acquire_timeout: Duration::from_millis(global_services.config.stratus_storage.perm_storage.perm_storage_timeout_millis),
acquire_timeout: global_services.config.stratus_storage.perm_storage.perm_storage_timeout,
})
.await
.unwrap();
Expand Down

0 comments on commit fb01706

Please sign in to comment.