Skip to content

Commit

Permalink
perf: Remove one redundant CopyExec for SMJ (apache#962)
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove authored Sep 27, 2024
1 parent a8156b5 commit a690e9d
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 23 deletions.
50 changes: 27 additions & 23 deletions native/core/src/execution/datafusion/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -917,15 +917,15 @@ impl PhysicalPlanner {

let fetch = sort.fetch.map(|num| num as usize);

let copy_exec = if can_reuse_input_batch(&child) {
Arc::new(CopyExec::new(child, CopyMode::UnpackOrDeepCopy))
} else {
Arc::new(CopyExec::new(child, CopyMode::UnpackOrClone))
};
// SortExec caches batches so we need to make a copy of incoming batches. Also,
// SortExec fails in some cases if we do not unpack dictionary-encoded arrays, and
// it would be more efficient if we could avoid that.
// https://github.com/apache/datafusion-comet/issues/963
let child = Self::wrap_in_copy_exec(child);

Ok((
scans,
Arc::new(SortExec::new(exprs?, copy_exec).with_fetch(fetch)),
Arc::new(SortExec::new(exprs?, child).with_fetch(fetch)),
))
}
OpStruct::Scan(scan) => {
Expand Down Expand Up @@ -1069,9 +1069,17 @@ impl PhysicalPlanner {
join.join_type,
&join.condition,
)?;

// HashJoinExec may cache the input batch internally. We need
// to copy the input batch to avoid data corruption from reusing the input
// batch. We also need to unpack dictionary arrays, because the join operators
// do not support them.
let left = Self::wrap_in_copy_exec(join_params.left);
let right = Self::wrap_in_copy_exec(join_params.right);

let hash_join = Arc::new(HashJoinExec::try_new(
join_params.left,
join_params.right,
left,
right,
join_params.join_on,
join_params.join_filter,
&join_params.join_type,
Expand Down Expand Up @@ -1135,6 +1143,7 @@ impl PhysicalPlanner {
}
}

#[allow(clippy::too_many_arguments)]
fn parse_join_parameters(
&self,
inputs: &mut Vec<Arc<GlobalRef>>,
Expand Down Expand Up @@ -1263,21 +1272,6 @@ impl PhysicalPlanner {
None
};

// DataFusion Join operators keep the input batch internally. We need
// to copy the input batch to avoid the data corruption from reusing the input
// batch.
let left = if can_reuse_input_batch(&left) {
Arc::new(CopyExec::new(left, CopyMode::UnpackOrDeepCopy))
} else {
Arc::new(CopyExec::new(left, CopyMode::UnpackOrClone))
};

let right = if can_reuse_input_batch(&right) {
Arc::new(CopyExec::new(right, CopyMode::UnpackOrDeepCopy))
} else {
Arc::new(CopyExec::new(right, CopyMode::UnpackOrClone))
};

Ok((
JoinParameters {
left,
Expand All @@ -1290,6 +1284,16 @@ impl PhysicalPlanner {
))
}

/// Wrap an ExecutionPlan in a CopyExec, which will unpack any dictionary-encoded arrays
/// and make a deep copy of other arrays if the plan re-uses batches.
fn wrap_in_copy_exec(plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
    // A deep copy is only required when the child plan re-uses its input batches;
    // otherwise a cheap clone is sufficient. Dictionaries are unpacked in both modes.
    let mode = if can_reuse_input_batch(&plan) {
        CopyMode::UnpackOrDeepCopy
    } else {
        CopyMode::UnpackOrClone
    };
    Arc::new(CopyExec::new(plan, mode))
}

/// Create a DataFusion physical aggregate expression from Spark physical aggregate expression
fn create_agg_expr(
&self,
Expand Down
2 changes: 2 additions & 0 deletions native/core/src/execution/operators/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ pub struct CopyExec {

#[derive(Debug, PartialEq, Clone)]
pub enum CopyMode {
    /// Unpack dictionary-encoded arrays and perform a deep copy of the other arrays.
    /// Used when the input plan may re-use its batches, so a copy is needed to
    /// prevent data corruption.
    UnpackOrDeepCopy,
    /// Unpack dictionary-encoded arrays and perform a (cheap) clone of the other
    /// arrays. Used when the input plan does not re-use its batches.
    UnpackOrClone,
}

Expand Down

0 comments on commit a690e9d

Please sign in to comment.