[turbopack] Minimal implementation of local Vcs (#68469)
*This is a migrated PR. This was in the turbo repository before the
next.js merge.*
**Migrated From:** vercel/turborepo#8780

# Description

Local Vcs store task-local values inside of the current task's state.
The `SharedReference` holding onto their values is dropped when the task
exits.

The contents of a local Vc must still use a refcounted
`SharedReference`/`triomphe::Arc` because they can be turned into
`ReadRef` objects
(https://turbopack-rust-docs.vercel.sh/rustdoc/turbo_tasks/struct.ReadRef.html),
which require a `'static` lifetime. We can experiment with adding a
lifetime to `ReadRef` in a future iteration, but there's little
advantage to doing this until we have local tasks.
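
The ownership model described above can be sketched in isolation. This is a minimal illustration, not the PR's implementation: `TaskState`, `SharedValue`, and `read_outlives_task` are hypothetical names, and `std::sync::Arc` stands in for `SharedReference`/`triomphe::Arc`. It shows why a refcounted value lets a read handle stay valid after the task state (and its arena) is dropped.

```rust
use std::any::Any;
use std::sync::Arc;

/// Hypothetical stand-in for a refcounted cell value. The PR uses
/// `SharedReference`/`triomphe::Arc`; `std::sync::Arc` is assumed here.
type SharedValue = Arc<dyn Any + Send + Sync>;

/// Hypothetical stand-in for the per-task state that owns the local cells.
#[derive(Default)]
struct TaskState {
    local_cells: Vec<SharedValue>,
}

impl TaskState {
    /// Stores a value in the task-local arena and returns its index.
    fn store<T: Any + Send + Sync>(&mut self, value: T) -> usize {
        self.local_cells.push(Arc::new(value));
        self.local_cells.len() - 1
    }

    /// Reading clones the `Arc`, so the handle is `'static` and does not
    /// borrow from the task state.
    fn read(&self, index: usize) -> SharedValue {
        Arc::clone(&self.local_cells[index])
    }
}

/// Reads a value, drops the task state, and reports the refcount before and
/// after the drop together with the value that is still readable.
fn read_outlives_task() -> (usize, usize, u32) {
    let mut task = TaskState::default();
    let index = task.store(42u32);
    let read_ref = task.read(index);

    // Two owners: the arena and `read_ref`.
    let count_while_task_alive = Arc::strong_count(&read_ref);

    // The task "exits": the arena's reference is released.
    drop(task);

    // One owner left: `read_ref` keeps the value alive.
    let count_after_task_exit = Arc::strong_count(&read_ref);
    let value = *read_ref.downcast_ref::<u32>().expect("stored a u32");

    (count_while_task_alive, count_after_task_exit, value)
}
```

Giving the read handle a lifetime tied to the task state would avoid the refcount, which is the `ReadRef` lifetime experiment the description defers.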

# Limitations

This implements *just enough* to create and read local Vcs (with a
test!). There are many things that are still unimplemented with
`todo!()`. Most notably:

- We can't resolve a local `Vc` to a `ResolvedVc`.
- There's no way to return or pass a local `Vc` as an argument yet.
- For safety, we should only allow construction of local `Vc`s in
functions where we know that the return value is a `ResolvedValue`.

Grand plan:
https://www.notion.so/vercel/Resolved-Vcs-Vc-Lifetimes-Local-Vcs-and-Vc-Refcounts-49d666d3f9594017b5b312b87ddc5bff

# Memory Usage

- This increases the size of `Vc`/`RawVc` from 96 bits to 128 bits. With
clever packing this could (in theory) be avoided. However, it does not
increase the number of 64-bit machine words (still two), so it might not have
any real impact after alignment happens.

- This increases the size of `CurrentTaskState`. I suspect this isn't too
bad as there should be a smallish number of tasks active at any given
time.
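
The word-count argument in the first bullet can be checked with a small sketch. The two structs below are hypothetical `#[repr(C)]` stand-ins, not the real `RawVc` (which is an enum whose layout the compiler chooses), and the sizes assume a typical 64-bit target where `u64` is 8-byte aligned.

```rust
use std::mem::size_of;

/// Hypothetical 96-bit payload: three 32-bit fields, 4-byte alignment.
#[allow(dead_code)]
#[repr(C)]
struct ThreeU32 {
    a: u32,
    b: u32,
    c: u32,
}

/// Hypothetical payload once one field is a `u64` (as `ExecutionId` is in
/// this PR) next to a `u32`: padding rounds the struct up to 128 bits.
#[allow(dead_code)]
#[repr(C)]
struct U64PlusU32 {
    a: u64,
    b: u32,
}

/// Number of 64-bit machine words needed to hold `bytes` bytes.
const fn words(bytes: usize) -> usize {
    (bytes + 7) / 8
}

/// Returns (bytes before, bytes after, words before, words after).
fn layout_summary() -> (usize, usize, usize, usize) {
    let before = size_of::<ThreeU32>();
    let after = size_of::<U64PlusU32>();
    (before, after, words(before), words(after))
}
```

Under these assumptions the byte size grows from 12 to 16, but both layouts occupy two 64-bit words once stored at 8-byte alignment.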

**I was not able to measure any change to peak memory/heap usage.**

Built using

```
cargo build --release -p next-build-test
```

Ran heaptrack on a single page (`/sink`) in `shadcn/ui` with:

```
cd ~/ui/apps/www
heaptrack ~/nextpack/target/release/next-build-test run sequential 1 1 '/sink'
```

And analyzed the results with

```
heaptrack --analyze /home/bgw.linux/ui/apps/www/heaptrack.next-build-test.3066837.zst | tail -20
```

### Before

```
total runtime: 130.25s.
calls to allocation functions: 48553541 (372786/s)
temporary memory allocations: 3863919 (29666/s)
peak heap memory consumption: 1.13G
peak RSS (including heaptrack overhead): 2.69G
total memory leaked: 4.96M
suppressed leaks: 7.24K
```

```
total runtime: 135.48s.
calls to allocation functions: 48619554 (358863/s)
temporary memory allocations: 3883888 (28667/s)
peak heap memory consumption: 1.12G
peak RSS (including heaptrack overhead): 2.70G
total memory leaked: 4.71M
suppressed leaks: 7.24K
```

### After

```
total runtime: 157.20s.
calls to allocation functions: 48509638 (308579/s)
temporary memory allocations: 3902883 (24827/s)
peak heap memory consumption: 1.13G
peak RSS (including heaptrack overhead): 2.70G
total memory leaked: 4.86M
suppressed leaks: 7.24K
```

```
total runtime: 130.25s.
calls to allocation functions: 48553541 (372786/s)
temporary memory allocations: 3863919 (29666/s)
peak heap memory consumption: 1.13G
peak RSS (including heaptrack overhead): 2.69G
total memory leaked: 4.96M
suppressed leaks: 7.24K
```

# Testing Instructions

```
cargo nextest r -p turbo-tasks -p turbo-tasks-memory
```
bgw authored Aug 5, 2024
1 parent 8886d2b commit 9511ae2
Showing 8 changed files with 239 additions and 69 deletions.
10 changes: 10 additions & 0 deletions turbopack/crates/turbo-tasks-macros/src/value_macro.rs
@@ -316,6 +316,16 @@ pub fn value(args: TokenStream, input: TokenStream) -> TokenStream {
let content = self;
turbo_tasks::Vc::cell_private(#cell_access_content)
}

/// Places a value in a task-local cell stored in the current task.
///
/// Task-local cells are stored in a task-local arena, and do not persist outside the
/// lifetime of the current task (including child tasks). Task-local cells can be resolved
/// to be converted into normal cells.
#cell_prefix fn local_cell(self) -> turbo_tasks::Vc<Self> {
let content = self;
turbo_tasks::Vc::local_cell_private(#cell_access_content)
}
};

let into = if let IntoMode::New | IntoMode::Shared = into_mode {
2 changes: 1 addition & 1 deletion turbopack/crates/turbo-tasks-memory/src/output.rs
@@ -28,7 +28,7 @@ impl Display for OutputContent {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
OutputContent::Empty => write!(f, "empty"),
OutputContent::Link(raw_vc) => write!(f, "link {}", raw_vc),
OutputContent::Link(raw_vc) => write!(f, "link {:?}", raw_vc),
OutputContent::Error(err) => write!(f, "error {}", err),
OutputContent::Panic(Some(message)) => write!(f, "panic {}", message),
OutputContent::Panic(None) => write!(f, "panic"),
45 changes: 45 additions & 0 deletions turbopack/crates/turbo-tasks-memory/tests/local_cell.rs
@@ -0,0 +1,45 @@
#![feature(arbitrary_self_types)]

use turbo_tasks::Vc;
use turbo_tasks_testing::{register, run, Registration};

static REGISTRATION: Registration = register!();

#[turbo_tasks::value]
struct Wrapper(u32);

#[turbo_tasks::value(transparent)]
struct TransparentWrapper(u32);

#[tokio::test]
async fn store_and_read() {
    run(&REGISTRATION, async {
        let a: Vc<u32> = Vc::local_cell(42);
        assert_eq!(*a.await.unwrap(), 42);

        let b = Wrapper(42).local_cell();
        assert_eq!((*b.await.unwrap()).0, 42);

        let c = TransparentWrapper(42).local_cell();
        assert_eq!(*c.await.unwrap(), 42);
    })
    .await
}

#[tokio::test]
async fn store_and_read_generic() {
    run(&REGISTRATION, async {
        // `Vc<Vec<Vc<T>>>` is stored as `Vc<Vec<Vc<()>>>` and requires special
        // transmute handling
        let cells: Vc<Vec<Vc<u32>>> =
            Vc::local_cell(vec![Vc::local_cell(1), Vc::local_cell(2), Vc::cell(3)]);

        let mut output = Vec::new();
        for el in cells.await.unwrap() {
            output.push(*el.await.unwrap());
        }

        assert_eq!(output, vec![1, 2, 3]);
    })
    .await
}
1 change: 1 addition & 0 deletions turbopack/crates/turbo-tasks/src/id.rs
@@ -70,6 +70,7 @@ define_id!(ValueTypeId: u32);
define_id!(TraitTypeId: u32);
define_id!(BackendJobId: u32);
define_id!(ExecutionId: u64, derive(Debug));
define_id!(LocalCellId: u32, derive(Debug));

impl Debug for TaskId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
174 changes: 122 additions & 52 deletions turbopack/crates/turbo-tasks/src/manager.rs
@@ -30,8 +30,8 @@ use crate::{
},
capture_future::{self, CaptureFuture},
event::{Event, EventListener},
id::{BackendJobId, FunctionId, TraitTypeId},
id_factory::IdFactoryWithReuse,
id::{BackendJobId, ExecutionId, FunctionId, LocalCellId, TraitTypeId},
id_factory::{IdFactory, IdFactoryWithReuse},
magic_any::MagicAny,
raw_vc::{CellId, RawVc},
registry,
@@ -243,6 +243,7 @@ pub struct TurboTasks<B: Backend + 'static> {
this: Weak<Self>,
backend: B,
task_id_factory: IdFactoryWithReuse<TaskId>,
execution_id_factory: IdFactory<ExecutionId>,
stopped: AtomicBool,
currently_scheduled_tasks: AtomicUsize,
currently_scheduled_foreground_jobs: AtomicUsize,
@@ -257,7 +258,6 @@ pub struct TurboTasks<B: Backend + 'static> {
program_start: Instant,
}

#[derive(Default)]
struct CurrentTaskState {
/// Affected tasks, that are tracked during task execution. These tasks will
/// be invalidated when the execution finishes or before reading a cell
@@ -266,6 +266,26 @@ struct CurrentTaskState {

/// True if the current task has state in cells
stateful: bool,

/// A unique identifier created for each unique `CurrentTaskState`. Used to
/// check that [`CurrentTaskState::local_cells`] are valid for the current
/// `RawVc::LocalCell`.
execution_id: ExecutionId,

/// Cells for locally allocated Vcs (`RawVc::LocalCell`). This is freed
/// (along with `CurrentTaskState`) when the task finishes executing.
local_cells: Vec<TypedCellContent>,
}

impl CurrentTaskState {
fn new(execution_id: ExecutionId) -> Self {
Self {
tasks_to_notify: Vec::new(),
stateful: false,
execution_id,
local_cells: Vec::new(),
}
}
}

// TODO implement our own thread pool and make these thread locals instead
@@ -293,6 +313,7 @@ impl<B: Backend + 'static> TurboTasks<B> {
this: this.clone(),
backend,
task_id_factory,
execution_id_factory: IdFactory::new(),
stopped: AtomicBool::new(false),
currently_scheduled_tasks: AtomicUsize::new(0),
currently_scheduled_background_jobs: AtomicUsize::new(0),
@@ -488,57 +509,61 @@ impl<B: Backend + 'static> TurboTasks<B> {

let this = self.pin();
let future = async move {
#[allow(clippy::blocks_in_conditions)]
while CURRENT_TASK_STATE
.scope(Default::default(), async {
if this.stopped.load(Ordering::Acquire) {
return false;
}
let mut schedule_again = true;
while schedule_again {
let task_state =
RefCell::new(CurrentTaskState::new(this.execution_id_factory.get()));
schedule_again = CURRENT_TASK_STATE
.scope(task_state, async {
if this.stopped.load(Ordering::Acquire) {
return false;
}

// Setup thread locals
CELL_COUNTERS
.scope(Default::default(), async {
let Some(TaskExecutionSpec { future, span }) =
this.backend.try_start_task_execution(task_id, &*this)
else {
return false;
};

async {
let (result, duration, memory_usage) =
CaptureFuture::new(AssertUnwindSafe(future).catch_unwind())
.await;

let result = result.map_err(|any| match any.downcast::<String>() {
Ok(owned) => Some(Cow::Owned(*owned)),
Err(any) => match any.downcast::<&'static str>() {
Ok(str) => Some(Cow::Borrowed(*str)),
Err(_) => None,
},
});
this.backend.task_execution_result(task_id, result, &*this);
let stateful = this.finish_current_task_state();
let cell_counters =
CELL_COUNTERS.with(|cc| take(&mut *cc.borrow_mut()));
let schedule_again = this.backend.task_execution_completed(
task_id,
duration,
memory_usage,
cell_counters,
stateful,
&*this,
);
// task_execution_completed might need to notify tasks
this.notify_scheduled_tasks();
schedule_again
}
.instrument(span)
// Setup thread locals
CELL_COUNTERS
.scope(Default::default(), async {
let Some(TaskExecutionSpec { future, span }) =
this.backend.try_start_task_execution(task_id, &*this)
else {
return false;
};

async {
let (result, duration, memory_usage) =
CaptureFuture::new(AssertUnwindSafe(future).catch_unwind())
.await;

let result =
result.map_err(|any| match any.downcast::<String>() {
Ok(owned) => Some(Cow::Owned(*owned)),
Err(any) => match any.downcast::<&'static str>() {
Ok(str) => Some(Cow::Borrowed(*str)),
Err(_) => None,
},
});
this.backend.task_execution_result(task_id, result, &*this);
let stateful = this.finish_current_task_state();
let cell_counters =
CELL_COUNTERS.with(|cc| take(&mut *cc.borrow_mut()));
let schedule_again = this.backend.task_execution_completed(
task_id,
duration,
memory_usage,
cell_counters,
stateful,
&*this,
);
// task_execution_completed might need to notify tasks
this.notify_scheduled_tasks();
schedule_again
}
.instrument(span)
.await
})
.await
})
.await
})
.await
{}
})
.await;
}
this.finish_primary_job();
anyhow::Ok(())
};
@@ -841,6 +866,7 @@ impl<B: Backend + 'static> TurboTasks<B> {
let CurrentTaskState {
tasks_to_notify,
stateful,
..
} = &mut *cell.borrow_mut();
(*stateful, take(tasks_to_notify))
});
@@ -1692,3 +1718,47 @@ pub fn find_cell_by_type(ty: ValueTypeId) -> CurrentCellRef {
}
})
}

pub(crate) fn create_local_cell(value: TypedCellContent) -> (ExecutionId, LocalCellId) {
    CURRENT_TASK_STATE.with(|cell| {
        let CurrentTaskState {
            execution_id,
            local_cells,
            ..
        } = &mut *cell.borrow_mut();

        // store in the task-local arena
        local_cells.push(value);

        // generate a one-indexed id
        let raw_local_cell_id = local_cells.len();
        let local_cell_id = if cfg!(debug_assertions) {
            LocalCellId::from(u32::try_from(raw_local_cell_id).unwrap())
        } else {
            unsafe { LocalCellId::new_unchecked(raw_local_cell_id as u32) }
        };

        (*execution_id, local_cell_id)
    })
}

/// Panics if the ExecutionId does not match the expected value.
pub(crate) fn read_local_cell(
    execution_id: ExecutionId,
    local_cell_id: LocalCellId,
) -> TypedCellContent {
    CURRENT_TASK_STATE.with(|cell| {
        let CurrentTaskState {
            execution_id: expected_execution_id,
            local_cells,
            ..
        } = &*cell.borrow();
        assert_eq!(
            execution_id, *expected_execution_id,
            "This Vc is local. Local Vcs must only be accessed within their own task. Resolve the \
             Vc to convert it into a non-local version."
        );
        // local cell ids are one-indexed (they use NonZeroU32)
        local_cells[(*local_cell_id as usize) - 1].clone()
    })
}
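
The two functions above combine a one-indexed arena with an execution-id check. A standalone sketch of that pattern follows, assuming simplified types: `LocalArena` and `arena_demo` are hypothetical names, `String` stands in for `TypedCellContent`, and plain `u64`/`NonZeroU32` values stand in for `ExecutionId`/`LocalCellId`. The real code keeps this state in a task-local `CurrentTaskState` rather than in an explicit struct.

```rust
use std::convert::TryFrom;
use std::mem::size_of;
use std::num::NonZeroU32;

/// Hypothetical simplified arena for illustration only.
struct LocalArena {
    execution_id: u64,
    cells: Vec<String>,
}

impl LocalArena {
    fn new(execution_id: u64) -> Self {
        Self {
            execution_id,
            cells: Vec::new(),
        }
    }

    /// Pushes first, then uses the new length as the id, so ids start at 1.
    fn create(&mut self, value: String) -> (u64, NonZeroU32) {
        self.cells.push(value);
        let raw = u32::try_from(self.cells.len()).expect("too many local cells");
        let id = NonZeroU32::new(raw).expect("length is at least 1 after a push");
        (self.execution_id, id)
    }

    /// Panics if the handle was created during a different execution.
    fn read(&self, execution_id: u64, id: NonZeroU32) -> String {
        assert_eq!(
            execution_id, self.execution_id,
            "local cell read outside of its own execution"
        );
        // Ids are one-indexed, so subtract 1 to get the vector index.
        self.cells[id.get() as usize - 1].clone()
    }
}

/// Returns the first id handed out, the value read back through the second
/// id, and the size in bytes of an optional id.
fn arena_demo() -> (u32, String, usize) {
    let mut arena = LocalArena::new(7);
    let (_, first) = arena.create("a".to_string());
    let (execution_id, second) = arena.create("b".to_string());
    let value = arena.read(execution_id, second);
    (first.get(), value, size_of::<Option<NonZeroU32>>())
}
```

One reason to prefer one-indexed ids is that zero stays free as a niche, so an `Option` around the id costs no extra space.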