Skip to content

Commit

Permalink
feat: add downloader for downloading piece (#894)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Dec 13, 2024
1 parent df39410 commit eb66100
Show file tree
Hide file tree
Showing 8 changed files with 165 additions and 65 deletions.
12 changes: 12 additions & 0 deletions dragonfly-client-config/src/dfdaemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ fn default_dfdaemon_cache_dir() -> PathBuf {
crate::default_cache_dir().join(NAME)
}

/// default_upload_protocol returns the default protocol of the upload server,
/// used for serving pieces between peers.
#[inline]
fn default_upload_protocol() -> String {
    String::from("grpc")
}

/// default_upload_grpc_server_port is the default port of the upload grpc server.
#[inline]
fn default_upload_grpc_server_port() -> u16 {
Expand Down Expand Up @@ -450,6 +456,11 @@ impl Default for Download {
#[derive(Debug, Clone, Validate, Deserialize)]
#[serde(default, rename_all = "camelCase")]
pub struct UploadServer {
/// protocol is the protocol of the upload server. The protocol used for downloading pieces
/// between different peers, now only support grpc.
#[serde(default = "default_upload_protocol")]
pub protocol: String,

/// ip is the listen ip of the grpc server.
pub ip: Option<IpAddr>,

Expand All @@ -474,6 +485,7 @@ pub struct UploadServer {
impl Default for UploadServer {
fn default() -> Self {
UploadServer {
protocol: default_upload_protocol(),
ip: None,
port: default_upload_grpc_server_port(),
ca_cert: None,
Expand Down
4 changes: 2 additions & 2 deletions dragonfly-client/src/bin/dfdaemon/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ async fn main() -> Result<(), anyhow::Error> {
storage.clone(),
scheduler_client.clone(),
backend_factory.clone(),
);
)?;
let task = Arc::new(task);

// Initialize persistent cache task manager.
Expand All @@ -233,7 +233,7 @@ async fn main() -> Result<(), anyhow::Error> {
storage.clone(),
scheduler_client.clone(),
backend_factory.clone(),
);
)?;
let persistent_cache_task = Arc::new(persistent_cache_task);

// Initialize health server.
Expand Down
2 changes: 1 addition & 1 deletion dragonfly-client/src/proxy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,7 @@ async fn proxy_via_dfdaemon(
debug!("cache miss");
}
Ok(Some(content)) => {
debug!("cache hit");
info!("cache hit");

// Collect the download piece traffic metrics and the proxy request via dfdaemon and
// cache hits metrics.
Expand Down
1 change: 1 addition & 0 deletions dragonfly-client/src/resource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@
pub mod persistent_cache_task;
pub mod piece;
pub mod piece_collector;
pub mod piece_downloader;
pub mod task;
8 changes: 4 additions & 4 deletions dragonfly-client/src/resource/persistent_cache_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,22 +86,22 @@ impl PersistentCacheTask {
storage: Arc<Storage>,
scheduler_client: Arc<SchedulerClient>,
backend_factory: Arc<BackendFactory>,
) -> Self {
) -> ClientResult<Self> {
let piece = piece::Piece::new(
config.clone(),
id_generator.clone(),
storage.clone(),
backend_factory.clone(),
);
)?;
let piece = Arc::new(piece);

PersistentCacheTask {
Ok(Self {
config,
id_generator,
storage,
scheduler_client,
piece: piece.clone(),
}
})
}

/// create_persistent creates a persistent cache task from local.
Expand Down
76 changes: 22 additions & 54 deletions dragonfly-client/src/resource/piece.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,13 @@
*/

use super::*;
use crate::grpc::dfdaemon_upload::DfdaemonUploadClient;
use crate::metrics::{
collect_backend_request_failure_metrics, collect_backend_request_finished_metrics,
collect_backend_request_started_metrics, collect_download_piece_traffic_metrics,
collect_upload_piece_traffic_metrics,
};
use chrono::Utc;
use dragonfly_api::common::v2::{Hdfs, ObjectStorage, Range, TrafficType};
use dragonfly_api::dfdaemon::v2::DownloadPieceRequest;
use dragonfly_client_backend::{BackendFactory, GetRequest};
use dragonfly_client_config::dfdaemon::Config;
use dragonfly_client_core::{error::BackendError, Error, Result};
Expand Down Expand Up @@ -66,6 +64,9 @@ pub struct Piece {
/// storage is the local storage.
storage: Arc<Storage>,

/// downloader_factory is the piece downloader factory.
downloader_factory: Arc<piece_downloader::DownloaderFactory>,

/// backend_factory is the backend factory.
backend_factory: Arc<BackendFactory>,

Expand All @@ -88,11 +89,15 @@ impl Piece {
id_generator: Arc<IDGenerator>,
storage: Arc<Storage>,
backend_factory: Arc<BackendFactory>,
) -> Self {
Self {
) -> Result<Self> {
Ok(Self {
config: config.clone(),
id_generator,
storage,
downloader_factory: Arc::new(piece_downloader::DownloaderFactory::new(
config.upload.server.protocol.as_str(),
config.clone(),
)?),
backend_factory,
download_rate_limiter: Arc::new(
RateLimiter::builder()
Expand All @@ -116,7 +121,7 @@ impl Piece {
.interval(Duration::from_secs(1))
.build(),
),
}
})
}

/// id generates a new piece id.
Expand Down Expand Up @@ -429,70 +434,32 @@ impl Piece {

Error::InvalidPeer(parent.id.clone())
})?;
let dfdaemon_upload_client = DfdaemonUploadClient::new(
self.config.clone(),
format!("http://{}:{}", host.ip, host.port),
)
.await
.map_err(|err| {
error!(
"create dfdaemon upload client from {}:{} failed: {}",
host.ip, host.port, err
);
if let Some(err) = self.storage.download_piece_failed(piece_id).err() {
error!("set piece metadata failed: {}", err)
};

err
})?;

// Send the interested pieces request.
let response = dfdaemon_upload_client
let (content, offset, digest) = self
.downloader_factory
.build()
.download_piece(
DownloadPieceRequest {
host_id: host_id.to_string(),
task_id: task_id.to_string(),
piece_number: number,
},
self.config.download.piece_timeout,
format!("{}:{}", host.ip, host.port).as_str(),
number,
host_id,
task_id,
)
.await
.map_err(|err| {
.inspect_err(|err| {
error!("download piece failed: {}", err);
if let Some(err) = self.storage.download_piece_failed(piece_id).err() {
error!("set piece metadata failed: {}", err)
};

err
})?;

let piece = response.piece.ok_or_else(|| {
error!("piece is empty");
if let Some(err) = self.storage.download_piece_failed(piece_id).err() {
error!("set piece metadata failed: {}", err)
};

Error::InvalidParameter
})?;

// Get the piece content.
let content = piece.content.ok_or_else(|| {
error!("piece content is empty");
if let Some(err) = self.storage.download_piece_failed(piece_id).err() {
error!("set piece metadata failed: {}", err)
};

Error::InvalidParameter
})?;

// Record the finish of downloading piece.
match self
.storage
.download_piece_from_remote_peer_finished(
piece_id,
task_id,
piece.offset,
piece.digest.as_str(),
offset,
digest.as_str(),
parent.id.as_str(),
&mut content.as_slice(),
)
Expand Down Expand Up @@ -715,7 +682,8 @@ mod tests {
id_generator.clone(),
storage.clone(),
backend_factory.clone(),
);
)
.unwrap();

let test_cases = vec![
(1000, 1, None, 1, vec![0], 0, 1),
Expand Down
119 changes: 119 additions & 0 deletions dragonfly-client/src/resource/piece_downloader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright 2024 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

use crate::grpc::dfdaemon_upload::DfdaemonUploadClient;
use dragonfly_api::dfdaemon::v2::DownloadPieceRequest;
use dragonfly_client_config::dfdaemon::Config;
use dragonfly_client_core::{Error, Result};
use std::sync::Arc;
use tracing::{error, instrument};

/// Downloader is the interface for downloading pieces, which is implemented by different
/// protocols. The downloader is used to download pieces from the other peers.
#[tonic::async_trait]
pub trait Downloader {
/// download_piece downloads a piece from the other peer by different protocols.
///
/// `addr` is the "ip:port" address of the remote peer's upload server, `number`
/// is the piece number within the task, and `host_id`/`task_id` identify the
/// requesting host and the task being downloaded.
///
/// On success, returns the piece content bytes, the piece offset within the
/// task, and the piece digest string.
async fn download_piece(
&self,
addr: &str,
number: u32,
host_id: &str,
task_id: &str,
) -> Result<(Vec<u8>, u64, String)>;
}

/// DownloaderFactory is the factory for creating different downloaders by different protocols.
pub struct DownloaderFactory {
/// downloader is the downloader for downloading pieces, which is implemented by different
/// protocols. Stored as a shared trait object so build() can hand out cheap
/// reference-counted clones of the same instance.
downloader: Arc<dyn Downloader + Send + Sync>,
}

/// DownloaderFactory implements the factory methods for creating downloaders.
impl DownloaderFactory {
    /// new returns a new DownloaderFactory for the given protocol.
    ///
    /// # Errors
    ///
    /// Returns `Error::InvalidParameter` if the protocol is not supported.
    /// Currently only "grpc" is supported.
    #[instrument(skip_all)]
    pub fn new(protocol: &str, config: Arc<Config>) -> Result<Self> {
        let downloader = match protocol {
            // config is moved into the downloader on its last use; no clone needed.
            "grpc" => Arc::new(GRPCDownloader::new(config)),
            _ => {
                error!("downloader unsupported protocol: {}", protocol);
                return Err(Error::InvalidParameter);
            }
        };

        Ok(Self { downloader })
    }

    /// build returns the shared downloader instance (a cheap Arc clone).
    #[instrument(skip_all)]
    pub fn build(&self) -> Arc<dyn Downloader + Send + Sync> {
        self.downloader.clone()
    }
}

/// GRPCDownloader is the downloader for downloading pieces by the gRPC protocol.
pub struct GRPCDownloader {
/// config is the configuration of the dfdaemon, used to construct the upload
/// client and to read the piece download timeout.
config: Arc<Config>,
}

/// GRPCDownloader implements the downloader with the gRPC protocol.
impl GRPCDownloader {
/// new returns a new GRPCDownloader.
#[instrument(skip_all)]
pub fn new(config: Arc<Config>) -> Self {
Self { config }
}
}

/// GRPCDownloader implements the Downloader trait.
#[tonic::async_trait]
impl Downloader for GRPCDownloader {
/// download_piece downloads a piece from the other peer by the gRPC protocol.
#[instrument(skip_all)]
async fn download_piece(
&self,
addr: &str,
number: u32,
host_id: &str,
task_id: &str,
) -> Result<(Vec<u8>, u64, String)> {
let dfdaemon_upload_client =
DfdaemonUploadClient::new(self.config.clone(), format!("http://{}", addr)).await?;

let response = dfdaemon_upload_client
.download_piece(
DownloadPieceRequest {
host_id: host_id.to_string(),
task_id: task_id.to_string(),
piece_number: number,
},
self.config.download.piece_timeout,
)
.await?;

let Some(piece) = response.piece else {
return Err(Error::InvalidParameter);
};

let Some(content) = piece.content else {
return Err(Error::InvalidParameter);
};

Ok((content, piece.offset, piece.digest))
}
}
8 changes: 4 additions & 4 deletions dragonfly-client/src/resource/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,23 +97,23 @@ impl Task {
storage: Arc<Storage>,
scheduler_client: Arc<SchedulerClient>,
backend_factory: Arc<BackendFactory>,
) -> Self {
) -> ClientResult<Self> {
let piece = piece::Piece::new(
config.clone(),
id_generator.clone(),
storage.clone(),
backend_factory.clone(),
);
)?;
let piece = Arc::new(piece);

Self {
Ok(Self {
config,
id_generator,
storage: storage.clone(),
scheduler_client: scheduler_client.clone(),
backend_factory: backend_factory.clone(),
piece: piece.clone(),
}
})
}

/// get gets the metadata of the task.
Expand Down

0 comments on commit eb66100

Please sign in to comment.