diff --git a/e2e_test/streaming/temporal_join/temporal_join_non_loopup_cond.slt b/e2e_test/streaming/temporal_join/temporal_join_non_loopup_cond.slt new file mode 100644 index 0000000000000..cf206bbea0a22 --- /dev/null +++ b/e2e_test/streaming/temporal_join/temporal_join_non_loopup_cond.slt @@ -0,0 +1,51 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + +statement ok +create table version(id2 int, a2 int, b2 int, primary key (id2)); + +statement ok +create materialized view v as select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 = id2 and a1 > a2; + +statement ok +insert into stream values(1, 11, 111); + +statement ok +insert into version values(1, 12, 111); + +statement ok +insert into stream values(1, 13, 111); + +statement ok +delete from version; + +query IIII rowsort +select * from v; +---- +1 11 NULL NULL +1 13 1 12 + +statement ok +insert into version values(2, 22, 222); + +statement ok +insert into stream values(2, 23, 222); + +query IIII rowsort +select * from v; +---- +1 11 NULL NULL +1 13 1 12 +2 23 2 22 + +statement ok +drop materialized view v; + +statement ok +drop table stream; + +statement ok +drop table version; diff --git a/e2e_test/streaming/temporal_join/temporal_join_watermark.slt b/e2e_test/streaming/temporal_join/temporal_join_watermark.slt index 9479ca6727299..c6c64e5774692 100644 --- a/e2e_test/streaming/temporal_join/temporal_join_watermark.slt +++ b/e2e_test/streaming/temporal_join/temporal_join_watermark.slt @@ -68,6 +68,3 @@ drop table stream cascade; statement ok drop table version cascade; - - - diff --git a/src/stream/src/executor/temporal_join.rs b/src/stream/src/executor/temporal_join.rs index f6c614f41aef4..42a8e7d81a04f 100644 --- a/src/stream/src/executor/temporal_join.rs +++ b/src/stream/src/executor/temporal_join.rs @@ -24,11 +24,13 @@ use futures::{pin_mut, StreamExt, TryStreamExt}; use futures_async_stream::{for_await, try_stream}; use local_stats_alloc::{SharedStatsAlloc, StatsAlloc}; use lru::DefaultHasher; -use risingwave_common::array::{Op, StreamChunk}; +use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder; +use risingwave_common::array::{ArrayImpl, Op, StreamChunk}; +use risingwave_common::buffer::BitmapBuilder; use risingwave_common::estimate_size::{EstimateSize, KvSize}; use risingwave_common::hash::{HashKey, NullBitmap}; use risingwave_common::row::{OwnedRow, Row, RowExt}; -use risingwave_common::types::DataType; +use risingwave_common::types::{DataType, DatumRef}; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_expr::expr::NonStrictExpression; use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch}; @@ -51,6 +53,7 @@ use crate::task::AtomicU64Ref; pub struct TemporalJoinExecutor { ctx: ActorContextRef, + #[allow(dead_code)] info: ExecutorInfo, left: Executor, right: Executor, @@ -368,10 +371,12 @@ impl TemporalJoinExecutor #[try_stream(ok = Message, error = StreamExecutorError)] async fn into_stream(mut self) { - let (left_map, right_map) = JoinStreamChunkBuilder::get_i2o_mapping( + let right_size = self.right.schema().len(); + + let (left_map, _right_map) = JoinStreamChunkBuilder::get_i2o_mapping( &self.output_indices, self.left.schema().len(), - self.right.schema().len(), + right_size, ); let left_to_output: HashMap = HashMap::from_iter(left_map.iter().cloned()); @@ -382,6 +387,14 @@ impl TemporalJoinExecutor let mut prev_epoch = None; + let full_schema: Vec<_> = self + .left + .schema() + .data_types() + .into_iter() + .chain(self.right.schema().data_types().into_iter()) + .collect(); + let table_id_str = self.right_table.source.table_id().to_string(); let actor_id_str = self.ctx.id.to_string(); let fragment_id_str = self.ctx.fragment_id.to_string(); @@ -399,57 +412,117 @@ impl TemporalJoinExecutor yield Message::Watermark(watermark.with_idx(output_watermark_col_idx)); } InternalMessage::Chunk(chunk) => { - let mut builder = JoinStreamChunkBuilder::new( - self.chunk_size, - self.info.schema.data_types(), - left_map.clone(), - right_map.clone(), - ); - let epoch = prev_epoch.expect("Chunk data should come after some barrier."); - let keys = K::build(&self.left_join_keys, chunk.data_chunk())?; - let to_fetch_keys = chunk - .visibility() - .iter() - .zip_eq_debug(keys.iter()) - .filter_map(|(vis, key)| if vis { Some(key) } else { None }); - self.right_table - .fetch_or_promote_keys(to_fetch_keys, epoch) - .await?; - for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) { - let Some((op, left_row)) = r else { - continue; - }; - if key.null_bitmap().is_subset(&null_matched) - && let join_entry = self.right_table.force_peek(&key) - && !join_entry.is_empty() - { - for right_row in join_entry.cached.values() { - // check join condition - let ok = if let Some(ref mut cond) = self.condition { - let concat_row = left_row.chain(&right_row).into_owned_row(); - cond.eval_row_infallible(&concat_row) - .await - .map(|s| *s.as_bool()) - .unwrap_or(false) - } else { - true + // Joined result without evaluating non-lookup conditions. + let st1 = { + #[try_stream] + async { + #[allow(unreachable_code)] + #[allow(clippy::diverging_sub_expression)] + if false { + return unreachable!("type hints only") as StreamExecutorResult<_>; + } + let mut builder = + StreamChunkBuilder::new(self.chunk_size, full_schema.clone()); + // The bitmap is aligned with `builder`. The bit is set if the record is matched. + // TODO: Consider adding the bitmap to `builder`. + let mut row_matched_bitmap_builder = + BitmapBuilder::with_capacity(self.chunk_size); + let epoch = + prev_epoch.expect("Chunk data should come after some barrier."); + let keys = K::build(&self.left_join_keys, chunk.data_chunk())?; + let to_fetch_keys = chunk + .visibility() + .iter() + .zip_eq_debug(keys.iter()) + .filter_map(|(vis, key)| if vis { Some(key) } else { None }); + self.right_table + .fetch_or_promote_keys(to_fetch_keys, epoch) + .await?; + for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) { + let Some((op, left_row)) = r else { + continue; }; - - if ok { - if let Some(chunk) = builder.append_row(op, left_row, right_row) - { - yield Message::Chunk(chunk); + if key.null_bitmap().is_subset(&null_matched) + && let join_entry = self.right_table.force_peek(&key) + && !join_entry.is_empty() + { + for right_row in join_entry.cached.values() { + row_matched_bitmap_builder.append(true); + if let Some(chunk) = + builder.append_row(op, left_row.chain(right_row)) + { + let row_matched = + std::mem::take(&mut row_matched_bitmap_builder) + .finish(); + yield (chunk, row_matched); + } + } + } else if T == JoinType::LeftOuter { + row_matched_bitmap_builder.append(false); + if let Some(chunk) = builder.append_row( + op, + left_row.chain(risingwave_common::row::repeat_n( + DatumRef::None, + right_size, + )), + ) { + let row_matched = + std::mem::take(&mut row_matched_bitmap_builder) + .finish(); + yield (chunk, row_matched); } } } - } else if T == JoinType::LeftOuter { - if let Some(chunk) = builder.append_row_update(op, left_row) { - yield Message::Chunk(chunk); + if let Some(chunk) = builder.take() { + let row_matched = + std::mem::take(&mut row_matched_bitmap_builder).finish(); + yield (chunk, row_matched); } } - } - if let Some(chunk) = builder.take() { - yield Message::Chunk(chunk); + }; + + #[for_await] + for item in st1 { + let (chunk, row_matched) = item?; + // check non-lookup join conditions + if !row_matched.is_empty() + && let Some(ref cond) = self.condition + { + // All chunks are newly created in the previous phase, so no holes should exist. + assert!(chunk.visibility().all()); + // For non matched row, we shouldn't evaluate on it. + // So we treat `row_matched` as visibility here. + let chunk = chunk.clone_with_vis(row_matched.clone()); + let (data_chunk, ops) = chunk.into_parts(); + let filter = cond.eval_infallible(&data_chunk).await; + let ArrayImpl::Bool(bool_array) = &*filter else { + panic!("unmatched type: filter expr returns a non-null array"); + }; + let new_vis = bool_array.to_bitmap() | (!row_matched); + let (columns, _) = data_chunk.into_parts(); + // apply output indices. + let output_columns = self + .output_indices + .iter() + .cloned() + .map(|idx| columns[idx].clone()) + .collect(); + let new_chunk = + StreamChunk::with_visibility(ops, output_columns, new_vis); + yield Message::Chunk(new_chunk); + } else { + let (data_chunk, ops) = chunk.into_parts(); + let (columns, vis) = data_chunk.into_parts(); + // apply output indices. + let output_columns = self + .output_indices + .iter() + .cloned() + .map(|idx| columns[idx].clone()) + .collect(); + let new_chunk = StreamChunk::with_visibility(ops, output_columns, vis); + yield Message::Chunk(new_chunk); + }; } } InternalMessage::Barrier(updates, barrier) => {