diff --git a/Cargo.lock b/Cargo.lock index e30f5cdc..8d07f9a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1105,9 +1105,9 @@ dependencies = [ [[package]] name = "dragonfly-api" -version = "2.0.143" +version = "2.0.147" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bff3ba3f7d411a6aa67e8c812f94e927ed06b782a2f44aa4dbe3df99593da5b7" +checksum = "51247ca7774a0a4b65167904050e1b4df8e6112b0b02e94164e30819683041c5" dependencies = [ "prost 0.13.1", "prost-types", @@ -1120,7 +1120,7 @@ dependencies = [ [[package]] name = "dragonfly-client" -version = "0.1.97" +version = "0.1.98" dependencies = [ "anyhow", "blake3", @@ -1190,7 +1190,7 @@ dependencies = [ [[package]] name = "dragonfly-client-backend" -version = "0.1.97" +version = "0.1.98" dependencies = [ "dragonfly-api", "dragonfly-client-core", @@ -1212,7 +1212,7 @@ dependencies = [ [[package]] name = "dragonfly-client-config" -version = "0.1.97" +version = "0.1.98" dependencies = [ "bytesize", "bytesize-serde", @@ -1233,7 +1233,7 @@ dependencies = [ [[package]] name = "dragonfly-client-core" -version = "0.1.97" +version = "0.1.98" dependencies = [ "hyper 1.4.0", "hyper-util", @@ -1248,7 +1248,7 @@ dependencies = [ [[package]] name = "dragonfly-client-init" -version = "0.1.97" +version = "0.1.98" dependencies = [ "anyhow", "clap", @@ -1264,7 +1264,7 @@ dependencies = [ [[package]] name = "dragonfly-client-storage" -version = "0.1.97" +version = "0.1.98" dependencies = [ "base16ct", "chrono", @@ -1288,7 +1288,7 @@ dependencies = [ [[package]] name = "dragonfly-client-util" -version = "0.1.97" +version = "0.1.98" dependencies = [ "base16ct", "blake3", @@ -1758,7 +1758,7 @@ dependencies = [ [[package]] name = "hdfs" -version = "0.1.97" +version = "0.1.98" dependencies = [ "dragonfly-client-backend", "dragonfly-client-core", diff --git a/Cargo.toml b/Cargo.toml index 92f07da8..fc37fde2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ members = [ ] [workspace.package] -version = "0.1.97" 
+version = "0.1.98" authors = ["The Dragonfly Developers"] homepage = "https://d7y.io/" repository = "https://github.com/dragonflyoss/client.git" @@ -22,15 +22,15 @@ readme = "README.md" edition = "2021" [workspace.dependencies] -dragonfly-client = { path = "dragonfly-client", version = "0.1.97" } -dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.97" } -dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.97" } -dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.97" } -dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.97" } -dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.97" } -dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.97" } +dragonfly-client = { path = "dragonfly-client", version = "0.1.98" } +dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.98" } +dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.98" } +dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.98" } +dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.98" } +dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.98" } +dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.98" } thiserror = "1.0" -dragonfly-api = "2.0.143" +dragonfly-api = "2.0.147" reqwest = { version = "0.12.4", features = ["stream", "native-tls", "default-tls", "rustls-tls"] } rcgen = { version = "0.12.1", features = ["x509-parser"] } hyper = { version = "1.4", features = ["full"] } diff --git a/dragonfly-client-backend/src/http.rs b/dragonfly-client-backend/src/http.rs index 555fee2a..80007ee7 100644 --- a/dragonfly-client-backend/src/http.rs +++ b/dragonfly-client-backend/src/http.rs @@ -113,8 +113,8 @@ impl super::Backend for HTTP { // get gets the content of the request. 
async fn get(&self, request: super::GetRequest) -> Result> { info!( - "get request {} {}: {:?}", - request.piece_id, request.url, request.http_header + "get request {} {} {}: {:?}", + request.task_id, request.piece_id, request.url, request.http_header ); // The header of the request is required. @@ -128,8 +128,8 @@ impl super::Backend for HTTP { .await .map_err(|err| { error!( - "get request failed {} {}: {}", - request.piece_id, request.url, err + "get request failed {} {} {}: {}", + request.task_id, request.piece_id, request.url, err ); err })?; @@ -230,6 +230,7 @@ mod tests { let http_backend = http::HTTP::new(); let mut resp = http_backend .get(GetRequest { + task_id: "test".to_string(), piece_id: "test".to_string(), url: server.url("/get"), range: None, diff --git a/dragonfly-client-backend/src/lib.rs b/dragonfly-client-backend/src/lib.rs index 1ffab42f..e2dad77a 100644 --- a/dragonfly-client-backend/src/lib.rs +++ b/dragonfly-client-backend/src/lib.rs @@ -82,6 +82,9 @@ pub struct HeadResponse { // GetRequest is the get request for backend. pub struct GetRequest { + // task_id is the id of the task. + pub task_id: String, + // piece_id is the id of the piece. pub piece_id: String, diff --git a/dragonfly-client-config/src/lib.rs b/dragonfly-client-config/src/lib.rs index c7a5fae6..b7b34efe 100644 --- a/dragonfly-client-config/src/lib.rs +++ b/dragonfly-client-config/src/lib.rs @@ -99,8 +99,3 @@ pub fn default_cache_dir() -> PathBuf { #[cfg(target_os = "macos")] return home::home_dir().unwrap().join(".dragonfly").join("cache"); } - -// default_piece_length is the default piece length for task. 
-pub fn default_piece_length() -> u64 { - 4 * 1024 * 1024 -} diff --git a/dragonfly-client-core/src/error/mod.rs b/dragonfly-client-core/src/error/mod.rs index 1c944dc2..30a838ec 100644 --- a/dragonfly-client-core/src/error/mod.rs +++ b/dragonfly-client-core/src/error/mod.rs @@ -118,6 +118,10 @@ pub enum DFError { #[error("invalid content length")] InvalidContentLength, + // InvalidPieceLength is the error when the piece length is invalid. + #[error("invalid piece length")] + InvalidPieceLength, + // InvalidParameter is the error when the parameter is invalid. #[error("invalid parameter")] InvalidParameter, diff --git a/dragonfly-client-storage/src/lib.rs b/dragonfly-client-storage/src/lib.rs index e61cf447..8cb93408 100644 --- a/dragonfly-client-storage/src/lib.rs +++ b/dragonfly-client-storage/src/lib.rs @@ -81,7 +81,7 @@ impl Storage { pub fn download_task_started( &self, id: &str, - piece_length: u64, + piece_length: Option, content_length: Option, response_header: Option, ) -> Result { @@ -155,6 +155,7 @@ impl Storage { ttl: Duration, path: &Path, piece_length: u64, + content_length: u64, expected_digest: &str, ) -> Result { let response = self.content.write_cache_task(id, path).await?; @@ -170,7 +171,7 @@ impl Storage { id, ttl, piece_length, - response.length, + content_length, digest.to_string().as_str(), ) } diff --git a/dragonfly-client-storage/src/metadata.rs b/dragonfly-client-storage/src/metadata.rs index bdf4b3ff..ef931b39 100644 --- a/dragonfly-client-storage/src/metadata.rs +++ b/dragonfly-client-storage/src/metadata.rs @@ -36,7 +36,7 @@ pub struct Task { pub id: String, // piece_length is the length of the piece. - pub piece_length: u64, + pub piece_length: Option, // content_length is the length of the content. pub content_length: Option, @@ -116,6 +116,11 @@ impl Task { false } + // piece_length returns the piece length of the task. 
+ pub fn piece_length(&self) -> Option { + self.piece_length + } + // content_length returns the content length of the task. pub fn content_length(&self) -> Option { self.content_length @@ -215,6 +220,11 @@ impl CacheTask { self.persistent } + // piece_length returns the piece length of the cache task. + pub fn piece_length(&self) -> u64 { + self.piece_length + } + // content_length returns the content length of the cache task. pub fn content_length(&self) -> u64 { self.content_length @@ -320,7 +330,7 @@ impl Metadata { pub fn download_task_started( &self, id: &str, - piece_length: u64, + piece_length: Option, content_length: Option, response_header: Option, ) -> Result { @@ -341,6 +351,11 @@ impl Metadata { task.content_length = content_length; } + // Protect piece length from being overwritten by None. + if piece_length.is_some() { + task.piece_length = piece_length; + } + // If the task has the response header, the response header // will not be covered. if task.response_header.is_empty() { @@ -847,14 +862,14 @@ mod tests { // Test download_task_started. 
metadata - .download_task_started(task_id, 1024, Some(1024), None) + .download_task_started(task_id, Some(1024), Some(1024), None) .unwrap(); let task = metadata .get_task(task_id) .unwrap() .expect("task should exist after download_task_started"); assert_eq!(task.id, task_id); - assert_eq!(task.piece_length, 1024); + assert_eq!(task.piece_length, Some(1024)); assert_eq!(task.content_length, Some(1024)); assert!(task.response_header.is_empty()); assert_eq!(task.uploading_count, 0); @@ -905,7 +920,7 @@ mod tests { let task_id = "task2"; metadata - .download_task_started(task_id, 1024, None, None) + .download_task_started(task_id, Some(1024), None, None) .unwrap(); let tasks = metadata.get_tasks().unwrap(); assert_eq!(tasks.len(), 2, "should get 2 tasks in total"); diff --git a/dragonfly-client-util/src/id_generator/mod.rs b/dragonfly-client-util/src/id_generator/mod.rs index 4083c04a..022d9131 100644 --- a/dragonfly-client-util/src/id_generator/mod.rs +++ b/dragonfly-client-util/src/id_generator/mod.rs @@ -73,7 +73,6 @@ impl IDGenerator { digest: Option<&str>, tag: Option<&str>, application: Option<&str>, - piece_length: u64, filtered_query_params: Vec, ) -> Result { // Filter the query parameters. @@ -105,9 +104,6 @@ impl IDGenerator { hasher.update(application); } - // Add the piece length to generate the task id. - hasher.update(piece_length.to_string()); - // Generate the task id. Ok(hex::encode(hasher.finalize())) } @@ -118,7 +114,6 @@ impl IDGenerator { path: &PathBuf, tag: Option<&str>, application: Option<&str>, - piece_length: u64, ) -> Result { // Initialize the hasher. let mut hasher = blake3::Hasher::new(); @@ -137,9 +132,6 @@ impl IDGenerator { hasher.update(application.as_bytes()); } - // Add the piece length to generate the cache task id. - hasher.update(piece_length.to_string().as_bytes()); - // Generate the cache task id. 
Ok(hasher.finalize().to_hex().to_string()) } diff --git a/dragonfly-client/src/bin/dfcache/export.rs b/dragonfly-client/src/bin/dfcache/export.rs index c89cede9..10fbf9e9 100644 --- a/dragonfly-client/src/bin/dfcache/export.rs +++ b/dragonfly-client/src/bin/dfcache/export.rs @@ -17,7 +17,6 @@ use clap::Parser; use dragonfly_api::dfdaemon::v2::{download_cache_task_response, DownloadCacheTaskRequest}; use dragonfly_api::errordetails::v2::Backend; -use dragonfly_client_config::default_piece_length; use dragonfly_client_core::{ error::{ErrorType, OrErr}, Error, Result, @@ -52,13 +51,6 @@ pub struct ExportCommand { )] tag: String, - #[arg( - long = "piece-length", - default_value_t = default_piece_length(), - help = "Specify the byte length of the piece" - )] - piece_length: u64, - #[arg( short = 'O', long = "output", @@ -139,7 +131,7 @@ impl ExportCommand { ); eprintln!( - "{}{}{}Message:{}, can not connect {}, please check the unix socket.{}", + "{}{}{}Message:{}, can not connect {}, please check the unix socket {}", color::Fg(color::Cyan), style::Italic, style::Bold, @@ -381,7 +373,6 @@ impl ExportCommand { persistent: false, tag: Some(self.tag.clone()), application: Some(self.application.clone()), - piece_length: self.piece_length, output_path: absolute_path.to_string_lossy().to_string(), timeout: Some( prost_wkt_types::Duration::try_from(self.timeout) diff --git a/dragonfly-client/src/bin/dfcache/import.rs b/dragonfly-client/src/bin/dfcache/import.rs index 1d9f4413..5d2a75f2 100644 --- a/dragonfly-client/src/bin/dfcache/import.rs +++ b/dragonfly-client/src/bin/dfcache/import.rs @@ -16,9 +16,7 @@ use clap::Parser; use dragonfly_api::dfdaemon::v2::UploadCacheTaskRequest; -use dragonfly_client_config::{ - default_piece_length, dfcache::default_dfcache_persistent_replica_count, -}; +use dragonfly_client_config::dfcache::default_dfcache_persistent_replica_count; use dragonfly_client_core::{ error::{ErrorType, OrErr}, Error, Result, @@ -60,13 +58,6 @@ pub struct 
ImportCommand { )] tag: Option, - #[arg( - long = "piece-length", - default_value_t = default_piece_length(), - help = "Specify the byte length of the piece" - )] - piece_length: u64, - #[arg( long = "ttl", value_parser= humantime::parse_duration, @@ -148,7 +139,7 @@ impl ImportCommand { ); eprintln!( - "{}{}{}Message:{}, can not connect {}, please check the unix socket.{}", + "{}{}{}Message:{}, can not connect {}, please check the unix socket {}", color::Fg(color::Cyan), style::Italic, style::Bold, @@ -283,7 +274,6 @@ impl ImportCommand { persistent_replica_count: self.persistent_replica_count, tag: self.tag.clone(), application: self.application.clone(), - piece_length: self.piece_length, ttl: Some( prost_wkt_types::Duration::try_from(self.ttl).or_err(ErrorType::ParseError)?, ), diff --git a/dragonfly-client/src/bin/dfcache/remove.rs b/dragonfly-client/src/bin/dfcache/remove.rs index 7b782840..b2f38f50 100644 --- a/dragonfly-client/src/bin/dfcache/remove.rs +++ b/dragonfly-client/src/bin/dfcache/remove.rs @@ -60,7 +60,7 @@ impl RemoveCommand { ); eprintln!( - "{}{}{}Message:{}, can not connect {}, please check the unix socket.{}", + "{}{}{}Message:{}, can not connect {}, please check the unix socket {}", color::Fg(color::Cyan), style::Italic, style::Bold, diff --git a/dragonfly-client/src/bin/dfcache/stat.rs b/dragonfly-client/src/bin/dfcache/stat.rs index c6c2b0e6..6dcc1395 100644 --- a/dragonfly-client/src/bin/dfcache/stat.rs +++ b/dragonfly-client/src/bin/dfcache/stat.rs @@ -65,7 +65,7 @@ impl StatCommand { ); eprintln!( - "{}{}{}Message:{}, can not connect {}, please check the unix socket.{}", + "{}{}{}Message:{}, can not connect {}, please check the unix socket {}", color::Fg(color::Cyan), style::Italic, style::Bold, diff --git a/dragonfly-client/src/bin/dfget/main.rs b/dragonfly-client/src/bin/dfget/main.rs index 3bc16979..3c7547b2 100644 --- a/dragonfly-client/src/bin/dfget/main.rs +++ b/dragonfly-client/src/bin/dfget/main.rs @@ -22,7 +22,7 @@ use 
dragonfly_client::grpc::dfdaemon_download::DfdaemonDownloadClient; use dragonfly_client::grpc::health::HealthClient; use dragonfly_client::tracing::init_tracing; use dragonfly_client_backend::{object_storage, BackendFactory, DirEntry, HeadRequest}; -use dragonfly_client_config::{self, default_piece_length, dfdaemon, dfget}; +use dragonfly_client_config::{self, dfdaemon, dfget}; use dragonfly_client_core::error::{BackendError, ErrorType, OrErr}; use dragonfly_client_core::{Error, Result}; use dragonfly_client_util::http::{header_vec_to_hashmap, header_vec_to_reqwest_headermap}; @@ -105,13 +105,6 @@ struct Args { )] timeout: Duration, - #[arg( - long = "piece-length", - default_value_t = default_piece_length(), - help = "Specify the byte length of the piece" - )] - piece_length: u64, - #[arg( short = 'd', long = "digest", @@ -321,7 +314,7 @@ async fn main() -> anyhow::Result<()> { ); eprintln!( - "{}{}{}Message:{}, can not connect {}, please check the unix socket.{}", + "{}{}{}Message:{}, can not connect {}, please check the unix socket {}", color::Fg(color::Cyan), style::Italic, style::Bold, @@ -704,7 +697,7 @@ async fn download( priority: args.priority, filtered_query_params: args.filtered_query_params.unwrap_or_default(), request_header: header_vec_to_hashmap(args.header.unwrap_or_default())?, - piece_length: args.piece_length, + piece_length: None, output_path: Some(args.output.to_string_lossy().to_string()), timeout: Some( prost_wkt_types::Duration::try_from(args.timeout) diff --git a/dragonfly-client/src/grpc/dfdaemon_download.rs b/dragonfly-client/src/grpc/dfdaemon_download.rs index 78fd4ab7..ff0fb792 100644 --- a/dragonfly-client/src/grpc/dfdaemon_download.rs +++ b/dragonfly-client/src/grpc/dfdaemon_download.rs @@ -201,7 +201,6 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler { download.digest.as_deref(), download.tag.as_deref(), download.application.as_deref(), - download.piece_length, download.filtered_query_params.clone(), ) .map_err(|e| { @@ 
-291,7 +290,11 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler { error!("missing content length in the response"); return Err(Status::internal("missing content length in the response")); }; - info!("content length: {}", content_length); + info!( + "content length {}, piece length {}", + content_length, + task.piece_length().unwrap_or_default() + ); // Download's range priority is higher than the request header's range. // If download protocol is http, use the range of the request header. @@ -721,6 +724,11 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler { task } }; + info!( + "content length {}, piece length {}", + task.content_length(), + task.piece_length() + ); // Clone the cache task. let task_manager = self.cache_task.clone(); @@ -834,7 +842,6 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler { &path.to_path_buf(), request.tag.as_deref(), request.application.as_deref(), - request.piece_length, ) .map_err(|err| { error!("generate task id: {}", err); diff --git a/dragonfly-client/src/grpc/dfdaemon_upload.rs b/dragonfly-client/src/grpc/dfdaemon_upload.rs index ed89ab06..9ab3de5d 100644 --- a/dragonfly-client/src/grpc/dfdaemon_upload.rs +++ b/dragonfly-client/src/grpc/dfdaemon_upload.rs @@ -184,7 +184,6 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler { download.digest.as_deref(), download.tag.as_deref(), download.application.as_deref(), - download.piece_length, download.filtered_query_params.clone(), ) .map_err(|e| { @@ -275,7 +274,11 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler { return Err(Status::internal("missing content length in the response")); }; - info!("content length: {}", content_length); + info!( + "content length {}, piece length {}", + content_length, + task.piece_length().unwrap_or_default() + ); // Download's range priority is higher than the request header's range. // If download protocol is http, use the range of the request header. 
@@ -828,6 +831,11 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler { task } }; + info!( + "content length {}, piece length {}", + task.content_length(), + task.piece_length() + ); // Clone the cache task. let task_manager = self.cache_task.clone(); diff --git a/dragonfly-client/src/proxy/header.rs b/dragonfly-client/src/proxy/header.rs index 84927d04..47dd6bdb 100644 --- a/dragonfly-client/src/proxy/header.rs +++ b/dragonfly-client/src/proxy/header.rs @@ -39,10 +39,6 @@ pub const DRAGONFLY_REGISTRY_HEADER: &str = "X-Dragonfly-Registry"; // Default value includes the filtered query params of s3, gcs, oss, obs, cos. pub const DRAGONFLY_FILTERED_QUERY_PARAMS_HEADER: &str = "X-Dragonfly-Filtered-Query-Params"; -// DRAGONFLY_PIECE_LENGTH_HEADER is the header key of piece length in http request, -// it specifies the piece length of the task. -pub const DRAGONFLY_PIECE_LENGTH_HEADER: &str = "X-Dragonfly-Piece-Length"; - // get_tag gets the tag from http header. pub fn get_tag(header: &HeaderMap) -> Option { match header.get(DRAGONFLY_TAG_HEADER) { @@ -122,23 +118,3 @@ pub fn get_filtered_query_params( None => default_filtered_query_params, } } - -// get_piece_length gets the piece length from http header. 
-pub fn get_piece_length(header: &HeaderMap) -> u64 { - match header.get(DRAGONFLY_PIECE_LENGTH_HEADER) { - Some(piece_length) => match piece_length.to_str() { - Ok(piece_length) => match piece_length.parse::() { - Ok(piece_length) => piece_length, - Err(err) => { - error!("parse piece length from header failed: {}", err); - dragonfly_client_config::default_piece_length() - } - }, - Err(err) => { - error!("get piece length from header failed: {}", err); - dragonfly_client_config::default_piece_length() - } - }, - None => dragonfly_client_config::default_piece_length(), - } -} diff --git a/dragonfly-client/src/proxy/mod.rs b/dragonfly-client/src/proxy/mod.rs index 30aa064d..0408daa6 100644 --- a/dragonfly-client/src/proxy/mod.rs +++ b/dragonfly-client/src/proxy/mod.rs @@ -859,7 +859,7 @@ fn make_download_task_request( rule.filtered_query_params.clone(), ), request_header: reqwest_headermap_to_hashmap(&reqwest_request_header), - piece_length: header::get_piece_length(&reqwest_request_header), + piece_length: None, output_path: None, timeout: None, need_back_to_source: false, diff --git a/dragonfly-client/src/resource/cache_task.rs b/dragonfly-client/src/resource/cache_task.rs index d2e7c7b6..51bd1a09 100644 --- a/dragonfly-client/src/resource/cache_task.rs +++ b/dragonfly-client/src/resource/cache_task.rs @@ -109,6 +109,24 @@ impl CacheTask { digest: &str, request: UploadCacheTaskRequest, ) -> ClientResult { + // Convert prost_wkt_types::Duration to std::time::Duration. + let ttl = Duration::try_from(request.ttl.ok_or(Error::UnexpectedResponse)?) + .or_err(ErrorType::ParseError)?; + + // Get the content length of the file. + let content_length = std::fs::metadata(path) + .map_err(|err| { + error!("get file metadata error: {}", err); + err + })? + .len(); + + // Get the piece length of the file. 
+ let piece_length = self.piece.calculate_piece_length( + piece::PieceLengthStrategy::OptimizeByFileLength, + content_length, + ); + // Notify the scheduler that the cache task is started. self.scheduler_client .upload_cache_task_started(UploadCacheTaskStartedRequest { @@ -118,7 +136,7 @@ impl CacheTask { persistent_replica_count: request.persistent_replica_count, tag: request.tag.clone(), application: request.application.clone(), - piece_length: request.piece_length, + piece_length, ttl: request.ttl, timeout: request.timeout, }) @@ -128,14 +146,10 @@ impl CacheTask { err })?; - // Convert prost_wkt_types::Duration to std::time::Duration. - let ttl = Duration::try_from(request.ttl.ok_or(Error::UnexpectedResponse)?) - .or_err(ErrorType::ParseError)?; - // Create the persistent cache task. match self .storage - .create_persistent_cache_task(task_id, ttl, path, request.piece_length, digest) + .create_persistent_cache_task(task_id, ttl, path, piece_length, content_length, digest) .await { Ok(metadata) => { @@ -177,7 +191,7 @@ impl CacheTask { digest: digest.to_string(), tag: request.tag, application: request.application, - piece_length: request.piece_length, + piece_length: metadata.piece_length, content_length: metadata.content_length, piece_count: response.piece_count, state: response.state, @@ -231,7 +245,7 @@ impl CacheTask { task_id, ttl, request.persistent, - request.piece_length, + response.piece_length, response.content_length, ) } @@ -270,7 +284,7 @@ impl CacheTask { let interested_pieces = match self .piece - .calculate_interested(request.piece_length, task.content_length, None) + .calculate_interested(task.piece_length, task.content_length, None) { Ok(interested_pieces) => interested_pieces, Err(err) => { diff --git a/dragonfly-client/src/resource/piece.rs b/dragonfly-client/src/resource/piece.rs index 8a8fb0b9..0c216bd0 100644 --- a/dragonfly-client/src/resource/piece.rs +++ b/dragonfly-client/src/resource/piece.rs @@ -36,6 +36,24 @@ use tracing::{error, 
info, instrument, Span}; use super::*; +// MAX_PIECE_COUNT is the maximum piece count. If the piece count is greater +// than MAX_PIECE_COUNT, the piece length will be optimized by the file length. +// When piece length becomes the MAX_PIECE_LENGTH, the piece count +// probably will be greater than MAX_PIECE_COUNT. +const MAX_PIECE_COUNT: u64 = 500; + +// MIN_PIECE_LENGTH is the minimum piece length. +const MIN_PIECE_LENGTH: u64 = 4 * 1024 * 1024; + +// MAX_PIECE_LENGTH is the maximum piece length. +const MAX_PIECE_LENGTH: u64 = 16 * 1024 * 1024; + +// PieceLengthStrategy sets the optimization strategy of piece length. +pub enum PieceLengthStrategy { + // OptimizeByFileLength optimizes the piece length by the file length. + OptimizeByFileLength, +} + // Piece represents a piece manager. pub struct Piece { // config is the configuration of the dfdaemon. @@ -254,6 +272,29 @@ impl Piece { pieces.into_values().collect() } + // calculate_piece_length calculates the piece length by content_length. + pub fn calculate_piece_length( + &self, + strategy: PieceLengthStrategy, + content_length: u64, + ) -> u64 { + match strategy { + PieceLengthStrategy::OptimizeByFileLength => { + let piece_length = (content_length as f64 / MAX_PIECE_COUNT as f64) as u64; + let actual_piece_length = piece_length.next_power_of_two(); + + match ( + actual_piece_length > MIN_PIECE_LENGTH, + actual_piece_length < MAX_PIECE_LENGTH, + ) { + (true, true) => actual_piece_length, + (_, false) => MAX_PIECE_LENGTH, + (false, _) => MIN_PIECE_LENGTH, + } + } + } + } + // upload_from_local_peer_into_async_read uploads a single piece from a local peer. 
#[instrument(skip_all, fields(piece_id))] pub async fn upload_from_local_peer_into_async_read( @@ -493,6 +534,7 @@ impl Piece { let mut response = backend .get(GetRequest { + task_id: task_id.to_string(), piece_id: self.storage.piece_id(task_id, number), url: url.to_string(), range: Some(Range { diff --git a/dragonfly-client/src/resource/task.rs b/dragonfly-client/src/resource/task.rs index 11421421..b0152194 100644 --- a/dragonfly-client/src/resource/task.rs +++ b/dragonfly-client/src/resource/task.rs @@ -114,9 +114,10 @@ impl Task { id: &str, request: Download, ) -> ClientResult { - let task = self - .storage - .download_task_started(id, request.piece_length, None, None)?; + let task = self.storage.download_task_started(id, None, None, None)?; + if task.content_length.is_some() && task.piece_length.is_some() { + return Ok(task); + } // Handle the request header. let mut request_header = @@ -125,10 +126,6 @@ impl Task { err })?; - if task.content_length.is_some() { - return Ok(task); - } - // Remove the range header to prevent the server from // returning a 206 partial content and returning // a 200 full content. @@ -156,10 +153,20 @@ impl Task { })); } + let content_length = match response.content_length { + Some(content_length) => content_length, + None => return Err(Error::InvalidContentLength), + }; + + let piece_length = self.piece.calculate_piece_length( + piece::PieceLengthStrategy::OptimizeByFileLength, + content_length, + ); + self.storage.download_task_started( id, - request.piece_length, - response.content_length, + Some(piece_length), + Some(content_length), response.http_header, ) } @@ -171,8 +178,7 @@ impl Task { // download_failed updates the metadata of the task when the task downloads failed. 
pub async fn download_failed(&self, id: &str) -> ClientResult<()> { - let _ = self.storage.download_task_failed(id).await?; - Ok(()) + self.storage.download_task_failed(id).await.map(|_| ()) } // prefetch_task_started updates the metadata of the task when the task prefetch started. @@ -211,18 +217,24 @@ impl Task { return Err(Error::InvalidContentLength); }; - // Calculate the interested pieces to download. - let interested_pieces = match self.piece.calculate_interested( - request.piece_length, - content_length, - request.range, - ) { - Ok(interested_pieces) => interested_pieces, - Err(err) => { - error!("calculate interested pieces error: {:?}", err); - return Err(err); - } + // Get the piece length from the task. + let Some(piece_length) = task.piece_length() else { + error!("piece length not found"); + return Err(Error::InvalidPieceLength); }; + + // Calculate the interested pieces to download. + let interested_pieces = + match self + .piece + .calculate_interested(piece_length, content_length, request.range) + { + Ok(interested_pieces) => interested_pieces, + Err(err) => { + error!("calculate interested pieces error: {:?}", err); + return Err(err); + } + }; info!( "interested pieces: {:?}", interested_pieces