diff --git a/.cargo/config.toml b/.cargo/config.toml index fbe29b3e6b6af..c65cd3ac1a4fa 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -31,13 +31,7 @@ rustflags = [ # Flags for all targets. [target.'cfg(all())'] -rustflags = [ - "--cfg", - "tokio_unstable", - # Uncomment the following two lines to enable `TaskLocalAlloc`. - # "--cfg", - # "enable_task_local_alloc", -] +rustflags = ["--cfg", "tokio_unstable"] [unstable] lints = true diff --git a/Cargo.lock b/Cargo.lock index c8fa0f3011732..1ea68102c5076 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4604,19 +4604,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "generator" -version = "0.7.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5cc16584ff22b460a382b7feec54b23d2908d858152e5739a120b949293bd74e" -dependencies = [ - "cc", - "libc", - "log", - "rustversion", - "windows 0.48.0", -] - [[package]] name = "generic-array" version = "0.14.7" @@ -5855,22 +5842,6 @@ dependencies = [ "value-bag", ] -[[package]] -name = "loom" -version = "0.5.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff50ecb28bb86013e935fb6683ab1f6d3a20016f123c76fd4c27470076ac30f5" -dependencies = [ - "cfg-if", - "generator", - "pin-utils", - "scoped-tls", - "serde", - "serde_json", - "tracing", - "tracing-subscriber", -] - [[package]] name = "lru" version = "0.7.6" @@ -8835,7 +8806,6 @@ dependencies = [ "rw_futures_util", "scopeguard", "serde_json", - "task_stats_alloc", "tempfile", "thiserror", "thiserror-ext", @@ -8903,7 +8873,6 @@ dependencies = [ "risingwave_frontend", "risingwave_meta_node", "risingwave_rt", - "task_stats_alloc", "tikv-jemallocator", "workspace-config", "workspace-hack", @@ -8934,7 +8903,6 @@ dependencies = [ "shell-words", "strum 0.25.0", "strum_macros 0.26.1", - "task_stats_alloc", "tempfile", "thiserror-ext", "tikv-jemallocator", @@ -10355,7 +10323,6 @@ dependencies = [ "static_assertions", "strum 0.26.1", "strum_macros 0.26.1", - "task_stats_alloc", "thiserror", "thiserror-ext", "tokio-metrics", @@ -10856,12 +10823,6 @@ dependencies = [ "hashbrown 0.13.2", ] -[[package]] -name = "scoped-tls" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" - [[package]] name = "scopeguard" version = "1.2.0" @@ -12255,14 +12216,6 @@ version = "0.12.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "14c39fd04924ca3a864207c66fc2cd7d22d7c016007f9ce846cbb9326331930a" -[[package]] -name = "task_stats_alloc" -version = "0.1.11" -dependencies = [ - "loom", - "madsim-tokio", -] - [[package]] name = "tempfile" version = "3.10.0" diff --git a/src/batch/Cargo.toml b/src/batch/Cargo.toml index f87db26ba84be..8c9baa83e1203 100644 --- a/src/batch/Cargo.toml +++ b/src/batch/Cargo.toml @@ -62,9 +62,6 @@ tokio-util = { workspace = true } tonic = { workspace = true } tracing = "0.1" -[target.'cfg(enable_task_local_alloc)'.dependencies] -task_stats_alloc = { path = "../utils/task_stats_alloc" } - [target.'cfg(not(madsim))'.dependencies] workspace-hack = { path = "../workspace-hack" } diff --git a/src/batch/src/task/task_execution.rs b/src/batch/src/task/task_execution.rs index f33911a5e4ca3..7b5e00df18c03 100644 --- a/src/batch/src/task/task_execution.rs +++ b/src/batch/src/task/task_execution.rs @@ -13,12 +13,8 @@ // limitations under the License. use std::fmt::{Debug, Formatter}; -#[cfg(enable_task_local_alloc)] -use std::future::Future; use std::panic::AssertUnwindSafe; use std::sync::Arc; -#[cfg(enable_task_local_alloc)] -use std::time::Duration; use anyhow::Context; use futures::StreamExt; @@ -51,52 +47,6 @@ use crate::task::BatchTaskContext; // Now we will only at most have 2 status for each status channel. Running -> Failed or Finished. pub const TASK_STATUS_BUFFER_SIZE: usize = 2; -/// A special version for batch allocation stat, passed in another task `context` C to report task -/// mem usage 0 bytes at the end. -#[cfg(enable_task_local_alloc)] -pub async fn allocation_stat_for_batch( - future: Fut, - interval: Duration, - mut report: F, - context: C, -) -> T -where - Fut: Future, - F: FnMut(usize), - C: BatchTaskContext, -{ - use task_stats_alloc::{TaskLocalBytesAllocated, BYTES_ALLOCATED}; - - BYTES_ALLOCATED - .scope(TaskLocalBytesAllocated::new(), async move { - // The guard has the same lifetime as the counter so that the counter will keep positive - // in the whole scope. When the scope exits, the guard is released, so the counter can - // reach zero eventually and then `drop` itself. - let _guard = Box::new(114514); - let monitor = async move { - let mut interval = tokio::time::interval(interval); - loop { - interval.tick().await; - BYTES_ALLOCATED.with(|bytes| report(bytes.val())); - } - }; - let output = tokio::select! { - biased; - _ = monitor => unreachable!(), - output = future => { - // NOTE: Report bytes allocated when the actor ends. We simply report 0 here, - // assuming that all memory allocated by this batch task will be freed at some - // time. Maybe we should introduce a better monitoring strategy for batch memory - // usage. - BYTES_ALLOCATED.with(|_| context.store_mem_usage(0)); - output - }, - }; - output - }) - .await -} - /// Send batch task status (local/distributed) to frontend. /// /// @@ -538,27 +488,7 @@ impl BatchTaskExecution { } }; - #[cfg(enable_task_local_alloc)] - { - // For every fired Batch Task, we will wrap it with allocation stats to report memory - // estimation per task to `BatchManager`. - let ctx1 = self.context.clone(); - let ctx2 = self.context.clone(); - let alloc_stat_wrap_fut = allocation_stat_for_batch( - fut, - Duration::from_millis(1000), - move |bytes| { - ctx1.store_mem_usage(bytes); - }, - ctx2, - ); - self.runtime.spawn(alloc_stat_wrap_fut); - } - - #[cfg(not(enable_task_local_alloc))] - { - self.runtime.spawn(fut); - } + self.runtime.spawn(fut); Ok(()) } diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index 10a007932dcf9..d73f0bef0206b 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -46,9 +46,6 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [ workspace-config = { path = "../utils/workspace-config" } workspace-hack = { path = "../workspace-hack" } -[target.'cfg(enable_task_local_alloc)'.dependencies] -task_stats_alloc = { path = "../utils/task_stats_alloc" } - [lib] test = false diff --git a/src/cmd/src/lib.rs b/src/cmd/src/lib.rs index a2f3457bb1266..59ffda9e76557 100644 --- a/src/cmd/src/lib.rs +++ b/src/cmd/src/lib.rs @@ -23,10 +23,6 @@ use risingwave_rt::{init_risingwave_logger, main_okk, LoggerSettings}; #[macro_export] macro_rules! main { ($component:ident) => { - #[cfg(enable_task_local_alloc)] - risingwave_common::enable_task_local_jemalloc!(); - - #[cfg(not(enable_task_local_alloc))] risingwave_common::enable_jemalloc!(); #[cfg_attr(coverage, coverage(off))] diff --git a/src/cmd_all/Cargo.toml b/src/cmd_all/Cargo.toml index 813928ce931ea..cc9befc068d0c 100644 --- a/src/cmd_all/Cargo.toml +++ b/src/cmd_all/Cargo.toml @@ -67,9 +67,6 @@ vergen = { version = "8", default-features = false, features = [ "gitcl", ] } -[target.'cfg(enable_task_local_alloc)'.dependencies] -task_stats_alloc = { path = "../utils/task_stats_alloc" } - [[bin]] name = "risingwave" path = "src/bin/risingwave.rs" diff --git a/src/cmd_all/src/bin/risingwave.rs b/src/cmd_all/src/bin/risingwave.rs index 4fc326f1089a3..1bbfe2265c316 100644 --- a/src/cmd_all/src/bin/risingwave.rs +++ b/src/cmd_all/src/bin/risingwave.rs @@ -31,10 +31,6 @@ use strum::IntoEnumIterator; use strum_macros::{Display, EnumIter, EnumString, IntoStaticStr}; use tracing::Level; -#[cfg(enable_task_local_alloc)] -risingwave_common::enable_task_local_jemalloc!(); - -#[cfg(not(enable_task_local_alloc))] risingwave_common::enable_jemalloc!(); const BINARY_NAME: &str = "risingwave"; diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml index 5348dedfcde61..10caf3bbf96c1 100644 --- a/src/stream/Cargo.toml +++ b/src/stream/Cargo.toml @@ -81,9 +81,6 @@ tokio-stream = "0.1" tonic = { workspace = true } tracing = "0.1" -[target.'cfg(enable_task_local_alloc)'.dependencies] -task_stats_alloc = { path = "../utils/task_stats_alloc" } - [target.'cfg(not(madsim))'.dependencies] workspace-hack = { path = "../workspace-hack" } diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 7d6d584c736e2..d452decdce507 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -651,30 +651,7 @@ impl LocalBarrierWorker { let with_config = crate::CONFIG.scope(self.actor_manager.env.config().clone(), instrumented); - // TODO: are the following lines still useful? - #[cfg(enable_task_local_alloc)] - { - let metrics = streaming_metrics.clone(); - let actor_id_str = actor_id.to_string(); - let fragment_id_str = actor_context.fragment_id.to_string(); - let allocation_stated = task_stats_alloc::allocation_stat( - with_config, - Duration::from_millis(1000), - move |bytes| { - metrics - .actor_memory_usage - .with_label_values(&[&actor_id_str, &fragment_id_str]) - .set(bytes as i64); - - actor_context.store_mem_usage(bytes); - }, - ); - self.actor_manager.runtime.spawn(allocation_stated) - } - #[cfg(not(enable_task_local_alloc))] - { - self.actor_manager.runtime.spawn(with_config) - } + self.actor_manager.runtime.spawn(with_config) }; self.actor_manager_state.handles.insert(actor_id, handle); diff --git a/src/utils/task_stats_alloc/Cargo.toml b/src/utils/task_stats_alloc/Cargo.toml deleted file mode 100644 index 20d5ceed86729..0000000000000 --- a/src/utils/task_stats_alloc/Cargo.toml +++ /dev/null @@ -1,32 +0,0 @@ -[package] -name = "task_stats_alloc" -version = "0.1.11" -edition = "2021" -description = "Allocator with statistics" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[package.metadata.cargo-machete] -ignored = ["workspace-hack"] - -[package.metadata.cargo-udeps.ignore] -normal = ["workspace-hack"] - -[dependencies] -tokio = { version = "0.2", package = "madsim-tokio", features = [ - "fs", - "rt", - "rt-multi-thread", - "sync", - "macros", - "time", - "signal", -] } - -[dev-dependencies] - - -[target.'cfg(loom)'.dependencies] -loom = {version = "0.5", features = ["futures", "checkpoint"]} - -[lints] -workspace = true diff --git a/src/utils/task_stats_alloc/src/lib.rs b/src/utils/task_stats_alloc/src/lib.rs deleted file mode 100644 index a1f2c43a9e575..0000000000000 --- a/src/utils/task_stats_alloc/src/lib.rs +++ /dev/null @@ -1,217 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#![cfg(enable_task_local_alloc)] -#![feature(allocator_api)] -#![feature(lint_reasons)] - -use std::alloc::{GlobalAlloc, Layout, System}; -use std::future::Future; -use std::ptr::NonNull; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::time::Duration; - -use tokio::task_local; - -/// If you change the code in this struct, pls re-run the `tests/loom.rs` test locally. -#[repr(transparent)] -#[derive(Clone, Copy, Debug)] -pub struct TaskLocalBytesAllocated(Option>); - -impl Default for TaskLocalBytesAllocated { - fn default() -> Self { - Self(Some( - NonNull::new(Box::into_raw(Box::new_in(0.into(), System))).unwrap(), - )) - } -} - -// Need this otherwise the NonNull is not Send and can not be used in future. -unsafe impl Send for TaskLocalBytesAllocated {} - -impl TaskLocalBytesAllocated { - pub fn new() -> Self { - Self::default() - } - - /// Create an invalid counter. - pub const fn invalid() -> Self { - Self(None) - } - - /// Adds to the current counter. - #[inline(always)] - pub fn add(&self, val: usize) { - if let Some(bytes) = self.0 { - let bytes_ref = unsafe { bytes.as_ref() }; - bytes_ref.fetch_add(val, Ordering::Relaxed); - } - } - - /// Adds to the current counter without validity check. - /// - /// # Safety - /// The caller must ensure that `self` is valid. - #[inline(always)] - unsafe fn add_unchecked(&self, val: usize) { - let bytes = self.0.unwrap_unchecked(); - let bytes_ref = unsafe { bytes.as_ref() }; - bytes_ref.fetch_add(val, Ordering::Relaxed); - } - - /// Subtracts from the counter value, and `drop` the counter while the count reaches zero. - #[inline(always)] - pub fn sub(&self, val: usize) -> bool { - if let Some(bytes) = self.0 { - // Use `Relaxed` order as we don't need to sync read/write with other memory addresses. - // Accesses to the counter itself are serialized by atomic operations. - let bytes_ref = unsafe { bytes.as_ref() }; - let old_bytes = bytes_ref.fetch_sub(val, Ordering::Relaxed); - // If the counter reaches zero, delete the counter. Note that we've ensured there's no - // zero deltas in `wrap_layout`, so there'll be no more uses of the counter. - if old_bytes == val { - // No fence here, this is different from ref counter impl in https://www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html#boost_atomic.usage_examples.example_reference_counters. - // As here, T is the exactly Counter and they have same memory address, so there - // should not happen out-of-order commit. - unsafe { Box::from_raw_in(bytes.as_ptr(), System) }; - return true; - } - } - false - } - - #[inline(always)] - pub fn val(&self) -> usize { - let bytes_ref = self.0.as_ref().expect("bytes is invalid"); - let bytes_ref = unsafe { bytes_ref.as_ref() }; - bytes_ref.load(Ordering::Relaxed) - } -} - -task_local! { - pub static BYTES_ALLOCATED: TaskLocalBytesAllocated; -} - -pub async fn allocation_stat(future: Fut, interval: Duration, mut report: F) -> T -where - Fut: Future, - F: FnMut(usize), -{ - BYTES_ALLOCATED - .scope(TaskLocalBytesAllocated::new(), async move { - // The guard has the same lifetime as the counter so that the counter will keep positive - // in the whole scope. When the scope exits, the guard is released, so the counter can - // reach zero eventually and then `drop` itself. - let _guard = Box::new(114514); - let monitor = async move { - let mut interval = tokio::time::interval(interval); - loop { - interval.tick().await; - BYTES_ALLOCATED.with(|bytes| report(bytes.val())); - } - }; - let output = tokio::select! { - biased; - _ = monitor => unreachable!(), - output = future => output, - }; - output - }) - .await -} - -#[inline(always)] -fn wrap_layout(layout: Layout) -> (Layout, usize) { - debug_assert_ne!(layout.size(), 0, "the size of layout must be non-zero"); - - let (wrapped_layout, offset) = Layout::new::() - .extend(layout) - .expect("wrapping layout overflow"); - let wrapped_layout = wrapped_layout.pad_to_align(); - - (wrapped_layout, offset) -} - -pub struct TaskLocalAlloc(pub A); - -unsafe impl GlobalAlloc for TaskLocalAlloc -where - A: GlobalAlloc, -{ - unsafe fn alloc(&self, layout: Layout) -> *mut u8 { - let (wrapped_layout, offset) = wrap_layout(layout); - - BYTES_ALLOCATED - .try_with(|&bytes| { - bytes.add_unchecked(layout.size()); - let ptr = self.0.alloc(wrapped_layout); - *ptr.cast() = bytes; - ptr.wrapping_add(offset) - }) - .unwrap_or_else(|_| { - let ptr = self.0.alloc(wrapped_layout); - *ptr.cast() = TaskLocalBytesAllocated::invalid(); - ptr.wrapping_add(offset) - }) - } - - unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) { - let (wrapped_layout, offset) = wrap_layout(layout); - let ptr = ptr.wrapping_sub(offset); - - let bytes: TaskLocalBytesAllocated = *ptr.cast(); - bytes.sub(layout.size()); - - self.0.dealloc(ptr, wrapped_layout); - } - - unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 { - let (wrapped_layout, offset) = wrap_layout(layout); - - BYTES_ALLOCATED - .try_with(|&bytes| { - bytes.add_unchecked(layout.size()); - let ptr = self.0.alloc_zeroed(wrapped_layout); - *ptr.cast() = bytes; - ptr.wrapping_add(offset) - }) - .unwrap_or_else(|_| { - let ptr = self.0.alloc_zeroed(wrapped_layout); - *ptr.cast() = TaskLocalBytesAllocated::invalid(); - ptr.wrapping_add(offset) - }) - } - - unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 { - let (wrapped_layout, offset) = wrap_layout(layout); - // SAFETY: the caller must ensure that the `new_size` does not overflow. - let (new_wrapped_layout, new_offset) = - wrap_layout(Layout::from_size_align_unchecked(new_size, layout.align())); - let new_wrapped_size = new_wrapped_layout.size(); - - let ptr = ptr.wrapping_sub(offset); - - let bytes: TaskLocalBytesAllocated = *ptr.cast(); - bytes.sub(layout.size()); - bytes.add(new_size); - - let ptr = self.0.realloc(ptr, wrapped_layout, new_wrapped_size); - if ptr.is_null() { - ptr - } else { - *ptr.cast() = bytes; - ptr.wrapping_add(new_offset) - } - } -} diff --git a/src/utils/task_stats_alloc/tests/integration.rs b/src/utils/task_stats_alloc/tests/integration.rs deleted file mode 100644 index e756ab6de8260..0000000000000 --- a/src/utils/task_stats_alloc/tests/integration.rs +++ /dev/null @@ -1,65 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#![cfg(enable_task_local_alloc)] - -use std::alloc::System; -use std::hint::black_box; -use std::time::Duration; - -use task_stats_alloc::*; - -#[global_allocator] -static GLOBAL: TaskLocalAlloc = TaskLocalAlloc(System); - -#[tokio::test] -async fn test_basic() { - let mut bytes = None; - let mut placeholder = Box::new(10u64); - - { - let bytes = &mut bytes; - let placeholder = &mut placeholder; - allocation_stat( - async move { - *bytes = Some(BYTES_ALLOCATED.get()); - let bytes = bytes.unwrap(); - let base1 = bytes.val(); - { - let _a1 = black_box(Box::new(114514_u64)); - - let base2 = bytes.val(); - { - let _a2 = black_box(Vec::::with_capacity(1024)); - } - assert_eq!(bytes.val(), base2); - - let _a3 = black_box(Box::new(1145166666666666666666666666666666u128)); - let _v = Vec::::with_capacity(128); - } - assert_eq!(bytes.val(), base1); - // Leak the box out of the task. - *placeholder = Box::new(187u64); - }, - Duration::from_secs(1), - |_| {}, - ) - .await; - } - // There should only one u64 held by `placeholder` not dropped in the task local allocator. - assert_eq!(bytes.unwrap().val(), std::mem::size_of::()); - // The placeholder was consumed by `black_box`. - black_box(placeholder); - // The counter should be dropped, but there is no way to test it. -} diff --git a/src/utils/task_stats_alloc/tests/loom.rs b/src/utils/task_stats_alloc/tests/loom.rs deleted file mode 100644 index f0e1433ce9612..0000000000000 --- a/src/utils/task_stats_alloc/tests/loom.rs +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#![cfg(all(loom, enable_task_local_alloc))] - -/// Note this test is not running in CI, due to the re-compile time cost. Add it when it is -/// necessary. Run `RUSTFLAGS="--cfg loom" cargo test --test loom` to test. -use loom::sync::Arc; -use loom::thread; -use task_stats_alloc::TaskLocalBytesAllocated; - -#[test] -fn test_to_avoid_double_drop() { - loom::model(|| { - let bytes_num = 3; - let num = Arc::new(TaskLocalBytesAllocated::new()); - - let threads: Vec<_> = (0..bytes_num) - .map(|_| { - let num = num.clone(); - thread::spawn(move || { - num.add(1); - num.sub(1) - }) - }) - .collect(); - - // How many times the bytes have been dropped. - let mut drop_num = 0; - for t in threads { - if t.join().unwrap() { - drop_num += 1; - } - } - - // Ensure the counter is dropped. - assert_eq!(drop_num, 1); - }); -}