diff --git a/foyer-storage/src/io_buffer_pool.rs b/foyer-storage/src/io_buffer_pool.rs new file mode 100644 index 00000000..611fe4b1 --- /dev/null +++ b/foyer-storage/src/io_buffer_pool.rs @@ -0,0 +1,67 @@ +// Copyright 2024 Foyer Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.use std::marker::PhantomData; + +use std::collections::VecDeque; + +use crate::{IoBuffer, IoBytes}; + +pub enum Buffer { + IoBuffer(IoBuffer), + IoBytes(IoBytes), +} + +impl From for Buffer { + fn from(value: IoBuffer) -> Self { + Self::IoBuffer(value) + } +} + +impl From for Buffer { + fn from(value: IoBytes) -> Self { + Self::IoBytes(value) + } +} + +pub struct IoBufferPool { + capacity: usize, + buffer_size: usize, + queue: VecDeque, +} + +impl IoBufferPool { + pub fn new(buffer_size: usize, capacity: usize) -> Self { + Self { + capacity, + buffer_size, + queue: VecDeque::with_capacity(capacity), + } + } + + pub fn acquire(&mut self) -> IoBuffer { + let create = || IoBuffer::new(self.buffer_size); + let res = match self.queue.pop_front() { + Some(Buffer::IoBuffer(buffer)) => buffer, + Some(Buffer::IoBytes(bytes)) => bytes.into_io_buffer().unwrap_or_else(create), + None => create(), + }; + assert_eq!(res.len(), self.buffer_size); + res + } + + pub fn release(&mut self, buffer: impl Into) { + if self.queue.len() < self.capacity { + self.queue.push_back(buffer.into()); + } + } +} diff --git a/foyer-storage/src/large/batch.rs b/foyer-storage/src/large/batch.rs index c5a54b7b..c313cd04 100644 --- a/foyer-storage/src/large/batch.rs +++ b/foyer-storage/src/large/batch.rs @@ -32,6 +32,7 @@ use tokio::sync::oneshot; use crate::{ device::{bytes::IoBytes, MonitoredDevice, RegionId}, error::Result, + io_buffer_pool::IoBufferPool, large::indexer::HashedEntryAddress, region::{GetCleanRegionHandle, RegionManager}, Dev, DevExt, IoBuffer, @@ -85,7 +86,7 @@ where wait: WaitGroup, /// Cache write buffer between rotation to reduce page fault. - buffer_pool: Option, + buffer_pool: IoBufferPool, region_manager: RegionManager, device: MonitoredDevice, @@ -122,7 +123,7 @@ where tombstones: vec![], init: None, wait: WaitGroup::default(), - buffer_pool: Some(IoBytes::from(IoBuffer::new(capacity))), + buffer_pool: IoBufferPool::new(capacity, 1), region_manager, device, indexer, @@ -205,23 +206,19 @@ where Some(allocation) } - /// Rotate the batch. - /// - /// # Panics - /// - /// The caller must guarantee all io bytes from the last batch are dropped. + // Note: Make sure `rotate` is called after all buffer from the last batch are dropped. + // + // Otherwise, the page fault caused by the buffer pool will hurt the performance. pub fn rotate(&mut self) -> Option<(Batch, WaitGroupFuture)> { if self.is_empty() { return None; } - let buffer = self.buffer_pool.take().unwrap(); - let mut buffer = buffer.into_io_buffer().unwrap(); - + let mut buffer = self.buffer_pool.acquire(); std::mem::swap(&mut self.buffer, &mut buffer); self.len = 0; let buffer = IoBytes::from(buffer); - self.buffer_pool = Some(buffer.clone()); + self.buffer_pool.release(buffer.clone()); let wait = std::mem::take(&mut self.wait); diff --git a/foyer-storage/src/lib.rs b/foyer-storage/src/lib.rs index 6696a593..b30d05c3 100644 --- a/foyer-storage/src/lib.rs +++ b/foyer-storage/src/lib.rs @@ -21,6 +21,7 @@ mod compress; mod device; mod engine; mod error; +mod io_buffer_pool; mod large; mod picker; mod region;