diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c14a51b7..fe8163a4 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -200,7 +200,9 @@ jobs: CI: true run: | mkdir -p $GITHUB_WORKSPACE/foyer-data/foyer-bench/codecov - cargo llvm-cov --no-report run --package foyer-bench --bin foyer-bench --features "strict_assertions,sanity" -- --dir $GITHUB_WORKSPACE/foyer-data/foyer-bench/codecov --mem 16MiB --disk 256MiB --region-size 16MiB --get-range 1000 --w-rate 1MiB --r-rate 1MiB --admission-rate-limit 10MiB --time 60 + cargo llvm-cov --no-report run --package foyer-bench --bin foyer-bench --features "strict_assertions,sanity" -- --dir $GITHUB_WORKSPACE/foyer-data/foyer-bench/codecov --engine large --mem 16MiB --disk 256MiB --region-size 16MiB --get-range 1000 --w-rate 1MiB --r-rate 1MiB --admission-rate-limit 10MiB --entry-size-min 2KiB --entry-size-max 128KiB --time 60 + cargo llvm-cov --no-report run --package foyer-bench --bin foyer-bench --features "strict_assertions,sanity" -- --dir $GITHUB_WORKSPACE/foyer-data/foyer-bench/codecov --engine small --mem 4MiB --disk 256MiB --region-size 16MiB --get-range 1000 --w-rate 1MiB --r-rate 1MiB --admission-rate-limit 1MiB --entry-size-min 1KiB --entry-size-max 24KiB --time 60 + cargo llvm-cov --no-report run --package foyer-bench --bin foyer-bench --features "strict_assertions,sanity" -- --dir $GITHUB_WORKSPACE/foyer-data/foyer-bench/codecov --engine mixed=0.1 --mem 16MiB --disk 256MiB --region-size 16MiB --get-range 1000 --w-rate 1MiB --r-rate 1MiB --admission-rate-limit 10MiB --entry-size-min 1KiB --entry-size-max 128KiB --time 60 - name: Generate codecov report run: | cargo llvm-cov report --lcov --output-path lcov.info @@ -240,7 +242,9 @@ jobs: run: |- cargo build --all --features deadlock mkdir -p $GITHUB_WORKSPACE/foyer-data/foyer-storage/deadlock - timeout 2m ./target/debug/foyer-bench --dir $GITHUB_WORKSPACE/foyer-data/foyer-bench/deadlock --mem 16MiB --disk 256MiB --region-size 16MiB --get-range 1000 --w-rate 1MiB --r-rate 1MiB --admission-rate-limit 10MiB --time 60 + timeout 2m ./target/debug/foyer-bench --dir $GITHUB_WORKSPACE/foyer-data/foyer-bench/deadlock --engine large --mem 16MiB --disk 256MiB --region-size 16MiB --get-range 1000 --w-rate 1MiB --r-rate 1MiB --admission-rate-limit 10MiB --entry-size-min 2KiB --entry-size-max 128KiB --time 60 + timeout 2m ./target/debug/foyer-bench --dir $GITHUB_WORKSPACE/foyer-data/foyer-bench/deadlock --engine small --mem 4MiB --disk 256MiB --region-size 16MiB --get-range 1000 --w-rate 1MiB --r-rate 1MiB --admission-rate-limit 1MiB --entry-size-min 1KiB --entry-size-max 24KiB --time 60 + timeout 2m ./target/debug/foyer-bench --dir $GITHUB_WORKSPACE/foyer-data/foyer-bench/deadlock --engine mixed=0.1 --mem 16MiB --disk 256MiB --region-size 16MiB --get-range 1000 --w-rate 1MiB --r-rate 1MiB --admission-rate-limit 10MiB --entry-size-min 1KiB --entry-size-max 128KiB --time 60 asan: name: run with address saniziter runs-on: ubuntu-latest @@ -278,7 +282,9 @@ jobs: run: |- cargo +${{ env.RUST_TOOLCHAIN_NIGHTLY }} build --all --target x86_64-unknown-linux-gnu mkdir -p $GITHUB_WORKSPACE/foyer-data/foyer-bench/asan - timeout 2m ./target/x86_64-unknown-linux-gnu/debug/foyer-bench --dir $GITHUB_WORKSPACE/foyer-data/foyer-bench/asan --mem 16MiB --disk 256MiB --region-size 16MiB --get-range 1000 --w-rate 1MiB --r-rate 1MiB --admission-rate-limit 10MiB --time 60 + timeout 2m ./target/x86_64-unknown-linux-gnu/debug/foyer-bench --dir $GITHUB_WORKSPACE/foyer-data/foyer-bench/asan 
--engine large --mem 16MiB --disk 256MiB --region-size 16MiB --get-range 1000 --w-rate 1MiB --r-rate 1MiB --admission-rate-limit 10MiB --entry-size-min 2KiB --entry-size-max 128KiB --time 60 + timeout 2m ./target/x86_64-unknown-linux-gnu/debug/foyer-bench --dir $GITHUB_WORKSPACE/foyer-data/foyer-bench/asan --engine small --mem 4MiB --disk 256MiB --region-size 16MiB --get-range 1000 --w-rate 1MiB --r-rate 1MiB --admission-rate-limit 1MiB --entry-size-min 1KiB --entry-size-max 24KiB --time 60 + timeout 2m ./target/x86_64-unknown-linux-gnu/debug/foyer-bench --dir $GITHUB_WORKSPACE/foyer-data/foyer-bench/asan --engine mixed=0.1 --mem 16MiB --disk 256MiB --region-size 16MiB --get-range 1000 --w-rate 1MiB --r-rate 1MiB --admission-rate-limit 10MiB --entry-size-min 1KiB --entry-size-max 128KiB --time 60 - name: Prepare Artifacts on Failure if: ${{ failure() }} run: |- @@ -326,7 +332,9 @@ jobs: run: |- cargo +${{ env.RUST_TOOLCHAIN_NIGHTLY }} build --all --target x86_64-unknown-linux-gnu mkdir -p $GITHUB_WORKSPACE/foyer-data/foyer-bench/lsan - timeout 2m ./target/x86_64-unknown-linux-gnu/debug/foyer-bench --dir $GITHUB_WORKSPACE/foyer-data/foyer-bench/lsan --mem 16MiB --disk 256MiB --region-size 16MiB --get-range 1000 --w-rate 1MiB --r-rate 1MiB --admission-rate-limit 10MiB --time 60 + timeout 2m ./target/x86_64-unknown-linux-gnu/debug/foyer-bench --dir $GITHUB_WORKSPACE/foyer-data/foyer-bench/lsan --engine large --mem 16MiB --disk 256MiB --region-size 16MiB --get-range 1000 --w-rate 1MiB --r-rate 1MiB --admission-rate-limit 10MiB --entry-size-min 2KiB --entry-size-max 128KiB --time 60 + timeout 2m ./target/x86_64-unknown-linux-gnu/debug/foyer-bench --dir $GITHUB_WORKSPACE/foyer-data/foyer-bench/lsan --engine small --mem 4MiB --disk 256MiB --region-size 16MiB --get-range 1000 --w-rate 1MiB --r-rate 1MiB --admission-rate-limit 1MiB --entry-size-min 1KiB --entry-size-max 24KiB --time 60 + timeout 2m ./target/x86_64-unknown-linux-gnu/debug/foyer-bench --dir $GITHUB_WORKSPACE/foyer-data/foyer-bench/lsan --engine mixed=0.1 --mem 16MiB --disk 256MiB --region-size 16MiB --get-range 1000 --w-rate 1MiB --r-rate 1MiB --admission-rate-limit 10MiB --entry-size-min 1KiB --entry-size-max 128KiB --time 60 - name: Prepare Artifacts on Failure if: ${{ failure() }} run: |- diff --git a/examples/hybrid.rs b/examples/hybrid.rs index e8411295..2bf7a450 100644 --- a/examples/hybrid.rs +++ b/examples/hybrid.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
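With this change, both the examples below and the bench/CI invocations above must name a disk cache engine explicitly. A minimal sketch of the new call, modeled on `examples/hybrid.rs` (the key/value types and sizes are placeholders; I am assuming the `Engine` enum also exposes a `Small` variant, as the `--engine small` flag above suggests, and that the float in `Engine::Mixed(0.1)` mirrors `--engine mixed=0.1` and controls how traffic is split between the small and large engines, which the diff itself does not spell out):

use foyer::{DirectFsDeviceOptionsBuilder, Engine, HybridCache, HybridCacheBuilder};

async fn open(dir: &std::path::Path) -> anyhow::Result<HybridCache<u64, String>> {
    let hybrid = HybridCacheBuilder::new()
        .memory(64 * 1024 * 1024)
        .storage(Engine::Mixed(0.1)) // or Engine::Large / Engine::Small
        .with_device_config(
            DirectFsDeviceOptionsBuilder::new(dir)
                .with_capacity(256 * 1024 * 1024)
                .build(),
        )
        .build()
        .await?;
    Ok(hybrid)
}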
-use foyer::{DirectFsDeviceOptionsBuilder, HybridCache, HybridCacheBuilder}; +use foyer::{DirectFsDeviceOptionsBuilder, Engine, HybridCache, HybridCacheBuilder}; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -20,7 +20,7 @@ async fn main() -> anyhow::Result<()> { let hybrid: HybridCache = HybridCacheBuilder::new() .memory(64 * 1024 * 1024) - .storage() + .storage(Engine::Large) // use large object disk cache engine only .with_device_config( DirectFsDeviceOptionsBuilder::new(dir.path()) .with_capacity(256 * 1024 * 1024) diff --git a/examples/hybrid_full.rs b/examples/hybrid_full.rs index 97f98500..60cafb8e 100644 --- a/examples/hybrid_full.rs +++ b/examples/hybrid_full.rs @@ -17,8 +17,8 @@ use std::sync::Arc; use anyhow::Result; use chrono::Datelike; use foyer::{ - DirectFsDeviceOptionsBuilder, FifoPicker, HybridCache, HybridCacheBuilder, LruConfig, RateLimitPicker, RecoverMode, - RuntimeConfig, TokioRuntimeConfig, TombstoneLogConfigBuilder, + DirectFsDeviceOptionsBuilder, Engine, FifoPicker, HybridCache, HybridCacheBuilder, LargeEngineOptions, LruConfig, + RateLimitPicker, RecoverMode, RuntimeConfig, SmallEngineOptions, TokioRuntimeConfig, TombstoneLogConfigBuilder, }; use tempfile::tempdir; @@ -35,7 +35,7 @@ async fn main() -> Result<()> { .with_object_pool_capacity(1024) .with_hash_builder(ahash::RandomState::default()) .with_weighter(|_key, value: &String| value.len()) - .storage() + .storage(Engine::Mixed(0.1)) .with_device_config( DirectFsDeviceOptionsBuilder::new(dir.path()) .with_capacity(64 * 1024 * 1024) @@ -43,22 +43,9 @@ async fn main() -> Result<()> { .build(), ) .with_flush(true) - .with_indexer_shards(64) .with_recover_mode(RecoverMode::Quiet) - .with_recover_concurrency(8) - .with_flushers(2) - .with_reclaimers(2) - .with_buffer_pool_size(256 * 1024 * 1024) - .with_clean_region_threshold(4) - .with_eviction_pickers(vec![Box::::default()]) .with_admission_picker(Arc::new(RateLimitPicker::new(100 * 1024 * 1024))) - .with_reinsertion_picker(Arc::new(RateLimitPicker::new(10 * 1024 * 1024))) .with_compression(foyer::Compression::Lz4) - .with_tombstone_log_config( - TombstoneLogConfigBuilder::new(dir.path().join("tombstone-log-file")) - .with_flush(true) - .build(), - ) .with_runtime_config(RuntimeConfig::Separated { read_runtime_config: TokioRuntimeConfig { worker_threads: 4, @@ -69,6 +56,28 @@ async fn main() -> Result<()> { max_blocking_threads: 8, }, }) + .with_large_object_disk_cache_options( + LargeEngineOptions::new() + .with_indexer_shards(64) + .with_recover_concurrency(8) + .with_flushers(2) + .with_reclaimers(2) + .with_buffer_pool_size(256 * 1024 * 1024) + .with_clean_region_threshold(4) + .with_eviction_pickers(vec![Box::::default()]) + .with_reinsertion_picker(Arc::new(RateLimitPicker::new(10 * 1024 * 1024))) + .with_tombstone_log_config( + TombstoneLogConfigBuilder::new(dir.path().join("tombstone-log-file")) + .with_flush(true) + .build(), + ), + ) + .with_small_object_disk_cache_options( + SmallEngineOptions::new() + .with_set_size(16 * 1024) + .with_set_cache_capacity(64) + .with_flushers(2), + ) .build() .await?; diff --git a/examples/tail_based_tracing.rs b/examples/tail_based_tracing.rs index e74a0cef..171aebea 100644 --- a/examples/tail_based_tracing.rs +++ b/examples/tail_based_tracing.rs @@ -14,7 +14,7 @@ use std::time::Duration; -use foyer::{DirectFsDeviceOptionsBuilder, HybridCache, HybridCacheBuilder}; +use foyer::{DirectFsDeviceOptionsBuilder, Engine, HybridCache, HybridCacheBuilder}; #[cfg(feature = "jaeger")] fn init_jaeger_exporter() { @@ 
-70,7 +70,7 @@ async fn main() -> anyhow::Result<()> { let hybrid: HybridCache = HybridCacheBuilder::new() .memory(64 * 1024 * 1024) - .storage() + .storage(Engine::Large) .with_device_config( DirectFsDeviceOptionsBuilder::new(dir.path()) .with_capacity(256 * 1024 * 1024) diff --git a/foyer-bench/src/main.rs b/foyer-bench/src/main.rs index f72c30ea..87587bb5 100644 --- a/foyer-bench/src/main.rs +++ b/foyer-bench/src/main.rs @@ -34,9 +34,9 @@ use analyze::{analyze, monitor, Metrics}; use bytesize::ByteSize; use clap::{builder::PossibleValuesParser, ArgGroup, Parser}; use foyer::{ - Compression, DirectFileDeviceOptionsBuilder, DirectFsDeviceOptionsBuilder, FifoConfig, FifoPicker, HybridCache, - HybridCacheBuilder, InvalidRatioPicker, LfuConfig, LruConfig, RateLimitPicker, RecoverMode, RuntimeConfig, - S3FifoConfig, TokioRuntimeConfig, TracingConfig, + Compression, DirectFileDeviceOptionsBuilder, DirectFsDeviceOptionsBuilder, Engine, FifoConfig, FifoPicker, + HybridCache, HybridCacheBuilder, InvalidRatioPicker, LargeEngineOptions, LfuConfig, LruConfig, RateLimitPicker, + RecoverMode, RuntimeConfig, S3FifoConfig, SmallEngineOptions, TokioRuntimeConfig, TracingConfig, }; use futures::future::join_all; use itertools::Itertools; @@ -204,6 +204,11 @@ pub struct Args { #[arg(long, value_enum, default_value_t = Compression::None)] compression: Compression, + // TODO(MrCroxx): use mixed engine by default. + /// Disk cache engine. + #[arg(long, default_value_t = Engine::Large)] + engine: Engine, + /// Time-series operation distribution. /// /// Available values: "none", "uniform", "zipf". @@ -233,6 +238,12 @@ pub struct Args { #[arg(long, value_parser = PossibleValuesParser::new(["lru", "lfu", "fifo", "s3fifo"]), default_value = "lru")] eviction: String, + #[arg(long, default_value_t = ByteSize::kib(16))] + set_size: ByteSize, + + #[arg(long, default_value_t = 64)] + set_cache_capacity: usize, + /// Record insert trace threshold. Only effective with "mtrace" feature. 
#[arg(long, default_value_t = 1000 * 1000)] trace_insert_us: usize, @@ -448,7 +459,7 @@ async fn benchmark(args: Args) { let mut builder = builder .with_weighter(|_: &u64, value: &Value| u64::BITS as usize / 8 + value.len()) - .storage(); + .storage(args.engine); builder = match (args.file.as_ref(), args.dir.as_ref()) { (Some(file), None) => builder.with_device_config( @@ -468,15 +479,7 @@ async fn benchmark(args: Args) { builder = builder .with_flush(args.flush) - .with_indexer_shards(args.shards) .with_recover_mode(args.recover_mode) - .with_recover_concurrency(args.recover_concurrency) - .with_flushers(args.flushers) - .with_reclaimers(args.reclaimers) - .with_eviction_pickers(vec![ - Box::new(InvalidRatioPicker::new(args.invalid_ratio)), - Box::::default(), - ]) .with_compression(args.compression) .with_runtime_config(match args.runtime.as_str() { "disabled" => RuntimeConfig::Disabled, @@ -497,20 +500,39 @@ async fn benchmark(args: Args) { _ => unreachable!(), }); + let mut large = LargeEngineOptions::new() + .with_indexer_shards(args.shards) + .with_recover_concurrency(args.recover_concurrency) + .with_flushers(args.flushers) + .with_reclaimers(args.reclaimers) + .with_eviction_pickers(vec![ + Box::new(InvalidRatioPicker::new(args.invalid_ratio)), + Box::::default(), + ]); + + let small = SmallEngineOptions::new() + .with_flushers(args.flushers) + .with_set_size(args.set_size.as_u64() as _) + .with_set_cache_capacity(args.set_cache_capacity); + if args.admission_rate_limit.as_u64() > 0 { builder = builder.with_admission_picker(Arc::new(RateLimitPicker::new(args.admission_rate_limit.as_u64() as _))); } if args.reinsertion_rate_limit.as_u64() > 0 { - builder = - builder.with_reinsertion_picker(Arc::new(RateLimitPicker::new(args.admission_rate_limit.as_u64() as _))); + large = large.with_reinsertion_picker(Arc::new(RateLimitPicker::new(args.admission_rate_limit.as_u64() as _))); } if args.clean_region_threshold > 0 { - builder = builder.with_clean_region_threshold(args.clean_region_threshold); + large = large.with_clean_region_threshold(args.clean_region_threshold); } - let hybrid = builder.build().await.unwrap(); + let hybrid = builder + .with_large_object_disk_cache_options(large) + .with_small_object_disk_cache_options(small) + .build() + .await + .unwrap(); #[cfg(feature = "mtrace")] hybrid.enable_tracing(); diff --git a/foyer-common/src/runtime.rs b/foyer-common/src/runtime.rs index d2b269c2..a3727090 100644 --- a/foyer-common/src/runtime.rs +++ b/foyer-common/src/runtime.rs @@ -143,4 +143,78 @@ impl SingletonHandle { { self.0.spawn_blocking(func) } + + /// Runs a future to completion on this `Handle`'s associated `Runtime`. + /// + /// This runs the given future on the current thread, blocking until it is + /// complete, and yielding its resolved result. Any tasks or timers which + /// the future spawns internally will be executed on the runtime. + /// + /// When this is used on a `current_thread` runtime, only the + /// [`Runtime::block_on`] method can drive the IO and timer drivers, but the + /// `Handle::block_on` method cannot drive them. This means that, when using + /// this method on a `current_thread` runtime, anything that relies on IO or + /// timers will not work unless there is another thread currently calling + /// [`Runtime::block_on`] on the same runtime. 
+ /// + /// # If the runtime has been shut down + /// + /// If the `Handle`'s associated `Runtime` has been shut down (through + /// [`Runtime::shutdown_background`], [`Runtime::shutdown_timeout`], or by + /// dropping it) and `Handle::block_on` is used it might return an error or + /// panic. Specifically IO resources will return an error and timers will + /// panic. Runtime independent futures will run as normal. + /// + /// # Panics + /// + /// This function panics if the provided future panics, if called within an + /// asynchronous execution context, or if a timer future is executed on a + /// runtime that has been shut down. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// // Create the runtime + /// let rt = Runtime::new().unwrap(); + /// + /// // Get a handle from this runtime + /// let handle = rt.handle(); + /// + /// // Execute the future, blocking the current thread until completion + /// handle.block_on(async { + /// println!("hello"); + /// }); + /// ``` + /// + /// Or using `Handle::current`: + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main () { + /// let handle = Handle::current(); + /// std::thread::spawn(move || { + /// // Using Handle::block_on to run async code in the new thread. + /// handle.block_on(async { + /// println!("hello"); + /// }); + /// }); + /// } + /// ``` + /// + /// [`JoinError`]: struct@crate::task::JoinError + /// [`JoinHandle`]: struct@crate::task::JoinHandle + /// [`Runtime::block_on`]: fn@crate::runtime::Runtime::block_on + /// [`Runtime::shutdown_background`]: fn@crate::runtime::Runtime::shutdown_background + /// [`Runtime::shutdown_timeout`]: fn@crate::runtime::Runtime::shutdown_timeout + /// [`spawn_blocking`]: crate::task::spawn_blocking + /// [`tokio::fs`]: crate::fs + /// [`tokio::net`]: crate::net + /// [`tokio::time`]: crate::time + pub fn block_on(&self, future: F) -> F::Output { + self.0.block_on(future) + } } diff --git a/foyer-storage/Cargo.toml b/foyer-storage/Cargo.toml index b5863a39..d77630d3 100644 --- a/foyer-storage/Cargo.toml +++ b/foyer-storage/Cargo.toml @@ -33,7 +33,9 @@ futures = "0.3" itertools = { workspace = true } libc = "0.2" lz4 = "1.24" +ordered_hash_map = "0.4" parking_lot = { version = "0.12", features = ["arc_lock"] } +paste = "1" pin-project = "1" rand = "0.8" serde = { workspace = true } diff --git a/foyer-storage/src/device/direct_file.rs b/foyer-storage/src/device/direct_file.rs index ec02e2c2..cd6845a1 100644 --- a/foyer-storage/src/device/direct_file.rs +++ b/foyer-storage/src/device/direct_file.rs @@ -197,7 +197,7 @@ impl Dev for DirectFileDevice { if file.metadata().unwrap().is_file() { tracing::warn!( - "{}\n{}\n{}", + "{} {} {}", "It seems a `DirectFileDevice` is used within a normal file system, which is inefficient.", "Please use `DirectFileDevice` directly on a raw block device.", "Or use `DirectFsDevice` within a normal file system.", diff --git a/foyer-storage/src/engine.rs b/foyer-storage/src/engine.rs index eb16abe7..6735315e 100644 --- a/foyer-storage/src/engine.rs +++ b/foyer-storage/src/engine.rs @@ -101,7 +101,7 @@ where Noop, Large(GenericLargeStorageConfig), Small(GenericSmallStorageConfig), - Combined(EitherConfig, GenericLargeStorage, SizeSelector>), + Mixed(EitherConfig, GenericLargeStorage, SizeSelector>), } impl Debug for EngineConfig @@ -115,13 +115,13 @@ where Self::Noop => write!(f, "Noop"), Self::Large(config) => f.debug_tuple("Large").field(config).finish(), Self::Small(config) => 
f.debug_tuple("Small").field(config).finish(), - Self::Combined(config) => f.debug_tuple("Combined").field(config).finish(), + Self::Mixed(config) => f.debug_tuple("Mixed").field(config).finish(), } } } #[expect(clippy::type_complexity)] -pub enum Engine +pub enum EngineEnum where K: StorageKey, V: StorageValue, @@ -133,11 +133,11 @@ where Large(GenericLargeStorage), /// Small object disk cache. Small(GenericSmallStorage), - /// Combined large and small object disk cache. - Combined(Either, GenericLargeStorage, SizeSelector>), + /// Mixed large and small object disk cache. + Mixed(Either, GenericLargeStorage, SizeSelector>), } -impl Debug for Engine +impl Debug for EngineEnum where K: StorageKey, V: StorageValue, @@ -148,12 +148,12 @@ where Self::Noop(storage) => f.debug_tuple("Noop").field(storage).finish(), Self::Large(storage) => f.debug_tuple("Large").field(storage).finish(), Self::Small(storage) => f.debug_tuple("Small").field(storage).finish(), - Self::Combined(storage) => f.debug_tuple("Combined").field(storage).finish(), + Self::Mixed(storage) => f.debug_tuple("Mixed").field(storage).finish(), } } } -impl Clone for Engine +impl Clone for EngineEnum where K: StorageKey, V: StorageValue, @@ -164,12 +164,12 @@ where Self::Noop(storage) => Self::Noop(storage.clone()), Self::Large(storage) => Self::Large(storage.clone()), Self::Small(storage) => Self::Small(storage.clone()), - Self::Combined(storage) => Self::Combined(storage.clone()), + Self::Mixed(storage) => Self::Mixed(storage.clone()), } } } -impl Storage for Engine +impl Storage for EngineEnum where K: StorageKey, V: StorageValue, @@ -185,81 +185,81 @@ where EngineConfig::Noop => Ok(Self::Noop(Noop::open(()).await?)), EngineConfig::Large(config) => Ok(Self::Large(GenericLargeStorage::open(config).await?)), EngineConfig::Small(config) => Ok(Self::Small(GenericSmallStorage::open(config).await?)), - EngineConfig::Combined(config) => Ok(Self::Combined(Either::open(config).await?)), + EngineConfig::Mixed(config) => Ok(Self::Mixed(Either::open(config).await?)), } } async fn close(&self) -> Result<()> { match self { - Engine::Noop(storage) => storage.close().await, - Engine::Large(storage) => storage.close().await, - Engine::Small(storage) => storage.close().await, - Engine::Combined(storage) => storage.close().await, + EngineEnum::Noop(storage) => storage.close().await, + EngineEnum::Large(storage) => storage.close().await, + EngineEnum::Small(storage) => storage.close().await, + EngineEnum::Mixed(storage) => storage.close().await, } } fn enqueue(&self, entry: CacheEntry, estimated_size: usize) { match self { - Engine::Noop(storage) => storage.enqueue(entry, estimated_size), - Engine::Large(storage) => storage.enqueue(entry, estimated_size), - Engine::Small(storage) => storage.enqueue(entry, estimated_size), - Engine::Combined(storage) => storage.enqueue(entry, estimated_size), + EngineEnum::Noop(storage) => storage.enqueue(entry, estimated_size), + EngineEnum::Large(storage) => storage.enqueue(entry, estimated_size), + EngineEnum::Small(storage) => storage.enqueue(entry, estimated_size), + EngineEnum::Mixed(storage) => storage.enqueue(entry, estimated_size), } } #[auto_enum(Future)] fn load(&self, hash: u64) -> impl Future>> + Send + 'static { match self { - Engine::Noop(storage) => storage.load(hash), - Engine::Large(storage) => storage.load(hash), - Engine::Small(storage) => storage.load(hash), - Engine::Combined(storage) => storage.load(hash), + EngineEnum::Noop(storage) => storage.load(hash), + EngineEnum::Large(storage) => 
storage.load(hash), + EngineEnum::Small(storage) => storage.load(hash), + EngineEnum::Mixed(storage) => storage.load(hash), } } fn delete(&self, hash: u64) { match self { - Engine::Noop(storage) => storage.delete(hash), - Engine::Large(storage) => storage.delete(hash), - Engine::Small(storage) => storage.delete(hash), - Engine::Combined(storage) => storage.delete(hash), + EngineEnum::Noop(storage) => storage.delete(hash), + EngineEnum::Large(storage) => storage.delete(hash), + EngineEnum::Small(storage) => storage.delete(hash), + EngineEnum::Mixed(storage) => storage.delete(hash), } } fn may_contains(&self, hash: u64) -> bool { match self { - Engine::Noop(storage) => storage.may_contains(hash), - Engine::Large(storage) => storage.may_contains(hash), - Engine::Small(storage) => storage.may_contains(hash), - Engine::Combined(storage) => storage.may_contains(hash), + EngineEnum::Noop(storage) => storage.may_contains(hash), + EngineEnum::Large(storage) => storage.may_contains(hash), + EngineEnum::Small(storage) => storage.may_contains(hash), + EngineEnum::Mixed(storage) => storage.may_contains(hash), } } async fn destroy(&self) -> Result<()> { match self { - Engine::Noop(storage) => storage.destroy().await, - Engine::Large(storage) => storage.destroy().await, - Engine::Small(storage) => storage.destroy().await, - Engine::Combined(storage) => storage.destroy().await, + EngineEnum::Noop(storage) => storage.destroy().await, + EngineEnum::Large(storage) => storage.destroy().await, + EngineEnum::Small(storage) => storage.destroy().await, + EngineEnum::Mixed(storage) => storage.destroy().await, } } fn stats(&self) -> Arc { match self { - Engine::Noop(storage) => storage.stats(), - Engine::Large(storage) => storage.stats(), - Engine::Small(storage) => storage.stats(), - Engine::Combined(storage) => storage.stats(), + EngineEnum::Noop(storage) => storage.stats(), + EngineEnum::Large(storage) => storage.stats(), + EngineEnum::Small(storage) => storage.stats(), + EngineEnum::Mixed(storage) => storage.stats(), } } #[auto_enum(Future)] fn wait(&self) -> impl Future + Send + 'static { match self { - Engine::Noop(storage) => storage.wait(), - Engine::Large(storage) => storage.wait(), - Engine::Small(storage) => storage.wait(), - Engine::Combined(storage) => storage.wait(), + EngineEnum::Noop(storage) => storage.wait(), + EngineEnum::Large(storage) => storage.wait(), + EngineEnum::Small(storage) => storage.wait(), + EngineEnum::Mixed(storage) => storage.wait(), } } } diff --git a/foyer-storage/src/io_buffer_pool.rs b/foyer-storage/src/io_buffer_pool.rs index 83e42503..cc6ceff2 100644 --- a/foyer-storage/src/io_buffer_pool.rs +++ b/foyer-storage/src/io_buffer_pool.rs @@ -18,6 +18,7 @@ use foyer_common::bits; use crate::{device::ALIGN, IoBuffer, IoBytes}; +#[derive(Debug)] pub enum Buffer { IoBuffer(IoBuffer), IoBytes(IoBytes), @@ -35,6 +36,7 @@ impl From for Buffer { } } +#[derive(Debug)] pub struct IoBufferPool { capacity: usize, buffer_size: usize, diff --git a/foyer-storage/src/large/batch.rs b/foyer-storage/src/large/batch.rs index ba5a8b04..cacccfe6 100644 --- a/foyer-storage/src/large/batch.rs +++ b/foyer-storage/src/large/batch.rs @@ -86,13 +86,13 @@ where S: HashBuilder + Debug, { pub fn new( - capacity: usize, + buffer_size: usize, region_manager: RegionManager, device: MonitoredDevice, indexer: Indexer, metrics: Arc, ) -> Self { - let capacity = bits::align_up(device.align(), capacity); + let capacity = bits::align_up(device.align(), buffer_size); let mut batch = Self { buffer: 
IoBuffer::new(capacity), len: 0, @@ -130,7 +130,7 @@ where ) { Ok(info) => info, Err(e) => { - tracing::warn!("[batch]: serialize entry error: {e}"); + tracing::warn!("[lodc batch]: serialize entry error: {e}"); return false; } }; @@ -140,7 +140,7 @@ where value_len: info.value_len as _, hash: entry.hash(), sequence, - checksum: Checksummer::checksum( + checksum: Checksummer::checksum64( &self.buffer[pos + EntryHeader::serialized_len() ..pos + EntryHeader::serialized_len() + info.key_len + info.value_len], ), diff --git a/foyer-storage/src/large/flusher.rs b/foyer-storage/src/large/flusher.rs index 7a179701..1bcd9d95 100644 --- a/foyer-storage/src/large/flusher.rs +++ b/foyer-storage/src/large/flusher.rs @@ -128,7 +128,7 @@ where S: HashBuilder + Debug, { #[expect(clippy::too_many_arguments)] - pub async fn open( + pub fn open( config: &GenericLargeStorageConfig, indexer: Indexer, region_manager: RegionManager, @@ -140,7 +140,7 @@ where ) -> Result { let (tx, rx) = flume::unbounded(); - let buffer_size = config.buffer_threshold / config.flushers; + let buffer_size = config.buffer_pool_size / config.flushers; let batch = BatchMut::new( buffer_size, region_manager.clone(), diff --git a/foyer-storage/src/large/generic.rs b/foyer-storage/src/large/generic.rs index 489c6be0..e08057c7 100644 --- a/foyer-storage/src/large/generic.rs +++ b/foyer-storage/src/large/generic.rs @@ -74,7 +74,7 @@ where pub recover_concurrency: usize, pub flushers: usize, pub reclaimers: usize, - pub buffer_threshold: usize, + pub buffer_pool_size: usize, pub clean_region_threshold: usize, pub eviction_pickers: Vec>, pub reinsertion_picker: Arc>, @@ -101,7 +101,7 @@ where .field("recover_concurrency", &self.recover_concurrency) .field("flushers", &self.flushers) .field("reclaimers", &self.reclaimers) - .field("buffer_threshold", &self.buffer_threshold) + .field("buffer_pool_size", &self.buffer_pool_size) .field("clean_region_threshold", &self.clean_region_threshold) .field("eviction_pickers", &self.eviction_pickers) .field("reinsertion_pickers", &self.reinsertion_picker) @@ -203,7 +203,7 @@ where let indexer = Indexer::new(config.indexer_shards); let mut eviction_pickers = std::mem::take(&mut config.eviction_pickers); for picker in eviction_pickers.iter_mut() { - picker.init(device.regions(), device.region_size()); + picker.init(0..device.regions() as RegionId, device.region_size()); } let reclaim_semaphore = Arc::new(Semaphore::new(0)); let region_manager = RegionManager::new( @@ -237,7 +237,6 @@ where metrics.clone(), &config.runtime, ) - .await })) .await?; @@ -253,7 +252,6 @@ where metrics.clone(), &config.runtime, ) - .await })) .await; @@ -558,7 +556,7 @@ mod tests { eviction_pickers: vec![Box::::default()], reinsertion_picker, tombstone_log_config: None, - buffer_threshold: 16 * 1024 * 1024, + buffer_pool_size: 16 * 1024 * 1024, statistics: Arc::::default(), runtime: Runtime::new(None, None, Handle::current()), marker: PhantomData, @@ -587,7 +585,7 @@ mod tests { eviction_pickers: vec![Box::::default()], reinsertion_picker: Arc::>::default(), tombstone_log_config: Some(TombstoneLogConfigBuilder::new(path).with_flush(true).build()), - buffer_threshold: 16 * 1024 * 1024, + buffer_pool_size: 16 * 1024 * 1024, statistics: Arc::::default(), runtime: Runtime::new(None, None, Handle::current()), marker: PhantomData, diff --git a/foyer-storage/src/large/reclaimer.rs b/foyer-storage/src/large/reclaimer.rs index dafbff26..e49f4b43 100644 --- a/foyer-storage/src/large/reclaimer.rs +++ b/foyer-storage/src/large/reclaimer.rs 
@@ -45,7 +45,7 @@ pub struct Reclaimer { impl Reclaimer { #[expect(clippy::too_many_arguments)] - pub async fn open( + pub fn open( region_manager: RegionManager, reclaim_semaphore: Arc, reinsertion_picker: Arc>, diff --git a/foyer-storage/src/picker/mod.rs b/foyer-storage/src/picker/mod.rs index 8cec22df..2657bf48 100644 --- a/foyer-storage/src/picker/mod.rs +++ b/foyer-storage/src/picker/mod.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{collections::HashMap, fmt::Debug, sync::Arc}; +use std::{collections::HashMap, fmt::Debug, ops::Range, sync::Arc}; use crate::{device::RegionId, region::RegionStats, statistics::Statistics}; @@ -38,7 +38,7 @@ pub trait ReinsertionPicker: Send + Sync + 'static + Debug { pub trait EvictionPicker: Send + Sync + 'static + Debug { /// Init the eviction picker with information. #[expect(unused_variables)] - fn init(&mut self, regions: usize, region_size: usize) {} + fn init(&mut self, regions: Range, region_size: usize) {} /// Pick a region to evict. /// diff --git a/foyer-storage/src/picker/utils.rs b/foyer-storage/src/picker/utils.rs index a095ed36..fb793d5c 100644 --- a/foyer-storage/src/picker/utils.rs +++ b/foyer-storage/src/picker/utils.rs @@ -16,6 +16,7 @@ use std::{ collections::{HashMap, VecDeque}, fmt::Debug, marker::PhantomData, + ops::Range, sync::{ atomic::{AtomicUsize, Ordering}, Arc, @@ -257,7 +258,7 @@ impl InvalidRatioPicker { } impl EvictionPicker for InvalidRatioPicker { - fn init(&mut self, _: usize, region_size: usize) { + fn init(&mut self, _: Range, region_size: usize) { self.region_size = region_size; } @@ -321,7 +322,7 @@ mod tests { #[test] fn test_invalid_ratio_picker() { let mut picker = InvalidRatioPicker::new(0.5); - picker.init(10, 10); + picker.init(0..10, 10); let mut m = HashMap::new(); diff --git a/foyer-storage/src/prelude.rs b/foyer-storage/src/prelude.rs index c2564896..ec0b94c8 100644 --- a/foyer-storage/src/prelude.rs +++ b/foyer-storage/src/prelude.rs @@ -33,5 +33,8 @@ pub use crate::{ runtime::Runtime, statistics::Statistics, storage::{either::Order, Storage}, - store::{CombinedConfig, DeviceConfig, RuntimeConfig, Store, StoreBuilder, TokioRuntimeConfig}, + store::{ + DeviceConfig, Engine, LargeEngineOptions, RuntimeConfig, SmallEngineOptions, Store, StoreBuilder, + TokioRuntimeConfig, + }, }; diff --git a/foyer-storage/src/serde.rs b/foyer-storage/src/serde.rs index c3ec570e..169e5377 100644 --- a/foyer-storage/src/serde.rs +++ b/foyer-storage/src/serde.rs @@ -18,7 +18,7 @@ use foyer_common::{ code::{StorageKey, StorageValue}, metrics::Metrics, }; -use twox_hash::XxHash64; +use twox_hash::{XxHash32, XxHash64}; use crate::{ compress::Compression, @@ -29,11 +29,17 @@ use crate::{ pub struct Checksummer; impl Checksummer { - pub fn checksum(buf: &[u8]) -> u64 { + pub fn checksum64(buf: &[u8]) -> u64 { let mut hasher = XxHash64::with_seed(0); hasher.write(buf); hasher.finish() } + + pub fn checksum32(buf: &[u8]) -> u32 { + let mut hasher = XxHash32::with_seed(0); + hasher.write(buf); + hasher.finish() as u32 + } } #[derive(Debug)] @@ -183,7 +189,7 @@ impl EntryDeserializer { // calculate checksum if needed if let Some(expected) = checksum { - let get = Checksummer::checksum(&buffer[..value_len + ken_len]); + let get = Checksummer::checksum64(&buffer[..value_len + ken_len]); if expected != get { return Err(Error::ChecksumMismatch { expected, get }); } diff --git a/foyer-storage/src/small/batch.rs b/foyer-storage/src/small/batch.rs new 
file mode 100644 index 00000000..310aa156 --- /dev/null +++ b/foyer-storage/src/small/batch.rs @@ -0,0 +1,329 @@ +// Copyright 2024 Foyer Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{ + collections::{HashMap, HashSet}, + fmt::Debug, + ops::Range, + sync::Arc, + time::Instant, +}; + +use foyer_common::{ + bits, + code::{HashBuilder, StorageKey, StorageValue}, + metrics::Metrics, +}; +use foyer_memory::CacheEntry; +use itertools::Itertools; +use tokio::sync::oneshot; + +use crate::{ + device::ALIGN, + io_buffer_pool::IoBufferPool, + serde::EntrySerializer, + small::{serde::EntryHeader, set::SetId}, + Compression, IoBuffer, IoBytes, +}; + +type Sequence = usize; + +#[derive(Debug)] +struct ItemMut +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + range: Range, + entry: CacheEntry, + sequence: Sequence, +} + +#[derive(Debug)] +struct SetBatchMut +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + items: Vec>, + deletes: HashMap, +} + +impl Default for SetBatchMut +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + fn default() -> Self { + Self { + items: vec![], + deletes: HashMap::new(), + } + } +} + +#[derive(Debug)] +pub struct BatchMut +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + /// Total set count. + total: SetId, + + sets: HashMap>, + buffer: IoBuffer, + len: usize, + sequence: Sequence, + + /// Cache write buffer between rotation to reduce page fault. + buffer_pool: IoBufferPool, + + waiters: Vec>, + + init: Option, + + metrics: Arc, +} + +impl BatchMut +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + pub fn new(total: SetId, buffer_size: usize, metrics: Arc) -> Self { + let buffer_size = bits::align_up(ALIGN, buffer_size); + + Self { + total, + sets: HashMap::new(), + buffer: IoBuffer::new(buffer_size), + len: 0, + sequence: 0, + buffer_pool: IoBufferPool::new(buffer_size, 1), + waiters: vec![], + init: None, + metrics, + } + } + + pub fn insert(&mut self, entry: CacheEntry, estimated_size: usize) -> bool { + // For the small object disk cache does NOT compress entries, `estimated_size` is actually `exact_size`. 
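        // With `Compression::None` the reserved slot is exact: `EntryHeader::ENTRY_HEADER_SIZE`
        // (12 B: hash u64 + key len u16 + value len u16, see `small/serde.rs` later in this diff)
        // plus the serialized key and value. The owning set is chosen as `entry.hash() % self.total`.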
+ tracing::trace!("[sodc batch]: insert entry"); + + if self.init.is_none() { + self.init = Some(Instant::now()); + } + self.sequence += 1; + + let sid = self.sid(entry.hash()); + let len = EntryHeader::ENTRY_HEADER_SIZE + estimated_size; + + let set = &mut self.sets.entry(sid).or_default(); + + set.deletes.insert(entry.hash(), self.sequence); + + if entry.is_outdated() || self.len + len > self.buffer.len() { + return false; + } + + let info = match EntrySerializer::serialize( + entry.key(), + entry.value(), + &Compression::None, + &mut self.buffer[self.len + EntryHeader::ENTRY_HEADER_SIZE..self.len + len], + &self.metrics, + ) { + Ok(info) => info, + Err(e) => { + tracing::warn!("[sodc batch]: serialize entry error: {e}"); + return false; + } + }; + assert_eq!(info.key_len + info.value_len + EntryHeader::ENTRY_HEADER_SIZE, len); + let header = EntryHeader::new(entry.hash(), info.key_len, info.value_len); + header.write(&mut self.buffer[self.len..self.len + EntryHeader::ENTRY_HEADER_SIZE]); + + set.items.push(ItemMut { + range: self.len..self.len + len, + entry, + sequence: self.sequence, + }); + self.len += len; + + true + } + + pub fn delete(&mut self, hash: u64) { + tracing::trace!("[sodc batch]: delete entry"); + + if self.init.is_none() { + self.init = Some(Instant::now()); + } + self.sequence += 1; + + let sid = self.sid(hash); + self.sets.entry(sid).or_default().deletes.insert(hash, self.sequence); + } + + /// Register a waiter to be notified after the batch is finished. + pub fn wait(&mut self, tx: oneshot::Sender<()>) { + tracing::trace!("[sodc batch]: register waiter"); + if self.init.is_none() { + self.init = Some(Instant::now()); + } + self.waiters.push(tx); + } + + fn sid(&self, hash: u64) -> SetId { + hash % self.total + } + + pub fn is_empty(&self) -> bool { + self.init.is_none() + } + + pub fn rotate(&mut self) -> Option> { + if self.is_empty() { + return None; + } + + let mut buffer = self.buffer_pool.acquire(); + std::mem::swap(&mut self.buffer, &mut buffer); + self.len = 0; + self.sequence = 0; + let buffer = IoBytes::from(buffer); + self.buffer_pool.release(buffer.clone()); + + let sets = self + .sets + .drain() + .map(|(sid, batch)| { + let items = batch + .items + .into_iter() + .filter(|item| item.sequence >= batch.deletes.get(&item.entry.hash()).copied().unwrap_or_default()) + .map(|item| Item { + buffer: buffer.slice(item.range), + entry: item.entry, + }) + .collect_vec(); + let deletes = batch.deletes.keys().copied().collect(); + ( + sid, + SetBatch { + deletions: deletes, + items, + }, + ) + }) + .collect(); + + let waiters = std::mem::take(&mut self.waiters); + let init = self.init.take(); + + Some(Batch { sets, waiters, init }) + } +} + +pub struct Item +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + pub buffer: IoBytes, + pub entry: CacheEntry, +} + +impl Debug for Item +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Item").field("hash", &self.entry.hash()).finish() + } +} + +pub struct SetBatch +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + pub deletions: HashSet, + pub items: Vec>, +} + +impl Debug for SetBatch +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SetBatch") + .field("deletes", &self.deletions) + .field("items", &self.items) + .finish() + } +} + +pub struct Batch +where + 
K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + pub sets: HashMap>, + pub waiters: Vec>, + pub init: Option, +} + +impl Default for Batch +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + fn default() -> Self { + Self { + sets: HashMap::new(), + waiters: vec![], + init: None, + } + } +} + +impl Debug for Batch +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Batch") + .field("sets", &self.sets) + .field("waiters", &self.waiters) + .field("init", &self.init) + .finish() + } +} diff --git a/foyer-storage/src/small/bloom_filter.rs b/foyer-storage/src/small/bloom_filter.rs new file mode 100644 index 00000000..4cf98689 --- /dev/null +++ b/foyer-storage/src/small/bloom_filter.rs @@ -0,0 +1,201 @@ +// Copyright 2024 Foyer Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#![cfg_attr(not(test), expect(dead_code))] + +use paste::paste; + +macro_rules! bloom_filter { + ($( {$type:ty, $suffix:ident}, )*) => { + paste! { + $( + /// A [<$type>] bloom filter with N hash hashers. + #[derive(Debug, Clone, PartialEq, Eq)] + pub struct [] { + data: [$type; N], + } + + impl Default for [] { + fn default() -> Self { + Self::new() + } + } + + impl [] { + const BYTES: usize = $type::BITS as usize / u8::BITS as usize * N; + + pub fn new() -> Self { + Self { + data: [0; N], + } + } + + pub fn read(raw: &[u8]) -> Self { + let mut data = [0; N]; + data.copy_from_slice(unsafe { std::slice::from_raw_parts(raw.as_ptr() as *const $type, N) }); + Self { data } + } + + pub fn write(&self, raw: &mut [u8]) { + raw[..Self::BYTES].copy_from_slice(unsafe { std::slice::from_raw_parts(self.data.as_ptr() as *const u8, Self::BYTES) }) + } + + pub fn insert(&mut self, hash: u64) { + for i in 0..N { + let seed = twang_mix64(i as _); + let hash = combine_hashes(hash, seed); + let bit = hash as usize % $type::BITS as usize; + self.data[i] |= 1 << bit; + } + } + + pub fn lookup(&self, hash: u64) -> bool { + for i in 0..N { + let seed = twang_mix64(i as _); + let hash = combine_hashes(hash, seed) as $type; + let bit = hash as usize % $type::BITS as usize; + if self.data[i] & (1 << bit) == 0 { + return false; + } + } + true + } + + pub fn clear(&mut self) { + self.data = [0; N]; + } + } + )* + } + }; +} + +macro_rules! for_all_uint_types { + ($macro:ident) => { + $macro! { + {u8, U8}, + {u16, U16}, + {u32, U32}, + {u64, U64}, + {usize, Usize}, + } + }; +} + +for_all_uint_types! { bloom_filter } + +/// Reduce two 64-bit hashes into one. +/// +/// Ported from CacheLib, which uses the `Hash128to64` function from Google's city hash. 
+#[inline(always)] +fn combine_hashes(upper: u64, lower: u64) -> u64 { + const MUL: u64 = 0x9ddfea08eb382d69; + + let mut a = (lower ^ upper).wrapping_mul(MUL); + a ^= a >> 47; + let mut b = (upper ^ a).wrapping_mul(MUL); + b ^= b >> 47; + b = b.wrapping_mul(MUL); + b +} + +#[inline(always)] +fn twang_mix64(val: u64) -> u64 { + let mut val = (!val).wrapping_add(val << 21); // val *= (1 << 21); val -= 1 + val = val ^ (val >> 24); + val = val.wrapping_add(val << 3).wrapping_add(val << 8); // val *= 1 + (1 << 3) + (1 << 8) + val = val ^ (val >> 14); + val = val.wrapping_add(val << 2).wrapping_add(val << 4); // va; *= 1 + (1 << 2) + (1 << 4) + val = val ^ (val >> 28); + val = val.wrapping_add(val << 31); // val *= 1 + (1 << 31) + val +} + +macro_rules! test_bloom_filter { + ($( {$type:ty, $suffix:ident}, )*) => { + #[cfg(test)] + mod tests { + use super::*; + + const N: usize = 4; + + paste! { + $( + #[test] + fn []() { + let mut bf = []::::new(); + + bf.insert(42); + assert!(bf.lookup(42)); + assert!(!bf.lookup(114514)); + bf.clear(); + assert!(!bf.lookup(42)); + } + + #[test] + fn []() { + let mut bf = []::::new(); + bf.insert(1); + bf.insert(2); + bf.insert(3); + assert!(bf.lookup(1)); + assert!(bf.lookup(2)); + assert!(bf.lookup(3)); + assert!(!bf.lookup(4)); + } + + #[test] + fn []() { + const INSERTS: usize = []::::BYTES; + const LOOKUPS: usize = []::::BYTES * 100; + const THRESHOLD: f64 = 0.1; + let mut bf = []::::new(); + // Insert a bunch of values + for i in 0..INSERTS { + bf.insert(i as _); + println!("{i}: {:X?}", bf.data); + } + // Check for false positives + let mut false_positives = 0; + for i in INSERTS..INSERTS + LOOKUPS { + if bf.lookup(i as _) { + false_positives += 1; + } + } + let ratio = false_positives as f64 / LOOKUPS as f64; + println!("ratio: {ratio}"); + assert!( + ratio < THRESHOLD, + "false positive ratio {ratio} > threshold {THRESHOLD}, inserts: {INSERTS}, lookups: {LOOKUPS}" + ); + } + + #[test] + fn []() { + let mut buf = [0; []::::BYTES]; + let mut bf = []::::new(); + bf.insert(42); + bf.write(&mut buf); + let bf2 = []::::read(&buf); + assert_eq!(bf, bf2); + } + )* + } + + } + + }; +} + +for_all_uint_types! { test_bloom_filter } diff --git a/foyer-storage/src/small/flusher.rs b/foyer-storage/src/small/flusher.rs new file mode 100644 index 00000000..43e720a3 --- /dev/null +++ b/foyer-storage/src/small/flusher.rs @@ -0,0 +1,225 @@ +// Copyright 2024 Foyer Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
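For orientation, the bloom filter generated above is kept per set (`SetStorage` later in this diff stores a `BloomFilterU64<4>`, i.e. 4 * 8 B = 32 B on disk). A minimal usage sketch of the crate-internal API defined above; the concrete values are made up:

let mut bf = BloomFilterU64::<4>::new();
bf.insert(42);                        // sets one bit in each of the 4 u64 words
assert!(bf.lookup(42));               // lookups can yield false positives, never false negatives
assert_eq!(BloomFilterU64::<4>::BYTES, 32);
let mut raw = [0u8; 32];
bf.write(&mut raw);                   // serialize into a set header slot
assert_eq!(BloomFilterU64::<4>::read(&raw), bf);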
+ +use std::{ + fmt::Debug, + future::Future, + sync::{atomic::Ordering, Arc}, +}; + +use foyer_common::{ + code::{HashBuilder, StorageKey, StorageValue}, + metrics::Metrics, +}; +use foyer_memory::CacheEntry; +use futures::future::try_join_all; +use tokio::sync::{oneshot, OwnedSemaphorePermit, Semaphore}; + +use super::{ + batch::{Batch, BatchMut, SetBatch}, + generic::GenericSmallStorageConfig, + set_manager::SetManager, +}; +use crate::{ + error::{Error, Result}, + Statistics, +}; + +pub enum Submission +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + Insertion { + entry: CacheEntry, + estimated_size: usize, + }, + Deletion { + hash: u64, + }, + Wait { + tx: oneshot::Sender<()>, + }, +} + +impl Debug for Submission +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Insertion { + entry: _, + estimated_size, + } => f + .debug_struct("Insertion") + .field("estimated_size", estimated_size) + .finish(), + Self::Deletion { hash } => f.debug_struct("Deletion").field("hash", hash).finish(), + Self::Wait { .. } => f.debug_struct("Wait").finish(), + } + } +} + +pub struct Flusher +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + tx: flume::Sender>, +} + +impl Flusher +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + pub fn open( + config: &GenericSmallStorageConfig, + set_manager: SetManager, + stats: Arc, + metrics: Arc, + ) -> Self { + let (tx, rx) = flume::unbounded(); + + let buffer_size = config.buffer_pool_size / config.flushers; + + let batch = BatchMut::new(set_manager.sets() as _, buffer_size, metrics.clone()); + + let runner = Runner { + rx, + batch, + flight: Arc::new(Semaphore::new(1)), + set_manager, + stats, + metrics, + }; + + config.runtime.write().spawn(async move { + if let Err(e) = runner.run().await { + tracing::error!("[sodc flusher]: flusher exit with error: {e}"); + } + }); + + Self { tx } + } + + pub fn submit(&self, submission: Submission) { + tracing::trace!("[sodc flusher]: submit task: {submission:?}"); + if let Err(e) = self.tx.send(submission) { + tracing::error!("[sodc flusher]: error raised when submitting task, error: {e}"); + } + } + + pub fn wait(&self) -> impl Future + Send + 'static { + let (tx, rx) = oneshot::channel(); + self.submit(Submission::Wait { tx }); + async move { + let _ = rx.await; + } + } +} + +struct Runner +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + rx: flume::Receiver>, + batch: BatchMut, + flight: Arc, + + set_manager: SetManager, + + stats: Arc, + metrics: Arc, +} + +impl Runner +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + pub async fn run(mut self) -> Result<()> { + loop { + let flight = self.flight.clone(); + tokio::select! { + biased; + Ok(permit) = flight.acquire_owned(), if !self.batch.is_empty() => { + // TODO(MrCroxx): `rotate()` should always return a `Some(..)` here. + if let Some(batch) = self.batch.rotate() { + self.commit(batch, permit).await; + } + } + Ok(submission) = self.rx.recv_async() => { + self.submit(submission); + } + // Graceful shutdown. 
+ else => break, + } + } + Ok(()) + } + + fn submit(&mut self, submission: Submission) { + let report = |enqueued: bool| { + if !enqueued { + self.metrics.storage_queue_drop.increment(1); + } + }; + + match submission { + Submission::Insertion { entry, estimated_size } => report(self.batch.insert(entry, estimated_size)), + Submission::Deletion { hash } => self.batch.delete(hash), + Submission::Wait { tx } => self.batch.wait(tx), + } + } + + pub async fn commit(&self, batch: Batch, permit: OwnedSemaphorePermit) { + tracing::trace!("[sodc flusher] commit batch: {batch:?}"); + + let futures = batch.sets.into_iter().map(|(sid, SetBatch { deletions, items })| { + let set_manager = self.set_manager.clone(); + let stats = self.stats.clone(); + async move { + let mut set = set_manager.write(sid).await?; + set.apply(&deletions, items); + set_manager.apply(set).await?; + + stats + .cache_write_bytes + .fetch_add(set_manager.set_size(), Ordering::Relaxed); + + Ok::<_, Error>(()) + } + }); + + if let Err(e) = try_join_all(futures).await { + tracing::error!("[sodc flusher]: error raised when committing batch, error: {e}"); + } + + for waiter in batch.waiters { + let _ = waiter.send(()); + } + + drop(permit); + } +} diff --git a/foyer-storage/src/small/generic.rs b/foyer-storage/src/small/generic.rs index dd8ae8e2..3b088204 100644 --- a/foyer-storage/src/small/generic.rs +++ b/foyer-storage/src/small/generic.rs @@ -12,13 +12,29 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{fmt::Debug, marker::PhantomData, sync::Arc}; +use std::{ + fmt::Debug, + marker::PhantomData, + ops::Range, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, +}; use foyer_common::code::{HashBuilder, StorageKey, StorageValue}; use foyer_memory::CacheEntry; -use futures::Future; +use futures::{future::join_all, Future}; +use itertools::Itertools; -use crate::{error::Result, storage::Storage, DeviceStats}; +use super::flusher::Submission; +use crate::{ + device::{MonitoredDevice, RegionId}, + error::Result, + small::{flusher::Flusher, set::SetId, set_manager::SetManager}, + storage::Storage, + DeviceStats, Runtime, Statistics, +}; pub struct GenericSmallStorageConfig where @@ -26,7 +42,16 @@ where V: StorageValue, S: HashBuilder + Debug, { - pub placeholder: PhantomData<(K, V, S)>, + pub set_size: usize, + pub set_cache_capacity: usize, + pub device: MonitoredDevice, + pub regions: Range, + pub flush: bool, + pub flushers: usize, + pub buffer_pool_size: usize, + pub statistics: Arc, + pub runtime: Runtime, + pub marker: PhantomData<(K, V, S)>, } impl Debug for GenericSmallStorageConfig @@ -36,17 +61,45 @@ where S: HashBuilder + Debug, { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("GenericSmallStorageConfig").finish() + f.debug_struct("GenericSmallStorageConfig") + .field("set_size", &self.set_size) + .field("set_cache_capacity", &self.set_cache_capacity) + .field("device", &self.device) + .field("regions", &self.regions) + .field("flush", &self.flush) + .field("flushers", &self.flushers) + .field("buffer_pool_size", &self.buffer_pool_size) + .field("statistics", &self.statistics) + .field("runtime", &self.runtime) + .field("marker", &self.marker) + .finish() } } +struct GenericSmallStorageInner +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + flushers: Vec>, + + device: MonitoredDevice, + set_manager: SetManager, + + active: AtomicBool, + + stats: Arc, + runtime: Runtime, +} + pub struct 
GenericSmallStorage where K: StorageKey, V: StorageValue, S: HashBuilder + Debug, { - _marker: PhantomData<(K, V, S)>, + inner: Arc>, } impl Debug for GenericSmallStorage @@ -67,7 +120,114 @@ where S: HashBuilder + Debug, { fn clone(&self) -> Self { - Self { _marker: PhantomData } + Self { + inner: self.inner.clone(), + } + } +} + +impl GenericSmallStorage +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + async fn open(config: GenericSmallStorageConfig) -> Result { + let stats = config.statistics.clone(); + let metrics = config.device.metrics().clone(); + + let set_manager = SetManager::new( + config.set_size, + config.set_cache_capacity, + config.device.clone(), + config.regions.clone(), + config.flush, + ); + + let flushers = (0..config.flushers) + .map(|_| Flusher::open(&config, set_manager.clone(), stats.clone(), metrics.clone())) + .collect_vec(); + + let inner = GenericSmallStorageInner { + flushers, + device: config.device, + set_manager, + active: AtomicBool::new(true), + stats, + runtime: config.runtime, + }; + let inner = Arc::new(inner); + + Ok(Self { inner }) + } + + fn wait(&self) -> impl Future + Send + 'static { + let wait_flushers = join_all(self.inner.flushers.iter().map(|flusher| flusher.wait())); + async move { + wait_flushers.await; + } + } + + async fn close(&self) -> Result<()> { + self.inner.active.store(false, Ordering::Relaxed); + self.wait().await; + Ok(()) + } + + fn enqueue(&self, entry: CacheEntry, estimated_size: usize) { + if !self.inner.active.load(Ordering::Relaxed) { + tracing::warn!("cannot enqueue new entry after closed"); + return; + } + + // Entries with the same hash must be grouped in the batch. + let id = entry.hash() as usize % self.inner.flushers.len(); + self.inner.flushers[id].submit(Submission::Insertion { entry, estimated_size }); + } + + fn load(&self, hash: u64) -> impl Future>> + Send + 'static { + let set_manager = self.inner.set_manager.clone(); + let sid = hash % set_manager.sets() as SetId; + let stats = self.inner.stats.clone(); + + async move { + stats + .cache_read_bytes + .fetch_add(set_manager.set_size(), Ordering::Relaxed); + + match set_manager.read(sid, hash).await? { + Some(set) => { + let kv = set.get(hash)?; + Ok(kv) + } + None => Ok(None), + } + } + } + + fn delete(&self, hash: u64) { + if !self.inner.active.load(Ordering::Relaxed) { + tracing::warn!("cannot enqueue new entry after closed"); + return; + } + + // Entries with the same hash MUST be grouped in the same batch. + let id = hash as usize % self.inner.flushers.len(); + self.inner.flushers[id].submit(Submission::Deletion { hash }); + } + + fn may_contains(&self, hash: u64) -> bool { + let set_manager = self.inner.set_manager.clone(); + let sid = hash % set_manager.sets() as SetId; + // FIXME: Anyway without blocking? Use atomic? 
+ self.inner + .runtime + .read() + .block_on(async move { set_manager.contains(sid, hash).await }) + } + + fn stats(&self) -> Arc { + self.inner.device.stat().clone() } } @@ -82,27 +242,29 @@ where type BuildHasher = S; type Config = GenericSmallStorageConfig; - async fn open(_config: Self::Config) -> Result { - todo!() + async fn open(config: Self::Config) -> Result { + Self::open(config).await } async fn close(&self) -> Result<()> { - todo!() + self.close().await?; + Ok(()) } - fn enqueue(&self, _entry: CacheEntry, _estimated_size: usize) { - todo!() + fn enqueue(&self, entry: CacheEntry, estimated_size: usize) { + self.enqueue(entry, estimated_size); } - #[expect(clippy::manual_async_fn)] - fn load(&self, _hash: u64) -> impl Future>> + Send + 'static { - async { todo!() } + fn load(&self, hash: u64) -> impl Future>> + Send + 'static { + self.load(hash) } - fn delete(&self, _hash: u64) {} + fn delete(&self, hash: u64) { + self.delete(hash) + } - fn may_contains(&self, _hash: u64) -> bool { - todo!() + fn may_contains(&self, hash: u64) -> bool { + self.may_contains(hash) } async fn destroy(&self) -> Result<()> { @@ -110,11 +272,10 @@ where } fn stats(&self) -> Arc { - todo!() + self.stats() } - #[expect(clippy::manual_async_fn)] fn wait(&self) -> impl Future + Send + 'static { - async { todo!() } + self.wait() } } diff --git a/foyer-storage/src/small/mod.rs b/foyer-storage/src/small/mod.rs index 01e617c2..44c6f579 100644 --- a/foyer-storage/src/small/mod.rs +++ b/foyer-storage/src/small/mod.rs @@ -12,4 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod batch; +pub mod bloom_filter; +pub mod flusher; pub mod generic; +pub mod serde; +pub mod set; +pub mod set_manager; diff --git a/foyer-storage/src/small/serde.rs b/foyer-storage/src/small/serde.rs new file mode 100644 index 00000000..2648ad48 --- /dev/null +++ b/foyer-storage/src/small/serde.rs @@ -0,0 +1,97 @@ +// Copyright 2024 Foyer Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
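To recap the routing used by `GenericSmallStorage` above: every operation is sharded purely by the entry hash, so an insert and a later delete of the same key always reach the same flusher (keeping their relative order within one batch), and reads go straight to the owning set. Roughly, following the code above:

let flusher_id = hash as usize % flushers.len();     // same hash -> same flusher -> ordered in one batch
let sid = hash % set_manager.sets() as SetId;        // SetId = u64; the only set that may hold the entry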
+ +use bytes::{Buf, BufMut}; + +/// max key/value len: `64 KiB - 1` +/// +/// # Format +/// +/// ```plain +/// | hash 64b | key len 16b | value len 16b | +/// ``` +#[derive(Debug, PartialEq, Eq)] +pub struct EntryHeader { + hash: u64, + key_len: u16, + value_len: u16, +} + +impl EntryHeader { + pub const ENTRY_HEADER_SIZE: usize = (16 + 16 + 64) / 8; + + pub fn new(hash: u64, key_len: usize, value_len: usize) -> Self { + Self { + hash, + key_len: key_len as _, + value_len: value_len as _, + } + } + + #[inline] + pub fn hash(&self) -> u64 { + self.hash + } + + #[inline] + pub fn key_len(&self) -> usize { + self.key_len as _ + } + + #[inline] + pub fn value_len(&self) -> usize { + self.value_len as _ + } + + #[inline] + pub fn entry_len(&self) -> usize { + Self::ENTRY_HEADER_SIZE + self.key_len() + self.value_len() + } + + pub fn write(&self, mut buf: impl BufMut) { + buf.put_u64(self.hash); + buf.put_u16(self.key_len); + buf.put_u16(self.value_len); + } + + pub fn read(mut buf: impl Buf) -> Self { + let hash = buf.get_u64(); + let key_len = buf.get_u16(); + let value_len = buf.get_u16(); + Self { + hash, + key_len, + value_len, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::IoBytesMut; + + #[test] + fn test_entry_header_serde() { + let header = EntryHeader { + hash: 114514, + key_len: 114, + value_len: 514, + }; + let mut buf = IoBytesMut::new(); + header.write(&mut buf); + let h = EntryHeader::read(&buf[..]); + assert_eq!(header, h); + } +} diff --git a/foyer-storage/src/small/set.rs b/foyer-storage/src/small/set.rs new file mode 100644 index 00000000..02e94adb --- /dev/null +++ b/foyer-storage/src/small/set.rs @@ -0,0 +1,537 @@ +// Copyright 2024 Foyer Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
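The `EntryHeader` introduced above packs the hash and the key/value lengths into a fixed 12-byte prefix ((64 + 16 + 16) / 8). The following self-contained round-trip sketch uses the `bytes` crate directly instead of the crate's `IoBytesMut`, purely for illustration.

```rust
use bytes::{Buf, BufMut, BytesMut};

const ENTRY_HEADER_SIZE: usize = (64 + 16 + 16) / 8; // 12 bytes

fn main() {
    let (hash, key_len, value_len) = (114_514_u64, 114_u16, 514_u16);

    // Write: | hash 64b | key len 16b | value len 16b |
    let mut buf = BytesMut::with_capacity(ENTRY_HEADER_SIZE);
    buf.put_u64(hash);
    buf.put_u16(key_len);
    buf.put_u16(value_len);
    assert_eq!(buf.len(), ENTRY_HEADER_SIZE);

    // Read it back.
    let mut r = &buf[..];
    assert_eq!(r.get_u64(), hash);
    assert_eq!(r.get_u16(), key_len);
    assert_eq!(r.get_u16(), value_len);

    // Total on-disk entry length = header + key + value.
    let entry_len = ENTRY_HEADER_SIZE + key_len as usize + value_len as usize;
    println!("entry_len = {entry_len}");
}
```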
+ +use std::{ + collections::HashSet, + fmt::Debug, + ops::{Deref, DerefMut, Range}, + sync::Arc, + time::{SystemTime, UNIX_EPOCH}, +}; + +use bytes::{Buf, BufMut}; +use foyer_common::code::{HashBuilder, StorageKey, StorageValue}; + +use super::{batch::Item, bloom_filter::BloomFilterU64, serde::EntryHeader}; +use crate::{ + error::Result, + serde::{Checksummer, EntryDeserializer}, + IoBytes, IoBytesMut, +}; + +pub type SetId = u64; + +#[derive(Debug)] +pub struct Set { + storage: Arc, +} + +impl Deref for Set { + type Target = SetStorage; + + fn deref(&self) -> &Self::Target { + &self.storage + } +} + +impl Set { + pub fn new(storage: Arc) -> Self { + Self { storage } + } +} + +#[derive(Debug)] +pub struct SetMut { + storage: SetStorage, +} + +impl Deref for SetMut { + type Target = SetStorage; + + fn deref(&self) -> &Self::Target { + &self.storage + } +} + +impl DerefMut for SetMut { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.storage + } +} + +impl SetMut { + pub fn new(storage: SetStorage) -> Self { + Self { storage } + } + + pub fn into_storage(self) -> SetStorage { + self.storage + } +} + +/// # Format +/// +/// ```plain +/// | checksum (4B) | timestamp (8B) | len (4B) | +/// | bloom filter (4 * 8B = 32B) | +/// ``` +pub struct SetStorage { + /// Set checksum. + checksum: u32, + + /// Set written data length. + len: usize, + /// Set data length capacity. + capacity: usize, + /// Set size. + size: usize, + /// Set last updated timestamp. + timestamp: u64, + /// Set bloom filter. + bloom_filter: BloomFilterU64<4>, + + buffer: IoBytesMut, +} + +impl Debug for SetStorage { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SetStorage") + .field("checksum", &self.checksum) + .field("len", &self.len) + .field("capacity", &self.capacity) + .field("size", &self.size) + .field("timestamp", &self.timestamp) + .field("bloom_filter", &self.bloom_filter) + .finish() + } +} + +impl SetStorage { + pub const SET_HEADER_SIZE: usize = 48; + + pub fn load(buffer: IoBytesMut) -> Self { + assert!(buffer.len() >= Self::SET_HEADER_SIZE); + + let checksum = (&buffer[0..4]).get_u32(); + let timestamp = (&buffer[4..12]).get_u64(); + let len = (&buffer[12..16]).get_u32() as usize; + let bloom_filter = BloomFilterU64::read(&buffer[16..48]); + + let mut this = Self { + checksum, + len, + capacity: buffer.len() - Self::SET_HEADER_SIZE, + size: buffer.len(), + timestamp, + bloom_filter, + buffer, + }; + + if Self::SET_HEADER_SIZE + this.len >= this.buffer.len() { + // invalid len + this.clear(); + } else { + let c = Checksummer::checksum32(&this.buffer[4..Self::SET_HEADER_SIZE + this.len]); + if c != checksum { + // checksum mismatch + this.clear(); + } + } + + this + } + + pub fn update(&mut self) { + self.bloom_filter.write(&mut self.buffer[16..48]); + (&mut self.buffer[12..16]).put_u32(self.len as _); + self.timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64; + (&mut self.buffer[4..12]).put_u64(self.timestamp); + self.checksum = Checksummer::checksum32(&self.buffer[4..Self::SET_HEADER_SIZE + self.len]); + (&mut self.buffer[0..4]).put_u32(self.checksum); + } + + pub fn bloom_filter(&self) -> &BloomFilterU64<4> { + &self.bloom_filter + } + + #[cfg_attr(not(test), expect(dead_code))] + pub fn len(&self) -> usize { + self.len + } + + #[cfg_attr(not(test), expect(dead_code))] + pub fn is_empty(&self) -> bool { + self.len == 0 + } + + pub fn clear(&mut self) { + self.len = 0; + self.bloom_filter.clear(); + } + + pub fn 
freeze(self) -> IoBytes { + self.buffer.freeze() + } + + pub fn apply(&mut self, deletions: &HashSet, items: Vec>) + where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, + { + self.deletes(deletions); + self.append(items); + } + + fn deletes(&mut self, deletes: &HashSet) { + if deletes.is_empty() { + return; + } + + let mut rcursor = 0; + let mut wcursor = 0; + // Rebuild bloom filter. + self.bloom_filter.clear(); + + while rcursor < self.len { + let header = EntryHeader::read( + &self.buffer + [Self::SET_HEADER_SIZE + rcursor..Self::SET_HEADER_SIZE + rcursor + EntryHeader::ENTRY_HEADER_SIZE], + ); + + if !deletes.contains(&header.hash()) { + if rcursor != wcursor { + self.buffer.copy_within( + Self::SET_HEADER_SIZE + rcursor..Self::SET_HEADER_SIZE + header.entry_len(), + wcursor, + ); + } + wcursor += header.entry_len(); + self.bloom_filter.insert(header.hash()); + } + + rcursor += header.entry_len(); + } + + self.len = wcursor; + } + + fn append(&mut self, items: Vec>) + where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, + { + let (skip, size, _) = items + .iter() + .rev() + .fold((items.len(), 0, true), |(skip, size, proceed), item| { + let proceed = proceed && size + item.buffer.len() <= self.size - Self::SET_HEADER_SIZE; + if proceed { + (skip - 1, size + item.buffer.len(), proceed) + } else { + (skip, size, proceed) + } + }); + + self.reserve(size); + let mut cursor = Self::SET_HEADER_SIZE + self.len; + for item in items.iter().skip(skip) { + self.buffer[cursor..cursor + item.buffer.len()].copy_from_slice(&item.buffer); + self.bloom_filter.insert(item.entry.hash()); + cursor += item.buffer.len(); + } + self.len = cursor - Self::SET_HEADER_SIZE; + } + + pub fn get(&self, hash: u64) -> Result> + where + K: StorageKey, + V: StorageValue, + { + if !self.bloom_filter.lookup(hash) { + return Ok(None); + } + for entry in self.iter() { + if hash == entry.hash { + let k = EntryDeserializer::deserialize_key::(entry.key)?; + let v = EntryDeserializer::deserialize_value::(entry.value, crate::Compression::None)?; + return Ok(Some((k, v))); + } + } + Ok(None) + } + + /// from: + /// + /// ```plain + /// 0 wipe len capacity + /// |_________|ooooooooooooo|___________| + /// ``` + /// + /// to: + /// + /// ```plain + /// 0 new len = len - wipe capacity + /// |ooooooooooooo|_____________________| + /// ``` + fn reserve(&mut self, required: usize) { + let remains = self.capacity - self.len; + if remains >= required { + return; + } + + let mut wipe = 0; + for entry in self.iter() { + wipe += entry.len(); + if remains + wipe >= required { + break; + } + } + self.buffer.copy_within( + Self::SET_HEADER_SIZE + wipe..Self::SET_HEADER_SIZE + self.len, + Self::SET_HEADER_SIZE, + ); + self.len -= wipe; + assert!(self.capacity - self.len >= required); + let mut bloom_filter = BloomFilterU64::default(); + for entry in self.iter() { + bloom_filter.insert(entry.hash); + } + self.bloom_filter = bloom_filter; + } + + fn iter(&self) -> SetIter<'_> { + SetIter::open(self) + } + + fn data(&self) -> &[u8] { + &self.buffer[Self::SET_HEADER_SIZE..self.size] + } +} + +pub struct SetEntry<'a> { + offset: usize, + pub hash: u64, + pub key: &'a [u8], + pub value: &'a [u8], +} + +impl<'a> SetEntry<'a> { + /// Length of the entry with header, key and value included. + pub fn len(&self) -> usize { + EntryHeader::ENTRY_HEADER_SIZE + self.key.len() + self.value.len() + } + + /// Range of the entry in the set data. 
+ #[expect(unused)] + pub fn range(&self) -> Range { + self.offset..self.offset + self.len() + } +} + +pub struct SetIter<'a> { + set: &'a SetStorage, + offset: usize, +} + +impl<'a> SetIter<'a> { + fn open(set: &'a SetStorage) -> Self { + Self { set, offset: 0 } + } + + fn is_valid(&self) -> bool { + self.offset < self.set.len + } + + fn next(&mut self) -> Option> { + if !self.is_valid() { + return None; + } + let mut cursor = self.offset; + let header = EntryHeader::read(&self.set.data()[cursor..cursor + EntryHeader::ENTRY_HEADER_SIZE]); + cursor += EntryHeader::ENTRY_HEADER_SIZE; + let value = &self.set.data()[cursor..cursor + header.value_len()]; + cursor += header.value_len(); + let key = &self.set.data()[cursor..cursor + header.key_len()]; + let entry = SetEntry { + offset: self.offset, + hash: header.hash(), + key, + value, + }; + self.offset += entry.len(); + Some(entry) + } +} + +impl<'a> Iterator for SetIter<'a> { + type Item = SetEntry<'a>; + + fn next(&mut self) -> Option { + self.next() + } +} + +#[cfg(test)] +mod tests { + + use foyer_memory::{Cache, CacheBuilder, CacheEntry}; + + use super::*; + use crate::{serde::EntrySerializer, test_utils::metrics_for_test, Compression}; + + const PAGE: usize = 4096; + + fn buffer(entry: &CacheEntry>) -> IoBytes { + let mut buf = IoBytesMut::new(); + + // reserve header + let header = EntryHeader::new(0, 0, 0); + header.write(&mut buf); + + let info = EntrySerializer::serialize( + entry.key(), + entry.value(), + &Compression::None, + &mut buf, + metrics_for_test(), + ) + .unwrap(); + + let header = EntryHeader::new(entry.hash(), info.key_len, info.value_len); + header.write(&mut buf[0..EntryHeader::ENTRY_HEADER_SIZE]); + + buf.freeze() + } + + fn assert_some(storage: &SetStorage, entry: &CacheEntry>) { + let ret = storage.get::>(entry.hash()).unwrap(); + let (k, v) = ret.unwrap(); + assert_eq!(&k, entry.key()); + assert_eq!(&v, entry.value()); + } + + fn assert_none(storage: &SetStorage, hash: u64) { + let ret = storage.get::>(hash).unwrap(); + assert!(ret.is_none()); + } + + fn memory_for_test() -> Cache> { + CacheBuilder::new(100).build() + } + + #[test] + #[should_panic] + fn test_set_storage_empty() { + let buffer = IoBytesMut::new(); + SetStorage::load(buffer); + } + + #[test] + fn test_set_storage_basic() { + let memory = memory_for_test(); + + let mut buf = IoBytesMut::with_capacity(PAGE); + unsafe { buf.set_len(PAGE) }; + + // load will result in an empty set + let mut storage = SetStorage::load(buf); + assert!(storage.is_empty()); + + let e1 = memory.insert(1, vec![b'1'; 42]); + let b1 = buffer(&e1); + storage.apply( + &HashSet::from_iter([2, 4]), + vec![Item { + buffer: b1.clone(), + entry: e1.clone(), + }], + ); + assert_eq!(storage.len(), b1.len()); + assert_some(&storage, &e1); + + let e2 = memory.insert(2, vec![b'2'; 97]); + let b2 = buffer(&e2); + storage.apply( + &HashSet::from_iter([e1.hash(), 3, 5]), + vec![Item { + buffer: b2.clone(), + entry: e2.clone(), + }], + ); + assert_eq!(storage.len(), b2.len()); + assert_none(&storage, e1.hash()); + assert_some(&storage, &e2); + + let e3 = memory.insert(3, vec![b'3'; 211]); + let b3 = buffer(&e3); + storage.apply( + &HashSet::from_iter([e1.hash()]), + vec![Item { + buffer: b3.clone(), + entry: e3.clone(), + }], + ); + assert_eq!(storage.len(), b2.len() + b3.len()); + assert_none(&storage, e1.hash()); + assert_some(&storage, &e2); + assert_some(&storage, &e3); + + let e4 = memory.insert(4, vec![b'4'; 3800]); + let b4 = buffer(&e4); + storage.apply( + 
&HashSet::from_iter([e1.hash()]), + vec![Item { + buffer: b4.clone(), + entry: e4.clone(), + }], + ); + assert_eq!(storage.len(), b4.len()); + assert_none(&storage, e1.hash()); + assert_none(&storage, e2.hash()); + assert_none(&storage, e3.hash()); + assert_some(&storage, &e4); + + // test recovery + storage.update(); + let bytes = storage.freeze(); + let mut buf = IoBytesMut::with_capacity(PAGE); + unsafe { buf.set_len(PAGE) }; + buf[0..bytes.len()].copy_from_slice(&bytes); + let mut storage = SetStorage::load(buf); + + assert_eq!(storage.len(), b4.len()); + assert_none(&storage, e1.hash()); + assert_none(&storage, e2.hash()); + assert_none(&storage, e3.hash()); + assert_some(&storage, &e4); + + // test oversize entry + let e5 = memory.insert(5, vec![b'5'; 20 * 1024]); + let b5 = buffer(&e5); + storage.apply( + &HashSet::new(), + vec![Item { + buffer: b5.clone(), + entry: e5.clone(), + }], + ); + assert_eq!(storage.len(), b4.len()); + assert_none(&storage, e1.hash()); + assert_none(&storage, e2.hash()); + assert_none(&storage, e3.hash()); + assert_some(&storage, &e4); + } +} diff --git a/foyer-storage/src/small/set_manager.rs b/foyer-storage/src/small/set_manager.rs new file mode 100644 index 00000000..441f0253 --- /dev/null +++ b/foyer-storage/src/small/set_manager.rs @@ -0,0 +1,253 @@ +// Copyright 2024 Foyer Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{ + fmt::Debug, + ops::{Deref, DerefMut, Range}, + sync::Arc, +}; + +use foyer_common::strict_assert; +use itertools::Itertools; +use ordered_hash_map::OrderedHashMap; +use tokio::sync::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard}; + +use super::{ + bloom_filter::BloomFilterU64, + set::{Set, SetId, SetMut, SetStorage}, +}; +use crate::{ + device::{MonitoredDevice, RegionId}, + error::Result, + Dev, +}; + +struct SetManagerInner { + /// A phantom rwlock to prevent set storage operations on disk. + /// + /// All set disk operations must be prevented by the lock. + /// + /// In addition, the rwlock also serves as the lock of the in-memory bloom filter. 
+ sets: Vec>>, + cache: Mutex>>, + set_cache_capacity: usize, + + set_size: usize, + device: MonitoredDevice, + regions: Range, + flush: bool, +} + +#[derive(Clone)] +pub struct SetManager { + inner: Arc, +} + +impl Debug for SetManager { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SetManager") + .field("sets", &self.inner.sets) + .field("cache", &self.inner.cache) + .field("set_cache_capacity", &self.inner.set_cache_capacity) + .field("set_size", &self.inner.set_size) + .field("device", &self.inner.device) + .field("regions", &self.inner.regions) + .field("flush", &self.inner.flush) + .finish() + } +} + +impl SetManager { + pub fn new( + set_size: usize, + set_cache_capacity: usize, + device: MonitoredDevice, + regions: Range, + flush: bool, + ) -> Self { + let sets = (device.region_size() / set_size) * (regions.end - regions.start) as usize; + assert!(sets > 0); + + let sets = (0..sets).map(|_| RwLock::default()).collect_vec(); + let cache = Mutex::new(OrderedHashMap::with_capacity(set_cache_capacity)); + + let inner = SetManagerInner { + sets, + cache, + set_cache_capacity, + set_size, + device, + regions, + flush, + }; + let inner = Arc::new(inner); + Self { inner } + } + + pub async fn write(&self, id: SetId) -> Result> { + let guard = self.inner.sets[id as usize].write().await; + + let invalid = self.inner.cache.lock().await.remove(&id); + let storage = match invalid { + // `guard` already guarantees that there is only one storage reference left. + Some(storage) => Arc::into_inner(storage).unwrap(), + None => self.storage(id).await?, + }; + + Ok(SetWriteGuard { + bloom_filter: guard, + id, + set: SetMut::new(storage), + drop: DropPanicGuard::default(), + }) + } + + pub async fn read(&self, id: SetId, hash: u64) -> Result>> { + let guard = self.inner.sets[id as usize].read().await; + if !guard.lookup(hash) { + return Ok(None); + } + + let mut cache = self.inner.cache.lock().await; + let cached = cache.get(&id).cloned(); + let storage = match cached { + Some(storage) => storage, + None => { + let storage = self.storage(id).await?; + let storage = Arc::new(storage); + cache.insert(id, storage.clone()); + if cache.len() > self.inner.set_cache_capacity { + cache.pop_front(); + strict_assert!(cache.len() <= self.inner.set_cache_capacity); + } + storage + } + }; + drop(cache); + + Ok(Some(SetReadGuard { + _bloom_filter: guard, + _id: id, + set: Set::new(storage), + })) + } + + pub async fn apply(&self, mut guard: SetWriteGuard<'_>) -> Result<()> { + let mut storage = guard.set.into_storage(); + + // Update in-memory bloom filter. 
+ storage.update(); + *guard.bloom_filter = storage.bloom_filter().clone(); + + let buffer = storage.freeze(); + + let (region, offset) = self.locate(guard.id); + self.inner.device.write(buffer, region, offset).await?; + if self.inner.flush { + self.inner.device.flush(Some(region)).await?; + } + guard.drop.disable(); + drop(guard.bloom_filter); + Ok(()) + } + + pub async fn contains(&self, id: SetId, hash: u64) -> bool { + let guard = self.inner.sets[id as usize].read().await; + guard.lookup(hash) + } + + pub fn sets(&self) -> usize { + self.inner.sets.len() + } + + pub fn set_size(&self) -> usize { + self.inner.set_size + } + + async fn storage(&self, id: SetId) -> Result { + let (region, offset) = self.locate(id); + let buffer = self.inner.device.read(region, offset, self.inner.set_size).await?; + let storage = SetStorage::load(buffer); + Ok(storage) + } + + #[inline] + fn region_sets(&self) -> usize { + self.inner.device.region_size() / self.inner.set_size + } + + #[inline] + fn locate(&self, id: SetId) -> (RegionId, u64) { + let region_sets = self.region_sets(); + let region = id as RegionId / region_sets as RegionId; + let offset = ((id as usize % region_sets) * self.inner.set_size) as u64; + (region, offset) + } +} + +#[derive(Debug, Default)] +struct DropPanicGuard { + disabled: bool, +} + +impl Drop for DropPanicGuard { + fn drop(&mut self) { + if !self.disabled { + panic!("unexpected drop panic guard drop"); + } + } +} + +impl DropPanicGuard { + fn disable(&mut self) { + self.disabled = true; + } +} + +#[derive(Debug)] +pub struct SetWriteGuard<'a> { + bloom_filter: RwLockWriteGuard<'a, BloomFilterU64<4>>, + id: SetId, + set: SetMut, + drop: DropPanicGuard, +} + +impl<'a> Deref for SetWriteGuard<'a> { + type Target = SetMut; + + fn deref(&self) -> &Self::Target { + &self.set + } +} + +impl<'a> DerefMut for SetWriteGuard<'a> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.set + } +} + +#[derive(Debug)] +pub struct SetReadGuard<'a> { + _bloom_filter: RwLockReadGuard<'a, BloomFilterU64<4>>, + _id: SetId, + set: Set, +} + +impl<'a> Deref for SetReadGuard<'a> { + type Target = Set; + + fn deref(&self) -> &Self::Target { + &self.set + } +} diff --git a/foyer-storage/src/storage/either.rs b/foyer-storage/src/storage/either.rs index c6c34c16..30788549 100644 --- a/foyer-storage/src/storage/either.rs +++ b/foyer-storage/src/storage/either.rs @@ -21,11 +21,12 @@ use futures::{ future::{join, ready, select, try_join, Either as EitherFuture}, pin_mut, Future, FutureExt, }; +use serde::{Deserialize, Serialize}; use crate::{error::Result, storage::Storage, DeviceStats}; /// Order of ops. -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] pub enum Order { /// Use the left engine first. /// diff --git a/foyer-storage/src/store.rs b/foyer-storage/src/store.rs index 88a04d00..b4d5ae16 100644 --- a/foyer-storage/src/store.rs +++ b/foyer-storage/src/store.rs @@ -12,10 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. 
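The `SetManager` added above maps a `SetId` to a fixed slot on disk: each region holds `region_size / set_size` sets, the region index is `id / region_sets`, and the byte offset inside that region is `(id % region_sets) * set_size`. A standalone sketch of that math, with illustrative region and set sizes:

```rust
type SetId = u64;
type RegionId = u32;

// Mirror of the `locate` arithmetic: set id -> (region, byte offset within region).
fn locate(id: SetId, region_size: usize, set_size: usize) -> (RegionId, u64) {
    let region_sets = region_size / set_size; // sets per region
    let region = (id / region_sets as u64) as RegionId;
    let offset = ((id as usize % region_sets) * set_size) as u64;
    (region, offset)
}

fn main() {
    // Illustrative values: 16 MiB regions, 16 KiB sets => 1024 sets per region.
    let (region_size, set_size) = (16 * 1024 * 1024, 16 * 1024);
    assert_eq!(locate(0, region_size, set_size), (0, 0));
    assert_eq!(locate(1023, region_size, set_size), (0, (1023 * set_size) as u64));
    assert_eq!(locate(1024, region_size, set_size), (1, 0));
    println!("locate(1025) = {:?}", locate(1025, region_size, set_size));
}
```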
-use std::{borrow::Borrow, fmt::Debug, hash::Hash, marker::PhantomData, sync::Arc, time::Instant}; +use std::{ + borrow::Borrow, + fmt::{Debug, Display}, + hash::Hash, + marker::PhantomData, + str::FromStr, + sync::Arc, + time::Instant, +}; use ahash::RandomState; use foyer_common::{ + bits, code::{HashBuilder, StorageKey, StorageValue}, metrics::Metrics, runtime::BackgroundShutdownRuntime, @@ -29,9 +38,9 @@ use crate::{ device::{ direct_fs::DirectFsDeviceOptions, monitor::{DeviceStats, Monitored, MonitoredOptions}, - DeviceOptions, RegionId, + DeviceOptions, RegionId, ALIGN, }, - engine::{Engine, EngineConfig, SizeSelector}, + engine::{EngineConfig, EngineEnum, SizeSelector}, error::{Error, Result}, large::{generic::GenericLargeStorageConfig, recover::RecoverMode, tombstone::TombstoneLogConfig}, picker::{ @@ -67,7 +76,7 @@ where { memory: Cache, - engine: Engine, + engine: EngineEnum, admission_picker: Arc>, @@ -187,15 +196,6 @@ where self.inner.engine.stats() } - /// Get the runtime handles. - #[deprecated( - since = "0.11.5", - note = "The function will be renamed to \"runtime()\", use it instead." - )] - pub fn runtimes(&self) -> &Runtime { - &self.inner.runtime - } - /// Get the runtime. pub fn runtime(&self) -> &Runtime { &self.inner.runtime @@ -223,36 +223,38 @@ impl From for DeviceConfig { } } -/// [`CombinedConfig`] controls the ratio of the large object disk cache and the small object disk cache. +/// [`Engine`] controls the ratio of the large object disk cache and the small object disk cache. /// -/// If [`CombinedConfig::Combined`] is used, it will use the `Either` engine +/// If [`Engine::Mixed`] is used, it will use the `Either` engine /// with the small object disk cache as the left engine, /// and the large object disk cache as the right engine. -#[derive(Debug, Clone)] -pub enum CombinedConfig { +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub enum Engine { /// All space are used as the large object disk cache. Large, /// All space are used as the small object disk cache. Small, - /// Combined the large object disk cache and the small object disk cache. - Combined { - /// The ratio of the large object disk cache. - large_object_cache_ratio: f64, - /// The serialized entry size threshold to use the large object disk cache. - large_object_threshold: usize, - /// Load order. - load_order: Order, - }, + /// Mixed the large object disk cache and the small object disk cache. + /// + /// The argument controls the ratio of the small object disk cache. + /// + /// Range: [0 ~ 1] + Mixed(f64), } -impl Default for CombinedConfig { +impl Default for Engine { fn default() -> Self { - // TODO(MrCroxx): Use combined cache after small object disk cache is ready. + // TODO(MrCroxx): Use Mixed cache after small object disk cache is ready. Self::Large } } -impl CombinedConfig { +impl Engine { + /// Threshold for distinguishing small and large objects. + pub const OBJECT_SIZE_THRESHOLD: usize = 2048; + /// Check the large object disk cache first, for checking it does NOT involve disk ops. + pub const MIXED_LOAD_ORDER: Order = Order::RightFirst; + /// Default large object disk cache only config. pub fn large() -> Self { Self::Large @@ -263,16 +265,44 @@ impl CombinedConfig { Self::Small } - /// Default combined large object disk cache and small object disk cache only config. 
- pub fn combined() -> Self { - Self::Combined { - large_object_cache_ratio: 0.5, - large_object_threshold: 4096, - load_order: Order::RightFirst, + /// Default mixed large object disk cache and small object disk cache config. + pub fn mixed() -> Self { + Self::Mixed(0.1) + } +} + +impl Display for Engine { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Engine::Large => write!(f, "large"), + Engine::Small => write!(f, "small"), + Engine::Mixed(ratio) => write!(f, "mixed({ratio})"), } } } +impl FromStr for Engine { + type Err = String; + + fn from_str(s: &str) -> std::result::Result { + const MIXED_PREFIX: &str = "mixed="; + + match s { + "large" => return Ok(Engine::Large), + "small" => return Ok(Engine::Small), + _ => {} + } + + if s.starts_with(MIXED_PREFIX) { + if let Ok(ratio) = s[MIXED_PREFIX.len()..s.len()].parse::() { + return Ok(Engine::Mixed(ratio)); + } + } + + Err(format!("invalid input: {s}")) + } +} + /// Tokio runtime configuration. #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct TokioRuntimeConfig { @@ -315,25 +345,20 @@ where V: StorageValue, S: HashBuilder + Debug, { + name: String, memory: Cache, - name: String, device_config: DeviceConfig, - flush: bool, - indexer_shards: usize, - recover_mode: RecoverMode, - recover_concurrency: usize, - flushers: usize, - reclaimers: usize, - buffer_pool_size: usize, - clean_region_threshold: Option, - eviction_pickers: Vec>, + engine: Engine, + runtime_config: RuntimeConfig, + admission_picker: Arc>, - reinsertion_picker: Arc>, compression: Compression, - tombstone_log_config: Option, - combined_config: CombinedConfig, - runtime_config: RuntimeConfig, + recover_mode: RecoverMode, + flush: bool, + + large: LargeEngineOptions, + small: SmallEngineOptions, } impl StoreBuilder @@ -343,26 +368,26 @@ where S: HashBuilder + Debug, { /// Setup disk cache store for the given in-memory cache. - pub fn new(memory: Cache) -> Self { + pub fn new(memory: Cache, engine: Engine) -> Self { + if matches!(engine, Engine::Mixed(ratio) if !(0.0..=1.0).contains(&ratio)) { + panic!("mixed engine small object disk cache ratio must be a f64 in range [0.0, 1.0]"); + } + Self { - memory, name: "foyer".to_string(), + memory, + device_config: DeviceConfig::None, - flush: false, - indexer_shards: 64, - recover_mode: RecoverMode::Quiet, - recover_concurrency: 8, - flushers: 1, - reclaimers: 1, - buffer_pool_size: 16 * 1024 * 1024, // 16 MiB - clean_region_threshold: None, - eviction_pickers: vec![Box::new(InvalidRatioPicker::new(0.8)), Box::::default()], + engine, + runtime_config: RuntimeConfig::Disabled, + admission_picker: Arc::>::default(), - reinsertion_picker: Arc::>::default(), compression: Compression::None, - tombstone_log_config: None, - combined_config: CombinedConfig::default(), - runtime_config: RuntimeConfig::Disabled, + recover_mode: RecoverMode::Quiet, + flush: false, + + large: LargeEngineOptions::new(), + small: SmallEngineOptions::new(), } } @@ -390,11 +415,11 @@ where self } - /// Set the shard num of the indexer. Each shard has its own lock. + /// Set the compression algorithm of the disk cache store. /// - /// Default: `64`. - pub fn with_indexer_shards(mut self, indexer_shards: usize) -> Self { - self.indexer_shards = indexer_shards; + /// Default: [`Compression::None`]. + pub fn with_compression(mut self, compression: Compression) -> Self { + self.compression = compression; self } @@ -408,88 +433,6 @@ where self } - /// Set the recover concurrency for the disk cache store. 
- /// - /// Default: `8`. - pub fn with_recover_concurrency(mut self, recover_concurrency: usize) -> Self { - self.recover_concurrency = recover_concurrency; - self - } - - /// Set the flusher count for the disk cache store. - /// - /// The flusher count limits how many regions can be concurrently written. - /// - /// Default: `1`. - pub fn with_flushers(mut self, flushers: usize) -> Self { - self.flushers = flushers; - self - } - - /// Set the reclaimer count for the disk cache store. - /// - /// The reclaimer count limits how many regions can be concurrently reclaimed. - /// - /// Default: `1`. - pub fn with_reclaimers(mut self, reclaimers: usize) -> Self { - self.reclaimers = reclaimers; - self - } - - // FIXME(MrCroxx): remove it after 0.12 - /// Set the total flush buffer threshold. - /// - /// Each flusher shares a volume at `threshold / flushers`. - /// - /// If the buffer of the flush queue exceeds the threshold, the further entries will be ignored. - /// - /// Default: 16 MiB. - #[deprecated( - since = "0.11.4", - note = "The function will be renamed to \"with_buffer_pool_size()\", use it instead." - )] - pub fn with_buffer_threshold(mut self, threshold: usize) -> Self { - self.buffer_pool_size = threshold; - self - } - - /// Set the total flush buffer pool size. - /// - /// Each flusher shares a volume at `threshold / flushers`. - /// - /// If the buffer of the flush queue exceeds the threshold, the further entries will be ignored. - /// - /// Default: 16 MiB. - pub fn with_buffer_pool_size(mut self, buffer_pool_size: usize) -> Self { - self.buffer_pool_size = buffer_pool_size; - self - } - - /// Set the clean region threshold for the disk cache store. - /// - /// The reclaimers only work when the clean region count is equal to or lower than the clean region threshold. - /// - /// Default: the same value as the `reclaimers`. - pub fn with_clean_region_threshold(mut self, clean_region_threshold: usize) -> Self { - self.clean_region_threshold = Some(clean_region_threshold); - self - } - - /// Set the eviction pickers for th disk cache store. - /// - /// The eviction picker is used to pick the region to reclaim. - /// - /// The eviction pickers are applied in order. If the previous eviction picker doesn't pick any region, the next one - /// will be applied. - /// - /// If no eviction picker picks a region, a region will be picked randomly. - /// - /// Default: [ invalid ratio picker { threshold = 0.8 }, fifo picker ] - pub fn with_eviction_pickers(mut self, eviction_pickers: Vec>) -> Self { - self.eviction_pickers = eviction_pickers; - self - } - /// Set the admission pickers for th disk cache store. /// /// The admission picker is used to pick the entries that can be inserted into the disk cache store. @@ -500,57 +443,42 @@ where self } - /// Set the reinsertion pickers for th disk cache store. - /// - /// The reinsertion picker is used to pick the entries that can be reinsertion into the disk cache store while - /// reclaiming. - /// - /// Note: Only extremely important entries should be picked. If too many entries are picked, both insertion and - /// reinsertion will be stuck. - /// - /// Default: [`RejectAllPicker`]. - pub fn with_reinsertion_picker(mut self, reinsertion_picker: Arc>) -> Self { - self.reinsertion_picker = reinsertion_picker; + /// Configure the dedicated runtime for the disk cache store. 
+ pub fn with_runtime_config(mut self, runtime_config: RuntimeConfig) -> Self { + self.runtime_config = runtime_config; self } - /// Set the compression algorithm of the disk cache store. + /// Setup the large object disk cache engine with the given options. /// - /// Default: [`Compression::None`]. - pub fn with_compression(mut self, compression: Compression) -> Self { - self.compression = compression; + /// Otherwise, the default options will be used. See [`LargeEngineOptions`]. + pub fn with_large_object_disk_cache_options(mut self, options: LargeEngineOptions) -> Self { + if matches!(self.engine, Engine::Small { .. }) { + tracing::warn!("[store builder]: Setting up large object disk cache options, but only small object disk cache is enabled."); + } + self.large = options; self } - /// Enable the tombstone log with the given config. + /// Setup the small object disk cache engine with the given options. /// - /// For updatable cache, either the tombstone log or [`RecoverMode::None`] must be enabled to prevent from the - /// phantom entries after reopen. - pub fn with_tombstone_log_config(mut self, tombstone_log_config: TombstoneLogConfig) -> Self { - self.tombstone_log_config = Some(tombstone_log_config); - self - } - - /// Set the ratio of the large object disk cache and the small object disk cache. - pub fn with_combined_config(mut self, combined_config: CombinedConfig) -> Self { - self.combined_config = combined_config; - self - } - - /// Configure the dedicated runtime for the disk cache store. - pub fn with_runtime_config(mut self, runtime_config: RuntimeConfig) -> Self { - self.runtime_config = runtime_config; + /// Otherwise, the default options will be used. See [`SmallEngineOptions`]. + pub fn with_small_object_disk_cache_options(mut self, options: SmallEngineOptions) -> Self { + if matches!(self.engine, Engine::Large { .. }) { + tracing::warn!("[store builder]: Setting up small object disk cache options, but only large object disk cache is enabled."); + } + self.small = options; self } /// Build the disk cache store with the given configuration. pub async fn build(self) -> Result> { - let clean_region_threshold = self.clean_region_threshold.unwrap_or(self.reclaimers); - let memory = self.memory.clone(); let admission_picker = self.admission_picker.clone(); + let metrics = Arc::new(Metrics::new(&self.name)); let statistics = Arc::::default(); + let compression = self.compression; let build_runtime = |config: &TokioRuntimeConfig, suffix: &str| { @@ -596,94 +524,109 @@ where let runtime = runtime.clone(); // Use the user runtime to open engine. tokio::spawn(async move { - match self.device_config { - DeviceConfig::None => { - tracing::warn!( - "[store builder]: No device config set. Use `NoneStore` which always returns `None` for queries." - ); - Engine::open(EngineConfig::Noop).await - } - DeviceConfig::DeviceOptions(options) => { - let device = match Monitored::open(MonitoredOptions { - options, - metrics: metrics.clone(), - }, runtime.clone()) - .await { - Ok(device) => device, - Err(e) =>return Err(e), - }; - match self.combined_config { - CombinedConfig::Large => { - let regions = 0..device.regions() as RegionId; - Engine::open(EngineConfig::Large(GenericLargeStorageConfig { + match self.device_config { + DeviceConfig::None => { + tracing::warn!( + "[store builder]: No device config set. Use `NoneStore` which always returns `None` for queries." 
+ ); + EngineEnum::open(EngineConfig::Noop).await + } + DeviceConfig::DeviceOptions(options) => { + let device = match Monitored::open(MonitoredOptions { + options, + metrics: metrics.clone(), + }, runtime.clone()) + .await { + Ok(device) => device, + Err(e) =>return Err(e), + }; + match self.engine { + Engine::Large => { + let regions = 0..device.regions() as RegionId; + EngineEnum::open(EngineConfig::Large(GenericLargeStorageConfig { + name: self.name, + device, + regions, + compression: self.compression, + flush: self.flush, + indexer_shards: self.large.indexer_shards, + recover_mode: self.recover_mode, + recover_concurrency: self.large.recover_concurrency, + flushers: self.large.flushers, + reclaimers: self.large.reclaimers, + clean_region_threshold: self.large.clean_region_threshold.unwrap_or(self.large.reclaimers), + eviction_pickers: self.large.eviction_pickers, + reinsertion_picker: self.large.reinsertion_picker, + tombstone_log_config: self.large.tombstone_log_config, + buffer_pool_size: self.large.buffer_pool_size, + statistics: statistics.clone(), + runtime, + marker: PhantomData, + })) + .await + } + Engine::Small => { + let regions = 0..device.regions() as RegionId; + EngineEnum::open(EngineConfig::Small(GenericSmallStorageConfig { + set_size: self.small.set_size, + set_cache_capacity: self.small.set_cache_capacity, + device, + regions, + flush: self.flush, + flushers: self.small.flushers, + buffer_pool_size: self.small.buffer_pool_size, + statistics: statistics.clone(), + runtime, + marker: PhantomData, + })) + .await + } + Engine::Mixed(ratio) => { + let small_region_count = std::cmp::max((device.regions() as f64 * ratio) as usize,1); + let small_regions = 0..small_region_count as RegionId; + let large_regions = small_region_count as RegionId..device.regions() as RegionId; + EngineEnum::open(EngineConfig::Mixed(EitherConfig { + selector: SizeSelector::new(Engine::OBJECT_SIZE_THRESHOLD), + left: GenericSmallStorageConfig { + set_size: self.small.set_size, + set_cache_capacity: self.small.set_cache_capacity, + device: device.clone(), + regions: small_regions, + flush: self.flush, + flushers: self.small.flushers, + buffer_pool_size: self.small.buffer_pool_size, + statistics: statistics.clone(), + runtime: runtime.clone(), + marker: PhantomData, + }, + right: GenericLargeStorageConfig { name: self.name, device, - regions, + regions: large_regions, compression: self.compression, flush: self.flush, - indexer_shards: self.indexer_shards, + indexer_shards: self.large.indexer_shards, recover_mode: self.recover_mode, - recover_concurrency: self.recover_concurrency, - flushers: self.flushers, - reclaimers: self.reclaimers, - clean_region_threshold, - eviction_pickers: self.eviction_pickers, - reinsertion_picker: self.reinsertion_picker, - tombstone_log_config: self.tombstone_log_config, - buffer_threshold: self.buffer_pool_size, + recover_concurrency: self.large.recover_concurrency, + flushers: self.large.flushers, + reclaimers: self.large.reclaimers, + clean_region_threshold: self.large.clean_region_threshold.unwrap_or(self.large.reclaimers), + eviction_pickers: self.large.eviction_pickers, + reinsertion_picker: self.large.reinsertion_picker, + tombstone_log_config: self.large.tombstone_log_config, + buffer_pool_size: self.large.buffer_pool_size, statistics: statistics.clone(), runtime, marker: PhantomData, - })) - .await - } - CombinedConfig::Small => { - Engine::open(EngineConfig::Small(GenericSmallStorageConfig { - placeholder: PhantomData, - })) - .await - } - 
CombinedConfig::Combined { - large_object_cache_ratio, - large_object_threshold, - load_order, - } => { - let large_region_count = (device.regions() as f64 * large_object_cache_ratio) as usize; - let large_regions = - (device.regions() - large_region_count) as RegionId..device.regions() as RegionId; - Engine::open(EngineConfig::Combined(EitherConfig { - selector: SizeSelector::new(large_object_threshold), - left: GenericSmallStorageConfig { - placeholder: PhantomData, - }, - right: GenericLargeStorageConfig { - name: self.name, - device, - regions: large_regions, - compression: self.compression, - flush: self.flush, - indexer_shards: self.indexer_shards, - recover_mode: self.recover_mode, - recover_concurrency: self.recover_concurrency, - flushers: self.flushers, - reclaimers: self.reclaimers, - clean_region_threshold, - eviction_pickers: self.eviction_pickers, - reinsertion_picker: self.reinsertion_picker, - tombstone_log_config: self.tombstone_log_config, - buffer_threshold: self.buffer_pool_size, - statistics: statistics.clone(), - runtime, - marker: PhantomData, - }, - load_order, - })) - .await - } + }, + load_order: Engine::MIXED_LOAD_ORDER, + })) + .await } } } - }).await.unwrap()? + } + }).await.unwrap()? }; let inner = StoreInner { @@ -701,3 +644,241 @@ where Ok(store) } } + +/// Large object disk cache engine default options. +pub struct LargeEngineOptions +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + indexer_shards: usize, + recover_concurrency: usize, + flushers: usize, + reclaimers: usize, + buffer_pool_size: usize, + clean_region_threshold: Option, + eviction_pickers: Vec>, + reinsertion_picker: Arc>, + tombstone_log_config: Option, + + _marker: PhantomData<(K, V, S)>, +} + +impl Default for LargeEngineOptions +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + fn default() -> Self { + Self::new() + } +} + +impl LargeEngineOptions +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + /// Create large object disk cache engine default options. + pub fn new() -> Self { + Self { + indexer_shards: 64, + recover_concurrency: 8, + flushers: 1, + reclaimers: 1, + buffer_pool_size: 16 * 1024 * 1024, // 16 MiB + clean_region_threshold: None, + eviction_pickers: vec![Box::new(InvalidRatioPicker::new(0.8)), Box::::default()], + reinsertion_picker: Arc::>::default(), + tombstone_log_config: None, + _marker: PhantomData, + } + } + + /// Set the shard num of the indexer. Each shard has its own lock. + /// + /// Default: `64`. + pub fn with_indexer_shards(mut self, indexer_shards: usize) -> Self { + self.indexer_shards = indexer_shards; + self + } + + /// Set the recover concurrency for the disk cache store. + /// + /// Default: `8`. + pub fn with_recover_concurrency(mut self, recover_concurrency: usize) -> Self { + self.recover_concurrency = recover_concurrency; + self + } + + /// Set the flusher count for the disk cache store. + /// + /// The flusher count limits how many regions can be concurrently written. + /// + /// Default: `1`. + pub fn with_flushers(mut self, flushers: usize) -> Self { + self.flushers = flushers; + self + } + + /// Set the reclaimer count for the disk cache store. + /// + /// The reclaimer count limits how many regions can be concurrently reclaimed. + /// + /// Default: `1`. + pub fn with_reclaimers(mut self, reclaimers: usize) -> Self { + self.reclaimers = reclaimers; + self + } + + /// Set the total flush buffer pool size. + /// + /// Each flusher shares a volume at `threshold / flushers`. 
+ /// + /// If the buffer of the flush queue exceeds the threshold, the further entries will be ignored. + /// + /// Default: 16 MiB. + pub fn with_buffer_pool_size(mut self, buffer_pool_size: usize) -> Self { + self.buffer_pool_size = buffer_pool_size; + self + } + + /// Set the clean region threshold for the disk cache store. + /// + /// The reclaimers only work when the clean region count is equal to or lower than the clean region threshold. + /// + /// Default: the same value as the `reclaimers`. + pub fn with_clean_region_threshold(mut self, clean_region_threshold: usize) -> Self { + self.clean_region_threshold = Some(clean_region_threshold); + self + } + + /// Set the eviction pickers for th disk cache store. + /// + /// The eviction picker is used to pick the region to reclaim. + /// + /// The eviction pickers are applied in order. If the previous eviction picker doesn't pick any region, the next one + /// will be applied. + /// + /// If no eviction picker picks a region, a region will be picked randomly. + /// + /// Default: [ invalid ratio picker { threshold = 0.8 }, fifo picker ] + pub fn with_eviction_pickers(mut self, eviction_pickers: Vec>) -> Self { + self.eviction_pickers = eviction_pickers; + self + } + + /// Set the reinsertion pickers for th disk cache store. + /// + /// The reinsertion picker is used to pick the entries that can be reinsertion into the disk cache store while + /// reclaiming. + /// + /// Note: Only extremely important entries should be picked. If too many entries are picked, both insertion and + /// reinsertion will be stuck. + /// + /// Default: [`RejectAllPicker`]. + pub fn with_reinsertion_picker(mut self, reinsertion_picker: Arc>) -> Self { + self.reinsertion_picker = reinsertion_picker; + self + } + + /// Enable the tombstone log with the given config. + /// + /// For updatable cache, either the tombstone log or [`RecoverMode::None`] must be enabled to prevent from the + /// phantom entries after reopen. + pub fn with_tombstone_log_config(mut self, tombstone_log_config: TombstoneLogConfig) -> Self { + self.tombstone_log_config = Some(tombstone_log_config); + self + } +} + +/// Small object disk cache engine default options. +pub struct SmallEngineOptions +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + set_size: usize, + set_cache_capacity: usize, + buffer_pool_size: usize, + flushers: usize, + + _marker: PhantomData<(K, V, S)>, +} + +impl Default for SmallEngineOptions +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + fn default() -> Self { + Self::new() + } +} + +/// Create small object disk cache engine default options. +impl SmallEngineOptions +where + K: StorageKey, + V: StorageValue, + S: HashBuilder + Debug, +{ + /// Create small object disk cache engine default options. + pub fn new() -> Self { + Self { + set_size: 16 * 1024, // 16 KiB + set_cache_capacity: 64, // 64 sets + flushers: 1, + buffer_pool_size: 4 * 1024 * 1024, // 4 MiB + _marker: PhantomData, + } + } + + /// Set the set size of the set-associated cache. + /// + /// The set size will be 4K aligned. + /// + /// Default: 16 KiB + pub fn with_set_size(mut self, set_size: usize) -> Self { + bits::assert_aligned(ALIGN, set_size); + self.set_size = set_size; + self + } + + /// Set the capacity of the set cache. + /// + /// Count by set amount. 
+ /// + /// Default: 64 + pub fn with_set_cache_capacity(mut self, set_cache_capacity: usize) -> Self { + self.set_cache_capacity = set_cache_capacity; + self + } + + /// Set the total flush buffer pool size. + /// + /// Each flusher shares a volume at `threshold / flushers`. + /// + /// If the buffer of the flush queue exceeds the threshold, the further entries will be ignored. + /// + /// Default: 4 MiB. + pub fn with_buffer_pool_size(mut self, buffer_pool_size: usize) -> Self { + self.buffer_pool_size = buffer_pool_size; + self + } + + /// Set the flusher count for the disk cache store. + /// + /// The flusher count limits how many regions can be concurrently written. + /// + /// Default: `1`. + pub fn with_flushers(mut self, flushers: usize) -> Self { + self.flushers = flushers; + self + } +} diff --git a/foyer-storage/tests/storage_test.rs b/foyer-storage/tests/storage_test.rs index e7983577..e57bbf9c 100644 --- a/foyer-storage/tests/storage_test.rs +++ b/foyer-storage/tests/storage_test.rs @@ -18,7 +18,9 @@ use std::{path::Path, sync::Arc, time::Duration}; use ahash::RandomState; use foyer_memory::{Cache, CacheBuilder, CacheEntry, FifoConfig}; -use foyer_storage::{test_utils::Recorder, Compression, DirectFsDeviceOptionsBuilder, StoreBuilder}; +use foyer_storage::{ + test_utils::Recorder, Compression, DirectFsDeviceOptionsBuilder, Engine, LargeEngineOptions, StoreBuilder, +}; const KB: usize = 1024; const MB: usize = 1024 * 1024; @@ -106,18 +108,22 @@ fn basic( path: impl AsRef, recorder: &Arc>, ) -> StoreBuilder> { - StoreBuilder::new(memory.clone()) + // TODO(MrCroxx): Test mixed engine here. + StoreBuilder::new(memory.clone(), Engine::Large) .with_device_config( DirectFsDeviceOptionsBuilder::new(path) .with_capacity(4 * MB) .with_file_size(MB) .build(), ) - .with_indexer_shards(4) .with_admission_picker(recorder.clone()) - .with_reinsertion_picker(recorder.clone()) - .with_recover_concurrency(2) .with_flush(true) + .with_large_object_disk_cache_options( + LargeEngineOptions::new() + .with_recover_concurrency(2) + .with_indexer_shards(4) + .with_reinsertion_picker(recorder.clone()), + ) } #[test_log::test(tokio::test)] diff --git a/foyer/src/hybrid/builder.rs b/foyer/src/hybrid/builder.rs index 34eb69be..ab01c3c1 100644 --- a/foyer/src/hybrid/builder.rs +++ b/foyer/src/hybrid/builder.rs @@ -22,8 +22,8 @@ use foyer_common::{ }; use foyer_memory::{Cache, CacheBuilder, EvictionConfig, Weighter}; use foyer_storage::{ - AdmissionPicker, Compression, DeviceConfig, EvictionPicker, RecoverMode, ReinsertionPicker, RuntimeConfig, - StoreBuilder, TombstoneLogConfig, + AdmissionPicker, Compression, DeviceConfig, Engine, LargeEngineOptions, RecoverMode, RuntimeConfig, + SmallEngineOptions, StoreBuilder, }; use crate::HybridCache; @@ -173,11 +173,11 @@ where } } - /// Continue to modify the in-memory cache configurations. - pub fn storage(self) -> HybridCacheBuilderPhaseStorage { + /// Continue to modify the disk cache configurations. + pub fn storage(self, engine: Engine) -> HybridCacheBuilderPhaseStorage { let memory = self.builder.build(); HybridCacheBuilderPhaseStorage { - builder: StoreBuilder::new(memory.clone()).with_name(&self.name), + builder: StoreBuilder::new(memory.clone(), engine).with_name(&self.name), name: self.name, tracing_config: self.tracing_config, memory, @@ -228,19 +228,6 @@ where } } - /// Set the shard num of the indexer. Each shard has its own lock. - /// - /// Default: `64`. 
- pub fn with_indexer_shards(self, indexer_shards: usize) -> Self { - let builder = self.builder.with_indexer_shards(indexer_shards); - Self { - name: self.name, - tracing_config: self.tracing_config, - memory: self.memory, - builder, - } - } - /// Set the recover mode for the disk cache store. /// /// See more in [`RecoverMode`]. @@ -256,123 +243,6 @@ where } } - /// Set the recover concurrency for the disk cache store. - /// - /// Default: `8`. - pub fn with_recover_concurrency(self, recover_concurrency: usize) -> Self { - let builder = self.builder.with_recover_concurrency(recover_concurrency); - Self { - name: self.name, - tracing_config: self.tracing_config, - memory: self.memory, - builder, - } - } - - /// Set the flusher count for the disk cache store. - /// - /// The flusher count limits how many regions can be concurrently written. - /// - /// Default: `1`. - pub fn with_flushers(self, flushers: usize) -> Self { - let builder = self.builder.with_flushers(flushers); - Self { - name: self.name, - tracing_config: self.tracing_config, - memory: self.memory, - builder, - } - } - - /// Set the reclaimer count for the disk cache store. - /// - /// The reclaimer count limits how many regions can be concurrently reclaimed. - /// - /// Default: `1`. - pub fn with_reclaimers(self, reclaimers: usize) -> Self { - let builder = self.builder.with_reclaimers(reclaimers); - Self { - name: self.name, - tracing_config: self.tracing_config, - memory: self.memory, - builder, - } - } - - // FIXME(MrCroxx): remove it after 0.12 - /// Set the total flush buffer threshold. - /// - /// Each flusher shares a volume at `threshold / flushers`. - /// - /// If the buffer of the flush queue exceeds the threshold, the further entries will be ignored. - /// - /// Default: 16 MiB. - #[deprecated( - since = "0.11.4", - note = "The function will be renamed to \"with_buffer_pool_size()\", use it instead." - )] - pub fn with_buffer_threshold(self, threshold: usize) -> Self { - let builder = self.builder.with_buffer_pool_size(threshold); - Self { - name: self.name, - tracing_config: self.tracing_config, - memory: self.memory, - builder, - } - } - - /// Set the total flush buffer pool size. - /// - /// Each flusher shares a volume at `threshold / flushers`. - /// - /// If the buffer of the flush queue exceeds the threshold, the further entries will be ignored. - /// - /// Default: 16 MiB. - pub fn with_buffer_pool_size(self, buffer_pool_size: usize) -> Self { - let builder = self.builder.with_buffer_pool_size(buffer_pool_size); - Self { - name: self.name, - tracing_config: self.tracing_config, - memory: self.memory, - builder, - } - } - - /// Set the clean region threshold for the disk cache store. - /// - /// The reclaimers only work when the clean region count is equal to or lower than the clean region threshold. - /// - /// Default: the same value as the `reclaimers`. - pub fn with_clean_region_threshold(self, clean_region_threshold: usize) -> Self { - let builder = self.builder.with_clean_region_threshold(clean_region_threshold); - Self { - name: self.name, - tracing_config: self.tracing_config, - memory: self.memory, - builder, - } - } - - /// Set the eviction pickers for th disk cache store. - /// - /// The eviction picker is used to pick the region to reclaim. - /// - /// The eviction pickers are applied in order. If the previous eviction picker doesn't pick any region, the next one - /// will be applied. - /// - /// If no eviction picker picks a region, a region will be picked randomly. 
- /// - /// Default: [ invalid ratio picker { threshold = 0.8 }, fifo picker ] - pub fn with_eviction_pickers(self, eviction_pickers: Vec>) -> Self { - let builder = self.builder.with_eviction_pickers(eviction_pickers); - Self { - name: self.name, - tracing_config: self.tracing_config, - memory: self.memory, - builder, - } - } - /// Set the admission pickers for th disk cache store. /// /// The admission picker is used to pick the entries that can be inserted into the disk cache store. @@ -388,17 +258,11 @@ where } } - /// Set the reinsertion pickers for th disk cache store. - /// - /// The reinsertion picker is used to pick the entries that can be reinsertion into the disk cache store while - /// reclaiming. - /// - /// Note: Only extremely important entries should be picked. If too many entries are picked, both insertion and - /// reinsertion will be stuck. + /// Set the compression algorithm of the disk cache store. /// - /// Default: [`RejectAllPicker`]. - pub fn with_reinsertion_picker(self, reinsertion_picker: Arc>) -> Self { - let builder = self.builder.with_reinsertion_picker(reinsertion_picker); + /// Default: [`Compression::None`]. + pub fn with_compression(self, compression: Compression) -> Self { + let builder = self.builder.with_compression(compression); Self { name: self.name, tracing_config: self.tracing_config, @@ -407,11 +271,9 @@ where } } - /// Set the compression algorithm of the disk cache store. - /// - /// Default: [`Compression::None`]. - pub fn with_compression(self, compression: Compression) -> Self { - let builder = self.builder.with_compression(compression); + /// Configure the dedicated runtime for the disk cache store. + pub fn with_runtime_config(self, runtime_config: RuntimeConfig) -> Self { + let builder = self.builder.with_runtime_config(runtime_config); Self { name: self.name, tracing_config: self.tracing_config, @@ -420,12 +282,11 @@ where } } - /// Enable the tombstone log with the given config. + /// Setup the large object disk cache engine with the given options. /// - /// For updatable cache, either the tombstone log or [`RecoverMode::None`] must be enabled to prevent from the - /// phantom entries after reopen. - pub fn with_tombstone_log_config(self, tombstone_log_config: TombstoneLogConfig) -> Self { - let builder = self.builder.with_tombstone_log_config(tombstone_log_config); + /// Otherwise, the default options will be used. See [`LargeEngineOptions`]. + pub fn with_large_object_disk_cache_options(self, options: LargeEngineOptions) -> Self { + let builder = self.builder.with_large_object_disk_cache_options(options); Self { name: self.name, tracing_config: self.tracing_config, @@ -434,9 +295,11 @@ where } } - /// Configure the dedicated runtime for the disk cache store. - pub fn with_runtime_config(self, runtime_config: RuntimeConfig) -> Self { - let builder = self.builder.with_runtime_config(runtime_config); + /// Setup the small object disk cache engine with the given options. + /// + /// Otherwise, the default options will be used. See [`SmallEngineOptions`]. 
+ pub fn with_small_object_disk_cache_options(self, options: SmallEngineOptions) -> Self { + let builder = self.builder.with_small_object_disk_cache_options(options); Self { name: self.name, tracing_config: self.tracing_config, diff --git a/foyer/src/hybrid/cache.rs b/foyer/src/hybrid/cache.rs index 5dbc3e20..8e726832 100644 --- a/foyer/src/hybrid/cache.rs +++ b/foyer/src/hybrid/cache.rs @@ -549,7 +549,8 @@ mod tests { HybridCacheBuilder::new() .with_name("test") .memory(4 * MB) - .storage() + // TODO(MrCroxx): Test with `Engine::Mixed`. + .storage(Engine::Large) .with_device_config( DirectFsDeviceOptionsBuilder::new(dir) .with_capacity(16 * MB) @@ -572,7 +573,8 @@ mod tests { HybridCacheBuilder::new() .with_name("test") .memory(4 * MB) - .storage() + // TODO(MrCroxx): Test with `Engine::Mixed`. + .storage(Engine::Large) .with_device_config( DirectFsDeviceOptionsBuilder::new(dir) .with_capacity(16 * MB) diff --git a/foyer/src/prelude.rs b/foyer/src/prelude.rs index d8af4d38..a9c426fa 100644 --- a/foyer/src/prelude.rs +++ b/foyer/src/prelude.rs @@ -26,9 +26,9 @@ pub use memory::{ pub use storage::{ AdmissionPicker, AdmitAllPicker, Compression, Dev, DevExt, DevOptions, DeviceStats, DirectFileDevice, DirectFileDeviceOptions, DirectFileDeviceOptionsBuilder, DirectFsDevice, DirectFsDeviceOptions, - DirectFsDeviceOptionsBuilder, EvictionPicker, FifoPicker, InvalidRatioPicker, RateLimitPicker, RecoverMode, - ReinsertionPicker, RejectAllPicker, Runtime, RuntimeConfig, Storage, Store, StoreBuilder, TokioRuntimeConfig, - TombstoneLogConfigBuilder, + DirectFsDeviceOptionsBuilder, Engine, EvictionPicker, FifoPicker, InvalidRatioPicker, LargeEngineOptions, + RateLimitPicker, RecoverMode, ReinsertionPicker, RejectAllPicker, Runtime, RuntimeConfig, SmallEngineOptions, + Storage, Store, StoreBuilder, TokioRuntimeConfig, TombstoneLogConfigBuilder, }; pub use crate::hybrid::{
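Taken together, these builder changes move engine selection into a constructor argument (`Engine::Large`, `Engine::Small`, `Engine::Mixed(ratio)`, also parseable from strings such as `"mixed=0.1"`) and push engine-specific tuning into `LargeEngineOptions` / `SmallEngineOptions`. The sketch below is a hedged usage example assembled from the tests and builder methods in this patch; the path, sizes, key/value types, and option values are illustrative only.

```rust
use std::str::FromStr;

use foyer::{
    DirectFsDeviceOptionsBuilder, Engine, HybridCacheBuilder, LargeEngineOptions, SmallEngineOptions,
};

const MB: usize = 1024 * 1024;

#[tokio::main]
async fn main() {
    // `Engine` also parses CLI-style strings: "large", "small", "mixed=0.1".
    let engine = Engine::from_str("mixed=0.1").expect("valid engine string");

    let hybrid = HybridCacheBuilder::new()
        .with_name("demo")
        .memory(4 * MB)
        // Engine selection is now passed here instead of via `CombinedConfig`.
        .storage(engine)
        .with_device_config(
            DirectFsDeviceOptionsBuilder::new("/tmp/foyer-demo") // illustrative path
                .with_capacity(16 * MB)
                .with_file_size(MB)
                .build(),
        )
        // Engine-specific tuning now lives in the per-engine option structs.
        .with_large_object_disk_cache_options(LargeEngineOptions::new().with_flushers(2))
        .with_small_object_disk_cache_options(SmallEngineOptions::new().with_set_size(16 * 1024))
        .build()
        .await
        .expect("build hybrid cache");

    let _entry = hybrid.insert(42u64, vec![b'x'; 8 * 1024]);
}
```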