Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
comphead committed May 20, 2024
1 parent f0e60da commit a06acaa
Showing 1 changed file with 17 additions and 15 deletions.
32 changes: 17 additions & 15 deletions datafusion/physical-plan/src/joins/sort_merge_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -996,21 +996,23 @@ impl SMJStream {
}
}
Ordering::Equal => {
if matches!(self.join_type, JoinType::LeftSemi) && self.filter.is_some() {
join_streamed = !self
.streamed_batch
.join_filter_matched_idxs
.contains(&(self.streamed_batch.idx as u64))
&& !self.streamed_joined;
// if the join filter specified there can be references to buffered columns
// so buffered columns are needed to access them
join_buffered = join_streamed;
}
if matches!(self.join_type, JoinType::LeftSemi) && self.filter.is_none() {
join_streamed = !self.streamed_joined;
// if the join filter specified there can be references to buffered columns
// so buffered columns are needed to access them
join_buffered = self.filter.is_some();
if matches!(self.join_type, JoinType::LeftSemi) {
// if the join filter is specified then its needed to output the streamed index
// only if it has not been emitted before
// the `join_filter_matched_idxs` keeps track on if streamed index has a successful
// filter match and prevents the same index to go into output more than once
if self.filter.is_some() {
join_streamed = !self
.streamed_batch
.join_filter_matched_idxs
.contains(&(self.streamed_batch.idx as u64))
&& !self.streamed_joined;
// if the join filter specified there can be references to buffered columns
// so buffered columns are needed to access them
join_buffered = join_streamed;
} else {
join_streamed = !self.streamed_joined;
}
}
if matches!(
self.join_type,
Expand Down

0 comments on commit a06acaa

Please sign in to comment.