refactor(stream): use columnar eval for temporal join non-lookup conds #15228

Merged 19 commits on Feb 27, 2024
Changes from 5 commits
132 changes: 86 additions & 46 deletions src/stream/src/executor/temporal_join.rs
@@ -24,7 +24,8 @@ use futures::{pin_mut, StreamExt, TryStreamExt};
use futures_async_stream::{for_await, try_stream};
use local_stats_alloc::{SharedStatsAlloc, StatsAlloc};
use lru::DefaultHasher;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::array::{ArrayImpl, Op, StreamChunk};
use risingwave_common::buffer::BitmapBuilder;
use risingwave_common::catalog::Schema;
use risingwave_common::estimate_size::{EstimateSize, KvSize};
use risingwave_common::hash::{HashKey, NullBitmap};
@@ -400,57 +401,96 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> TemporalJoinExecutor
yield Message::Watermark(watermark.with_idx(output_watermark_col_idx));
}
InternalMessage::Chunk(chunk) => {
let mut builder = JoinStreamChunkBuilder::new(
self.chunk_size,
self.info.schema.data_types(),
left_map.clone(),
right_map.clone(),
);
let epoch = prev_epoch.expect("Chunk data should come after some barrier.");
let keys = K::build(&self.left_join_keys, chunk.data_chunk())?;
let to_fetch_keys = chunk
.visibility()
.iter()
.zip_eq_debug(keys.iter())
.filter_map(|(vis, key)| if vis { Some(key) } else { None });
self.right_table
.fetch_or_promote_keys(to_fetch_keys, epoch)
.await?;
for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) {
let Some((op, left_row)) = r else {
continue;
};
if key.null_bitmap().is_subset(&null_matched)
&& let join_entry = self.right_table.force_peek(&key)
&& !join_entry.is_empty()
{
for right_row in join_entry.cached.values() {
// check join condition
let ok = if let Some(ref mut cond) = self.condition {
let concat_row = left_row.chain(&right_row).into_owned_row();
cond.eval_row_infallible(&concat_row)
.await
.map(|s| *s.as_bool())
.unwrap_or(false)
} else {
true
// Joined result without evaluating non-lookup conditions.
let st1 = {
#[try_stream]
async {
#[allow(unreachable_code)]
if false {
return Err(unreachable!("type hints only") as StreamExecutorError);
}
Contributor

This looks really weird. 😇
Is it possible to use #[try_stream(error = StreamExecutorError)]?

Contributor Author

They don't support that.

Contributor Author

It's non-trivial to declare a variable with an impl Trait type. TAIT (type alias impl Trait) must be enabled for that, and there are many corner cases, so I guess the macro author doesn't want to support it.

Contributor Author

I'll create a helper macro to generate the code.
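
For readers puzzled by the `if false { return Err(...) }` block discussed above: it never runs and serves only to tell the compiler which error type the block returns, so that `?` has a concrete type to convert into. The following is a minimal sketch of the same idea on stable Rust, with an ordinary closure standing in for the `#[try_stream] async` block. `ExecError` and `sum_parsed` are hypothetical names used for illustration and are not part of RisingWave or futures_async_stream.

```rust
use std::num::ParseIntError;

/// Hypothetical error type standing in for `StreamExecutorError`.
#[derive(Debug)]
struct ExecError(String);

impl From<ParseIntError> for ExecError {
    fn from(e: ParseIntError) -> Self {
        ExecError(e.to_string())
    }
}

/// Parses every input as an integer and sums the results.
fn sum_parsed(inputs: &[&str]) -> Result<i64, ExecError> {
    let compute = || {
        // Type hint only: this branch is never taken, but its `return`
        // fixes the closure's error type to `ExecError` before any `?`
        // is type-checked. The PR uses `unreachable!(..)` in this position
        // so that no error value has to be constructed.
        if false {
            return Err(ExecError(String::new()));
        }
        let mut total = 0i64;
        for s in inputs {
            // `?` converts `ParseIntError` into `ExecError` via `From`.
            total += s.parse::<i64>()?;
        }
        Ok(total)
    };
    compute()
}
```

Calling `sum_parsed(&["1", "2", "3"])` yields `Ok(6)`, while any input that fails to parse produces an `ExecError`. A closure could also carry an explicit return-type annotation; the hint trick matters where no such annotation is available, as in the macro-generated stream here.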

let mut builder = JoinStreamChunkBuilder::new(
self.chunk_size,
self.info.schema.data_types(),
left_map.clone(),
right_map.clone(),
);
// The bitmap is aligned with `builder`. The bit is set if the record is matched.
// TODO: Consider adding the bitmap to `JoinStreamChunkBuilder`.
let mut row_matched_bitmap_builder =
BitmapBuilder::with_capacity(self.chunk_size);
let epoch =
prev_epoch.expect("Chunk data should come after some barrier.");
let keys = K::build(&self.left_join_keys, chunk.data_chunk())?;
let to_fetch_keys = chunk
.visibility()
.iter()
.zip_eq_debug(keys.iter())
.filter_map(|(vis, key)| if vis { Some(key) } else { None });
self.right_table
.fetch_or_promote_keys(to_fetch_keys, epoch)
.await?;
for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) {
let Some((op, left_row)) = r else {
continue;
};

if ok {
if let Some(chunk) = builder.append_row(op, left_row, right_row)
{
yield Message::Chunk(chunk);
if key.null_bitmap().is_subset(&null_matched)
&& let join_entry = self.right_table.force_peek(&key)
&& !join_entry.is_empty()
{
for right_row in join_entry.cached.values() {
row_matched_bitmap_builder.append(true);
if let Some(chunk) =
builder.append_row(op, left_row, right_row)
{
let row_matched =
std::mem::take(&mut row_matched_bitmap_builder)
.finish();
yield (chunk, row_matched);
}
}
} else if T == JoinType::LeftOuter {
row_matched_bitmap_builder.append(false);
if let Some(chunk) = builder.append_row_update(op, left_row) {
let row_matched =
std::mem::take(&mut row_matched_bitmap_builder)
.finish();
yield (chunk, row_matched);
}
}
}
} else if T == JoinType::LeftOuter {
if let Some(chunk) = builder.append_row_update(op, left_row) {
yield Message::Chunk(chunk);
if let Some(chunk) = builder.take() {
let row_matched =
std::mem::take(&mut row_matched_bitmap_builder).finish();
yield (chunk, row_matched);
}
}
}
if let Some(chunk) = builder.take() {
yield Message::Chunk(chunk);
};

#[for_await]
for item in st1 {
let (chunk, row_matched) = item?;
// check non-lookup join conditions
if !row_matched.is_empty()
&& let Some(ref cond) = self.condition
{
// All chunks are newly created in the previous phase, so no holes should exist.
assert!(chunk.visibility().all());
// For non matched row, we shouldn't evaluate on it.
// So we treat `row_matched` as visibility here.
let chunk = chunk.clone_with_vis(row_matched.clone());
let (data_chunk, ops) = chunk.into_parts();
let filter = cond.eval_infallible(&data_chunk).await;
let ArrayImpl::Bool(bool_array) = &*filter else {
panic!("unmatched type: filter expr returns a non-null array");
};
let new_vis = bool_array.to_bitmap() | (!row_matched);
Contributor

Why are non-matched rows visible in the result? 🤔

Contributor Author

LEFT OUTER JOIN

Contributor Author (@TennyZhuang, Feb 23, 2024)

Non-matched rows are only appended to the chunk when it's a LEFT OUTER join, so they should be visible here.
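
The visibility rule discussed in this thread can be sketched as follows. This is an illustration only: plain slices stand in for RisingWave's bitmap and bool array types, `new_visibility` is a hypothetical helper, and treating a NULL condition result as "not satisfied" is an assumption about how the bool array is converted to a bitmap.

```rust
/// Computes the output visibility for one joined chunk.
///
/// * `filter[i]` is the non-lookup condition result for row `i`;
///   `None` models NULL or a row that was not evaluated.
/// * `row_matched[i]` is true if row `i` came from a right-side match,
///   false if it is a padded row emitted for LEFT OUTER JOIN.
fn new_visibility(filter: &[Option<bool>], row_matched: &[bool]) -> Vec<bool> {
    assert_eq!(filter.len(), row_matched.len());
    filter
        .iter()
        .zip(row_matched)
        .map(|(f, &matched)| {
            // Matched rows survive only if the condition holds.
            // Unmatched rows exist only for LEFT OUTER JOIN and always survive.
            f.unwrap_or(false) || !matched
        })
        .collect()
}
```

For example, with `filter = [Some(true), Some(false), None, None]` and `row_matched = [true, true, true, false]`, the first row is kept, the second and third are hidden, and the fourth (an unmatched LEFT OUTER row) is kept. This mirrors `bool_array.to_bitmap() | (!row_matched)` in the diff.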

let (columns, _) = data_chunk.into_parts();
let new_chunk = StreamChunk::with_visibility(ops, columns, new_vis);
yield Message::Chunk(new_chunk);
} else {
yield Message::Chunk(chunk);
};
}
}
InternalMessage::Barrier(updates, barrier) => {