diff --git a/Cargo.lock b/Cargo.lock index 7f4b41b6..06928335 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2552,6 +2552,7 @@ dependencies = [ "tokio", "tokio-metrics", "tokio-prometheus-client", + "tokio-stream", "tracing", ] diff --git a/one/Cargo.toml b/one/Cargo.toml index 6d181b44..09511c99 100644 --- a/one/Cargo.toml +++ b/one/Cargo.toml @@ -50,6 +50,7 @@ signal-hook-tokio = { version = "0.3.1", features = ["futures-v0_3"] } swagger.workspace = true tokio-metrics = { version = "0.3.1", features = ["rt"] } tokio-prometheus-client = "0.1" +tokio-stream = { workspace = true, features = ["io-util"] } tokio.workspace = true tracing.workspace = true diff --git a/one/src/migrations.rs b/one/src/migrations.rs index 506272ca..a93cc6f0 100644 --- a/one/src/migrations.rs +++ b/one/src/migrations.rs @@ -64,7 +64,7 @@ pub struct FromIpfsOpts { #[command(flatten)] log_opts: LogOpts, - /// Path of list of files to migrate + /// Path of file containing list of newline-delimited absolute file paths to migrate #[clap(long, short = 'f', env = "CERAMIC_ONE_INPUT_FILE_LIST_PATH")] input_file_list_path: Option, @@ -113,6 +113,12 @@ pub async fn migrate(cmd: EventsCommand) -> Result<()> { } async fn from_ipfs(opts: FromIpfsOpts) -> Result<()> { + // Limit and offset are only used when reading from a file list + if (opts.limit.is_some() || opts.offset > 0) && opts.input_file_list_path.is_none() { + return Err(anyhow!( + "File list path is required when using limit or offset" + )); + } let network = opts.network.to_network(&opts.local_network_id)?; let db_opts: DBOpts = (&opts).into(); let sqlite_pool = db_opts.get_sqlite_pool().await?; @@ -226,18 +232,10 @@ impl FSBlockStore { let limit = self.file_limit; (try_stream! { let file = tokio::fs::File::open(input_file_list_path).await?; - let mut lines = tokio::io::BufReader::new(file).lines(); - let mut i = 0; - while let Some(line) = lines.next_line().await? { - i += 1; - if let Some(limit) = limit { - if i > offset + limit { - return; - } - } - if i <= offset { - continue; - } + let lines = tokio::io::BufReader::new(file).lines(); + let lines = tokio_stream::wrappers::LinesStream::new(lines); + let mut lines = lines.skip(offset as usize).take(limit.unwrap_or(u64::MAX) as usize); + while let Some(line) = lines.next().await.transpose()? { let path = PathBuf::from(line); match block_from_path(path.clone()).await { Ok(Some(block)) => yield block,