From e676cfb2a6c8fb42da71d76fa6f7f9e7004f703d Mon Sep 17 00:00:00 2001 From: Gaius Date: Fri, 19 Jul 2024 19:49:49 +0800 Subject: [PATCH] refactor: download directory from object storage (#614) Signed-off-by: Gaius --- Cargo.lock | 1 + Cargo.toml | 1 + dragonfly-client-backend/src/http.rs | 2 +- dragonfly-client-backend/src/lib.rs | 3 +- .../src/object_storage.rs | 54 ++-- dragonfly-client-core/src/error/mod.rs | 10 +- dragonfly-client-util/src/http/mod.rs | 7 + dragonfly-client/Cargo.toml | 3 +- dragonfly-client/src/bin/dfget/main.rs | 253 ++++++++++-------- 9 files changed, 185 insertions(+), 149 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4ceca169..e6325af4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1181,6 +1181,7 @@ dependencies = [ "tracing-opentelemetry", "tracing-subscriber", "url", + "uuid", "validator", "warp", ] diff --git a/Cargo.toml b/Cargo.toml index e4f65263..ff67af6b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,6 +85,7 @@ toml = "0.8.13" base16ct = { version = "0.2", features = ["alloc"] } bytesize = {version = "1.2.0", features = ["serde"]} bytesize-serde = "0.2.1" +percent-encoding = "2.3.1" [profile.release] debug = true diff --git a/dragonfly-client-backend/src/http.rs b/dragonfly-client-backend/src/http.rs index 0f711c47..555fee2a 100644 --- a/dragonfly-client-backend/src/http.rs +++ b/dragonfly-client-backend/src/http.rs @@ -106,7 +106,7 @@ impl super::Backend for HTTP { http_header: Some(header), http_status_code: Some(status_code), error_message: Some(status_code.to_string()), - entries: None, + entries: Vec::new(), }) } diff --git a/dragonfly-client-backend/src/lib.rs b/dragonfly-client-backend/src/lib.rs index 958e8423..1ffab42f 100644 --- a/dragonfly-client-backend/src/lib.rs +++ b/dragonfly-client-backend/src/lib.rs @@ -73,9 +73,8 @@ pub struct HeadResponse { // http_status_code is the status code of the response. pub http_status_code: Option, - // If path is a directory, entries will be returned. 
Otherwise, entries will be None. // Entries is the information of the entries in the directory. - pub entries: Option>, + pub entries: Vec, // error_message is the error message of the response. pub error_message: Option, diff --git a/dragonfly-client-backend/src/object_storage.rs b/dragonfly-client-backend/src/object_storage.rs index 785a237e..fb4126b3 100644 --- a/dragonfly-client-backend/src/object_storage.rs +++ b/dragonfly-client-backend/src/object_storage.rs @@ -444,36 +444,34 @@ impl crate::Backend for ObjectStorage { // Get the entries if url point to a directory. let entries = if parsed_url.is_dir() { - Some( - operator - .list_with(&parsed_url.key) - .recursive(true) - .metakey(Metakey::ContentLength | Metakey::Mode) - .await // Do the list op here. - .map_err(|err| { - error!( - "list request failed {} {}: {}", - request.task_id, request.url, err - ); - ClientError::BackendError(BackendError { - message: err.to_string(), - status_code: None, - header: None, - }) - })? - .into_iter() - .map(|entry| { - let metadata = entry.metadata(); - super::DirEntry { - url: parsed_url.make_url_by_entry_path(entry.path()).to_string(), - content_length: metadata.content_length() as usize, - is_dir: metadata.is_dir(), - } + operator + .list_with(&parsed_url.key) + .recursive(true) + .metakey(Metakey::ContentLength | Metakey::Mode) + .await // Do the list op here. + .map_err(|err| { + error!( + "list request failed {} {}: {}", + request.task_id, request.url, err + ); + ClientError::BackendError(BackendError { + message: err.to_string(), + status_code: None, + header: None, }) - .collect(), - ) + })? + .into_iter() + .map(|entry| { + let metadata = entry.metadata(); + super::DirEntry { + url: parsed_url.make_url_by_entry_path(entry.path()).to_string(), + content_length: metadata.content_length() as usize, + is_dir: metadata.is_dir(), + } + }) + .collect() } else { - None + Vec::new() }; // Stat the object to get the response from the ObjectStorage. 
diff --git a/dragonfly-client-core/src/error/mod.rs b/dragonfly-client-core/src/error/mod.rs index 456c3e05..584f4f03 100644 --- a/dragonfly-client-core/src/error/mod.rs +++ b/dragonfly-client-core/src/error/mod.rs @@ -169,9 +169,15 @@ pub enum DFError { #[error(transparent)] ExternalError(#[from] ExternalError), - #[error("max download file count {0} exceeded")] - MaxDownloadFileCountExceeded(usize), + // MaxDownloadFilesExceeded is the error for max download files exceeded. + #[error("max number of files to download exceeded: {0}")] + MaxDownloadFilesExceeded(usize), + // Unsupported is the error for unsupported. + #[error("unsupported {0}")] + Unsupported(String), + + // TokioJoinError is the error for tokio join. #[error(transparent)] TokioJoinError(tokio::task::JoinError), } diff --git a/dragonfly-client-util/src/http/mod.rs b/dragonfly-client-util/src/http/mod.rs index 741fb8eb..c01bf773 100644 --- a/dragonfly-client-util/src/http/mod.rs +++ b/dragonfly-client-util/src/http/mod.rs @@ -104,6 +104,13 @@ pub fn header_vec_to_hashmap(raw_header: Vec) -> Result, +) -> Result { + hashmap_to_reqwest_headermap(&header_vec_to_hashmap(raw_header)?) +} + // get_range gets the range from http header. 
pub fn get_range(header: &HeaderMap, content_length: u64) -> Result> { match header.get(reqwest::header::RANGE) { diff --git a/dragonfly-client/Cargo.toml b/dragonfly-client/Cargo.toml index 460d306a..42d9faef 100644 --- a/dragonfly-client/Cargo.toml +++ b/dragonfly-client/Cargo.toml @@ -60,6 +60,8 @@ clap.workspace = true anyhow.workspace = true blake3.workspace = true bytesize.workspace = true +uuid.workspace = true +percent-encoding.workspace = true tracing-log = "0.2" tracing-subscriber = { version = "0.3", features = ["env-filter", "time", "chrono"] } tracing-appender = "0.2.3" @@ -88,7 +90,6 @@ futures-util = "0.3.30" termion = "4.0.2" tabled = "0.15.0" path-absolutize = "3.1.1" -percent-encoding = "2.3.1" [target.'cfg(not(target_env = "msvc"))'.dependencies] tikv-jemallocator = { version = "0.5.4", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms", "background_threads"] } diff --git a/dragonfly-client/src/bin/dfget/main.rs b/dragonfly-client/src/bin/dfget/main.rs index c21077bd..d05b0887 100644 --- a/dragonfly-client/src/bin/dfget/main.rs +++ b/dragonfly-client/src/bin/dfget/main.rs @@ -21,22 +21,26 @@ use dragonfly_api::errordetails::v2::Backend; use dragonfly_client::grpc::dfdaemon_download::DfdaemonDownloadClient; use dragonfly_client::grpc::health::HealthClient; use dragonfly_client::tracing::init_tracing; -use dragonfly_client_backend::{BackendFactory, HeadRequest}; +use dragonfly_client_backend::{object_storage, BackendFactory, DirEntry, HeadRequest}; use dragonfly_client_config::{self, default_piece_length, dfdaemon, dfget}; use dragonfly_client_core::error::{BackendError, ErrorType, OrErr}; use dragonfly_client_core::{Error, Result}; -use dragonfly_client_util::http::header_vec_to_hashmap; +use dragonfly_client_util::http::{header_vec_to_hashmap, header_vec_to_reqwest_headermap}; use indicatif::{MultiProgress, ProgressBar, ProgressState, ProgressStyle}; use path_absolutize::*; use percent_encoding::percent_decode_str; use 
std::path::{Path, PathBuf}; +use std::str::FromStr; use std::sync::Arc; use std::time::Duration; use std::{cmp::min, fmt::Write}; use termion::{color, style}; -use tokio::sync::Semaphore; +use tokio::fs; +use tokio::sync::{OwnedSemaphorePermit, Semaphore}; +use tokio::task::JoinSet; use tracing::{error, info, warn, Level}; use url::Url; +use uuid::Uuid; const LONG_ABOUT: &str = r#" A download command line based on P2P technology in Dragonfly that can download resources of different protocols. @@ -66,8 +70,6 @@ Examples: $ dfget cos:/// -O /tmp/file.txt --storage-access-key-id= --storage-access-key-secret= --storage-endpoint= "#; -const DFGET_HEAD_REQUEST_TASK_ID: &str = "dfget"; - #[derive(Debug, Parser, Clone)] #[command( name = dfget::NAME, @@ -201,17 +203,17 @@ struct Args { #[arg( long, - default_value_t = 8, - help = "Specify the max number of file to download" + default_value_t = 10, + help = "Specify the max count of file to download when downloading a directory. If the actual file count is greater than this value, the downloading will be rejected" )] - download_max_files: usize, + max_files: usize, #[arg( long, default_value_t = 5, - help = "Specify the concurrent count of download tasks" + help = "Specify the max count of concurrent download files when downloading a directory" )] - download_concurrent_count: usize, + max_concurrent_requests: usize, #[arg( short = 'l', @@ -475,22 +477,26 @@ async fn run(mut args: Args) -> Result<()> { })?; // Get the absolute path of the output file. - let absolute_path = Path::new(&args.output).absolutize()?; - info!("download file to: {}", absolute_path.to_string_lossy()); - - args.output = absolute_path.into(); + args.output = Path::new(&args.output).absolutize()?.into(); + info!("download file to: {}", args.output.to_string_lossy()); + + // If downloading from object storage and the path ends with '/', then download all files in the + // directory. Otherwise, download the single file. 
Only object storage protocol supports + // directory download. + let scheme = args.url.scheme(); + if object_storage::Scheme::from_str(scheme).is_err() && args.url.path().ends_with('/') { + return Err(Error::Unsupported(format!("{} download directory", scheme))); + }; - // If url ends with '/', treat it as a directory and download the whole directory. if args.url.path().ends_with('/') { - download_tasks(args, dfdaemon_download_client).await - } else { - // Download single file. - let progress_bar = ProgressBar::new(0); - download_task(args, progress_bar, dfdaemon_download_client).await - } + return download_dir(args, dfdaemon_download_client).await; + }; + + download(args, ProgressBar::new(0), dfdaemon_download_client).await } -async fn download_tasks(args: Args, download_client: DfdaemonDownloadClient) -> Result<()> { +// download_dir downloads all files in the directory. +async fn download_dir(args: Args, download_client: DfdaemonDownloadClient) -> Result<()> { // Only when the `access_key_id` and `access_key_secret` are provided at the same time, // they will be passed to the `DownloadTaskRequest`. let mut object_storage = None; @@ -509,117 +515,84 @@ async fn download_tasks(args: Args, download_client: DfdaemonDownloadClient) -> }); } - // Init the backend factory to choose which backend to use for sending head request. - let backend_factory = BackendFactory::new(None)?; - - // Get the actual backend to send head request. - let backend = backend_factory.build(args.url.as_str())?; - - // Send head request. - let head_response = backend - .head(HeadRequest { - task_id: DFGET_HEAD_REQUEST_TASK_ID.into(), - url: args.url.to_string(), - http_header: None, - timeout: args.timeout, - client_certs: None, - object_storage, - }) - .await?; - - // Return error when response is failed. 
- if !head_response.success { - return Err(Error::BackendError(BackendError { - message: head_response.error_message.unwrap_or_default(), - status_code: Some(head_response.http_status_code.unwrap_or_default()), - header: Some(head_response.http_header.unwrap_or_default()), - })); - } - - // If target directory is empty, then just return. - let Some(entries) = head_response.entries else { - warn!("no file is found in {}", args.url); + // Get all entries in the directory. If the directory is empty, then return directly. + let entries = get_entries(args.clone(), object_storage.clone()).await?; + if entries.is_empty() { + warn!("directory {} is empty", args.url); return Ok(()); }; - // Calc the total file count and compare it with args to decide whether to execute download task. - let file_count = entries.iter().filter(|e| !e.is_dir).count(); - if file_count > args.download_max_files { - return Err(Error::MaxDownloadFileCountExceeded(file_count)); + // If the actual file count is greater than the max_files, then reject the downloading. + let count = entries.iter().filter(|entry| !entry.is_dir).count(); + if count > args.max_files { + return Err(Error::MaxDownloadFilesExceeded(count)); } - // Due to the root_dir always ends with '/', but the args.output may end with '/', so - // append '/' to output_root_dir if need. - // These two variable root_dir and output_root_dir will be used to build the actual output - // directory. - // For example, if root_dir is '/test/' and output_root_dir is '/path/to/target/', the actual output - // directory will be '/path/to/target/file-to-download', so, if output_root_dir is not suffix with - // '/', it's necessary to append a '/' to it. - let root_dir = args.url.path(); - let output_root_dir = if args.output.to_string_lossy().ends_with('/') { - args.output.to_string_lossy().to_string() - } else { - format!("{}/", args.output.to_string_lossy()) - }; - + // Initialize the multi progress bar. 
let multi_progress_bar = MultiProgress::new(); - // Use the semaphore to control the concurrent download task number. - // The initial value of semaphore is taken from the user input. - let concurrent_control = Arc::new(Semaphore::new(args.download_concurrent_count)); - - // To store the download task handler. - let mut handlers = Vec::with_capacity(file_count); + // Initialize the join set. + let mut join_set = JoinSet::new(); + let semaphore = Arc::new(Semaphore::new(args.max_concurrent_requests)); + // Iterate all entries in the directory. for entry in entries { - // The url in the entry is returned by head request which must be a value url and will - // not panic, so just use .expect() to get the url. - let url: Url = entry.url.parse().expect("unexpected url"); + let entry_url: Url = entry.url.parse().or_err(ErrorType::ParseError)?; - // If entry is a directory, then create it, or execute the download task. + // If entry is a directory, then create the output directory. If entry is a file, + // then download the file to the output directory. if entry.is_dir { - // The url in the entry is percentage encoded, so we should decode it to get right path. - let decoded_url_path = percent_decode_str(url.path()).decode_utf8_lossy(); - // Get the actual path. - let output_dir = decoded_url_path.replacen(root_dir, &output_root_dir, 1); - - tokio::fs::create_dir(&output_dir).await.map_err(|e| { - error!("create {} failed: {}", output_dir, e); - e + let output_dir = make_output_by_entry(args.url.clone(), &args.output, entry)?; + fs::create_dir_all(&output_dir).await.map_err(|err| { + error!("create {} failed: {}", output_dir.to_string_lossy(), err); + err })?; } else { - let mut args = args.clone(); - // The url in the entry is percentage encoded, so we should decode it to get right path. - let decoded_url_path = percent_decode_str(url.path()).decode_utf8_lossy(); - // Get the actual path. 
- args.output = decoded_url_path - .replacen(root_dir, &output_root_dir, 1) - .into(); - args.url = url; + let mut entry_args = args.clone(); + entry_args.output = make_output_by_entry(args.url.clone(), &args.output, entry)?; + entry_args.url = entry_url; let progress_bar = multi_progress_bar.add(ProgressBar::new(0)); - let client = download_client.clone(); - let semaphore = concurrent_control.clone(); - - handlers.push(tokio::spawn(async move { - // This is used for concurrent control. - // semaphore will live until all the download task finished, it should - // remain open when the download task executing, so we can expect it directly. - let _permit = semaphore.acquire().await.expect("semaphore closed"); - download_task(args, progress_bar, client).await - })); + async fn download_entry( + args: Args, + progress_bar: ProgressBar, + download_client: DfdaemonDownloadClient, + _permit: OwnedSemaphorePermit, + ) -> Result<()> { + download(args, progress_bar, download_client).await + } + + let permit = semaphore.clone().acquire_owned().await.unwrap(); + join_set.spawn(download_entry( + entry_args, + progress_bar, + download_client.clone(), + permit, + )); } } // Wait for all download tasks finished. - for handler in handlers { - handler.await.map_err(Error::TokioJoinError)??; + while let Some(message) = join_set + .join_next() + .await + .transpose() + .or_err(ErrorType::AsyncRuntimeError)? + { + match message { + Ok(_) => continue, + Err(err) => { + error!("download entry failed: {}", err); + return Err(err); + } + } } Ok(()) } -async fn download_task( +// download downloads the single file. +async fn download( args: Args, progress_bar: ProgressBar, download_client: DfdaemonDownloadClient, @@ -675,9 +648,8 @@ async fn download_task( err })?; - // Get actual path rather than percentage encoded path as task name. - let task_name = percent_decode_str(args.url.path()).decode_utf8_lossy(); - + // Get actual path rather than percentage encoded path as download path. 
+ let download_path = percent_decode_str(args.url.path()).decode_utf8_lossy(); progress_bar.set_style( ProgressStyle::with_template( "{msg:.bold}\n[{elapsed_precise}] [{bar:60.green/red}] {percent:3}% ({bytes_per_sec:.red}, {eta:.cyan})", @@ -686,9 +658,9 @@ async fn download_task( .with_key("eta", |state: &ProgressState, w: &mut dyn Write| { write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap() }) - .progress_chars("━-"), + .progress_chars("=>-"), ); - progress_bar.set_message(task_name.to_string()); + progress_bar.set_message(download_path.to_string()); // Download file. let mut downloaded = 0; @@ -715,10 +687,61 @@ async fn download_task( } } - progress_bar.finish_with_message(format!("{} downloaded", task_name)); + progress_bar.finish_with_message(format!("{} downloaded", download_path)); Ok(()) } +// get_entries gets all entries in the directory. +async fn get_entries(args: Args, object_storage: Option) -> Result> { + // Initialize backend factory and build backend. + let backend_factory = BackendFactory::new(None)?; + let backend = backend_factory.build(args.url.as_str())?; + + // Send head request. + let response = backend + .head(HeadRequest { + // NOTE: Mock a task id for head request. + task_id: Uuid::new_v4().to_string(), + url: args.url.to_string(), + http_header: Some(header_vec_to_reqwest_headermap( + args.header.clone().unwrap_or_default(), + )?), + timeout: args.timeout, + client_certs: None, + object_storage, + }) + .await?; + + // Return error when response is failed. + if !response.success { + return Err(Error::BackendError(BackendError { + message: response.error_message.unwrap_or_default(), + status_code: Some(response.http_status_code.unwrap_or_default()), + header: Some(response.http_header.unwrap_or_default()), + })); + } + + Ok(response.entries) +} + +// make_output_by_entry makes the output path by the entry information. 
+fn make_output_by_entry(url: Url, output: &Path, entry: DirEntry) -> Result { + // Get the root directory of the download directory and the output root directory. + let root_dir = url.path().to_string(); + let mut output_root_dir = output.to_string_lossy().to_string(); + if !output_root_dir.ends_with('/') { + output_root_dir.push('/'); + }; + + // The url in the entry is percentage encoded, so we should decode it to get right path and + // replace the root directory with the output root directory. + let entry_url: Url = entry.url.parse().or_err(ErrorType::ParseError)?; + let decoded_entry_url = percent_decode_str(entry_url.path()).decode_utf8_lossy(); + Ok(decoded_entry_url + .replace(root_dir.as_str(), output_root_dir.as_str()) + .into()) +} + // get_and_check_dfdaemon_download_client gets a dfdaemon download client and checks its health. async fn get_dfdaemon_download_client(endpoint: PathBuf) -> Result { // Check dfdaemon's health.