From 14dff304d0ff27723e2354b0e92af56fb2a2390e Mon Sep 17 00:00:00 2001 From: Gaius Date: Wed, 11 Dec 2024 22:03:52 +0800 Subject: [PATCH] feat: add downloader for downloading piece Signed-off-by: Gaius --- dragonfly-client-config/src/dfdaemon.rs | 12 ++ dragonfly-client/src/bin/dfdaemon/main.rs | 4 +- dragonfly-client/src/proxy/mod.rs | 2 +- dragonfly-client/src/resource/mod.rs | 1 + .../src/resource/persistent_cache_task.rs | 8 +- dragonfly-client/src/resource/piece.rs | 76 ++++------- .../src/resource/piece_downloader.rs | 119 ++++++++++++++++++ dragonfly-client/src/resource/task.rs | 8 +- 8 files changed, 165 insertions(+), 65 deletions(-) create mode 100644 dragonfly-client/src/resource/piece_downloader.rs diff --git a/dragonfly-client-config/src/dfdaemon.rs b/dragonfly-client-config/src/dfdaemon.rs index 9e2e79cb..019482b4 100644 --- a/dragonfly-client-config/src/dfdaemon.rs +++ b/dragonfly-client-config/src/dfdaemon.rs @@ -78,6 +78,12 @@ fn default_dfdaemon_cache_dir() -> PathBuf { crate::default_cache_dir().join(NAME) } +/// default_upload_protocol is the default protocol of the upload server. +#[inline] +fn default_upload_protocol() -> String { + "grpc".to_string() +} + /// default_upload_grpc_server_port is the default port of the upload grpc server. #[inline] fn default_upload_grpc_server_port() -> u16 { @@ -450,6 +456,11 @@ impl Default for Download { #[derive(Debug, Clone, Validate, Deserialize)] #[serde(default, rename_all = "camelCase")] pub struct UploadServer { + /// protocol is the protocol of the upload server. The protocol used for downloading pieces + /// between different peers, now only support grpc. + #[serde(default = "default_upload_protocol")] + pub protocol: String, + /// ip is the listen ip of the grpc server. pub ip: Option, @@ -474,6 +485,7 @@ pub struct UploadServer { impl Default for UploadServer { fn default() -> Self { UploadServer { + protocol: default_upload_protocol(), ip: None, port: default_upload_grpc_server_port(), ca_cert: None, diff --git a/dragonfly-client/src/bin/dfdaemon/main.rs b/dragonfly-client/src/bin/dfdaemon/main.rs index 98af8c83..b17225cf 100644 --- a/dragonfly-client/src/bin/dfdaemon/main.rs +++ b/dragonfly-client/src/bin/dfdaemon/main.rs @@ -223,7 +223,7 @@ async fn main() -> Result<(), anyhow::Error> { storage.clone(), scheduler_client.clone(), backend_factory.clone(), - ); + )?; let task = Arc::new(task); // Initialize persistent cache task manager. @@ -233,7 +233,7 @@ async fn main() -> Result<(), anyhow::Error> { storage.clone(), scheduler_client.clone(), backend_factory.clone(), - ); + )?; let persistent_cache_task = Arc::new(persistent_cache_task); // Initialize health server. diff --git a/dragonfly-client/src/proxy/mod.rs b/dragonfly-client/src/proxy/mod.rs index 36c7588f..9e932f2d 100644 --- a/dragonfly-client/src/proxy/mod.rs +++ b/dragonfly-client/src/proxy/mod.rs @@ -661,7 +661,7 @@ async fn proxy_via_dfdaemon( debug!("cache miss"); } Ok(Some(content)) => { - debug!("cache hit"); + info!("cache hit"); // Collect the download piece traffic metrics and the proxy request via dfdaemon and // cache hits metrics. diff --git a/dragonfly-client/src/resource/mod.rs b/dragonfly-client/src/resource/mod.rs index 25674c07..11c09870 100644 --- a/dragonfly-client/src/resource/mod.rs +++ b/dragonfly-client/src/resource/mod.rs @@ -17,4 +17,5 @@ pub mod persistent_cache_task; pub mod piece; pub mod piece_collector; +pub mod piece_downloader; pub mod task; diff --git a/dragonfly-client/src/resource/persistent_cache_task.rs b/dragonfly-client/src/resource/persistent_cache_task.rs index f4260972..6753cbd7 100644 --- a/dragonfly-client/src/resource/persistent_cache_task.rs +++ b/dragonfly-client/src/resource/persistent_cache_task.rs @@ -86,22 +86,22 @@ impl PersistentCacheTask { storage: Arc, scheduler_client: Arc, backend_factory: Arc, - ) -> Self { + ) -> ClientResult { let piece = piece::Piece::new( config.clone(), id_generator.clone(), storage.clone(), backend_factory.clone(), - ); + )?; let piece = Arc::new(piece); - PersistentCacheTask { + Ok(Self { config, id_generator, storage, scheduler_client, piece: piece.clone(), - } + }) } /// create_persistent creates a persistent cache task from local. diff --git a/dragonfly-client/src/resource/piece.rs b/dragonfly-client/src/resource/piece.rs index dc1c1ed2..6235041c 100644 --- a/dragonfly-client/src/resource/piece.rs +++ b/dragonfly-client/src/resource/piece.rs @@ -15,7 +15,6 @@ */ use super::*; -use crate::grpc::dfdaemon_upload::DfdaemonUploadClient; use crate::metrics::{ collect_backend_request_failure_metrics, collect_backend_request_finished_metrics, collect_backend_request_started_metrics, collect_download_piece_traffic_metrics, @@ -23,7 +22,6 @@ use crate::metrics::{ }; use chrono::Utc; use dragonfly_api::common::v2::{Hdfs, ObjectStorage, Range, TrafficType}; -use dragonfly_api::dfdaemon::v2::DownloadPieceRequest; use dragonfly_client_backend::{BackendFactory, GetRequest}; use dragonfly_client_config::dfdaemon::Config; use dragonfly_client_core::{error::BackendError, Error, Result}; @@ -66,6 +64,9 @@ pub struct Piece { /// storage is the local storage. storage: Arc, + /// downloader_factory is the piece downloader factory. + downloader_factory: Arc, + /// backend_factory is the backend factory. backend_factory: Arc, @@ -88,11 +89,15 @@ impl Piece { id_generator: Arc, storage: Arc, backend_factory: Arc, - ) -> Self { - Self { + ) -> Result { + Ok(Self { config: config.clone(), id_generator, storage, + downloader_factory: Arc::new(piece_downloader::DownloaderFactory::new( + config.upload.server.protocol.as_str(), + config.clone(), + )?), backend_factory, download_rate_limiter: Arc::new( RateLimiter::builder() @@ -116,7 +121,7 @@ impl Piece { .interval(Duration::from_secs(1)) .build(), ), - } + }) } /// id generates a new piece id. @@ -429,70 +434,32 @@ impl Piece { Error::InvalidPeer(parent.id.clone()) })?; - let dfdaemon_upload_client = DfdaemonUploadClient::new( - self.config.clone(), - format!("http://{}:{}", host.ip, host.port), - ) - .await - .map_err(|err| { - error!( - "create dfdaemon upload client from {}:{} failed: {}", - host.ip, host.port, err - ); - if let Some(err) = self.storage.download_piece_failed(piece_id).err() { - error!("set piece metadata failed: {}", err) - }; - - err - })?; - // Send the interested pieces request. - let response = dfdaemon_upload_client + let (content, offset, digest) = self + .downloader_factory + .build() .download_piece( - DownloadPieceRequest { - host_id: host_id.to_string(), - task_id: task_id.to_string(), - piece_number: number, - }, - self.config.download.piece_timeout, + format!("{}:{}", host.ip, host.port).as_str(), + number, + host_id, + task_id, ) .await - .map_err(|err| { + .inspect_err(|err| { error!("download piece failed: {}", err); if let Some(err) = self.storage.download_piece_failed(piece_id).err() { error!("set piece metadata failed: {}", err) }; - - err })?; - let piece = response.piece.ok_or_else(|| { - error!("piece is empty"); - if let Some(err) = self.storage.download_piece_failed(piece_id).err() { - error!("set piece metadata failed: {}", err) - }; - - Error::InvalidParameter - })?; - - // Get the piece content. - let content = piece.content.ok_or_else(|| { - error!("piece content is empty"); - if let Some(err) = self.storage.download_piece_failed(piece_id).err() { - error!("set piece metadata failed: {}", err) - }; - - Error::InvalidParameter - })?; - // Record the finish of downloading piece. match self .storage .download_piece_from_remote_peer_finished( piece_id, task_id, - piece.offset, - piece.digest.as_str(), + offset, + digest.as_str(), parent.id.as_str(), &mut content.as_slice(), ) @@ -715,7 +682,8 @@ mod tests { id_generator.clone(), storage.clone(), backend_factory.clone(), - ); + ) + .unwrap(); let test_cases = vec![ (1000, 1, None, 1, vec![0], 0, 1), diff --git a/dragonfly-client/src/resource/piece_downloader.rs b/dragonfly-client/src/resource/piece_downloader.rs new file mode 100644 index 00000000..04aa2f3d --- /dev/null +++ b/dragonfly-client/src/resource/piece_downloader.rs @@ -0,0 +1,119 @@ +/* + * Copyright 2024 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use crate::grpc::dfdaemon_upload::DfdaemonUploadClient; +use dragonfly_api::dfdaemon::v2::DownloadPieceRequest; +use dragonfly_client_config::dfdaemon::Config; +use dragonfly_client_core::{Error, Result}; +use std::sync::Arc; +use tracing::{error, instrument}; + +/// Downloader is the interface for downloading pieces, which is implemented by different +/// protocols. The downloader is used to download pieces from the other peers. +#[tonic::async_trait] +pub trait Downloader { + /// download_piece downloads a piece from the other peer by different protocols. + async fn download_piece( + &self, + addr: &str, + number: u32, + host_id: &str, + task_id: &str, + ) -> Result<(Vec, u64, String)>; +} + +/// DownloaderFactory is the factory for creating different downloaders by different protocols. +pub struct DownloaderFactory { + /// downloader is the downloader for downloading pieces, which is implemented by different + /// protocols. + downloader: Arc, +} + +/// DownloadFactory implements the DownloadFactory trait. +impl DownloaderFactory { + /// new returns a new DownloadFactory. + #[instrument(skip_all)] + pub fn new(protocol: &str, config: Arc) -> Result { + let downloader = match protocol { + "grpc" => Arc::new(GRPCDownloader::new(config.clone())), + _ => { + error!("downloader unsupported protocol: {}", protocol); + return Err(Error::InvalidParameter); + } + }; + + Ok(Self { downloader }) + } + + /// build returns the downloader. + #[instrument(skip_all)] + pub fn build(&self) -> Arc { + self.downloader.clone() + } +} + +/// GRPCDownloader is the downloader for downloading pieces by the gRPC protocol. +pub struct GRPCDownloader { + /// config is the configuration of the dfdaemon. + config: Arc, +} + +/// GRPCDownloader implements the downloader with the gRPC protocol. +impl GRPCDownloader { + /// new returns a new GRPCDownloader. + #[instrument(skip_all)] + pub fn new(config: Arc) -> Self { + Self { config } + } +} + +/// GRPCDownloader implements the Downloader trait. +#[tonic::async_trait] +impl Downloader for GRPCDownloader { + /// download_piece downloads a piece from the other peer by the gRPC protocol. + #[instrument(skip_all)] + async fn download_piece( + &self, + addr: &str, + number: u32, + host_id: &str, + task_id: &str, + ) -> Result<(Vec, u64, String)> { + let dfdaemon_upload_client = + DfdaemonUploadClient::new(self.config.clone(), format!("http://{}", addr)).await?; + + let response = dfdaemon_upload_client + .download_piece( + DownloadPieceRequest { + host_id: host_id.to_string(), + task_id: task_id.to_string(), + piece_number: number, + }, + self.config.download.piece_timeout, + ) + .await?; + + let Some(piece) = response.piece else { + return Err(Error::InvalidParameter); + }; + + let Some(content) = piece.content else { + return Err(Error::InvalidParameter); + }; + + Ok((content, piece.offset, piece.digest)) + } +} diff --git a/dragonfly-client/src/resource/task.rs b/dragonfly-client/src/resource/task.rs index b688d243..6137e05c 100644 --- a/dragonfly-client/src/resource/task.rs +++ b/dragonfly-client/src/resource/task.rs @@ -97,23 +97,23 @@ impl Task { storage: Arc, scheduler_client: Arc, backend_factory: Arc, - ) -> Self { + ) -> ClientResult { let piece = piece::Piece::new( config.clone(), id_generator.clone(), storage.clone(), backend_factory.clone(), - ); + )?; let piece = Arc::new(piece); - Self { + Ok(Self { config, id_generator, storage: storage.clone(), scheduler_client: scheduler_client.clone(), backend_factory: backend_factory.clone(), piece: piece.clone(), - } + }) } /// get gets the metadata of the task.