Skip to content

Commit

Permalink
feat: calculate piece length by file length (#661)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Aug 12, 2024
1 parent 175be38 commit e7f7c50
Show file tree
Hide file tree
Showing 21 changed files with 182 additions and 138 deletions.
20 changes: 10 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 9 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ members = [
]

[workspace.package]
version = "0.1.97"
version = "0.1.98"
authors = ["The Dragonfly Developers"]
homepage = "https://d7y.io/"
repository = "https://github.com/dragonflyoss/client.git"
Expand All @@ -22,15 +22,15 @@ readme = "README.md"
edition = "2021"

[workspace.dependencies]
dragonfly-client = { path = "dragonfly-client", version = "0.1.97" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.97" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.97" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.97" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.97" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.97" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.97" }
dragonfly-client = { path = "dragonfly-client", version = "0.1.98" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.98" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.98" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.98" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.98" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.98" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.98" }
thiserror = "1.0"
dragonfly-api = "2.0.143"
dragonfly-api = "2.0.147"
reqwest = { version = "0.12.4", features = ["stream", "native-tls", "default-tls", "rustls-tls"] }
rcgen = { version = "0.12.1", features = ["x509-parser"] }
hyper = { version = "1.4", features = ["full"] }
Expand Down
9 changes: 5 additions & 4 deletions dragonfly-client-backend/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ impl super::Backend for HTTP {
// get gets the content of the request.
async fn get(&self, request: super::GetRequest) -> Result<super::GetResponse<super::Body>> {
info!(
"get request {} {}: {:?}",
request.piece_id, request.url, request.http_header
"get request {} {} {}: {:?}",
request.task_id, request.piece_id, request.url, request.http_header
);

// The header of the request is required.
Expand All @@ -128,8 +128,8 @@ impl super::Backend for HTTP {
.await
.map_err(|err| {
error!(
"get request failed {} {}: {}",
request.piece_id, request.url, err
"get request failed {} {} {}: {}",
request.task_id, request.piece_id, request.url, err
);
err
})?;
Expand Down Expand Up @@ -230,6 +230,7 @@ mod tests {
let http_backend = http::HTTP::new();
let mut resp = http_backend
.get(GetRequest {
task_id: "test".to_string(),
piece_id: "test".to_string(),
url: server.url("/get"),
range: None,
Expand Down
3 changes: 3 additions & 0 deletions dragonfly-client-backend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ pub struct HeadResponse {

// GetRequest is the get request for backend.
pub struct GetRequest {
// task_id is the id of the task.
pub task_id: String,

// piece_id is the id of the piece.
pub piece_id: String,

Expand Down
5 changes: 0 additions & 5 deletions dragonfly-client-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,3 @@ pub fn default_cache_dir() -> PathBuf {
#[cfg(target_os = "macos")]
return home::home_dir().unwrap().join(".dragonfly").join("cache");
}

// default_piece_length returns the default piece length for a task,
// which is 4 MiB expressed in bytes.
pub fn default_piece_length() -> u64 {
    // One mebibyte in bytes; named so the multiplier below reads as "4 MiB".
    const MIB: u64 = 1024 * 1024;
    4 * MIB
}
4 changes: 4 additions & 0 deletions dragonfly-client-core/src/error/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ pub enum DFError {
#[error("invalid content length")]
InvalidContentLength,

// InvalidPieceLength is the error when the piece length is invalid.
#[error("invalid piece length")]
InvalidPieceLength,

// InvalidParameter is the error when the parameter is invalid.
#[error("invalid parameter")]
InvalidParameter,
Expand Down
5 changes: 3 additions & 2 deletions dragonfly-client-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl Storage {
pub fn download_task_started(
&self,
id: &str,
piece_length: u64,
piece_length: Option<u64>,
content_length: Option<u64>,
response_header: Option<HeaderMap>,
) -> Result<metadata::Task> {
Expand Down Expand Up @@ -155,6 +155,7 @@ impl Storage {
ttl: Duration,
path: &Path,
piece_length: u64,
content_length: u64,
expected_digest: &str,
) -> Result<metadata::CacheTask> {
let response = self.content.write_cache_task(id, path).await?;
Expand All @@ -170,7 +171,7 @@ impl Storage {
id,
ttl,
piece_length,
response.length,
content_length,
digest.to_string().as_str(),
)
}
Expand Down
25 changes: 20 additions & 5 deletions dragonfly-client-storage/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub struct Task {
pub id: String,

// piece_length is the length of the piece.
pub piece_length: u64,
pub piece_length: Option<u64>,

// content_length is the length of the content.
pub content_length: Option<u64>,
Expand Down Expand Up @@ -116,6 +116,11 @@ impl Task {
false
}

// piece_length returns the piece length of the task, read directly from
// the piece_length field. The value is optional.
// NOTE(review): None presumably means the piece length has not been
// determined yet for this task — confirm against the callers.
pub fn piece_length(&self) -> Option<u64> {
self.piece_length
}

// content_length returns the content length of the task.
pub fn content_length(&self) -> Option<u64> {
self.content_length
Expand Down Expand Up @@ -215,6 +220,11 @@ impl CacheTask {
self.persistent
}

// piece_length returns the piece length of the cache task, read directly
// from the piece_length field.
// NOTE(review): the unit is presumably bytes — confirm against the writer
// of this field.
pub fn piece_length(&self) -> u64 {
self.piece_length
}

// content_length returns the content length of the cache task.
pub fn content_length(&self) -> u64 {
self.content_length
Expand Down Expand Up @@ -320,7 +330,7 @@ impl<E: StorageEngineOwned> Metadata<E> {
pub fn download_task_started(
&self,
id: &str,
piece_length: u64,
piece_length: Option<u64>,
content_length: Option<u64>,
response_header: Option<HeaderMap>,
) -> Result<Task> {
Expand All @@ -341,6 +351,11 @@ impl<E: StorageEngineOwned> Metadata<E> {
task.content_length = content_length;
}

// Protect the piece length from being overwritten by None.
if piece_length.is_some() {
task.piece_length = piece_length;
}

// If the task already has a response header, the response header
// will not be overwritten.
if task.response_header.is_empty() {
Expand Down Expand Up @@ -847,14 +862,14 @@ mod tests {

// Test download_task_started.
metadata
.download_task_started(task_id, 1024, Some(1024), None)
.download_task_started(task_id, Some(1024), Some(1024), None)
.unwrap();
let task = metadata
.get_task(task_id)
.unwrap()
.expect("task should exist after download_task_started");
assert_eq!(task.id, task_id);
assert_eq!(task.piece_length, 1024);
assert_eq!(task.piece_length, Some(1024));
assert_eq!(task.content_length, Some(1024));
assert!(task.response_header.is_empty());
assert_eq!(task.uploading_count, 0);
Expand Down Expand Up @@ -905,7 +920,7 @@ mod tests {
let task_id = "task2";

metadata
.download_task_started(task_id, 1024, None, None)
.download_task_started(task_id, Some(1024), None, None)
.unwrap();
let tasks = metadata.get_tasks().unwrap();
assert_eq!(tasks.len(), 2, "should get 2 tasks in total");
Expand Down
8 changes: 0 additions & 8 deletions dragonfly-client-util/src/id_generator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ impl IDGenerator {
digest: Option<&str>,
tag: Option<&str>,
application: Option<&str>,
piece_length: u64,
filtered_query_params: Vec<String>,
) -> Result<String> {
// Filter the query parameters.
Expand Down Expand Up @@ -105,9 +104,6 @@ impl IDGenerator {
hasher.update(application);
}

// Add the piece length to generate the task id.
hasher.update(piece_length.to_string());

// Generate the task id.
Ok(hex::encode(hasher.finalize()))
}
Expand All @@ -118,7 +114,6 @@ impl IDGenerator {
path: &PathBuf,
tag: Option<&str>,
application: Option<&str>,
piece_length: u64,
) -> Result<String> {
// Initialize the hasher.
let mut hasher = blake3::Hasher::new();
Expand All @@ -137,9 +132,6 @@ impl IDGenerator {
hasher.update(application.as_bytes());
}

// Add the piece length to generate the cache task id.
hasher.update(piece_length.to_string().as_bytes());

// Generate the cache task id.
Ok(hasher.finalize().to_hex().to_string())
}
Expand Down
11 changes: 1 addition & 10 deletions dragonfly-client/src/bin/dfcache/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
use clap::Parser;
use dragonfly_api::dfdaemon::v2::{download_cache_task_response, DownloadCacheTaskRequest};
use dragonfly_api::errordetails::v2::Backend;
use dragonfly_client_config::default_piece_length;
use dragonfly_client_core::{
error::{ErrorType, OrErr},
Error, Result,
Expand Down Expand Up @@ -52,13 +51,6 @@ pub struct ExportCommand {
)]
tag: String,

#[arg(
long = "piece-length",
default_value_t = default_piece_length(),
help = "Specify the byte length of the piece"
)]
piece_length: u64,

#[arg(
short = 'O',
long = "output",
Expand Down Expand Up @@ -139,7 +131,7 @@ impl ExportCommand {
);

eprintln!(
"{}{}{}Message:{}, can not connect {}, please check the unix socket.{}",
"{}{}{}Message:{}, can not connect {}, please check the unix socket {}",
color::Fg(color::Cyan),
style::Italic,
style::Bold,
Expand Down Expand Up @@ -381,7 +373,6 @@ impl ExportCommand {
persistent: false,
tag: Some(self.tag.clone()),
application: Some(self.application.clone()),
piece_length: self.piece_length,
output_path: absolute_path.to_string_lossy().to_string(),
timeout: Some(
prost_wkt_types::Duration::try_from(self.timeout)
Expand Down
14 changes: 2 additions & 12 deletions dragonfly-client/src/bin/dfcache/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@

use clap::Parser;
use dragonfly_api::dfdaemon::v2::UploadCacheTaskRequest;
use dragonfly_client_config::{
default_piece_length, dfcache::default_dfcache_persistent_replica_count,
};
use dragonfly_client_config::dfcache::default_dfcache_persistent_replica_count;
use dragonfly_client_core::{
error::{ErrorType, OrErr},
Error, Result,
Expand Down Expand Up @@ -60,13 +58,6 @@ pub struct ImportCommand {
)]
tag: Option<String>,

#[arg(
long = "piece-length",
default_value_t = default_piece_length(),
help = "Specify the byte length of the piece"
)]
piece_length: u64,

#[arg(
long = "ttl",
value_parser= humantime::parse_duration,
Expand Down Expand Up @@ -148,7 +139,7 @@ impl ImportCommand {
);

eprintln!(
"{}{}{}Message:{}, can not connect {}, please check the unix socket.{}",
"{}{}{}Message:{}, can not connect {}, please check the unix socket {}",
color::Fg(color::Cyan),
style::Italic,
style::Bold,
Expand Down Expand Up @@ -283,7 +274,6 @@ impl ImportCommand {
persistent_replica_count: self.persistent_replica_count,
tag: self.tag.clone(),
application: self.application.clone(),
piece_length: self.piece_length,
ttl: Some(
prost_wkt_types::Duration::try_from(self.ttl).or_err(ErrorType::ParseError)?,
),
Expand Down
Loading

0 comments on commit e7f7c50

Please sign in to comment.