Skip to content

Commit

Permalink
fixed all single-thread cycles; multi-thread still not working
Browse files Browse the repository at this point in the history
  • Loading branch information
carljm committed Dec 18, 2024
1 parent 00acc56 commit 5202579
Show file tree
Hide file tree
Showing 20 changed files with 317 additions and 196 deletions.
4 changes: 4 additions & 0 deletions src/accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ impl<A: Accumulator> Ingredient for IngredientImpl<A> {
panic!("nothing should ever depend on an accumulator directly")
}

fn is_verified_final<'db>(&'db self, _db: &'db dyn Database, _input: Id) -> bool {
false
}

fn cycle_recovery_strategy(&self) -> CycleRecoveryStrategy {
CycleRecoveryStrategy::Panic
}
Expand Down
6 changes: 4 additions & 2 deletions src/active_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,15 @@ impl ActiveQuery {
durability: Durability,
revision: Revision,
accumulated: InputAccumulatedValues,
cycle_heads: &FxHashSet<DatabaseKeyIndex>,
cycle_heads: Option<&FxHashSet<DatabaseKeyIndex>>,
) {
self.input_outputs.insert((EdgeKind::Input, input));
self.durability = self.durability.min(durability);
self.changed_at = self.changed_at.max(revision);
self.accumulated.add_input(accumulated);
self.cycle_heads.extend(cycle_heads);
if let Some(cycle_heads) = cycle_heads {
self.cycle_heads.extend(cycle_heads);
}
}

pub(super) fn add_untracked_read(&mut self, changed_at: Revision) {
Expand Down
5 changes: 5 additions & 0 deletions src/cycle.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/// The maximum number of times we'll fixpoint-iterate before panicking.
///
/// Should only be relevant in case of a badly configured cycle recovery.
pub const MAX_ITERATIONS: u32 = 200;

/// Return value from a cycle recovery function.
#[derive(Debug)]
pub enum CycleRecoveryAction<T> {
Expand Down
5 changes: 5 additions & 0 deletions src/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,11 @@ where
self.maybe_changed_after(db, key, revision)
}

fn is_verified_final<'db>(&'db self, db: &'db dyn Database, input: Id) -> bool {
self.get_memo_from_table_for(db.zalsa(), input)
.is_some_and(|memo| !memo.may_be_provisional())
}

fn cycle_recovery_strategy(&self) -> CycleRecoveryStrategy {
C::CYCLE_STRATEGY
}
Expand Down
51 changes: 19 additions & 32 deletions src/function/execute.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::sync::Arc;

use crate::{zalsa::ZalsaDatabase, Database, DatabaseKeyIndex, Event, EventKind};
use crate::{
cycle::MAX_ITERATIONS, zalsa::ZalsaDatabase, Database, DatabaseKeyIndex, Event, EventKind,
};

use super::{memo::Memo, Configuration, IngredientImpl};

Expand Down Expand Up @@ -103,51 +105,36 @@ where
) {
crate::CycleRecoveryAction::Iterate => {
tracing::debug!("{database_key_index:?}: execute: iterate again");
iteration_count = iteration_count.checked_add(1).expect(
"fixpoint iteration of {database_key_index:#?} should \
converge before u32::MAX iterations",
);
opt_last_provisional = Some(self.insert_memo(
zalsa,
id,
Memo::new(Some(new_value), revision_now, revisions),
));
continue;
}
crate::CycleRecoveryAction::Fallback(fallback_value) => {
tracing::debug!(
"{database_key_index:?}: execute: user cycle_fn says to fall back"
);
new_value = fallback_value;
// We have to insert the fallback value for this query and then iterate
// one more time to fill in correct values for everything else in the
// cycle based on it; then we'll re-insert it as final value.
}
}
}
iteration_count = iteration_count.checked_add(1).expect(
"fixpoint iteration of {database_key_index:#?} should \
iteration_count = iteration_count.checked_add(1).expect(
"fixpoint iteration of {database_key_index:#?} should \
converge before u32::MAX iterations",
);
if iteration_count > 10 {
panic!("too much iteration");
);
if iteration_count > MAX_ITERATIONS {
panic!("{database_key_index:?}: execute: too many cycle iterations");
}
opt_last_provisional = Some(self.insert_memo(
zalsa,
id,
Memo::new(Some(new_value), revision_now, revisions),
));
continue;
}
// This is no longer a provisional result, it's our final result, so remove ourself
// from the cycle heads, and iterate one last time to remove ourself from all other
// results in the cycle as well and turn them into usable cached results.
// TODO Can we avoid doing this? the extra iteration is quite expensive if there is
// a nested cycle. Maybe track the relevant memos and replace them all with the
// cycle head removed? Or just let them keep the cycle head and allow cycle memos
// to be used when we are not actually iterating the cycle for that head?
tracing::debug!(
"{database_key_index:?}: execute: fixpoint iteration has a final value, \
one more iteration to remove cycle heads from memos"
"{database_key_index:?}: execute: fixpoint iteration has a final value"
);
revisions.cycle_heads.remove(&database_key_index);
dbg!(&revisions.cycle_heads);
self.insert_memo(
zalsa,
id,
Memo::new(Some(new_value), revision_now, revisions),
);
continue;
}

tracing::debug!("{database_key_index:?}: execute: result.revisions = {revisions:#?}");
Expand Down
12 changes: 7 additions & 5 deletions src/function/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ where
durability,
changed_at,
InputAccumulatedValues::from_map(&memo.revisions.accumulated),
&memo.revisions.cycle_heads,
memo.cycle_heads(),
);

value
Expand All @@ -54,8 +54,7 @@ where
let memo_guard = self.get_memo_from_table_for(zalsa, id);
if let Some(memo) = &memo_guard {
if memo.value.is_some()
&& !memo.is_provisional()
&& self.shallow_verify_memo(db, zalsa, self.database_key_index(id), memo)
&& self.shallow_verify_memo(db, zalsa, self.database_key_index(id), memo, false)
{
// Unsafety invariant: memo is present in memo_map
unsafe {
Expand Down Expand Up @@ -84,9 +83,10 @@ where
let memo_guard = self.get_memo_from_table_for(zalsa, id);
if let Some(memo) = &memo_guard {
dbg!("found provisional value, shallow verifying it");
dbg!(memo.tracing_debug());
if memo.value.is_some()
&& memo.revisions.cycle_heads.contains(&database_key_index)
&& self.shallow_verify_memo(db, zalsa, database_key_index, memo)
&& self.shallow_verify_memo(db, zalsa, database_key_index, memo, true)
{
dbg!("verified provisional value, returning it");
dbg!(&memo.value);
Expand Down Expand Up @@ -146,6 +146,8 @@ where
}
}

Some(self.execute(db, database_key_index, opt_old_memo))
let memo = self.execute(db, database_key_index, opt_old_memo);

Some(memo)
}
}
49 changes: 39 additions & 10 deletions src/function/maybe_changed_after.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,7 @@ where
// Check if we have a verified version: this is the hot path.
let memo_guard = self.get_memo_from_table_for(zalsa, id);
if let Some(memo) = &memo_guard {
if !memo.is_provisional()
&& self.shallow_verify_memo(db, zalsa, database_key_index, memo)
{
if self.shallow_verify_memo(db, zalsa, database_key_index, memo, false) {
return VerifyResult::changed_if(memo.revisions.changed_at > revision);
}
drop(memo_guard); // release the arc-swap guard before cold path
Expand Down Expand Up @@ -150,14 +148,25 @@ where
zalsa: &Zalsa,
database_key_index: DatabaseKeyIndex,
memo: &Memo<C::Output<'_>>,
allow_provisional: bool,
) -> bool {
let verified_at = memo.verified_at.load();
let revision_now = zalsa.current_revision();

tracing::debug!(
"{database_key_index:?}: shallow_verify_memo(memo = {memo:#?})",
memo = memo.tracing_debug()
);
if !allow_provisional {
if memo.may_be_provisional() {
tracing::debug!(
"{database_key_index:?}: validate_provisional(memo = {memo:#?})",
memo = memo.tracing_debug()
);
if !self.validate_provisional(db, zalsa, memo) {
return false;
}
}
}
let verified_at = memo.verified_at.load();
let revision_now = zalsa.current_revision();

if verified_at == revision_now {
// Already verified.
Expand All @@ -175,6 +184,26 @@ where
false
}

/// Check if this memo's cycle heads have all been finalized. If so, mark it verified final and
/// return true, if not return false.
fn validate_provisional(
&self,
db: &C::DbView,
zalsa: &Zalsa,
memo: &Memo<C::Output<'_>>,
) -> bool {
for cycle_head in &memo.revisions.cycle_heads {
if !zalsa
.lookup_ingredient(cycle_head.ingredient_index)
.is_verified_final(db.as_dyn_database(), cycle_head.key_index)
{
return false;
}
}
memo.verified_final.store(true);
true
}

/// VerifyResult::Unchanged if the memo's value and `changed_at` time is up to date in the
/// current revision. When this returns Unchanged with no cycle heads, it also updates the
/// memo's `verified_at` field if needed to make future calls cheaper.
Expand All @@ -188,9 +217,6 @@ where
old_memo: &Memo<C::Output<'_>>,
active_query: &ActiveQueryGuard<'_>,
) -> VerifyResult {
if old_memo.is_provisional() {
return VerifyResult::Changed;
}
let zalsa = db.zalsa();
let database_key_index = active_query.database_key_index;

Expand All @@ -199,9 +225,12 @@ where
old_memo = old_memo.tracing_debug()
);

if self.shallow_verify_memo(db, zalsa, database_key_index, old_memo) {
if self.shallow_verify_memo(db, zalsa, database_key_index, old_memo, false) {
return VerifyResult::Unchanged(Default::default());
}
if old_memo.may_be_provisional() {
return VerifyResult::Changed;
}

loop {
let mut cycle_heads = FxHashSet::default();
Expand Down
21 changes: 18 additions & 3 deletions src/function/memo.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use rustc_hash::FxHashSet;
use std::any::Any;
use std::fmt::Debug;
use std::fmt::Formatter;
Expand Down Expand Up @@ -109,6 +110,9 @@ pub(super) struct Memo<V> {
/// as the current revision.
pub(super) verified_at: AtomicCell<Revision>,

/// Is this memo verified to not be a provisional cycle result?
pub(super) verified_final: AtomicCell<bool>,

/// Revision information
pub(super) revisions: QueryRevisions,
}
Expand All @@ -118,13 +122,23 @@ impl<V> Memo<V> {
Memo {
value,
verified_at: AtomicCell::new(revision_now),
verified_final: AtomicCell::new(revisions.cycle_heads.is_empty()),
revisions,
}
}

/// True if this is a provisional cycle-iteration result.
pub(super) fn is_provisional(&self) -> bool {
!self.revisions.cycle_heads.is_empty()
/// True if this is may be a provisional cycle-iteration result.
pub(super) fn may_be_provisional(&self) -> bool {
!self.verified_final.load()
}

/// Cycle heads that should be propagated to dependent queries.
pub(super) fn cycle_heads(&self) -> Option<&FxHashSet<DatabaseKeyIndex>> {
if self.may_be_provisional() {
Some(&self.revisions.cycle_heads)
} else {
None
}
}

/// True if this memo is known not to have changed based on its durability.
Expand Down Expand Up @@ -185,6 +199,7 @@ impl<V> Memo<V> {
},
)
.field("verified_at", &self.memo.verified_at)
.field("verified_final", &self.memo.verified_final)
.field("revisions", &self.memo.revisions)
.finish()
}
Expand Down
1 change: 1 addition & 0 deletions src/function/specify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ where
let memo = Memo {
value: Some(value),
verified_at: AtomicCell::new(revision),
verified_final: AtomicCell::new(true),
revisions,
};

Expand Down
3 changes: 3 additions & 0 deletions src/ingredient.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ pub trait Ingredient: Any + std::fmt::Debug + Send + Sync {
revision: Revision,
) -> VerifyResult;

/// Is the value for `input` in this ingredient marked as possibly a provisional cycle value?
fn is_verified_final<'db>(&'db self, db: &'db dyn Database, input: Id) -> bool;

/// What were the inputs (if any) that were used to create the value at `key_index`.
fn origin(&self, db: &dyn Database, key_index: Id) -> Option<QueryOrigin>;

Expand Down
6 changes: 5 additions & 1 deletion src/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ impl<C: Configuration> IngredientImpl<C> {
stamp.durability,
stamp.changed_at,
InputAccumulatedValues::Empty,
&Default::default(),
None,
);
&value.fields
}
Expand Down Expand Up @@ -222,6 +222,10 @@ impl<C: Configuration> Ingredient for IngredientImpl<C> {
VerifyResult::unchanged()
}

fn is_verified_final<'db>(&'db self, _db: &'db dyn Database, _input: Id) -> bool {
false
}

fn cycle_recovery_strategy(&self) -> CycleRecoveryStrategy {
CycleRecoveryStrategy::Panic
}
Expand Down
4 changes: 4 additions & 0 deletions src/input/input_field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ where
VerifyResult::changed_if(value.stamps[self.field_index].changed_at > revision)
}

fn is_verified_final<'db>(&'db self, _db: &'db dyn Database, _input: Id) -> bool {
false
}

fn origin(&self, _db: &dyn Database, _key_index: Id) -> Option<QueryOrigin> {
None
}
Expand Down
6 changes: 5 additions & 1 deletion src/interned.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ where
Durability::MAX,
self.reset_at,
InputAccumulatedValues::Empty,
&Default::default(),
None,
);

// Optimisation to only get read lock on the map if the data has already
Expand Down Expand Up @@ -226,6 +226,10 @@ where
VerifyResult::changed_if(revision < self.reset_at)
}

fn is_verified_final<'db>(&'db self, _db: &'db dyn Database, _input: Id) -> bool {
false
}

fn cycle_recovery_strategy(&self) -> crate::cycle::CycleRecoveryStrategy {
crate::cycle::CycleRecoveryStrategy::Panic
}
Expand Down
3 changes: 3 additions & 0 deletions src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,10 @@ impl Runtime {
let mut dg = self.dependency_graph.lock();
let thread_id = std::thread::current().id();

eprintln!("Runtime::block_on {database_key:?}, I am {thread_id:?}, other id {other_id:?}");

if dg.depends_on(other_id, thread_id) {
eprintln!("thread dependency cycle");
return BlockResult::Cycle;
}

Expand Down
Loading

0 comments on commit 5202579

Please sign in to comment.