Skip to content

Commit

Permalink
refactor: refine runtime and tracing options (#756)
Browse files Browse the repository at this point in the history
* refactor: refine runtime and tracing options

Signed-off-by: MrCroxx <[email protected]>

* refactor: rename API

Signed-off-by: MrCroxx <[email protected]>

* refactor: tiny refines

Signed-off-by: MrCroxx <[email protected]>

* chore: pass ffmt check

Signed-off-by: MrCroxx <[email protected]>

---------

Signed-off-by: MrCroxx <[email protected]>
  • Loading branch information
MrCroxx authored Oct 8, 2024
1 parent d326b1d commit 726a5f5
Show file tree
Hide file tree
Showing 10 changed files with 209 additions and 159 deletions.
8 changes: 4 additions & 4 deletions examples/hybrid_full.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use anyhow::Result;
use chrono::Datelike;
use foyer::{
DirectFsDeviceOptions, Engine, FifoPicker, HybridCache, HybridCacheBuilder, LargeEngineOptions, LruConfig,
RateLimitPicker, RecoverMode, RuntimeConfig, SmallEngineOptions, TokioRuntimeConfig, TombstoneLogConfigBuilder,
RateLimitPicker, RecoverMode, RuntimeOptions, SmallEngineOptions, TokioRuntimeOptions, TombstoneLogConfigBuilder,
};
use tempfile::tempdir;

Expand All @@ -45,12 +45,12 @@ async fn main() -> Result<()> {
.with_recover_mode(RecoverMode::Quiet)
.with_admission_picker(Arc::new(RateLimitPicker::new(100 * 1024 * 1024)))
.with_compression(foyer::Compression::Lz4)
.with_runtime_config(RuntimeConfig::Separated {
read_runtime_config: TokioRuntimeConfig {
.with_runtime_options(RuntimeOptions::Separated {
read_runtime_options: TokioRuntimeOptions {
worker_threads: 4,
max_blocking_threads: 8,
},
write_runtime_config: TokioRuntimeConfig {
write_runtime_options: TokioRuntimeOptions {
worker_threads: 4,
max_blocking_threads: 8,
},
Expand Down
6 changes: 2 additions & 4 deletions examples/tail_based_tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::time::Duration;

use foyer::{DirectFsDeviceOptions, Engine, HybridCache, HybridCacheBuilder};
use foyer::{DirectFsDeviceOptions, Engine, HybridCache, HybridCacheBuilder, TracingOptions};

#[cfg(feature = "jaeger")]
fn init_jaeger_exporter() {
Expand Down Expand Up @@ -76,9 +76,7 @@ async fn main() -> anyhow::Result<()> {
.await?;

hybrid.enable_tracing();
hybrid
.tracing_config()
.set_record_hybrid_get_threshold(Duration::from_millis(10));
hybrid.update_tracing_options(TracingOptions::new().with_record_hybrid_get_threshold(Duration::from_millis(10)));

hybrid.insert(42, "The answer to life, the universe, and everything.".to_string());
assert_eq!(
Expand Down
1 change: 1 addition & 0 deletions foyer-bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ fastrace-jaeger = { workspace = true, optional = true }
foyer = { workspace = true }
futures = "0.3"
hdrhistogram = "7"
humantime = "2"
itertools = { workspace = true }
metrics = { workspace = true }
metrics-exporter-prometheus = "0.15"
Expand Down
48 changes: 24 additions & 24 deletions foyer-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use clap::{builder::PossibleValuesParser, ArgGroup, Parser};
use foyer::{
Compression, DirectFileDeviceOptions, DirectFsDeviceOptions, Engine, FifoConfig, FifoPicker, HybridCache,
HybridCacheBuilder, InvalidRatioPicker, LargeEngineOptions, LfuConfig, LruConfig, RateLimitPicker, RecoverMode,
RuntimeConfig, S3FifoConfig, SmallEngineOptions, TokioRuntimeConfig, TracingConfig,
RuntimeOptions, S3FifoConfig, SmallEngineOptions, TokioRuntimeOptions, TracingOptions,
};
use futures::future::join_all;
use itertools::Itertools;
Expand Down Expand Up @@ -245,20 +245,20 @@ pub struct Args {
set_cache_capacity: usize,

/// Record insert trace threshold. Only effective with "tracing" feature.
#[arg(long, default_value_t = 1000 * 1000)]
trace_insert_us: usize,
#[arg(long, default_value = "1s")]
trace_insert: humantime::Duration,
/// Record get trace threshold. Only effective with "tracing" feature.
#[arg(long, default_value_t = 1000 * 1000)]
trace_get_us: usize,
#[arg(long, default_value = "1s")]
trace_get: humantime::Duration,
/// Record obtain trace threshold. Only effective with "tracing" feature.
#[arg(long, default_value_t = 1000 * 1000)]
trace_obtain_us: usize,
#[arg(long, default_value = "1s")]
trace_obtain: humantime::Duration,
/// Record remove trace threshold. Only effective with "tracing" feature.
#[arg(long, default_value_t = 1000 * 1000)]
trace_remove_us: usize,
#[arg(long, default_value = "1s")]
trace_remove: humantime::Duration,
/// Record fetch trace threshold. Only effective with "tracing" feature.
#[arg(long, default_value_t = 1000 * 1000)]
trace_fetch_us: usize,
#[arg(long, default_value = "1s")]
trace_fetch: humantime::Duration,
}

#[derive(Debug)]
Expand Down Expand Up @@ -433,15 +433,15 @@ async fn benchmark(args: Args) {
.unwrap();
}

let tracing_config = TracingConfig::default();
tracing_config.set_record_hybrid_insert_threshold(Duration::from_micros(args.trace_insert_us as _));
tracing_config.set_record_hybrid_get_threshold(Duration::from_micros(args.trace_get_us as _));
tracing_config.set_record_hybrid_obtain_threshold(Duration::from_micros(args.trace_obtain_us as _));
tracing_config.set_record_hybrid_remove_threshold(Duration::from_micros(args.trace_remove_us as _));
tracing_config.set_record_hybrid_fetch_threshold(Duration::from_micros(args.trace_fetch_us as _));
let tracing_options = TracingOptions::new()
.with_record_hybrid_insert_threshold(args.trace_insert.into())
.with_record_hybrid_get_threshold(args.trace_get.into())
.with_record_hybrid_obtain_threshold(args.trace_obtain.into())
.with_record_hybrid_remove_threshold(args.trace_remove.into())
.with_record_hybrid_fetch_threshold(args.trace_fetch.into());

let builder = HybridCacheBuilder::new()
.with_tracing_config(tracing_config)
.with_tracing_options(tracing_options)
.memory(args.mem.as_u64() as _)
.with_shards(args.shards);

Expand Down Expand Up @@ -479,18 +479,18 @@ async fn benchmark(args: Args) {
.with_flush(args.flush)
.with_recover_mode(args.recover_mode)
.with_compression(args.compression)
.with_runtime_config(match args.runtime.as_str() {
"disabled" => RuntimeConfig::Disabled,
"unified" => RuntimeConfig::Unified(TokioRuntimeConfig {
.with_runtime_options(match args.runtime.as_str() {
"disabled" => RuntimeOptions::Disabled,
"unified" => RuntimeOptions::Unified(TokioRuntimeOptions {
worker_threads: args.runtime_worker_threads,
max_blocking_threads: args.runtime_max_blocking_threads,
}),
"separated" => RuntimeConfig::Separated {
read_runtime_config: TokioRuntimeConfig {
"separated" => RuntimeOptions::Separated {
read_runtime_options: TokioRuntimeOptions {
worker_threads: args.read_runtime_worker_threads,
max_blocking_threads: args.read_runtime_max_blocking_threads,
},
write_runtime_config: TokioRuntimeConfig {
write_runtime_options: TokioRuntimeOptions {
worker_threads: args.write_runtime_worker_threads,
max_blocking_threads: args.write_runtime_max_blocking_threads,
},
Expand Down
151 changes: 100 additions & 51 deletions foyer-common/src/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::{
ops::Deref,
pin::Pin,
sync::atomic::{AtomicUsize, Ordering},
sync::atomic::{AtomicU64, Ordering},
task::{Context, Poll},
time::Duration,
};
Expand All @@ -26,87 +26,136 @@ use pin_project::pin_project;
use serde::{Deserialize, Serialize};

/// Configurations for tracing.
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Default)]
pub struct TracingConfig {
/// Threshold for recording the hybrid cache `insert` and `insert_with_context` operation in us.
record_hybrid_insert_threshold_us: AtomicUsize,
record_hybrid_insert_threshold_us: AtomicU64,
/// Threshold for recording the hybrid cache `get` operation in us.
record_hybrid_get_threshold_us: AtomicUsize,
record_hybrid_get_threshold_us: AtomicU64,
/// Threshold for recording the hybrid cache `obtain` operation in us.
record_hybrid_obtain_threshold_us: AtomicUsize,
record_hybrid_obtain_threshold_us: AtomicU64,
/// Threshold for recording the hybrid cache `remove` operation in us.
record_hybrid_remove_threshold_us: AtomicUsize,
record_hybrid_remove_threshold_us: AtomicU64,
/// Threshold for recording the hybrid cache `fetch` operation in us.
record_hybrid_fetch_threshold_us: AtomicUsize,
record_hybrid_fetch_threshold_us: AtomicU64,
}

impl Default for TracingConfig {
/// All thresholds are set to `1s`.
fn default() -> Self {
Self {
record_hybrid_insert_threshold_us: AtomicUsize::from(1000 * 1000),
record_hybrid_get_threshold_us: AtomicUsize::from(1000 * 1000),
record_hybrid_obtain_threshold_us: AtomicUsize::from(1000 * 1000),
record_hybrid_remove_threshold_us: AtomicUsize::from(1000 * 1000),
record_hybrid_fetch_threshold_us: AtomicUsize::from(1000 * 1000),
impl TracingConfig {
/// Update tracing config with options.
pub fn update(&self, options: TracingOptions) {
if let Some(threshold) = options.record_hybrid_insert_threshold {
self.record_hybrid_insert_threshold_us
.store(threshold.as_micros() as _, Ordering::Relaxed);
}
}
}

impl TracingConfig {
/// Set the threshold for recording the hybrid cache `insert` and `insert_with_context` operation.
pub fn set_record_hybrid_insert_threshold(&self, threshold: Duration) {
self.record_hybrid_insert_threshold_us
.store(threshold.as_micros() as _, Ordering::Relaxed);
if let Some(threshold) = options.record_hybrid_get_threshold {
self.record_hybrid_get_threshold_us
.store(threshold.as_micros() as _, Ordering::Relaxed);
}

if let Some(threshold) = options.record_hybrid_obtain_threshold {
self.record_hybrid_obtain_threshold_us
.store(threshold.as_micros() as _, Ordering::Relaxed);
}

if let Some(threshold) = options.record_hybrid_remove_threshold {
self.record_hybrid_remove_threshold_us
.store(threshold.as_micros() as _, Ordering::Relaxed);
}

if let Some(threshold) = options.record_hybrid_fetch_threshold {
self.record_hybrid_fetch_threshold_us
.store(threshold.as_micros() as _, Ordering::Relaxed);
}
}

/// Threshold for recording the hybrid cache `insert` and `insert_with_context` operation.
pub fn record_hybrid_insert_threshold(&self) -> Duration {
Duration::from_micros(self.record_hybrid_insert_threshold_us.load(Ordering::Relaxed) as _)
}

/// Set the threshold for recording the hybrid cache `get` operation.
pub fn set_record_hybrid_get_threshold(&self, threshold: Duration) {
self.record_hybrid_get_threshold_us
.store(threshold.as_micros() as _, Ordering::Relaxed);
Duration::from_micros(self.record_hybrid_insert_threshold_us.load(Ordering::Relaxed))
}

/// Threshold for recording the hybrid cache `get` operation.
pub fn record_hybrid_get_threshold(&self) -> Duration {
Duration::from_micros(self.record_hybrid_get_threshold_us.load(Ordering::Relaxed) as _)
}

/// Set the threshold for recording the hybrid cache `obtain` operation.
pub fn set_record_hybrid_obtain_threshold(&self, threshold: Duration) {
self.record_hybrid_obtain_threshold_us
.store(threshold.as_micros() as _, Ordering::Relaxed);
Duration::from_micros(self.record_hybrid_get_threshold_us.load(Ordering::Relaxed))
}

/// Threshold for recording the hybrid cache `obtain` operation.
pub fn record_hybrid_obtain_threshold(&self) -> Duration {
Duration::from_micros(self.record_hybrid_obtain_threshold_us.load(Ordering::Relaxed) as _)
}

/// Set the threshold for recording the hybrid cache `remove` operation.
pub fn set_record_hybrid_remove_threshold(&self, threshold: Duration) {
self.record_hybrid_remove_threshold_us
.store(threshold.as_micros() as _, Ordering::Relaxed);
Duration::from_micros(self.record_hybrid_obtain_threshold_us.load(Ordering::Relaxed))
}

/// Threshold for recording the hybrid cache `remove` operation.
pub fn record_hybrid_remove_threshold(&self) -> Duration {
Duration::from_micros(self.record_hybrid_remove_threshold_us.load(Ordering::Relaxed) as _)
Duration::from_micros(self.record_hybrid_remove_threshold_us.load(Ordering::Relaxed))
}

/// Set the threshold for recording the hybrid cache `fetch` operation.
pub fn set_record_hybrid_fetch_threshold(&self, threshold: Duration) {
self.record_hybrid_fetch_threshold_us
.store(threshold.as_micros() as _, Ordering::Relaxed);
/// Threshold for recording the hybrid cache `fetch` operation.
pub fn record_hybrid_fetch_threshold(&self) -> Duration {
Duration::from_micros(self.record_hybrid_fetch_threshold_us.load(Ordering::Relaxed))
}
}

/// Options for tracing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TracingOptions {
/// Threshold for recording the hybrid cache `insert` and `insert_with_context` operation.
record_hybrid_insert_threshold: Option<Duration>,
/// Threshold for recording the hybrid cache `get` operation.
record_hybrid_get_threshold: Option<Duration>,
/// Threshold for recording the hybrid cache `obtain` operation.
record_hybrid_obtain_threshold: Option<Duration>,
/// Threshold for recording the hybrid cache `remove` operation.
record_hybrid_remove_threshold: Option<Duration>,
/// Threshold for recording the hybrid cache `fetch` operation.
pub fn record_hybrid_fetch_threshold(&self) -> Duration {
Duration::from_micros(self.record_hybrid_fetch_threshold_us.load(Ordering::Relaxed) as _)
record_hybrid_fetch_threshold: Option<Duration>,
}

impl Default for TracingOptions {
fn default() -> Self {
Self::new()
}
}

impl TracingOptions {
/// Create an empty tracing options.
pub fn new() -> Self {
Self {
record_hybrid_insert_threshold: None,
record_hybrid_get_threshold: None,
record_hybrid_obtain_threshold: None,
record_hybrid_remove_threshold: None,
record_hybrid_fetch_threshold: None,
}
}

/// Set the threshold for recording the hybrid cache `insert` and `insert_with_context` operation.
pub fn with_record_hybrid_insert_threshold(mut self, threshold: Duration) -> Self {
self.record_hybrid_insert_threshold = Some(threshold);
self
}

/// Set the threshold for recording the hybrid cache `get` operation.
pub fn with_record_hybrid_get_threshold(mut self, threshold: Duration) -> Self {
self.record_hybrid_get_threshold = Some(threshold);
self
}

/// Set the threshold for recording the hybrid cache `obtain` operation.
pub fn with_record_hybrid_obtain_threshold(mut self, threshold: Duration) -> Self {
self.record_hybrid_obtain_threshold = Some(threshold);
self
}

/// Set the threshold for recording the hybrid cache `remove` operation.
pub fn with_record_hybrid_remove_threshold(mut self, threshold: Duration) -> Self {
self.record_hybrid_remove_threshold = Some(threshold);
self
}

/// Set the threshold for recording the hybrid cache `fetch` operation.
pub fn with_record_hybrid_fetch_threshold(mut self, threshold: Duration) -> Self {
self.record_hybrid_fetch_threshold = Some(threshold);
self
}
}

Expand Down
4 changes: 2 additions & 2 deletions foyer-storage/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub use crate::{
statistics::Statistics,
storage::{either::Order, Storage},
store::{
DeviceOptions, Engine, LargeEngineOptions, RuntimeConfig, SmallEngineOptions, Store, StoreBuilder,
TokioRuntimeConfig,
DeviceOptions, Engine, LargeEngineOptions, RuntimeOptions, SmallEngineOptions, Store, StoreBuilder,
TokioRuntimeOptions,
},
};
Loading

0 comments on commit 726a5f5

Please sign in to comment.