Skip to content

Commit

Permalink
chore(allocator): remove things related to task_local_alloc (#15976)
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc authored Mar 28, 2024
1 parent 6635712 commit af11697
Show file tree
Hide file tree
Showing 14 changed files with 3 additions and 533 deletions.
8 changes: 1 addition & 7 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,7 @@ rustflags = [

# Flags for all targets.
[target.'cfg(all())']
rustflags = [
"--cfg",
"tokio_unstable",
# Uncomment the following two lines to enable `TaskLocalAlloc`.
# "--cfg",
# "enable_task_local_alloc",
]
rustflags = ["--cfg", "tokio_unstable"]

[unstable]
lints = true
47 changes: 0 additions & 47 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions src/batch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,6 @@ tokio-util = { workspace = true }
tonic = { workspace = true }
tracing = "0.1"

[target.'cfg(enable_task_local_alloc)'.dependencies]
task_stats_alloc = { path = "../utils/task_stats_alloc" }

[target.'cfg(not(madsim))'.dependencies]
workspace-hack = { path = "../workspace-hack" }

Expand Down
72 changes: 1 addition & 71 deletions src/batch/src/task/task_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,8 @@
// limitations under the License.

use std::fmt::{Debug, Formatter};
#[cfg(enable_task_local_alloc)]
use std::future::Future;
use std::panic::AssertUnwindSafe;
use std::sync::Arc;
#[cfg(enable_task_local_alloc)]
use std::time::Duration;

use anyhow::Context;
use futures::StreamExt;
Expand Down Expand Up @@ -51,52 +47,6 @@ use crate::task::BatchTaskContext;
// Now we will only at most have 2 status for each status channel. Running -> Failed or Finished.
pub const TASK_STATUS_BUFFER_SIZE: usize = 2;

/// A special version for batch allocation stat, passed in another task `context` C to report task
/// mem usage 0 bytes at the end.
#[cfg(enable_task_local_alloc)]
pub async fn allocation_stat_for_batch<Fut, T, F, C>(
future: Fut,
interval: Duration,
mut report: F,
context: C,
) -> T
where
Fut: Future<Output = T>,
F: FnMut(usize),
C: BatchTaskContext,
{
use task_stats_alloc::{TaskLocalBytesAllocated, BYTES_ALLOCATED};

BYTES_ALLOCATED
.scope(TaskLocalBytesAllocated::new(), async move {
// The guard has the same lifetime as the counter so that the counter will keep positive
// in the whole scope. When the scope exits, the guard is released, so the counter can
// reach zero eventually and then `drop` itself.
let _guard = Box::new(114514);
let monitor = async move {
let mut interval = tokio::time::interval(interval);
loop {
interval.tick().await;
BYTES_ALLOCATED.with(|bytes| report(bytes.val()));
}
};
let output = tokio::select! {
biased;
_ = monitor => unreachable!(),
output = future => {
// NOTE: Report bytes allocated when the actor ends. We simply report 0 here,
// assuming that all memory allocated by this batch task will be freed at some
// time. Maybe we should introduce a better monitoring strategy for batch memory
// usage.
BYTES_ALLOCATED.with(|_| context.store_mem_usage(0));
output
},
};
output
})
.await
}

/// Send batch task status (local/distributed) to frontend.
///
///
Expand Down Expand Up @@ -538,27 +488,7 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
}
};

#[cfg(enable_task_local_alloc)]
{
// For every fired Batch Task, we will wrap it with allocation stats to report memory
// estimation per task to `BatchManager`.
let ctx1 = self.context.clone();
let ctx2 = self.context.clone();
let alloc_stat_wrap_fut = allocation_stat_for_batch(
fut,
Duration::from_millis(1000),
move |bytes| {
ctx1.store_mem_usage(bytes);
},
ctx2,
);
self.runtime.spawn(alloc_stat_wrap_fut);
}

#[cfg(not(enable_task_local_alloc))]
{
self.runtime.spawn(fut);
}
self.runtime.spawn(fut);

Ok(())
}
Expand Down
3 changes: 0 additions & 3 deletions src/cmd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
workspace-config = { path = "../utils/workspace-config" }
workspace-hack = { path = "../workspace-hack" }

[target.'cfg(enable_task_local_alloc)'.dependencies]
task_stats_alloc = { path = "../utils/task_stats_alloc" }

[lib]
test = false

Expand Down
4 changes: 0 additions & 4 deletions src/cmd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ use risingwave_rt::{init_risingwave_logger, main_okk, LoggerSettings};
#[macro_export]
macro_rules! main {
($component:ident) => {
#[cfg(enable_task_local_alloc)]
risingwave_common::enable_task_local_jemalloc!();

#[cfg(not(enable_task_local_alloc))]
risingwave_common::enable_jemalloc!();

#[cfg_attr(coverage, coverage(off))]
Expand Down
3 changes: 0 additions & 3 deletions src/cmd_all/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,6 @@ vergen = { version = "8", default-features = false, features = [
"gitcl",
] }

[target.'cfg(enable_task_local_alloc)'.dependencies]
task_stats_alloc = { path = "../utils/task_stats_alloc" }

[[bin]]
name = "risingwave"
path = "src/bin/risingwave.rs"
Expand Down
4 changes: 0 additions & 4 deletions src/cmd_all/src/bin/risingwave.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ use strum::IntoEnumIterator;
use strum_macros::{Display, EnumIter, EnumString, IntoStaticStr};
use tracing::Level;

#[cfg(enable_task_local_alloc)]
risingwave_common::enable_task_local_jemalloc!();

#[cfg(not(enable_task_local_alloc))]
risingwave_common::enable_jemalloc!();

const BINARY_NAME: &str = "risingwave";
Expand Down
3 changes: 0 additions & 3 deletions src/stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,6 @@ tokio-stream = "0.1"
tonic = { workspace = true }
tracing = "0.1"

[target.'cfg(enable_task_local_alloc)'.dependencies]
task_stats_alloc = { path = "../utils/task_stats_alloc" }

[target.'cfg(not(madsim))'.dependencies]
workspace-hack = { path = "../workspace-hack" }

Expand Down
25 changes: 1 addition & 24 deletions src/stream/src/task/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -651,30 +651,7 @@ impl LocalBarrierWorker {
let with_config =
crate::CONFIG.scope(self.actor_manager.env.config().clone(), instrumented);

// TODO: are the following lines still useful?
#[cfg(enable_task_local_alloc)]
{
let metrics = streaming_metrics.clone();
let actor_id_str = actor_id.to_string();
let fragment_id_str = actor_context.fragment_id.to_string();
let allocation_stated = task_stats_alloc::allocation_stat(
with_config,
Duration::from_millis(1000),
move |bytes| {
metrics
.actor_memory_usage
.with_label_values(&[&actor_id_str, &fragment_id_str])
.set(bytes as i64);

actor_context.store_mem_usage(bytes);
},
);
self.actor_manager.runtime.spawn(allocation_stated)
}
#[cfg(not(enable_task_local_alloc))]
{
self.actor_manager.runtime.spawn(with_config)
}
self.actor_manager.runtime.spawn(with_config)
};
self.actor_manager_state.handles.insert(actor_id, handle);

Expand Down
32 changes: 0 additions & 32 deletions src/utils/task_stats_alloc/Cargo.toml

This file was deleted.

Loading

0 comments on commit af11697

Please sign in to comment.