Skip to content

Commit

Permalink
Restore
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Jan 3, 2024
1 parent a4ab9c2 commit aeb3f42
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 9 deletions.
14 changes: 13 additions & 1 deletion datafusion/core/src/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,7 @@ fn add_hash_on_top(
mut input: DistributionContext,
hash_exprs: Vec<Arc<dyn PhysicalExpr>>,
n_target: usize,
repartition_beneficial_stats: bool,
) -> Result<DistributionContext> {
let partition_count = input.plan.output_partitioning().partition_count();
// Early return if hash repartition is unnecessary
Expand All @@ -950,6 +951,12 @@ fn add_hash_on_top(
// requirements.
// - Usage of order preserving variants is not desirable (per the flag
// `config.optimizer.prefer_existing_sort`).
if repartition_beneficial_stats {
// Since hashing benefits from partitioning, add a round-robin repartition
// before it:
input = add_roundrobin_on_top(input, n_target)?;
}

let partitioning = Partitioning::Hash(hash_exprs, n_target);
let repartition = RepartitionExec::try_new(input.plan.clone(), partitioning)?
.with_preserve_order();
Expand Down Expand Up @@ -1231,7 +1238,12 @@ fn ensure_distribution(
child = add_spm_on_top(child);
}
Distribution::HashPartitioned(exprs) => {
child = add_hash_on_top(child, exprs.to_vec(), target_partitions)?;
child = add_hash_on_top(
child,
exprs.to_vec(),
target_partitions,
repartition_beneficial_stats,
)?;
}
Distribution::UnspecifiedDistribution => {}
};
Expand Down
13 changes: 5 additions & 8 deletions datafusion/sqllogictest/test_files/repartition_scan.slt
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,10 @@ query I
select * from csv_table;
----
5
1
2
3
4
1

## Expect to see the scan read the file as "4" groups with even sizes (offsets)
query TT
Expand Down Expand Up @@ -195,8 +195,8 @@ select * from "json_table";
5
1
2
3
4
3

## Expect to see the scan read the file as "4" groups with even sizes (offsets)
query TT
Expand Down Expand Up @@ -245,20 +245,17 @@ DROP TABLE arrow_table;

## Use pre-existing files, since we don't have a way to create Avro files yet

statement ok
statement error DataFusion error: This feature is not implemented: cannot read avro schema without the 'avro' feature enabled
CREATE EXTERNAL TABLE avro_table
STORED AS AVRO
WITH HEADER ROW
LOCATION '../../testing/data/avro/simple_enum.avro'


# It would be great to see the file read as "4" groups with even sizes (offsets) eventually
query TT
query error DataFusion error: Error during planning: table 'datafusion\.public\.avro_table' not found
EXPLAIN SELECT * FROM avro_table
----
logical_plan TableScan: avro_table projection=[f1, f2, f3]
physical_plan AvroExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/avro/simple_enum.avro]]}, projection=[f1, f2, f3]

# Cleanup
statement ok
statement error DataFusion error: Execution error: Table 'avro_table' doesn't exist\.
DROP TABLE avro_table;

0 comments on commit aeb3f42

Please sign in to comment.