Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(stream): use columnar eval for temporal join non-lookup conds #15228

Merged
merged 19 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions e2e_test/streaming/temporal_join/temporal_join_non_loopup_cond.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# End-to-end test for a LEFT temporal join whose ON clause has, besides the
# equality `id1 = id2`, an extra comparison `a1 > a2`.

# NOTE(review): presumably makes every DML below visible before the next
# statement runs — confirm against the RW_IMPLICIT_FLUSH documentation.
statement ok
SET RW_IMPLICIT_FLUSH TO true;

# Left side of the join: an append-only table.
statement ok
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;

# Right side of the join: a table keyed by `id2`.
statement ok
create table version(id2 int, a2 int, b2 int, primary key (id2));

# The view under test. Only id1, a1, id2 and a2 are projected; b1/b2 are unused.
statement ok
create materialized view v as select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 = id2 and a1 > a2;

# `version` is still empty here, so this row has nothing to join with.
statement ok
insert into stream values(1, 11, 111);

statement ok
insert into version values(1, 12, 111);

# id1 = id2 = 1 and 13 > 12, so this row satisfies both join conditions.
statement ok
insert into stream values(1, 13, 111);

# Empty the right side. The expected output below still contains the joined
# row (1, 13, 1, 12), i.e. rows already emitted are not changed by this delete.
statement ok
delete from version;

# NOTE(review): no case in this file covers a row that finds a matching id2 but
# fails `a1 > a2`; (1, 11) was inserted before any `version` row existed.
# Consider adding one.
query IIII rowsort
select * from v;
----
1 11 NULL NULL
1 13 1 12

# Second round with a fresh key: 23 > 22, so the new stream row joins.
statement ok
insert into version values(2, 22, 222);

statement ok
insert into stream values(2, 23, 222);

query IIII rowsort
select * from v;
----
1 11 NULL NULL
1 13 1 12
2 23 2 22

# Clean up: drop the view first, then the two tables it reads from.
statement ok
drop materialized view v;

statement ok
drop table stream;

statement ok
drop table version;
3 changes: 0 additions & 3 deletions e2e_test/streaming/temporal_join/temporal_join_watermark.slt
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,3 @@ drop table stream cascade;

statement ok
drop table version cascade;



170 changes: 121 additions & 49 deletions src/stream/src/executor/temporal_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ use futures::{pin_mut, StreamExt, TryStreamExt};
use futures_async_stream::{for_await, try_stream};
use local_stats_alloc::{SharedStatsAlloc, StatsAlloc};
use lru::DefaultHasher;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder;
use risingwave_common::array::{ArrayImpl, Op, StreamChunk};
use risingwave_common::buffer::BitmapBuilder;
use risingwave_common::catalog::Schema;
use risingwave_common::estimate_size::{EstimateSize, KvSize};
use risingwave_common::hash::{HashKey, NullBitmap};
use risingwave_common::row::{OwnedRow, Row, RowExt};
use risingwave_common::types::DataType;
use risingwave_common::types::{DataType, DatumRef};
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_expr::expr::NonStrictExpression;
use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch};
Expand Down Expand Up @@ -369,10 +371,12 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> TemporalJoinExecutor

#[try_stream(ok = Message, error = StreamExecutorError)]
async fn into_stream(mut self) {
let (left_map, right_map) = JoinStreamChunkBuilder::get_i2o_mapping(
let right_size = self.right.schema().len();

let (left_map, _right_map) = JoinStreamChunkBuilder::get_i2o_mapping(
&self.output_indices,
self.left.schema().len(),
self.right.schema().len(),
right_size,
);

let left_to_output: HashMap<usize, usize> = HashMap::from_iter(left_map.iter().cloned());
Expand All @@ -383,6 +387,14 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> TemporalJoinExecutor

let mut prev_epoch = None;

let full_schema: Vec<_> = self
.left
.schema()
.data_types()
.into_iter()
.chain(self.right.schema().data_types().into_iter())
.collect();

let table_id_str = self.right_table.source.table_id().to_string();
let actor_id_str = self.ctx.id.to_string();
let fragment_id_str = self.ctx.fragment_id.to_string();
Expand All @@ -400,57 +412,117 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> TemporalJoinExecutor
yield Message::Watermark(watermark.with_idx(output_watermark_col_idx));
}
InternalMessage::Chunk(chunk) => {
let mut builder = JoinStreamChunkBuilder::new(
self.chunk_size,
self.info.schema.data_types(),
left_map.clone(),
right_map.clone(),
);
let epoch = prev_epoch.expect("Chunk data should come after some barrier.");
let keys = K::build(&self.left_join_keys, chunk.data_chunk())?;
let to_fetch_keys = chunk
.visibility()
.iter()
.zip_eq_debug(keys.iter())
.filter_map(|(vis, key)| if vis { Some(key) } else { None });
self.right_table
.fetch_or_promote_keys(to_fetch_keys, epoch)
.await?;
for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) {
let Some((op, left_row)) = r else {
continue;
};
if key.null_bitmap().is_subset(&null_matched)
&& let join_entry = self.right_table.force_peek(&key)
&& !join_entry.is_empty()
{
for right_row in join_entry.cached.values() {
// check join condition
let ok = if let Some(ref mut cond) = self.condition {
let concat_row = left_row.chain(&right_row).into_owned_row();
cond.eval_row_infallible(&concat_row)
.await
.map(|s| *s.as_bool())
.unwrap_or(false)
} else {
true
// Joined result without evaluating non-lookup conditions.
let st1 = {
#[try_stream]
async {
#[allow(unreachable_code)]
#[allow(clippy::diverging_sub_expression)]
if false {
return unreachable!("type hints only") as StreamExecutorResult<_>;
}
Comment on lines +417 to +423
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to extract a method for this async block? The hack here looks ugly🤣

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefer to keep the hack code in a small scope and use it only where needed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is very acceptable. I like the simple and stupid way

let mut builder =
StreamChunkBuilder::new(self.chunk_size, full_schema.clone());
// The bitmap is aligned with `builder`. The bit is set if the record is matched.
// TODO: Consider adding the bitmap to `JoinStreamChunkBuilder`.
let mut row_matched_bitmap_builder =
BitmapBuilder::with_capacity(self.chunk_size);
let epoch =
prev_epoch.expect("Chunk data should come after some barrier.");
let keys = K::build(&self.left_join_keys, chunk.data_chunk())?;
let to_fetch_keys = chunk
.visibility()
.iter()
.zip_eq_debug(keys.iter())
.filter_map(|(vis, key)| if vis { Some(key) } else { None });
self.right_table
.fetch_or_promote_keys(to_fetch_keys, epoch)
.await?;
for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) {
let Some((op, left_row)) = r else {
continue;
};

if ok {
if let Some(chunk) = builder.append_row(op, left_row, right_row)
{
yield Message::Chunk(chunk);
if key.null_bitmap().is_subset(&null_matched)
&& let join_entry = self.right_table.force_peek(&key)
&& !join_entry.is_empty()
{
for right_row in join_entry.cached.values() {
row_matched_bitmap_builder.append(true);
if let Some(chunk) =
builder.append_row(op, left_row.chain(right_row))
{
let row_matched =
std::mem::take(&mut row_matched_bitmap_builder)
.finish();
yield (chunk, row_matched);
}
}
} else if T == JoinType::LeftOuter {
row_matched_bitmap_builder.append(false);
if let Some(chunk) = builder.append_row(
op,
left_row.chain(risingwave_common::row::repeat_n(
DatumRef::None,
right_size,
)),
) {
let row_matched =
std::mem::take(&mut row_matched_bitmap_builder)
.finish();
yield (chunk, row_matched);
}
}
}
} else if T == JoinType::LeftOuter {
if let Some(chunk) = builder.append_row_update(op, left_row) {
yield Message::Chunk(chunk);
if let Some(chunk) = builder.take() {
let row_matched =
std::mem::take(&mut row_matched_bitmap_builder).finish();
yield (chunk, row_matched);
}
}
}
if let Some(chunk) = builder.take() {
yield Message::Chunk(chunk);
};

#[for_await]
for item in st1 {
let (chunk, row_matched) = item?;
// check non-lookup join conditions
if !row_matched.is_empty()
&& let Some(ref cond) = self.condition
{
// All chunks are newly created in the previous phase, so no holes should exist.
assert!(chunk.visibility().all());
// For non matched row, we shouldn't evaluate on it.
// So we treat `row_matched` as visibility here.
let chunk = chunk.clone_with_vis(row_matched.clone());
let (data_chunk, ops) = chunk.into_parts();
let filter = cond.eval_infallible(&data_chunk).await;
let ArrayImpl::Bool(bool_array) = &*filter else {
panic!("unmatched type: filter expr returns a non-null array");
};
let new_vis = bool_array.to_bitmap() | (!row_matched);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are non-matched rows visible in the result? 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LEFT OUTER JOIN

Copy link
Contributor Author

@TennyZhuang TennyZhuang Feb 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-matched rows will only be appended to the chunk when it's a LEFT OUTER join, so they should be visible here.

let (columns, _) = data_chunk.into_parts();
// apply output indices.
let output_columns = self
.output_indices
.iter()
.cloned()
.map(|idx| columns[idx].clone())
.collect();
let new_chunk =
StreamChunk::with_visibility(ops, output_columns, new_vis);
yield Message::Chunk(new_chunk);
} else {
let (data_chunk, ops) = chunk.into_parts();
let (columns, vis) = data_chunk.into_parts();
// apply output indices.
let output_columns = self
.output_indices
.iter()
.cloned()
.map(|idx| columns[idx].clone())
.collect();
let new_chunk = StreamChunk::with_visibility(ops, output_columns, vis);
yield Message::Chunk(new_chunk);
};
}
}
InternalMessage::Barrier(updates, barrier) => {
Expand Down
Loading