From 56e4073e3fbf10d36e157f67aeb3b15b97439897 Mon Sep 17 00:00:00 2001 From: MrCroxx Date: Mon, 23 Sep 2024 07:58:49 +0000 Subject: [PATCH] feat: impl the new builder, update bench and examples Signed-off-by: MrCroxx --- examples/hybrid.rs | 4 +- examples/hybrid_full.rs | 41 +++++++++------ examples/tail_based_tracing.rs | 4 +- foyer-bench/src/main.rs | 42 +++++++++------ foyer-storage/src/store.rs | 93 ++++++++++++++++++++++++---------- foyer/src/hybrid/builder.rs | 31 +++++++++++- 6 files changed, 151 insertions(+), 64 deletions(-) diff --git a/examples/hybrid.rs b/examples/hybrid.rs index e8411295..2bf7a450 100644 --- a/examples/hybrid.rs +++ b/examples/hybrid.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use foyer::{DirectFsDeviceOptionsBuilder, HybridCache, HybridCacheBuilder}; +use foyer::{DirectFsDeviceOptionsBuilder, Engine, HybridCache, HybridCacheBuilder}; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -20,7 +20,7 @@ async fn main() -> anyhow::Result<()> { let hybrid: HybridCache = HybridCacheBuilder::new() .memory(64 * 1024 * 1024) - .storage() + .storage(Engine::Large) // use large object disk cache engine only .with_device_config( DirectFsDeviceOptionsBuilder::new(dir.path()) .with_capacity(256 * 1024 * 1024) diff --git a/examples/hybrid_full.rs b/examples/hybrid_full.rs index 97f98500..60cafb8e 100644 --- a/examples/hybrid_full.rs +++ b/examples/hybrid_full.rs @@ -17,8 +17,8 @@ use std::sync::Arc; use anyhow::Result; use chrono::Datelike; use foyer::{ - DirectFsDeviceOptionsBuilder, FifoPicker, HybridCache, HybridCacheBuilder, LruConfig, RateLimitPicker, RecoverMode, - RuntimeConfig, TokioRuntimeConfig, TombstoneLogConfigBuilder, + DirectFsDeviceOptionsBuilder, Engine, FifoPicker, HybridCache, HybridCacheBuilder, LargeEngineOptions, LruConfig, + RateLimitPicker, RecoverMode, RuntimeConfig, SmallEngineOptions, TokioRuntimeConfig, TombstoneLogConfigBuilder, }; use tempfile::tempdir; @@ -35,7 +35,7 @@ async fn main() -> Result<()> { .with_object_pool_capacity(1024) .with_hash_builder(ahash::RandomState::default()) .with_weighter(|_key, value: &String| value.len()) - .storage() + .storage(Engine::Mixed(0.1)) .with_device_config( DirectFsDeviceOptionsBuilder::new(dir.path()) .with_capacity(64 * 1024 * 1024) @@ -43,22 +43,9 @@ async fn main() -> Result<()> { .build(), ) .with_flush(true) - .with_indexer_shards(64) .with_recover_mode(RecoverMode::Quiet) - .with_recover_concurrency(8) - .with_flushers(2) - .with_reclaimers(2) - .with_buffer_pool_size(256 * 1024 * 1024) - .with_clean_region_threshold(4) - .with_eviction_pickers(vec![Box::::default()]) .with_admission_picker(Arc::new(RateLimitPicker::new(100 * 1024 * 1024))) - .with_reinsertion_picker(Arc::new(RateLimitPicker::new(10 * 1024 * 1024))) .with_compression(foyer::Compression::Lz4) - .with_tombstone_log_config( - TombstoneLogConfigBuilder::new(dir.path().join("tombstone-log-file")) - .with_flush(true) - .build(), - ) .with_runtime_config(RuntimeConfig::Separated { read_runtime_config: TokioRuntimeConfig { worker_threads: 4, @@ -69,6 +56,28 @@ async fn main() -> Result<()> { max_blocking_threads: 8, }, }) + .with_large_object_disk_cache_options( + LargeEngineOptions::new() + .with_indexer_shards(64) + .with_recover_concurrency(8) + .with_flushers(2) + .with_reclaimers(2) + .with_buffer_pool_size(256 * 1024 * 1024) + .with_clean_region_threshold(4) + .with_eviction_pickers(vec![Box::::default()]) + .with_reinsertion_picker(Arc::new(RateLimitPicker::new(10 * 1024 * 1024))) + .with_tombstone_log_config( + TombstoneLogConfigBuilder::new(dir.path().join("tombstone-log-file")) + .with_flush(true) + .build(), + ), + ) + .with_small_object_disk_cache_options( + SmallEngineOptions::new() + .with_set_size(16 * 1024) + .with_set_cache_capacity(64) + .with_flushers(2), + ) .build() .await?; diff --git a/examples/tail_based_tracing.rs b/examples/tail_based_tracing.rs index e74a0cef..171aebea 100644 --- a/examples/tail_based_tracing.rs +++ b/examples/tail_based_tracing.rs @@ -14,7 +14,7 @@ use std::time::Duration; -use foyer::{DirectFsDeviceOptionsBuilder, HybridCache, HybridCacheBuilder}; +use foyer::{DirectFsDeviceOptionsBuilder, Engine, HybridCache, HybridCacheBuilder}; #[cfg(feature = "jaeger")] fn init_jaeger_exporter() { @@ -70,7 +70,7 @@ async fn main() -> anyhow::Result<()> { let hybrid: HybridCache = HybridCacheBuilder::new() .memory(64 * 1024 * 1024) - .storage() + .storage(Engine::Large) .with_device_config( DirectFsDeviceOptionsBuilder::new(dir.path()) .with_capacity(256 * 1024 * 1024) diff --git a/foyer-bench/src/main.rs b/foyer-bench/src/main.rs index f72c30ea..95380c6f 100644 --- a/foyer-bench/src/main.rs +++ b/foyer-bench/src/main.rs @@ -34,9 +34,9 @@ use analyze::{analyze, monitor, Metrics}; use bytesize::ByteSize; use clap::{builder::PossibleValuesParser, ArgGroup, Parser}; use foyer::{ - Compression, DirectFileDeviceOptionsBuilder, DirectFsDeviceOptionsBuilder, FifoConfig, FifoPicker, HybridCache, - HybridCacheBuilder, InvalidRatioPicker, LfuConfig, LruConfig, RateLimitPicker, RecoverMode, RuntimeConfig, - S3FifoConfig, TokioRuntimeConfig, TracingConfig, + Compression, DirectFileDeviceOptionsBuilder, DirectFsDeviceOptionsBuilder, Engine, FifoConfig, FifoPicker, + HybridCache, HybridCacheBuilder, InvalidRatioPicker, LargeEngineOptions, LfuConfig, LruConfig, RateLimitPicker, + RecoverMode, RuntimeConfig, S3FifoConfig, TokioRuntimeConfig, TracingConfig, }; use futures::future::join_all; use itertools::Itertools; @@ -204,6 +204,11 @@ pub struct Args { #[arg(long, value_enum, default_value_t = Compression::None)] compression: Compression, + // TODO(MrCroxx): use mixed engine by default. + /// Disk cache engine. + #[arg(long, default_value_t = Engine::Large)] + engine: Engine, + /// Time-series operation distribution. /// /// Available values: "none", "uniform", "zipf". @@ -448,7 +453,7 @@ async fn benchmark(args: Args) { let mut builder = builder .with_weighter(|_: &u64, value: &Value| u64::BITS as usize / 8 + value.len()) - .storage(); + .storage(args.engine); builder = match (args.file.as_ref(), args.dir.as_ref()) { (Some(file), None) => builder.with_device_config( @@ -468,15 +473,7 @@ async fn benchmark(args: Args) { builder = builder .with_flush(args.flush) - .with_indexer_shards(args.shards) .with_recover_mode(args.recover_mode) - .with_recover_concurrency(args.recover_concurrency) - .with_flushers(args.flushers) - .with_reclaimers(args.reclaimers) - .with_eviction_pickers(vec![ - Box::new(InvalidRatioPicker::new(args.invalid_ratio)), - Box::::default(), - ]) .with_compression(args.compression) .with_runtime_config(match args.runtime.as_str() { "disabled" => RuntimeConfig::Disabled, @@ -497,20 +494,33 @@ async fn benchmark(args: Args) { _ => unreachable!(), }); + let mut large = LargeEngineOptions::new() + .with_indexer_shards(args.shards) + .with_recover_concurrency(args.recover_concurrency) + .with_flushers(args.flushers) + .with_reclaimers(args.reclaimers) + .with_eviction_pickers(vec![ + Box::new(InvalidRatioPicker::new(args.invalid_ratio)), + Box::::default(), + ]); + if args.admission_rate_limit.as_u64() > 0 { builder = builder.with_admission_picker(Arc::new(RateLimitPicker::new(args.admission_rate_limit.as_u64() as _))); } if args.reinsertion_rate_limit.as_u64() > 0 { - builder = - builder.with_reinsertion_picker(Arc::new(RateLimitPicker::new(args.admission_rate_limit.as_u64() as _))); + large = large.with_reinsertion_picker(Arc::new(RateLimitPicker::new(args.admission_rate_limit.as_u64() as _))); } if args.clean_region_threshold > 0 { - builder = builder.with_clean_region_threshold(args.clean_region_threshold); + large = large.with_clean_region_threshold(args.clean_region_threshold); } - let hybrid = builder.build().await.unwrap(); + let hybrid = builder + .with_large_object_disk_cache_options(large) + .build() + .await + .unwrap(); #[cfg(feature = "mtrace")] hybrid.enable_tracing(); diff --git a/foyer-storage/src/store.rs b/foyer-storage/src/store.rs index fc8e8d9f..9f073bcb 100644 --- a/foyer-storage/src/store.rs +++ b/foyer-storage/src/store.rs @@ -12,7 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{borrow::Borrow, fmt::Debug, hash::Hash, marker::PhantomData, sync::Arc, time::Instant}; +use std::{ + borrow::Borrow, + fmt::{Debug, Display}, + hash::Hash, + marker::PhantomData, + str::FromStr, + sync::Arc, + time::Instant, +}; use ahash::RandomState; use foyer_common::{ @@ -251,21 +259,18 @@ impl From for DeviceConfig { /// If [`Engine::Mixed`] is used, it will use the `Either` engine /// with the small object disk cache as the left engine, /// and the large object disk cache as the right engine. -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] pub enum Engine { /// All space are used as the large object disk cache. Large, /// All space are used as the small object disk cache. Small, /// Mixed the large object disk cache and the small object disk cache. - Mixed { - /// The ratio of the large object disk cache. - large_object_cache_ratio: f64, - /// The serialized entry size threshold to use the large object disk cache. - large_object_threshold: usize, - /// Load order. - load_order: Order, - }, + /// + /// The argument controls the ratio of the small object disk cache. + /// + /// Range: [0 ~ 1] + Mixed(f64), } impl Default for Engine { @@ -276,6 +281,11 @@ impl Default for Engine { } impl Engine { + /// Threshold for distinguishing small and large objects. + pub const LARGE_OBJECT_SIZE_THRESHOLD: usize = 2048; + /// Check the large object disk cache first, for checking it does NOT involve disk ops. + pub const MIXED_LOAD_ORDER: Order = Order::RightFirst; + /// Default large object disk cache only config. pub fn large() -> Self { Self::Large @@ -286,16 +296,45 @@ impl Engine { Self::Small } - /// Default mixed large object disk cache and small object disk cache only config. + /// Default mixed large object disk cache and small object disk cache config. pub fn mixed() -> Self { - Self::Mixed { - large_object_cache_ratio: 0.5, - large_object_threshold: 4096, - load_order: Order::RightFirst, + Self::Mixed(0.1) + } +} + +impl Display for Engine { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Engine::Large => write!(f, "large"), + Engine::Small => write!(f, "small"), + Engine::Mixed(ratio) => write!(f, "mixed({ratio})"), } } } +impl FromStr for Engine { + type Err = String; + + fn from_str(s: &str) -> std::result::Result { + const MIXED_PREFIX: &str = "mixed("; + const MIXED_SUFFIX: &str = ")"; + + match s { + "large" => return Ok(Engine::Large), + "small" => return Ok(Engine::Small), + _ => {} + } + + if s.starts_with(MIXED_PREFIX) && s.ends_with(MIXED_SUFFIX) { + if let Ok(ratio) = (&s[MIXED_PREFIX.len()..s.len() - MIXED_SUFFIX.len()]).parse::() { + return Ok(Engine::Mixed(ratio)); + } + } + + Err(format!("invalid input: {s}")) + } +} + /// Tokio runtime configuration. #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct TokioRuntimeConfig { @@ -372,6 +411,10 @@ where { /// Setup disk cache store for the given in-memory cache. pub fn new(memory: Cache, engine: Engine) -> Self { + if matches!(engine, Engine::Mixed(ratio) if ratio <0.0 && ratio > 1.0) { + panic!("mixed engine small object disk cache ratio must be a f64 in range [0.0, 1.0]"); + } + Self { name: "foyer".to_string(), memory, @@ -596,17 +639,12 @@ where })) .await } - Engine::Mixed { - large_object_cache_ratio, - large_object_threshold, - load_order, - } => { - let large_region_count = (device.regions() as f64 * large_object_cache_ratio) as usize; - let large_regions = - (device.regions() - large_region_count) as RegionId..device.regions() as RegionId; - let small_regions = 0..((device.regions() - large_region_count) as RegionId); + Engine::Mixed(ratio) => { + let small_region_count = std::cmp::max((device.regions() as f64 * ratio) as usize,1); + let small_regions = 0..small_region_count as RegionId; + let large_regions = small_region_count as RegionId..device.regions() as RegionId; EngineEnum::open(EngineConfig::Mixed(EitherConfig { - selector: SizeSelector::new(large_object_threshold), + selector: SizeSelector::new(Engine::LARGE_OBJECT_SIZE_THRESHOLD), left: GenericSmallStorageConfig { set_size: self.small.set_size, set_cache_capacity: self.small.set_cache_capacity, @@ -641,7 +679,7 @@ where user_runtime_handle, marker: PhantomData, }, - load_order, + load_order: Engine::MIXED_LOAD_ORDER, })) .await } @@ -848,7 +886,8 @@ where V: StorageValue, S: HashBuilder + Debug, { - fn new() -> Self { + /// Create small object disk cache engine default options. + pub fn new() -> Self { Self { set_size: 16 * 1024, // 16 KiB set_cache_capacity: 64, // 64 sets diff --git a/foyer/src/hybrid/builder.rs b/foyer/src/hybrid/builder.rs index 4415dc4d..ab01c3c1 100644 --- a/foyer/src/hybrid/builder.rs +++ b/foyer/src/hybrid/builder.rs @@ -21,7 +21,10 @@ use foyer_common::{ tracing::TracingConfig, }; use foyer_memory::{Cache, CacheBuilder, EvictionConfig, Weighter}; -use foyer_storage::{AdmissionPicker, Compression, DeviceConfig, Engine, RecoverMode, RuntimeConfig, StoreBuilder}; +use foyer_storage::{ + AdmissionPicker, Compression, DeviceConfig, Engine, LargeEngineOptions, RecoverMode, RuntimeConfig, + SmallEngineOptions, StoreBuilder, +}; use crate::HybridCache; @@ -279,6 +282,32 @@ where } } + /// Setup the large object disk cache engine with the given options. + /// + /// Otherwise, the default options will be used. See [`LargeEngineOptions`]. + pub fn with_large_object_disk_cache_options(self, options: LargeEngineOptions) -> Self { + let builder = self.builder.with_large_object_disk_cache_options(options); + Self { + name: self.name, + tracing_config: self.tracing_config, + memory: self.memory, + builder, + } + } + + /// Setup the small object disk cache engine with the given options. + /// + /// Otherwise, the default options will be used. See [`SmallEngineOptions`]. + pub fn with_small_object_disk_cache_options(self, options: SmallEngineOptions) -> Self { + let builder = self.builder.with_small_object_disk_cache_options(options); + Self { + name: self.name, + tracing_config: self.tracing_config, + memory: self.memory, + builder, + } + } + /// Build and open the hybrid cache with the given configurations. pub async fn build(self) -> anyhow::Result> { let storage = self.builder.build().await?;