From 53e8a98cc6099981a9f9806fcc497dcc3e6b5977 Mon Sep 17 00:00:00 2001 From: MrCroxx Date: Thu, 8 Aug 2024 10:32:24 +0000 Subject: [PATCH] stash, todo: rebase me --- foyer-storage/Cargo.toml | 1 + foyer-storage/src/device/monitor.rs | 2 +- foyer-storage/src/engine.rs | 12 - foyer-storage/src/io_buffer_pool.rs | 2 +- foyer-storage/src/large/flusher.rs | 45 ++-- foyer-storage/src/large/generic.rs | 16 +- foyer-storage/src/serde.rs | 12 +- foyer-storage/src/small/batch.rs | 297 ++++++++++++++++++++++ foyer-storage/src/small/flusher.rs | 183 ++++++++++++++ foyer-storage/src/small/generic.rs | 179 ++++++++++--- foyer-storage/src/small/mod.rs | 5 + foyer-storage/src/small/serde.rs | 109 ++++++++ foyer-storage/src/small/set.rs | 335 +++++++++++++++++++++++++ foyer-storage/src/small/set_manager.rs | 241 ++++++++++++++++++ foyer-storage/src/storage/either.rs | 5 - foyer-storage/src/storage/mod.rs | 4 - foyer-storage/src/storage/noop.rs | 4 - foyer-storage/src/storage/runtime.rs | 4 - foyer-storage/src/store.rs | 37 ++- foyer/src/hybrid/cache.rs | 3 +- 20 files changed, 1395 insertions(+), 101 deletions(-) create mode 100644 foyer-storage/src/small/batch.rs create mode 100644 foyer-storage/src/small/flusher.rs create mode 100644 foyer-storage/src/small/serde.rs create mode 100644 foyer-storage/src/small/set.rs create mode 100644 foyer-storage/src/small/set_manager.rs diff --git a/foyer-storage/Cargo.toml b/foyer-storage/Cargo.toml index e9cee6b1..87af3c4d 100644 --- a/foyer-storage/Cargo.toml +++ b/foyer-storage/Cargo.toml @@ -30,6 +30,7 @@ itertools = { workspace = true } lazy_static = "1" libc = "0.2" lz4 = "1.24" +ordered_hash_map = "0.4" parking_lot = { version = "0.12", features = ["arc_lock"] } pin-project = "1" rand = "0.8" diff --git a/foyer-storage/src/device/monitor.rs b/foyer-storage/src/device/monitor.rs index 9f45e9a2..88bb913e 100644 --- a/foyer-storage/src/device/monitor.rs +++ b/foyer-storage/src/device/monitor.rs @@ -10,7 +10,7 @@ // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and -// limitations under the License.use std::marker::PhantomData; +// limitations under the License. use std::{ fmt::Debug, diff --git a/foyer-storage/src/engine.rs b/foyer-storage/src/engine.rs index 26f4ef09..3ad0a502 100644 --- a/foyer-storage/src/engine.rs +++ b/foyer-storage/src/engine.rs @@ -382,18 +382,6 @@ where } } - async fn wait(&self) -> Result<()> { - match self { - Engine::Noop(storage) => storage.wait().await, - Engine::Large(storage) => storage.wait().await, - Engine::LargeRuntime(storage) => storage.wait().await, - Engine::Small(storage) => storage.wait().await, - Engine::SmallRuntime(storage) => storage.wait().await, - Engine::Combined(storage) => storage.wait().await, - Engine::CombinedRuntime(storage) => storage.wait().await, - } - } - fn runtime(&self) -> &tokio::runtime::Handle { match self { Engine::Noop(storage) => storage.runtime(), diff --git a/foyer-storage/src/io_buffer_pool.rs b/foyer-storage/src/io_buffer_pool.rs index 611fe4b1..4a136fdf 100644 --- a/foyer-storage/src/io_buffer_pool.rs +++ b/foyer-storage/src/io_buffer_pool.rs @@ -10,7 +10,7 @@ // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
// See the License for the specific language governing permissions and -// limitations under the License.use std::marker::PhantomData; +// limitations under the License. use std::collections::VecDeque; diff --git a/foyer-storage/src/large/flusher.rs b/foyer-storage/src/large/flusher.rs index 2ae175a7..fafae467 100644 --- a/foyer-storage/src/large/flusher.rs +++ b/foyer-storage/src/large/flusher.rs @@ -30,11 +30,15 @@ use futures::future::{try_join, try_join_all}; use parking_lot::Mutex; use std::{ fmt::Debug, - sync::{atomic::Ordering, Arc}, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, }; use tokio::{ runtime::Handle, - sync::{oneshot, Notify, Semaphore}, + sync::{oneshot, Notify}, + task::JoinHandle, }; use super::{ @@ -82,10 +86,11 @@ where notify: Arc, - flight: Arc, - compression: Compression, metrics: Arc, + + handle: Arc>>>, + stop: Arc, } impl Clone for Flusher @@ -98,9 +103,10 @@ where Self { batch: self.batch.clone(), notify: self.notify.clone(), - flight: self.flight.clone(), compression: self.compression, metrics: self.metrics.clone(), + handle: self.handle.clone(), + stop: self.stop.clone(), } } } @@ -133,7 +139,7 @@ where indexer.clone(), ))); - let flight = Arc::new(Semaphore::new(1)); + let stop = Arc::::default(); let runner = Runner { batch: batch.clone(), @@ -144,20 +150,23 @@ where flush: config.flush, stats, metrics: metrics.clone(), + stop: stop.clone(), }; - runtime.spawn(async move { + let handle = runtime.spawn(async move { if let Err(e) = runner.run().await { - tracing::error!("[flusher]: flusher exit with error: {e}"); + tracing::error!("[lodc flusher]: flusher exit with error: {e}"); } }); + let handle = Arc::new(Mutex::new(Some(handle))); Ok(Self { batch, notify, - flight, compression: config.compression, metrics, + handle, + stop, }) } @@ -176,9 +185,13 @@ where self.notify.notify_one(); } - pub async fn wait(&self) -> Result<()> { - // TODO(MrCroxx): Consider a better implementation? - let _permit = self.flight.acquire().await; + pub async fn close(&self) -> Result<()> { + self.stop.store(true, Ordering::SeqCst); + self.notify.notify_one(); + let handle = self.handle.lock().take(); + if let Some(handle) = handle { + handle.await.unwrap(); + } Ok(()) } @@ -195,7 +208,7 @@ where value_len: info.value_len as _, hash: entry.hash(), sequence, - checksum: Checksummer::checksum(&buffer), + checksum: Checksummer::checksum64(&buffer), compression: self.compression, }; @@ -247,6 +260,8 @@ where stats: Arc, metrics: Arc, + + stop: Arc, } impl Runner @@ -256,12 +271,14 @@ where S: HashBuilder + Debug, { pub async fn run(self) -> Result<()> { - // TODO(MrCroxx): Graceful shutdown. loop { let rotation = self.batch.lock().rotate(); let (batch, wait) = match rotation { Some(rotation) => rotation, None => { + if self.stop.load(Ordering::SeqCst) { + return Ok(()); + } self.notify.notified().await; continue; } diff --git a/foyer-storage/src/large/generic.rs b/foyer-storage/src/large/generic.rs index 80d593a8..21ce089b 100644 --- a/foyer-storage/src/large/generic.rs +++ b/foyer-storage/src/large/generic.rs @@ -10,7 +10,7 @@ // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and -// limitations under the License.use std::marker::PhantomData; +// limitations under the License. 
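Note on the flusher change above: the large-object flusher now stops through `close()` rather than the old Semaphore-backed `wait()`. `close()` raises an `AtomicBool`, wakes the runner via `Notify`, and awaits the stored `JoinHandle`. A minimal, self-contained sketch of that handshake (the `Worker` type and `main` are illustrative, not part of this patch); the flag is raised before the wake-up so the runner cannot park after missing the stop signal:

```rust
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use tokio::sync::Notify;

// Illustrative stand-in for the flusher's background runner.
struct Worker {
    stop: Arc<AtomicBool>,
    notify: Arc<Notify>,
}

impl Worker {
    async fn run(self) {
        loop {
            // ... drain any pending batch here ...
            if self.stop.load(Ordering::SeqCst) {
                return; // graceful exit after the last drain
            }
            // Park until new work arrives or `close()` wakes us.
            self.notify.notified().await;
        }
    }
}

#[tokio::main]
async fn main() {
    let stop = Arc::new(AtomicBool::new(false));
    let notify = Arc::new(Notify::new());
    let worker = Worker {
        stop: stop.clone(),
        notify: notify.clone(),
    };
    let handle = tokio::spawn(worker.run());

    // `close()` in the patch does exactly this: flag, wake, join.
    stop.store(true, Ordering::SeqCst);
    notify.notify_one();
    handle.await.unwrap();
}
```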
use std::{ borrow::Borrow, @@ -287,15 +287,11 @@ where }) } - async fn wait(&self) -> Result<()> { - try_join_all(self.inner.flushers.iter().map(|flusher| flusher.wait())).await?; - join_all(self.inner.reclaimers.iter().map(|reclaimer| reclaimer.wait())).await; - Ok(()) - } - async fn close(&self) -> Result<()> { self.inner.active.store(false, Ordering::Relaxed); - self.wait().await + try_join_all(self.inner.flushers.iter().map(|flusher| flusher.close())).await?; + join_all(self.inner.reclaimers.iter().map(|reclaimer| reclaimer.wait())).await; + Ok(()) } #[fastrace::trace(name = "foyer::storage::large::generic::enqueue")] @@ -528,10 +524,6 @@ where self.inner.device.stat().clone() } - async fn wait(&self) -> Result<()> { - self.wait().await - } - fn runtime(&self) -> &Handle { self.runtime() } diff --git a/foyer-storage/src/serde.rs b/foyer-storage/src/serde.rs index 663de97d..4d3ee797 100644 --- a/foyer-storage/src/serde.rs +++ b/foyer-storage/src/serde.rs @@ -13,7 +13,7 @@ // limitations under the License. use std::{fmt::Debug, hash::Hasher}; -use twox_hash::XxHash64; +use twox_hash::{XxHash32, XxHash64}; use foyer_common::code::{StorageKey, StorageValue}; @@ -27,11 +27,17 @@ use crate::{ pub struct Checksummer; impl Checksummer { - pub fn checksum(buf: &[u8]) -> u64 { + pub fn checksum64(buf: &[u8]) -> u64 { let mut hasher = XxHash64::with_seed(0); hasher.write(buf); hasher.finish() } + + pub fn checksum32(buf: &[u8]) -> u32 { + let mut hasher = XxHash32::with_seed(0); + hasher.write(buf); + hasher.finish() as u32 + } } #[derive(Debug)] @@ -113,7 +119,7 @@ impl EntryDeserializer { // calculate checksum if needed if let Some(expected) = checksum { - let get = Checksummer::checksum(&buffer[..value_len + ken_len]); + let get = Checksummer::checksum64(&buffer[..value_len + ken_len]); if expected != get { return Err(Error::ChecksumMismatch { expected, get }); } diff --git a/foyer-storage/src/small/batch.rs b/foyer-storage/src/small/batch.rs new file mode 100644 index 00000000..b9736e76 --- /dev/null +++ b/foyer-storage/src/small/batch.rs @@ -0,0 +1,297 @@ +// Copyright 2024 Foyer Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
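small/batch.rs below buffers pending writes per set: it threads a doubly-linked list through a `HashMap` keyed by hash so a newer write to the same key supersedes the pending one in O(1), and it evicts the oldest pending insertion once a set batch would exceed the set's data capacity. A semantics-only model of that behavior, assuming a plain `VecDeque` instead of the intrusive list (so removal here is not O(1)):

```rust
use std::collections::VecDeque;

/// Illustrative model of one per-set write batch: FIFO order, dedup by hash,
/// evict from the front while over capacity. Not a type from this patch.
struct SetBatchModel {
    queue: VecDeque<(u64 /* hash */, usize /* serialized bytes */)>,
    len: usize,
    capacity: usize,
}

impl SetBatchModel {
    fn insert(&mut self, hash: u64, entry_len: usize) {
        // A newer write to the same hash supersedes the pending one;
        // the patch acknowledges the superseded write with `Ok(false)`.
        if let Some(pos) = self.queue.iter().position(|(h, _)| *h == hash) {
            let (_, old) = self.queue.remove(pos).unwrap();
            self.len -= old;
        }
        self.queue.push_back((hash, entry_len));
        self.len += entry_len;
        // Never buffer more than one set's worth of data per set.
        while self.len > self.capacity {
            let (_, evicted) = self.queue.pop_front().unwrap();
            self.len -= evicted;
        }
    }
}
```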
+ +use std::{ + collections::{hash_map::HashMap, HashSet}, + fmt::Debug, +}; + +use foyer_common::code::{HashBuilder, StorageKey, StorageValue}; +use foyer_memory::CacheEntry; +use tokio::sync::oneshot; + +use crate::{error::Result, serde::KvInfo, small::serde::EntryHeader, IoBytes, IoBytesMut}; + +use super::set::SetId; + +struct Insertion +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + entry: CacheEntry, + buffer: IoBytes, + info: KvInfo, + tx: oneshot::Sender>, +} + +struct Deletion { + hash: u64, + tx: oneshot::Sender>, +} + +struct Entry +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + insertion: Insertion, + prev_hash: Option, + next_hash: Option, +} + +struct SetBatchMut +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + entries: HashMap>, + deletions: HashMap, + + head_hash: Option, + tail_hash: Option, + + len: usize, + capacity: usize, +} + +impl Debug for SetBatchMut +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SetMut") + .field("count", &self.entries.len()) + .field("len", &self.len) + .finish() + } +} + +impl SetBatchMut +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + fn new(capacity: usize) -> Self { + Self { + entries: HashMap::new(), + deletions: HashMap::new(), + head_hash: None, + tail_hash: None, + len: 0, + capacity, + } + } + + fn insert(&mut self, insertion: Insertion) { + if let Some(deletion) = self.deletions.remove(&insertion.entry.hash()) { + let _ = deletion.tx.send(Ok(false)); + } + + let mut entry = Entry { + insertion, + prev_hash: None, + next_hash: None, + }; + + self.list_push(&mut entry); + self.len += EntryHeader::ENTRY_HEADER_SIZE + entry.insertion.buffer.len(); + if let Some(mut old) = self.entries.insert(entry.insertion.entry.hash(), entry) { + self.list_unlink(&mut old); + self.len -= EntryHeader::ENTRY_HEADER_SIZE + old.insertion.buffer.len(); + let _ = old.insertion.tx.send(Ok(false)); + } + + while self.len > self.capacity { + let entry = self.pop().unwrap(); + self.len -= EntryHeader::ENTRY_HEADER_SIZE + entry.insertion.buffer.len(); + let _ = entry.insertion.tx.send(Ok(false)); + } + } + + fn delete(&mut self, deletion: Deletion) { + if let Some(mut entry) = self.entries.remove(&deletion.hash) { + self.list_unlink(&mut entry); + self.len -= EntryHeader::ENTRY_HEADER_SIZE + entry.insertion.buffer.len(); + let _ = entry.insertion.tx.send(Ok(false)); + } + + if let Some(deletion) = self.deletions.insert(deletion.hash, deletion) { + let _ = deletion.tx.send(Ok(false)); + } + } + + fn freeze(mut self) -> SetBatch { + let mut buf = IoBytesMut::with_capacity(self.len); + let mut entries = Vec::with_capacity(self.entries.len()); + let mut txs = Vec::with_capacity(self.entries.len()); + let mut deletions = HashSet::with_capacity(self.entries.len() + self.deletions.len()); + + while let Some(entry) = self.pop() { + let header = EntryHeader::new( + entry.insertion.entry.hash(), + entry.insertion.info.key_len, + entry.insertion.info.value_len, + ); + header.write(&mut buf); + deletions.insert(entry.insertion.entry.hash()); + buf.copy_from_slice(&entry.insertion.buffer); + entries.push(entry.insertion.entry); + txs.push(entry.insertion.tx); + } + + for deletion in self.deletions.into_values() { + deletions.insert(deletion.hash); + txs.push(deletion.tx); + } + + assert_eq!(buf.len(), self.len); + + SetBatch { + deletions, + bytes: buf.freeze(), + 
entries, + txs, + } + } + + fn list_unlink(&mut self, entry: &mut Entry) { + if let Some(prev_hash) = entry.prev_hash { + self.entries.get_mut(&prev_hash).unwrap().next_hash = entry.next_hash; + } else { + assert_eq!(self.head_hash, Some(entry.insertion.entry.hash())); + self.head_hash = entry.next_hash; + } + if let Some(next_hash) = entry.next_hash { + self.entries.get_mut(&next_hash).unwrap().prev_hash = entry.prev_hash; + } else { + assert_eq!(self.tail_hash, Some(entry.insertion.entry.hash())); + self.tail_hash = entry.prev_hash; + } + entry.prev_hash = None; + entry.next_hash = None; + } + + fn list_push(&mut self, entry: &mut Entry) { + assert!(entry.prev_hash.is_none()); + assert!(entry.next_hash.is_none()); + + if let Some(tail_hash) = self.tail_hash { + let tail = self.entries.get_mut(&tail_hash).unwrap(); + + tail.next_hash = Some(entry.insertion.entry.hash()); + entry.prev_hash = Some(tail_hash); + + self.tail_hash = Some(entry.insertion.entry.hash()); + } else { + assert!(self.head_hash.is_none()); + + self.head_hash = Some(entry.insertion.entry.hash()); + self.tail_hash = Some(entry.insertion.entry.hash()); + } + } + + fn pop(&mut self) -> Option> { + let head_hash = self.head_hash?; + let mut entry = self.entries.remove(&head_hash).unwrap(); + self.list_unlink(&mut entry); + Some(entry) + } +} + +#[derive(Debug)] +pub struct BatchMut +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + sets: HashMap>, + /// Total set count. + total: SetId, + /// Set data capacity. + set_capacity: usize, +} + +impl BatchMut +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + pub fn new(total: SetId, set_data_size: usize) -> Self { + Self { + sets: HashMap::new(), + total, + set_capacity: set_data_size, + } + } + + pub fn append( + &mut self, + entry: CacheEntry, + buffer: IoBytes, + info: KvInfo, + tx: oneshot::Sender>, + ) { + let sid = entry.hash() % self.total; + let set = self.sets.entry(sid).or_insert(SetBatchMut::new(self.set_capacity)); + let insertion = Insertion { + entry, + buffer, + info, + tx, + }; + set.insert(insertion); + } + + pub fn delete(&mut self, hash: u64, tx: oneshot::Sender>) { + let sid = hash % self.total; + let set = self.sets.entry(sid).or_insert(SetBatchMut::new(self.set_capacity)); + set.delete(Deletion { hash, tx }) + } + + pub fn rotate(&mut self) -> Batch { + let sets = std::mem::take(&mut self.sets); + let sets = sets.into_iter().map(|(id, set)| (id, set.freeze())).collect(); + Batch { sets } + } +} + +pub struct Batch +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + pub sets: HashMap>, +} + +pub struct SetBatch +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + pub deletions: HashSet, + pub bytes: IoBytes, + pub entries: Vec>, + pub txs: Vec>>, +} diff --git a/foyer-storage/src/small/flusher.rs b/foyer-storage/src/small/flusher.rs new file mode 100644 index 00000000..2c679ed4 --- /dev/null +++ b/foyer-storage/src/small/flusher.rs @@ -0,0 +1,183 @@ +// Copyright 2024 Foyer Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{ + fmt::Debug, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, +}; + +use foyer_common::code::{HashBuilder, StorageKey, StorageValue}; +use foyer_memory::CacheEntry; +use futures::future::try_join_all; +use parking_lot::Mutex; +use tokio::{ + runtime::Handle, + sync::{oneshot, Notify}, + task::JoinHandle, +}; + +use crate::{ + error::{Error, Result}, + serde::KvInfo, + IoBytes, +}; + +use super::{ + batch::{Batch, BatchMut, SetBatch}, + set_manager::SetManager, +}; + +pub struct Flusher +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + batch: Arc>>, + + notify: Arc, + + handle: Arc>>>, + stop: Arc, +} + +impl Flusher +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + pub fn new(set_manager: SetManager, runtime: &Handle) -> Self { + let batch = Arc::new(Mutex::new(BatchMut::new( + set_manager.sets() as _, + set_manager.set_data_size(), + ))); + let notify = Arc::::default(); + + let stop = Arc::::default(); + + let runner = Runner { + batch: batch.clone(), + notify: notify.clone(), + set_manager, + stop: stop.clone(), + }; + + let handle = runtime.spawn(async move { + if let Err(e) = runner.run().await { + tracing::error!("[sodc flusher]: flusher exit with error: {e}"); + } + }); + let handle = Arc::new(Mutex::new(Some(handle))); + + Self { + batch, + notify, + handle, + stop, + } + } + + pub fn entry(&self, entry: CacheEntry, buffer: IoBytes, info: KvInfo, tx: oneshot::Sender>) { + self.batch.lock().append(entry, buffer, info, tx); + self.notify.notify_one(); + } + + pub fn deletion(&self, hash: u64, tx: oneshot::Sender>) { + self.batch.lock().delete(hash, tx); + self.notify.notify_one(); + } + + pub async fn close(&self) -> Result<()> { + self.stop.store(true, Ordering::SeqCst); + self.notify.notify_one(); + let handle = self.handle.lock().take(); + if let Some(handle) = handle { + handle.await.unwrap(); + } + Ok(()) + } +} + +struct Runner +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + batch: Arc>>, + + notify: Arc, + + set_manager: SetManager, + + stop: Arc, +} + +impl Runner +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + pub async fn run(self) -> Result<()> { + loop { + let batch = self.batch.lock().rotate(); + if batch.sets.is_empty() { + if self.stop.load(Ordering::SeqCst) { + return Ok(()); + } + self.notify.notified().await; + continue; + } + self.commit(batch).await + } + } + + pub async fn commit(&self, batch: Batch) { + let futures = batch.sets.into_iter().map( + |( + sid, + SetBatch { + deletions, + bytes, + entries, + txs, + }, + )| { + let set_manager = self.set_manager.clone(); + async move { + let mut set = set_manager.write(sid).await?; + set.apply(&deletions, &bytes[..]); + set_manager.apply(set).await?; + + // TODO(MrCroxx): Move notification to another task. 
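+                    // Acks are sent only after `set_manager.apply` has persisted the
+                    // set, so an acknowledged insert or delete is immediately readable.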
+ for tx in txs { + let _ = tx.send(Ok(true)); + } + + drop(entries); + + Ok::<_, Error>(()) + } + }, + ); + if let Err(e) = try_join_all(futures).await { + tracing::error!("[sodc flusher]: error raised when committing batch, error: {e}"); + } + } +} diff --git a/foyer-storage/src/small/generic.rs b/foyer-storage/src/small/generic.rs index 3dddddc6..234e3d70 100644 --- a/foyer-storage/src/small/generic.rs +++ b/foyer-storage/src/small/generic.rs @@ -10,29 +10,46 @@ // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and -// limitations under the License.use std::marker::PhantomData; +// limitations under the License. use foyer_common::code::{HashBuilder, StorageKey, StorageValue}; -use foyer_memory::CacheEntry; -use futures::Future; -use tokio::sync::oneshot; +use foyer_memory::{Cache, CacheEntry}; +use futures::{future::try_join_all, Future, FutureExt}; +use itertools::Itertools; -use std::{borrow::Borrow, fmt::Debug, hash::Hash, marker::PhantomData, sync::Arc}; +use tokio::{runtime::Handle, sync::oneshot}; + +use std::{borrow::Borrow, fmt::Debug, hash::Hash, ops::Range, sync::Arc}; use crate::{ + device::{MonitoredDevice, RegionId}, error::Result, serde::KvInfo, storage::{Storage, WaitHandle}, DeviceStats, IoBytes, }; +use super::{flusher::Flusher, set::SetId, set_manager::SetManager}; + pub struct GenericSmallStorageConfig where K: StorageKey, V: StorageValue, S: HashBuilder + Debug, { - pub placeholder: PhantomData<(K, V, S)>, + pub memory: Cache, + /// Set size in bytes. + pub set_size: usize, + /// Set cache capacity by set count. + pub set_cache_capacity: usize, + /// Device for small object disk cache. + pub device: MonitoredDevice, + /// Regions of the device to use. + pub regions: Range, + /// Whether to flush after writes. + pub flush: bool, + /// Flusher count. 
+ pub flushers: usize, } impl Debug for GenericSmallStorageConfig @@ -46,13 +63,29 @@ where } } +struct GenericSmallStorageInner +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + memory: Cache, + + flushers: Vec>, + + device: MonitoredDevice, + set_manager: SetManager, + + runtime: Handle, +} + pub struct GenericSmallStorage where K: StorageKey, V: StorageValue, S: HashBuilder + Debug, { - _marker: PhantomData<(K, V, S)>, + inner: Arc>, } impl Debug for GenericSmallStorage @@ -73,7 +106,94 @@ where S: HashBuilder + Debug, { fn clone(&self) -> Self { - Self { _marker: PhantomData } + Self { + inner: self.inner.clone(), + } + } +} + +impl GenericSmallStorage +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + async fn open(config: GenericSmallStorageConfig) -> Result { + let runtime = Handle::current(); + + let set_manager = SetManager::new( + config.set_size, + config.set_cache_capacity, + config.device.clone(), + config.regions, + config.flush, + ); + + let flushers = (0..config.flushers) + .map(|_| Flusher::new(set_manager.clone(), &runtime)) + .collect_vec(); + + let inner = GenericSmallStorageInner { + memory: config.memory, + flushers, + device: config.device, + set_manager, + runtime, + }; + let inner = Arc::new(inner); + + Ok(Self { inner }) + } + + async fn close(&self) -> Result<()> { + try_join_all(self.inner.flushers.iter().map(|flusher| flusher.close())).await?; + Ok(()) + } + + fn enqueue(&self, entry: CacheEntry, buffer: IoBytes, info: KvInfo, tx: oneshot::Sender>) { + // Entries with the same hash must be grouped in the batch. + let id = entry.hash() as usize % self.inner.flushers.len(); + self.inner.flushers[id].entry(entry, buffer, info, tx); + } + + fn load(&self, key: &Q) -> impl Future>> + Send + 'static + where + K: Borrow, + Q: Hash + Eq + ?Sized + Send + Sync + 'static, + { + let hash = self.inner.memory.hash_builder().hash_one(key); + let set_manager = self.inner.set_manager.clone(); + let sid = hash % set_manager.sets() as SetId; + async move { + let set = set_manager.read(sid).await?; + let kv = set.get(hash)?; + Ok(kv) + } + } + + fn delete(&self, key: &Q) -> WaitHandle> + Send + 'static> + where + K: Borrow, + Q: Hash + Eq + ?Sized, + { + // Entries with the same hash must be grouped in the batch. + let hash = self.inner.memory.hash_builder().hash_one(key); + let id = hash as usize % self.inner.flushers.len(); + + let (tx, rx) = oneshot::channel(); + let handle = WaitHandle::new(rx.map(|recv| recv.unwrap())); + + self.inner.flushers[id].deletion(hash, tx); + + handle + } + + fn stats(&self) -> Arc { + self.inner.device.stat().clone() + } + + fn runtime(&self) -> &Handle { + &self.inner.runtime } } @@ -88,47 +208,48 @@ where type BuildHasher = S; type Config = GenericSmallStorageConfig; - async fn open(_config: Self::Config) -> Result { - todo!() + async fn open(config: Self::Config) -> Result { + Self::open(config).await } async fn close(&self) -> Result<()> { - todo!() + self.close().await?; + Ok(()) } fn enqueue( &self, - _entry: CacheEntry, - _buffer: IoBytes, - _info: KvInfo, - _tx: oneshot::Sender>, + entry: CacheEntry, + buffer: IoBytes, + info: KvInfo, + tx: oneshot::Sender>, ) { - todo!() + self.enqueue(entry, buffer, info, tx) } // FIXME: REMOVE THE CLIPPY IGNORE. // TODO(MrCroxx): use `expect` after `lint_reasons` is stable. 
#[allow(clippy::manual_async_fn)] - fn load(&self, _key: &Q) -> impl Future>> + Send + 'static + fn load(&self, key: &Q) -> impl Future>> + Send + 'static where - Self::Key: std::borrow::Borrow, - Q: std::hash::Hash + Eq + ?Sized + Send + Sync + 'static, + Self::Key: Borrow, + Q: Hash + Eq + ?Sized + Send + Sync + 'static, { - async { todo!() } + self.load(key) } - fn delete(&self, _key: &Q) -> WaitHandle> + Send + 'static> + fn delete(&self, key: &Q) -> WaitHandle> + Send + 'static> where Self::Key: Borrow, Q: Hash + Eq + ?Sized, { - WaitHandle::new(async move { todo!() }) + self.delete(key) } fn may_contains(&self, _key: &Q) -> bool where - Self::Key: std::borrow::Borrow, - Q: std::hash::Hash + Eq + ?Sized, + Self::Key: Borrow, + Q: Hash + Eq + ?Sized, { todo!() } @@ -138,14 +259,10 @@ where } fn stats(&self) -> Arc { - todo!() + self.stats() } - async fn wait(&self) -> Result<()> { - todo!() - } - - fn runtime(&self) -> &tokio::runtime::Handle { - todo!() + fn runtime(&self) -> &Handle { + self.runtime() } } diff --git a/foyer-storage/src/small/mod.rs b/foyer-storage/src/small/mod.rs index 01e617c2..7aff6b27 100644 --- a/foyer-storage/src/small/mod.rs +++ b/foyer-storage/src/small/mod.rs @@ -12,4 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod batch; +pub mod flusher; pub mod generic; +pub mod serde; +pub mod set; +pub mod set_manager; diff --git a/foyer-storage/src/small/serde.rs b/foyer-storage/src/small/serde.rs new file mode 100644 index 00000000..9ba9d513 --- /dev/null +++ b/foyer-storage/src/small/serde.rs @@ -0,0 +1,109 @@ +// Copyright 2024 Foyer Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
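In `GenericSmallStorage` above, a key's hash picks both its flusher and its set: an insert and a later delete of the same key must reach the same flusher so their relative order inside a batch is preserved, and a read only ever has to scan the one set that owns the hash. The two routing steps, written out as standalone helpers (the patch computes them inline):

```rust
/// Same hash, same flusher: keeps an insert and a later delete of one key ordered.
fn flusher_of(hash: u64, flushers: usize) -> usize {
    hash as usize % flushers
}

/// Same hash, same set: a lookup reads exactly one set from the device.
fn set_of(hash: u64, sets: u64) -> u64 {
    hash % sets
}
```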
+ +use std::u8; + +use bytes::{Buf, BufMut}; +use foyer_common::strict_assert; + +/// max key/value len: `64 KiB - 1` +/// +/// # Format +/// +/// ```plain +/// | hash 64b | +/// | key len low 8b | value len low 8b | key len high 4b | value len high 4b | +/// ``` +#[derive(Debug, PartialEq, Eq)] +pub struct EntryHeader { + hash: u64, + key_len: u16, + value_len: u16, +} + +impl EntryHeader { + pub const ENTRY_HEADER_SIZE: usize = (12 + 12 + 64) / 8; + + pub fn new(hash: u64, key_len: usize, value_len: usize) -> Self { + strict_assert!(key_len < (1 << 12)); + strict_assert!(value_len < (1 << 12)); + Self { + hash, + key_len: key_len as _, + value_len: value_len as _, + } + } + + #[inline] + pub fn hash(&self) -> u64 { + self.hash + } + + #[inline] + pub fn key_len(&self) -> usize { + self.key_len as _ + } + + #[inline] + pub fn value_len(&self) -> usize { + self.value_len as _ + } + + #[inline] + pub fn entry_len(&self) -> usize { + Self::ENTRY_HEADER_SIZE + self.key_len() + self.value_len() + } + + pub fn write(&self, mut buf: impl BufMut) { + buf.put_u64(self.hash); + buf.put_u8(self.key_len as u8); + buf.put_u8(self.value_len as u8); + let v = ((self.key_len >> 4) as u8 & 0b_1111_0000) | (self.value_len >> 8) as u8; + buf.put_u8(v); + } + + pub fn read(mut buf: impl Buf) -> Self { + let hash = buf.get_u64(); + let mut key_len = buf.get_u8() as u16; + let mut value_len = buf.get_u8() as u16; + let v = buf.get_u8() as u16; + key_len |= (v & 0b_1111_0000) << 8; + value_len |= (v & 0b_0000_1111) << 8; + Self { + hash, + key_len, + value_len, + } + } +} + +#[cfg(test)] +mod tests { + use crate::IoBytesMut; + + use super::*; + + #[test] + fn test_entry_header_serde() { + let header = EntryHeader { + hash: 114514, + key_len: 114, + value_len: 514, + }; + let mut buf = IoBytesMut::new(); + header.write(&mut buf); + let h = EntryHeader::read(&buf[..]); + assert_eq!(header, h); + } +} diff --git a/foyer-storage/src/small/set.rs b/foyer-storage/src/small/set.rs new file mode 100644 index 00000000..ce0325c9 --- /dev/null +++ b/foyer-storage/src/small/set.rs @@ -0,0 +1,335 @@ +// Copyright 2024 Foyer Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
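With 12-bit lengths, `EntryHeader` above caps keys and values at 4 KiB - 1 bytes each and fits in 11 bytes: an 8-byte hash, the low byte of each length, and one shared byte carrying the two high nibbles (key in bits 4-7, value in bits 0-3). A standalone round-trip sketch of that packing; since the key's high nibble sits in bits 4-7 of the shared byte, restoring it takes a 4-bit left shift:

```rust
/// Standalone sketch of the 11-byte header packing used by small/serde.rs.
/// `key_len` and `value_len` must each fit in 12 bits (< 4096).
fn pack(hash: u64, key_len: u16, value_len: u16) -> [u8; 11] {
    assert!(key_len < 1 << 12 && value_len < 1 << 12);
    let mut buf = [0u8; 11];
    buf[..8].copy_from_slice(&hash.to_be_bytes());
    buf[8] = key_len as u8; // key len, low 8 bits
    buf[9] = value_len as u8; // value len, low 8 bits
    // Shared byte: key high nibble in bits 4-7, value high nibble in bits 0-3.
    buf[10] = (((key_len >> 8) as u8) << 4) | ((value_len >> 8) as u8);
    buf
}

fn unpack(buf: &[u8; 11]) -> (u64, u16, u16) {
    let hash = u64::from_be_bytes(buf[..8].try_into().unwrap());
    let shared = buf[10] as u16;
    let key_len = buf[8] as u16 | ((shared & 0b1111_0000) << 4); // 4-bit shift restores bits 8-11
    let value_len = buf[9] as u16 | ((shared & 0b0000_1111) << 8);
    (hash, key_len, value_len)
}

#[test]
fn pack_round_trip() {
    for &(k, v) in &[(114u16, 514u16), (4095, 4095), (0, 0)] {
        assert_eq!(unpack(&pack(42, k, v)), (42, k, v));
    }
}
```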
+ +use std::{ + collections::HashSet, + fmt::Debug, + ops::{Deref, DerefMut, Range}, + sync::Arc, + time::{SystemTime, UNIX_EPOCH}, +}; + +use bytes::Buf; +use foyer_common::code::{StorageKey, StorageValue}; + +use crate::{ + error::Result, + serde::{Checksummer, EntryDeserializer}, + IoBytes, IoBytesMut, +}; + +use super::serde::EntryHeader; + +pub type SetId = u64; + +#[derive(Debug)] +pub struct Set { + storage: Arc, +} + +impl Deref for Set { + type Target = SetStorage; + + fn deref(&self) -> &Self::Target { + &self.storage + } +} + +impl Set { + pub fn new(storage: Arc) -> Self { + Self { storage } + } +} + +#[derive(Debug)] +pub struct SetMut { + storage: SetStorage, +} + +impl Deref for SetMut { + type Target = SetStorage; + + fn deref(&self) -> &Self::Target { + &self.storage + } +} + +impl DerefMut for SetMut { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.storage + } +} + +impl SetMut { + pub fn new(storage: SetStorage) -> Self { + Self { storage } + } + + pub fn into_storage(self) -> SetStorage { + self.storage + } +} + +/// # Format +/// +/// ```plain +/// | checksum (4B) | timestamp (8B) | len (4B) | +/// ``` +pub struct SetStorage { + /// Set checksum. + checksum: u32, + + /// Set written data length. + len: usize, + /// Set data length capacity. + capacity: usize, + /// Set size. + size: usize, + /// Set last updated timestamp. + timestamp: u64, + + buffer: IoBytesMut, +} + +impl Debug for SetStorage { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SetStorage") + .field("checksum", &self.checksum) + .field("len", &self.len) + .field("capacity", &self.capacity) + .field("size", &self.size) + .field("timestamp", &self.timestamp) + .finish() + } +} + +impl SetStorage { + pub const SET_HEADER_SIZE: usize = 16; + + pub fn load(buffer: IoBytesMut) -> Self { + let checksum = (&buffer[0..4]).get_u32(); + let timestamp = (&buffer[4..12]).get_u64(); + let len = (&buffer[12..16]).get_u32() as usize; + + let mut this = Self { + checksum, + len, + capacity: buffer.len() - Self::SET_HEADER_SIZE, + size: buffer.len(), + timestamp, + buffer, + }; + + let c = Checksummer::checksum32(&this.buffer[4..]); + if c != checksum { + // Do not report checksum mismiatch. Clear the set directly. 
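+            // A set is a self-contained page of cached data, so dropping a corrupted
+            // set only loses cache entries that can be refilled from the source.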
+ this.clear(); + } + + this + } + + pub fn clear(&mut self) { + self.len = 0; + } + + pub fn freeze(mut self) -> IoBytes { + self.update(); + self.buffer.freeze() + } + + pub fn apply(&mut self, deletions: &HashSet, insertions: &[u8]) { + assert!(insertions.len() < self.capacity); + + self.deletes(deletions); + self.append(insertions); + } + + fn deletes(&mut self, deletes: &HashSet) { + if deletes.is_empty() { + return; + } + + let mut rcursor = 0; + let mut wcursor = 0; + + while rcursor < self.len { + let header = EntryHeader::read( + &self.buffer + [Self::SET_HEADER_SIZE + rcursor..Self::SET_HEADER_SIZE + rcursor + EntryHeader::ENTRY_HEADER_SIZE], + ); + + if !deletes.contains(&header.hash()) { + if rcursor != wcursor { + self.buffer.copy_within( + Self::SET_HEADER_SIZE + rcursor..Self::SET_HEADER_SIZE + header.entry_len(), + wcursor, + ); + } + wcursor += header.entry_len(); + } + + rcursor += header.entry_len(); + } + } + + fn append(&mut self, buffer: &[u8]) { + if buffer.is_empty() { + return; + } + + self.reserve(buffer.len()); + (&mut self.buffer[Self::SET_HEADER_SIZE + self.len..Self::SET_HEADER_SIZE + self.len + buffer.len()]) + .copy_from_slice(buffer); + self.len += buffer.len(); + } + + pub fn get(&self, hash: u64) -> Result> + where + K: StorageKey, + V: StorageValue, + { + for entry in self.iter() { + if hash == entry.hash { + let k = EntryDeserializer::deserialize_key::(&entry.key)?; + let v = EntryDeserializer::deserialize_value::(&entry.value, crate::Compression::None)?; + return Ok(Some((k, v))); + } + } + Ok(None) + } + + /// from: + /// + /// ```plain + /// 0 wipe len capacity + /// |_________|ooooooooooooo|___________| + /// ``` + /// + /// to: + /// + /// ```plain + /// 0 new len = len - wipe capacity + /// |ooooooooooooo|_____________________| + /// ``` + fn reserve(&mut self, required: usize) { + let remains = self.capacity - self.len; + if remains < required { + let mut wipe = 0; + for entry in self.iter() { + wipe += entry.len(); + if remains + wipe >= required { + break; + } + } + self.buffer.copy_within( + Self::SET_HEADER_SIZE + wipe..Self::SET_HEADER_SIZE + self.len, + Self::SET_HEADER_SIZE, + ); + self.len -= wipe; + assert!(self.capacity - self.len >= required); + } + } + + fn iter(&self) -> SetIter<'_> { + SetIter::open(self) + } + + fn data(&self) -> &[u8] { + &self.buffer[Self::SET_HEADER_SIZE..self.size] + } + + fn update(&mut self) { + self.update_timestamp(); + self.update_checksum(); + } + + fn update_timestamp(&mut self) { + self.timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64; + } + + fn update_checksum(&mut self) { + self.checksum = Checksummer::checksum32(&self.buffer[4..self.size]); + } +} + +pub struct SetEntry<'a> { + offset: usize, + pub hash: u64, + pub key: &'a [u8], + pub value: &'a [u8], +} + +impl<'a> SetEntry<'a> { + /// Length of the entry with header, key and value included. + pub fn len(&self) -> usize { + EntryHeader::ENTRY_HEADER_SIZE + self.key.len() + self.value.len() + } + + /// Range of the entry in the set data. 
+ pub fn range(&self) -> Range { + self.offset..self.offset + self.len() + } +} + +pub struct SetIter<'a> { + set: &'a SetStorage, + offset: usize, +} + +impl<'a> SetIter<'a> { + fn open(set: &'a SetStorage) -> Self { + Self { set, offset: 0 } + } + + fn is_valid(&self) -> bool { + self.offset < self.set.len as usize + } + + fn next(&mut self) -> Option> { + if !self.is_valid() { + return None; + } + let mut cursor = self.offset; + let header = EntryHeader::read(&self.set.data()[cursor..cursor + EntryHeader::ENTRY_HEADER_SIZE]); + cursor += EntryHeader::ENTRY_HEADER_SIZE; + let key = &self.set.data()[cursor..cursor + header.key_len()]; + cursor += header.key_len(); + let value = &self.set.data()[cursor..cursor + header.value_len()]; + let entry = SetEntry { + offset: self.offset, + hash: header.hash(), + key, + value, + }; + self.offset += entry.len(); + Some(entry) + } +} + +impl<'a> Iterator for SetIter<'a> { + type Item = SetEntry<'a>; + + fn next(&mut self) -> Option { + self.next() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_set_storage_basic() {} +} diff --git a/foyer-storage/src/small/set_manager.rs b/foyer-storage/src/small/set_manager.rs new file mode 100644 index 00000000..edd6e24a --- /dev/null +++ b/foyer-storage/src/small/set_manager.rs @@ -0,0 +1,241 @@ +// Copyright 2024 Foyer Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
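`SetManager` below slices every device region into fixed-size sets and addresses them with a flat `SetId`; a bounded cache of recently read sets (the `ordered_hash_map` dependency added in Cargo.toml, evicted from the front in insertion order) avoids rereading hot sets. The id-to-location arithmetic, mirrored as standalone helpers with a worked example:

```rust
/// Illustrative helpers mirroring `SetManager::{region_sets, locate}` below.
/// Sets are laid out back to back from the start of each region.
fn sets_per_region(region_size: usize, set_size: usize) -> usize {
    region_size / set_size
}

/// Maps a flat set id to (region id, byte offset within that region).
fn locate(id: u64, region_size: usize, set_size: usize) -> (u32, u64) {
    let per_region = sets_per_region(region_size, set_size);
    let region = (id as usize / per_region) as u32;
    let offset = ((id as usize % per_region) * set_size) as u64;
    (region, offset)
}

#[test]
fn locate_example() {
    // 64 MiB regions and 16 KiB sets: 4096 sets per region,
    // so set 5000 lives in region 1 at offset 904 * 16 KiB.
    assert_eq!(locate(5000, 64 << 20, 16 << 10), (1, 904 * (16 << 10)));
}
```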
+ +use std::{ + fmt::Debug, + ops::{Deref, DerefMut, Range}, + sync::Arc, +}; + +use foyer_common::strict_assert; +use itertools::Itertools; +use ordered_hash_map::OrderedHashMap; + +use tokio::sync::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard}; + +use crate::{ + device::{MonitoredDevice, RegionId}, + Dev, +}; + +use super::set::{Set, SetId, SetMut, SetStorage}; +use crate::error::Result; + +struct SetManagerInner { + sets: Vec>, + cache: Mutex>>, + set_cache_capacity: usize, + + set_size: usize, + device: MonitoredDevice, + regions: Range, + flush: bool, +} + +#[derive(Clone)] +pub struct SetManager { + inner: Arc, +} + +impl Debug for SetManager { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SetManager") + .field("sets", &self.inner.sets.len()) + .field("cache_capacity", &self.inner.set_cache_capacity) + .field("size", &self.inner.set_size) + .field("device", &self.inner.device) + .field("regions", &self.inner.regions) + .field("flush", &self.inner.flush) + .finish() + } +} + +impl SetManager { + pub fn new( + set_size: usize, + set_cache_capacity: usize, + device: MonitoredDevice, + regions: Range, + flush: bool, + ) -> Self { + let sets = (device.region_size() / set_size) * (regions.end - regions.start) as usize; + assert!(sets > 0); + + let sets = (0..sets).map(|_| RwLock::default()).collect_vec(); + let cache = Mutex::new(OrderedHashMap::with_capacity(set_cache_capacity)); + + let inner = SetManagerInner { + sets, + cache, + set_cache_capacity, + set_size, + device, + regions, + flush, + }; + let inner = Arc::new(inner); + Self { inner } + } + + pub async fn write(&self, id: SetId) -> Result> { + let guard = self.inner.sets[id as usize].write().await; + + let invalid = self.inner.cache.lock().await.remove(&id); + let storage = match invalid { + // `guard` already guarantees that there is only one storage reference left. 
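+                // Readers only clone this Arc while they also hold the set's read lock;
+                // with the write lock held here and the cache's entry just removed,
+                // the removed Arc is the last strong reference.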
+ Some(storage) => Arc::into_inner(storage).unwrap(), + None => self.storage(id).await?, + }; + + Ok(SetWriteGuard { + guard, + id, + set: SetMut::new(storage), + drop: DropPanicGuard::default(), + }) + } + + pub async fn read(&self, id: SetId) -> Result> { + let guard = self.inner.sets[id as usize].read().await; + + let mut cache = self.inner.cache.lock().await; + let cached = cache.get(&id).cloned(); + let storage = match cached { + Some(storage) => storage, + None => { + let storage = self.storage(id).await?; + let storage = Arc::new(storage); + cache.insert(id, storage.clone()); + if cache.len() > self.inner.set_cache_capacity { + cache.pop_front(); + strict_assert!(cache.len() <= self.inner.set_cache_capacity); + } + storage + } + }; + drop(cache); + + Ok(SetReadGuard { + _guard: guard, + _id: id, + set: Set::new(storage), + }) + } + + pub async fn apply(&self, mut guard: SetWriteGuard<'_>) -> Result<()> { + let storage = guard.set.into_storage(); + self.flush(guard.id, storage).await?; + guard.drop.disable(); + drop(guard.guard); + Ok(()) + } + + pub fn sets(&self) -> usize { + self.inner.sets.len() + } + + pub fn set_size(&self) -> usize { + self.inner.set_size + } + + pub fn set_data_size(&self) -> usize { + self.inner.set_size - SetStorage::SET_HEADER_SIZE + } + + async fn storage(&self, id: SetId) -> Result { + let (region, offset) = self.locate(id); + let buffer = self.inner.device.read(region, offset, self.inner.set_size).await?; + let storage = SetStorage::load(buffer); + Ok(storage) + } + + async fn flush(&self, id: SetId, storage: SetStorage) -> Result<()> { + let buffer = storage.freeze(); + let (region, offset) = self.locate(id); + self.inner.device.write(buffer, region, offset).await?; + if self.inner.flush { + self.inner.device.flush(Some(region)).await?; + } + Ok(()) + } + + #[inline] + fn region_sets(&self) -> usize { + self.inner.device.region_size() / self.inner.set_size + } + + #[inline] + fn locate(&self, id: SetId) -> (RegionId, u64) { + let region_sets = self.region_sets(); + let region = id as RegionId / region_sets as RegionId; + let offset = ((id as usize % region_sets) * self.inner.set_size) as u64; + (region, offset) + } +} + +#[derive(Debug, Default)] +struct DropPanicGuard { + disabled: bool, +} + +impl Drop for DropPanicGuard { + fn drop(&mut self) { + if !self.disabled { + panic!("unexpected drop panic guard drop"); + } + } +} + +impl DropPanicGuard { + fn disable(&mut self) { + self.disabled = true; + } +} + +#[derive(Debug)] +pub struct SetWriteGuard<'a> { + guard: RwLockWriteGuard<'a, ()>, + id: SetId, + set: SetMut, + drop: DropPanicGuard, +} + +impl<'a> Deref for SetWriteGuard<'a> { + type Target = SetMut; + + fn deref(&self) -> &Self::Target { + &self.set + } +} + +impl<'a> DerefMut for SetWriteGuard<'a> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.set + } +} + +#[derive(Debug)] +pub struct SetReadGuard<'a> { + _guard: RwLockReadGuard<'a, ()>, + _id: SetId, + set: Set, +} + +impl<'a> Deref for SetReadGuard<'a> { + type Target = Set; + + fn deref(&self) -> &Self::Target { + &self.set + } +} diff --git a/foyer-storage/src/storage/either.rs b/foyer-storage/src/storage/either.rs index 666e9ff2..ff63d2a9 100644 --- a/foyer-storage/src/storage/either.rs +++ b/foyer-storage/src/storage/either.rs @@ -310,11 +310,6 @@ where self.left.stats() } - async fn wait(&self) -> Result<()> { - try_join(self.left.wait(), self.right.wait()).await?; - Ok(()) - } - fn runtime(&self) -> &Handle { if cfg!(debug_assertions) { let hleft = 
self.left.runtime(); diff --git a/foyer-storage/src/storage/mod.rs b/foyer-storage/src/storage/mod.rs index 11aeb754..c912371e 100644 --- a/foyer-storage/src/storage/mod.rs +++ b/foyer-storage/src/storage/mod.rs @@ -128,10 +128,6 @@ pub trait Storage: Send + Sync + 'static + Clone + Debug { /// Get the statistics information of the disk cache. fn stats(&self) -> Arc; - /// Wait for the ongoing flush and reclaim tasks to finish. - #[must_use] - fn wait(&self) -> impl Future> + Send; - /// Get disk cache runtime handle. /// /// The runtime is determined during the opening phase. diff --git a/foyer-storage/src/storage/noop.rs b/foyer-storage/src/storage/noop.rs index d634e444..f7ab65a8 100644 --- a/foyer-storage/src/storage/noop.rs +++ b/foyer-storage/src/storage/noop.rs @@ -133,10 +133,6 @@ where Arc::default() } - async fn wait(&self) -> Result<()> { - Ok(()) - } - fn runtime(&self) -> &Handle { &self.runtime } diff --git a/foyer-storage/src/storage/runtime.rs b/foyer-storage/src/storage/runtime.rs index 1e1bb929..fd9875d0 100644 --- a/foyer-storage/src/storage/runtime.rs +++ b/foyer-storage/src/storage/runtime.rs @@ -196,10 +196,6 @@ where self.store.stats() } - async fn wait(&self) -> Result<()> { - self.store.wait().await - } - fn runtime(&self) -> &Handle { self.runtime.handle() } diff --git a/foyer-storage/src/store.rs b/foyer-storage/src/store.rs index 0cbf3de7..238827e1 100644 --- a/foyer-storage/src/store.rs +++ b/foyer-storage/src/store.rs @@ -190,11 +190,6 @@ where self.engine.stats() } - /// Wait for the ongoing flush and reclaim tasks to finish. - pub async fn wait(&self) -> Result<()> { - self.engine.wait().await - } - /// Get disk cache runtime handle. /// /// The runtime is determined during the opening phase. @@ -567,14 +562,26 @@ where } (CombinedConfig::Small, None) => { Engine::open(EngineConfig::Small(GenericSmallStorageConfig { - placeholder: PhantomData, + memory: todo!(), + set_size: todo!(), + set_cache_capacity: todo!(), + device, + regions: todo!(), + flush: todo!(), + flushers: todo!(), })) .await? } (CombinedConfig::Small, Some(runtime_config)) => { Engine::open(EngineConfig::SmallRuntime(RuntimeStoreConfig { store_config: GenericSmallStorageConfig { - placeholder: PhantomData, + memory: todo!(), + set_size: todo!(), + set_cache_capacity: todo!(), + device, + regions: todo!(), + flush: todo!(), + flushers: todo!(), }, runtime_config, })) @@ -595,7 +602,13 @@ where Engine::open(EngineConfig::Combined(EitherConfig { selector: SizeSelector::new(large_object_threshold), left: GenericSmallStorageConfig { - placeholder: PhantomData, + memory: todo!(), + set_size: todo!(), + set_cache_capacity: todo!(), + device, + regions: todo!(), + flush: todo!(), + flushers: todo!(), }, right: GenericLargeStorageConfig { memory: self.memory, @@ -636,7 +649,13 @@ where store_config: EitherConfig { selector: SizeSelector::new(large_object_threshold), left: GenericSmallStorageConfig { - placeholder: PhantomData, + memory: todo!(), + set_size: todo!(), + set_cache_capacity: todo!(), + device, + regions: todo!(), + flush: todo!(), + flushers: todo!(), }, right: GenericLargeStorageConfig { memory: self.memory, diff --git a/foyer/src/hybrid/cache.rs b/foyer/src/hybrid/cache.rs index d2293a47..f150280e 100644 --- a/foyer/src/hybrid/cache.rs +++ b/foyer/src/hybrid/cache.rs @@ -611,7 +611,8 @@ mod tests { let e2g = hybrid.obtain(2).await.unwrap().unwrap(); assert_eq!(e2g.value(), &vec![2; 7 * KB]); - hybrid.storage.wait().await.unwrap(); + // TODO(MrCroxx): Remove me. 
+        // hybrid.storage.wait().await.unwrap();
 
         assert!(hybrid.contains(&3));
         hybrid.remove(&3);
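
With `Storage::wait` removed (see the trait change in storage/mod.rs above), completion of a small-object write or delete is observable through the per-operation `oneshot` acknowledgement (or a full `close()`): the flusher resolves it with `Ok(true)` after the owning set is persisted, or `Ok(false)` if the pending write was superseded or evicted from the batch. A minimal runnable model of that handshake (the error type and task body are placeholders):

```rust
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    // Caller side: enqueue hands a sender to the flusher and keeps the receiver.
    let (tx, rx) = oneshot::channel::<Result<bool, String>>();

    // Flusher side: persist the batch that owns the entry, then acknowledge.
    tokio::spawn(async move {
        // ... write the set to the device here ...
        let _ = tx.send(Ok(true)); // `Ok(false)` would mean superseded or dropped
    });

    // The write is durable (or known to be dropped) once the ack resolves.
    assert_eq!(rx.await.unwrap(), Ok(true));
}
```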