diff --git a/src/stream/src/from_proto/dynamic_filter.rs b/src/stream/src/from_proto/dynamic_filter.rs index c09fd767ad9ef..be744354c3d14 100644 --- a/src/stream/src/from_proto/dynamic_filter.rs +++ b/src/stream/src/from_proto/dynamic_filter.rs @@ -36,11 +36,7 @@ impl ExecutorBuilder for DynamicFilterExecutorBuilder { let [source_l, source_r]: [_; 2] = params.input.try_into().unwrap(); let key_l = node.get_left_key() as usize; - let vnodes = Arc::new( - params - .vnode_bitmap - .expect("vnodes not set for dynamic filter"), - ); + let vnodes = params.vnode_bitmap.map(Arc::new); let prost_condition = node.get_condition()?; let comparator = prost_condition.get_function_type()?; @@ -63,12 +59,9 @@ impl ExecutorBuilder for DynamicFilterExecutorBuilder { let cleaned_by_watermark = left_table.get_cleaned_by_watermark(); let exec = if cleaned_by_watermark { - let state_table_l = WatermarkCacheStateTable::from_table_catalog( - node.get_left_table()?, - store, - Some(vnodes), - ) - .await; + let state_table_l = + WatermarkCacheStateTable::from_table_catalog(node.get_left_table()?, store, vnodes) + .await; DynamicFilterExecutor::new( params.actor_context, @@ -87,7 +80,7 @@ impl ExecutorBuilder for DynamicFilterExecutorBuilder { .boxed() } else { let state_table_l = - StateTable::from_table_catalog(node.get_left_table()?, store, Some(vnodes)).await; + StateTable::from_table_catalog(node.get_left_table()?, store, vnodes).await; DynamicFilterExecutor::new( params.actor_context,