Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Aug 20, 2024
1 parent 0d33b02 commit 178e4c7
Showing 1 changed file with 21 additions and 12 deletions.
33 changes: 21 additions & 12 deletions datafusion/physical-plan/src/joins/sort_merge_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
use std::any::Any;
use std::cmp::Ordering;
use std::collections::VecDeque;
use std::collections::{HashMap, VecDeque};
use std::fmt::Formatter;
use std::mem;
use std::ops::Range;
Expand Down Expand Up @@ -1379,23 +1379,32 @@ impl SMJStream {
// If a buffered row is joined with the streamed side but doesn't match the join filter,
// we need to output it with nulls on the streamed side.
if matches!(self.join_type, JoinType::Full) {
let mut buffered_indices_map: HashMap<u64, bool> =
HashMap::new();

for i in 0..pre_mask.len() {
let buffered_batch = &mut self.buffered_data.batches
[chunk.buffered_batch_idx.unwrap()];
let buffered_index = buffered_indices.value(i);

if !pre_mask.value(i) {
// For a buffered row that is joined with streamed side but doesn't satisfy the join filter,
buffered_indices_map.insert(
buffered_index,
*buffered_indices_map
.get(&buffered_index)
.unwrap_or(&true)
&& !pre_mask.value(i),
);
}

let buffered_batch = &mut self.buffered_data.batches
[chunk.buffered_batch_idx.unwrap()];
for (buffered_index, failed_join_filter) in
buffered_indices_map
{
if failed_join_filter {
// For a buffered row that is joined with streamed-side rows, but none of the joined rows
// satisfies the join filter, record its index as having failed the join filter.
buffered_batch
.join_filter_failed_idxs
.insert(buffered_index);
} else if buffered_batch
.join_filter_failed_idxs
.contains(&buffered_index)
{
buffered_batch
.join_filter_failed_idxs
.remove(&buffered_index);
}
}
}
Expand Down

0 comments on commit 178e4c7

Please sign in to comment.