diff --git a/Cargo.toml b/Cargo.toml index 954908aa..c99b5088 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,9 @@ members = [ "foyer-cli", "foyer-common", "foyer-intrusive", + "foyer-intrusive-v2", "foyer-memory", + "foyer-memory-v2", "foyer-storage", "foyer-util", ] @@ -51,6 +53,7 @@ tokio = { package = "madsim-tokio", version = "0.2", features = [ # foyer components foyer-common = { version = "0.12.2", path = "foyer-common" } foyer-intrusive = { version = "0.12.2", path = "foyer-intrusive" } +foyer-intrusive-v2 = { version = "0.12.2", path = "foyer-intrusive-v2" } foyer-memory = { version = "0.12.2", path = "foyer-memory" } foyer-storage = { version = "0.12.2", path = "foyer-storage" } foyer = { version = "0.12.2", path = "foyer" } diff --git a/foyer-common/src/lib.rs b/foyer-common/src/lib.rs index 98356144..c94e7b79 100644 --- a/foyer-common/src/lib.rs +++ b/foyer-common/src/lib.rs @@ -42,6 +42,8 @@ pub mod rate; pub mod rated_ticket; /// A runtime that automatically shutdown itself on drop. pub mod runtime; +/// A kotlin like functional programming helper. +pub mod scope; /// Tracing related components. pub mod tracing; /// An async wait group implementation. diff --git a/foyer-common/src/scope.rs b/foyer-common/src/scope.rs new file mode 100644 index 00000000..02fb1568 --- /dev/null +++ b/foyer-common/src/scope.rs @@ -0,0 +1,42 @@ +// Copyright 2024 foyer Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// Scoped functional programming extensions. +pub trait Scope { + /// Scoped with ownership. + fn with(self, f: F) -> R + where + Self: Sized, + F: FnOnce(Self) -> R, + { + f(self) + } + + /// Scoped with reference. + fn with_ref(&self, f: F) -> R + where + F: FnOnce(&Self) -> R, + { + f(self) + } + + /// Scoped with mutable reference. 
+ fn with_mut(&mut self, f: F) -> R + where + F: FnOnce(&mut Self) -> R, + { + f(self) + } +} +impl Scope for T {} \ No newline at end of file diff --git a/foyer-intrusive-v2/Cargo.toml b/foyer-intrusive-v2/Cargo.toml new file mode 100644 index 00000000..2533f1fa --- /dev/null +++ b/foyer-intrusive-v2/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "foyer-intrusive-v2" +description = "intrusive data structures for foyer - Hybrid cache for Rust" +version = { workspace = true } +edition = { workspace = true } +rust-version = { workspace = true } +repository = { workspace = true } +homepage = { workspace = true } +keywords = { workspace = true } +authors = { workspace = true } +license = { workspace = true } +readme = { workspace = true } +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +foyer-common = { workspace = true } +itertools = { workspace = true } + +[features] +strict_assertions = ["foyer-common/strict_assertions"] + +[lints] +workspace = true diff --git a/foyer-intrusive-v2/src/adapter.rs b/foyer-intrusive-v2/src/adapter.rs new file mode 100644 index 00000000..3f3f719c --- /dev/null +++ b/foyer-intrusive-v2/src/adapter.rs @@ -0,0 +1,186 @@ +// Copyright 2024 foyer Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Copyright 2020 Amari Robinson +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Intrusive data structure adapter that locates between pointer and item. + +use std::fmt::Debug; + +/// Intrusive data structure link. +pub trait Link: Send + Sync + 'static + Default + Debug { + /// Check if the link is linked by the intrusive data structure. + fn is_linked(&self) -> bool; +} + +/// Intrusive data structure adapter. +/// +/// # Safety +/// +/// Pointer operations MUST be valid. +/// +/// [`Adapter`] is recommended to be generated by macro `intrusive_adapter!`. +pub unsafe trait Adapter: Send + Sync + Debug + 'static { + /// Item type for the adapter. + type Item: ?Sized; + /// Link type for the adapter. + type Link: Link; + + /// Create a new intrusive data structure link. + fn new() -> Self; + + /// # Safety + /// + /// Pointer operations MUST be valid. + unsafe fn link2ptr(&self, link: std::ptr::NonNull) -> std::ptr::NonNull; + + /// # Safety + /// + /// Pointer operations MUST be valid. + unsafe fn ptr2link(&self, ptr: std::ptr::NonNull) -> std::ptr::NonNull; +} + +/// Macro to generate an implementation of [`Adapter`] for intrusive container and items. 
+/// +/// The basic syntax to create an adapter is: +/// +/// ```rust,ignore +/// intrusive_adapter! { Adapter = Pointer: Item { link_field: LinkType } } +/// ``` +/// +/// # Generics +/// +/// This macro supports generic arguments: +/// +/// Note that due to macro parsing limitations, `T: Trait` bounds are not +/// supported in the generic argument list. You must list any trait bounds in +/// a separate `where` clause at the end of the macro. +/// +/// # Examples +/// +/// ``` +/// use foyer_intrusive::intrusive_adapter; +/// use foyer_intrusive::adapter::Link; +/// +/// #[derive(Debug)] +/// pub struct Item +/// where +/// L: Link +/// { +/// link: L, +/// key: u64, +/// } +/// +/// intrusive_adapter! { ItemAdapter = Item { link: L } where L: Link } +/// ``` +#[macro_export] +macro_rules! intrusive_adapter { + (@impl + $vis:vis $name:ident ($($args:tt),*) = $item:ty { $($fields:expr)+ => $link:ty } $($where_:tt)* + ) => { + $vis struct $name<$($args),*> $($where_)* { + _marker: std::marker::PhantomData<($($args),*)> + } + + unsafe impl<$($args),*> Send for $name<$($args),*> $($where_)* {} + unsafe impl<$($args),*> Sync for $name<$($args),*> $($where_)* {} + + unsafe impl<$($args),*> $crate::adapter::Adapter for $name<$($args),*> $($where_)*{ + type Item = $item; + type Link = $link; + + fn new() -> Self { + Self { + _marker: std::marker::PhantomData, + } + } + + unsafe fn link2ptr(&self, link: std::ptr::NonNull) -> std::ptr::NonNull { + std::ptr::NonNull::new_unchecked($crate::container_of!(link.as_ptr(), $item, $($fields)+)) + } + + unsafe fn ptr2link(&self, item: std::ptr::NonNull) -> std::ptr::NonNull { + std::ptr::NonNull::new_unchecked((item.as_ptr() as *mut u8).add(std::mem::offset_of!($item, $($fields)+)) as *mut Self::Link) + } + } + + impl<$($args),*> std::fmt::Debug for $name<$($args),*> $($where_)*{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { + f.debug_struct(stringify!($name)).finish() + } + } + }; + ( + $vis:vis $name:ident = $($rest:tt)* + ) => { + intrusive_adapter! {@impl + $vis $name () = $($rest)* + } + }; + ( + $vis:vis $name:ident<$($args:tt),*> = $($rest:tt)* + ) => { + intrusive_adapter! {@impl + $vis $name ($($args),*) = $($rest)* + } + }; +} + +#[cfg(test)] +mod tests { + use std::ptr::NonNull; + + use itertools::Itertools; + + use super::*; + use crate::{dlist::*, intrusive_adapter}; + + #[derive(Debug)] + struct DlistItem { + link: DlistLink, + val: u64, + } + + impl DlistItem { + fn new(val: u64) -> Self { + Self { + link: DlistLink::default(), + val, + } + } + } + + intrusive_adapter! { DlistItemAdapter = DlistItem { link => DlistLink }} + + #[test] + fn test_adapter_macro() { + let mut l = Dlist::::new(); + l.push_front(unsafe { NonNull::new_unchecked(Box::into_raw(Box::new(DlistItem::new(1)))) }); + let v = l.iter().map(|item| item.val).collect_vec(); + assert_eq!(v, vec![1]); + let _ = unsafe { Box::from_raw(l.pop_front().unwrap().as_ptr()) }; + } +} diff --git a/foyer-intrusive-v2/src/dlist.rs b/foyer-intrusive-v2/src/dlist.rs new file mode 100644 index 00000000..884b2d1b --- /dev/null +++ b/foyer-intrusive-v2/src/dlist.rs @@ -0,0 +1,591 @@ +// Copyright 2024 foyer Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
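The adapters generated above reduce to plain offset arithmetic: `ptr2link` adds `offset_of!(Item, link)` to the item pointer, and `link2ptr` (via `container_of!`) subtracts it again. A minimal standalone sketch of that round trip, with a hypothetical `Entry`/`Link` pair that is only illustrative:

```rust
use std::mem::offset_of;

#[derive(Default)]
struct Link {
    _prev: Option<std::ptr::NonNull<Link>>,
    _next: Option<std::ptr::NonNull<Link>>,
}

struct Entry {
    key: u64,
    link: Link,
}

fn main() {
    let mut entry = Entry { key: 42, link: Link::default() };
    let item: *mut Entry = &mut entry;

    // ptr2link: item pointer + field offset => link pointer.
    let link = unsafe { (item as *mut u8).add(offset_of!(Entry, link)) } as *mut Link;
    // link2ptr / container_of: link pointer - field offset => item pointer.
    let back = unsafe { (link as *mut u8).sub(offset_of!(Entry, link)) } as *mut Entry;

    assert_eq!(back, item);
    assert_eq!(unsafe { (*back).key }, 42);
}
```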
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! An intrusive double linked list implementation. + +use std::ptr::NonNull; + +use foyer_common::{assert::OptionExt, strict_assert}; + +use crate::adapter::{Adapter, Link}; + +/// The link for the intrusive double linked list. +#[derive(Debug, Default)] +pub struct DlistLink { + prev: Option>, + next: Option>, + is_linked: bool, +} + +impl DlistLink { + /// Get the `NonNull` pointer of the link. + pub fn raw(&mut self) -> NonNull { + unsafe { NonNull::new_unchecked(self as *mut _) } + } + + /// Get the pointer of the prev link. + pub fn prev(&self) -> Option> { + self.prev + } + + /// Get the pointer of the next link. + pub fn next(&self) -> Option> { + self.next + } +} + +unsafe impl Send for DlistLink {} +unsafe impl Sync for DlistLink {} + +impl Link for DlistLink { + fn is_linked(&self) -> bool { + self.is_linked + } +} + +/// Intrusive double linked list. +#[derive(Debug)] +pub struct Dlist +where + A: Adapter, +{ + head: Option>, + tail: Option>, + + len: usize, + + adapter: A, +} + +unsafe impl Send for Dlist where A: Adapter {} +unsafe impl Sync for Dlist where A: Adapter {} + +impl Drop for Dlist +where + A: Adapter, +{ + fn drop(&mut self) { + let mut iter = self.iter_mut(); + iter.front(); + while iter.is_valid() { + iter.remove(); + } + assert!(self.is_empty()); + } +} + +impl Dlist +where + A: Adapter, +{ + /// Create a new intrusive double linked list. + pub fn new() -> Self { + Self { + head: None, + tail: None, + len: 0, + + adapter: A::new(), + } + } + + /// Get the reference of the first item of the intrusive double linked list. + pub fn front(&self) -> Option<&A::Item> { + unsafe { self.head.map(|link| self.adapter.link2ptr(link).as_ref()) } + } + + /// Get the reference of the last item of the intrusive double linked list. + pub fn back(&self) -> Option<&A::Item> { + unsafe { self.tail.map(|link| self.adapter.link2ptr(link).as_ref()) } + } + + /// Get the mutable reference of the first item of the intrusive double linked list. + pub fn front_mut(&mut self) -> Option<&mut A::Item> { + unsafe { self.head.map(|link| self.adapter.link2ptr(link).as_mut()) } + } + + /// Get the mutable reference of the last item of the intrusive double linked list. + pub fn back_mut(&mut self) -> Option<&mut A::Item> { + unsafe { self.tail.map(|link| self.adapter.link2ptr(link).as_mut()) } + } + + /// Push an item to the first position of the intrusive double linked list. + pub fn push_front(&mut self, ptr: NonNull) { + self.iter_mut().insert_after(ptr); + } + + /// Push an item to the last position of the intrusive double linked list. + pub fn push_back(&mut self, ptr: NonNull) { + self.iter_mut().insert_before(ptr); + } + + /// Pop an item from the first position of the intrusive double linked list. + pub fn pop_front(&mut self) -> Option> { + let mut iter = self.iter_mut(); + iter.next(); + iter.remove() + } + + /// Pop an item from the last position of the intrusive double linked list. 
+ pub fn pop_back(&mut self) -> Option> { + let mut iter = self.iter_mut(); + iter.prev(); + iter.remove() + } + + /// Get the item reference iterator of the intrusive double linked list. + pub fn iter(&self) -> DlistIter<'_, A> { + DlistIter { + link: None, + dlist: self, + } + } + + /// Get the item mutable reference iterator of the intrusive double linked list. + pub fn iter_mut(&mut self) -> DlistIterMut<'_, A> { + DlistIterMut { + link: None, + dlist: self, + } + } + + /// Get the length of the intrusive double linked list. + pub fn len(&self) -> usize { + self.len + } + + /// Check if the intrusive double linked list is empty. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Remove an ptr. + /// + /// # Safety + /// + /// `ptr` MUST be in this [`Dlist`]. + pub unsafe fn remove(&mut self, ptr: NonNull) -> NonNull { + let mut iter = self.iter_mut_with_ptr(ptr); + strict_assert!(iter.is_valid()); + iter.remove().strict_unwrap_unchecked() + } + + /// Create mutable iterator directly on ptr. + /// + /// # Safety + /// + /// `ptr` MUST be in this [`Dlist`]. + pub unsafe fn iter_mut_with_ptr(&mut self, ptr: NonNull) -> DlistIterMut<'_, A> { + let link = self.adapter.ptr2link(ptr); + DlistIterMut { + link: Some(link), + dlist: self, + } + } + + /// Create immutable iterator directly on ptr. + /// + /// # Safety + /// + /// `ptr` MUST be in this [`Dlist`]. + pub unsafe fn iter_from_with_ptr(&self, ptr: NonNull) -> DlistIter<'_, A> { + let link = self.adapter.ptr2link(ptr); + DlistIter { + link: Some(link), + dlist: self, + } + } + + /// Get the intrusive adapter of the double linked list. + pub fn adapter(&self) -> &A { + &self.adapter + } +} + +/// Item reference iterator of the intrusive double linked list. +pub struct DlistIter<'a, A> +where + A: Adapter, +{ + link: Option>, + dlist: &'a Dlist, +} + +impl<'a, A> DlistIter<'a, A> +where + A: Adapter, +{ + /// Check if the iter is in a valid position. + pub fn is_valid(&self) -> bool { + self.link.is_some() + } + + /// Get the item of the current position. + pub fn item(&self) -> Option<&A::Item> { + self.link + .map(|link| unsafe { self.dlist.adapter.link2ptr(link).as_ref() }) + } + + /// Move to next. + /// + /// If iter is on tail, move to null. + /// If iter is on null, move to head. + pub fn next(&mut self) { + unsafe { + match self.link { + Some(link) => self.link = link.as_ref().next, + None => self.link = self.dlist.head, + } + } + } + + /// Move to prev. + /// + /// If iter is on head, move to null. + /// If iter is on null, move to tail. + pub fn prev(&mut self) { + unsafe { + match self.link { + Some(link) => self.link = link.as_ref().prev, + None => self.link = self.dlist.tail, + } + } + } + + /// Move to head. + pub fn front(&mut self) { + self.link = self.dlist.head; + } + + /// Move to head. + pub fn back(&mut self) { + self.link = self.dlist.tail; + } + + /// Check if the iterator is in the first position of the intrusive double linked list. + pub fn is_front(&self) -> bool { + self.link == self.dlist.head + } + + /// Check if the iterator is in the last position of the intrusive double linked list. + pub fn is_back(&self) -> bool { + self.link == self.dlist.tail + } +} + +/// Item mutable reference iterator of the intrusive double linked list. +pub struct DlistIterMut<'a, A> +where + A: Adapter, +{ + link: Option>, + dlist: &'a mut Dlist, +} + +impl<'a, A> DlistIterMut<'a, A> +where + A: Adapter, +{ + /// Check if the iter is in a valid position. 
+ pub fn is_valid(&self) -> bool { + self.link.is_some() + } + + /// Get the item reference of the current position. + pub fn item(&self) -> Option<&A::Item> { + self.link + .map(|link| unsafe { self.dlist.adapter.link2ptr(link).as_ref() }) + } + + /// Get the item mutable reference of the current position. + pub fn item_mut(&mut self) -> Option<&mut A::Item> { + self.link + .map(|link| unsafe { self.dlist.adapter.link2ptr(link).as_mut() }) + } + + /// Move to next. + /// + /// If iter is on tail, move to null. + /// If iter is on null, move to head. + pub fn next(&mut self) { + unsafe { + match self.link { + Some(link) => self.link = link.as_ref().next, + None => self.link = self.dlist.head, + } + } + } + + /// Move to prev. + /// + /// If iter is on head, move to null. + /// If iter is on null, move to tail. + pub fn prev(&mut self) { + unsafe { + match self.link { + Some(link) => self.link = link.as_ref().prev, + None => self.link = self.dlist.tail, + } + } + } + + /// Move to front. + pub fn front(&mut self) { + self.link = self.dlist.head; + } + + /// Move to back. + pub fn back(&mut self) { + self.link = self.dlist.tail; + } + + /// Removes the current item from [`Dlist`] and move next. + pub fn remove(&mut self) -> Option> { + unsafe { + if !self.is_valid() { + return None; + } + + strict_assert!(self.is_valid()); + let mut link = self.link.strict_unwrap_unchecked(); + let ptr = self.dlist.adapter.link2ptr(link); + + // fix head and tail if node is either of that + let mut prev = link.as_ref().prev; + let mut next = link.as_ref().next; + if Some(link) == self.dlist.head { + self.dlist.head = next; + } + if Some(link) == self.dlist.tail { + self.dlist.tail = prev; + } + + // fix the next and prev ptrs of the node before and after this + if let Some(prev) = &mut prev { + prev.as_mut().next = next; + } + if let Some(next) = &mut next { + next.as_mut().prev = prev; + } + + link.as_mut().next = None; + link.as_mut().prev = None; + link.as_mut().is_linked = false; + + self.dlist.len -= 1; + + self.link = next; + + Some(ptr) + } + } + + /// Link a new ptr before the current one. + /// + /// If iter is on null, link to tail. + pub fn insert_before(&mut self, ptr: NonNull) { + unsafe { + let mut link_new = self.dlist.adapter.ptr2link(ptr); + assert!(!link_new.as_ref().is_linked()); + + match self.link { + Some(link) => self.link_before(link_new, link), + None => { + self.link_between(link_new, self.dlist.tail, None); + self.dlist.tail = Some(link_new); + } + } + + if self.dlist.head == self.link { + self.dlist.head = Some(link_new); + } + + link_new.as_mut().is_linked = true; + + self.dlist.len += 1; + } + } + + /// Link a new ptr after the current one. + /// + /// If iter is on null, link to head. 
+ pub fn insert_after(&mut self, ptr: NonNull) { + unsafe { + let mut link_new = self.dlist.adapter.ptr2link(ptr); + assert!(!link_new.as_ref().is_linked()); + + match self.link { + Some(link) => self.link_after(link_new, link), + None => { + self.link_between(link_new, None, self.dlist.head); + self.dlist.head = Some(link_new); + } + } + + if self.dlist.tail == self.link { + self.dlist.tail = Some(link_new); + } + + link_new.as_mut().is_linked = true; + + self.dlist.len += 1; + } + } + + unsafe fn link_before(&mut self, link: NonNull, next: NonNull) { + self.link_between(link, next.as_ref().prev, Some(next)); + } + + unsafe fn link_after(&mut self, link: NonNull, prev: NonNull) { + self.link_between(link, Some(prev), prev.as_ref().next); + } + + unsafe fn link_between( + &mut self, + mut link: NonNull, + mut prev: Option>, + mut next: Option>, + ) { + if let Some(prev) = &mut prev { + prev.as_mut().next = Some(link); + } + if let Some(next) = &mut next { + next.as_mut().prev = Some(link); + } + link.as_mut().prev = prev; + link.as_mut().next = next; + } + + /// Check if the iterator is in the first position of the intrusive double linked list. + pub fn is_front(&self) -> bool { + self.link == self.dlist.head + } + + /// Check if the iterator is in the last position of the intrusive double linked list. + pub fn is_back(&self) -> bool { + self.link == self.dlist.tail + } +} + +impl<'a, A> Iterator for DlistIter<'a, A> +where + A: Adapter, +{ + type Item = &'a A::Item; + + fn next(&mut self) -> Option { + self.next(); + match self.link { + Some(link) => Some(unsafe { self.dlist.adapter.link2ptr(link).as_ref() }), + None => None, + } + } +} + +impl<'a, A> Iterator for DlistIterMut<'a, A> +where + A: Adapter, +{ + type Item = &'a mut A::Item; + + fn next(&mut self) -> Option { + self.next(); + match self.link { + Some(link) => Some(unsafe { self.dlist.adapter.link2ptr(link).as_mut() }), + None => None, + } + } +} + +// TODO(MrCroxx): Need more tests. + +#[cfg(test)] +mod tests { + + use itertools::Itertools; + + use super::*; + use crate::intrusive_adapter; + + #[derive(Debug)] + struct DlistItem { + link: DlistLink, + val: u64, + } + + impl DlistItem { + fn new(val: u64) -> Self { + Self { + link: DlistLink::default(), + val, + } + } + } + + #[derive(Debug, Default)] + struct DlistAdapter; + + unsafe impl Adapter for DlistAdapter { + type Item = DlistItem; + type Link = DlistLink; + + fn new() -> Self { + Self + } + + unsafe fn link2ptr(&self, link: NonNull) -> NonNull { + NonNull::new_unchecked(crate::container_of!(link.as_ptr(), DlistItem, link)) + } + + unsafe fn ptr2link(&self, item: NonNull) -> NonNull { + NonNull::new_unchecked((item.as_ptr() as *const u8).add(std::mem::offset_of!(DlistItem, link)) as *mut _) + } + } + + intrusive_adapter! 
{ DlistArcAdapter = DlistItem { link => DlistLink } } + + #[test] + fn test_dlist_simple() { + let mut l = Dlist::::new(); + + l.push_back(unsafe { NonNull::new_unchecked(Box::into_raw(Box::new(DlistItem::new(2)))) }); + l.push_front(unsafe { NonNull::new_unchecked(Box::into_raw(Box::new(DlistItem::new(1)))) }); + l.push_back(unsafe { NonNull::new_unchecked(Box::into_raw(Box::new(DlistItem::new(3)))) }); + + let v = l.iter_mut().map(|item| item.val).collect_vec(); + assert_eq!(v, vec![1, 2, 3]); + assert_eq!(l.len(), 3); + + let mut iter = l.iter_mut(); + iter.next(); + iter.next(); + assert_eq!(iter.item().unwrap().val, 2); + let p2 = iter.remove(); + let i2 = unsafe { Box::from_raw(p2.unwrap().as_ptr()) }; + assert_eq!(i2.val, 2); + assert_eq!(iter.item().unwrap().val, 3); + let v = l.iter_mut().map(|item| item.val).collect_vec(); + assert_eq!(v, vec![1, 3]); + assert_eq!(l.len(), 2); + + let p3 = l.pop_back(); + let i3 = unsafe { Box::from_raw(p3.unwrap().as_ptr()) }; + assert_eq!(i3.val, 3); + let p1 = l.pop_front(); + let i1 = unsafe { Box::from_raw(p1.unwrap().as_ptr()) }; + assert_eq!(i1.val, 1); + assert!(l.pop_front().is_none()); + assert_eq!(l.len(), 0); + } +} diff --git a/foyer-intrusive-v2/src/lib.rs b/foyer-intrusive-v2/src/lib.rs new file mode 100644 index 00000000..f6aa5388 --- /dev/null +++ b/foyer-intrusive-v2/src/lib.rs @@ -0,0 +1,46 @@ +// Copyright 2024 foyer Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#![expect(clippy::new_without_default)] + +//! Intrusive data structures and utils for foyer. + +/// Unsafe macro to get a raw pointer to an outer object from a pointer to one +/// of its fields. +/// +/// # Examples +/// +/// ``` +/// use foyer_intrusive::container_of; +/// +/// struct S { x: u32, y: u32 }; +/// let mut container = S { x: 1, y: 2 }; +/// let field = &mut container.x; +/// let container2: *mut S = unsafe { container_of!(field, S, x) }; +/// assert_eq!(&mut container as *mut S, container2); +/// ``` +/// +/// # Safety +/// +/// This is unsafe because it assumes that the given expression is a valid +/// pointer to the specified field of some container type. +#[macro_export] +macro_rules! 
container_of { + ($ptr:expr, $container:path, $($fields:expr)+) => { + ($ptr as *mut _ as *const u8).sub(std::mem::offset_of!($container, $($fields)+)) as *mut $container + }; +} + +pub mod adapter; +pub mod dlist; diff --git a/foyer-memory-v2/Cargo.toml b/foyer-memory-v2/Cargo.toml new file mode 100644 index 00000000..911f950f --- /dev/null +++ b/foyer-memory-v2/Cargo.toml @@ -0,0 +1,49 @@ +[package] +name = "foyer-memory-v2" +description = "memory cache for foyer - Hybrid cache for Rust" +version = { workspace = true } +edition = { workspace = true } +rust-version = { workspace = true } +repository = { workspace = true } +homepage = { workspace = true } +keywords = { workspace = true } +authors = { workspace = true } +license = { workspace = true } +readme = { workspace = true } +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +ahash = "0.8" +bitflags = "2" +cmsketch = "0.2.1" +equivalent = { workspace = true } +fastrace = { workspace = true } +foyer-common = { workspace = true } +foyer-intrusive-v2 = { workspace = true } +futures = "0.3" +hashbrown = { workspace = true } +itertools = { workspace = true } +parking_lot = "0.12" +pin-project = "1" +serde = { workspace = true } +tokio = { workspace = true } +tracing = "0.1" + +[dev-dependencies] +anyhow = "1" +csv = "1.3.0" +moka = { version = "0.12", features = ["sync"] } +rand = { version = "0.8", features = ["small_rng"] } +test-log = { workspace = true } +zipf = "7.0.1" + +[features] +deadlock = ["parking_lot/deadlock_detection"] +strict_assertions = [ + "foyer-common/strict_assertions", + "foyer-intrusive-v2/strict_assertions", +] +tracing = ["fastrace/enable", "foyer-common/tracing"] + +[lints] +workspace = true diff --git a/foyer-memory-v2/src/eviction/fifo.rs b/foyer-memory-v2/src/eviction/fifo.rs new file mode 100644 index 00000000..510c682e --- /dev/null +++ b/foyer-memory-v2/src/eviction/fifo.rs @@ -0,0 +1,112 @@ +// Copyright 2024 foyer Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::ptr::NonNull; + +use foyer_common::code::{Key, Value}; +use foyer_intrusive_v2::{ + dlist::{Dlist, DlistLink}, + intrusive_adapter, +}; +use serde::{Deserialize, Serialize}; + +use crate::Record; + +use super::{Eviction, Operator}; + +/// Fifo eviction algorithm config. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct FifoConfig; + +/// Fifo eviction algorithm hint. +#[derive(Debug, Clone, Default)] +pub struct FifoHint; + +/// Fifo eviction algorithm hint. +#[derive(Debug, Default)] +pub struct FifoState { + link: DlistLink, +} + +intrusive_adapter! 
{ Adapter = Record> { state.link => DlistLink } where K: Key, V: Value } + +pub struct Fifo +where + K: Key, + V: Value, +{ + queue: Dlist>, +} + +impl Eviction for Fifo +where + K: Key, + V: Value, +{ + type Config = FifoConfig; + type Key = K; + type Value = V; + type Hint = FifoHint; + type State = FifoState; + + fn new(_capacity: usize, _config: &Self::Config) -> Self + where + Self: Sized, + { + Self { queue: Dlist::new() } + } + + fn update(&mut self, _: usize, _: &Self::Config) {} + + fn push(&mut self, ptr: NonNull>) { + self.queue.push_back(ptr); + unsafe { ptr.as_ref().set_in_eviction(true) }; + } + + fn pop(&mut self) -> Option>> { + self.queue + .pop_front() + .inspect(|ptr| unsafe { ptr.as_ref().set_in_eviction(false) }) + } + + fn remove(&mut self, ptr: NonNull>) { + let p = unsafe { self.queue.remove(ptr) }; + assert_eq!(p, ptr); + unsafe { ptr.as_ref().set_in_eviction(false) }; + } + + fn clear(&mut self) { + while let Some(_) = self.pop() {} + } + + fn len(&self) -> usize { + self.queue.len() + } + + fn is_empty(&self) -> bool { + self.queue.is_empty() + } + + fn acquire_operator() -> super::Operator { + Operator::Immutable + } + + fn acquire_immutable(&self, _ptr: NonNull>) {} + + fn acquire_mutable(&mut self, _ptr: NonNull>) { + unreachable!() + } + + fn release(&mut self, _ptr: NonNull>) {} +} diff --git a/foyer-memory-v2/src/eviction/mod.rs b/foyer-memory-v2/src/eviction/mod.rs new file mode 100644 index 00000000..7db15fe1 --- /dev/null +++ b/foyer-memory-v2/src/eviction/mod.rs @@ -0,0 +1,68 @@ +// Copyright 2024 foyer Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::ptr::NonNull; + +use foyer_common::code::{Key, Value}; +use serde::{de::DeserializeOwned, Serialize}; + +use crate::Record; + +pub trait Hint: Send + Sync + 'static + Clone + Default {} +impl Hint for T where T: Send + Sync + 'static + Clone + Default {} + +pub trait State: Send + Sync + 'static + Default {} +impl State for T where T: Send + Sync + 'static + Default {} + +pub trait Config: Send + Sync + 'static + Clone + Serialize + DeserializeOwned + Default {} +impl Config for T where T: Send + Sync + 'static + Clone + Serialize + DeserializeOwned + Default {} + +pub enum Operator { + Immutable, + Mutable, +} + +pub trait Eviction: Send + Sync + 'static + Sized { + type Config: Config; + type Key: Key; + type Value: Value; + type Hint: Hint; + type State: State; + + fn new(capacity: usize, config: &Self::Config) -> Self; + + fn update(&mut self, capacity: usize, config: &Self::Config); + + fn push(&mut self, ptr: NonNull>); + + fn pop(&mut self) -> Option>>; + + fn remove(&mut self, ptr: NonNull>); + + fn clear(&mut self); + + fn len(&self) -> usize; + + fn is_empty(&self) -> bool; + + fn acquire_operator() -> Operator; + + fn acquire_immutable(&self, ptr: NonNull>); + + fn acquire_mutable(&mut self, ptr: NonNull>); + + fn release(&mut self, ptr: NonNull>); +} + +pub mod fifo; diff --git a/foyer-memory-v2/src/indexer/hash_table.rs b/foyer-memory-v2/src/indexer/hash_table.rs new file mode 100644 index 00000000..af2d5ec7 --- /dev/null +++ b/foyer-memory-v2/src/indexer/hash_table.rs @@ -0,0 +1,98 @@ +// Copyright 2024 foyer Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
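The `Eviction` trait above is the policy plugged into each cache shard: `push` runs on insert, `pop` runs when the shard must shed weight, `remove` on explicit removal, and `acquire_operator` tells the shard whether a hit can be recorded under a shared read lock (`Immutable`) or needs the write lock (`Mutable`). A pared-down, self-contained sketch of that lifecycle with a toy FIFO over plain keys (`ToyEviction`/`ToyFifo` are illustrative names, not crate types; intrusive pointers and the acquire/release hooks are omitted):

```rust
use std::collections::VecDeque;

/// A pared-down rendition of the eviction contract.
trait ToyEviction {
    type Key;
    fn push(&mut self, key: Self::Key);
    fn pop(&mut self) -> Option<Self::Key>;
    fn remove(&mut self, key: &Self::Key) -> bool;
    fn len(&self) -> usize;
}

#[derive(Default)]
struct ToyFifo {
    queue: VecDeque<u64>,
}

impl ToyEviction for ToyFifo {
    type Key = u64;

    // New entries go to the back ...
    fn push(&mut self, key: u64) {
        self.queue.push_back(key);
    }

    // ... and the oldest entry is evicted from the front.
    fn pop(&mut self) -> Option<u64> {
        self.queue.pop_front()
    }

    // Explicit removal may hit any position.
    fn remove(&mut self, key: &u64) -> bool {
        match self.queue.iter().position(|k| k == key) {
            Some(pos) => self.queue.remove(pos).is_some(),
            None => false,
        }
    }

    fn len(&self) -> usize {
        self.queue.len()
    }
}

fn main() {
    let capacity = 2;
    let mut fifo = ToyFifo::default();

    // Insert-side driving loop: evict before exceeding capacity.
    for key in [1u64, 2, 3] {
        while fifo.len() + 1 > capacity {
            let evicted = fifo.pop().unwrap();
            println!("evicted {evicted}");
        }
        fifo.push(key);
    }

    assert!(fifo.remove(&2));
    assert_eq!(fifo.pop(), Some(3));
}
```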
+ +use std::ptr::NonNull; + +use hashbrown::hash_table::{Entry as HashTableEntry, HashTable}; + +use crate::{Eviction, Record}; + +use super::Indexer; + +pub struct HashTableIndexer +where + E: Eviction, +{ + table: HashTable>>, +} + +unsafe impl Send for HashTableIndexer where E: Eviction {} +unsafe impl Sync for HashTableIndexer where E: Eviction {} + +impl Default for HashTableIndexer +where + E: Eviction, +{ + fn default() -> Self { + Self { + table: Default::default(), + } + } +} + +impl Indexer for HashTableIndexer +where + E: Eviction, +{ + type Eviction = E; + + fn insert(&mut self, mut ptr: NonNull>) -> Option>> { + let record = unsafe { ptr.as_ref() }; + + match self.table.entry( + record.hash(), + |p| unsafe { p.as_ref() }.key() == record.key(), + |p| unsafe { p.as_ref() }.hash(), + ) { + HashTableEntry::Occupied(mut o) => { + std::mem::swap(o.get_mut(), &mut ptr); + Some(ptr) + } + HashTableEntry::Vacant(v) => { + v.insert(ptr); + None + } + } + } + + fn get(&self, hash: u64, key: &Q) -> Option>> + where + Q: std::hash::Hash + equivalent::Equivalent<::Key> + ?Sized, + { + self.table + .find(hash, |p| key.equivalent(unsafe { p.as_ref() }.key())) + .copied() + } + + fn remove(&mut self, hash: u64, key: &Q) -> Option>> + where + Q: std::hash::Hash + equivalent::Equivalent<::Key> + ?Sized, + { + match self.table.entry( + hash, + |p| key.equivalent(unsafe { p.as_ref() }.key()), + |p| unsafe { p.as_ref() }.hash(), + ) { + HashTableEntry::Occupied(o) => { + let (p, _) = o.remove(); + Some(p) + } + HashTableEntry::Vacant(_) => None, + } + } + + fn drain(&mut self) -> impl Iterator>> { + self.table.drain() + } +} diff --git a/foyer-memory-v2/src/indexer/mod.rs b/foyer-memory-v2/src/indexer/mod.rs new file mode 100644 index 00000000..06039868 --- /dev/null +++ b/foyer-memory-v2/src/indexer/mod.rs @@ -0,0 +1,35 @@ +// Copyright 2024 foyer Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{hash::Hash, ptr::NonNull}; + +use equivalent::Equivalent; + +use crate::{Eviction, Record}; + +pub trait Indexer: Send + Sync + 'static + Default { + type Eviction: Eviction; + + fn insert(&mut self, ptr: NonNull>) -> Option>>; + fn get(&self, hash: u64, key: &Q) -> Option>> + where + Q: Hash + Equivalent<::Key> + ?Sized; + fn remove(&mut self, hash: u64, key: &Q) -> Option>> + where + Q: Hash + Equivalent<::Key> + ?Sized; + fn drain(&mut self) -> impl Iterator>>; +} + +pub mod hash_table; +pub mod sentry; diff --git a/foyer-memory-v2/src/indexer/sentry.rs b/foyer-memory-v2/src/indexer/sentry.rs new file mode 100644 index 00000000..5e89f59f --- /dev/null +++ b/foyer-memory-v2/src/indexer/sentry.rs @@ -0,0 +1,78 @@ +// Copyright 2024 foyer Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
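The `Q: Hash + Equivalent<…> + ?Sized` bounds on `get` and `remove` are what allow lookups with a borrowed form of the key (for example `&str` against `String` keys) without cloning, much like `HashMap::get` uses `Borrow`. A minimal sketch of that pattern outside the indexer, using the `equivalent` crate's blanket impl and std's `RandomState` (the `probe` helper is hypothetical):

```rust
use std::{
    collections::hash_map::RandomState,
    hash::{BuildHasher, Hash},
};

use equivalent::Equivalent;

/// Hash and compare through a borrowed form `Q` of the stored key, the way the
/// indexer does, so a `&str` lookup needs no `String` allocation.
fn probe<Q>(hasher: &RandomState, stored: &String, candidate: &Q) -> bool
where
    Q: Hash + Equivalent<String> + ?Sized,
{
    hasher.hash_one(stored) == hasher.hash_one(candidate) && candidate.equivalent(stored)
}

fn main() {
    let hasher = RandomState::new();
    let stored = String::from("foo");
    assert!(probe(&hasher, &stored, "foo")); // borrowed &str against an owned String
    assert!(!probe(&hasher, &stored, "bar"));
}
```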
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{hash::Hash, ptr::NonNull}; + +use equivalent::Equivalent; +use foyer_common::strict_assert; + +use crate::{Eviction, Record}; + +use super::Indexer; + +pub struct Sentry +where + I: Indexer, +{ + indexer: I, +} + +impl Default for Sentry +where + I: Indexer, +{ + fn default() -> Self { + Self { indexer: I::default() } + } +} + +impl Indexer for Sentry +where + I: Indexer, +{ + type Eviction = I::Eviction; + + fn insert(&mut self, ptr: NonNull>) -> Option>> { + strict_assert!(!unsafe { ptr.as_ref() }.is_in_indexer()); + let res = self.indexer.insert(ptr); + unsafe { ptr.as_ref() }.set_in_indexer(true); + res + } + + fn get(&self, hash: u64, key: &Q) -> Option>> + where + Q: Hash + Equivalent<::Key> + ?Sized, + { + self.indexer.get(hash, key).inspect(|ptr| { + strict_assert!(unsafe { ptr.as_ref() }.is_in_indexer()); + }) + } + + fn remove(&mut self, hash: u64, key: &Q) -> Option>> + where + Q: Hash + Equivalent<::Key> + ?Sized, + { + self.indexer.remove(hash, key).inspect(|ptr| { + strict_assert!(unsafe { ptr.as_ref() }.is_in_indexer()); + unsafe { ptr.as_ref() }.set_in_indexer(false) + }) + } + + fn drain(&mut self) -> impl Iterator>> { + self.indexer.drain().inspect(|ptr| { + strict_assert!(unsafe { ptr.as_ref() }.is_in_indexer()); + unsafe { ptr.as_ref() }.set_in_indexer(false) + }) + } +} diff --git a/foyer-memory-v2/src/lib.rs b/foyer-memory-v2/src/lib.rs new file mode 100644 index 00000000..27a9a2bd --- /dev/null +++ b/foyer-memory-v2/src/lib.rs @@ -0,0 +1,25 @@ +// Copyright 2024 foyer Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! TODO(MrCroxx): Finish the crate level docs. + +mod eviction; +mod indexer; +mod raw; +mod record; +mod slab; +mod sync; + +mod prelude; +pub use prelude::*; diff --git a/foyer-memory-v2/src/prelude.rs b/foyer-memory-v2/src/prelude.rs new file mode 100644 index 00000000..033d4cc5 --- /dev/null +++ b/foyer-memory-v2/src/prelude.rs @@ -0,0 +1,15 @@ +// Copyright 2024 foyer Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+pub use crate::{eviction::Eviction, raw::Weighter, record::Record};
diff --git a/foyer-memory-v2/src/raw.rs b/foyer-memory-v2/src/raw.rs
new file mode 100644
index 00000000..be0fc505
--- /dev/null
+++ b/foyer-memory-v2/src/raw.rs
@@ -0,0 +1,1064 @@
+// Copyright 2024 foyer Project Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! # Atomic Reference Count Management
+//!
+//! [`RawCache`] uses an atomic reference count to manage the release of an entry.
+//!
+//! The atomic reference count represents the *external* references of a cache record.
+//!
+//! When the reference count drops to zero, the related cache shard is locked to release the cache record.
+//!
+//! It is important to guarantee the correct usage of the atomic reference count, especially when triggering the
+//! release of a record. Without any other synchronization mechanism, there would be dangling pointers or double frees:
+//!
+//! ```plain
+//! Thread 1: [ decrease ARC to 0 ] ============> [ release record ]
+//! Thread 2: [ increase ARC to 1 ] =======> dangling!! ==> double free!!
+//! ```
+//!
+//! Thankfully, we can prevent this from happening by using the shard lock.
+//!
+//! The only ops that increase the atomic reference count are:
+//! 1. Insert/get/fetch on the [`RawCache`], which returns external entries. (locked)
+//! 2. Clone an external entry. (lock-free)
+//!
+//! Op 1 is always guarded by a mutex/rwlock, so it cannot happen while a record is being released with the shard
+//! locked.
+//!
+//! Op 2 is lock-free, but it only happens when there is at least 1 external reference. So it cannot happen while a
+//! record is being released, because while a release is in progress there must be no external references.
+//!
+//! So, this case will never happen:
+//!
+//! ```plain
+//! Thread 1: [ decrease ARC to 0 ] ================> [ release record ]
+//! Thread 2: [ (op2) increase ARC to 1 ] ==> dangling!! ==> double free!!
+//! ```
+//!
+//! When starting to release a record, after locking the shard, it is still required to check the atomic reference
+//! count, because this case can still occur:
+//!
+//! ```plain
+//! Thread 1: [ decrease ARC to 0 ] ====================================> [ release record ]
+//! Thread 2: [ (op1) increase ARC to 1 ] =======> dangling!! ==> double free!!
+//! ```
+//!
+//! Although op 1 requires the shard lock, the release operation can be delayed until after it. So the release
+//! operation can be skipped after checking that the atomic reference count is no longer zero.
+//!
+//! There is no need to worry about leaks: whenever the atomic reference count drops to zero again, another release
+//! operation will follow.
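A self-contained toy of the protocol described above, with a single record and a plain `Mutex` standing in for the shard lock (the `Toy*` types are illustrative): the external drop path decrements the count lock-free, and the release path re-checks it after taking the lock before actually freeing anything.

```rust
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc, Mutex,
};

struct ToyRecord {
    refs: AtomicUsize,
    data: Mutex<Option<String>>, // stands in for the slab slot holding the record
}

struct ToyShard {
    lock: Mutex<()>, // stands in for the shard's mutex/rwlock
    record: ToyRecord,
}

fn drop_external_ref(shard: &Arc<ToyShard>) {
    // Lock-free decrement, as in the external entry's `Drop`.
    if shard.record.refs.fetch_sub(1, Ordering::SeqCst) == 1 {
        // We may be the last holder: take the shard lock ...
        let _guard = shard.lock.lock().unwrap();
        // ... and re-check, because a locked op (insert/get/fetch) may have revived
        // the record in the meantime. If so, skip the release; another release will
        // follow once the count drops to zero again.
        if shard.record.refs.load(Ordering::SeqCst) == 0 {
            *shard.record.data.lock().unwrap() = None; // release the record
        }
    }
}

fn main() {
    let shard = Arc::new(ToyShard {
        lock: Mutex::new(()),
        record: ToyRecord {
            refs: AtomicUsize::new(1),
            data: Mutex::new(Some("value".to_string())),
        },
    });
    drop_external_ref(&shard);
    assert!(shard.record.data.lock().unwrap().is_none());
}
```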
+ +use std::{ + collections::hash_map::{Entry as HashMapEntry, HashMap}, + fmt::Debug, + future::Future, + hash::Hash, + ops::Deref, + pin::Pin, + ptr::NonNull, + sync::{atomic::Ordering, Arc}, + task::{Context, Poll}, +}; + +use equivalent::Equivalent; +use fastrace::{ + future::{FutureExt, InSpan}, + Span, +}; +use foyer_common::{ + code::HashBuilder, + event::EventListener, + future::{Diversion, DiversionFuture}, + metrics::Metrics, + runtime::SingletonHandle, + scope::Scope, + strict_assert, strict_assert_eq, +}; +use itertools::Itertools; +use parking_lot::Mutex; +use pin_project::pin_project; +use tokio::{sync::oneshot, task::JoinHandle}; + +use crate::{ + eviction::Operator, + indexer::{sentry::Sentry, Indexer}, + record::Data, + slab::{Slab, SlabBuilder}, + sync::Lock, + Eviction, Record, +}; + +/// The weighter for the in-memory cache. +/// +/// The weighter is used to calculate the weight of the cache entry. +pub trait Weighter: Fn(&K, &V) -> usize + Send + Sync + 'static {} +impl Weighter for T where T: Fn(&K, &V) -> usize + Send + Sync + 'static {} + +pub struct RawCacheConfig +where + S: HashBuilder, + E: Eviction, +{ + pub name: String, + pub capacity: usize, + pub shards: usize, + pub eviction_config: E::Config, + pub slab_initial_capacity: usize, + pub slab_segment_size: usize, + pub hash_builder: S, + pub weighter: Arc>, + pub event_listener: Option>>, +} + +struct RawCacheShard +where + S: HashBuilder, + E: Eviction, + I: Indexer, +{ + slab: Slab>, + + eviction: E, + indexer: Sentry, + + usage: usize, + capacity: usize, + + waiters: Mutex>>>>, + + metrics: Arc, + event_listener: Option>>, +} + +impl RawCacheShard +where + S: HashBuilder, + E: Eviction, + I: Indexer, +{ + #[fastrace::trace(name = "foyer::memory::raw::shard::emplace")] + fn emplace( + &mut self, + data: Data, + ephemeral: bool, + garbages: &mut Vec>, + waiters: &mut Vec>>, + ) -> NonNull> { + std::mem::swap(waiters, &mut self.waiters.lock().remove(&data.key).unwrap_or_default()); + + let weight = data.weight; + + // Allocate and setup new record. + let token = self.slab.insert(Record::new(data)); + let mut ptr = self.slab.ptr(token); + unsafe { ptr.as_mut().init(token) }; + + // Evict overflow records. + while self.usage + weight > self.capacity { + let evicted = match self.eviction.pop() { + Some(evicted) => evicted, + None => break, + }; + self.metrics.memory_evict.increment(1); + strict_assert!(unsafe { evicted.as_ref().is_in_indexer() }); + strict_assert!(unsafe { !evicted.as_ref().is_in_eviction() }); + unsafe { evicted.as_ref().set_ephemeral(true) }; + if unsafe { evicted.as_ref().refs().load(Ordering::SeqCst) } == 0 { + if let Some(garbage) = self.release(evicted) { + garbages.push(garbage); + } + } + } + + // Insert new record + if let Some(old) = self.indexer.insert(ptr) { + self.metrics.memory_replace.increment(1); + + strict_assert!(!unsafe { old.as_ref() }.is_in_indexer()); + if unsafe { old.as_ref() }.is_in_eviction() { + self.eviction.remove(old); + } + strict_assert!(!unsafe { old.as_ref() }.is_in_eviction()); + // Because the `old` handle is removed from the indexer, it will not be reinserted again. 
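+            // If the replaced record has no external refs, release it immediately and collect the freed data
+            // into `garbages` so the caller can drop it outside the shard lock.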
+ if unsafe { old.as_ref().refs().load(Ordering::SeqCst) } == 0 { + if let Some(garbage) = self.release(old) { + garbages.push(garbage); + } + } + } else { + self.metrics.memory_insert.increment(1); + } + strict_assert!(unsafe { ptr.as_ref() }.is_in_indexer()); + + unsafe { ptr.as_mut().set_ephemeral(ephemeral) }; + if !ephemeral { + self.eviction.push(ptr); + strict_assert!(unsafe { ptr.as_ref() }.is_in_eviction()); + } + + self.usage += weight; + self.metrics.memory_usage.increment(weight as f64); + // Increase the reference count within the lock section. + unsafe { ptr.as_ref().refs().fetch_add(waiters.len() + 1, Ordering::SeqCst) }; + + ptr + } + + #[fastrace::trace(name = "foyer::memory::raw::shard::release")] + fn release(&mut self, mut ptr: NonNull>) -> Option> { + let record = unsafe { ptr.as_mut() }; + + if record.refs().load(Ordering::SeqCst) > 0 { + return None; + } + + if record.is_in_indexer() { + if record.is_ephemeral() { + // The entry is ephemeral, remove it from indexer. Ignore reinsertion. + strict_assert!(!record.is_in_eviction()); + self.indexer.remove(record.hash(), record.key()); + strict_assert!(!record.is_in_indexer()); + } else { + // The entry has no external refs, give it another chance by reinsertion if the cache is not busy + // and the algorithm allows. + + // The usage is higher than the capacity means most handles are held externally, + // the cache shard cannot release enough weight for the new inserted entries. + // In this case, the reinsertion should be given up. + if self.usage <= self.capacity { + let was_in_eviction = record.is_in_eviction(); + self.eviction.release(ptr); + if record.is_in_eviction() { + if !was_in_eviction { + self.metrics.memory_reinsert.increment(1); + } + strict_assert!(record.is_in_indexer()); + strict_assert!(record.is_in_eviction()); + return None; + } + } + } + + // If the entry has not been reinserted, remove it from the indexer and the eviction container (if needed). + self.indexer.remove(record.hash(), record.key()); + if record.is_in_eviction() { + self.eviction.remove(ptr); + } + } + + // Here the handle is neither in the indexer nor in the eviction container. 
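+        // Reclaim the slab slot and hand the raw `Data` back to the caller, which drops it (and fires the
+        // event listener, if any) outside the shard lock.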
+ strict_assert!(!record.is_in_indexer()); + strict_assert!(!record.is_in_eviction()); + strict_assert_eq!(record.refs().load(Ordering::SeqCst), 0); + + self.metrics.memory_release.increment(1); + self.usage -= record.weight(); + self.metrics.memory_usage.decrement(record.weight() as f64); + + let token = record.token(); + drop((record, ptr)); + + let record = self.slab.remove(token); + let data = record.into_data(); + + Some(data) + } + + #[fastrace::trace(name = "foyer::memory::raw::shard::remove")] + fn remove(&mut self, hash: u64, key: &Q) -> Option>> + where + Q: Hash + Equivalent + ?Sized, + { + let mut ptr = self.indexer.remove(hash, key)?; + + let record = unsafe { ptr.as_mut() }; + if record.is_in_eviction() { + self.eviction.remove(ptr); + } + record.refs().fetch_add(1, Ordering::SeqCst); + + strict_assert!(record.is_in_indexer()); + strict_assert!(record.is_in_eviction()); + + self.metrics.memory_remove.increment(1); + + Some(ptr) + } + + #[fastrace::trace(name = "foyer::memory::raw::shard::get_immutable")] + fn get_immutable(&self, hash: u64, key: &Q) -> Option>> + where + Q: Hash + Equivalent + ?Sized, + { + self.get_inner(hash, key).inspect(|&ptr| self.acquire_immutable(ptr)) + } + + #[fastrace::trace(name = "foyer::memory::raw::shard::get_mutable")] + fn get_mutable(&mut self, hash: u64, key: &Q) -> Option>> + where + Q: Hash + Equivalent + ?Sized, + { + self.get_inner(hash, key).inspect(|&ptr| self.acquire_mutable(ptr)) + } + + #[fastrace::trace(name = "foyer::memory::raw::shard::get_inner")] + fn get_inner(&self, hash: u64, key: &Q) -> Option>> + where + Q: Hash + Equivalent + ?Sized, + { + let ptr = match self.indexer.get(hash, key) { + Some(ptr) => { + self.metrics.memory_hit.increment(1); + ptr + } + None => { + self.metrics.memory_miss.increment(1); + return None; + } + }; + + let record = unsafe { ptr.as_ref() }; + + strict_assert!(record.is_in_indexer()); + + record.set_ephemeral(false); + record.refs().fetch_add(1, Ordering::SeqCst); + + Some(ptr) + } + + #[fastrace::trace(name = "foyer::memory::raw::shard::acquire_immutable")] + fn acquire_immutable(&self, ptr: NonNull>) { + self.eviction.acquire_immutable(ptr); + } + + #[fastrace::trace(name = "foyer::memory::raw::shard::acquire_mutable")] + fn acquire_mutable(&mut self, ptr: NonNull>) { + self.eviction.acquire_mutable(ptr); + } + + #[fastrace::trace(name = "foyer::memory::raw::shard::clear")] + fn clear(&mut self, garbages: &mut Vec>) { + let ptrs = self.indexer.drain().collect_vec(); + self.eviction.clear(); + + let mut count = 0; + + for ptr in ptrs { + count += 1; + strict_assert!(unsafe { !ptr.as_ref().is_in_indexer() }); + strict_assert!(unsafe { !ptr.as_ref().is_in_eviction() }); + if unsafe { ptr.as_ref().refs().load(Ordering::SeqCst) } == 0 { + if let Some(garbage) = self.release(ptr) { + garbages.push(garbage); + } + } + } + + self.metrics.memory_remove.increment(count); + } + + #[fastrace::trace(name = "foyer::memory::raw::shard::fetch_immutable")] + fn fetch_immutable(&self, hash: u64, key: &E::Key) -> RawShardFetch + where + E::Key: Clone, + { + if let Some(ptr) = self.get_immutable(hash, key) { + return RawShardFetch::Hit(ptr); + } + + self.fetch_queue(key.clone()) + } + + #[fastrace::trace(name = "foyer::memory::raw::shard::fetch_mutable")] + fn fetch_mutable(&mut self, hash: u64, key: &E::Key) -> RawShardFetch + where + E::Key: Clone, + { + if let Some(ptr) = self.get_mutable(hash, key) { + return RawShardFetch::Hit(ptr); + } + + self.fetch_queue(key.clone()) + } + + #[fastrace::trace(name = 
"foyer::memory::raw::shard::fetch_queue")] + fn fetch_queue(&self, key: E::Key) -> RawShardFetch { + match self.waiters.lock().entry(key) { + HashMapEntry::Occupied(mut o) => { + let (tx, rx) = oneshot::channel(); + o.get_mut().push(tx); + self.metrics.memory_queue.increment(1); + RawShardFetch::Wait(rx.in_span(Span::enter_with_local_parent( + "foyer::memory::raw::fetch_with_runtime::wait", + ))) + } + HashMapEntry::Vacant(v) => { + v.insert(vec![]); + self.metrics.memory_fetch.increment(1); + RawShardFetch::Miss + } + } + } +} + +struct RawCacheInner +where + S: HashBuilder, + E: Eviction, + I: Indexer, +{ + shards: Vec>>, + + capacity: usize, + + hash_builder: S, + weighter: Arc>, + + metrics: Arc, + event_listener: Option>>, +} + +impl RawCacheInner +where + S: HashBuilder, + E: Eviction, + I: Indexer, +{ + #[fastrace::trace(name = "foyer::memory::raw::inner::clear")] + fn clear(&self) { + let mut garbages = vec![]; + + self.shards + .iter() + .map(|shard| shard.write()) + .for_each(|mut shard| shard.clear(&mut garbages)); + + // Do not deallocate data within the lock section. + if let Some(listener) = self.event_listener.as_ref() { + for Data { key, value, .. } in garbages { + listener.on_memory_release(key, value); + } + } + } +} + +struct RawCache +where + S: HashBuilder, + E: Eviction, + I: Indexer, +{ + inner: Arc>, +} + +impl Drop for RawCacheInner +where + S: HashBuilder, + E: Eviction, + I: Indexer, +{ + fn drop(&mut self) { + self.clear(); + } +} + +impl Clone for RawCache +where + S: HashBuilder, + E: Eviction, + I: Indexer, +{ + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} + +impl RawCache +where + S: HashBuilder, + E: Eviction, + I: Indexer, +{ + pub fn new(config: RawCacheConfig) -> Self { + let metrics = Arc::new(Metrics::new(&config.name)); + + let shard_capacity = config.capacity / config.shards; + + let shards = (0..config.shards) + .map(|_| RawCacheShard { + slab: SlabBuilder::new() + .with_capacity(config.slab_initial_capacity) + .with_segment_size(config.slab_segment_size) + .build(), + eviction: E::new(shard_capacity, &config.eviction_config), + indexer: Sentry::default(), + usage: 0, + capacity: shard_capacity, + waiters: Mutex::default(), + metrics: metrics.clone(), + event_listener: config.event_listener.clone(), + }) + .map(|shard| match E::acquire_operator() { + Operator::Immutable => Lock::rwlock(shard), + Operator::Mutable => Lock::mutex(shard), + }) + .collect_vec(); + + let inner = RawCacheInner { + shards, + capacity: config.capacity, + hash_builder: config.hash_builder, + weighter: config.weighter, + metrics, + event_listener: config.event_listener, + }; + + Self { inner: Arc::new(inner) } + } + + #[fastrace::trace(name = "foyer::memory::raw::insert")] + pub fn insert(&self, key: E::Key, value: E::Value) -> RawCacheEntry { + self.insert_with_hint(key, value, Default::default()) + } + + #[fastrace::trace(name = "foyer::memory::raw::insert_with_hint")] + pub fn insert_with_hint(&self, key: E::Key, value: E::Value, hint: E::Hint) -> RawCacheEntry { + self.emplace(key, value, hint, false) + } + + #[fastrace::trace(name = "foyer::memory::raw::insert_ephemeral")] + pub fn insert_ephemeral(&self, key: E::Key, value: E::Value) -> RawCacheEntry { + self.insert_ephemeral_with_hint(key, value, Default::default()) + } + + #[fastrace::trace(name = "foyer::memory::raw::insert_ephemeral_with_hint")] + pub fn insert_ephemeral_with_hint(&self, key: E::Key, value: E::Value, hint: E::Hint) -> RawCacheEntry { + self.emplace(key, value, hint, 
true) + } + + #[fastrace::trace(name = "foyer::memory::raw::emplace")] + fn emplace(&self, key: E::Key, value: E::Value, hint: E::Hint, ephemeral: bool) -> RawCacheEntry { + let hash = self.inner.hash_builder.hash_one(&key); + let weight = (self.inner.weighter)(&key, &value); + + let mut garbages = vec![]; + let mut waiters = vec![]; + + let ptr = self.inner.shards[self.shard(hash)].write().with(|mut shard| { + shard.emplace( + Data { + key, + value, + hint, + state: Default::default(), + hash, + weight, + }, + ephemeral, + &mut garbages, + &mut waiters, + ) + }); + + // Notify waiters out of the lock critical sectwion. + for waiter in waiters { + let _ = waiter.send(RawCacheEntry { + inner: self.inner.clone(), + ptr: ptr, + }); + } + + // Deallocate data out of the lock critical section. + if let Some(listener) = self.inner.event_listener.as_ref() { + for Data { key, value, .. } in garbages { + listener.on_memory_release(key, value); + } + } + + RawCacheEntry { + inner: self.inner.clone(), + ptr, + } + } + + #[fastrace::trace(name = "foyer::memory::raw::remove")] + pub fn remove(&self, key: &Q) -> Option> + where + Q: Hash + Equivalent + ?Sized, + { + let hash = self.inner.hash_builder.hash_one(key); + + self.inner.shards[self.shard(hash)].write().with(|mut shard| { + shard.remove(hash, key).map(|ptr| RawCacheEntry { + inner: self.inner.clone(), + ptr, + }) + }) + } + + #[fastrace::trace(name = "foyer::memory::raw::get")] + pub fn get(&self, key: &Q) -> Option> + where + Q: Hash + Equivalent + ?Sized, + { + let hash = self.inner.hash_builder.hash_one(key); + + let ptr = match E::acquire_operator() { + Operator::Immutable => self.inner.shards[self.shard(hash)] + .read() + .with(|shard| shard.get_immutable(hash, key)), + Operator::Mutable => self.inner.shards[self.shard(hash)] + .write() + .with(|mut shard| shard.get_mutable(hash, key)), + }?; + + Some(RawCacheEntry { + inner: self.inner.clone(), + ptr, + }) + } + + #[fastrace::trace(name = "foyer::memory::raw::contains")] + pub fn contains(&self, key: &Q) -> bool + where + Q: Hash + Equivalent + ?Sized, + { + let hash = self.inner.hash_builder.hash_one(key); + + self.inner.shards[self.shard(hash)] + .read() + .with(|shard| shard.indexer.get(hash, key)) + .is_some() + } + + #[fastrace::trace(name = "foyer::memory::raw::touch")] + pub fn touch(&self, key: &Q) -> bool + where + Q: Hash + Equivalent + ?Sized, + { + let hash = self.inner.hash_builder.hash_one(key); + + match E::acquire_operator() { + Operator::Immutable => self.inner.shards[self.shard(hash)] + .read() + .with(|shard| shard.get_immutable(hash, key)), + Operator::Mutable => self.inner.shards[self.shard(hash)] + .write() + .with(|mut shard| shard.get_mutable(hash, key)), + } + .is_some() + } + + #[fastrace::trace(name = "foyer::memory::raw::clear")] + pub fn clear(&self) { + self.inner.clear(); + } + + pub fn capacity(&self) -> usize { + self.inner.capacity + } + + pub fn usage(&self) -> usize { + self.inner.shards.iter().map(|shard| shard.read().usage).sum() + } + + pub fn metrics(&self) -> &Metrics { + &self.inner.metrics + } + + pub fn hash_builder(&self) -> &S { + &self.inner.hash_builder + } + + pub fn shards(&self) -> usize { + self.inner.shards.len() + } + + fn shard(&self, hash: u64) -> usize { + hash as usize % self.inner.shards.len() + } +} + +pub struct RawCacheEntry +where + S: HashBuilder, + E: Eviction, + I: Indexer, +{ + inner: Arc>, + ptr: NonNull>, +} + +impl Debug for RawCacheEntry +where + S: HashBuilder, + E: Eviction, + I: Indexer, +{ + fn fmt(&self, f: &mut 
std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RawCacheEntry").field("ptr", &self.ptr).finish() + } +} + +impl Drop for RawCacheEntry +where + S: HashBuilder, + E: Eviction, + I: Indexer, +{ + fn drop(&mut self) { + if unsafe { self.ptr.as_ref() }.refs().fetch_sub(1, Ordering::SeqCst) == 1 { + let hash = unsafe { self.ptr.as_ref() }.hash(); + let shard = hash as usize % self.inner.shards.len(); + let garbage = self.inner.shards[shard] + .write() + .with(|mut shard| shard.release(self.ptr)); + // Do not deallocate data within the lock section. + if let Some(listener) = self.inner.event_listener.as_ref() { + if let Some(Data { key, value, .. }) = garbage { + listener.on_memory_release(key, value); + } + } + } + } +} + +impl Clone for RawCacheEntry +where + S: HashBuilder, + E: Eviction, + I: Indexer, +{ + fn clone(&self) -> Self { + unsafe { self.ptr.as_ref() }.refs().fetch_add(1, Ordering::SeqCst); + Self { + inner: self.inner.clone(), + ptr: self.ptr, + } + } +} + +impl Deref for RawCacheEntry +where + S: HashBuilder, + E: Eviction, + I: Indexer, +{ + type Target = E::Value; + + fn deref(&self) -> &Self::Target { + self.value() + } +} + +unsafe impl Send for RawCacheEntry +where + S: HashBuilder, + E: Eviction, + I: Indexer, +{ +} + +unsafe impl Sync for RawCacheEntry +where + S: HashBuilder, + E: Eviction, + I: Indexer, +{ +} + +impl RawCacheEntry +where + S: HashBuilder, + E: Eviction, + I: Indexer, +{ + pub fn hash(&self) -> u64 { + unsafe { self.ptr.as_ref() }.hash() + } + + pub fn key(&self) -> &E::Key { + unsafe { &self.ptr.as_ref() }.key() + } + + pub fn value(&self) -> &E::Value { + unsafe { &self.ptr.as_ref() }.value() + } + + pub fn hint(&self) -> &E::Hint { + unsafe { self.ptr.as_ref() }.hint() + } + + pub fn weight(&self) -> usize { + unsafe { self.ptr.as_ref() }.weight() + } + + pub fn refs(&self) -> usize { + unsafe { self.ptr.as_ref() }.refs().load(Ordering::SeqCst) + } + + pub fn is_outdated(&self) -> bool { + !unsafe { self.ptr.as_ref() }.is_in_indexer() + } +} + +/// The state of [`Fetch`]. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum FetchState { + /// Cache hit. + Hit, + /// Cache miss, but wait in queue. + Wait, + /// Cache miss, and there is no other waiters at the moment. + Miss, +} + +/// A mark for fetch calls. 
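// --- Illustrative aside (editor's sketch, not part of this patch) ---------------------
// The `FetchState` variants above reflect a common request-coalescing pattern: the first
// caller that misses runs the fetch itself, and later callers for the same key park on a
// channel and are woken when that fetch completes. The sketch below shows only that idea
// with plain std types; `MiniShard` and `MiniFetch` are hypothetical names, and foyer's
// real shard additionally consults the indexer, eviction state, and metrics.

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

enum MiniFetch {
    Hit(u64),            // value already cached
    Wait(Receiver<u64>), // another caller is fetching; wait for its result
    Miss,                // this caller is the first one and must fetch itself
}

#[derive(Default)]
struct MiniShard {
    entries: Mutex<HashMap<String, u64>>,
    waiters: Mutex<HashMap<String, Vec<Sender<u64>>>>,
}

impl MiniShard {
    fn fetch(&self, key: &str) -> MiniFetch {
        if let Some(v) = self.entries.lock().unwrap().get(key) {
            return MiniFetch::Hit(*v);
        }
        match self.waiters.lock().unwrap().entry(key.to_string()) {
            Entry::Occupied(mut o) => {
                // Someone is already fetching this key: queue up and wait.
                let (tx, rx) = channel();
                o.get_mut().push(tx);
                MiniFetch::Wait(rx)
            }
            Entry::Vacant(v) => {
                // First caller: register an empty waiter list and go fetch.
                v.insert(vec![]);
                MiniFetch::Miss
            }
        }
    }

    // Called by the `Miss` caller once the value is ready: cache it and wake all waiters.
    fn complete(&self, key: &str, value: u64) {
        self.entries.lock().unwrap().insert(key.to_string(), value);
        for tx in self.waiters.lock().unwrap().remove(key).unwrap_or_default() {
            let _ = tx.send(value);
        }
    }
}

fn main() {
    let shard = MiniShard::default();
    assert!(matches!(shard.fetch("k"), MiniFetch::Miss)); // first caller fetches
    let wait = shard.fetch("k"); // second caller queues up
    shard.complete("k", 42);
    if let MiniFetch::Wait(rx) = wait {
        assert_eq!(rx.recv().unwrap(), 42);
    }
    assert!(matches!(shard.fetch("k"), MiniFetch::Hit(42))); // later callers simply hit
}
// ---------------------------------------------------------------------------------------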
+pub struct FetchMark; + +enum RawShardFetch +where + S: HashBuilder, + E: Eviction, + I: Indexer, +{ + Hit(NonNull>), + Wait(InSpan>>), + Miss, +} + +pub type RawFetch = + DiversionFuture, std::result::Result, ER>, FetchMark>; + +type RawFetchHit = Option>; +type RawFetchWait = InSpan>>; +type RawFetchMiss = JoinHandle, ER>, DFS>>; + +#[pin_project(project = RawFetchInnerProj)] +pub enum RawFetchInner +where + S: HashBuilder, + E: Eviction, + I: Indexer, +{ + Hit(RawFetchHit), + Wait(#[pin] RawFetchWait), + Miss(#[pin] RawFetchMiss), +} + +impl RawFetchInner +where + S: HashBuilder, + E: Eviction, + I: Indexer, +{ + pub fn state(&self) -> FetchState { + match self { + RawFetchInner::Hit(_) => FetchState::Hit, + RawFetchInner::Wait(_) => FetchState::Wait, + RawFetchInner::Miss(_) => FetchState::Miss, + } + } +} + +impl Future for RawFetchInner +where + S: HashBuilder, + E: Eviction, + I: Indexer, + ER: From, +{ + type Output = Diversion, ER>, FetchMark>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.project() { + RawFetchInnerProj::Hit(opt) => Poll::Ready(Ok(opt.take().unwrap()).into()), + RawFetchInnerProj::Wait(waiter) => waiter.poll(cx).map_err(|err| err.into()).map(Diversion::from), + RawFetchInnerProj::Miss(handle) => handle.poll(cx).map(|join| join.unwrap()), + } + } +} + +// TODO(MrCroxx): use `hashbrown::HashTable` with `Handle` may relax the `Clone` bound? +impl RawCache +where + S: HashBuilder, + E: Eviction, + I: Indexer, + E::Key: Clone, +{ + #[fastrace::trace(name = "foyer::memory::raw::fetch")] + pub fn fetch(&self, key: E::Key, fetch: F) -> RawFetch + where + F: FnOnce() -> FU, + FU: Future> + Send + 'static, + ER: Send + 'static + Debug, + { + self.fetch_inner( + key, + Default::default(), + fetch, + &tokio::runtime::Handle::current().into(), + ) + } + + #[fastrace::trace(name = "foyer::memory::raw::fetch_with_hint")] + pub fn fetch_with_hint(&self, key: E::Key, hint: E::Hint, fetch: F) -> RawFetch + where + F: FnOnce() -> FU, + FU: Future> + Send + 'static, + ER: Send + 'static + Debug, + { + self.fetch_inner(key, hint, fetch, &tokio::runtime::Handle::current().into()) + } + + /// Internal fetch function, only for other foyer crates usages only, so the doc is hidden. 
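// --- Illustrative aside (editor's sketch, not part of this patch) ---------------------
// `RawFetchInner` above is an enum future: the `Hit` arm resolves on the first poll,
// while the `Wait`/`Miss` arms forward polling to a pinned inner future. The minimal
// sketch below shows that per-variant polling with the `pin-project` crate (already used
// by this patch); `EitherFetch` is a hypothetical name and carries none of foyer's
// diversion, tracing, or metrics plumbing.

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use pin_project::pin_project;

#[pin_project(project = EitherFetchProj)]
enum EitherFetch<F> {
    // Value already known: resolve immediately.
    Ready(Option<u64>),
    // Value still being produced: poll the inner future structurally pinned here.
    Pending(#[pin] F),
}

impl<F> Future for EitherFetch<F>
where
    F: Future<Output = u64>,
{
    type Output = u64;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.project() {
            EitherFetchProj::Ready(value) => Poll::Ready(value.take().expect("polled after completion")),
            EitherFetchProj::Pending(fut) => fut.poll(cx),
        }
    }
}

async fn demo() -> (u64, u64) {
    let hit = EitherFetch::<std::future::Ready<u64>>::Ready(Some(7));
    let miss = EitherFetch::Pending(std::future::ready(9));
    (hit.await, miss.await)
}
// ---------------------------------------------------------------------------------------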
+ #[doc(hidden)] + #[fastrace::trace(name = "foyer::memory::raw::fetch_inner")] + pub fn fetch_inner( + &self, + key: E::Key, + hint: E::Hint, + fetch: F, + runtime: &SingletonHandle, + ) -> RawFetch + where + F: FnOnce() -> FU, + FU: Future + Send + 'static, + ER: Send + 'static + Debug, + ID: Into, FetchMark>>, + { + let hash = self.inner.hash_builder.hash_one(&key); + + let raw = match E::acquire_operator() { + Operator::Immutable => self.inner.shards[self.shard(hash)].read().fetch_immutable(hash, &key), + Operator::Mutable => self.inner.shards[self.shard(hash)].write().fetch_mutable(hash, &key), + }; + + match raw { + RawShardFetch::Hit(ptr) => { + return RawFetch::new(RawFetchInner::Hit(Some(RawCacheEntry { + inner: self.inner.clone(), + ptr, + }))) + } + RawShardFetch::Wait(future) => return RawFetch::new(RawFetchInner::Wait(future)), + RawShardFetch::Miss => {} + } + + let cache = self.clone(); + let future = fetch(); + let join = runtime.spawn( + async move { + let Diversion { target, store } = future + .in_span(Span::enter_with_local_parent("foyer::memory::raw::fetch_inner::fn")) + .await + .into(); + let value = match target { + Ok(value) => value, + Err(e) => { + cache.inner.shards[cache.shard(hash)].read().waiters.lock().remove(&key); + tracing::debug!("[fetch]: error raised while fetching, all waiters are dropped, err: {e:?}"); + return Diversion { target: Err(e), store }; + } + }; + let entry = cache.insert_with_hint(key, value, hint); + Diversion { + target: Ok(entry), + store, + } + } + .in_span(Span::enter_with_local_parent( + "foyer::memory::generic::fetch_with_runtime::spawn", + )), + ); + + RawFetch::new(RawFetchInner::Miss(join)) + } +} + +#[cfg(test)] +mod tests { + + use ahash::RandomState; + use rand::{rngs::SmallRng, seq::SliceRandom, RngCore, SeedableRng}; + + use crate::{ + eviction::fifo::{Fifo, FifoConfig, FifoHint}, + indexer::hash_table::HashTableIndexer, + }; + + use super::*; + + fn is_send_sync_static() {} + + #[test] + fn test_send_sync_static() { + is_send_sync_static::, HashTableIndexer>>>(); + } + + fn fuzzy(cache: RawCache>, hints: Vec) + where + E: Eviction, + { + // FIXME: restore this line + // let handles = (0..8) + let handles = (0..1) + .map(|i| { + let c = cache.clone(); + let hints = hints.clone(); + std::thread::spawn(move || { + let mut rng = SmallRng::seed_from_u64(i); + for _ in 0..100000 { + let key = rng.next_u64(); + if let Some(entry) = c.get(&key) { + assert_eq!(key, *entry); + drop(entry); + continue; + } + let hint = hints.choose(&mut rng).cloned().unwrap(); + c.insert_with_hint(key, key, hint); + } + }) + }) + .collect_vec(); + + handles.into_iter().for_each(|handle| handle.join().unwrap()); + + assert_eq!(cache.usage(), cache.capacity()); + } + + #[test_log::test] + fn test_fifo_cache_fuzzy() { + let cache: RawCache, HashTableIndexer>> = + RawCache::new(RawCacheConfig { + name: "test".to_string(), + capacity: 256, + shards: 4, + eviction_config: FifoConfig, + slab_initial_capacity: 0, + slab_segment_size: 16 * 1024, + hash_builder: RandomState::default(), + weighter: Arc::new(|_, _| 1), + event_listener: None, + }); + let hints = vec![FifoHint]; + fuzzy(cache, hints); + } +} diff --git a/foyer-memory-v2/src/record.rs b/foyer-memory-v2/src/record.rs new file mode 100644 index 00000000..252fa808 --- /dev/null +++ b/foyer-memory-v2/src/record.rs @@ -0,0 +1,191 @@ +// Copyright 2024 foyer Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the
License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; + +use bitflags::bitflags; + +use crate::{slab::Token, Eviction}; + +bitflags! { + #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] + pub struct Flags: u64 { + const IN_INDEXER = 0b00000001; + const IN_EVICTION = 0b00000010; + const EPHEMERAL= 0b00000100; + } +} + +pub struct Data +where + E: Eviction, +{ + pub key: E::Key, + pub value: E::Value, + pub hint: E::Hint, + pub state: E::State, + pub hash: u64, + pub weight: usize, +} + +/// [`Record`] holds the information of the cached entry. +pub struct Record +where + E: Eviction, +{ + key: E::Key, + value: E::Value, + hint: E::Hint, + /// Make `state` visible to make intrusive data structure macro works. + pub(crate) state: E::State, + hash: u64, + weight: usize, + refs: AtomicUsize, + flags: AtomicU64, + token: Option, +} + +impl Record +where + E: Eviction, +{ + /// Create a new heap allocated record with data. + pub fn new(data: Data) -> Self { + Record { + key: data.key, + value: data.value, + hint: data.hint, + state: data.state, + hash: data.hash, + weight: data.weight, + refs: AtomicUsize::new(0), + flags: AtomicU64::new(0), + // Temporarily set to None, update after inserted into slab. + token: None, + } + } + + /// Set the token of the record. + /// + /// # Safety + /// + /// Panics if the token is already set. + pub fn init(&mut self, token: Token) { + let old = self.token.replace(token); + assert!(old.is_none()); + } + + /// Get the token of the record. + /// + /// # Safety + /// + /// Panics if the token is not set. + pub fn token(&self) -> Token { + self.token.unwrap() + } + + pub fn into_data(self) -> Data { + Data { + key: self.key, + value: self.value, + hint: self.hint, + state: self.state, + hash: self.hash, + weight: self.weight, + } + } + + /// Get the immutable reference of the record key. + pub fn key(&self) -> &E::Key { + &self.key + } + + /// Get the immutable reference of the record value. + pub fn value(&self) -> &E::Value { + &self.value + } + + /// Get the immutable reference of the record hint. + pub fn hint(&self) -> &E::Hint { + &self.hint + } + + /// Get the immutable reference of the record state. + pub fn state(&self) -> &E::State { + &self.state + } + + /// Get the mutable reference of the record state. + pub fn state_mut(&mut self) -> &mut E::State { + &mut self.state + } + + /// Get the record hash. + pub fn hash(&self) -> u64 { + self.hash + } + + /// Get the record weight. + pub fn weight(&self) -> usize { + self.weight + } + + /// Get the record atomic refs. + pub fn refs(&self) -> &AtomicUsize { + &self.refs + } + + /// Set in eviction flag with relaxed memory order. + pub fn set_in_eviction(&self, val: bool) { + self.set_flags(Flags::IN_EVICTION, val, Ordering::Relaxed); + } + + /// Get in eviction flag with relaxed memory order. + pub fn is_in_eviction(&self) -> bool { + self.get_flags(Flags::IN_EVICTION, Ordering::Relaxed) + } + + /// Set in indexer flag with relaxed memory order. 
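// --- Illustrative aside (editor's sketch, not part of this patch) ---------------------
// The `Record` flag accessors in this impl pack several booleans (`IN_INDEXER`,
// `IN_EVICTION`, `EPHEMERAL`) into one `AtomicU64`, setting bits with `fetch_or` and
// clearing them with `fetch_and` of the complement (see `set_flags`/`get_flags` below).
// A self-contained sketch of that pattern; `AtomicFlags` and `MiniFlags` are
// hypothetical names, not foyer types.

use std::sync::atomic::{AtomicU64, Ordering};

use bitflags::bitflags;

bitflags! {
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    struct MiniFlags: u64 {
        const IN_INDEXER  = 0b001;
        const IN_EVICTION = 0b010;
        const EPHEMERAL   = 0b100;
    }
}

#[derive(Default)]
struct AtomicFlags(AtomicU64);

impl AtomicFlags {
    fn set(&self, flags: MiniFlags, val: bool, order: Ordering) {
        if val {
            self.0.fetch_or(flags.bits(), order); // turn the selected bits on
        } else {
            self.0.fetch_and(!flags.bits(), order); // turn the selected bits off
        }
    }

    fn get(&self, flags: MiniFlags, order: Ordering) -> bool {
        self.0.load(order) & flags.bits() == flags.bits() // are all selected bits set?
    }
}

fn main() {
    let f = AtomicFlags::default();
    f.set(MiniFlags::IN_INDEXER, true, Ordering::Relaxed);
    assert!(f.get(MiniFlags::IN_INDEXER, Ordering::Relaxed));
    f.set(MiniFlags::IN_INDEXER, false, Ordering::Relaxed);
    assert!(!f.get(MiniFlags::IN_INDEXER, Ordering::Relaxed));
}
// ---------------------------------------------------------------------------------------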
+ pub fn set_in_indexer(&self, val: bool) { + self.set_flags(Flags::IN_INDEXER, val, Ordering::Relaxed); + } + + /// Get in indexer flag with relaxed memory order. + pub fn is_in_indexer(&self) -> bool { + self.get_flags(Flags::IN_INDEXER, Ordering::Relaxed) + } + + /// Set ephemeral flag with relaxed memory order. + pub fn set_ephemeral(&self, val: bool) { + self.set_flags(Flags::EPHEMERAL, val, Ordering::Relaxed); + } + + /// Get ephemeral flag with relaxed memory order. + pub fn is_ephemeral(&self) -> bool { + self.get_flags(Flags::EPHEMERAL, Ordering::Relaxed) + } + + /// Set the record atomic flags. + pub fn set_flags(&self, flags: Flags, val: bool, order: Ordering) { + match val { + true => self.flags.fetch_or(flags.bits(), order), + false => self.flags.fetch_and(!flags.bits(), order), + }; + } + + /// Get the record atomic flags. + pub fn get_flags(&self, flags: Flags, order: Ordering) -> bool { + self.flags.load(order) & flags.bits() == flags.bits() + } +} diff --git a/foyer-memory-v2/src/slab.rs b/foyer-memory-v2/src/slab.rs new file mode 100644 index 00000000..9b519e3d --- /dev/null +++ b/foyer-memory-v2/src/slab.rs @@ -0,0 +1,307 @@ +// Copyright 2024 foyer Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{ + cell::UnsafeCell, + fmt::Debug, + marker::PhantomData, + num::NonZeroUsize, + ops::{Deref, DerefMut}, + ptr::NonNull, +}; + +struct SyncUnsafeCell { + inner: UnsafeCell, +} + +unsafe impl Sync for SyncUnsafeCell where T: Sync {} + +impl Deref for SyncUnsafeCell { + type Target = UnsafeCell; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl DerefMut for SyncUnsafeCell { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +impl SyncUnsafeCell { + fn new(value: T) -> Self { + Self { + inner: UnsafeCell::new(value), + } + } + + fn into_inner(self) -> T { + self.inner.into_inner() + } +} + +/// Index of the slab. +type RawToken = usize; + +/// A token that can be used to access the allocated entry in [`Slab`]. +/// +/// # Safety +/// +/// [`Token`] can be used like an index. It will be stale if the entry is removed from the slab. +/// +/// Use a stale token to access the slab will lead to an UB. +#[derive(Debug, Clone, Copy)] +pub struct Token(NonZeroUsize); + +impl Token { + const HIGHEST_BIT: RawToken = 1 << (RawToken::BITS - 1); + + fn from_raw(raw: RawToken) -> Self { + assert_eq!(raw & Self::HIGHEST_BIT, 0); + let val = unsafe { NonZeroUsize::new_unchecked(raw | Self::HIGHEST_BIT) }; + Self(val) + } + + fn into_raw(self) -> RawToken { + self.0.get() & !Self::HIGHEST_BIT + } +} + +enum Entry { + Vacant(usize), + Occupied(SyncUnsafeCell), +} + +/// Use [`Vec`] with `with_capacity` to prevent from reallocation and avoid handmade allocation, because a lot of useful +/// functions for allocation and layout calculation are still unstable and for std/compiler usage only. +type Segment = Vec>; + +/// A segmented slab allocator for typed data structures to keep allocation/deallocation ops cheap. 
+/// +/// The allocated memory are guaranteed not to be moved. It is safe to use pointers to reference the allocated memory. +pub struct Slab { + /// Allocated segments. + segments: Vec>, + /// The count of entries that the segment can hold. + segment_entries: usize, + /// Allocated entry count. + len: usize, + /// Next slot to allocate. + next: usize, +} + +impl Debug for Slab { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Slab") + .field("capacity", &self.capacity()) + .field("segment_entries", &self.segment_entries) + .field("len", &self.len) + .field("next", &self.next) + .finish() + } +} + +impl Slab { + /// The count of entries that the slab can hold without further allocation. + pub fn capacity(&self) -> usize { + self.segments.len() * self.segment_entries + } + + /// The count of allocated entry count. + pub fn len(&self) -> usize { + self.len + } + + /// Insert a new entry into the slab. + pub fn insert(&mut self, val: T) -> Token { + let cell = SyncUnsafeCell::new(val); + + let index = self.next; + let (s, i) = self.pos(index); + + tracing::trace!("insert slab {self:?} at {}", index); + + if s == self.segments.len() { + // Allocate a new segment. + self.segments.push(Segment::with_capacity(self.segment_entries)); + } + + if i == self.segments[s].len() { + self.segments[s].push(Entry::Occupied(cell)); + self.next = index + 1; + } else { + self.next = match &self.segments[s][i] { + &Entry::Vacant(next) => next, + _ => panic!("invalid index (unexpected occupied): {index}, segment: {s}, entry index: {i}"), + }; + self.segments[s][i] = Entry::Occupied(cell); + } + + self.len += 1; + + Token::from_raw(index) + } + + /// Remove an inserted entry from slab. + /// + /// # Safety + /// + /// Accessing with a stale token will lead to UB. + pub fn remove(&mut self, token: Token) -> T { + let index = token.into_raw(); + let (s, i) = self.pos(index); + + tracing::trace!("remove slab {self:?} at {}", index); + + match std::mem::replace(&mut self.segments[s][i], Entry::Vacant(self.next)) { + Entry::Occupied(cell) => { + self.next = index; + self.len -= 1; + cell.into_inner() + } + _ => panic!("invalid index (unexpected vacant): {index}, segment: {s}, entry index: {i}"), + } + } + + /// Get the [`NonNull`] pointer of a entry from slab. + /// + /// # Safety + /// + /// Accessing with a stale token will lead to UB. + pub fn ptr(&self, token: Token) -> NonNull { + let index = token.into_raw(); + let (s, i) = self.pos(index); + + tracing::trace!("access slab {self:?} at {}", index); + + let ptr = match &self.segments[s][i] { + Entry::Occupied(cell) => unsafe { NonNull::new_unchecked(cell.get()) }, + _ => panic!("invalid index (unexpected vacant): {index}, segment: {s}, entry index: {i}"), + }; + + ptr + } + + fn pos(&self, index: usize) -> (usize, usize) { + (index / self.segment_entries, index % self.segment_entries) + } +} + +/// Builder for [`Slab`]. +pub struct SlabBuilder { + capacity: usize, + segment_entries: usize, + _marker: PhantomData, +} + +impl Default for SlabBuilder { + fn default() -> Self { + Self::new() + } +} + +impl SlabBuilder { + /// Default segment size for [`Slab`]. + pub const DEFAULT_SEGMENT_SIZE: usize = 64 * 1024; // 64 KiB + + pub fn new() -> Self { + Self { + capacity: 0, + segment_entries: Self::segment_entries(Self::DEFAULT_SEGMENT_SIZE), + _marker: PhantomData, + } + } + + /// Set the capacity of [`Slab`]. + /// + /// The capacity is the count of entries that the slab can hold without further allocation. 
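// --- Illustrative aside (editor's sketch, not part of this patch) ---------------------
// The stability guarantee documented above comes from never growing a segment past its
// preallocated capacity: a `Vec` only reallocates (and moves its elements) when `len`
// would exceed `capacity`, so pushing into a segment created with
// `with_capacity(segment_entries)` keeps previously handed-out pointers valid. Extra
// capacity is added by pushing a whole new segment instead of growing an existing one.
// (The outer list of segments may reallocate, but that only moves the `Vec` handles,
// not the entry buffers they point to.) A small, safe demonstration:

fn main() {
    let entries_per_segment = 4;

    let mut segment: Vec<u64> = Vec::with_capacity(entries_per_segment);
    segment.push(1);
    let before = segment.as_ptr();

    // Filling the segment up to its preallocated capacity never reallocates...
    for i in 2..=entries_per_segment as u64 {
        segment.push(i);
    }
    assert_eq!(before, segment.as_ptr());

    // ...whereas a slab-style container now starts a fresh segment rather than growing
    // this one, because one more push could move the existing entries:
    let mut next_segment: Vec<u64> = Vec::with_capacity(entries_per_segment);
    next_segment.push(entries_per_segment as u64 + 1);
    assert_ne!(segment.as_ptr(), next_segment.as_ptr());
}
// ---------------------------------------------------------------------------------------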
+ pub fn with_capacity(mut self, val: usize) -> Self { + self.capacity = val; + self + } + + /// Set the capacity of the segment of [`Slab`] by entries. + /// + /// The capacity is the count of entries that the segment can hold without further allocation. + pub fn with_segment_entries(mut self, val: usize) -> Self { + self.segment_entries = val; + self + } + + /// Set the capacity of the segment of [`Slab`] by size. + /// + /// The capacity is the count of entries that the segment can hold without further allocation. + pub fn with_segment_size(mut self, val: usize) -> Self { + self.segment_entries = Self::segment_entries(val); + self + } + + /// Build the [`Slab`] with the given configuration. + pub fn build(self) -> Slab { + // Round up: any remainder requires one extra segment. + let segment_count = self.capacity / self.segment_entries + + match self.capacity % self.segment_entries { + 0 => 0, + _ => 1, + }; + Slab { + segments: (0..segment_count) + .map(|_| Segment::with_capacity(self.segment_entries)) + .collect(), + segment_entries: self.segment_entries, + len: 0, + next: 0, + } + } + + fn segment_entries(segment_size: usize) -> usize { + segment_size / std::mem::size_of::() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[derive(Debug, PartialEq, Eq)] + struct Data { + a: u64, + b: Vec, + } + + #[test] + fn test_slab_basic() { + let mut slab = SlabBuilder::new().build(); + + let t1 = slab.insert(Data { a: 1, b: vec![1; 1024] }); + let t2 = slab.insert(Data { a: 2, b: vec![2; 1024] }); + + unsafe { slab.ptr(t1).as_mut() }.a = 2; + unsafe { slab.ptr(t2).as_mut() }.a = 1; + + let d1 = slab.remove(t1); + let d2 = slab.remove(t2); + + assert_eq!(d1, Data { a: 2, b: vec![1; 1024] }); + assert_eq!(d2, Data { a: 1, b: vec![2; 1024] }); + } + + fn is_send_sync_static() {} + + #[test] + fn test_send_sync_static() { + is_send_sync_static::>(); + } +} diff --git a/foyer-memory-v2/src/sync.rs b/foyer-memory-v2/src/sync.rs new file mode 100644 index 00000000..57ee7a2a --- /dev/null +++ b/foyer-memory-v2/src/sync.rs @@ -0,0 +1,87 @@ +// Copyright 2024 foyer Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
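// --- Illustrative aside (editor's sketch, not part of this patch) ---------------------
// Sizing arithmetic behind `SlabBuilder` (defined in slab.rs above): a segment size in
// bytes is translated into entries-per-segment from the entry layout, and a requested
// capacity is translated into a number of preallocated segments by rounding up, so the
// requested capacity is available without further allocation. `Entry` below is a
// hypothetical payload type; the segment size matches the `slab_segment_size` used by
// the fuzzy test earlier in this patch.

struct Entry {
    _key: u64,
    _value: u64,
}

fn main() {
    let segment_size = 16 * 1024; // bytes per segment
    let capacity = 256; // entries requested up front

    // `with_segment_size` derives entries-per-segment from the entry layout.
    let segment_entries = segment_size / std::mem::size_of::<Entry>();

    // `build` preallocates ceil(capacity / segment_entries) segments.
    let segment_count = capacity / segment_entries + usize::from(capacity % segment_entries != 0);

    assert_eq!(segment_entries, 1024);
    assert_eq!(segment_count, 1);
}
// ---------------------------------------------------------------------------------------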
+ +use std::ops::{Deref, DerefMut}; + +use parking_lot::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; + +pub enum Lock { + RwLock(RwLock), + Mutex(Mutex), +} + +impl Lock { + pub fn rwlock(val: T) -> Self { + Self::RwLock(RwLock::new(val)) + } + + pub fn mutex(val: T) -> Self { + Self::Mutex(Mutex::new(val)) + } + + pub fn read(&self) -> LockReadGuard<'_, T> { + match self { + Lock::RwLock(inner) => LockReadGuard::RwLock(inner.read()), + Lock::Mutex(_) => unreachable!(), + } + } + + pub fn write(&self) -> LockWriteGuard<'_, T> { + match self { + Lock::RwLock(inner) => LockWriteGuard::RwLock(inner.write()), + Lock::Mutex(inner) => LockWriteGuard::Mutex(inner.lock()), + } + } +} + +pub enum LockReadGuard<'a, T> { + RwLock(RwLockReadGuard<'a, T>), + Mutex, +} + +impl<'a, T> Deref for LockReadGuard<'a, T> { + type Target = T; + + fn deref(&self) -> &Self::Target { + match self { + Self::RwLock(guard) => guard.deref(), + Self::Mutex => unreachable!(), + } + } +} + +pub enum LockWriteGuard<'a, T> { + RwLock(RwLockWriteGuard<'a, T>), + Mutex(MutexGuard<'a, T>), +} + +impl<'a, T> Deref for LockWriteGuard<'a, T> { + type Target = T; + + fn deref(&self) -> &Self::Target { + match self { + Self::RwLock(guard) => guard.deref(), + Self::Mutex(guard) => guard.deref(), + } + } +} + +impl<'a, T> DerefMut for LockWriteGuard<'a, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + match self { + Self::RwLock(guard) => guard.deref_mut(), + Self::Mutex(guard) => guard.deref_mut(), + } + } +}
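// --- Illustrative aside (editor's sketch, not part of this patch) ---------------------
// Usage sketch for the `Lock` wrapper above, written as if it lived in this module: an
// eviction policy whose reads are pure (`Operator::Immutable`) gets the rwlock flavor so
// lookups can share a read guard, while a policy that must mutate on every access
// (`Operator::Mutable`) gets the mutex flavor and always goes through `write()`. Note
// that, as written above, `read()` is only valid on the rwlock flavor. `Shard` here is a
// hypothetical stand-in, not foyer's shard type.

#[derive(Default)]
struct Shard {
    usage: usize,
}

fn demo(shared_reads: bool) -> usize {
    let lock = if shared_reads {
        Lock::rwlock(Shard::default()) // Operator::Immutable
    } else {
        Lock::mutex(Shard::default()) // Operator::Mutable
    };

    // Writers always take the exclusive guard, whichever flavor was chosen.
    lock.write().usage += 1;

    // Shared reads are only taken on the rwlock flavor.
    if shared_reads {
        lock.read().usage
    } else {
        lock.write().usage
    }
}
// ---------------------------------------------------------------------------------------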