Skip to content

Commit

Permalink
fix left singleton table bug
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Mar 4, 2024
1 parent 786b10b commit bf03db6
Showing 1 changed file with 5 additions and 12 deletions.
17 changes: 5 additions & 12 deletions src/stream/src/from_proto/dynamic_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,7 @@ impl ExecutorBuilder for DynamicFilterExecutorBuilder {
let [source_l, source_r]: [_; 2] = params.input.try_into().unwrap();
let key_l = node.get_left_key() as usize;

let vnodes = Arc::new(
params
.vnode_bitmap
.expect("vnodes not set for dynamic filter"),
);
let vnodes = params.vnode_bitmap.map(Arc::new);

let prost_condition = node.get_condition()?;
let comparator = prost_condition.get_function_type()?;
Expand All @@ -63,12 +59,9 @@ impl ExecutorBuilder for DynamicFilterExecutorBuilder {
let cleaned_by_watermark = left_table.get_cleaned_by_watermark();

let exec = if cleaned_by_watermark {
let state_table_l = WatermarkCacheStateTable::from_table_catalog(
node.get_left_table()?,
store,
Some(vnodes),
)
.await;
let state_table_l =
WatermarkCacheStateTable::from_table_catalog(node.get_left_table()?, store, vnodes)
.await;

DynamicFilterExecutor::new(
params.actor_context,
Expand All @@ -87,7 +80,7 @@ impl ExecutorBuilder for DynamicFilterExecutorBuilder {
.boxed()
} else {
let state_table_l =
StateTable::from_table_catalog(node.get_left_table()?, store, Some(vnodes)).await;
StateTable::from_table_catalog(node.get_left_table()?, store, vnodes).await;

DynamicFilterExecutor::new(
params.actor_context,
Expand Down

0 comments on commit bf03db6

Please sign in to comment.