From bf03db60fcb4fb513107ccca4e90baaf2643c254 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Mon, 4 Mar 2024 15:45:52 +0800 Subject: [PATCH] fix left singleton table bug Signed-off-by: Richard Chien --- src/stream/src/from_proto/dynamic_filter.rs | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/src/stream/src/from_proto/dynamic_filter.rs b/src/stream/src/from_proto/dynamic_filter.rs index c09fd767ad9ef..be744354c3d14 100644 --- a/src/stream/src/from_proto/dynamic_filter.rs +++ b/src/stream/src/from_proto/dynamic_filter.rs @@ -36,11 +36,7 @@ impl ExecutorBuilder for DynamicFilterExecutorBuilder { let [source_l, source_r]: [_; 2] = params.input.try_into().unwrap(); let key_l = node.get_left_key() as usize; - let vnodes = Arc::new( - params - .vnode_bitmap - .expect("vnodes not set for dynamic filter"), - ); + let vnodes = params.vnode_bitmap.map(Arc::new); let prost_condition = node.get_condition()?; let comparator = prost_condition.get_function_type()?; @@ -63,12 +59,9 @@ impl ExecutorBuilder for DynamicFilterExecutorBuilder { let cleaned_by_watermark = left_table.get_cleaned_by_watermark(); let exec = if cleaned_by_watermark { - let state_table_l = WatermarkCacheStateTable::from_table_catalog( - node.get_left_table()?, - store, - Some(vnodes), - ) - .await; + let state_table_l = + WatermarkCacheStateTable::from_table_catalog(node.get_left_table()?, store, vnodes) + .await; DynamicFilterExecutor::new( params.actor_context, @@ -87,7 +80,7 @@ impl ExecutorBuilder for DynamicFilterExecutorBuilder { .boxed() } else { let state_table_l = - StateTable::from_table_catalog(node.get_left_table()?, store, Some(vnodes)).await; + StateTable::from_table_catalog(node.get_left_table()?, store, vnodes).await; DynamicFilterExecutor::new( params.actor_context,