Skip to content

Commit

Permalink
feat: calculate piece range and store the actual piece reader
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi committed Dec 19, 2024
1 parent 64341d6 commit 0e10e9d
Show file tree
Hide file tree
Showing 8 changed files with 304 additions and 77 deletions.
71 changes: 65 additions & 6 deletions dragonfly-client-storage/src/content.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,20 +212,21 @@ impl Content {
#[instrument(skip_all)]
pub async fn read_task_by_range(&self, task_id: &str, range: Range) -> Result<impl AsyncRead> {
let task_path = self.get_task_path(task_id);
let mut from_f = File::open(task_path.as_path()).await.map_err(|err| {
let from_f = File::open(task_path.as_path()).await.map_err(|err| {
error!("open {:?} failed: {}", task_path, err);
err
})?;
let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, from_f);

from_f
f_reader
.seek(SeekFrom::Start(range.start))
.await
.map_err(|err| {
error!("seek {:?} failed: {}", task_path, err);
err
})?;

let range_reader = from_f.take(range.length);
let range_reader = f_reader.take(range.length);
Ok(range_reader)
}

Expand All @@ -251,10 +252,11 @@ impl Content {
range: Option<Range>,
) -> Result<impl AsyncRead> {
let task_path = self.get_task_path(task_id);
let mut f = File::open(task_path.as_path()).await.map_err(|err| {
let f = File::open(task_path.as_path()).await.map_err(|err| {
error!("open {:?} failed: {}", task_path, err);
err
})?;
let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);

// Calculate the target offset and length based on the range.
let (target_offset, target_length) = if let Some(range) = range {
Expand All @@ -266,14 +268,71 @@ impl Content {
(offset, length)
};

f.seek(SeekFrom::Start(target_offset))
f_reader
.seek(SeekFrom::Start(target_offset))
.await
.map_err(|err| {
error!("seek {:?} failed: {}", task_path, err);
err
})?;

Ok(f_reader.take(target_length))
}

/// read_piece_with_dual_read returns two readers over the same piece: the
/// first is limited to the requested range within the piece, the second covers
/// the full piece. It is used to cache the full piece content in the proxy
/// cache while serving a ranged read.
#[instrument(skip_all)]
pub async fn read_piece_with_dual_read(
    &self,
    task_id: &str,
    offset: u64,
    length: u64,
    range: Option<Range>,
) -> Result<(impl AsyncRead, impl AsyncRead)> {
    let task_path = self.get_task_path(task_id);

    // Calculate the target offset and length based on the range. The target
    // span is the intersection of the piece [offset, offset+length) with the
    // requested range; without a range the whole piece is the target.
    let (target_offset, target_length) = if let Some(range) = range {
        let target_offset = max(offset, range.start);
        let target_length =
            min(offset + length - 1, range.start + range.length - 1) - target_offset + 1;
        (target_offset, target_length)
    } else {
        (offset, length)
    };

    // Open the first handle for the range-limited reader.
    let f = File::open(task_path.as_path()).await.map_err(|err| {
        error!("open {:?} failed: {}", task_path, err);
        err
    })?;
    let mut f_range_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);

    f_range_reader
        .seek(SeekFrom::Start(target_offset))
        .await
        .map_err(|err| {
            error!("seek {:?} failed: {}", task_path, err);
            err
        })?;
    let range_reader = f_range_reader.take(target_length);

    // Open a second, independent handle for the full reader of the piece so
    // the two readers do not share a file cursor.
    let f = File::open(task_path.as_path()).await.map_err(|err| {
        error!("open {:?} failed: {}", task_path, err);
        err
    })?;
    let mut f_reader = BufReader::with_capacity(self.config.storage.read_buffer_size, f);

    f_reader
        .seek(SeekFrom::Start(offset))
        .await
        .map_err(|err| {
            error!("seek {:?} failed: {}", task_path, err);
            err
        })?;
    let reader = f_reader.take(length);

    Ok((range_reader, reader))
}

/// write_piece_with_crc32_castagnoli writes the piece to the content with crc32 castagnoli.
Expand Down
48 changes: 48 additions & 0 deletions dragonfly-client-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,54 @@ impl Storage {
}
}

/// upload_piece_with_dual_read returns the dual reader of the piece: one is the
/// range reader, and the other is the full reader of the piece. It is used to
/// cache the piece content in the proxy cache while serving a ranged upload.
///
/// Every exit path updates the task's upload state in metadata: success marks
/// the upload finished, every failure marks it failed.
#[instrument(skip_all)]
pub async fn upload_piece_with_dual_read(
    &self,
    piece_id: &str,
    task_id: &str,
    range: Option<Range>,
) -> Result<(impl AsyncRead, impl AsyncRead)> {
    // Wait for the piece to be finished (fully written) before reading it.
    self.wait_for_piece_finished(piece_id).await?;

    // Start uploading the task.
    self.metadata.upload_task_started(task_id)?;

    // Get the piece metadata and return the content of the piece.
    match self.metadata.get_piece(piece_id) {
        Ok(Some(piece)) => {
            // Build both readers from the piece's stored offset/length,
            // intersected with the optional request range.
            match self
                .content
                .read_piece_with_dual_read(task_id, piece.offset, piece.length, range)
                .await
            {
                Ok(dual_reader) => {
                    // Finish uploading the task.
                    self.metadata.upload_task_finished(task_id)?;
                    Ok(dual_reader)
                }
                Err(err) => {
                    // Failed uploading the task.
                    self.metadata.upload_task_failed(task_id)?;
                    Err(err)
                }
            }
        }
        Ok(None) => {
            // Failed uploading the task: the piece metadata is missing.
            self.metadata.upload_task_failed(task_id)?;
            Err(Error::PieceNotFound(piece_id.to_string()))
        }
        Err(err) => {
            // Failed uploading the task: metadata lookup itself failed.
            self.metadata.upload_task_failed(task_id)?;
            Err(err)
        }
    }
}

/// get_piece returns the piece metadata.
#[instrument(skip_all)]
pub fn get_piece(&self, piece_id: &str) -> Result<Option<metadata::Piece>> {
Expand Down
2 changes: 1 addition & 1 deletion dragonfly-client/src/grpc/dfdaemon_upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
let mut reader = self
.task
.piece
.upload_from_local_peer_into_async_read(
.upload_from_local_into_async_read(
piece_id.as_str(),
task_id.as_str(),
piece.length,
Expand Down
77 changes: 59 additions & 18 deletions dragonfly-client/src/proxy/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
*/

use crate::resource::task::Task;
use dragonfly_api::common::v2::Range;
use dragonfly_api::dfdaemon::v2::DownloadTaskRequest;
use dragonfly_client_core::{Error, Result};
use dragonfly_client_util::http::{get_range, hashmap_to_headermap};
use lru::LruCache;
use std::cmp::{max, min};
use std::num::NonZeroUsize;
use std::sync::{Arc, Mutex};
use tracing::error;

/// Cache is the cache for storing http response by LRU algorithm.
#[derive(Clone)]
Expand Down Expand Up @@ -67,7 +70,14 @@ impl Cache {
return Ok(None);
};

let range = download.range;
let Ok(request_header) = hashmap_to_headermap(&download.request_header) else {
return Ok(None);
};

let Ok(range) = get_range(&request_header, content_length) else {
return Ok(None);
};

let interested_pieces =
self.task
.piece
Expand All @@ -84,38 +94,69 @@ impl Cache {
};

// Calculate the target offset and length based on the range.
let (target_offset, target_length) = if let Some(range) = range {
let target_offset =
max(interested_piece.offset, range.start) - interested_piece.offset;
let target_length = min(
interested_piece.offset + interested_piece.length - 1,
range.start + range.length - 1,
) - target_offset
+ 1;
(target_offset as usize, target_length as usize)
} else {
(0, interested_piece.length as usize)
};
let (target_offset, target_length) =
self.calculate_piece_range(interested_piece.offset, interested_piece.length, range);

let begin = target_offset;
let end = target_offset + target_length;
if begin >= piece_content.len() || end > piece_content.len() {
error!(
"invalid piece content, piece_id: {}, begin: {}, end: {}",
piece_id, begin, end
);

return Err(Error::InvalidParameter);
}

let piece_content = piece_content.slice(target_offset..target_offset + target_length);
let piece_content = piece_content.slice(begin..end);
content.extend_from_slice(&piece_content);
}

Ok(Some(content.freeze()))
}

/// get gets the piece content from the cache.
/// calculate_piece_range calculates the target offset and length based on the
/// piece span and the optional request range. The returned offset is relative
/// to the start of the piece; the length is the size of the overlap.
///
/// NOTE(review): assumes the range overlaps the piece — a disjoint range would
/// underflow the u64 subtraction and panic; confirm callers only pass
/// interested pieces.
fn calculate_piece_range(
    &self,
    piece_offset: u64,
    piece_length: u64,
    range: Option<Range>,
) -> (usize, usize) {
    match range {
        Some(range) => {
            // Work in absolute file coordinates first: the overlap of the
            // piece [piece_offset, piece_offset + piece_length) with the
            // requested range, using inclusive end positions.
            let abs_start = max(piece_offset, range.start);
            let piece_end = piece_offset + piece_length - 1;
            let range_end = range.start + range.length - 1;
            let abs_end = min(piece_end, range_end);

            // Convert back to a piece-relative offset and an overlap length.
            let target_offset = abs_start - piece_offset;
            let target_length = abs_end - abs_start + 1;
            (target_offset as usize, target_length as usize)
        }
        // No range requested: the whole piece is the target.
        None => (0, piece_length as usize),
    }
}

/// get_piece returns a clone of the cached content for the given piece id, or
/// None if it is not cached. Touches the entry's LRU recency.
pub fn get_piece(&self, id: &str) -> Option<bytes::Bytes> {
    self.pieces.lock().unwrap().get(id).cloned()
}

/// add create the piece content into the cache, if the key already exists, no operation will
/// add_piece create the piece content into the cache, if the key already exists, no operation will
/// be performed.
pub fn add_piece(&self, id: &str, content: bytes::Bytes) {
let mut pieces = self.pieces.lock().unwrap();
if !pieces.contains(id) {
pieces.put(id.to_string(), content);
if pieces.contains(id) {
return;
}

pieces.put(id.to_string(), content);
}

/// contains_piece reports whether content for the given piece id is cached,
/// without affecting the entry's LRU recency.
pub fn contains_piece(&self, id: &str) -> bool {
    self.pieces.lock().unwrap().contains(id)
}
}
Loading

0 comments on commit 0e10e9d

Please sign in to comment.