From 210e8df9205dd542a346e89f38b517acf92d3e9f Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 3 Jan 2024 13:12:47 -0800 Subject: [PATCH] More --- .../enforce_distribution.rs | 183 ++++++++++++------ datafusion/core/tests/sql/explain_analyze.rs | 12 +- 2 files changed, 129 insertions(+), 66 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index e156e4a6923a..f97a8cd2c60f 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -2089,12 +2089,15 @@ pub(crate) mod tests { JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => vec![ top_join_plan.as_str(), join_plan.as_str(), - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], // Should include 4 RepartitionExecs @@ -2102,12 +2105,15 @@ pub(crate) mod tests { top_join_plan.as_str(), "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", join_plan.as_str(), - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], }; @@ -2146,12 +2152,15 @@ pub(crate) mod tests { vec![ top_join_plan.as_str(), join_plan.as_str(), - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], // Should include 4 RepartitionExecs @@ -2160,12 +2169,15 @@ pub(crate) mod tests { top_join_plan.as_str(), "RepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=10", join_plan.as_str(), - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], }; @@ -2216,11 +2228,14 @@ pub(crate) mod tests { "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a1@0, c@2)]", "ProjectionExec: expr=[a@0 as a1, a@0 as a2]", "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)]", - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ]; assert_optimized!(expected, top_join.clone(), true); @@ -2239,11 +2254,14 @@ pub(crate) mod tests { "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a2@1, c@2)]", "ProjectionExec: expr=[a@0 as a1, a@0 as a2]", "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)]", - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ]; @@ -2291,11 +2309,14 @@ pub(crate) mod tests { "ProjectionExec: expr=[c1@0 as a]", "ProjectionExec: expr=[c@2 as c1]", "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)]", - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ]; @@ -2490,15 +2511,19 @@ pub(crate) mod tests { "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(B@2, b1@6), (C@3, c@2), (AA@1, a1@5)]", "ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]", "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(b@1, b1@1), (c@2, c1@2), (a@0, a1@0)]", - "RepartitionExec: partitioning=Hash([b@1, c@2, a@0], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b@1, c@2, a@0], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([b1@1, c1@2, a1@0], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b1@1, c1@2, a1@0], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(b@1, b1@1), (c@2, c1@2), (a@0, a1@0)]", - "RepartitionExec: partitioning=Hash([b@1, c@2, a@0], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b@1, c@2, a@0], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([b1@1, c1@2, a1@0], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b1@1, c1@2, a1@0], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ]; @@ -2614,15 +2639,19 @@ pub(crate) mod tests { top_join_plan.as_str(), "ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]", "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a1@0), (b@1, b1@1), (c@2, c1@2)]", - "RepartitionExec: partitioning=Hash([a@0, b@1, c@2], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([a@0, b@1, c@2], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([a1@0, b1@1, c1@2], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([a1@0, b1@1, c1@2], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@2, c1@2), (b@1, b1@1), (a@0, a1@0)]", - "RepartitionExec: partitioning=Hash([c@2, b@1, a@0], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([c@2, b@1, a@0], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([c1@2, b1@1, a1@0], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([c1@2, b1@1, a1@0], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ]; @@ -2735,15 +2764,19 @@ pub(crate) mod tests { top_join_plan.as_str(), "ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]", "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a1@0), (b@1, b1@1)]", - "RepartitionExec: partitioning=Hash([a@0, b@1], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([a@0, b@1], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([a1@0, b1@1], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([a1@0, b1@1], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@2, c1@2), (b@1, b1@1), (a@0, a1@0)]", - "RepartitionExec: partitioning=Hash([c@2, b@1, a@0], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([c@2, b@1, a@0], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([c1@2, b1@1, a1@0], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([c1@2, b1@1, a1@0], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ]; @@ -2809,14 +2842,17 @@ pub(crate) mod tests { top_join_plan.as_str(), join_plan.as_str(), "SortExec: expr=[a@0 ASC]", - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "SortExec: expr=[b1@1 ASC]", - "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "SortExec: expr=[c@2 ASC]", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], // Should include 7 RepartitionExecs (4 hash, 3 round-robin), 4 SortExecs @@ -2835,14 +2871,17 @@ pub(crate) mod tests { "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", join_plan.as_str(), "SortExec: expr=[a@0 ASC]", - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "SortExec: expr=[b1@1 ASC]", - "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "SortExec: expr=[c@2 ASC]", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], }; @@ -2854,14 +2893,17 @@ pub(crate) mod tests { vec![ top_join_plan.as_str(), join_plan.as_str(), - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "SortExec: expr=[a@0 ASC]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "SortExec: expr=[b1@1 ASC]", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "SortExec: expr=[c@2 ASC]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], @@ -2877,18 +2919,22 @@ pub(crate) mod tests { _ => vec![ top_join_plan.as_str(), // Below 4 operators are differences introduced, when join mode is changed - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "SortExec: expr=[a@0 ASC]", "CoalescePartitionsExec", join_plan.as_str(), - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "SortExec: expr=[a@0 ASC]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "SortExec: expr=[b1@1 ASC]", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "SortExec: expr=[c@2 ASC]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], @@ -2918,14 +2964,17 @@ pub(crate) mod tests { top_join_plan.as_str(), join_plan.as_str(), "SortExec: expr=[a@0 ASC]", - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "SortExec: expr=[b1@1 ASC]", - "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "SortExec: expr=[c@2 ASC]", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], // Should include 7 RepartitionExecs (4 hash, 3 round-robin) and 4 SortExecs @@ -2935,14 +2984,17 @@ pub(crate) mod tests { "RepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=10", join_plan.as_str(), "SortExec: expr=[a@0 ASC]", - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "SortExec: expr=[b1@1 ASC]", - "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "SortExec: expr=[c@2 ASC]", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], // this match arm cannot be reached @@ -2955,32 +3007,39 @@ pub(crate) mod tests { JoinType::Inner | JoinType::Right => vec![ top_join_plan.as_str(), join_plan.as_str(), - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "SortExec: expr=[a@0 ASC]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "SortExec: expr=[b1@1 ASC]", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "SortExec: expr=[c@2 ASC]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], // Should include 8 RepartitionExecs (4 of them preserves order) and 4 SortExecs JoinType::Left | JoinType::Full => vec![ top_join_plan.as_str(), - "RepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=10, preserve_order=true, sort_exprs=b1@6 ASC", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "SortExec: expr=[b1@6 ASC]", "CoalescePartitionsExec", join_plan.as_str(), - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "SortExec: expr=[a@0 ASC]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "SortExec: expr=[b1@1 ASC]", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "SortExec: expr=[c@2 ASC]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], @@ -3070,7 +3129,8 @@ pub(crate) mod tests { let expected_first_sort_enforcement = &[ "SortMergeJoin: join_type=Inner, on=[(b3@1, b2@1), (a3@0, a2@0)]", - "RepartitionExec: partitioning=Hash([b3@1, a3@0], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b3@1, a3@0], 10), input_partitions=10, preserve_order=true, sort_exprs=b3@1 ASC,a3@0 ASC", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "SortExec: expr=[b3@1 ASC,a3@0 ASC]", "CoalescePartitionsExec", "ProjectionExec: expr=[a1@0 as a3, b1@1 as b3]", @@ -3080,7 +3140,8 @@ pub(crate) mod tests { "AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]", "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([b2@1, a2@0], 10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b2@1, a2@0], 10), input_partitions=10, preserve_order=true, sort_exprs=b2@1 ASC,a2@0 ASC", + "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "SortExec: expr=[b2@1 ASC,a2@0 ASC]", "CoalescePartitionsExec", "ProjectionExec: expr=[a@1 as a2, b@0 as b2]", diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index 3ae6ab33ba95..37f8cefc9080 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -660,12 +660,14 @@ async fn test_physical_plan_display_indent_multi_children() { " CoalesceBatchesExec: target_batch_size=4096", " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c2@0)]", " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([c1@0], 9000), input_partitions=1", - " CsvExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true", - " CoalesceBatchesExec: target_batch_size=4096", - " RepartitionExec: partitioning=Hash([c2@0], 9000), input_partitions=1", - " ProjectionExec: expr=[c1@0 as c2]", + " RepartitionExec: partitioning=Hash([c1@0], 9000), input_partitions=9000", + " RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1", " CsvExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true", + " CoalesceBatchesExec: target_batch_size=4096", + " RepartitionExec: partitioning=Hash([c2@0], 9000), input_partitions=9000", + " RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1", + " ProjectionExec: expr=[c1@0 as c2]", + " CsvExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true", ]; let normalizer = ExplainNormalizer::new();