Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Jan 2, 2024
1 parent e82707e commit b7395a3
Showing 1 changed file with 7 additions and 26 deletions.
33 changes: 7 additions & 26 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -925,7 +925,6 @@ fn add_hash_on_top(
mut input: DistributionContext,
hash_exprs: Vec<Arc<dyn PhysicalExpr>>,
n_target: usize,
repartition_beneficial_stats: bool,
) -> Result<DistributionContext> {
let partition_count = input.plan.output_partitioning().partition_count();
// Early return if hash repartition is unnecessary
Expand All @@ -951,12 +950,6 @@ fn add_hash_on_top(
// requirements.
// - Usage of order preserving variants is not desirable (per the flag
// `config.optimizer.prefer_existing_sort`).
if repartition_beneficial_stats {
// Since hashing benefits from partitioning, add a round-robin repartition
// before it:
input = add_roundrobin_on_top(input, n_target)?;
}

let partitioning = Partitioning::Hash(hash_exprs, n_target);
let repartition = RepartitionExec::try_new(input.plan.clone(), partitioning)?
.with_preserve_order();
Expand Down Expand Up @@ -1198,13 +1191,13 @@ fn ensure_distribution(
)
.map(
|(mut child, requirement, required_input_ordering, would_benefit, maintains)| {
// Don't need to apply when the returned row count is not greater than 1:
// Don't need to apply when the returned row count is not greater than batch size
let num_rows = child.plan.statistics()?.num_rows;
let repartition_beneficial_stats = if num_rows.is_exact().unwrap_or(false) {
num_rows
.get_value()
.map(|value| value > &batch_size)
.unwrap_or(true)
.unwrap() // safe to unwrap since is_exact() is true
} else {
true
};
Expand Down Expand Up @@ -1237,12 +1230,7 @@ fn ensure_distribution(
child = add_spm_on_top(child);
}
Distribution::HashPartitioned(exprs) => {
child = add_hash_on_top(
child,
exprs.to_vec(),
target_partitions,
repartition_beneficial_stats,
)?;
child = add_hash_on_top(child, exprs.to_vec(), target_partitions)?;
}
Distribution::UnspecifiedDistribution => {}
};
Expand Down Expand Up @@ -1362,17 +1350,10 @@ impl DistributionContext {

fn update_children(mut self) -> Result<Self> {
for child_context in self.children_nodes.iter_mut() {
child_context.distribution_connection = match child_context.plan.as_any() {
plan_any if plan_any.is::<RepartitionExec>() => matches!(
plan_any
.downcast_ref::<RepartitionExec>()
.unwrap()
.partitioning(),
Partitioning::RoundRobinBatch(_) | Partitioning::Hash(_, _)
),
plan_any
if plan_any.is::<SortPreservingMergeExec>()
|| plan_any.is::<CoalescePartitionsExec>() =>
child_context.distribution_connection = match &child_context.plan {
plan if is_repartition(plan)
|| is_coalesce_partitions(plan)
|| is_sort_preserving_merge(plan) =>
{
true
}
Expand Down

0 comments on commit b7395a3

Please sign in to comment.