Skip to content

Commit

Permalink
Decode sdists async
Browse files Browse the repository at this point in the history
  • Loading branch information
charliermarsh committed Jan 29, 2024
1 parent 4b89e21 commit 151b92a
Show file tree
Hide file tree
Showing 14 changed files with 322 additions and 325 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ license = "MIT OR Apache-2.0"
[workspace.dependencies]
anstream = { version = "0.6.5" }
anyhow = { version = "1.0.79" }
async-compression = { version = "0.4.6" }
async-tar = { version = "0.4.2" }
async_http_range_reader = { git = "https://github.com/baszalmstra/async_http_range_reader", rev = "8dab2c08ac864fec1df014465264f9a7c8eae905" }
async_zip = { git = "https://github.com/charliermarsh/rs-async-zip", rev = "d76801da0943de985254fc6255c0e476b57c5836", features = ["deflate"] }
base64 = { version = "0.21.7" }
Expand Down
10 changes: 8 additions & 2 deletions crates/puffin-build/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use tracing::{debug, info_span, instrument, Instrument};

use distribution_types::Resolution;
use pep508_rs::Requirement;
use puffin_extract::extract_source;
use puffin_interpreter::{Interpreter, Virtualenv};
use puffin_traits::{BuildContext, BuildKind, SetupPyStrategy, SourceBuildTrait};

Expand Down Expand Up @@ -297,8 +296,15 @@ impl SourceBuild {
source.to_path_buf()
} else {
debug!("Unpacking for build: {}", source.display());

let extracted = temp_dir.path().join("extracted");
extract_source(source, &extracted)

// Unzip the archive into the temporary directory.
puffin_extract::archive(source, &extracted)
.map_err(|err| Error::Extraction(extracted.clone(), err))?;

// Extract the top-level directory from the archive.
puffin_extract::strip_component(&extracted)
.map_err(|err| Error::Extraction(extracted.clone(), err))?
};
let source_tree = if let Some(subdir) = subdirectory {
Expand Down
5 changes: 2 additions & 3 deletions crates/puffin-distribution/src/distribution_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use distribution_types::{
use platform_tags::Tags;
use puffin_cache::{Cache, CacheBucket, Timestamp, WheelCache};
use puffin_client::{CacheControl, CachedClientError, RegistryClient};
use puffin_extract::unzip_no_seek;
use puffin_fs::metadata_if_exists;
use puffin_git::GitSource;
use puffin_traits::{BuildContext, NoBinary};
Expand Down Expand Up @@ -157,7 +156,7 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
// Download and unzip the wheel to a temporary directory.
let temp_dir =
tempfile::tempdir_in(self.cache.root()).map_err(Error::CacheWrite)?;
unzip_no_seek(reader.compat(), temp_dir.path()).await?;
puffin_extract::stream::unzip(reader.compat(), temp_dir.path()).await?;

// Persist the temporary directory to the directory store.
let archive = self
Expand Down Expand Up @@ -215,7 +214,7 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
// Download and unzip the wheel to a temporary directory.
let temp_dir =
tempfile::tempdir_in(self.cache.root()).map_err(Error::CacheWrite)?;
unzip_no_seek(reader.compat(), temp_dir.path()).await?;
puffin_extract::stream::unzip(reader.compat(), temp_dir.path()).await?;

// Persist the temporary directory to the directory store.
let archive = self
Expand Down
81 changes: 16 additions & 65 deletions crates/puffin-distribution/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use anyhow::Result;
use fs_err::tokio as fs;
use futures::{FutureExt, TryStreamExt};
use reqwest::Response;
use tempfile::TempDir;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tracing::{debug, info_span, instrument, Instrument};
use url::Url;
Expand Down Expand Up @@ -750,78 +749,30 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
return Ok(cache_path);
}

// Download the source distribution to a temporary file. If it's a zip archive, we can unzip
// it directly into the cache.
if filename.ends_with(".zip") {
// Unzip the source distribution to a temporary directory.
let span = info_span!("download_unzip_source_dist", filename = filename, source_dist = %source_dist);
let temp_dir = tempfile::tempdir_in(self.build_context.cache().root())
.map_err(Error::CacheWrite)?;
let reader = response
.bytes_stream()
.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
.into_async_read();
puffin_extract::unzip_no_seek(reader.compat(), temp_dir.path()).await?;
drop(span);

// Persist the unzipped distribution to the cache.
fs_err::tokio::create_dir_all(cache_path.parent().expect("Cache entry to have parent"))
.await
.map_err(Error::CacheWrite)?;
fs_err::tokio::rename(temp_dir.path(), &cache_path)
.await
.map_err(Error::CacheWrite)?;
} else {
// Unzip the source distribution to a temporary directory.
let span = info_span!("download_unzip_source_dist", filename = filename, source_dist = %source_dist);
let temp_dir = tempfile::tempdir_in(self.build_context.cache().root())
.map_err(Error::CacheWrite)?;
let reader = response
.bytes_stream()
.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
.into_async_read();
let extracted = puffin_extract::untar_no_seek(reader, temp_dir.path()).await?;
drop(span);

// Persist the unzipped distribution to the cache.
fs_err::tokio::create_dir_all(cache_path.parent().expect("Cache entry to have parent"))
.await
.map_err(Error::CacheWrite)?;
fs_err::tokio::rename(extracted, &cache_path)
.await
.map_err(Error::CacheWrite)?;
}

Ok(cache_path)
}

/// Download a source distribution from a URL to a temporary file.
async fn download_source_dist_url(
&self,
response: Response,
source_dist_filename: &str,
) -> Result<TempDir, puffin_client::Error> {
// Download and unzip the source distribution into a temporary directory.
let span =
info_span!("download_source_dist", filename = filename, source_dist = %source_dist);
let temp_dir =
tempfile::tempdir_in(self.build_context.cache().root()).map_err(Error::CacheWrite)?;
let reader = response
.bytes_stream()
.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
.into_async_read();
let mut reader = tokio::io::BufReader::new(reader.compat());
puffin_extract::stream::archive(reader.compat(), filename, temp_dir.path()).await?;
drop(span);

// Create a temporary directory.
let temp_dir = tempfile::tempdir_in(self.build_context.cache().root())
.map_err(puffin_client::ErrorKind::CacheWrite)?;
// Extract the top-level directory.
let extracted = puffin_extract::strip_component(temp_dir.path())?;

// Download the source distribution to a temporary file.
let mut writer = tokio::io::BufWriter::new(
fs_err::tokio::File::create(temp_dir.path().join(source_dist_filename))
.await
.map_err(puffin_client::ErrorKind::CacheWrite)?,
);
tokio::io::copy(&mut reader, &mut writer)
// Persist it to the cache.
fs_err::tokio::create_dir_all(cache_path.parent().expect("Cache entry to have parent"))
.await
.map_err(Error::CacheWrite)?;
fs_err::tokio::rename(extracted, &cache_path)
.await
.map_err(puffin_client::ErrorKind::CacheWrite)?;
.map_err(Error::CacheWrite)?;

Ok(temp_dir)
Ok(cache_path)
}

/// Download a source distribution from a Git repository.
Expand Down
6 changes: 3 additions & 3 deletions crates/puffin-distribution/src/unzip.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::path::Path;

use puffin_extract::{unzip_archive, Error};
use puffin_extract::Error;

use crate::download::BuiltWheel;
use crate::{DiskWheel, LocalWheel};
Expand All @@ -12,13 +12,13 @@ pub trait Unzip {

impl Unzip for DiskWheel {
fn unzip(&self, target: &Path) -> Result<(), Error> {
unzip_archive(fs_err::File::open(&self.path)?, target)
puffin_extract::unzip(fs_err::File::open(&self.path)?, target)
}
}

impl Unzip for BuiltWheel {
fn unzip(&self, target: &Path) -> Result<(), Error> {
unzip_archive(fs_err::File::open(&self.path)?, target)
puffin_extract::unzip(fs_err::File::open(&self.path)?, target)
}
}

Expand Down
4 changes: 2 additions & 2 deletions crates/puffin-extract/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ license = { workspace = true }
workspace = true

[dependencies]
async-compression = { version = "0.4.6", features = ["gzip"] }
async-tar = "0.4.2"
async-compression = { workspace = true, features = ["gzip"] }
async-tar = { workspace = true }
async_zip = { workspace = true, features = ["tokio"] }
flate2 = { workspace = true }
fs-err = { workspace = true, features = ["tokio"] }
Expand Down
19 changes: 19 additions & 0 deletions crates/puffin-extract/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use std::path::PathBuf;

use zip::result::ZipError;

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error(transparent)]
Zip(#[from] ZipError),
#[error(transparent)]
AsyncZip(#[from] async_zip::error::ZipError),
#[error(transparent)]
Io(#[from] std::io::Error),
#[error("Unsupported archive type: {0}")]
UnsupportedArchive(PathBuf),
#[error(
"The top level of the archive must only contain a list directory, but it contains: {0:?}"
)]
InvalidArchive(Vec<fs_err::DirEntry>),
}
Loading

0 comments on commit 151b92a

Please sign in to comment.