Commit 57c8b5f

feat: support the rate limit for preheat request
Signed-off-by: chlins <[email protected]>
suhan.zcy authored and chlins committed Dec 5, 2024
1 parent d8f0931 commit 57c8b5f
Showing 10 changed files with 84 additions and 15 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

13 changes: 11 additions & 2 deletions Cargo.toml
@@ -30,8 +30,17 @@ dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.1
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.123" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.123" }
thiserror = "1.0"
dragonfly-api = "=2.0.173"
reqwest = { version = "0.12.4", features = ["stream", "native-tls", "default-tls", "rustls-tls", "gzip", "brotli", "zstd", "deflate"] }
dragonfly-api = "=2.0.174"
reqwest = { version = "0.12.4", features = [
"stream",
"native-tls",
"default-tls",
"rustls-tls",
"gzip",
"brotli",
"zstd",
"deflate",
] }
reqwest-middleware = "0.4"
rcgen = { version = "0.12.1", features = ["x509-parser"] }
hyper = { version = "1.5", features = ["full"] }
13 changes: 13 additions & 0 deletions dragonfly-client-config/src/dfdaemon.rs
@@ -225,6 +225,14 @@ pub fn default_proxy_read_buffer_size() -> usize {
    32 * 1024
}

/// default_prefetch_rate_limit is the default rate limit of the prefetch speed in GiB/MiB/KiB per second.
/// The prefetch request has lower priority, so limit its rate to avoid occupying bandwidth and impacting other download tasks.
#[inline]
fn default_prefetch_rate_limit() -> ByteSize {
    // Default rate limit is 2 GiB/s.
    ByteSize::gib(2)
}

/// s3_filtered_query_params is the default filtered query params with the s3 protocol to generate the task id.
#[inline]
fn s3_filtered_query_params() -> Vec<String> {
@@ -1089,6 +1097,10 @@ pub struct Proxy {
/// prefetch pre-downloads the full task when downloading with a range request.
pub prefetch: bool,

/// prefetch_rate_limit is the rate limit of the prefetch speed in GiB/MiB/KiB per second.
#[serde(with = "bytesize_serde", default = "default_prefetch_rate_limit")]
pub prefetch_rate_limit: ByteSize,

/// cache_capacity is the capacity of the cache by LRU algorithm for HTTP proxy, default is 150.
/// The cache is used to store the hot piece content of the task, piece length is 4MB~16MB.
/// If the capacity is 150, the cache size is 600MB~2.4GB, need to adjust according to the
@@ -1110,6 +1122,7 @@ impl Default for Proxy {
registry_mirror: RegistryMirror::default(),
disable_back_to_source: false,
prefetch: false,
prefetch_rate_limit: default_prefetch_rate_limit(),
cache_capacity: default_proxy_cache_capacity(),
read_buffer_size: default_proxy_read_buffer_size(),
}
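The new field rides on bytesize_serde, so the dfdaemon config should accept human-readable sizes. A minimal sketch of that round trip, using a stand-in struct rather than the real dfdaemon config and serde_yaml for illustration, and assuming bytesize_serde parses strings such as "1GiB":

```rust
use bytesize::ByteSize;
use serde::Deserialize;

/// Stand-in for the relevant slice of the Proxy config.
#[derive(Deserialize)]
struct ProxyConfig {
    #[serde(with = "bytesize_serde", default = "default_prefetch_rate_limit")]
    prefetch_rate_limit: ByteSize,
}

fn default_prefetch_rate_limit() -> ByteSize {
    ByteSize::gib(2)
}

fn main() {
    // Override the 2 GiB/s default down to 1 GiB/s.
    let cfg: ProxyConfig = serde_yaml::from_str("prefetch_rate_limit: 1GiB").unwrap();
    assert_eq!(cfg.prefetch_rate_limit, ByteSize::gib(1));

    // Omitting the key falls back to the serde default.
    let cfg: ProxyConfig = serde_yaml::from_str("{}").unwrap();
    assert_eq!(cfg.prefetch_rate_limit, ByteSize::gib(2));
}
```

The exact YAML key, and whether it is nested under proxy or renamed, depends on dfdaemon's serde configuration, which this diff does not show.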
11 changes: 6 additions & 5 deletions dragonfly-client/src/bin/dfget/main.rs
@@ -61,19 +61,19 @@ Examples:
# Download a file from Amazon Simple Storage Service (S3).
$ dfget s3://<bucket>/<path> -O /tmp/file.txt --storage-access-key-id=<access_key_id> --storage-access-key-secret=<access_key_secret>
# Download a file from Google Cloud Storage Service (GCS).
$ dfget gs://<bucket>/<path> -O /tmp/file.txt --storage-credential-path=<credential_path>
# Download a file from Azure Blob Storage Service (ABS).
$ dfget abs://<container>/<path> -O /tmp/file.txt --storage-access-key-id=<account_name> --storage-access-key-secret=<account_key>
# Download a file from Aliyun Object Storage Service (OSS).
$ dfget oss://<bucket>/<path> -O /tmp/file.txt --storage-access-key-id=<access_key_id> --storage-access-key-secret=<access_key_secret> --storage-endpoint=<endpoint>
# Download a file from Huawei Cloud Object Storage Service (OBS).
$ dfget obs://<bucket>/<path> -O /tmp/file.txt --storage-access-key-id=<access_key_id> --storage-access-key-secret=<access_key_secret> --storage-endpoint=<endpoint>
# Download a file from Tencent Cloud Object Storage Service (COS).
$ dfget cos://<bucket>/<path> -O /tmp/file.txt --storage-access-key-id=<access_key_id> --storage-access-key-secret=<access_key_secret> --storage-endpoint=<endpoint>
"#;
@@ -738,6 +738,7 @@ async fn download(
disable_back_to_source: args.disable_back_to_source,
certificate_chain: Vec::new(),
prefetch: false,
is_prefetch: false,
object_storage,
hdfs,
}),
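dfget always passes is_prefetch: false; the two flags answer different questions. prefetch asks the daemon to pre-download the rest of the task after serving a range, while is_prefetch marks a request that the prefetch machinery itself generated and that should be throttled separately. A sketch of the distinction, assuming Download lives at dragonfly_api::common::v2 and, as a prost-generated message, implements Default:

```rust
use dragonfly_api::common::v2::Download;

fn main() {
    // A user-initiated dfget download: neither flag is set, so pieces are
    // throttled by the ordinary download_rate_limiter.
    let user_request = Download {
        prefetch: false,
        is_prefetch: false,
        ..Default::default()
    };
    assert!(!user_request.prefetch && !user_request.is_prefetch);
}
```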
2 changes: 1 addition & 1 deletion dragonfly-client/src/grpc/dfdaemon_download.rs
@@ -210,7 +210,7 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
error!("missing download");
Status::invalid_argument("missing download")
})?;

error!("download.is_prefetch: {:?}", download.is_prefetch);
// Generate the task id.
let task_id = self
.task
4 changes: 4 additions & 0 deletions dragonfly-client/src/grpc/mod.rs
@@ -77,6 +77,10 @@ pub async fn prefetch_task(
// Remove the prefetch flag to prevent an infinite loop.
download.prefetch = false;

// Mark the is_prefetch flag as true to indicate that this is a prefetch request,
// since the prefetch flag has been removed.
download.is_prefetch = true;

// Remove the range header to download the full task.
download
.request_header
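Taken together, the two assignments turn a user download into its background twin. A condensed sketch of the transformation in prefetch_task, assuming the Download type path above (the range-header removal shown in the hunk is elided):

```rust
use dragonfly_api::common::v2::Download;

/// Sketch of the flag handling in prefetch_task.
fn to_prefetch_request(mut download: Download) -> Download {
    // Clearing prefetch stops the spawned full-task download from
    // scheduling yet another prefetch (the infinite loop noted above).
    download.prefetch = false;

    // Setting is_prefetch routes the request through prefetch_rate_limiter
    // instead of download_rate_limiter in the piece manager.
    download.is_prefetch = true;

    download
}
```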
2 changes: 2 additions & 0 deletions dragonfly-client/src/proxy/mod.rs
@@ -831,6 +831,7 @@ async fn proxy_via_dfdaemon(
piece.length,
download_task_started_response.range,
true,
false,
)
.await
{
@@ -1089,6 +1090,7 @@ fn make_download_task_request(
prefetch: need_prefetch(config.clone(), &header),
object_storage: None,
hdfs: None,
is_prefetch: false,
}),
})
}
1 change: 1 addition & 0 deletions dragonfly-client/src/resource/persistent_cache_task.rs
@@ -856,6 +856,7 @@ impl PersistentCacheTask {
number,
length,
parent.clone(),
false,
)
.await
.map_err(|err| {
38 changes: 33 additions & 5 deletions dragonfly-client/src/resource/piece.rs
@@ -74,6 +74,9 @@ pub struct Piece {

/// upload_rate_limiter is the rate limiter of the upload speed in bps (bytes per second).
upload_rate_limiter: Arc<RateLimiter>,

/// prefetch_rate_limiter is the rate limiter of the prefetch speed in bps (bytes per second).
prefetch_rate_limiter: Arc<RateLimiter>,
}

/// Piece implements the piece manager.
@@ -106,6 +109,13 @@ impl Piece {
                    .interval(Duration::from_secs(1))
                    .build(),
            ),
            prefetch_rate_limiter: Arc::new(
                RateLimiter::builder()
                    .initial(config.proxy.prefetch_rate_limit.as_u64() as usize)
                    .refill(config.proxy.prefetch_rate_limit.as_u64() as usize)
                    .interval(Duration::from_secs(1))
                    .build(),
            ),
        }
    }
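The builder chain matches the leaky-bucket crate's API (initial token count, refill amount, refill interval). A standalone sketch of the prefetch limiter's behavior under that assumption:

```rust
use leaky_bucket::RateLimiter;
use std::time::Duration;

#[tokio::main]
async fn main() {
    // Mirror default_prefetch_rate_limit(): a 2 GiB token budget refilled
    // once per second caps sustained prefetch throughput at 2 GiB/s.
    let limit = 2usize * 1024 * 1024 * 1024;
    let limiter = RateLimiter::builder()
        .initial(limit)
        .refill(limit)
        .interval(Duration::from_secs(1))
        .build();

    // Each piece download acquires tokens equal to its byte length; once
    // the bucket is empty, acquire() suspends until the next refill tick.
    limiter.acquire(4 * 1024 * 1024).await; // e.g. one 4 MiB piece
}
```

Because the prefetch bucket is separate from the download bucket, a burst of prefetch traffic can only exhaust its own budget and cannot starve interactive downloads, which is the stated goal of the change.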

@@ -343,13 +353,18 @@
length: u64,
range: Option<Range>,
disable_rate_limit: bool,
is_prefetch: bool,
) -> Result<impl AsyncRead> {
// Span record the piece_id.
Span::current().record("piece_id", piece_id);

        // Acquire the rate limiter based on whether this is a prefetch request.
        if !disable_rate_limit {
-            self.download_rate_limiter.acquire(length as usize).await;
+            if is_prefetch {
+                self.prefetch_rate_limiter.acquire(length as usize).await;
+            } else {
+                self.download_rate_limiter.acquire(length as usize).await;
+            }
        }

// Upload the piece content.
@@ -368,6 +383,7 @@
}

/// download_from_remote_peer downloads a single piece from a remote peer.
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all, fields(piece_id))]
pub async fn download_from_remote_peer(
&self,
@@ -377,12 +393,18 @@
number: u32,
length: u64,
parent: piece_collector::CollectedParent,
is_prefetch: bool,
) -> Result<metadata::Piece> {
// Span record the piece_id.
Span::current().record("piece_id", piece_id);

-        // Acquire the download rate limiter.
-        self.download_rate_limiter.acquire(length as usize).await;
+        if is_prefetch {
+            // Acquire the prefetch rate limiter.
+            self.prefetch_rate_limiter.acquire(length as usize).await;
+        } else {
+            // Acquire the download rate limiter.
+            self.download_rate_limiter.acquire(length as usize).await;
+        }

// Record the start of downloading piece.
let piece = self
@@ -506,14 +528,20 @@
offset: u64,
length: u64,
request_header: HeaderMap,
is_prefetch: bool,
object_storage: Option<ObjectStorage>,
hdfs: Option<Hdfs>,
) -> Result<metadata::Piece> {
// Span record the piece_id.
Span::current().record("piece_id", piece_id);

-        // Acquire the download rate limiter.
-        self.download_rate_limiter.acquire(length as usize).await;
+        if is_prefetch {
+            // Acquire the prefetch rate limiter.
+            self.prefetch_rate_limiter.acquire(length as usize).await;
+        } else {
+            // Acquire the download rate limiter.
+            self.download_rate_limiter.acquire(length as usize).await;
+        }

// Record the start of downloading piece.
let piece = self
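The same limiter-selection branch now appears in upload_piece, download_from_remote_peer, and download_from_source. One possible cleanup, not part of this commit, is a helper that owns the choice:

```rust
use leaky_bucket::RateLimiter;
use std::sync::Arc;

/// Hypothetical helper centralizing the limiter selection that this
/// commit repeats at each call site.
async fn acquire_rate_limit(
    download_rate_limiter: &Arc<RateLimiter>,
    prefetch_rate_limiter: &Arc<RateLimiter>,
    length: u64,
    is_prefetch: bool,
) {
    let limiter = if is_prefetch {
        prefetch_rate_limiter
    } else {
        download_rate_limiter
    };
    limiter.acquire(length as usize).await;
}
```

Keeping the branch inline, as the commit does, avoids further churn in three already long methods; the trade-off is three copies of the same logic.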
11 changes: 11 additions & 0 deletions dragonfly-client/src/resource/task.rs
@@ -675,6 +675,7 @@ impl Task {
peer_id,
response.candidate_parents.clone(),
remaining_interested_pieces.clone(),
request.is_prefetch,
download_progress_tx.clone(),
in_stream_tx.clone(),
)
@@ -916,6 +917,7 @@ impl Task {
peer_id: &str,
parents: Vec<Peer>,
interested_pieces: Vec<metadata::Piece>,
is_prefetch: bool,
download_progress_tx: Sender<Result<DownloadTaskResponse, Status>>,
in_stream_tx: Sender<AnnouncePeerRequest>,
) -> ClientResult<Vec<metadata::Piece>> {
@@ -974,6 +976,7 @@
in_stream_tx: Sender<AnnouncePeerRequest>,
interrupt: Arc<AtomicBool>,
finished_pieces: Arc<Mutex<Vec<metadata::Piece>>>,
is_prefetch: bool,
) -> ClientResult<metadata::Piece> {
// Limit the concurrent piece count.
let _permit = semaphore.acquire().await.unwrap();
@@ -993,6 +996,7 @@
number,
length,
parent.clone(),
is_prefetch,
)
.await
.map_err(|err| {
@@ -1101,6 +1105,7 @@
in_stream_tx.clone(),
interrupt.clone(),
finished_pieces.clone(),
is_prefetch,
)
.in_current_span(),
);
@@ -1212,6 +1217,7 @@ impl Task {
offset: u64,
length: u64,
request_header: HeaderMap,
is_prefetch: bool,
piece_manager: Arc<piece::Piece>,
storage: Arc<Storage>,
semaphore: Arc<Semaphore>,
@@ -1235,6 +1241,7 @@
offset,
length,
request_header,
is_prefetch,
object_storage,
hdfs,
)
@@ -1315,6 +1322,7 @@ impl Task {
interested_piece.offset,
interested_piece.length,
request_header.clone(),
request.is_prefetch,
self.piece.clone(),
self.storage.clone(),
semaphore.clone(),
@@ -1551,6 +1559,7 @@ impl Task {
offset: u64,
length: u64,
request_header: HeaderMap,
is_prefetch: bool,
piece_manager: Arc<piece::Piece>,
storage: Arc<Storage>,
semaphore: Arc<Semaphore>,
@@ -1573,6 +1582,7 @@
offset,
length,
request_header,
is_prefetch,
object_storage,
hdfs,
)
@@ -1631,6 +1641,7 @@ impl Task {
interested_piece.offset,
interested_piece.length,
request_header.clone(),
request.is_prefetch,
self.piece.clone(),
self.storage.clone(),
semaphore.clone(),
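The task.rs changes are pure plumbing: is_prefetch travels from the gRPC request into every spawned piece download. A minimal sketch of the same pattern, with Dragonfly's types replaced by stand-ins:

```rust
use tokio::task::JoinSet;

/// Stand-in for metadata::Piece.
#[derive(Clone)]
struct Piece {
    number: u32,
    length: u64,
}

async fn download_piece(piece: Piece, is_prefetch: bool) -> u64 {
    // The real code would pick prefetch_rate_limiter or download_rate_limiter
    // here before fetching piece `piece.number`.
    let _ = (piece.number, is_prefetch);
    piece.length
}

#[tokio::main]
async fn main() {
    let is_prefetch = true; // taken from request.is_prefetch in task.rs
    let pieces = vec![Piece { number: 0, length: 4 << 20 }];

    let mut join_set = JoinSet::new();
    for piece in pieces {
        // The flag is copied into each spawned download, mirroring how the
        // commit threads it through download_partial_from_remote_peer.
        join_set.spawn(download_piece(piece, is_prefetch));
    }
    while let Some(res) = join_set.join_next().await {
        let _bytes = res.unwrap();
    }
}
```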
