Skip to content

Commit

Permalink
refactor(stream): use columnar eval for temporal join non-lookup conds (
Browse files Browse the repository at this point in the history
#15228)

Signed-off-by: TennyZhuang <[email protected]>
  • Loading branch information
TennyZhuang authored Feb 27, 2024
1 parent c06bec5 commit 7fe410c
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 52 deletions.
51 changes: 51 additions & 0 deletions e2e_test/streaming/temporal_join/temporal_join_non_loopup_cond.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# e2e test: temporal join with a non-lookup join condition (a1 > a2).
# The lookup key is id1 = id2; the extra inequality must be evaluated
# after the key lookup, which is what this test exercises.

# Make every statement flush implicitly so results are visible immediately.
statement ok
SET RW_IMPLICIT_FLUSH TO true;

# Left (stream) side: must be append-only for a temporal join.
statement ok
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;

# Right (version) side: looked up by its primary key id2.
statement ok
create table version(id2 int, a2 int, b2 int, primary key (id2));

# Left outer temporal join: lookup on id1 = id2, plus the
# non-lookup condition a1 > a2 evaluated on each matched row.
statement ok
create materialized view v as select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 = id2 and a1 > a2;

# No version row for id 1 yet -> this left row can only produce a NULL match.
statement ok
insert into stream values(1, 11, 111);

statement ok
insert into version values(1, 12, 111);

# Lookup hits (1, 12) and 13 > 12 holds -> this row matches.
# (The earlier (1, 11) row is NOT revisited: temporal joins evaluate
# against the version table as of processing time.)
statement ok
insert into stream values(1, 13, 111);

# Deleting version rows must not retract already-emitted join results.
statement ok
delete from version;

# Row (1, 11): no version row existed at processing time -> NULLs.
# Row (1, 13): matched (1, 12) and satisfied 13 > 12.
query IIII rowsort
select * from v;
----
1 11 NULL NULL
1 13 1 12

statement ok
insert into version values(2, 22, 222);

# Lookup hits (2, 22) and 23 > 22 holds -> new matched row.
statement ok
insert into stream values(2, 23, 222);

# Previous results are retained; the new match is appended.
query IIII rowsort
select * from v;
----
1 11 NULL NULL
1 13 1 12
2 23 2 22

# Clean up in dependency order: view first, then base tables.
statement ok
drop materialized view v;

statement ok
drop table stream;

statement ok
drop table version;
3 changes: 0 additions & 3 deletions e2e_test/streaming/temporal_join/temporal_join_watermark.slt
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,3 @@ drop table stream cascade;

statement ok
drop table version cascade;



171 changes: 122 additions & 49 deletions src/stream/src/executor/temporal_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ use futures::{pin_mut, StreamExt, TryStreamExt};
use futures_async_stream::{for_await, try_stream};
use local_stats_alloc::{SharedStatsAlloc, StatsAlloc};
use lru::DefaultHasher;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder;
use risingwave_common::array::{ArrayImpl, Op, StreamChunk};
use risingwave_common::buffer::BitmapBuilder;
use risingwave_common::estimate_size::{EstimateSize, KvSize};
use risingwave_common::hash::{HashKey, NullBitmap};
use risingwave_common::row::{OwnedRow, Row, RowExt};
use risingwave_common::types::DataType;
use risingwave_common::types::{DataType, DatumRef};
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_expr::expr::NonStrictExpression;
use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch};
Expand All @@ -51,6 +53,7 @@ use crate::task::AtomicU64Ref;

pub struct TemporalJoinExecutor<K: HashKey, S: StateStore, const T: JoinTypePrimitive> {
ctx: ActorContextRef,
#[allow(dead_code)]
info: ExecutorInfo,
left: Executor,
right: Executor,
Expand Down Expand Up @@ -368,10 +371,12 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> TemporalJoinExecutor

#[try_stream(ok = Message, error = StreamExecutorError)]
async fn into_stream(mut self) {
let (left_map, right_map) = JoinStreamChunkBuilder::get_i2o_mapping(
let right_size = self.right.schema().len();

let (left_map, _right_map) = JoinStreamChunkBuilder::get_i2o_mapping(
&self.output_indices,
self.left.schema().len(),
self.right.schema().len(),
right_size,
);

let left_to_output: HashMap<usize, usize> = HashMap::from_iter(left_map.iter().cloned());
Expand All @@ -382,6 +387,14 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> TemporalJoinExecutor

let mut prev_epoch = None;

let full_schema: Vec<_> = self
.left
.schema()
.data_types()
.into_iter()
.chain(self.right.schema().data_types().into_iter())
.collect();

let table_id_str = self.right_table.source.table_id().to_string();
let actor_id_str = self.ctx.id.to_string();
let fragment_id_str = self.ctx.fragment_id.to_string();
Expand All @@ -399,57 +412,117 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> TemporalJoinExecutor
yield Message::Watermark(watermark.with_idx(output_watermark_col_idx));
}
InternalMessage::Chunk(chunk) => {
let mut builder = JoinStreamChunkBuilder::new(
self.chunk_size,
self.info.schema.data_types(),
left_map.clone(),
right_map.clone(),
);
let epoch = prev_epoch.expect("Chunk data should come after some barrier.");
let keys = K::build(&self.left_join_keys, chunk.data_chunk())?;
let to_fetch_keys = chunk
.visibility()
.iter()
.zip_eq_debug(keys.iter())
.filter_map(|(vis, key)| if vis { Some(key) } else { None });
self.right_table
.fetch_or_promote_keys(to_fetch_keys, epoch)
.await?;
for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) {
let Some((op, left_row)) = r else {
continue;
};
if key.null_bitmap().is_subset(&null_matched)
&& let join_entry = self.right_table.force_peek(&key)
&& !join_entry.is_empty()
{
for right_row in join_entry.cached.values() {
// check join condition
let ok = if let Some(ref mut cond) = self.condition {
let concat_row = left_row.chain(&right_row).into_owned_row();
cond.eval_row_infallible(&concat_row)
.await
.map(|s| *s.as_bool())
.unwrap_or(false)
} else {
true
// Joined result without evaluating non-lookup conditions.
let st1 = {
#[try_stream]
async {
#[allow(unreachable_code)]
#[allow(clippy::diverging_sub_expression)]
if false {
return unreachable!("type hints only") as StreamExecutorResult<_>;
}
let mut builder =
StreamChunkBuilder::new(self.chunk_size, full_schema.clone());
// The bitmap is aligned with `builder`. The bit is set if the record is matched.
// TODO: Consider adding the bitmap to `builder`.
let mut row_matched_bitmap_builder =
BitmapBuilder::with_capacity(self.chunk_size);
let epoch =
prev_epoch.expect("Chunk data should come after some barrier.");
let keys = K::build(&self.left_join_keys, chunk.data_chunk())?;
let to_fetch_keys = chunk
.visibility()
.iter()
.zip_eq_debug(keys.iter())
.filter_map(|(vis, key)| if vis { Some(key) } else { None });
self.right_table
.fetch_or_promote_keys(to_fetch_keys, epoch)
.await?;
for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) {
let Some((op, left_row)) = r else {
continue;
};

if ok {
if let Some(chunk) = builder.append_row(op, left_row, right_row)
{
yield Message::Chunk(chunk);
if key.null_bitmap().is_subset(&null_matched)
&& let join_entry = self.right_table.force_peek(&key)
&& !join_entry.is_empty()
{
for right_row in join_entry.cached.values() {
row_matched_bitmap_builder.append(true);
if let Some(chunk) =
builder.append_row(op, left_row.chain(right_row))
{
let row_matched =
std::mem::take(&mut row_matched_bitmap_builder)
.finish();
yield (chunk, row_matched);
}
}
} else if T == JoinType::LeftOuter {
row_matched_bitmap_builder.append(false);
if let Some(chunk) = builder.append_row(
op,
left_row.chain(risingwave_common::row::repeat_n(
DatumRef::None,
right_size,
)),
) {
let row_matched =
std::mem::take(&mut row_matched_bitmap_builder)
.finish();
yield (chunk, row_matched);
}
}
}
} else if T == JoinType::LeftOuter {
if let Some(chunk) = builder.append_row_update(op, left_row) {
yield Message::Chunk(chunk);
if let Some(chunk) = builder.take() {
let row_matched =
std::mem::take(&mut row_matched_bitmap_builder).finish();
yield (chunk, row_matched);
}
}
}
if let Some(chunk) = builder.take() {
yield Message::Chunk(chunk);
};

#[for_await]
for item in st1 {
let (chunk, row_matched) = item?;
// check non-lookup join conditions
if !row_matched.is_empty()
&& let Some(ref cond) = self.condition
{
// All chunks are newly created in the previous phase, so no holes should exist.
assert!(chunk.visibility().all());
// For non matched row, we shouldn't evaluate on it.
// So we treat `row_matched` as visibility here.
let chunk = chunk.clone_with_vis(row_matched.clone());
let (data_chunk, ops) = chunk.into_parts();
let filter = cond.eval_infallible(&data_chunk).await;
let ArrayImpl::Bool(bool_array) = &*filter else {
panic!("unmatched type: filter expr returns a non-null array");
};
let new_vis = bool_array.to_bitmap() | (!row_matched);
let (columns, _) = data_chunk.into_parts();
// apply output indices.
let output_columns = self
.output_indices
.iter()
.cloned()
.map(|idx| columns[idx].clone())
.collect();
let new_chunk =
StreamChunk::with_visibility(ops, output_columns, new_vis);
yield Message::Chunk(new_chunk);
} else {
let (data_chunk, ops) = chunk.into_parts();
let (columns, vis) = data_chunk.into_parts();
// apply output indices.
let output_columns = self
.output_indices
.iter()
.cloned()
.map(|idx| columns[idx].clone())
.collect();
let new_chunk = StreamChunk::with_visibility(ops, output_columns, vis);
yield Message::Chunk(new_chunk);
};
}
}
InternalMessage::Barrier(updates, barrier) => {
Expand Down

0 comments on commit 7fe410c

Please sign in to comment.