Skip to content

Commit

Permalink
feat: upgrade foyer to 0.11.3 (#18313)
Browse files Browse the repository at this point in the history
Signed-off-by: MrCroxx <[email protected]>
  • Loading branch information
MrCroxx authored Sep 20, 2024
1 parent 4bc3b28 commit 82ac534
Show file tree
Hide file tree
Showing 8 changed files with 168 additions and 184 deletions.
209 changes: 90 additions & 119 deletions Cargo.lock

Large diffs are not rendered by default.

31 changes: 22 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ license = "Apache-2.0"
repository = "https://github.com/risingwavelabs/risingwave"

[workspace.dependencies]
foyer = { version = "0.10.4", features = ["nightly", "mtrace"] }
foyer = { version = "0.11.3", features = ["mtrace", "nightly"] }
apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "25113ba88234a9ae23296e981d8302c290fdaa4b", features = [
"snappy",
"zstandard",
Expand Down Expand Up @@ -183,11 +183,10 @@ tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git"
"profiling",
"stats",
], rev = "64a2d9" }
# TODO(http-bump): bump to use tonic 0.12 once minitrace-opentelemetry is updated
opentelemetry = "0.23"
opentelemetry-otlp = "0.16"
opentelemetry_sdk = { version = "0.23", default-features = false }
opentelemetry-semantic-conventions = "0.15"
opentelemetry = "0.24"
opentelemetry-otlp = "0.17"
opentelemetry_sdk = { version = "0.24", default-features = false }
opentelemetry-semantic-conventions = "0.16"
parking_lot = { version = "0.12", features = [
"arc_lock",
"deadlock_detection",
Expand All @@ -198,10 +197,24 @@ sea-orm = { version = "0.12.14", features = [
"sqlx-sqlite",
"runtime-tokio-native-tls",
] }
sqlx = { version = "0.7.3", default-features = false, features = ["bigdecimal", "chrono", "json", "mysql", "postgres", "runtime-tokio-native-tls", "rust_decimal", "sqlite", "time", "uuid"] }
tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "0dd1055", features = ["net", "fs"] }
sqlx = { version = "0.7.3", default-features = false, features = [
"bigdecimal",
"chrono",
"json",
"mysql",
"postgres",
"runtime-tokio-native-tls",
"rust_decimal",
"sqlite",
"time",
"uuid",
] }
tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "0dd1055", features = [
"net",
"fs",
] }
tokio-util = "0.7"
tracing-opentelemetry = "0.24"
tracing-opentelemetry = "0.25"
rand = { version = "0.8", features = ["small_rng"] }
risingwave_backup = { path = "./src/storage/backup" }
risingwave_batch = { path = "./src/batch" }
Expand Down
17 changes: 12 additions & 5 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::num::NonZeroUsize;
use anyhow::Context;
use clap::ValueEnum;
use educe::Educe;
use foyer::{LfuConfig, LruConfig, RecoverMode, S3FifoConfig};
use foyer::{Compression, LfuConfig, LruConfig, RecoverMode, RuntimeConfig, S3FifoConfig};
use risingwave_common_proc_macro::ConfigDoc;
pub use risingwave_common_proc_macro::OverrideConfig;
use risingwave_pb::meta::SystemParams;
Expand Down Expand Up @@ -871,7 +871,7 @@ pub struct FileCacheConfig {
pub indexer_shards: usize,

#[serde(default = "default::file_cache::compression")]
pub compression: String,
pub compression: Compression,

#[serde(default = "default::file_cache::flush_buffer_threshold_mb")]
pub flush_buffer_threshold_mb: Option<usize>,
Expand All @@ -888,6 +888,9 @@ pub struct FileCacheConfig {
#[serde(default = "default::file_cache::recover_mode")]
pub recover_mode: RecoverMode,

#[serde(default = "default::file_cache::runtime_config")]
pub runtime_config: RuntimeConfig,

#[serde(default, flatten)]
#[config_doc(omitted)]
pub unrecognized: Unrecognized<Self>,
Expand Down Expand Up @@ -1702,7 +1705,7 @@ pub mod default {
}

pub mod file_cache {
use foyer::RecoverMode;
use foyer::{Compression, RecoverMode, RuntimeConfig, TokioRuntimeConfig};

pub fn dir() -> String {
"".to_string()
Expand Down Expand Up @@ -1736,8 +1739,8 @@ pub mod default {
64
}

pub fn compression() -> String {
"none".to_string()
pub fn compression() -> Compression {
Compression::None
}

pub fn flush_buffer_threshold_mb() -> Option<usize> {
Expand All @@ -1747,6 +1750,10 @@ pub mod default {
pub fn recover_mode() -> RecoverMode {
RecoverMode::None
}

pub fn runtime_config() -> RuntimeConfig {
RuntimeConfig::Unified(TokioRuntimeConfig::default())
}
}

pub mod cache_refill {
Expand Down
12 changes: 10 additions & 2 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,13 @@ reclaimers = 4
recover_concurrency = 8
insert_rate_limit_mb = 0
indexer_shards = 64
compression = "none"
compression = "None"
recover_mode = "None"

[storage.data_file_cache.runtime_config.Unified]
worker_threads = 0
max_blocking_threads = 0

[storage.meta_file_cache]
dir = ""
capacity_mb = 1024
Expand All @@ -185,9 +189,13 @@ reclaimers = 4
recover_concurrency = 8
insert_rate_limit_mb = 0
indexer_shards = 64
compression = "none"
compression = "None"
recover_mode = "None"

[storage.meta_file_cache.runtime_config.Unified]
worker_threads = 0
max_blocking_threads = 0

[storage.cache_refill]
data_refill_levels = []
timeout_ms = 6000
Expand Down
12 changes: 8 additions & 4 deletions src/storage/src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,9 @@ pub struct StorageOpts {
pub data_file_cache_recover_concurrency: usize,
pub data_file_cache_insert_rate_limit_mb: usize,
pub data_file_cache_indexer_shards: usize,
pub data_file_cache_compression: String,
pub data_file_cache_compression: foyer::Compression,
pub data_file_cache_flush_buffer_threshold_mb: usize,
pub data_file_cache_runtime_config: foyer::RuntimeConfig,

pub cache_refill_data_refill_levels: Vec<u32>,
pub cache_refill_timeout_ms: u64,
Expand All @@ -112,8 +113,9 @@ pub struct StorageOpts {
pub meta_file_cache_recover_concurrency: usize,
pub meta_file_cache_insert_rate_limit_mb: usize,
pub meta_file_cache_indexer_shards: usize,
pub meta_file_cache_compression: String,
pub meta_file_cache_compression: foyer::Compression,
pub meta_file_cache_flush_buffer_threshold_mb: usize,
pub meta_file_cache_runtime_config: foyer::RuntimeConfig,

/// The storage url for storing backups.
pub backup_storage_url: String,
Expand Down Expand Up @@ -193,8 +195,9 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpt
data_file_cache_recover_concurrency: c.storage.data_file_cache.recover_concurrency,
data_file_cache_insert_rate_limit_mb: c.storage.data_file_cache.insert_rate_limit_mb,
data_file_cache_indexer_shards: c.storage.data_file_cache.indexer_shards,
data_file_cache_compression: c.storage.data_file_cache.compression.clone(),
data_file_cache_compression: c.storage.data_file_cache.compression,
data_file_cache_flush_buffer_threshold_mb: s.block_file_cache_flush_buffer_threshold_mb,
data_file_cache_runtime_config: c.storage.data_file_cache.runtime_config.clone(),
meta_file_cache_dir: c.storage.meta_file_cache.dir.clone(),
meta_file_cache_capacity_mb: c.storage.meta_file_cache.capacity_mb,
meta_file_cache_file_capacity_mb: c.storage.meta_file_cache.file_capacity_mb,
Expand All @@ -204,8 +207,9 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpt
meta_file_cache_recover_concurrency: c.storage.meta_file_cache.recover_concurrency,
meta_file_cache_insert_rate_limit_mb: c.storage.meta_file_cache.insert_rate_limit_mb,
meta_file_cache_indexer_shards: c.storage.meta_file_cache.indexer_shards,
meta_file_cache_compression: c.storage.meta_file_cache.compression.clone(),
meta_file_cache_compression: c.storage.meta_file_cache.compression,
meta_file_cache_flush_buffer_threshold_mb: s.meta_file_cache_flush_buffer_threshold_mb,
meta_file_cache_runtime_config: c.storage.meta_file_cache.runtime_config.clone(),
cache_refill_data_refill_levels: c.storage.cache_refill.data_refill_levels.clone(),
cache_refill_timeout_ms: c.storage.cache_refill.timeout_ms,
cache_refill_concurrency: c.storage.cache_refill.concurrency,
Expand Down
30 changes: 5 additions & 25 deletions src/storage/src/store_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ use std::sync::Arc;
use std::time::Duration;

use enum_as_inner::EnumAsInner;
use foyer::{
DirectFsDeviceOptionsBuilder, HybridCacheBuilder, RateLimitPicker, RuntimeConfigBuilder,
};
use foyer::{DirectFsDeviceOptionsBuilder, HybridCacheBuilder, RateLimitPicker};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common_service::RpcNotificationClient;
use risingwave_hummock_sdk::HummockSstableObjectId;
Expand Down Expand Up @@ -663,17 +661,8 @@ impl StateStoreImpl {
)
.with_recover_mode(opts.meta_file_cache_recover_mode)
.with_recover_concurrency(opts.meta_file_cache_recover_concurrency)
.with_compression(
opts.meta_file_cache_compression
.as_str()
.try_into()
.map_err(HummockError::foyer_error)?,
)
.with_runtime_config(
RuntimeConfigBuilder::new()
.with_thread_name("foyer.meta.runtime")
.build(),
);
.with_compression(opts.meta_file_cache_compression)
.with_runtime_config(opts.meta_file_cache_runtime_config.clone());
if opts.meta_file_cache_insert_rate_limit_mb > 0 {
builder = builder.with_admission_picker(Arc::new(RateLimitPicker::new(
opts.meta_file_cache_insert_rate_limit_mb * MB,
Expand Down Expand Up @@ -717,17 +706,8 @@ impl StateStoreImpl {
)
.with_recover_mode(opts.data_file_cache_recover_mode)
.with_recover_concurrency(opts.data_file_cache_recover_concurrency)
.with_compression(
opts.data_file_cache_compression
.as_str()
.try_into()
.map_err(HummockError::foyer_error)?,
)
.with_runtime_config(
RuntimeConfigBuilder::new()
.with_thread_name("foyer.data.runtime")
.build(),
);
.with_compression(opts.data_file_cache_compression)
.with_runtime_config(opts.data_file_cache_runtime_config.clone());
if opts.data_file_cache_insert_rate_limit_mb > 0 {
builder = builder.with_admission_picker(Arc::new(RateLimitPicker::new(
opts.data_file_cache_insert_rate_limit_mb * MB,
Expand Down
4 changes: 2 additions & 2 deletions src/utils/runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ opentelemetry_sdk = { workspace = true, features = [
"rt-tokio",
] } # only enable `rt-tokio` feature under non-madsim target
workspace-hack = { path = "../../workspace-hack" }
minitrace = "0.6.7"
minitrace-opentelemetry = "0.6.7"
fastrace = "0.7"
fastrace-opentelemetry = "0.7"

[lints]
workspace = true
37 changes: 19 additions & 18 deletions src/utils/runtime/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use std::path::PathBuf;
use std::time::Duration;

use either::Either;
use minitrace_opentelemetry::OpenTelemetryReporter;
use opentelemetry::trace::SpanKind;
use fastrace_opentelemetry::OpenTelemetryReporter;
use opentelemetry::trace::{SpanKind, TracerProvider};
use opentelemetry::InstrumentationLibrary;
use opentelemetry_sdk::Resource;
use risingwave_common::metrics::MetricsLayer;
Expand Down Expand Up @@ -444,26 +444,27 @@ pub fn init_risingwave_logger(settings: LoggerSettings) {
// Installing the exporter requires a tokio runtime.
let _entered = runtime.enter();

// TODO(bugen): better service name
// https://github.com/jaegertracing/jaeger-ui/issues/336
let service_name = format!("{}-{}", settings.name, id);
let otel_tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(&endpoint),
)
.with_trace_config(sdk::trace::config().with_resource(sdk::Resource::new([
KeyValue::new(
resource::SERVICE_NAME,
// TODO(bugen): better service name
// https://github.com/jaegertracing/jaeger-ui/issues/336
format!("{}-{}", settings.name, id),
),
KeyValue::new(resource::SERVICE_INSTANCE_ID, id.clone()),
KeyValue::new(resource::SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()),
])))
.with_trace_config(
sdk::trace::Config::default().with_resource(sdk::Resource::new([
KeyValue::new(resource::SERVICE_NAME, service_name.clone()),
KeyValue::new(resource::SERVICE_INSTANCE_ID, id.clone()),
KeyValue::new(resource::SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()),
])),
)
.install_batch(sdk::runtime::Tokio)
.unwrap();
.unwrap()
.tracer(service_name);

let exporter = opentelemetry_otlp::new_exporter()
.tonic()
Expand Down Expand Up @@ -512,7 +513,7 @@ pub fn init_risingwave_logger(settings: LoggerSettings) {

layers.push(layer.boxed());

// The reporter is used by minitrace in foyer for dynamically tail-based tracing.
// The reporter is used by fastrace in foyer for dynamically tail-based tracing.
//
// Code here only setup the OpenTelemetry reporter. To enable/disable the function, please use risectl.
//
Expand All @@ -526,12 +527,12 @@ pub fn init_risingwave_logger(settings: LoggerSettings) {
SpanKind::Server,
Cow::Owned(Resource::new([KeyValue::new(
resource::SERVICE_NAME,
format!("minitrace-{id}"),
format!("fastrace-{id}"),
)])),
InstrumentationLibrary::builder("opentelemetry-instrumentation-foyer").build(),
);
minitrace::set_reporter(reporter, minitrace::collector::Config::default());
tracing::info!("opentelemetry exporter for minitrace is set at {endpoint}");
fastrace::set_reporter(reporter, fastrace::collector::Config::default());
tracing::info!("opentelemetry exporter for fastrace is set at {endpoint}");
}

// Metrics layer
Expand Down

0 comments on commit 82ac534

Please sign in to comment.