fix: Fix the incorrect null joined rows for SMJ outer join with join filter #10892
Conversation
@@ -860,6 +867,8 @@ impl SMJStream {
            self.reservation.shrink(buffered_batch.size_estimation);
        }
    } else {
        // If the head batch is not fully processed, break the loop.
        // Streamed batch will be joined with the head batch in the next step.
Not related to this issue, but as I re-read this code, I added some comments to help us understand it again.
Possibly related to #10881
Hmm, I don't see SMJ explicitly enabled there? So I guess it is still using hash join?
@@ -84,7 +84,6 @@ SELECT * FROM t1 LEFT JOIN t2 ON t1.a = t2.a AND t2.b * 50 <= t1.b
 Alice 100 Alice 1
 Alice 100 Alice 2
 Alice 50 Alice 1
-Alice 50 NULL NULL
Does it mean the tests were not correct?
Yes, the results were incorrect.
For example, as `Alice 50` is joined with `Alice 1`, it should not be joined with nulls.
Agree, I just checked in DuckDB:
D CREATE TABLE t1(a text, b int); insert into t1 VALUES ('Alice', 50), ('Alice', 100), ('Bob', 1);
D CREATE TABLE t2(a text, b int);
D insert into t2 VALUES ('Alice', 2), ('Alice', 1);
D SELECT * FROM t1 LEFT JOIN t2 ON t1.a = t2.a AND t2.b * 50 <= t1.b;
┌─────────┬───────┬─────────┬───────┐
│ a │ b │ a │ b │
│ varchar │ int32 │ varchar │ int32 │
├─────────┼───────┼─────────┼───────┤
│ Alice │ 50 │ Alice │ 1 │
│ Alice │ 100 │ Alice │ 1 │
│ Alice │ 100 │ Alice │ 2 │
│ Bob │ 1 │ │ │
└─────────┴───────┴─────────┴───────┘
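The semantics under discussion can be sketched in plain Rust. This is a toy nested-loop left outer join, not the SMJ implementation in this PR: a streamed (left) row is padded with NULLs only when no right row passes both the equality condition and the join filter, which is exactly why `Alice 50` must not also produce a null-joined row once it matched `Alice 1`.

```rust
// Toy left outer join with the join filter from the test:
// ON t1.a = t2.a AND t2.b * 50 <= t1.b.
// A left row is null-padded only if it matched nothing at all.
fn left_join(
    t1: &[(&str, i32)],
    t2: &[(&str, i32)],
) -> Vec<(String, i32, Option<String>, Option<i32>)> {
    let mut out = Vec::new();
    for &(a1, b1) in t1 {
        let mut matched = false;
        for &(a2, b2) in t2 {
            if a1 == a2 && b2 * 50 <= b1 {
                out.push((a1.to_string(), b1, Some(a2.to_string()), Some(b2)));
                matched = true;
            }
        }
        if !matched {
            // Only unmatched left rows get the NULL-joined output row.
            out.push((a1.to_string(), b1, None, None));
        }
    }
    out
}

fn main() {
    let t1 = [("Alice", 50), ("Alice", 100), ("Bob", 1)];
    let t2 = [("Alice", 2), ("Alice", 1)];
    for row in left_join(&t1, &t2) {
        println!("{:?}", row);
    }
}
```

Running this reproduces the DuckDB result above: three matched rows plus a single null-joined row for `Bob 1`, and no spurious `Alice 50, NULL, NULL` row.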
@@ -1445,9 +1504,13 @@ fn get_buffered_columns(
/// `streamed_indices` have the same length as `mask`
/// `matched_indices` array of streaming indices that already has a join filter match
/// `scanning_buffered_offset` current buffered offset across batches
///
👍
streamed_columns.extend(buffered_columns);
let columns = streamed_columns;

Ok(Some(RecordBatch::try_new(schema.clone(), columns)?))
Can we do just
Ok(Some(RecordBatch::try_new(schema.clone(), streamed_columns.extend(buffered_columns))?))
Okay, it is actually
Ok(Some(RecordBatch::try_new(
schema.clone(),
streamed_columns,
)?))
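For context on why the suggested one-liner cannot work as written: `Vec::extend` mutates the vector in place and returns `()`, so it cannot be used as an expression that yields the extended vector. The columns have to be extended first and the vector passed afterwards. A minimal illustration:

```rust
fn main() {
    let mut streamed_columns = vec!["a", "b"];
    let buffered_columns = vec!["c"];

    // `extend` returns `()`, so something like
    // f(streamed_columns.extend(buffered_columns)) would pass `()` to `f`.
    streamed_columns.extend(buffered_columns);

    // After extending, the vector itself is passed on.
    assert_eq!(streamed_columns, ["a", "b", "c"]);
    println!("{:?}", streamed_columns);
}
```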
let buffered_indices = UInt64Array::from_iter_values(
    buffered_batch.join_filter_failed_idxs.iter().copied(),
);
if let Some(record_batch) = produce_buffered_null_batch(
We may want to have a small helper function to reduce the duplicated call?
I already pulled the common logic into `produce_buffered_null_batch`. It is duplicated because the producing logic is the same. Do you mean to inline the `produce_buffered_null_batch` code in the `for` loop?
I was thinking of a small helper function to wrap:
if let Some(record_batch) = produce_buffered_null_batch(
&self.schema,
&self.streamed_schema,
&buffered_indices,
buffered_batch,
)? {
self.output_record_batches.push(record_batch);
}
I just realized it is a formatting question: the block has 8 lines but in fact it's one condition and one instruction, so let's leave it as is.
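For illustration, the helper the review was considering would look roughly like the sketch below. The method name `push_buffered_null_batch` is hypothetical, and `RecordBatch`/`SMJStream` are simplified stand-ins rather than the DataFusion types:

```rust
// Simplified stand-ins for the Arrow RecordBatch and the SMJ stream state.
#[derive(Debug, PartialEq)]
struct RecordBatch(usize);

struct SMJStream {
    output_record_batches: Vec<RecordBatch>,
}

impl SMJStream {
    // Hypothetical helper wrapping the repeated
    // `if let Some(batch) = produce_buffered_null_batch(...)? { push }` pattern.
    fn push_buffered_null_batch(
        &mut self,
        produced: Result<Option<RecordBatch>, String>,
    ) -> Result<(), String> {
        if let Some(record_batch) = produced? {
            self.output_record_batches.push(record_batch);
        }
        Ok(())
    }
}

fn main() -> Result<(), String> {
    let mut stream = SMJStream { output_record_batches: vec![] };
    stream.push_buffered_null_batch(Ok(Some(RecordBatch(1))))?;
    stream.push_buffered_null_batch(Ok(None))?; // no batch produced, nothing pushed
    assert_eq!(stream.output_record_batches, vec![RecordBatch(1)]);
    Ok(())
}
```

As the review concluded, the wrapped block is really one condition and one instruction, so the refactor was left out of the PR.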
Thanks @viirya 💪
Co-authored-by: Oleks V <[email protected]>
LGTM, thanks @viirya. I think it's good to go.
…filter (apache#10892)

* fix: Fix the incorrect null joined rows for outer join with join filter
* Update datafusion/physical-plan/src/joins/sort_merge_join.rs
  Co-authored-by: Oleks V <[email protected]>
* Update datafusion/physical-plan/src/joins/sort_merge_join.rs
  Co-authored-by: Oleks V <[email protected]>
* Update datafusion/physical-plan/src/joins/sort_merge_join.rs
  Co-authored-by: Oleks V <[email protected]>
* Update datafusion/physical-plan/src/joins/sort_merge_join.rs
  Co-authored-by: Oleks V <[email protected]>
* For review

---------

Co-authored-by: Oleks V <[email protected]>
Which issue does this PR close?
Closes #10882.
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?