Skip to content

Commit

Permalink
refactor: download directory from object storage (#614)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Jul 19, 2024
1 parent 2247e0f commit e676cfb
Show file tree
Hide file tree
Showing 9 changed files with 185 additions and 149 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ toml = "0.8.13"
base16ct = { version = "0.2", features = ["alloc"] }
bytesize = {version = "1.2.0", features = ["serde"]}
bytesize-serde = "0.2.1"
percent-encoding = "2.3.1"

[profile.release]
debug = true
Expand Down
2 changes: 1 addition & 1 deletion dragonfly-client-backend/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl super::Backend for HTTP {
http_header: Some(header),
http_status_code: Some(status_code),
error_message: Some(status_code.to_string()),
entries: None,
entries: Vec::new(),
})
}

Expand Down
3 changes: 1 addition & 2 deletions dragonfly-client-backend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,8 @@ pub struct HeadResponse {
// http_status_code is the status code of the response.
pub http_status_code: Option<reqwest::StatusCode>,

// If path is a directory, entries will be returned. Otherwise, entries will be None.
// Entries is the information of the entries in the directory.
pub entries: Option<Vec<DirEntry>>,
pub entries: Vec<DirEntry>,

// error_message is the error message of the response.
pub error_message: Option<String>,
Expand Down
54 changes: 26 additions & 28 deletions dragonfly-client-backend/src/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,36 +444,34 @@ impl crate::Backend for ObjectStorage {

// Get the entries if url point to a directory.
let entries = if parsed_url.is_dir() {
Some(
operator
.list_with(&parsed_url.key)
.recursive(true)
.metakey(Metakey::ContentLength | Metakey::Mode)
.await // Do the list op here.
.map_err(|err| {
error!(
"list request failed {} {}: {}",
request.task_id, request.url, err
);
ClientError::BackendError(BackendError {
message: err.to_string(),
status_code: None,
header: None,
})
})?
.into_iter()
.map(|entry| {
let metadata = entry.metadata();
super::DirEntry {
url: parsed_url.make_url_by_entry_path(entry.path()).to_string(),
content_length: metadata.content_length() as usize,
is_dir: metadata.is_dir(),
}
operator
.list_with(&parsed_url.key)
.recursive(true)
.metakey(Metakey::ContentLength | Metakey::Mode)
.await // Do the list op here.
.map_err(|err| {
error!(
"list request failed {} {}: {}",
request.task_id, request.url, err
);
ClientError::BackendError(BackendError {
message: err.to_string(),
status_code: None,
header: None,
})
.collect(),
)
})?
.into_iter()
.map(|entry| {
let metadata = entry.metadata();
super::DirEntry {
url: parsed_url.make_url_by_entry_path(entry.path()).to_string(),
content_length: metadata.content_length() as usize,
is_dir: metadata.is_dir(),
}
})
.collect()
} else {
None
Vec::new()
};

// Stat the object to get the response from the ObjectStorage.
Expand Down
10 changes: 8 additions & 2 deletions dragonfly-client-core/src/error/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,15 @@ pub enum DFError {
#[error(transparent)]
ExternalError(#[from] ExternalError),

#[error("max download file count {0} exceeded")]
MaxDownloadFileCountExceeded(usize),
// MaxDownloadFilesExceeded is the error for max download files exceeded.
#[error("max number of files to download exceeded: {0}")]
MaxDownloadFilesExceeded(usize),

// Unsupported is the error for unsupported.
#[error("unsupported {0}")]
Unsupported(String),

// TokioJoinError is the error for tokio join.
#[error(transparent)]
TokioJoinError(tokio::task::JoinError),
}
Expand Down
7 changes: 7 additions & 0 deletions dragonfly-client-util/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@ pub fn header_vec_to_hashmap(raw_header: Vec<String>) -> Result<HashMap<String,
Ok(header)
}

// header_vec_to_reqwest_headermap converts a vector of header string to a reqwest headermap.
pub fn header_vec_to_reqwest_headermap(
raw_header: Vec<String>,
) -> Result<reqwest::header::HeaderMap> {
hashmap_to_reqwest_headermap(&header_vec_to_hashmap(raw_header)?)
}

// get_range gets the range from http header.
pub fn get_range(header: &HeaderMap, content_length: u64) -> Result<Option<Range>> {
match header.get(reqwest::header::RANGE) {
Expand Down
3 changes: 2 additions & 1 deletion dragonfly-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ clap.workspace = true
anyhow.workspace = true
blake3.workspace = true
bytesize.workspace = true
uuid.workspace = true
percent-encoding.workspace = true
tracing-log = "0.2"
tracing-subscriber = { version = "0.3", features = ["env-filter", "time", "chrono"] }
tracing-appender = "0.2.3"
Expand Down Expand Up @@ -88,7 +90,6 @@ futures-util = "0.3.30"
termion = "4.0.2"
tabled = "0.15.0"
path-absolutize = "3.1.1"
percent-encoding = "2.3.1"

[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = { version = "0.5.4", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms", "background_threads"] }
Expand Down
Loading

0 comments on commit e676cfb

Please sign in to comment.