Skip to content

Commit

Permalink
For review
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Jun 13, 2024
1 parent 29b1193 commit 764fbc4
Showing 1 changed file with 9 additions and 4 deletions.
13 changes: 9 additions & 4 deletions datafusion/physical-plan/src/joins/sort_merge_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1116,10 +1116,13 @@ impl SMJStream {
// NULLs on streamed side.
//
// Applicable only in case of Full join.
//
// If `output_not_matched_filter` is true, this will also produce record batches
// for buffered rows which are joined with streamed side but don't match join filter.
fn freeze_buffered(
&mut self,
batch_count: usize,
output_join_filter_fail_batch: bool,
output_not_matched_filter: bool,
) -> Result<()> {
if !matches!(self.join_type, JoinType::Full) {
return Ok(());
Expand All @@ -1139,7 +1142,7 @@ impl SMJStream {
buffered_batch.null_joined.clear();

// For buffered rows which are joined with streamed side but doesn't satisfy the join filter
if output_join_filter_fail_batch {
if output_not_matched_filter {
let buffered_indices = UInt64Array::from_iter_values(
buffered_batch.join_filter_failed_idxs.iter().copied(),
);
Expand Down Expand Up @@ -1476,9 +1479,11 @@ fn produce_buffered_null_batch(
.collect::<Vec<_>>();

streamed_columns.extend(buffered_columns);
let columns = streamed_columns;

Ok(Some(RecordBatch::try_new(schema.clone(), columns)?))
Ok(Some(RecordBatch::try_new(
schema.clone(),
streamed_columns,
)?))
}

/// Get `buffered_indices` rows for `buffered_data[buffered_batch_idx]`
Expand Down

0 comments on commit 764fbc4

Please sign in to comment.