Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[TRY] perf: spawn (parallelize) io tasks related to tarball #190

Closed
wants to merge 18 commits into from
61 changes: 61 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ reflink-copy = { version = "0.1.9" }
junction = { version = "1.0.0" }
reqwest = { version = "0.11", default-features = false, features = ["json", "native-tls-vendored"] }
node-semver = { version = "2.1.0" }
num_cpus = { version = "1.16.0" }
pipe-trait = { version = "0.4.0" }
rayon = { version = "1.8.0" }
serde = { version = "1.0.188", features = ["derive"] }
Expand All @@ -54,6 +55,7 @@ sha2 = { version = "0.10.8" }
split-first-char = { version = "0.0.0" }
ssri = { version = "9.0.0" }
strum = { version = "0.25.0", features = ["derive"] }
swc_malloc = { version = "0.5.9" }
tar = { version = "0.4.40" }
text-block-macros = { version = "0.1.1" }
tracing = { version = "0.1.37" }
Expand All @@ -78,9 +80,9 @@ allow_branch = "main"
opt-level = 3
lto = "fat"
codegen-units = 1
strip = "symbols"
debug = false
panic = "abort" # Let it crash and force ourselves to write safe Rust.
# strip = "symbols"
# debug = false
panic = "abort" # Let it crash and force ourselves to write safe Rust.

# Use the `--profile release-debug` flag to show symbols in release mode.
# e.g. `cargo build --profile release-debug`
Expand Down
3 changes: 3 additions & 0 deletions crates/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ miette = { workspace = true }
reqwest = { workspace = true }
pipe-trait = { workspace = true }
tokio = { workspace = true }
swc_malloc = { workspace = true }
rayon = { workspace = true }
num_cpus = { workspace = true }

[dev-dependencies]
pacquet-store-dir = { workspace = true }
Expand Down
12 changes: 12 additions & 0 deletions crates/cli/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
extern crate swc_malloc;

mod cli_args;
mod state;

Expand All @@ -8,6 +10,16 @@ use pacquet_diagnostics::enable_tracing_by_env;
use state::State;

pub async fn main() -> miette::Result<()> {
// We use rayon only for blocking syscalls, so we multiply the number of threads by 3.
//
// If we are going to use rayon for CPU-bound tasks,
// we should create an extra threadpool for IO-bound tasks,
// and use the global threadpool for CPU-bound tasks.
rayon::ThreadPoolBuilder::new()
.num_threads(num_cpus::get() * 3)
.build_global()
.expect("build rayon thread pool");

enable_tracing_by_env();
set_panic_hook();
CliArgs::parse().run().await
Expand Down
1 change: 1 addition & 0 deletions crates/store-dir/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ serde = { workspace = true }
serde_json = { workspace = true }
sha2 = { workspace = true }
ssri = { workspace = true }
rayon.workspace = true

[dev-dependencies]
pretty_assertions = { workspace = true }
Expand Down
18 changes: 14 additions & 4 deletions crates/store-dir/src/cas_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,25 @@ pub enum WriteCasFileError {

impl StoreDir {
/// Write a file from an npm package to the store directory.
pub fn write_cas_file(
pub fn write_cas_file<'a>(
&self,
buffer: &[u8],
scope: &rayon::Scope<'a>,
buffer: Vec<u8>,
executable: bool,
) -> Result<(PathBuf, FileHash), WriteCasFileError> {
let file_hash = Sha512::digest(buffer);
let file_hash = Sha512::digest(&buffer);
let file_path = self.cas_file_path(file_hash, executable);
let mode = executable.then_some(EXEC_MODE);
ensure_file(&file_path, buffer, mode).map_err(WriteCasFileError::WriteFile)?;

scope.spawn({
let file_path = file_path.clone();
move |_| {
ensure_file(&file_path, &buffer, mode)
.map_err(WriteCasFileError::WriteFile)
.expect("todo: handle ensure_file error");
}
});

Ok((file_path, file_hash))
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/tarball/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ tar = { workspace = true }
tokio = { workspace = true }
zune-inflate = { workspace = true }
tracing = { workspace = true }
rayon.workspace = true
kdy1 marked this conversation as resolved.
Show resolved Hide resolved

[dev-dependencies]
pretty_assertions = { workspace = true }
Expand Down
143 changes: 79 additions & 64 deletions crates/tarball/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,73 +201,88 @@ impl<'a> DownloadTarballToStore<'a> {
Other(TarballError),
}
let cas_paths = tokio::task::spawn(async move {
verify_checksum(&response, package_integrity.clone()).map_err(TaskError::Checksum)?;

// TODO: move tarball extraction to its own function
// TODO: test it
// TODO: test the duplication of entries

let mut archive = decompress_gzip(&response, package_unpacked_size)
.map_err(TaskError::Other)?
.pipe(Cursor::new)
.pipe(Archive::new);

let entries = archive
.entries()
.map_err(TarballError::ReadTarballEntries)
.map_err(TaskError::Other)?
.filter(|entry| !entry.as_ref().unwrap().header().entry_type().is_dir());

let ((_, Some(capacity)) | (capacity, None)) = entries.size_hint();
let mut cas_paths = HashMap::<OsString, PathBuf>::with_capacity(capacity);
let mut pkg_files_idx = PackageFilesIndex { files: HashMap::with_capacity(capacity) };

for entry in entries {
let mut entry = entry.unwrap();

let file_mode = entry.header().mode().expect("get mode"); // TODO: properly propagate this error
let file_is_executable = file_mode::is_all_exec(file_mode);

// Read the contents of the entry
let mut buffer = Vec::with_capacity(entry.size() as usize);
entry.read_to_end(&mut buffer).unwrap();

let entry_path = entry.path().unwrap();
let cleaned_entry_path =
entry_path.components().skip(1).collect::<PathBuf>().into_os_string();
let (file_path, file_hash) = store_dir
.write_cas_file(&buffer, file_is_executable)
.map_err(TarballError::WriteCasFile)?;

let tarball_index_key = cleaned_entry_path
.to_str()
.expect("entry path must be valid UTF-8") // TODO: propagate this error, provide more information
.to_string(); // TODO: convert cleaned_entry_path to String too.

if let Some(previous) = cas_paths.insert(cleaned_entry_path, file_path) {
tracing::warn!(?previous, "Duplication detected. Old entry has been ejected");
rayon::scope(|scope| {
verify_checksum(&response, package_integrity.clone())
.map_err(TaskError::Checksum)?;

// TODO: move tarball extraction to its own function
// TODO: test it
// TODO: test the duplication of entries

let mut archive = decompress_gzip(&response, package_unpacked_size)
.map_err(TaskError::Other)?
.pipe(Cursor::new)
.pipe(Archive::new);

let entries = archive
.entries()
.map_err(TarballError::ReadTarballEntries)
.map_err(TaskError::Other)?
.filter(|entry| !entry.as_ref().unwrap().header().entry_type().is_dir());

let ((_, Some(capacity)) | (capacity, None)) = entries.size_hint();
let mut cas_paths = HashMap::<OsString, PathBuf>::with_capacity(capacity);
let mut pkg_files_idx =
PackageFilesIndex { files: HashMap::with_capacity(capacity) };

for entry in entries {
let mut entry = entry.unwrap();

let file_mode = entry.header().mode().expect("get mode"); // TODO: properly propagate this error
let file_is_executable = file_mode::is_all_exec(file_mode);

// Read the contents of the entry
let mut buffer = Vec::with_capacity(entry.size() as usize);
entry.read_to_end(&mut buffer).unwrap();

let entry_path = entry.path().unwrap();
let cleaned_entry_path =
entry_path.components().skip(1).collect::<PathBuf>().into_os_string();
let (file_path, file_hash) = store_dir
.write_cas_file(scope, buffer, file_is_executable)
.map_err(TarballError::WriteCasFile)?;
Comment on lines +234 to +236
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be the only IO operation in the entire scope, and none of the operations that follow depend on it being done. This means we could defer this operation until after building the cas_paths (i.e. do the IO on the completed cas_paths). See if this suggestion can improve the code quality and performance.


let tarball_index_key = cleaned_entry_path
.to_str()
.expect("entry path must be valid UTF-8") // TODO: propagate this error, provide more information
.to_string(); // TODO: convert cleaned_entry_path to String too.

if let Some(previous) = cas_paths.insert(cleaned_entry_path, file_path) {
tracing::warn!(
?previous,
"Duplication detected. Old entry has been ejected"
);
}

let checked_at = UNIX_EPOCH.elapsed().ok().map(|x| x.as_millis());
let file_size = entry.header().size().ok();
let file_integrity = format!("sha512-{}", BASE64_STD.encode(file_hash));
let file_attrs = PackageFileInfo {
checked_at,
integrity: file_integrity,
mode: file_mode,
size: file_size,
};

if let Some(previous) =
pkg_files_idx.files.insert(tarball_index_key, file_attrs)
{
tracing::warn!(
?previous,
"Duplication detected. Old entry has been ejected"
);
}
}

let checked_at = UNIX_EPOCH.elapsed().ok().map(|x| x.as_millis());
let file_size = entry.header().size().ok();
let file_integrity = format!("sha512-{}", BASE64_STD.encode(file_hash));
let file_attrs = PackageFileInfo {
checked_at,
integrity: file_integrity,
mode: file_mode,
size: file_size,
};

if let Some(previous) = pkg_files_idx.files.insert(tarball_index_key, file_attrs) {
tracing::warn!(?previous, "Duplication detected. Old entry has been ejected");
}
}
scope.spawn(move |_| {
store_dir
.write_index_file(&package_integrity, &pkg_files_idx)
.map_err(TarballError::WriteTarballIndexFile)
.expect("todo: handle this error");
});

store_dir
.write_index_file(&package_integrity, &pkg_files_idx)
.map_err(TarballError::WriteTarballIndexFile)?;

Ok(cas_paths)
Ok(cas_paths)
})
})
.await
.expect("no join error")
Expand Down
Loading