diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml index a8c143f768a86..33a0c6c1498f3 100644 --- a/ci/workflows/pull-request.yml +++ b/ci/workflows/pull-request.yml @@ -96,7 +96,7 @@ steps: config: ci/docker-compose.yml mount-buildkite-agent: true - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 18 + timeout_in_minutes: 19 retry: *auto-retry - label: "end-to-end test (parallel)" diff --git a/e2e_test/streaming/bug_fixes/issue_15302.slt b/e2e_test/streaming/bug_fixes/issue_15302.slt new file mode 100644 index 0000000000000..6d0bd01716cc6 --- /dev/null +++ b/e2e_test/streaming/bug_fixes/issue_15302.slt @@ -0,0 +1,38 @@ +# https://github.com/risingwavelabs/risingwave/issues/15302 + +statement ok +set RW_IMPLICIT_FLUSH = true; + +statement ok +create materialized view test_last_ingestion_time as ( +select 'table_a' as source, TO_TIMESTAMP('2024-01-01 00:00:00', 'YYYY-MM-DD HH24:MI:SS') as last_ingestion_time +union all +select 'table_b' as source, TO_TIMESTAMP('2024-01-01 00:00:01', 'YYYY-MM-DD HH24:MI:SS') as last_ingestion_time +); + +statement ok +create materialized view test_records as ( +select 1 as id, TO_TIMESTAMP('2024-01-01 00:00:00', 'YYYY-MM-DD HH24:MI:SS') as record_timestamp +union all +select 2 as id, TO_TIMESTAMP('2024-01-01 00:00:01', 'YYYY-MM-DD HH24:MI:SS') as record_timestamp +union all +select 3 as id, TO_TIMESTAMP('2024-01-01 00:00:02', 'YYYY-MM-DD HH24:MI:SS') as record_timestamp +); + +statement ok +create materialized view test_window as ( +with time_window as ( + select max(last_ingestion_time) as window_end + from test_last_ingestion_time +) +select + id + from test_records, time_window + where record_timestamp >= window_end +); + +query i +select * from test_window; +---- +2 +3 diff --git a/src/stream/src/from_proto/dynamic_filter.rs b/src/stream/src/from_proto/dynamic_filter.rs index c09fd767ad9ef..be744354c3d14 100644 --- a/src/stream/src/from_proto/dynamic_filter.rs +++ b/src/stream/src/from_proto/dynamic_filter.rs @@ -36,11 +36,7 @@ impl ExecutorBuilder for DynamicFilterExecutorBuilder { let [source_l, source_r]: [_; 2] = params.input.try_into().unwrap(); let key_l = node.get_left_key() as usize; - let vnodes = Arc::new( - params - .vnode_bitmap - .expect("vnodes not set for dynamic filter"), - ); + let vnodes = params.vnode_bitmap.map(Arc::new); let prost_condition = node.get_condition()?; let comparator = prost_condition.get_function_type()?; @@ -63,12 +59,9 @@ impl ExecutorBuilder for DynamicFilterExecutorBuilder { let cleaned_by_watermark = left_table.get_cleaned_by_watermark(); let exec = if cleaned_by_watermark { - let state_table_l = WatermarkCacheStateTable::from_table_catalog( - node.get_left_table()?, - store, - Some(vnodes), - ) - .await; + let state_table_l = + WatermarkCacheStateTable::from_table_catalog(node.get_left_table()?, store, vnodes) + .await; DynamicFilterExecutor::new( params.actor_context, @@ -87,7 +80,7 @@ impl ExecutorBuilder for DynamicFilterExecutorBuilder { .boxed() } else { let state_table_l = - StateTable::from_table_catalog(node.get_left_table()?, store, Some(vnodes)).await; + StateTable::from_table_catalog(node.get_left_table()?, store, vnodes).await; DynamicFilterExecutor::new( params.actor_context,