From e677112f77f9d9e79e7941e9642862d2d9d0467f Mon Sep 17 00:00:00 2001 From: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com> Date: Sat, 21 Sep 2024 20:09:47 +0800 Subject: [PATCH] chore: compilation fixes --- event-svc/src/tests/migration.rs | 2 +- one/src/migrations.rs | 37 ++++++++++++++++++++++++++------ 2 files changed, 32 insertions(+), 7 deletions(-) diff --git a/event-svc/src/tests/migration.rs b/event-svc/src/tests/migration.rs index 536c37f5..63c63c4f 100644 --- a/event-svc/src/tests/migration.rs +++ b/event-svc/src/tests/migration.rs @@ -25,7 +25,7 @@ struct InMemBlockStore { #[async_trait] impl BlockStore for InMemBlockStore { - fn blocks_from_dir(&self) -> BoxStream<'static, Result<(Cid, Vec)>> { + fn blocks(&self) -> BoxStream<'static, Result<(Cid, Vec)>> { let blocks = self.blocks.clone(); futures::stream::iter(blocks.into_iter().map(Result::Ok)).boxed() } diff --git a/one/src/migrations.rs b/one/src/migrations.rs index a93cc6f0..2d5763c9 100644 --- a/one/src/migrations.rs +++ b/one/src/migrations.rs @@ -11,7 +11,7 @@ use clap::{Args, Subcommand}; use futures::{stream::BoxStream, StreamExt}; use multihash_codetable::{Code, Multihash, MultihashDigest}; use tokio::io::AsyncBufReadExt; -use tracing::{debug, info, trace}; +use tracing::{debug, info}; use crate::{default_directory, DBOpts, Info, LogOpts}; @@ -64,7 +64,31 @@ pub struct FromIpfsOpts { #[command(flatten)] log_opts: LogOpts, - /// Path of file containing list of newline-delimited absolute file paths to migrate + /// Path of file containing list of newline-delimited absolute file paths to migrate. + /// + /// See below for example usage when running a migration for a live IPFS node. Multiple migration runs using lists + /// of files that have changed between runs is useful for incremental migrations. This method can also be used for + /// a final migration after shutting down the IPFS node so that all inflight blocks are migrated to the new C1 node. + /// + /// # Get list of files in sorted order + /// + /// find ~/.ipfs/blocks -type f | sort > first_run_files.txt + /// + /// # Run the migration + /// + /// ceramic-one migrations from-ipfs --input-ipfs-path ~/.ipfs/blocks --input-file-list-path first_run_files.txt + /// + /// # Get updated list of files in sorted order + /// + /// find ~/.ipfs/blocks -type f | sort > second_run_files.txt + /// + /// # Use comm to get the list of new files + /// + /// comm -13 first_run_files.txt second_run_files.txt > new_files.txt + /// + /// # Re-run the migration + /// + /// ceramic-one migrations from-ipfs --input-ipfs-path ~/.ipfs/blocks --input-file-list-path new_files.txt #[clap(long, short = 'f', env = "CERAMIC_ONE_INPUT_FILE_LIST_PATH")] input_file_list_path: Option, @@ -197,7 +221,7 @@ impl BlockStore for FSBlockStore { } impl FSBlockStore { - fn blocks_from_dir(&self) -> BoxStream<'static, anyhow::Result<(Cid, Vec)>> { + fn blocks_from_dir(&self) -> BoxStream<'static, Result<(Cid, Vec)>> { // the block store is split in to 1024 directories and then the blocks stored as files. // the dir structure is the penultimate two characters as dir then the b32 sha256 multihash of the block // The leading "B" for the b32 sha256 multihash is left off @@ -207,7 +231,7 @@ impl FSBlockStore { let mut dirs = Vec::new(); dirs.push(self.input_ipfs_path.clone()); - try_stream! { + (try_stream! { while let Some(dir) = dirs.pop() { let mut entries = tokio::fs::read_dir(dir).await?; while let Some(entry) = entries.next_entry().await? { @@ -218,10 +242,11 @@ impl FSBlockStore { } } } - } + }) + .boxed() } - fn blocks_from_list(&self) -> BoxStream<'static, anyhow::Result<(Cid, Vec)>> { + fn blocks_from_list(&self) -> BoxStream<'static, Result<(Cid, Vec)>> { let input_file_list_path = self .input_file_list_path .clone()