From a06acaab70bfdaf27b144b146e57e3984e43c6c1 Mon Sep 17 00:00:00 2001 From: comphead Date: Mon, 20 May 2024 14:21:57 -0700 Subject: [PATCH] fmt --- .../src/joins/sort_merge_join.rs | 32 ++++++++++--------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index 66946f664a78..1cc7bf4700d1 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -996,21 +996,23 @@ impl SMJStream { } } Ordering::Equal => { - if matches!(self.join_type, JoinType::LeftSemi) && self.filter.is_some() { - join_streamed = !self - .streamed_batch - .join_filter_matched_idxs - .contains(&(self.streamed_batch.idx as u64)) - && !self.streamed_joined; - // if the join filter specified there can be references to buffered columns - // so buffered columns are needed to access them - join_buffered = join_streamed; - } - if matches!(self.join_type, JoinType::LeftSemi) && self.filter.is_none() { - join_streamed = !self.streamed_joined; - // if the join filter specified there can be references to buffered columns - // so buffered columns are needed to access them - join_buffered = self.filter.is_some(); + if matches!(self.join_type, JoinType::LeftSemi) { + // if the join filter is specified then its needed to output the streamed index + // only if it has not been emitted before + // the `join_filter_matched_idxs` keeps track on if streamed index has a successful + // filter match and prevents the same index to go into output more than once + if self.filter.is_some() { + join_streamed = !self + .streamed_batch + .join_filter_matched_idxs + .contains(&(self.streamed_batch.idx as u64)) + && !self.streamed_joined; + // if the join filter specified there can be references to buffered columns + // so buffered columns are needed to access them + join_buffered = join_streamed; + } else { + join_streamed = !self.streamed_joined; + } } if matches!( self.join_type,