diff --git a/Cargo.lock b/Cargo.lock index fd8f6bd6..158c8c74 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -908,9 +908,9 @@ dependencies = [ [[package]] name = "dragonfly-api" -version = "2.0.173" +version = "2.0.174" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd0e08874afeb80d0e9ac8c383eed6022ceed975dd3623b58a45d29856859161" +checksum = "3cd1b2aad02ce9cbc167aaa77cb61d82673feb6360d581c42ee03c113b807a6f" dependencies = [ "prost 0.13.3", "prost-types", diff --git a/Cargo.toml b/Cargo.toml index 7bb2d805..ce317436 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,8 +30,17 @@ dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.1 dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.123" } dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.123" } thiserror = "1.0" -dragonfly-api = "=2.0.173" -reqwest = { version = "0.12.4", features = ["stream", "native-tls", "default-tls", "rustls-tls", "gzip", "brotli", "zstd", "deflate"] } +dragonfly-api = "=2.0.174" +reqwest = { version = "0.12.4", features = [ + "stream", + "native-tls", + "default-tls", + "rustls-tls", + "gzip", + "brotli", + "zstd", + "deflate", +] } reqwest-middleware = "0.4" rcgen = { version = "0.12.1", features = ["x509-parser"] } hyper = { version = "1.5", features = ["full"] } diff --git a/dragonfly-client-config/src/dfdaemon.rs b/dragonfly-client-config/src/dfdaemon.rs index 08217630..1759137f 100644 --- a/dragonfly-client-config/src/dfdaemon.rs +++ b/dragonfly-client-config/src/dfdaemon.rs @@ -225,6 +225,14 @@ pub fn default_proxy_read_buffer_size() -> usize { 32 * 1024 } +/// default_prefetch_rate_limit is the default rate limit of the prefetch speed in GiB/MiB/KiB per second. +/// The prefetch request has lower priority, so limit its rate to avoid occupying bandwidth and impacting other download tasks. +#[inline] +fn default_prefetch_rate_limit() -> ByteSize { // Default rate limit is 2GiB/s. 
+ ByteSize::gib(2) +} + /// default_s3_filtered_query_params is the default filtered query params with s3 protocol to generate the task id. #[inline] fn s3_filtered_query_params() -> Vec { @@ -1089,6 +1097,10 @@ pub struct Proxy { /// prefetch pre-downloads full of the task when download with range request. pub prefetch: bool, + /// prefetch_rate_limit is the rate limit of the prefetch speed in GiB/MiB/KiB per second. + #[serde(with = "bytesize_serde", default = "default_prefetch_rate_limit")] + pub prefetch_rate_limit: ByteSize, + /// cache_capacity is the capacity of the cache by LRU algorithm for HTTP proxy, default is 150. /// The cache is used to store the hot piece content of the task, piece length is 4MB~16MB. /// If the capacity is 150, the cache size is 600MB~2.4GB, need to adjust according to the @@ -1110,6 +1122,7 @@ impl Default for Proxy { registry_mirror: RegistryMirror::default(), disable_back_to_source: false, prefetch: false, + prefetch_rate_limit: default_prefetch_rate_limit(), cache_capacity: default_proxy_cache_capacity(), read_buffer_size: default_proxy_read_buffer_size(), } diff --git a/dragonfly-client/src/bin/dfget/main.rs b/dragonfly-client/src/bin/dfget/main.rs index 912b1155..529fbd2f 100644 --- a/dragonfly-client/src/bin/dfget/main.rs +++ b/dragonfly-client/src/bin/dfget/main.rs @@ -61,19 +61,19 @@ Examples: # Download a file from Amazon Simple Storage Service(S3). $ dfget s3:/// -O /tmp/file.txt --storage-access-key-id= --storage-access-key-secret= - + # Download a file from Google Cloud Storage Service(GCS). $ dfget gs:/// -O /tmp/file.txt --storage-credential-path= - + # Download a file from Azure Blob Storage Service(ABS). $ dfget abs:/// -O /tmp/file.txt --storage-access-key-id= --storage-access-key-secret= - + # Download a file from Aliyun Object Storage Service(OSS). $ dfget oss:/// -O /tmp/file.txt --storage-access-key-id= --storage-access-key-secret= --storage-endpoint= - + # Download a file from Huawei Cloud Object Storage Service(OBS). 
$ dfget obs:/// -O /tmp/file.txt --storage-access-key-id= --storage-access-key-secret= --storage-endpoint= - + # Download a file from Tencent Cloud Object Storage Service(COS). $ dfget cos:/// -O /tmp/file.txt --storage-access-key-id= --storage-access-key-secret= --storage-endpoint= "#; @@ -738,6 +738,7 @@ async fn download( disable_back_to_source: args.disable_back_to_source, certificate_chain: Vec::new(), prefetch: false, + is_prefetch: false, object_storage, hdfs, }), diff --git a/dragonfly-client/src/grpc/dfdaemon_download.rs b/dragonfly-client/src/grpc/dfdaemon_download.rs index a4073075..6027f751 100644 --- a/dragonfly-client/src/grpc/dfdaemon_download.rs +++ b/dragonfly-client/src/grpc/dfdaemon_download.rs @@ -210,7 +210,7 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler { error!("missing download"); Status::invalid_argument("missing download") })?; - + // Generate the task id. let task_id = self .task diff --git a/dragonfly-client/src/grpc/mod.rs b/dragonfly-client/src/grpc/mod.rs index c5aeafa4..a9ebfb21 100644 --- a/dragonfly-client/src/grpc/mod.rs +++ b/dragonfly-client/src/grpc/mod.rs @@ -77,6 +77,10 @@ pub async fn prefetch_task( // Remove the prefetch flag for prevent the infinite loop. download.prefetch = false; + // Mark the is_prefetch flag as true to represent that it is a prefetch request + // because the prefetch flag has been removed. + download.is_prefetch = true; + // Remove the range header for download full task. 
download .request_header diff --git a/dragonfly-client/src/proxy/mod.rs b/dragonfly-client/src/proxy/mod.rs index 2dcf6b8a..36c7588f 100644 --- a/dragonfly-client/src/proxy/mod.rs +++ b/dragonfly-client/src/proxy/mod.rs @@ -831,6 +831,7 @@ async fn proxy_via_dfdaemon( piece.length, download_task_started_response.range, true, + false, ) .await { @@ -1089,6 +1090,7 @@ fn make_download_task_request( prefetch: need_prefetch(config.clone(), &header), object_storage: None, hdfs: None, + is_prefetch: false, }), }) } diff --git a/dragonfly-client/src/resource/persistent_cache_task.rs b/dragonfly-client/src/resource/persistent_cache_task.rs index 6afd441c..f4260972 100644 --- a/dragonfly-client/src/resource/persistent_cache_task.rs +++ b/dragonfly-client/src/resource/persistent_cache_task.rs @@ -856,6 +856,7 @@ impl PersistentCacheTask { number, length, parent.clone(), + false, ) .await .map_err(|err| { diff --git a/dragonfly-client/src/resource/piece.rs b/dragonfly-client/src/resource/piece.rs index 4699ae7b..1e38ba74 100644 --- a/dragonfly-client/src/resource/piece.rs +++ b/dragonfly-client/src/resource/piece.rs @@ -74,6 +74,9 @@ pub struct Piece { /// upload_rate_limiter is the rate limiter of the upload speed in bps(bytes per second). upload_rate_limiter: Arc, + + /// prefetch_rate_limiter is the rate limiter of the prefetch speed in bps(bytes per second). + prefetch_rate_limiter: Arc, } /// Piece implements the piece manager. @@ -106,6 +109,13 @@ impl Piece { .interval(Duration::from_secs(1)) .build(), ), + prefetch_rate_limiter: Arc::new( + RateLimiter::builder() + .initial(config.proxy.prefetch_rate_limit.as_u64() as usize) + .refill(config.proxy.prefetch_rate_limit.as_u64() as usize) + .interval(Duration::from_secs(1)) + .build(), + ), } } @@ -343,13 +353,18 @@ impl Piece { length: u64, range: Option, disable_rate_limit: bool, + is_prefetch: bool, ) -> Result { // Span record the piece_id. 
Span::current().record("piece_id", piece_id); // Acquire the download rate limiter. if !disable_rate_limit { - self.download_rate_limiter.acquire(length as usize).await; + if is_prefetch { + self.prefetch_rate_limiter.acquire(length as usize).await; + } else { + self.download_rate_limiter.acquire(length as usize).await; + } } // Upload the piece content. @@ -368,6 +383,7 @@ impl Piece { } /// download_from_remote_peer downloads a single piece from a remote peer. + #[allow(clippy::too_many_arguments)] #[instrument(skip_all, fields(piece_id))] pub async fn download_from_remote_peer( &self, @@ -377,12 +393,18 @@ impl Piece { number: u32, length: u64, parent: piece_collector::CollectedParent, + is_prefetch: bool, ) -> Result { // Span record the piece_id. Span::current().record("piece_id", piece_id); - // Acquire the download rate limiter. - self.download_rate_limiter.acquire(length as usize).await; + if is_prefetch { + // Acquire the prefetch rate limiter. + self.prefetch_rate_limiter.acquire(length as usize).await; + } else { + // Acquire the download rate limiter. + self.download_rate_limiter.acquire(length as usize).await; + } // Record the start of downloading piece. let piece = self @@ -506,14 +528,20 @@ impl Piece { offset: u64, length: u64, request_header: HeaderMap, + is_prefetch: bool, object_storage: Option, hdfs: Option, ) -> Result { // Span record the piece_id. Span::current().record("piece_id", piece_id); - // Acquire the download rate limiter. - self.download_rate_limiter.acquire(length as usize).await; + if is_prefetch { + // Acquire the prefetch rate limiter. + self.prefetch_rate_limiter.acquire(length as usize).await; + } else { + // Acquire the download rate limiter. + self.download_rate_limiter.acquire(length as usize).await; + } // Record the start of downloading piece. 
let piece = self diff --git a/dragonfly-client/src/resource/task.rs b/dragonfly-client/src/resource/task.rs index 988617af..b688d243 100644 --- a/dragonfly-client/src/resource/task.rs +++ b/dragonfly-client/src/resource/task.rs @@ -675,6 +675,7 @@ impl Task { peer_id, response.candidate_parents.clone(), remaining_interested_pieces.clone(), + request.is_prefetch, download_progress_tx.clone(), in_stream_tx.clone(), ) @@ -916,6 +917,7 @@ impl Task { peer_id: &str, parents: Vec, interested_pieces: Vec, + is_prefetch: bool, download_progress_tx: Sender>, in_stream_tx: Sender, ) -> ClientResult> { @@ -974,6 +976,7 @@ impl Task { in_stream_tx: Sender, interrupt: Arc, finished_pieces: Arc>>, + is_prefetch: bool, ) -> ClientResult { // Limit the concurrent piece count. let _permit = semaphore.acquire().await.unwrap(); @@ -993,6 +996,7 @@ impl Task { number, length, parent.clone(), + is_prefetch, ) .await .map_err(|err| { @@ -1101,6 +1105,7 @@ impl Task { in_stream_tx.clone(), interrupt.clone(), finished_pieces.clone(), + is_prefetch, ) .in_current_span(), ); @@ -1212,6 +1217,7 @@ impl Task { offset: u64, length: u64, request_header: HeaderMap, + is_prefetch: bool, piece_manager: Arc, storage: Arc, semaphore: Arc, @@ -1235,6 +1241,7 @@ impl Task { offset, length, request_header, + is_prefetch, object_storage, hdfs, ) @@ -1315,6 +1322,7 @@ impl Task { interested_piece.offset, interested_piece.length, request_header.clone(), + request.is_prefetch, self.piece.clone(), self.storage.clone(), semaphore.clone(), @@ -1551,6 +1559,7 @@ impl Task { offset: u64, length: u64, request_header: HeaderMap, + is_prefetch: bool, piece_manager: Arc, storage: Arc, semaphore: Arc, @@ -1573,6 +1582,7 @@ impl Task { offset, length, request_header, + is_prefetch, object_storage, hdfs, ) @@ -1631,6 +1641,7 @@ impl Task { interested_piece.offset, interested_piece.length, request_header.clone(), + request.is_prefetch, self.piece.clone(), self.storage.clone(), semaphore.clone(),