Skip to content

Commit

Permalink
For review
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Feb 5, 2024
1 parent be35c90 commit c7c25ce
Showing 1 changed file with 4 additions and 1 deletion.
5 changes: 4 additions & 1 deletion datafusion/physical-plan/src/joins/sort_merge_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1163,6 +1163,7 @@ impl SMJStream {
get_filter_column(&self.filter, &streamed_columns, &buffered_columns)
}
} else {
// This chunk is for null joined rows (outer join), we don't need to apply join filter.
vec![]
};

Expand Down Expand Up @@ -1204,7 +1205,8 @@ impl SMJStream {
self.join_type,
JoinType::Left | JoinType::Right | JoinType::Full
) {
// The reverse of the selection mask, which is for null joined rows
// The reverse of the selection mask. For the rows not pass join filter above,
// we need to join them (left or right) with null rows for outer joins.
let not_mask = compute::not(mask)?;
let null_joined_batch =
compute::filter_record_batch(&output_batch, &not_mask)?;
Expand Down Expand Up @@ -1232,6 +1234,7 @@ impl SMJStream {
buffered_columns.extend(streamed_columns);
buffered_columns
} else {
// Left join or full outer join
let mut streamed_columns = null_joined_batch
.columns()
.iter()
Expand Down

0 comments on commit c7c25ce

Please sign in to comment.