From bf03db60fcb4fb513107ccca4e90baaf2643c254 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Mon, 4 Mar 2024 15:45:52 +0800 Subject: [PATCH 1/3] fix left singleton table bug Signed-off-by: Richard Chien --- src/stream/src/from_proto/dynamic_filter.rs | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/src/stream/src/from_proto/dynamic_filter.rs b/src/stream/src/from_proto/dynamic_filter.rs index c09fd767ad9e..be744354c3d1 100644 --- a/src/stream/src/from_proto/dynamic_filter.rs +++ b/src/stream/src/from_proto/dynamic_filter.rs @@ -36,11 +36,7 @@ impl ExecutorBuilder for DynamicFilterExecutorBuilder { let [source_l, source_r]: [_; 2] = params.input.try_into().unwrap(); let key_l = node.get_left_key() as usize; - let vnodes = Arc::new( - params - .vnode_bitmap - .expect("vnodes not set for dynamic filter"), - ); + let vnodes = params.vnode_bitmap.map(Arc::new); let prost_condition = node.get_condition()?; let comparator = prost_condition.get_function_type()?; @@ -63,12 +59,9 @@ impl ExecutorBuilder for DynamicFilterExecutorBuilder { let cleaned_by_watermark = left_table.get_cleaned_by_watermark(); let exec = if cleaned_by_watermark { - let state_table_l = WatermarkCacheStateTable::from_table_catalog( - node.get_left_table()?, - store, - Some(vnodes), - ) - .await; + let state_table_l = + WatermarkCacheStateTable::from_table_catalog(node.get_left_table()?, store, vnodes) + .await; DynamicFilterExecutor::new( params.actor_context, @@ -87,7 +80,7 @@ impl ExecutorBuilder for DynamicFilterExecutorBuilder { .boxed() } else { let state_table_l = - StateTable::from_table_catalog(node.get_left_table()?, store, Some(vnodes)).await; + StateTable::from_table_catalog(node.get_left_table()?, store, vnodes).await; DynamicFilterExecutor::new( params.actor_context, From 3cc388daa29fc98d4e3992309b7195650096fc22 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Mon, 4 Mar 2024 16:00:19 +0800 Subject: [PATCH 2/3] add e2e Signed-off-by: Richard Chien --- e2e_test/streaming/bug_fixes/issue_15302.slt | 38 ++++++++++++++++++++ 1 file changed, 38 insertions(+) create mode 100644 e2e_test/streaming/bug_fixes/issue_15302.slt diff --git a/e2e_test/streaming/bug_fixes/issue_15302.slt b/e2e_test/streaming/bug_fixes/issue_15302.slt new file mode 100644 index 000000000000..6d0bd01716cc --- /dev/null +++ b/e2e_test/streaming/bug_fixes/issue_15302.slt @@ -0,0 +1,38 @@ +# https://github.com/risingwavelabs/risingwave/issues/15302 + +statement ok +set RW_IMPLICIT_FLUSH = true; + +statement ok +create materialized view test_last_ingestion_time as ( +select 'table_a' as source, TO_TIMESTAMP('2024-01-01 00:00:00', 'YYYY-MM-DD HH24:MI:SS') as last_ingestion_time +union all +select 'table_b' as source, TO_TIMESTAMP('2024-01-01 00:00:01', 'YYYY-MM-DD HH24:MI:SS') as last_ingestion_time +); + +statement ok +create materialized view test_records as ( +select 1 as id, TO_TIMESTAMP('2024-01-01 00:00:00', 'YYYY-MM-DD HH24:MI:SS') as record_timestamp +union all +select 2 as id, TO_TIMESTAMP('2024-01-01 00:00:01', 'YYYY-MM-DD HH24:MI:SS') as record_timestamp +union all +select 3 as id, TO_TIMESTAMP('2024-01-01 00:00:02', 'YYYY-MM-DD HH24:MI:SS') as record_timestamp +); + +statement ok +create materialized view test_window as ( +with time_window as ( + select max(last_ingestion_time) as window_end + from test_last_ingestion_time +) +select + id + from test_records, time_window + where record_timestamp >= window_end +); + +query i +select * from test_window; +---- +2 +3 From 52bca99882a8b00eaba2c72bc609d5507313cdd8 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Tue, 5 Mar 2024 15:13:43 +0800 Subject: [PATCH 3/3] inc e2e timeout Signed-off-by: Richard Chien --- ci/workflows/pull-request.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml index ab96ae00af39..f40c578e24b8 100644 --- a/ci/workflows/pull-request.yml +++ b/ci/workflows/pull-request.yml @@ -96,7 +96,7 @@ steps: config: ci/docker-compose.yml mount-buildkite-agent: true - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 18 + timeout_in_minutes: 19 retry: *auto-retry - label: "end-to-end test (parallel)"