Stream unpacking of source distribution downloads #1157

Merged 3 commits on Jan 29, 2024
443 changes: 430 additions & 13 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions Cargo.toml
@@ -19,6 +19,9 @@ license = "MIT OR Apache-2.0"
[workspace.dependencies]
anstream = { version = "0.6.5" }
anyhow = { version = "1.0.79" }
async-compression = { version = "0.4.6" }
async-std = { version = "1.6.0" }
Member:

I'm a bit worried about the async-std dep; it's abandoned and could clash with tokio(?)

Member Author:

What is your source for async-std being abandoned?

Member Author:

I can migrate to https://crates.io/crates/tokio-tar, which is a fork for Tokio. Or I can look into forking async-tar to land this PR: dignifiedquire/async-tar#41.

Member:

There's no official message or anything; there's just barely any activity on the repo anymore.

[screenshots: async-std repository activity]

As long as it's stable it's fine. I'm more worried that we'll get some clash with tokio, since they are different executors with different runtimes and types.

async-tar = { version = "0.4.2" }
async_http_range_reader = { git = "https://github.com/baszalmstra/async_http_range_reader", rev = "8dab2c08ac864fec1df014465264f9a7c8eae905" }
async_zip = { git = "https://github.com/charliermarsh/rs-async-zip", rev = "d76801da0943de985254fc6255c0e476b57c5836", features = ["deflate"] }
base64 = { version = "0.21.7" }
10 changes: 8 additions & 2 deletions crates/puffin-build/src/lib.rs
@@ -27,7 +27,6 @@ use tracing::{debug, info_span, instrument, Instrument};

use distribution_types::Resolution;
use pep508_rs::Requirement;
use puffin_extract::extract_source;
use puffin_interpreter::{Interpreter, Virtualenv};
use puffin_traits::{BuildContext, BuildKind, SetupPyStrategy, SourceBuildTrait};

@@ -297,8 +296,15 @@ impl SourceBuild {
source.to_path_buf()
} else {
debug!("Unpacking for build: {}", source.display());

let extracted = temp_dir.path().join("extracted");
extract_source(source, &extracted)

// Unzip the archive into the temporary directory.
puffin_extract::archive(source, &extracted)
.map_err(|err| Error::Extraction(extracted.clone(), err))?;

// Extract the top-level directory from the archive.
puffin_extract::strip_component(&extracted)
.map_err(|err| Error::Extraction(extracted.clone(), err))?
};
let source_tree = if let Some(subdir) = subdirectory {
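The two-step flow above (unpack the archive into `extracted`, then strip the single top-level directory that sdists conventionally contain, e.g. `pkg-1.0/`) can be sketched with std only. This is a hypothetical reimplementation of the `strip_component` semantics, not the actual `puffin_extract` code:

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

/// Hypothetical sketch of `strip_component`: if the extracted archive
/// contains exactly one top-level directory, return that directory's
/// path; otherwise report an error.
fn strip_component(extracted: &Path) -> io::Result<PathBuf> {
    let entries: Vec<fs::DirEntry> =
        fs::read_dir(extracted)?.collect::<Result<_, _>>()?;
    if entries.len() == 1 && entries[0].file_type()?.is_dir() {
        Ok(entries[0].path())
    } else {
        Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "the top level of the archive must contain a single directory",
        ))
    }
}
```

The real crate returns a richer error carrying the offending entries; this sketch collapses that into a plain `io::Error` for brevity.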
5 changes: 2 additions & 3 deletions crates/puffin-distribution/src/distribution_database.rs
@@ -14,7 +14,6 @@ use distribution_types::{
use platform_tags::Tags;
use puffin_cache::{Cache, CacheBucket, Timestamp, WheelCache};
use puffin_client::{CacheControl, CachedClientError, RegistryClient};
use puffin_extract::unzip_no_seek;
use puffin_fs::metadata_if_exists;
use puffin_git::GitSource;
use puffin_traits::{BuildContext, NoBinary};
@@ -157,7 +156,7 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
// Download and unzip the wheel to a temporary directory.
let temp_dir =
tempfile::tempdir_in(self.cache.root()).map_err(Error::CacheWrite)?;
unzip_no_seek(reader.compat(), temp_dir.path()).await?;
puffin_extract::stream::unzip(reader.compat(), temp_dir.path()).await?;

// Persist the temporary directory to the directory store.
let archive = self
@@ -215,7 +214,7 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
// Download and unzip the wheel to a temporary directory.
let temp_dir =
tempfile::tempdir_in(self.cache.root()).map_err(Error::CacheWrite)?;
unzip_no_seek(reader.compat(), temp_dir.path()).await?;
puffin_extract::stream::unzip(reader.compat(), temp_dir.path()).await?;

// Persist the temporary directory to the directory store.
let archive = self
55 changes: 12 additions & 43 deletions crates/puffin-distribution/src/source/mod.rs
@@ -8,7 +8,6 @@ use anyhow::Result;
use fs_err::tokio as fs;
use futures::{FutureExt, TryStreamExt};
use reqwest::Response;
use tempfile::TempDir;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tracing::{debug, info_span, instrument, Instrument};
use url::Url;
@@ -750,62 +749,32 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
return Ok(cache_path);
}

// Download the source distribution to a temporary file.
// TODO(charlie): Unzip as we download, as with wheels.
// Download and unzip the source distribution into a temporary directory.
let span =
info_span!("download_source_dist", filename = filename, source_dist = %source_dist);
let download_dir = self.download_source_dist_url(response, filename).await?;
let temp_dir =
tempfile::tempdir_in(self.build_context.cache().root()).map_err(Error::CacheWrite)?;
let reader = response
.bytes_stream()
.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
.into_async_read();
puffin_extract::stream::archive(reader.compat(), filename, temp_dir.path()).await?;
drop(span);

// Unzip the source distribution to a temporary directory.
let span =
info_span!("extract_source_dist", filename = filename, source_dist = %source_dist);
let source_dist_dir = puffin_extract::extract_source(
download_dir.path().join(filename),
download_dir.path().join("extracted"),
)?;
drop(span);
// Extract the top-level directory.
let extracted = puffin_extract::strip_component(temp_dir.path())?;

// Persist the unzipped distribution to the cache.
// Persist it to the cache.
fs_err::tokio::create_dir_all(cache_path.parent().expect("Cache entry to have parent"))
.await
.map_err(Error::CacheWrite)?;
fs_err::tokio::rename(&source_dist_dir, &cache_path)
fs_err::tokio::rename(extracted, &cache_path)
.await
.map_err(Error::CacheWrite)?;

Ok(cache_path)
}
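The streaming entry point is handed the sdist filename so it can pick a decoder before any bytes arrive. A minimal sketch of that dispatch — the names `ArchiveKind` and `archive_kind` are hypothetical, and the real `puffin_extract::stream::archive` also wires up the gzip/zip decoders rather than just classifying:

```rust
/// Hypothetical sketch of archive-format dispatch by filename.
#[derive(Debug, PartialEq)]
enum ArchiveKind {
    Zip,
    TarGz,
}

fn archive_kind(filename: &str) -> Option<ArchiveKind> {
    if filename.ends_with(".zip") {
        Some(ArchiveKind::Zip)
    } else if filename.ends_with(".tar.gz") || filename.ends_with(".tgz") {
        Some(ArchiveKind::TarGz)
    } else {
        // e.g. `.tar.bz2`: surfaced upstream as an "unsupported archive" error.
        None
    }
}
```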

/// Download a source distribution from a URL to a temporary file.
async fn download_source_dist_url(
&self,
response: Response,
source_dist_filename: &str,
) -> Result<TempDir, puffin_client::Error> {
let reader = response
.bytes_stream()
.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
.into_async_read();
let mut reader = tokio::io::BufReader::new(reader.compat());

// Create a temporary directory.
let temp_dir = tempfile::tempdir_in(self.build_context.cache().root())
.map_err(puffin_client::ErrorKind::CacheWrite)?;

// Download the source distribution to a temporary file.
let mut writer = tokio::io::BufWriter::new(
fs_err::tokio::File::create(temp_dir.path().join(source_dist_filename))
.await
.map_err(puffin_client::ErrorKind::CacheWrite)?,
);
tokio::io::copy(&mut reader, &mut writer)
.await
.map_err(puffin_client::ErrorKind::CacheWrite)?;

Ok(temp_dir)
}

/// Download a source distribution from a Git repository.
async fn download_source_dist_git(&self, url: &Url) -> Result<(Fetch, Option<PathBuf>), Error> {
debug!("Fetching source distribution from Git: {url}");
6 changes: 3 additions & 3 deletions crates/puffin-distribution/src/unzip.rs
@@ -1,6 +1,6 @@
use std::path::Path;

use puffin_extract::{unzip_archive, Error};
use puffin_extract::Error;

use crate::download::BuiltWheel;
use crate::{DiskWheel, LocalWheel};
@@ -12,13 +12,13 @@ pub trait Unzip {

impl Unzip for DiskWheel {
fn unzip(&self, target: &Path) -> Result<(), Error> {
unzip_archive(fs_err::File::open(&self.path)?, target)
puffin_extract::unzip(fs_err::File::open(&self.path)?, target)
}
}

impl Unzip for BuiltWheel {
fn unzip(&self, target: &Path) -> Result<(), Error> {
unzip_archive(fs_err::File::open(&self.path)?, target)
puffin_extract::unzip(fs_err::File::open(&self.path)?, target)
}
}

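The pattern in this file is small enough to sketch self-contained: each wheel variant opens its on-disk archive and delegates to one shared extraction routine. Here `extract` is a stand-in for `puffin_extract::unzip`, stubbed out so the sketch compiles without the real crate:

```rust
use std::io;
use std::path::{Path, PathBuf};

/// Sketch of the `Unzip` trait pattern from `puffin-distribution`.
trait Unzip {
    fn unzip(&self, target: &Path) -> io::Result<()>;
}

struct DiskWheel {
    path: PathBuf,
}

impl Unzip for DiskWheel {
    fn unzip(&self, target: &Path) -> io::Result<()> {
        // Open the wheel on disk, then hand it to the shared routine.
        let archive = std::fs::File::open(&self.path)?;
        extract(archive, target)
    }
}

/// Stand-in for `puffin_extract::unzip` (the real routine walks the zip
/// entries and writes them under `target`).
fn extract(_archive: std::fs::File, _target: &Path) -> io::Result<()> {
    Ok(())
}
```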
4 changes: 4 additions & 0 deletions crates/puffin-extract/Cargo.toml
@@ -13,6 +13,10 @@ license = { workspace = true }
workspace = true

[dependencies]
async-compression = { workspace = true, features = ["gzip"] }
# See: https://github.com/dignifiedquire/async-tar/pull/35
async-std = { workspace = true, features = ["unstable"] }
async-tar = { workspace = true }
async_zip = { workspace = true, features = ["tokio"] }
flate2 = { workspace = true }
fs-err = { workspace = true, features = ["tokio"] }
19 changes: 19 additions & 0 deletions crates/puffin-extract/src/error.rs
@@ -0,0 +1,19 @@
use std::path::PathBuf;

use zip::result::ZipError;

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error(transparent)]
Zip(#[from] ZipError),
#[error(transparent)]
AsyncZip(#[from] async_zip::error::ZipError),
#[error(transparent)]
Io(#[from] std::io::Error),
#[error("Unsupported archive type: {0}")]
UnsupportedArchive(PathBuf),
#[error(
"The top level of the archive must only contain a single directory, but it contains: {0:?}"
)]
InvalidArchive(Vec<fs_err::DirEntry>),
}
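The messages above come from `thiserror`'s derived `Display`. A std-only sketch of the equivalent hand-written impl, with the variant set trimmed and `InvalidArchive` carrying `PathBuf`s instead of `fs_err::DirEntry`s for simplicity:

```rust
use std::fmt;
use std::path::PathBuf;

/// Std-only sketch of the error shape introduced in `puffin-extract`.
#[derive(Debug)]
enum ExtractError {
    Io(std::io::Error),
    UnsupportedArchive(PathBuf),
    InvalidArchive(Vec<PathBuf>),
}

impl fmt::Display for ExtractError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // `#[error(transparent)]` forwards the inner error's message.
            ExtractError::Io(err) => write!(f, "{err}"),
            ExtractError::UnsupportedArchive(path) => {
                write!(f, "Unsupported archive type: {}", path.display())
            }
            ExtractError::InvalidArchive(entries) => write!(
                f,
                "The top level of the archive must only contain a single directory, but it contains: {entries:?}"
            ),
        }
    }
}

impl std::error::Error for ExtractError {}
```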