refactor(stream): use columnar eval for temporal join non-lookup conds #15228
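As rough orientation for the change, a hypothetical sketch (plain vectors instead of RisingWave's StreamChunk, Bitmap, and expression framework): the non-lookup join condition, e.g. a1 > a2 in the test below, is now evaluated once over the columns of a joined batch rather than once per joined row.

// Hypothetical, simplified model of a joined batch: two columns that feed the
// non-lookup condition, one from each join side.
struct JoinedBatch {
    a1: Vec<i32>, // stream side
    a2: Vec<i32>, // looked-up version side
}

// Old shape: evaluate the condition row by row.
fn eval_row_wise(batch: &JoinedBatch) -> Vec<bool> {
    (0..batch.a1.len()).map(|i| batch.a1[i] > batch.a2[i]).collect()
}

// New shape: evaluate once over whole columns, analogous to
// `cond.eval_infallible(&data_chunk)` in the diff below.
fn eval_columnar(batch: &JoinedBatch) -> Vec<bool> {
    batch.a1.iter().zip(&batch.a2).map(|(a1, a2)| a1 > a2).collect()
}

fn main() {
    let batch = JoinedBatch { a1: vec![11, 13], a2: vec![12, 12] };
    assert_eq!(eval_row_wise(&batch), vec![false, true]);
    assert_eq!(eval_row_wise(&batch), eval_columnar(&batch));
}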
@@ -0,0 +1,51 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table stream(id1 int, a1 int, b1 int) APPEND ONLY;

statement ok
create table version(id2 int, a2 int, b2 int, primary key (id2));

statement ok
create materialized view v as select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 = id2 and a1 > a2;

statement ok
insert into stream values(1, 11, 111);

statement ok
insert into version values(1, 12, 111);

statement ok
insert into stream values(1, 13, 111);

statement ok
delete from version;

query IIII rowsort
select * from v;
----
1 11 NULL NULL
1 13 1 12

statement ok
insert into version values(2, 22, 222);

statement ok
insert into stream values(2, 23, 222);

query IIII rowsort
select * from v;
----
1 11 NULL NULL
1 13 1 12
2 23 2 22

statement ok
drop materialized view v;

statement ok
drop table stream;

statement ok
drop table version;
@@ -68,6 +68,3 @@ drop table stream cascade;

statement ok
drop table version cascade;
@@ -24,11 +24,13 @@ use futures::{pin_mut, StreamExt, TryStreamExt};
use futures_async_stream::{for_await, try_stream};
use local_stats_alloc::{SharedStatsAlloc, StatsAlloc};
use lru::DefaultHasher;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder;
use risingwave_common::array::{ArrayImpl, Op, StreamChunk};
use risingwave_common::buffer::BitmapBuilder;
use risingwave_common::estimate_size::{EstimateSize, KvSize};
use risingwave_common::hash::{HashKey, NullBitmap};
use risingwave_common::row::{OwnedRow, Row, RowExt};
use risingwave_common::types::DataType;
use risingwave_common::types::{DataType, DatumRef};
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_expr::expr::NonStrictExpression;
use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch};

@@ -51,6 +53,7 @@ use crate::task::AtomicU64Ref;

pub struct TemporalJoinExecutor<K: HashKey, S: StateStore, const T: JoinTypePrimitive> {
ctx: ActorContextRef,
#[allow(dead_code)]
info: ExecutorInfo,
left: Executor,
right: Executor,

@@ -368,10 +371,12 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> TemporalJoinExecutor

#[try_stream(ok = Message, error = StreamExecutorError)]
async fn into_stream(mut self) {
let (left_map, right_map) = JoinStreamChunkBuilder::get_i2o_mapping(
let right_size = self.right.schema().len();

let (left_map, _right_map) = JoinStreamChunkBuilder::get_i2o_mapping(
&self.output_indices,
self.left.schema().len(),
self.right.schema().len(),
right_size,
);

let left_to_output: HashMap<usize, usize> = HashMap::from_iter(left_map.iter().cloned());

@@ -382,6 +387,14 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> TemporalJoinExecutor

let mut prev_epoch = None;

let full_schema: Vec<_> = self
.left
.schema()
.data_types()
.into_iter()
.chain(self.right.schema().data_types().into_iter())
.collect();

let table_id_str = self.right_table.source.table_id().to_string();
let actor_id_str = self.ctx.id.to_string();
let fragment_id_str = self.ctx.fragment_id.to_string();
@@ -399,57 +412,117 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> TemporalJoinExecutor
yield Message::Watermark(watermark.with_idx(output_watermark_col_idx));
}
InternalMessage::Chunk(chunk) => {
let mut builder = JoinStreamChunkBuilder::new(
self.chunk_size,
self.info.schema.data_types(),
left_map.clone(),
right_map.clone(),
);
let epoch = prev_epoch.expect("Chunk data should come after some barrier.");
let keys = K::build(&self.left_join_keys, chunk.data_chunk())?;
let to_fetch_keys = chunk
.visibility()
.iter()
.zip_eq_debug(keys.iter())
.filter_map(|(vis, key)| if vis { Some(key) } else { None });
self.right_table
.fetch_or_promote_keys(to_fetch_keys, epoch)
.await?;
for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) {
let Some((op, left_row)) = r else {
continue;
};
if key.null_bitmap().is_subset(&null_matched)
&& let join_entry = self.right_table.force_peek(&key)
&& !join_entry.is_empty()
{
for right_row in join_entry.cached.values() {
// check join condition
let ok = if let Some(ref mut cond) = self.condition {
let concat_row = left_row.chain(&right_row).into_owned_row();
cond.eval_row_infallible(&concat_row)
.await
.map(|s| *s.as_bool())
.unwrap_or(false)
} else {
true
// Joined result without evaluating non-lookup conditions.
let st1 = {
#[try_stream]
async {
#[allow(unreachable_code)]
#[allow(clippy::diverging_sub_expression)]
if false {
return unreachable!("type hints only") as StreamExecutorResult<_>;
}
let mut builder =
StreamChunkBuilder::new(self.chunk_size, full_schema.clone());
// The bitmap is aligned with `builder`. The bit is set if the record is matched.
// TODO: Consider adding the bitmap to `builder`.
let mut row_matched_bitmap_builder =
BitmapBuilder::with_capacity(self.chunk_size);
let epoch =
prev_epoch.expect("Chunk data should come after some barrier.");
let keys = K::build(&self.left_join_keys, chunk.data_chunk())?;
let to_fetch_keys = chunk
.visibility()
.iter()
.zip_eq_debug(keys.iter())
.filter_map(|(vis, key)| if vis { Some(key) } else { None });
self.right_table
.fetch_or_promote_keys(to_fetch_keys, epoch)
.await?;
for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) {
let Some((op, left_row)) = r else {
continue;
};

if ok {
if let Some(chunk) = builder.append_row(op, left_row, right_row)
{
yield Message::Chunk(chunk);
if key.null_bitmap().is_subset(&null_matched)
&& let join_entry = self.right_table.force_peek(&key)
&& !join_entry.is_empty()
{
for right_row in join_entry.cached.values() {
row_matched_bitmap_builder.append(true);
if let Some(chunk) =
builder.append_row(op, left_row.chain(right_row))
{
let row_matched =
std::mem::take(&mut row_matched_bitmap_builder)
.finish();
yield (chunk, row_matched);
}
}
} else if T == JoinType::LeftOuter {
row_matched_bitmap_builder.append(false);
if let Some(chunk) = builder.append_row(
op,
left_row.chain(risingwave_common::row::repeat_n(
DatumRef::None,
right_size,
)),
) {
let row_matched =
std::mem::take(&mut row_matched_bitmap_builder)
.finish();
yield (chunk, row_matched);
}
}
}
} else if T == JoinType::LeftOuter {
if let Some(chunk) = builder.append_row_update(op, left_row) {
yield Message::Chunk(chunk);
if let Some(chunk) = builder.take() {
let row_matched =
std::mem::take(&mut row_matched_bitmap_builder).finish();
yield (chunk, row_matched);
}
}
}
if let Some(chunk) = builder.take() {
yield Message::Chunk(chunk);
};

#[for_await]
for item in st1 {
let (chunk, row_matched) = item?;
// check non-lookup join conditions
if !row_matched.is_empty()
&& let Some(ref cond) = self.condition
{
// All chunks are newly created in the previous phase, so no holes should exist.
assert!(chunk.visibility().all());
// For non matched row, we shouldn't evaluate on it.
// So we treat `row_matched` as visibility here.
let chunk = chunk.clone_with_vis(row_matched.clone());
let (data_chunk, ops) = chunk.into_parts();
let filter = cond.eval_infallible(&data_chunk).await;
let ArrayImpl::Bool(bool_array) = &*filter else {
panic!("unmatched type: filter expr returns a non-null array");
};
let new_vis = bool_array.to_bitmap() | (!row_matched);
Why are non-matched rows visible in the result? 🤔

LEFT OUTER JOIN.

Non-matched rows will only be appended to the chunk when it's a LEFT OUTER join, so they should be visible here.
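To illustrate that answer, a tiny hypothetical sketch of the visibility merge using plain Vec<bool> in place of RisingWave's Bitmap types: a row is hidden only if it found a lookup match but then failed the non-lookup condition; LEFT OUTER padding rows never matched, so they stay visible regardless of the condition.

// Hypothetical stand-in for `bool_array.to_bitmap() | (!row_matched)`.
fn merge_visibility(cond_result: &[bool], row_matched: &[bool]) -> Vec<bool> {
    cond_result
        .iter()
        .zip(row_matched)
        // Visible if the condition passed, or if the row is LEFT OUTER
        // padding that never had a match to evaluate the condition on.
        .map(|(&cond, &matched)| cond || !matched)
        .collect()
}

fn main() {
    // Row 0: matched, condition true  -> visible
    // Row 1: matched, condition false -> hidden
    // Row 2: not matched (NULL-padded) -> visible
    assert_eq!(
        merge_visibility(&[true, false, false], &[true, true, false]),
        vec![true, false, true]
    );
}

The remaining diff below then applies the output indices and emits the chunk with this merged visibility.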
let (columns, _) = data_chunk.into_parts();
// apply output indices.
let output_columns = self
.output_indices
.iter()
.cloned()
.map(|idx| columns[idx].clone())
.collect();
let new_chunk =
StreamChunk::with_visibility(ops, output_columns, new_vis);
yield Message::Chunk(new_chunk);
} else {
let (data_chunk, ops) = chunk.into_parts();
let (columns, vis) = data_chunk.into_parts();
// apply output indices.
let output_columns = self
.output_indices
.iter()
.cloned()
.map(|idx| columns[idx].clone())
.collect();
let new_chunk = StreamChunk::with_visibility(ops, output_columns, vis);
yield Message::Chunk(new_chunk);
};
}
}
InternalMessage::Barrier(updates, barrier) => {
Is it possible to extract a method for this async block? The hack here looks ugly 🤣

Prefer to keep the hack code in a small scope and use it only where needed.

I think this is very acceptable. I like the simple-and-stupid way.
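For context on the hack being discussed, a minimal, hypothetical sketch using only the standard library (no futures_async_stream): an async block that only ever produces Ok values gives the compiler nothing to infer the error type from, so a dead branch is used purely as a type annotation, much like the `return unreachable!("type hints only") as StreamExecutorResult<_>;` line in the diff.

use std::io::{Error, ErrorKind};

fn main() {
    // Without the `if false` branch below, this fails to compile with
    // "type annotations needed": nothing in the block names the error type.
    let fut = async {
        if false {
            // Never executed; exists only to pin the block's error type.
            return Err(Error::new(ErrorKind::Other, "type hint only"));
        }
        Ok(1u32)
    };
    // The block's output type is now pinned to `Result<u32, std::io::Error>`.
    let _ = fut;
}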