Skip to content

Commit

Permalink
feat: impl the new builder, update bench and examples
Browse files Browse the repository at this point in the history
Signed-off-by: MrCroxx <[email protected]>
  • Loading branch information
MrCroxx committed Sep 23, 2024
1 parent e2c62b4 commit 56e4073
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 64 deletions.
4 changes: 2 additions & 2 deletions examples/hybrid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use foyer::{DirectFsDeviceOptionsBuilder, HybridCache, HybridCacheBuilder};
use foyer::{DirectFsDeviceOptionsBuilder, Engine, HybridCache, HybridCacheBuilder};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let dir = tempfile::tempdir()?;

let hybrid: HybridCache<u64, String> = HybridCacheBuilder::new()
.memory(64 * 1024 * 1024)
.storage()
.storage(Engine::Large) // use large object disk cache engine only
.with_device_config(
DirectFsDeviceOptionsBuilder::new(dir.path())
.with_capacity(256 * 1024 * 1024)
Expand Down
41 changes: 25 additions & 16 deletions examples/hybrid_full.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use std::sync::Arc;
use anyhow::Result;
use chrono::Datelike;
use foyer::{
DirectFsDeviceOptionsBuilder, FifoPicker, HybridCache, HybridCacheBuilder, LruConfig, RateLimitPicker, RecoverMode,
RuntimeConfig, TokioRuntimeConfig, TombstoneLogConfigBuilder,
DirectFsDeviceOptionsBuilder, Engine, FifoPicker, HybridCache, HybridCacheBuilder, LargeEngineOptions, LruConfig,
RateLimitPicker, RecoverMode, RuntimeConfig, SmallEngineOptions, TokioRuntimeConfig, TombstoneLogConfigBuilder,
};
use tempfile::tempdir;

Expand All @@ -35,30 +35,17 @@ async fn main() -> Result<()> {
.with_object_pool_capacity(1024)
.with_hash_builder(ahash::RandomState::default())
.with_weighter(|_key, value: &String| value.len())
.storage()
.storage(Engine::Mixed(0.1))
.with_device_config(
DirectFsDeviceOptionsBuilder::new(dir.path())
.with_capacity(64 * 1024 * 1024)
.with_file_size(4 * 1024 * 1024)
.build(),
)
.with_flush(true)
.with_indexer_shards(64)
.with_recover_mode(RecoverMode::Quiet)
.with_recover_concurrency(8)
.with_flushers(2)
.with_reclaimers(2)
.with_buffer_pool_size(256 * 1024 * 1024)
.with_clean_region_threshold(4)
.with_eviction_pickers(vec![Box::<FifoPicker>::default()])
.with_admission_picker(Arc::new(RateLimitPicker::new(100 * 1024 * 1024)))
.with_reinsertion_picker(Arc::new(RateLimitPicker::new(10 * 1024 * 1024)))
.with_compression(foyer::Compression::Lz4)
.with_tombstone_log_config(
TombstoneLogConfigBuilder::new(dir.path().join("tombstone-log-file"))
.with_flush(true)
.build(),
)
.with_runtime_config(RuntimeConfig::Separated {
read_runtime_config: TokioRuntimeConfig {
worker_threads: 4,
Expand All @@ -69,6 +56,28 @@ async fn main() -> Result<()> {
max_blocking_threads: 8,
},
})
.with_large_object_disk_cache_options(
LargeEngineOptions::new()
.with_indexer_shards(64)
.with_recover_concurrency(8)
.with_flushers(2)
.with_reclaimers(2)
.with_buffer_pool_size(256 * 1024 * 1024)
.with_clean_region_threshold(4)
.with_eviction_pickers(vec![Box::<FifoPicker>::default()])
.with_reinsertion_picker(Arc::new(RateLimitPicker::new(10 * 1024 * 1024)))
.with_tombstone_log_config(
TombstoneLogConfigBuilder::new(dir.path().join("tombstone-log-file"))
.with_flush(true)
.build(),
),
)
.with_small_object_disk_cache_options(
SmallEngineOptions::new()
.with_set_size(16 * 1024)
.with_set_cache_capacity(64)
.with_flushers(2),
)
.build()
.await?;

Expand Down
4 changes: 2 additions & 2 deletions examples/tail_based_tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::time::Duration;

use foyer::{DirectFsDeviceOptionsBuilder, HybridCache, HybridCacheBuilder};
use foyer::{DirectFsDeviceOptionsBuilder, Engine, HybridCache, HybridCacheBuilder};

#[cfg(feature = "jaeger")]
fn init_jaeger_exporter() {
Expand Down Expand Up @@ -70,7 +70,7 @@ async fn main() -> anyhow::Result<()> {

let hybrid: HybridCache<u64, String> = HybridCacheBuilder::new()
.memory(64 * 1024 * 1024)
.storage()
.storage(Engine::Large)
.with_device_config(
DirectFsDeviceOptionsBuilder::new(dir.path())
.with_capacity(256 * 1024 * 1024)
Expand Down
42 changes: 26 additions & 16 deletions foyer-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ use analyze::{analyze, monitor, Metrics};
use bytesize::ByteSize;
use clap::{builder::PossibleValuesParser, ArgGroup, Parser};
use foyer::{
Compression, DirectFileDeviceOptionsBuilder, DirectFsDeviceOptionsBuilder, FifoConfig, FifoPicker, HybridCache,
HybridCacheBuilder, InvalidRatioPicker, LfuConfig, LruConfig, RateLimitPicker, RecoverMode, RuntimeConfig,
S3FifoConfig, TokioRuntimeConfig, TracingConfig,
Compression, DirectFileDeviceOptionsBuilder, DirectFsDeviceOptionsBuilder, Engine, FifoConfig, FifoPicker,
HybridCache, HybridCacheBuilder, InvalidRatioPicker, LargeEngineOptions, LfuConfig, LruConfig, RateLimitPicker,
RecoverMode, RuntimeConfig, S3FifoConfig, TokioRuntimeConfig, TracingConfig,
};
use futures::future::join_all;
use itertools::Itertools;
Expand Down Expand Up @@ -204,6 +204,11 @@ pub struct Args {
#[arg(long, value_enum, default_value_t = Compression::None)]
compression: Compression,

// TODO(MrCroxx): use mixed engine by default.
/// Disk cache engine.
#[arg(long, default_value_t = Engine::Large)]
engine: Engine,

/// Time-series operation distribution.
///
/// Available values: "none", "uniform", "zipf".
Expand Down Expand Up @@ -448,7 +453,7 @@ async fn benchmark(args: Args) {

let mut builder = builder
.with_weighter(|_: &u64, value: &Value| u64::BITS as usize / 8 + value.len())
.storage();
.storage(args.engine);

builder = match (args.file.as_ref(), args.dir.as_ref()) {
(Some(file), None) => builder.with_device_config(
Expand All @@ -468,15 +473,7 @@ async fn benchmark(args: Args) {

builder = builder
.with_flush(args.flush)
.with_indexer_shards(args.shards)
.with_recover_mode(args.recover_mode)
.with_recover_concurrency(args.recover_concurrency)
.with_flushers(args.flushers)
.with_reclaimers(args.reclaimers)
.with_eviction_pickers(vec![
Box::new(InvalidRatioPicker::new(args.invalid_ratio)),
Box::<FifoPicker>::default(),
])
.with_compression(args.compression)
.with_runtime_config(match args.runtime.as_str() {
"disabled" => RuntimeConfig::Disabled,
Expand All @@ -497,20 +494,33 @@ async fn benchmark(args: Args) {
_ => unreachable!(),
});

let mut large = LargeEngineOptions::new()
.with_indexer_shards(args.shards)
.with_recover_concurrency(args.recover_concurrency)
.with_flushers(args.flushers)
.with_reclaimers(args.reclaimers)
.with_eviction_pickers(vec![
Box::new(InvalidRatioPicker::new(args.invalid_ratio)),
Box::<FifoPicker>::default(),
]);

if args.admission_rate_limit.as_u64() > 0 {
builder =
builder.with_admission_picker(Arc::new(RateLimitPicker::new(args.admission_rate_limit.as_u64() as _)));
}
if args.reinsertion_rate_limit.as_u64() > 0 {
builder =
builder.with_reinsertion_picker(Arc::new(RateLimitPicker::new(args.admission_rate_limit.as_u64() as _)));
large = large.with_reinsertion_picker(Arc::new(RateLimitPicker::new(args.admission_rate_limit.as_u64() as _)));
}

if args.clean_region_threshold > 0 {
builder = builder.with_clean_region_threshold(args.clean_region_threshold);
large = large.with_clean_region_threshold(args.clean_region_threshold);
}

let hybrid = builder.build().await.unwrap();
let hybrid = builder
.with_large_object_disk_cache_options(large)
.build()
.await
.unwrap();

#[cfg(feature = "mtrace")]
hybrid.enable_tracing();
Expand Down
93 changes: 66 additions & 27 deletions foyer-storage/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{borrow::Borrow, fmt::Debug, hash::Hash, marker::PhantomData, sync::Arc, time::Instant};
use std::{
borrow::Borrow,
fmt::{Debug, Display},
hash::Hash,
marker::PhantomData,
str::FromStr,
sync::Arc,
time::Instant,
};

use ahash::RandomState;
use foyer_common::{
Expand Down Expand Up @@ -251,21 +259,18 @@ impl From<DirectFsDeviceOptions> for DeviceConfig {
/// If [`Engine::Mixed`] is used, it will use the `Either` engine
/// with the small object disk cache as the left engine,
/// and the large object disk cache as the right engine.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum Engine {
/// All space are used as the large object disk cache.
Large,
/// All space are used as the small object disk cache.
Small,
/// Mixed the large object disk cache and the small object disk cache.
Mixed {
/// The ratio of the large object disk cache.
large_object_cache_ratio: f64,
/// The serialized entry size threshold to use the large object disk cache.
large_object_threshold: usize,
/// Load order.
load_order: Order,
},
///
/// The argument controls the ratio of the small object disk cache.
///
/// Range: [0 ~ 1]
Mixed(f64),
}

impl Default for Engine {
Expand All @@ -276,6 +281,11 @@ impl Default for Engine {
}

impl Engine {
/// Threshold for distinguishing small and large objects.
pub const LARGE_OBJECT_SIZE_THRESHOLD: usize = 2048;
/// Check the large object disk cache first, for checking it does NOT involve disk ops.
pub const MIXED_LOAD_ORDER: Order = Order::RightFirst;

/// Default large object disk cache only config.
pub fn large() -> Self {
Self::Large
Expand All @@ -286,16 +296,45 @@ impl Engine {
Self::Small
}

/// Default mixed large object disk cache and small object disk cache only config.
/// Default mixed large object disk cache and small object disk cache config.
pub fn mixed() -> Self {
Self::Mixed {
large_object_cache_ratio: 0.5,
large_object_threshold: 4096,
load_order: Order::RightFirst,
Self::Mixed(0.1)
}
}

impl Display for Engine {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Engine::Large => write!(f, "large"),
Engine::Small => write!(f, "small"),
Engine::Mixed(ratio) => write!(f, "mixed({ratio})"),
}
}
}

impl FromStr for Engine {
type Err = String;

fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
const MIXED_PREFIX: &str = "mixed(";
const MIXED_SUFFIX: &str = ")";

match s {
"large" => return Ok(Engine::Large),
"small" => return Ok(Engine::Small),
_ => {}
}

if s.starts_with(MIXED_PREFIX) && s.ends_with(MIXED_SUFFIX) {
if let Ok(ratio) = (&s[MIXED_PREFIX.len()..s.len() - MIXED_SUFFIX.len()]).parse::<f64>() {
return Ok(Engine::Mixed(ratio));
}
}

Err(format!("invalid input: {s}"))
}
}

/// Tokio runtime configuration.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct TokioRuntimeConfig {
Expand Down Expand Up @@ -372,6 +411,10 @@ where
{
/// Setup disk cache store for the given in-memory cache.
pub fn new(memory: Cache<K, V, S>, engine: Engine) -> Self {
if matches!(engine, Engine::Mixed(ratio) if ratio <0.0 && ratio > 1.0) {
panic!("mixed engine small object disk cache ratio must be a f64 in range [0.0, 1.0]");
}

Self {
name: "foyer".to_string(),
memory,
Expand Down Expand Up @@ -596,17 +639,12 @@ where
}))
.await
}
Engine::Mixed {
large_object_cache_ratio,
large_object_threshold,
load_order,
} => {
let large_region_count = (device.regions() as f64 * large_object_cache_ratio) as usize;
let large_regions =
(device.regions() - large_region_count) as RegionId..device.regions() as RegionId;
let small_regions = 0..((device.regions() - large_region_count) as RegionId);
Engine::Mixed(ratio) => {
let small_region_count = std::cmp::max((device.regions() as f64 * ratio) as usize,1);
let small_regions = 0..small_region_count as RegionId;
let large_regions = small_region_count as RegionId..device.regions() as RegionId;
EngineEnum::open(EngineConfig::Mixed(EitherConfig {
selector: SizeSelector::new(large_object_threshold),
selector: SizeSelector::new(Engine::LARGE_OBJECT_SIZE_THRESHOLD),
left: GenericSmallStorageConfig {
set_size: self.small.set_size,
set_cache_capacity: self.small.set_cache_capacity,
Expand Down Expand Up @@ -641,7 +679,7 @@ where
user_runtime_handle,
marker: PhantomData,
},
load_order,
load_order: Engine::MIXED_LOAD_ORDER,
}))
.await
}
Expand Down Expand Up @@ -848,7 +886,8 @@ where
V: StorageValue,
S: HashBuilder + Debug,
{
fn new() -> Self {
/// Create small object disk cache engine default options.
pub fn new() -> Self {
Self {
set_size: 16 * 1024, // 16 KiB
set_cache_capacity: 64, // 64 sets
Expand Down
Loading

0 comments on commit 56e4073

Please sign in to comment.