Skip to content

Commit

Permalink
refactor: refine batch buffer pool by IoBufferPool (#627)
Browse files Browse the repository at this point in the history
* refactor: refine batch buffer pool by IoBufferPool

Signed-off-by: MrCroxx <[email protected]>

* refactor: remove utils dir

Signed-off-by: MrCroxx <[email protected]>

---------

Signed-off-by: MrCroxx <[email protected]>
  • Loading branch information
MrCroxx authored Aug 2, 2024
1 parent 524a5c5 commit 6e973a6
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 11 deletions.
67 changes: 67 additions & 0 deletions foyer-storage/src/io_buffer_pool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2024 Foyer Project Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.use std::marker::PhantomData;

use std::collections::VecDeque;

use crate::{IoBuffer, IoBytes};

pub enum Buffer {
IoBuffer(IoBuffer),
IoBytes(IoBytes),
}

impl From<IoBuffer> for Buffer {
fn from(value: IoBuffer) -> Self {
Self::IoBuffer(value)
}
}

impl From<IoBytes> for Buffer {
fn from(value: IoBytes) -> Self {
Self::IoBytes(value)
}
}

pub struct IoBufferPool {
capacity: usize,
buffer_size: usize,
queue: VecDeque<Buffer>,
}

impl IoBufferPool {
pub fn new(buffer_size: usize, capacity: usize) -> Self {
Self {
capacity,
buffer_size,
queue: VecDeque::with_capacity(capacity),
}
}

pub fn acquire(&mut self) -> IoBuffer {
let create = || IoBuffer::new(self.buffer_size);
let res = match self.queue.pop_front() {
Some(Buffer::IoBuffer(buffer)) => buffer,
Some(Buffer::IoBytes(bytes)) => bytes.into_io_buffer().unwrap_or_else(create),
None => create(),
};
assert_eq!(res.len(), self.buffer_size);
res
}

pub fn release(&mut self, buffer: impl Into<Buffer>) {
if self.queue.len() < self.capacity {
self.queue.push_back(buffer.into());
}
}
}
19 changes: 8 additions & 11 deletions foyer-storage/src/large/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use tokio::sync::oneshot;
use crate::{
device::{bytes::IoBytes, MonitoredDevice, RegionId},
error::Result,
io_buffer_pool::IoBufferPool,
large::indexer::HashedEntryAddress,
region::{GetCleanRegionHandle, RegionManager},
Dev, DevExt, IoBuffer,
Expand Down Expand Up @@ -85,7 +86,7 @@ where
wait: WaitGroup,

/// Cache write buffer between rotation to reduce page fault.
buffer_pool: Option<IoBytes>,
buffer_pool: IoBufferPool,

region_manager: RegionManager,
device: MonitoredDevice,
Expand Down Expand Up @@ -122,7 +123,7 @@ where
tombstones: vec![],
init: None,
wait: WaitGroup::default(),
buffer_pool: Some(IoBytes::from(IoBuffer::new(capacity))),
buffer_pool: IoBufferPool::new(capacity, 1),
region_manager,
device,
indexer,
Expand Down Expand Up @@ -205,23 +206,19 @@ where
Some(allocation)
}

/// Rotate the batch.
///
/// # Panics
///
/// The caller must guarantee all io bytes from the last batch are dropped.
// Note: Make sure `rotate` is called after all buffer from the last batch are dropped.
//
// Otherwise, the page fault caused by the buffer pool will hurt the performance.
pub fn rotate(&mut self) -> Option<(Batch<K, V, S>, WaitGroupFuture)> {
if self.is_empty() {
return None;
}

let buffer = self.buffer_pool.take().unwrap();
let mut buffer = buffer.into_io_buffer().unwrap();

let mut buffer = self.buffer_pool.acquire();
std::mem::swap(&mut self.buffer, &mut buffer);
self.len = 0;
let buffer = IoBytes::from(buffer);
self.buffer_pool = Some(buffer.clone());
self.buffer_pool.release(buffer.clone());

let wait = std::mem::take(&mut self.wait);

Expand Down
1 change: 1 addition & 0 deletions foyer-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ mod compress;
mod device;
mod engine;
mod error;
mod io_buffer_pool;
mod large;
mod picker;
mod region;
Expand Down

0 comments on commit 6e973a6

Please sign in to comment.