Skip to content

Commit

Permalink
fix(dynfilter): left table with singleton dist panic bug (#15406)
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Mar 5, 2024
1 parent 8db895f commit 1259903
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 12 deletions.
38 changes: 38 additions & 0 deletions e2e_test/streaming/bug_fixes/issue_15302.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# https://github.com/risingwavelabs/risingwave/issues/15302

statement ok
set RW_IMPLICIT_FLUSH = true;

statement ok
create materialized view test_last_ingestion_time as (
select 'table_a' as source, TO_TIMESTAMP('2024-01-01 00:00:00', 'YYYY-MM-DD HH24:MI:SS') as last_ingestion_time
union all
select 'table_b' as source, TO_TIMESTAMP('2024-01-01 00:00:01', 'YYYY-MM-DD HH24:MI:SS') as last_ingestion_time
);

statement ok
create materialized view test_records as (
select 1 as id, TO_TIMESTAMP('2024-01-01 00:00:00', 'YYYY-MM-DD HH24:MI:SS') as record_timestamp
union all
select 2 as id, TO_TIMESTAMP('2024-01-01 00:00:01', 'YYYY-MM-DD HH24:MI:SS') as record_timestamp
union all
select 3 as id, TO_TIMESTAMP('2024-01-01 00:00:02', 'YYYY-MM-DD HH24:MI:SS') as record_timestamp
);

statement ok
create materialized view test_window as (
with time_window as (
select max(last_ingestion_time) as window_end
from test_last_ingestion_time
)
select
id
from test_records, time_window
where record_timestamp >= window_end
);

query i
select * from test_window;
----
2
3
17 changes: 5 additions & 12 deletions src/stream/src/from_proto/dynamic_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,7 @@ impl ExecutorBuilder for DynamicFilterExecutorBuilder {
let [source_l, source_r]: [_; 2] = params.input.try_into().unwrap();
let key_l = node.get_left_key() as usize;

let vnodes = Arc::new(
params
.vnode_bitmap
.expect("vnodes not set for dynamic filter"),
);
let vnodes = params.vnode_bitmap.map(Arc::new);

let prost_condition = node.get_condition()?;
let comparator = prost_condition.get_function_type()?;
Expand All @@ -64,12 +60,9 @@ impl ExecutorBuilder for DynamicFilterExecutorBuilder {
let cleaned_by_watermark = left_table.get_cleaned_by_watermark();

if cleaned_by_watermark {
let state_table_l = WatermarkCacheStateTable::from_table_catalog(
node.get_left_table()?,
store,
Some(vnodes),
)
.await;
let state_table_l =
WatermarkCacheStateTable::from_table_catalog(node.get_left_table()?, store, vnodes)
.await;

Ok(Box::new(DynamicFilterExecutor::new(
params.actor_context,
Expand All @@ -87,7 +80,7 @@ impl ExecutorBuilder for DynamicFilterExecutorBuilder {
)))
} else {
let state_table_l =
StateTable::from_table_catalog(node.get_left_table()?, store, Some(vnodes)).await;
StateTable::from_table_catalog(node.get_left_table()?, store, vnodes).await;

Ok(Box::new(DynamicFilterExecutor::new(
params.actor_context,
Expand Down

0 comments on commit 1259903

Please sign in to comment.