Skip to content

Commit

Permalink
disable conflict check for stand alone table
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 committed Jan 18, 2024
1 parent c902aad commit bcb2f87
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 4 deletions.
5 changes: 5 additions & 0 deletions src/stream/src/executor/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ pub struct ActorContext {

pub streaming_metrics: Arc<StreamingMetrics>,
pub error_suppressor: Arc<Mutex<ErrorSuppressor>>,

pub dispatch_num: usize,
}

pub type ActorContextRef = Arc<ActorContext>;
Expand All @@ -63,6 +65,7 @@ impl ActorContext {
total_mem_val: Arc::new(TrAdder::new()),
streaming_metrics: Arc::new(StreamingMetrics::unused()),
error_suppressor: Arc::new(Mutex::new(ErrorSuppressor::new(10))),
dispatch_num: 0,
})
}

Expand All @@ -72,6 +75,7 @@ impl ActorContext {
total_mem_val: Arc<TrAdder<i64>>,
streaming_metrics: Arc<StreamingMetrics>,
unique_user_errors: usize,
dispatch_num: usize,
) -> ActorContextRef {
Arc::new(Self {
id,
Expand All @@ -81,6 +85,7 @@ impl ActorContext {
total_mem_val,
streaming_metrics,
error_suppressor: Arc::new(Mutex::new(ErrorSuppressor::new(unique_user_errors))),
dispatch_num,
})
}

Expand Down
89 changes: 85 additions & 4 deletions src/stream/src/executor/mview/materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@ use crate::common::table::state_table::StateTableInner;
use crate::executor::error::StreamExecutorError;
use crate::executor::monitor::StreamingMetrics;
use crate::executor::{
expect_first_barrier, ActorContext, ActorContextRef, BoxedExecutor, BoxedMessageStream,
Executor, ExecutorInfo, Message, PkIndicesRef, StreamExecutorResult,
expect_first_barrier, ActorContext, ActorContextRef, AddMutation, BoxedExecutor,
BoxedMessageStream, Executor, ExecutorInfo, Message, Mutation, PkIndicesRef,
StreamExecutorResult, UpdateMutation,
};
use crate::task::AtomicU64Ref;
use crate::task::{ActorId, AtomicU64Ref};

/// `MaterializeExecutor` materializes changes in stream into a materialized view on storage.
pub struct MaterializeExecutor<S: StateStore, SD: ValueRowSerde> {
Expand Down Expand Up @@ -83,7 +84,18 @@ impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
) -> Self {
let arrange_key_indices: Vec<usize> = arrange_key.iter().map(|k| k.column_index).collect();

let state_table = StateTableInner::from_table_catalog(table_catalog, store, vnodes).await;
let can_disable_conflict_check = actor_context.dispatch_num == 0;

let state_table = if matches!(
conflict_behavior,
ConflictBehavior::Overwrite | ConflictBehavior::IgnoreConflict
) && can_disable_conflict_check
{
// Table could disable conflict check if no downstream mv depends on it, so we use a inconsistent_op to skip sanity check as well.
StateTableInner::from_table_catalog_inconsistent_op(table_catalog, store, vnodes).await
} else {
StateTableInner::from_table_catalog(table_catalog, store, vnodes).await
};

let metrics_info =
MetricsInfo::new(metrics, table_catalog.id, actor_context.id, "Materialize");
Expand Down Expand Up @@ -115,6 +127,8 @@ impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
// The first barrier message should be propagated.
yield Message::Barrier(barrier);

let mut can_disable_conflict_check = self.actor_context.dispatch_num == 0;

#[for_await]
for msg in input {
let msg = msg?;
Expand All @@ -130,6 +144,13 @@ impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
.inc_by(chunk.cardinality() as u64);

match self.conflict_behavior {
ConflictBehavior::Overwrite | ConflictBehavior::IgnoreConflict
if can_disable_conflict_check =>
{
self.state_table.write_chunk(chunk.clone());
self.state_table.try_flush().await?;
continue;
}
ConflictBehavior::Overwrite | ConflictBehavior::IgnoreConflict => {
if chunk.cardinality() == 0 {
// empty chunk
Expand Down Expand Up @@ -191,6 +212,11 @@ impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
}
Message::Barrier(b) => {
self.state_table.commit(b.epoch).await?;
let mutation = b.mutation.clone();
// If a downstream mv depends on the current table, we need to do conflict check again.
if Self::mutation_might_affect_dispatcher(mutation, self.actor_context.id) {
can_disable_conflict_check = false;
}

// Update the vnode bitmap for the state table if asked.
if let Some(vnode_bitmap) = b.as_update_vnode_bitmap(self.actor_context.id) {
Expand All @@ -207,6 +233,61 @@ impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
}
}
}

fn mutation_might_affect_dispatcher(
mutation: Option<Arc<Mutation>>,
actor_id: ActorId,
) -> bool {
let Some(mutation) = mutation.as_deref() else {
return false;
};
match mutation {
Mutation::Add(AddMutation { adds, .. }) => {
if adds.get(&actor_id).is_some() {
return true;
}
}
Mutation::Update(UpdateMutation {
dispatchers,
actor_new_dispatchers: actor_dispatchers,
..
}) => {
if actor_dispatchers.get(&actor_id).is_some() {
return true;
}

if dispatchers.get(&actor_id).is_some() {
return true;
}
}
Mutation::AddAndUpdate(
AddMutation { adds, .. },
UpdateMutation {
dispatchers,
actor_new_dispatchers: actor_dispatchers,
..
},
) => {
if adds.get(&actor_id).is_some() {
return true;
}
if actor_dispatchers.get(&actor_id).is_some() {
return true;
}

if dispatchers.get(&actor_id).is_some() {
return true;
}
}
Mutation::Stop(stops) => {
if stops.contains(&actor_id) {
return true;
}
}
_ => {}
};
false
}
}

impl<S: StateStore> MaterializeExecutor<S, BasicSerde> {
Expand Down
1 change: 1 addition & 0 deletions src/stream/src/task/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,7 @@ impl LocalStreamManagerCore {
self.total_mem_val.clone(),
self.streaming_metrics.clone(),
self.config.unique_user_stream_errors,
actor.dispatcher.len(),
);
let vnode_bitmap = actor.vnode_bitmap.as_ref().map(|b| b.into());
let expr_context = actor.expr_context.clone().unwrap();
Expand Down

0 comments on commit bcb2f87

Please sign in to comment.