Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Jan 30, 2024
1 parent 7462f4b commit 5227091
Showing 1 changed file with 56 additions and 4 deletions.
60 changes: 56 additions & 4 deletions datafusion/physical-plan/src/joins/sort_merge_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1104,6 +1104,28 @@ impl SMJStream {
.map(|f| new_null_array(f.data_type(), buffered_indices.len()))
.collect::<Vec<_>>();

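// Collect only the columns referenced by the join filter:
// `JoinSide::Left` entries come from `streamed_columns`,
// `JoinSide::Right` entries from `buffered_columns` (left ones first).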
let mut filter_columns = vec![];

if let Some(f) = &self.filter {
let left_columns = f
.column_indices()
.iter()
.filter(|col_index| (*col_index).side == JoinSide::Left)
.map(|i| streamed_columns[i.index].clone())
.collect::<Vec<_>>();

let right_columns = f
.column_indices()
.iter()
.filter(|col_index| (*col_index).side == JoinSide::Right)
.map(|i| buffered_columns[i.index].clone())
.collect::<Vec<_>>();

filter_columns.extend(left_columns);
filter_columns.extend(right_columns);
}

streamed_columns.extend(buffered_columns);
let columns = streamed_columns;

Expand All @@ -1113,10 +1135,14 @@ impl SMJStream {
let output_batch = if let Some(f) = &self.filter {
println!("f: {:?}", f);

// Construct batch with only filter columns
let filter_batch =
RecordBatch::try_new(Arc::new(f.schema().clone()), filter_columns)?;

let filter_result = f
.expression()
.evaluate(&output_batch)?
.into_array(output_batch.num_rows())?;
.evaluate(&filter_batch)?
.into_array(filter_batch.num_rows())?;
let mask = datafusion_common::cast::as_boolean_array(&filter_result)?;
println!("mask: {:?}", mask);

Expand Down Expand Up @@ -1168,6 +1194,28 @@ impl SMJStream {
.collect::<Vec<_>>()
};

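// Collect only the columns referenced by the join filter:
// `JoinSide::Left` entries come from `streamed_columns`,
// `JoinSide::Right` entries from `buffered_columns` (left ones first).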
let mut filter_columns = vec![];

if let Some(f) = &self.filter {
let left_columns = f
.column_indices()
.iter()
.filter(|col_index| (*col_index).side == JoinSide::Left)
.map(|i| streamed_columns[i.index].clone())
.collect::<Vec<_>>();

let right_columns = f
.column_indices()
.iter()
.filter(|col_index| (*col_index).side == JoinSide::Right)
.map(|i| buffered_columns[i.index].clone())
.collect::<Vec<_>>();

filter_columns.extend(left_columns);
filter_columns.extend(right_columns);
}

let columns = if matches!(self.join_type, JoinType::Right) {
buffered_columns.extend(streamed_columns);
buffered_columns
Expand All @@ -1182,10 +1230,14 @@ impl SMJStream {
let output_batch = if let Some(f) = &self.filter {
println!("f: {:?}", f);

// Construct batch with only filter columns
let filter_batch =
RecordBatch::try_new(Arc::new(f.schema().clone()), filter_columns)?;

let filter_result = f
.expression()
.evaluate(&output_batch)?
.into_array(output_batch.num_rows())?;
.evaluate(&filter_batch)?
.into_array(filter_batch.num_rows())?;
let mask = datafusion_common::cast::as_boolean_array(&filter_result)?;
println!("mask: {:?}", mask);

Expand Down

0 comments on commit 5227091

Please sign in to comment.