From 93f26327954eeb14600a52b73047b44130504b03 Mon Sep 17 00:00:00 2001 From: MrCroxx Date: Wed, 21 Aug 2024 06:09:25 +0000 Subject: [PATCH] stash, todo: rebase me --- foyer-common/Cargo.toml | 1 - foyer-common/src/lib.rs | 2 - foyer-storage/Cargo.toml | 1 + foyer-storage/src/large/flusher.rs | 2 +- foyer-storage/src/serde.rs | 12 +- foyer-storage/src/small/batch.rs | 289 +++++++++++ foyer-storage/src/small/bloom_filter.rs | 85 ++++ foyer-storage/src/small/flusher.rs | 188 +++++++ foyer-storage/src/small/generic.rs | 175 ++++++- foyer-storage/src/small/mod.rs | 6 + foyer-storage/src/small/serde.rs | 109 ++++ foyer-storage/src/small/set.rs | 471 ++++++++++++++++++ foyer-storage/src/small/set_manager.rs | 254 ++++++++++ foyer-storage/src/store.rs | 24 +- foyer-util/Cargo.toml | 2 + .../src/compact_bloom_filter.rs | 2 +- foyer-util/src/lib.rs | 1 + 17 files changed, 1591 insertions(+), 33 deletions(-) create mode 100644 foyer-storage/src/small/batch.rs create mode 100644 foyer-storage/src/small/bloom_filter.rs create mode 100644 foyer-storage/src/small/flusher.rs create mode 100644 foyer-storage/src/small/serde.rs create mode 100644 foyer-storage/src/small/set.rs create mode 100644 foyer-storage/src/small/set_manager.rs rename {foyer-common => foyer-util}/src/compact_bloom_filter.rs (99%) diff --git a/foyer-common/Cargo.toml b/foyer-common/Cargo.toml index 6dc20174..cbebb66a 100644 --- a/foyer-common/Cargo.toml +++ b/foyer-common/Cargo.toml @@ -11,7 +11,6 @@ readme = "../README.md" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -bitvec = "1" bytes = "1" cfg-if = "1" crossbeam = "0.8" diff --git a/foyer-common/src/lib.rs b/foyer-common/src/lib.rs index 95228f8d..bf8f6884 100644 --- a/foyer-common/src/lib.rs +++ b/foyer-common/src/lib.rs @@ -26,8 +26,6 @@ pub mod bits; pub mod buf; /// The trait for the key and value encoding and decoding. pub mod code; -/// Bloom filters. 
-pub mod compact_bloom_filter; /// A concurrent count down util. pub mod countdown; /// Components for monitoring internal events. diff --git a/foyer-storage/Cargo.toml b/foyer-storage/Cargo.toml index fdcfa6d2..b2a1d57a 100644 --- a/foyer-storage/Cargo.toml +++ b/foyer-storage/Cargo.toml @@ -31,6 +31,7 @@ itertools = { workspace = true } lazy_static = "1" libc = "0.2" lz4 = "1.24" +ordered_hash_map = "0.4" parking_lot = { version = "0.12", features = ["arc_lock"] } pin-project = "1" rand = "0.8" diff --git a/foyer-storage/src/large/flusher.rs b/foyer-storage/src/large/flusher.rs index 9c101555..02b6b5f3 100644 --- a/foyer-storage/src/large/flusher.rs +++ b/foyer-storage/src/large/flusher.rs @@ -178,7 +178,7 @@ where value_len: info.value_len as _, hash: entry.hash(), sequence, - checksum: Checksummer::checksum(&buffer), + checksum: Checksummer::checksum64(&buffer), compression: self.compression, }; diff --git a/foyer-storage/src/serde.rs b/foyer-storage/src/serde.rs index fe63184e..59d910b6 100644 --- a/foyer-storage/src/serde.rs +++ b/foyer-storage/src/serde.rs @@ -13,7 +13,7 @@ // limitations under the License. 
use std::{fmt::Debug, hash::Hasher}; -use twox_hash::XxHash64; +use twox_hash::{XxHash32, XxHash64}; use foyer_common::code::{StorageKey, StorageValue}; @@ -27,11 +27,17 @@ use crate::{ pub struct Checksummer; impl Checksummer { - pub fn checksum(buf: &[u8]) -> u64 { + pub fn checksum64(buf: &[u8]) -> u64 { let mut hasher = XxHash64::with_seed(0); hasher.write(buf); hasher.finish() } + + pub fn checksum32(buf: &[u8]) -> u32 { + let mut hasher = XxHash32::with_seed(0); + hasher.write(buf); + hasher.finish() as u32 + } } #[derive(Debug)] @@ -115,7 +121,7 @@ impl EntryDeserializer { // calculate checksum if needed if let Some(expected) = checksum { - let get = Checksummer::checksum(&buffer[..value_len + ken_len]); + let get = Checksummer::checksum64(&buffer[..value_len + ken_len]); if expected != get { return Err(Error::ChecksumMismatch { expected, get }); } diff --git a/foyer-storage/src/small/batch.rs b/foyer-storage/src/small/batch.rs new file mode 100644 index 00000000..d7c53137 --- /dev/null +++ b/foyer-storage/src/small/batch.rs @@ -0,0 +1,289 @@ +// Copyright 2024 Foyer Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::{ + collections::{hash_map::HashMap, HashSet}, + fmt::Debug, +}; + +use foyer_common::code::{HashBuilder, StorageKey, StorageValue}; +use foyer_memory::CacheEntry; +use tokio::sync::oneshot; + +use crate::{serde::KvInfo, small::serde::EntryHeader, IoBytes, IoBytesMut}; + +use super::set::SetId; + +struct Insertion +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + entry: CacheEntry, + buffer: IoBytes, + info: KvInfo, +} + +struct Deletion { + hash: u64, +} + +struct Entry +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + insertion: Insertion, + prev_hash: Option, + next_hash: Option, +} + +struct SetBatchMut +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + entries: HashMap>, + deletions: HashMap, + + head_hash: Option, + tail_hash: Option, + + len: usize, + capacity: usize, +} + +impl Debug for SetBatchMut +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SetMut") + .field("count", &self.entries.len()) + .field("len", &self.len) + .finish() + } +} + +impl SetBatchMut +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + fn new(capacity: usize) -> Self { + Self { + entries: HashMap::new(), + deletions: HashMap::new(), + head_hash: None, + tail_hash: None, + len: 0, + capacity, + } + } + + fn insert(&mut self, insertion: Insertion) { + self.deletions.remove(&insertion.entry.hash()); + + let mut entry = Entry { + insertion, + prev_hash: None, + next_hash: None, + }; + + self.list_push(&mut entry); + self.len += EntryHeader::ENTRY_HEADER_SIZE + entry.insertion.buffer.len(); + if let Some(mut old) = self.entries.insert(entry.insertion.entry.hash(), entry) { + self.list_unlink(&mut old); + self.len -= EntryHeader::ENTRY_HEADER_SIZE + old.insertion.buffer.len(); + } + + while self.len > self.capacity { + let entry = self.pop().unwrap(); + self.len -= 
EntryHeader::ENTRY_HEADER_SIZE + entry.insertion.buffer.len(); + } + } + + fn delete(&mut self, deletion: Deletion) { + if let Some(mut entry) = self.entries.remove(&deletion.hash) { + self.list_unlink(&mut entry); + self.len -= EntryHeader::ENTRY_HEADER_SIZE + entry.insertion.buffer.len(); + } + + self.deletions.insert(deletion.hash, deletion); + } + + fn freeze(mut self) -> SetBatch { + let mut buf = IoBytesMut::with_capacity(self.len); + let mut entries = Vec::with_capacity(self.entries.len()); + let mut deletions = HashSet::with_capacity(self.entries.len() + self.deletions.len()); + let mut insertions = Vec::with_capacity(self.entries.len()); + + while let Some(entry) = self.pop() { + let header = EntryHeader::new( + entry.insertion.entry.hash(), + entry.insertion.info.key_len, + entry.insertion.info.value_len, + ); + header.write(&mut buf); + deletions.insert(entry.insertion.entry.hash()); + insertions.push(entry.insertion.entry.hash()); + buf.copy_from_slice(&entry.insertion.buffer); + entries.push(entry.insertion.entry); + } + + for deletion in self.deletions.into_values() { + deletions.insert(deletion.hash); + } + + assert_eq!(buf.len(), self.len); + + SetBatch { + deletions, + insertions, + bytes: buf.freeze(), + entries, + } + } + + fn list_unlink(&mut self, entry: &mut Entry) { + if let Some(prev_hash) = entry.prev_hash { + self.entries.get_mut(&prev_hash).unwrap().next_hash = entry.next_hash; + } else { + assert_eq!(self.head_hash, Some(entry.insertion.entry.hash())); + self.head_hash = entry.next_hash; + } + if let Some(next_hash) = entry.next_hash { + self.entries.get_mut(&next_hash).unwrap().prev_hash = entry.prev_hash; + } else { + assert_eq!(self.tail_hash, Some(entry.insertion.entry.hash())); + self.tail_hash = entry.prev_hash; + } + entry.prev_hash = None; + entry.next_hash = None; + } + + fn list_push(&mut self, entry: &mut Entry) { + assert!(entry.prev_hash.is_none()); + assert!(entry.next_hash.is_none()); + + if let Some(tail_hash) = 
self.tail_hash { + let tail = self.entries.get_mut(&tail_hash).unwrap(); + + tail.next_hash = Some(entry.insertion.entry.hash()); + entry.prev_hash = Some(tail_hash); + + self.tail_hash = Some(entry.insertion.entry.hash()); + } else { + assert!(self.head_hash.is_none()); + + self.head_hash = Some(entry.insertion.entry.hash()); + self.tail_hash = Some(entry.insertion.entry.hash()); + } + } + + fn pop(&mut self) -> Option> { + let head_hash = self.head_hash?; + let mut entry = self.entries.remove(&head_hash).unwrap(); + self.list_unlink(&mut entry); + Some(entry) + } +} + +#[derive(Debug)] +pub struct BatchMut +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + sets: HashMap>, + /// Total set count. + total: SetId, + /// Set data capacity. + set_capacity: usize, + + waiters: Vec>, +} + +impl BatchMut +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + pub fn new(total: SetId, set_data_size: usize) -> Self { + Self { + sets: HashMap::new(), + total, + set_capacity: set_data_size, + waiters: vec![], + } + } + + pub fn append(&mut self, entry: CacheEntry, buffer: IoBytes, info: KvInfo) { + let sid = entry.hash() % self.total; + let set = self.sets.entry(sid).or_insert(SetBatchMut::new(self.set_capacity)); + let insertion = Insertion { entry, buffer, info }; + set.insert(insertion); + } + + pub fn delete(&mut self, hash: u64) { + let sid = hash % self.total; + let set = self.sets.entry(sid).or_insert(SetBatchMut::new(self.set_capacity)); + set.delete(Deletion { hash }) + } + + pub fn rotate(&mut self) -> Batch { + let sets = std::mem::take(&mut self.sets); + let sets = sets.into_iter().map(|(id, set)| (id, set.freeze())).collect(); + let waiters = std::mem::take(&mut self.waiters); + Batch { sets, waiters } + } + + /// Register a waiter to be notified after the batch is finished. 
+ pub fn wait(&mut self) -> oneshot::Receiver<()> { + tracing::trace!("[batch]: register waiter"); + let (tx, rx) = oneshot::channel(); + self.waiters.push(tx); + rx + } +} + +pub struct Batch +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + pub sets: HashMap>, + pub waiters: Vec>, +} + +pub struct SetBatch +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + pub deletions: HashSet, + pub insertions: Vec, + pub bytes: IoBytes, + pub entries: Vec>, +} diff --git a/foyer-storage/src/small/bloom_filter.rs b/foyer-storage/src/small/bloom_filter.rs new file mode 100644 index 00000000..83e1995f --- /dev/null +++ b/foyer-storage/src/small/bloom_filter.rs @@ -0,0 +1,85 @@ +// Copyright 2024 Foyer Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// A u64 bloom filter with 4 hash hashers. 
+#[derive(Debug, Default)] +pub struct BloomFilterU64 { + value: u64, +} + +impl From for BloomFilterU64 { + fn from(value: u64) -> Self { + Self { value } + } +} + +impl BloomFilterU64 { + const HASHERS: usize = 4; + + pub fn insert(&mut self, hash: u64) { + for i in 0..Self::HASHERS { + let seed = twang_mix64(i as _); + let hash = combine_hashes(hash, seed); + let hash = (hash & u16::MAX as u64) << (i * u16::BITS as usize); + self.value |= hash; + } + } + + pub fn lookup(&self, hash: u64) -> bool { + for i in 0..Self::HASHERS { + let seed = twang_mix64(i as _); + let hash = combine_hashes(hash, seed); + let hash = (hash & u16::MAX as u64) << (i * u16::BITS as usize); + if self.value & hash == 0 { + return false; + } + } + true + } + + pub fn clear(&mut self) { + self.value = 0 + } + + pub fn value(&self) -> u64 { + self.value + } +} + +/// Reduce two 64-bit hashes into one. +/// +/// Ported from CacheLib, which uses the `Hash128to64` function from Google's city hash. +#[inline(always)] +fn combine_hashes(upper: u64, lower: u64) -> u64 { + const MUL: u64 = 0x9ddfea08eb382d69; + + let mut a = (lower ^ upper).wrapping_mul(MUL); + a ^= a >> 47; + let mut b = (upper ^ a).wrapping_mul(MUL); + b ^= b >> 47; + b = b.wrapping_mul(MUL); + b +} + +#[inline(always)] +fn twang_mix64(val: u64) -> u64 { + let mut val = (!val).wrapping_add(val << 21); // val *= (1 << 21); val -= 1 + val = val ^ (val >> 24); + val = val.wrapping_add(val << 3).wrapping_add(val << 8); // val *= 1 + (1 << 3) + (1 << 8) + val = val ^ (val >> 14); + val = val.wrapping_add(val << 2).wrapping_add(val << 4); // va; *= 1 + (1 << 2) + (1 << 4) + val = val ^ (val >> 28); + val = val.wrapping_add(val << 31); // val *= 1 + (1 << 31) + val +} diff --git a/foyer-storage/src/small/flusher.rs b/foyer-storage/src/small/flusher.rs new file mode 100644 index 00000000..f969b26c --- /dev/null +++ b/foyer-storage/src/small/flusher.rs @@ -0,0 +1,188 @@ +// Copyright 2024 Foyer Project Authors +// +// Licensed under the 
Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{ + fmt::Debug, + future::Future, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, +}; + +use foyer_common::code::{HashBuilder, StorageKey, StorageValue}; +use foyer_memory::CacheEntry; +use futures::future::try_join_all; +use parking_lot::Mutex; +use tokio::{runtime::Handle, sync::Notify, task::JoinHandle}; + +use crate::{ + error::{Error, Result}, + serde::KvInfo, + IoBytes, +}; + +use super::{ + batch::{Batch, BatchMut, SetBatch}, + set_manager::SetManager, +}; + +pub struct Flusher +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + batch: Arc>>, + + notify: Arc, + + handle: Arc>>>, + stop: Arc, +} + +impl Flusher +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + pub fn new(set_manager: SetManager, runtime: &Handle) -> Self { + let batch = Arc::new(Mutex::new(BatchMut::new( + set_manager.sets() as _, + set_manager.set_data_size(), + ))); + let notify = Arc::::default(); + + let stop = Arc::::default(); + + let runner = Runner { + batch: batch.clone(), + notify: notify.clone(), + set_manager, + stop: stop.clone(), + }; + + let handle = runtime.spawn(async move { + if let Err(e) = runner.run().await { + tracing::error!("[sodc flusher]: flusher exit with error: {e}"); + } + }); + let handle = Arc::new(Mutex::new(Some(handle))); + + Self { + batch, + notify, + handle, + stop, + } + } + + pub fn wait(&self) -> impl Future + Send + 'static { + let waiter = 
self.batch.lock().wait(); + self.notify.notify_one(); + async move { + let _ = waiter.await; + } + } + + pub fn entry(&self, entry: CacheEntry, buffer: IoBytes, info: KvInfo) { + self.batch.lock().append(entry, buffer, info); + self.notify.notify_one(); + } + + pub fn deletion(&self, hash: u64) { + self.batch.lock().delete(hash); + self.notify.notify_one(); + } + + pub async fn close(&self) -> Result<()> { + self.stop.store(true, Ordering::SeqCst); + self.notify.notify_one(); + let handle = self.handle.lock().take(); + if let Some(handle) = handle { + handle.await.unwrap(); + } + Ok(()) + } +} + +struct Runner +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + batch: Arc>>, + + notify: Arc, + + set_manager: SetManager, + + stop: Arc, +} + +impl Runner +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + pub async fn run(self) -> Result<()> { + loop { + let batch = self.batch.lock().rotate(); + if batch.sets.is_empty() { + if self.stop.load(Ordering::SeqCst) { + return Ok(()); + } + self.notify.notified().await; + continue; + } + self.commit(batch).await + } + } + + pub async fn commit(&self, batch: Batch) { + let futures = batch.sets.into_iter().map( + |( + sid, + SetBatch { + deletions, + insertions, + bytes, + entries, + }, + )| { + let set_manager = self.set_manager.clone(); + async move { + let mut set = set_manager.write(sid).await?; + set.apply(&deletions, insertions.into_iter(), &bytes[..]); + set_manager.apply(set).await?; + + drop(entries); + + Ok::<_, Error>(()) + } + }, + ); + + if let Err(e) = try_join_all(futures).await { + tracing::error!("[sodc flusher]: error raised when committing batch, error: {e}"); + } + + for waiter in batch.waiters { + let _ = waiter.send(()); + } + } +} diff --git a/foyer-storage/src/small/generic.rs b/foyer-storage/src/small/generic.rs index 428f248d..95748fcd 100644 --- a/foyer-storage/src/small/generic.rs +++ b/foyer-storage/src/small/generic.rs @@ -14,11 +14,25 @@ use 
foyer_common::code::{HashBuilder, StorageKey, StorageValue}; use foyer_memory::CacheEntry; -use futures::Future; +use futures::{ + future::{join_all, try_join_all}, + Future, +}; +use itertools::Itertools; -use std::{fmt::Debug, marker::PhantomData, sync::Arc}; +use tokio::runtime::Handle; -use crate::{error::Result, serde::KvInfo, storage::Storage, DeviceStats, IoBytes}; +use std::{fmt::Debug, marker::PhantomData, ops::Range, sync::Arc}; + +use crate::{ + device::{MonitoredDevice, RegionId}, + error::Result, + serde::KvInfo, + storage::Storage, + DeviceStats, IoBytes, +}; + +use super::{flusher::Flusher, set::SetId, set_manager::SetManager}; pub struct GenericSmallStorageConfig where @@ -26,7 +40,24 @@ where V: StorageValue, S: HashBuilder + Debug, { - pub placeholder: PhantomData<(K, V, S)>, + /// Set size in bytes. + pub set_size: usize, + /// Set cache capacity by set count. + pub set_cache_capacity: usize, + /// Device for small object disk cache. + pub device: MonitoredDevice, + /// Regions of the device to use. + pub regions: Range, + /// Whether to flush after writes. + pub flush: bool, + /// Flusher count. 
+ pub flushers: usize, + + pub read_runtime_handle: Handle, + pub write_runtime_handle: Handle, + pub user_runtime_handle: Handle, + + pub marker: PhantomData<(K, V, S)>, } impl Debug for GenericSmallStorageConfig @@ -40,13 +71,29 @@ where } } +struct GenericSmallStorageInner +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + flushers: Vec>, + + device: MonitoredDevice, + set_manager: SetManager, + + read_runtime_handle: Handle, + _write_runtime_handle: Handle, + _user_runtime_handle: Handle, +} + pub struct GenericSmallStorage where K: StorageKey, V: StorageValue, S: HashBuilder + Debug, { - _marker: PhantomData<(K, V, S)>, + inner: Arc>, } impl Debug for GenericSmallStorage @@ -67,7 +114,93 @@ where S: HashBuilder + Debug, { fn clone(&self) -> Self { - Self { _marker: PhantomData } + Self { + inner: self.inner.clone(), + } + } +} + +impl GenericSmallStorage +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + async fn open(config: GenericSmallStorageConfig) -> Result { + let set_manager = SetManager::new( + config.set_size, + config.set_cache_capacity, + config.device.clone(), + config.regions, + config.flush, + ); + + let flushers = (0..config.flushers) + .map(|_| Flusher::new(set_manager.clone(), &config.write_runtime_handle)) + .collect_vec(); + + let inner = GenericSmallStorageInner { + flushers, + device: config.device, + set_manager, + read_runtime_handle: config.read_runtime_handle, + _write_runtime_handle: config.write_runtime_handle, + _user_runtime_handle: config.user_runtime_handle, + }; + let inner = Arc::new(inner); + + Ok(Self { inner }) + } + + async fn close(&self) -> Result<()> { + try_join_all(self.inner.flushers.iter().map(|flusher| flusher.close())).await?; + Ok(()) + } + + fn enqueue(&self, entry: CacheEntry, buffer: IoBytes, info: KvInfo) { + // Entries with the same hash must be grouped in the batch. 
+ let id = entry.hash() as usize % self.inner.flushers.len(); + self.inner.flushers[id].entry(entry, buffer, info); + } + + fn load(&self, hash: u64) -> impl Future>> + Send + 'static { + let set_manager = self.inner.set_manager.clone(); + let sid = hash % set_manager.sets() as SetId; + async move { + match set_manager.read(sid, hash).await? { + Some(set) => { + let kv = set.get(hash)?; + Ok(kv) + } + None => Ok(None), + } + } + } + + fn delete(&self, hash: u64) { + // Entries with the same hash MUST be grouped in the same batch. + let id = hash as usize % self.inner.flushers.len(); + self.inner.flushers[id].deletion(hash); + } + + fn may_contains(&self, hash: u64) -> bool { + let set_manager = self.inner.set_manager.clone(); + let sid = hash % set_manager.sets() as SetId; + // FIXME: Anyway without blocking? Use atomic? + self.inner + .read_runtime_handle + .block_on(async move { set_manager.contains(sid, hash).await }) + } + + fn stats(&self) -> Arc { + self.inner.device.stat().clone() + } + + fn wait(&self) -> impl Future + Send + 'static { + let wait_flushers = join_all(self.inner.flushers.iter().map(|flusher| flusher.wait())); + async move { + wait_flushers.await; + } } } @@ -82,29 +215,32 @@ where type BuildHasher = S; type Config = GenericSmallStorageConfig; - async fn open(_config: Self::Config) -> Result { - todo!() + async fn open(config: Self::Config) -> Result { + Self::open(config).await } async fn close(&self) -> Result<()> { - todo!() + self.close().await?; + Ok(()) } - fn enqueue(&self, _entry: CacheEntry, _buffer: IoBytes, _info: KvInfo) { - todo!() + fn enqueue(&self, entry: CacheEntry, buffer: IoBytes, info: KvInfo) { + self.enqueue(entry, buffer, info); } // FIXME: REMOVE THE CLIPPY IGNORE. // TODO(MrCroxx): use `expect` after `lint_reasons` is stable. 
#[allow(clippy::manual_async_fn)] - fn load(&self, _hash: u64) -> impl Future>> + Send + 'static { - async { todo!() } + fn load(&self, hash: u64) -> impl Future>> + Send + 'static { + self.load(hash) } - fn delete(&self, _hash: u64) {} + fn delete(&self, hash: u64) { + self.delete(hash) + } - fn may_contains(&self, _hash: u64) -> bool { - todo!() + fn may_contains(&self, hash: u64) -> bool { + self.may_contains(hash) } async fn destroy(&self) -> Result<()> { @@ -112,13 +248,10 @@ where } fn stats(&self) -> Arc { - todo!() + self.stats() } - // TODO(MrCroxx): Remove the attr after impl. - // TODO(MrCroxx): use `expect` after `lint_reasons` is stable. - #[allow(clippy::manual_async_fn)] fn wait(&self) -> impl Future + Send + 'static { - async { todo!() } + self.wait() } } diff --git a/foyer-storage/src/small/mod.rs b/foyer-storage/src/small/mod.rs index 01e617c2..44c6f579 100644 --- a/foyer-storage/src/small/mod.rs +++ b/foyer-storage/src/small/mod.rs @@ -12,4 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod batch; +pub mod bloom_filter; +pub mod flusher; pub mod generic; +pub mod serde; +pub mod set; +pub mod set_manager; diff --git a/foyer-storage/src/small/serde.rs b/foyer-storage/src/small/serde.rs new file mode 100644 index 00000000..9ba9d513 --- /dev/null +++ b/foyer-storage/src/small/serde.rs @@ -0,0 +1,109 @@ +// Copyright 2024 Foyer Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +use std::u8; + +use bytes::{Buf, BufMut}; +use foyer_common::strict_assert; + +/// max key/value len: `64 KiB - 1` +/// +/// # Format +/// +/// ```plain +/// | hash 64b | +/// | key len low 8b | value len low 8b | key len high 4b | value len high 4b | +/// ``` +#[derive(Debug, PartialEq, Eq)] +pub struct EntryHeader { + hash: u64, + key_len: u16, + value_len: u16, +} + +impl EntryHeader { + pub const ENTRY_HEADER_SIZE: usize = (12 + 12 + 64) / 8; + + pub fn new(hash: u64, key_len: usize, value_len: usize) -> Self { + strict_assert!(key_len < (1 << 12)); + strict_assert!(value_len < (1 << 12)); + Self { + hash, + key_len: key_len as _, + value_len: value_len as _, + } + } + + #[inline] + pub fn hash(&self) -> u64 { + self.hash + } + + #[inline] + pub fn key_len(&self) -> usize { + self.key_len as _ + } + + #[inline] + pub fn value_len(&self) -> usize { + self.value_len as _ + } + + #[inline] + pub fn entry_len(&self) -> usize { + Self::ENTRY_HEADER_SIZE + self.key_len() + self.value_len() + } + + pub fn write(&self, mut buf: impl BufMut) { + buf.put_u64(self.hash); + buf.put_u8(self.key_len as u8); + buf.put_u8(self.value_len as u8); + let v = ((self.key_len >> 4) as u8 & 0b_1111_0000) | (self.value_len >> 8) as u8; + buf.put_u8(v); + } + + pub fn read(mut buf: impl Buf) -> Self { + let hash = buf.get_u64(); + let mut key_len = buf.get_u8() as u16; + let mut value_len = buf.get_u8() as u16; + let v = buf.get_u8() as u16; + key_len |= (v & 0b_1111_0000) << 8; + value_len |= (v & 0b_0000_1111) << 8; + Self { + hash, + key_len, + value_len, + } + } +} + +#[cfg(test)] +mod tests { + use crate::IoBytesMut; + + use super::*; + + #[test] + fn test_entry_header_serde() { + let header = EntryHeader { + hash: 114514, + key_len: 114, + value_len: 514, + }; + let mut buf = IoBytesMut::new(); + header.write(&mut buf); + let h = EntryHeader::read(&buf[..]); + 
assert_eq!(header, h); + } +} diff --git a/foyer-storage/src/small/set.rs b/foyer-storage/src/small/set.rs new file mode 100644 index 00000000..21b17ccb --- /dev/null +++ b/foyer-storage/src/small/set.rs @@ -0,0 +1,471 @@ +// Copyright 2024 Foyer Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{ + collections::HashSet, + fmt::Debug, + ops::{Deref, DerefMut, Range}, + sync::Arc, + time::{SystemTime, UNIX_EPOCH}, +}; + +use bytes::{Buf, BufMut}; +use foyer_common::code::{StorageKey, StorageValue}; + +use crate::{ + error::Result, + serde::{Checksummer, EntryDeserializer}, + IoBytes, IoBytesMut, +}; + +use super::{bloom_filter::BloomFilterU64, serde::EntryHeader}; + +pub type SetId = u64; + +#[derive(Debug)] +pub struct Set { + storage: Arc, +} + +impl Deref for Set { + type Target = SetStorage; + + fn deref(&self) -> &Self::Target { + &self.storage + } +} + +impl Set { + pub fn new(storage: Arc) -> Self { + Self { storage } + } +} + +#[derive(Debug)] +pub struct SetMut { + storage: SetStorage, +} + +impl Deref for SetMut { + type Target = SetStorage; + + fn deref(&self) -> &Self::Target { + &self.storage + } +} + +impl DerefMut for SetMut { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.storage + } +} + +impl SetMut { + pub fn new(storage: SetStorage) -> Self { + Self { storage } + } + + pub fn into_storage(self) -> SetStorage { + self.storage + } +} + +/// # Format +/// +/// ```plain +/// | checksum (4B) | timestamp 
(8B) | len (4B) | +/// ``` +pub struct SetStorage { + /// Set checksum. + checksum: u32, + + /// Set written data length. + len: usize, + /// Set data length capacity. + capacity: usize, + /// Set size. + size: usize, + /// Set last updated timestamp. + timestamp: u64, + /// Set bloom filter. + bloom_filter: BloomFilterU64, + + buffer: IoBytesMut, +} + +impl Debug for SetStorage { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SetStorage") + .field("checksum", &self.checksum) + .field("len", &self.len) + .field("capacity", &self.capacity) + .field("size", &self.size) + .field("timestamp", &self.timestamp) + .field("bloom_filter", &self.bloom_filter) + .finish() + } +} + +impl SetStorage { + pub const SET_HEADER_SIZE: usize = 24; + + pub fn load(buffer: IoBytesMut) -> Self { + assert!(buffer.len() >= Self::SET_HEADER_SIZE); + + let checksum = (&buffer[0..4]).get_u32(); + let timestamp = (&buffer[4..12]).get_u64(); + let len = (&buffer[12..16]).get_u32() as usize; + let bloom_filter_value = (&buffer[16..24]).get_u64(); + let bloom_filter = BloomFilterU64::from(bloom_filter_value); + + let mut this = Self { + checksum, + len, + capacity: buffer.len() - Self::SET_HEADER_SIZE, + size: buffer.len(), + timestamp, + bloom_filter, + buffer, + }; + + let c = Checksummer::checksum32(&this.buffer[4..]); + if c != checksum { + // Do not report checksum mismiatch. Clear the set directly. + this.clear(); + } + + this + } + + pub fn bloom_filter(&self) -> &BloomFilterU64 { + &self.bloom_filter + } + + // TODO(MrCroxx): use `expect` after `lint_reasons` is stable. + #[allow(unused)] + pub fn len(&self) -> usize { + self.len + } + + // TODO(MrCroxx): use `expect` after `lint_reasons` is stable. 
+    #[allow(unused)]
+    /// Returns `true` when the set holds no entry bytes.
+    pub fn is_empty(&self) -> bool {
+        self.len == 0
+    }
+
+    /// Logically empties the set by resetting the data length and the bloom filter.
+    ///
+    /// Only the in-memory fields change here; the header bytes inside the buffer are rewritten by
+    /// [`SetStorage::update`].
+    pub fn clear(&mut self) {
+        self.len = 0;
+        self.bloom_filter.clear();
+    }
+
+    /// Serializes the in-memory metadata into the set header at the front of the buffer.
+    ///
+    /// Header fields as written by this method:
+    ///
+    /// ```plain
+    /// | checksum (4B) | timestamp (8B) | len (4B) | bloom filter (8B) |
+    /// 0               4                12         16                  24
+    /// ```
+    ///
+    /// The timestamp is refreshed to the current wall-clock time in milliseconds since the Unix epoch. The
+    /// checksum covers `buffer[4..size]`, which is why it is computed and written last.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the system clock reports a time before the Unix epoch.
+    pub fn update(&mut self) {
+        (&mut self.buffer[16..24]).put_u64(self.bloom_filter.value());
+        (&mut self.buffer[12..16]).put_u32(self.len as _);
+        self.timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64;
+        (&mut self.buffer[4..12]).put_u64(self.timestamp);
+        self.checksum = Checksummer::checksum32(&self.buffer[4..self.size]);
+        (&mut self.buffer[0..4]).put_u32(self.checksum);
+    }
+
+    /// Consumes the storage and returns the underlying buffer as immutable [`IoBytes`].
+    ///
+    /// The header is not refreshed here; call [`SetStorage::update`] first if the metadata changed.
+    pub fn freeze(self) -> IoBytes {
+        self.buffer.freeze()
+    }
+
+    /// Applies one batch of changes to the set.
+    ///
+    /// Entries whose hash is contained in `deletions` are removed first. Then `buffer`, the already
+    /// serialized entries to insert, is appended to the data area and every hash yielded by `insertions` is
+    /// added to the bloom filter.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `buffer` is not strictly smaller than the set capacity.
+    pub fn apply(&mut self, deletions: &HashSet<u64>, insertions: impl Iterator<Item = u64>, buffer: &[u8]) {
+        assert!(buffer.len() < self.capacity);
+
+        self.deletes(deletions);
+        self.append(insertions, buffer);
+    }
+
+    /// Removes every entry whose hash is in `deletes` and compacts the survivors to the front of the data
+    /// area. The bloom filter is rebuilt from the surviving entries.
+    fn deletes(&mut self, deletes: &HashSet<u64>) {
+        if deletes.is_empty() {
+            return;
+        }
+
+        // Both cursors are offsets relative to the start of the data area (i.e. after the set header).
+        let mut rcursor = 0;
+        let mut wcursor = 0;
+        // Rebuild bloom filter.
+        self.bloom_filter.clear();
+
+        while rcursor < self.len {
+            let header = EntryHeader::read(
+                &self.buffer
+                    [Self::SET_HEADER_SIZE + rcursor..Self::SET_HEADER_SIZE + rcursor + EntryHeader::ENTRY_HEADER_SIZE],
+            );
+
+            if !deletes.contains(&header.hash()) {
+                if rcursor != wcursor {
+                    // The cursors are data-relative, so the source range (start AND end) and the destination
+                    // must all be shifted by the set header size. `copy_within` handles the overlap.
+                    self.buffer.copy_within(
+                        Self::SET_HEADER_SIZE + rcursor..Self::SET_HEADER_SIZE + rcursor + header.entry_len(),
+                        Self::SET_HEADER_SIZE + wcursor,
+                    );
+                }
+                wcursor += header.entry_len();
+                self.bloom_filter.insert(header.hash());
+            }
+
+            rcursor += header.entry_len();
+        }
+
+        self.len = wcursor;
+    }
+
+    /// Appends the serialized entries in `buffer` after the current data and registers the `insertions`
+    /// hashes in the bloom filter.
+    ///
+    /// Older entries are evicted via [`SetStorage::reserve`] when there is not enough free space. Nothing
+    /// happens, and `insertions` is not consumed, when `buffer` is empty.
+    fn append(&mut self, insertions: impl Iterator<Item = u64>, buffer: &[u8]) {
+        if buffer.is_empty() {
+            return;
+        }
+
+        self.reserve(buffer.len());
+        (&mut self.buffer[Self::SET_HEADER_SIZE + self.len..Self::SET_HEADER_SIZE + self.len + buffer.len()])
+            .copy_from_slice(buffer);
+        self.len += buffer.len();
+        for hash in insertions {
+            self.bloom_filter.insert(hash);
+        }
+    }
+
+    /// Looks up the entry with the given `hash` and deserializes its key and value.
+    ///
+    /// Returns `Ok(None)` when the bloom filter rules the hash out or when no stored entry carries it.
+    /// Otherwise the first entry in iteration order with a matching hash is returned; the value is decoded
+    /// with `Compression::None`.
+    ///
+    /// # Errors
+    ///
+    /// Propagates any error from deserializing the key or the value.
+    pub fn get<K, V>(&self, hash: u64) -> Result<Option<(K, V)>>
+    where
+        K: StorageKey,
+        V: StorageValue,
+    {
+        if !self.bloom_filter.lookup(hash) {
+            return Ok(None);
+        }
+        for entry in self.iter() {
+            if hash == entry.hash {
+                let k = EntryDeserializer::deserialize_key::<K>(&entry.key)?;
+                let v = EntryDeserializer::deserialize_value::<V>(&entry.value, crate::Compression::None)?;
+                return Ok(Some((k, v)));
+            }
+        }
+        Ok(None)
+    }
+
+    /// Makes sure at least `required` bytes are free at the end of the data area.
+    ///
+    /// If the free space is insufficient, entries are dropped from the front (in iteration order) until it
+    /// is, the remaining data is shifted to the front and the bloom filter is rebuilt from what is left.
+    ///
+    /// from:
+    ///
+    /// ```plain
+    /// 0         wipe          len         capacity
+    /// |_________|ooooooooooooo|___________|
+    /// ```
+    ///
+    /// to:
+    ///
+    /// ```plain
+    /// 0             new len = len - wipe  capacity
+    /// |ooooooooooooo|_____________________|
+    /// ```
+    ///
+    /// # Panics
+    ///
+    /// Panics if `required` bytes cannot be freed even after dropping entries.
+    fn reserve(&mut self, required: usize) {
+        let remains = self.capacity - self.len;
+        if remains >= required {
+            return;
+        }
+
+        // Accumulate whole entries from the front until enough space would be released.
+        let mut wipe = 0;
+        for entry in self.iter() {
+            wipe += entry.len();
+            if remains + wipe >= required {
+                break;
+            }
+        }
+        self.buffer.copy_within(
+            Self::SET_HEADER_SIZE + wipe..Self::SET_HEADER_SIZE + self.len,
+            Self::SET_HEADER_SIZE,
+        );
+        self.len -= wipe;
+        assert!(self.capacity - self.len >= required);
+        let mut bloom_filter = BloomFilterU64::default();
+        for entry in self.iter() {
+            bloom_filter.insert(entry.hash);
+        }
+        self.bloom_filter = bloom_filter;
+    }
+
+    /// Returns an iterator over the entries stored in the data area, front to back.
+    fn iter(&self) -> SetIter<'_> {
+        SetIter::open(self)
+    }
+
+    /// Returns the data area of the buffer, i.e. everything between the set header and `size`.
+    fn data(&self) -> &[u8] {
+        &self.buffer[Self::SET_HEADER_SIZE..self.size]
+    }
+}
+
+/// A borrowed view of one entry stored in a set.
+pub struct SetEntry<'a> {
+    /// Offset of the entry inside the set data area.
+    offset: usize,
+    /// Hash recorded in the entry header.
+    pub hash: u64,
+    /// Serialized key bytes.
+    pub key: &'a [u8],
+    /// Serialized value bytes.
+    pub value: &'a [u8],
+}
+
+impl<'a> SetEntry<'a> {
+    /// Length of the entry with header, key and value included.
+    pub fn len(&self) -> usize {
+        EntryHeader::ENTRY_HEADER_SIZE + self.key.len() + self.value.len()
+    }
+
+    /// Range of the entry in the set data.
+    // TODO(MrCroxx): use `expect` after `lint_reasons` is stable.
+    #[allow(unused)]
+    pub fn range(&self) -> Range<usize> {
+        self.offset..self.offset + self.len()
+    }
+}
+
+/// Iterator over the entries of a [`SetStorage`], walking the data area from offset `0` up to `len`.
+pub struct SetIter<'a> {
+    set: &'a SetStorage,
+    /// Offset of the next entry inside the set data area.
+    offset: usize,
+}
+
+impl<'a> SetIter<'a> {
+    /// Creates an iterator positioned at the first entry of `set`.
+    fn open(set: &'a SetStorage) -> Self {
+        Self { set, offset: 0 }
+    }
+
+    /// Returns `true` while the cursor is still inside the used part of the data area.
+    fn is_valid(&self) -> bool {
+        self.offset < self.set.len as usize
+    }
+
+    /// Decodes the entry at the cursor and advances past it.
+    ///
+    /// An entry is laid out as header, then value bytes, then key bytes.
+    fn next(&mut self) -> Option<SetEntry<'a>> {
+        if !self.is_valid() {
+            return None;
+        }
+        let mut cursor = self.offset;
+        let header = EntryHeader::read(&self.set.data()[cursor..cursor + EntryHeader::ENTRY_HEADER_SIZE]);
+        cursor += EntryHeader::ENTRY_HEADER_SIZE;
+        let value = &self.set.data()[cursor..cursor + header.value_len()];
+        cursor += header.value_len();
+        let key = &self.set.data()[cursor..cursor + header.key_len()];
+        let entry = SetEntry {
+            offset: self.offset,
+            hash: header.hash(),
+            key,
+            value,
+        };
+        self.offset += entry.len();
+        Some(entry)
+    }
+}
+
+impl<'a> Iterator for SetIter<'a> {
+    type Item = SetEntry<'a>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        // Resolves to the inherent `SetIter::next` above (inherent methods take precedence over trait
+        // methods), so this is a delegation and not a recursive call.
+        self.next()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+
+    use crate::{serde::EntrySerializer, Compression};
+
+    use super::*;
+
+    const PAGE: usize = 4096;
+
+    fn key(len: usize) -> Vec<u8> {
+        vec![b'k'; len]
+    }
+
+    fn value(len: usize) -> Vec<u8> {
+        vec![b'v'; len]
+    }
+
+    fn entry(hash: u64, key_len: usize, value_len: usize) -> IoBytesMut {
+        let mut buf = IoBytesMut::new();
+
+        // reserve header
+        let header = EntryHeader::new(0, 0, 0);
+        header.write(&mut buf);
+
+        let key = key(key_len);
+        let value = value(value_len);
+
+        let info = EntrySerializer::serialize(&key, &value, &Compression::None, &mut buf).unwrap();
+
+        let header = EntryHeader::new(hash, info.key_len, info.value_len);
+        header.write(&mut buf[0..EntryHeader::ENTRY_HEADER_SIZE]);
+
+        buf
+    }
+
+    fn assert_some(storage: &SetStorage, hash: u64, key: Vec<u8>, value: Vec<u8>) {
+        let ret = storage.get::<Vec<u8>, Vec<u8>>(hash).unwrap();
+        let (k, v) = ret.unwrap();
+        assert_eq!(k, key);
+        assert_eq!(v, value);
+    }
+
+    fn assert_none(storage: &SetStorage, hash: u64) {
+        let ret = storage.get::<Vec<u8>, Vec<u8>>(hash).unwrap();
+        assert!(ret.is_none());
+    }
+
+    #[test]
+    #[should_panic]
+    fn test_set_storage_empty() {
+        let buffer = IoBytesMut::new();
+        SetStorage::load(buffer);
+    }
+
+    #[test]
+    fn test_set_storage_basic() {
+        let mut buffer = IoBytesMut::with_capacity(PAGE);
+        unsafe { buffer.set_len(PAGE) };
+
+        // load will result in an empty set
+        let mut storage = SetStorage::load(buffer);
+        assert!(storage.is_empty());
+
+        let e1 = entry(1, 1, 42);
+        storage.apply(&HashSet::from_iter([2, 4]), std::iter::once(1), &e1);
+        assert_eq!(storage.len(), e1.len());
+        assert_some(&storage, 1, key(1), value(42));
+
+        let e2 = entry(2, 2, 97);
+        storage.apply(&HashSet::from_iter([1, 3, 5]), std::iter::once(2), &e2);
+        assert_eq!(storage.len(), e2.len());
+        assert_none(&storage, 1);
+        assert_some(&storage, 2, key(2), value(97));
+
+        let e3 = entry(3, 100, 100);
+        storage.apply(&HashSet::from_iter([1]), std::iter::once(3), &e3);
+        assert_eq!(storage.len(), e2.len() + e3.len());
+        assert_none(&storage, 1);
+        assert_some(&storage, 2, key(2), value(97));
+        assert_some(&storage, 3, key(100), value(100));
+
+        let e4 = entry(4, 100, 3800);
+        storage.apply(&HashSet::from_iter([1]), std::iter::once(4), &e4);
+        assert_eq!(storage.len(), e4.len());
+        assert_none(&storage, 1);
+        assert_none(&storage, 2);
+        assert_none(&storage, 3);
+        assert_some(&storage, 4, key(100), value(3800));
+
+        let buf = storage.freeze();
+        println!("{buf:?}");
+
+        let mut buffer = IoBytesMut::with_capacity(PAGE);
+        unsafe { buffer.set_len(PAGE) };
+        (&mut buffer[0..buf.len()]).copy_from_slice(&buf);
+        let storage = SetStorage::load(buffer);
+        assert_eq!(storage.len(), e4.len());
+        assert_none(&storage, 1);
+        assert_none(&storage, 2);
+        assert_none(&storage, 3);
+        assert_some(&storage, 4, key(100), value(3800));
+    }
+
+    /// Deleting an entry that is followed by surviving entries must shift the survivors forward.
+    #[test]
+    fn test_set_storage_deletes_compaction() {
+        let mut buffer = IoBytesMut::with_capacity(PAGE);
+        unsafe { buffer.set_len(PAGE) };
+
+        let mut storage = SetStorage::load(buffer);
+        assert!(storage.is_empty());
+
+        let e1 = entry(1, 1, 42);
+        let e2 = entry(2, 2, 97);
+        let e3 = entry(3, 100, 100);
+        storage.apply(&HashSet::new(), std::iter::once(1), &e1);
+        storage.apply(&HashSet::new(), std::iter::once(2), &e2);
+        storage.apply(&HashSet::new(), std::iter::once(3), &e3);
+        assert_eq!(storage.len(), e1.len() + e2.len() + e3.len());
+
+        // Remove the first entry only; the two entries behind it have to be moved to the front.
+        storage.apply(&HashSet::from_iter([1]), std::iter::empty(), &[]);
+        assert_eq!(storage.len(), e2.len() + e3.len());
+        assert_none(&storage, 1);
+        assert_some(&storage, 2, key(2), value(97));
+        assert_some(&storage, 3, key(100), value(100));
+    }
+}
diff --git a/foyer-storage/src/small/set_manager.rs b/foyer-storage/src/small/set_manager.rs
new file mode 100644
index 00000000..fc2c3f05
--- /dev/null
+++ b/foyer-storage/src/small/set_manager.rs
@@ -0,0 +1,254 @@
+// Copyright 2024 Foyer Project Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::{
+    fmt::Debug,
+    ops::{Deref, DerefMut, Range},
+    sync::Arc,
+};
+
+use foyer_common::strict_assert;
+use itertools::Itertools;
+use ordered_hash_map::OrderedHashMap;
+
+use tokio::sync::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
+
+use crate::{
+    device::{MonitoredDevice, RegionId},
+    Dev,
+};
+
+use super::{
+    bloom_filter::BloomFilterU64,
+    set::{Set, SetId, SetMut, SetStorage},
+};
+use crate::error::Result;
+
+/// Shared state behind a [`SetManager`].
+struct SetManagerInner {
+    /// One rwlock per set, holding that set's in-memory bloom filter.
+    ///
+    /// The rwlock also acts as a phantom lock for the set storage on disk: all disk operations on a set
+    /// must be protected by the lock of that set.
+    sets: Vec<RwLock<BloomFilterU64>>,
+    /// Set storages loaded by readers, keyed by set id. The front entry is popped once the cache grows
+    /// beyond `set_cache_capacity`.
+    cache: Mutex<OrderedHashMap<SetId, Arc<SetStorage>>>,
+    /// Maximum number of set storages kept in `cache`.
+    set_cache_capacity: usize,
+
+    /// Size in bytes of one set on the device, set header included.
+    set_size: usize,
+    device: MonitoredDevice,
+    regions: Range<RegionId>,
+    /// Whether the device is flushed after every set write.
+    flush: bool,
+}
+
+/// Coordinates locked access to the fixed-size sets stored on a device.
+///
+/// Cloning is cheap: all clones share the same inner state.
+#[derive(Clone)]
+pub struct SetManager {
+    inner: Arc<SetManagerInner>,
+}
+
+impl Debug for SetManager {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("SetManager")
+            .field("sets", &self.inner.sets.len())
+            .field("cache_capacity", &self.inner.set_cache_capacity)
+            .field("size", &self.inner.set_size)
+            .field("device", &self.inner.device)
+            .field("regions", &self.inner.regions)
+            .field("flush", &self.inner.flush)
+            .finish()
+    }
+}
+
+impl SetManager {
+    /// Creates a manager for the sets that fit into `regions` of `device`.
+    ///
+    /// The number of sets is `(region_size / set_size) * regions.len()`; every set starts with a default
+    /// (empty) in-memory bloom filter.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the computed number of sets is zero.
+    pub fn new(
+        set_size: usize,
+        set_cache_capacity: usize,
+        device: MonitoredDevice,
+        regions: Range<RegionId>,
+        flush: bool,
+    ) -> Self {
+        let sets = (device.region_size() / set_size) * (regions.end - regions.start) as usize;
+        assert!(sets > 0);
+
+        let sets = (0..sets).map(|_| RwLock::default()).collect_vec();
+        let cache = Mutex::new(OrderedHashMap::with_capacity(set_cache_capacity));
+
+        let inner = SetManagerInner {
+            sets,
+            cache,
+            set_cache_capacity,
+            set_size,
+            device,
+            regions,
+            flush,
+        };
+        let inner = Arc::new(inner);
+        Self { inner }
+    }
+
+    /// Locks set `id` exclusively and returns a guard that gives mutable access to its storage.
+    ///
+    /// A cached copy of the storage is taken out of the read cache if present; otherwise the set is read
+    /// from the device. The returned guard must be handed to [`SetManager::apply`]: dropping it in any
+    /// other way panics.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the set has to be read from the device and the read fails.
+    pub async fn write(&self, id: SetId) -> Result<SetWriteGuard<'_>> {
+        let guard = self.inner.sets[id as usize].write().await;
+
+        let invalid = self.inner.cache.lock().await.remove(&id);
+        let storage = match invalid {
+            // `guard` already guarantees that there is only one storage reference left.
+            Some(storage) => Arc::into_inner(storage).unwrap(),
+            None => self.storage(id).await?,
+        };
+
+        Ok(SetWriteGuard {
+            bloom_filter: guard,
+            id,
+            set: SetMut::new(storage),
+            drop: DropPanicGuard::default(),
+        })
+    }
+
+    /// Locks set `id` for reading and returns a guard over its storage.
+    ///
+    /// Returns `Ok(None)` without touching the cache or the device when the in-memory bloom filter rules
+    /// `hash` out. Otherwise the storage is served from the cache, or loaded from the device and cached.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the set has to be read from the device and the read fails.
+    pub async fn read(&self, id: SetId, hash: u64) -> Result<Option<SetReadGuard<'_>>> {
+        let guard = self.inner.sets[id as usize].read().await;
+        if !guard.lookup(hash) {
+            return Ok(None);
+        }
+
+        let mut cache = self.inner.cache.lock().await;
+        let cached = cache.get(&id).cloned();
+        let storage = match cached {
+            Some(storage) => storage,
+            None => {
+                let storage = self.storage(id).await?;
+                let storage = Arc::new(storage);
+                cache.insert(id, storage.clone());
+                if cache.len() > self.inner.set_cache_capacity {
+                    cache.pop_front();
+                    strict_assert!(cache.len() <= self.inner.set_cache_capacity);
+                }
+                storage
+            }
+        };
+        drop(cache);
+
+        Ok(Some(SetReadGuard {
+            _bloom_filter: guard,
+            _id: id,
+            set: Set::new(storage),
+        }))
+    }
+
+    /// Persists the set held by `guard` and releases its write lock.
+    ///
+    /// The set header is refreshed, the in-memory bloom filter is replaced with the one from the storage,
+    /// and the buffer is written to the set's location on the device (followed by a flush of that region
+    /// when flushing is enabled).
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the device write or flush fails. The in-memory bloom filter has already been
+    /// updated at that point.
+    pub async fn apply(&self, mut guard: SetWriteGuard<'_>) -> Result<()> {
+        // The guard has reached `apply`, which is what the drop guard exists to enforce. Disarm it before
+        // any fallible I/O so that a device error is returned to the caller instead of turning into a
+        // panic when `?` drops the guard.
+        guard.drop.disable();
+
+        let mut storage = guard.set.into_storage();
+
+        // Update in-memory bloom filter.
+        storage.update();
+        *guard.bloom_filter = BloomFilterU64::from(storage.bloom_filter().value());
+
+        let buffer = storage.freeze();
+
+        let (region, offset) = self.locate(guard.id);
+        self.inner.device.write(buffer, region, offset).await?;
+        if self.inner.flush {
+            self.inner.device.flush(Some(region)).await?;
+        }
+        drop(guard.bloom_filter);
+        Ok(())
+    }
+
+    /// Checks the in-memory bloom filter of set `id` for `hash`, without reading the set storage.
+    pub async fn contains(&self, id: SetId, hash: u64) -> bool {
+        let guard = self.inner.sets[id as usize].read().await;
+        guard.lookup(hash)
+    }
+
+    /// Returns the number of sets managed.
+    pub fn sets(&self) -> usize {
+        self.inner.sets.len()
+    }
+
+    /// Returns the number of bytes available for entry data in one set (set size minus the set header).
+    pub fn set_data_size(&self) -> usize {
+        self.inner.set_size - SetStorage::SET_HEADER_SIZE
+    }
+
+    /// Reads set `id` from the device and loads it into a [`SetStorage`].
+    async fn storage(&self, id: SetId) -> Result<SetStorage> {
+        let (region, offset) = self.locate(id);
+        let buffer = self.inner.device.read(region, offset, self.inner.set_size).await?;
+        let storage = SetStorage::load(buffer);
+        Ok(storage)
+    }
+
+    /// Returns how many sets fit into one device region.
+    #[inline]
+    fn region_sets(&self) -> usize {
+        self.inner.device.region_size() / self.inner.set_size
+    }
+
+    /// Maps a set id to its `(region, byte offset within the region)` on the device.
+    // NOTE(review): the region is computed from the set id alone and is not shifted by `regions.start`;
+    // this is only correct if the managed regions start at region 0 -- confirm against the callers.
+    #[inline]
+    fn locate(&self, id: SetId) -> (RegionId, u64) {
+        let region_sets = self.region_sets();
+        let region = id as RegionId / region_sets as RegionId;
+        let offset = ((id as usize % region_sets) * self.inner.set_size) as u64;
+        (region, offset)
+    }
+}
+
+/// Panics on drop unless it has been disabled first.
+///
+/// Embedded in [`SetWriteGuard`] to catch write guards that are dropped without being applied.
+#[derive(Debug, Default)]
+struct DropPanicGuard {
+    disabled: bool,
+}
+
+impl Drop for DropPanicGuard {
+    fn drop(&mut self) {
+        if !self.disabled {
+            panic!("unexpected drop panic guard drop");
+        }
+    }
+}
+
+impl DropPanicGuard {
+    /// Disarms the guard so that dropping it no longer panics.
+    fn disable(&mut self) {
+        self.disabled = true;
+    }
+}
+
+/// Exclusive access to one set, obtained from [`SetManager::write`].
+///
+/// Dereferences to [`SetMut`]. Holds the write lock of the set's bloom filter for as long as it lives.
+#[derive(Debug)]
+pub struct SetWriteGuard<'a> {
+    bloom_filter: RwLockWriteGuard<'a, BloomFilterU64>,
+    id: SetId,
+    set: SetMut,
+    drop: DropPanicGuard,
+}
+
+impl<'a> Deref for SetWriteGuard<'a> {
+    type Target = SetMut;
+
+    fn deref(&self) -> &Self::Target {
+        &self.set
+    }
+}
+
+impl<'a> DerefMut for SetWriteGuard<'a> {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.set
+    }
+}
+
+/// Shared access to one set, obtained from `SetManager::read`.
+///
+/// Dereferences to [`Set`]. The read lock on the set's bloom filter is held for as long as the guard lives.
+#[derive(Debug)]
+pub struct SetReadGuard<'a> {
+    _bloom_filter: RwLockReadGuard<'a, BloomFilterU64>,
+    _id: SetId,
+    set: Set,
+}
+
+impl<'a> Deref for SetReadGuard<'a> {
+    type Target = Set;
+
+    fn deref(&self) -> &Self::Target {
+        &self.set
+    }
+}
diff --git a/foyer-storage/src/store.rs b/foyer-storage/src/store.rs
index 6bfacfdd..8dcd5602 100644
--- a/foyer-storage/src/store.rs
+++ b/foyer-storage/src/store.rs
@@ -656,8 +656,16 @@ where
                     .await
                 }
                 CombinedConfig::Small => {
-                    Engine::open(EngineConfig::Small(GenericSmallStorageConfig {
-                        placeholder: PhantomData,
+                    Engine::open(EngineConfig::Small(GenericSmallStorageConfig {
+                        set_size: todo!(),
+                        set_cache_capacity: todo!(),
+                        device, regions: todo!(),
+                        flush: todo!(),
+                        flushers: todo!(),
+                        read_runtime_handle,
+                        write_runtime_handle,
+                        user_runtime_handle,
+                        marker: PhantomData
                     }))
                     .await
                 }
@@ -671,8 +679,16 @@ where
                         (device.regions() - large_region_count) as RegionId..device.regions() as RegionId;
                     Engine::open(EngineConfig::Combined(EitherConfig {
                         selector: SizeSelector::new(large_object_threshold),
-                        left: GenericSmallStorageConfig {
-                            placeholder: PhantomData,
+                        left: GenericSmallStorageConfig {
+                            set_size: todo!(),
+                            set_cache_capacity: todo!(),
+                            device, regions: todo!(),
+                            flush: todo!(),
+                            flushers: todo!(),
+                            read_runtime_handle,
+                            write_runtime_handle,
+                            user_runtime_handle,
+                            marker: PhantomData
                         },
                         right: GenericLargeStorageConfig {
                             name: self.name,
diff --git a/foyer-util/Cargo.toml b/foyer-util/Cargo.toml
index 478f17d3..bb73271e 100644
--- a/foyer-util/Cargo.toml
+++ b/foyer-util/Cargo.toml
@@ -13,8 +13,10 @@ publish = false
 
 [dependencies]
 bitmaps = "3"
+bitvec = "1"
 bytes = "1"
 cfg-if = "1"
+foyer-common = { path = "../foyer-common" }
 futures = "0.3"
 hashbrown = "0.14"
 itertools = { workspace = true }
diff --git a/foyer-common/src/compact_bloom_filter.rs b/foyer-util/src/compact_bloom_filter.rs
similarity index 99%
rename from foyer-common/src/compact_bloom_filter.rs
rename to foyer-util/src/compact_bloom_filter.rs
index eea2957d..43d3adbc 100644
--- a/foyer-common/src/compact_bloom_filter.rs
+++ b/foyer-util/src/compact_bloom_filter.rs
@@ -17,7 +17,7 @@ use std::{cell::UnsafeCell, sync::Arc};
 use bitvec::prelude::*;
 use itertools::Itertools;
 
-use crate::strict_assert;
+use foyer_common::strict_assert;
 
 /// Reduce two 64-bit hashes into one.
 ///
diff --git a/foyer-util/src/lib.rs b/foyer-util/src/lib.rs
index b7e5aa6c..85317159 100644
--- a/foyer-util/src/lib.rs
+++ b/foyer-util/src/lib.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 pub mod batch;
+pub mod compact_bloom_filter;
 pub mod continuum;
 pub mod erwlock;
 pub mod iostat;