From b7395a34d7f73668211719d467a18fbcefacd1f6 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 1 Jan 2024 16:30:49 -0800 Subject: [PATCH] Cleanup --- .../enforce_distribution.rs | 33 ++++--------------- 1 file changed, 7 insertions(+), 26 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index bf5aa7d02272..817a445950a0 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -925,7 +925,6 @@ fn add_hash_on_top( mut input: DistributionContext, hash_exprs: Vec>, n_target: usize, - repartition_beneficial_stats: bool, ) -> Result { let partition_count = input.plan.output_partitioning().partition_count(); // Early return if hash repartition is unnecessary @@ -951,12 +950,6 @@ fn add_hash_on_top( // requirements. // - Usage of order preserving variants is not desirable (per the flag // `config.optimizer.prefer_existing_sort`). - if repartition_beneficial_stats { - // Since hashing benefits from partitioning, add a round-robin repartition - // before it: - input = add_roundrobin_on_top(input, n_target)?; - } - let partitioning = Partitioning::Hash(hash_exprs, n_target); let repartition = RepartitionExec::try_new(input.plan.clone(), partitioning)? .with_preserve_order(); @@ -1198,13 +1191,13 @@ fn ensure_distribution( ) .map( |(mut child, requirement, required_input_ordering, would_benefit, maintains)| { - // Don't need to apply when the returned row count is not greater than 1: + // Don't need to apply when the returned row count is not greater than batch size let num_rows = child.plan.statistics()?.num_rows; let repartition_beneficial_stats = if num_rows.is_exact().unwrap_or(false) { num_rows .get_value() .map(|value| value > &batch_size) - .unwrap_or(true) + .unwrap() // safe to unwrap since is_exact() is true } else { true }; @@ -1237,12 +1230,7 @@ fn ensure_distribution( child = add_spm_on_top(child); } Distribution::HashPartitioned(exprs) => { - child = add_hash_on_top( - child, - exprs.to_vec(), - target_partitions, - repartition_beneficial_stats, - )?; + child = add_hash_on_top(child, exprs.to_vec(), target_partitions)?; } Distribution::UnspecifiedDistribution => {} }; @@ -1362,17 +1350,10 @@ impl DistributionContext { fn update_children(mut self) -> Result { for child_context in self.children_nodes.iter_mut() { - child_context.distribution_connection = match child_context.plan.as_any() { - plan_any if plan_any.is::() => matches!( - plan_any - .downcast_ref::() - .unwrap() - .partitioning(), - Partitioning::RoundRobinBatch(_) | Partitioning::Hash(_, _) - ), - plan_any - if plan_any.is::() - || plan_any.is::() => + child_context.distribution_connection = match &child_context.plan { + plan if is_repartition(plan) + || is_coalesce_partitions(plan) + || is_sort_preserving_merge(plan) => { true }