Update parse_protobuf_file_scan_config to remove any partition columns from the file_schema in FileScanConfig (apache#9126)

* Update parse_protobuf_file_scan_config to remove any partition columns from the file_schema.

* Add explanations for the fix and a roundtrip test.

* Clean up unused imports left over after debugging.
bcmcmill authored Feb 6, 2024
1 parent 9669520 commit 2b6ca7b
Showing 4 changed files with 56 additions and 5 deletions.
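
Before the per-file diffs, a minimal standalone sketch of the round trip this commit implements, using the arrow types re-exported by DataFusion. The field names (`col`, `part`) and the name-based filter are illustrative only; the actual DataFusion code in the diffs below filters against `table_partition_cols`. On serialization the partition column is appended to the schema so its type survives serde; on deserialization it is looked up to rebuild `table_partition_cols` and then stripped, so `file_schema` again matches the data actually stored in the files.

use std::sync::Arc;
use datafusion::arrow::datatypes::{DataType, Field, Schema};

fn main() {
    // Schema of the data physically stored in the files (no partition columns).
    let file_schema = Schema::new(vec![Field::new("col", DataType::Utf8, false)]);
    // A hive-style partition column; it exists only in the directory path.
    let partition_col = Field::new("part", DataType::Int16, false);

    // Serialization side (to_proto.rs): append the partition column so its
    // data type is carried inside the serialized schema.
    let mut fields: Vec<Arc<Field>> = file_schema.fields().iter().cloned().collect();
    fields.push(Arc::new(partition_col.clone()));
    let serialized_schema = Schema::new(fields);

    // Deserialization side (from_proto.rs): recover the partition column's type
    // by name, then filter it out again to rebuild the file schema.
    let recovered_part = serialized_schema
        .field_with_name("part")
        .expect("partition column was serialized")
        .clone();
    let recovered_file_schema = Schema::new(
        serialized_schema
            .fields()
            .iter()
            .filter(|f| f.name() != "part")
            .cloned()
            .collect::<Vec<_>>(),
    );

    assert_eq!(recovered_part, partition_col);
    assert_eq!(recovered_file_schema, file_schema);
}
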
2 changes: 1 addition & 1 deletion datafusion/proto/Cargo.toml
@@ -56,4 +56,4 @@ serde_json = { workspace = true, optional = true }
[dev-dependencies]
doc-comment = { workspace = true }
strum = { version = "0.26.1", features = ["derive"] }
tokio = "1.18"
tokio = { version = "1.18", features = ["rt-multi-thread"] }
16 changes: 14 additions & 2 deletions datafusion/proto/src/physical_plan/from_proto.rs
@@ -530,13 +530,25 @@ pub fn parse_protobuf_file_scan_config(
true => ObjectStoreUrl::local_filesystem(),
};

// extract types of partition columns
// Reacquire the partition column types from the schema before removing them below.
let table_partition_cols = proto
.table_partition_cols
.iter()
.map(|col| Ok(schema.field_with_name(col)?.clone()))
.collect::<Result<Vec<_>>>()?;

// Remove partition columns from the schema after recreating table_partition_cols
// because the partition columns are not in the file. They are present to allow
// the partition column types to be reconstructed after serde.
let file_schema = Arc::new(Schema::new(
schema
.fields()
.iter()
.filter(|field| !table_partition_cols.contains(field))
.cloned()
.collect::<Vec<_>>(),
));

let mut output_ordering = vec![];
for node_collection in &proto.output_ordering {
let sort_expr = node_collection
@@ -562,7 +574,7 @@ pub fn parse_protobuf_file_scan_config(

Ok(FileScanConfig {
object_store_url,
file_schema: schema,
file_schema,
file_groups,
statistics,
projection,
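
The comments in the change above note that the partition columns are not present in the files themselves and are kept in the serialized schema only so their types can be reconstructed. As a hedged illustration of why that matters (a standalone sketch with hypothetical field names, again using the arrow types re-exported by DataFusion): the `field_with_name` lookup that rebuilds `table_partition_cols` has nothing to resolve against unless the partition columns are appended on the serialization side.

use datafusion::arrow::datatypes::{DataType, Field, Schema};

fn main() {
    // If the partition column were absent from the serialized schema, the
    // lookup used to rebuild table_partition_cols would have nothing to find.
    let file_only = Schema::new(vec![Field::new("col", DataType::Utf8, false)]);
    assert!(file_only.field_with_name("part").is_err());

    // With the partition column appended (what to_proto.rs now emits), the
    // lookup succeeds and the partition column's data type is recoverable.
    let with_partition = Schema::new(vec![
        Field::new("col", DataType::Utf8, false),
        Field::new("part", DataType::Int16, false),
    ]);
    assert_eq!(
        with_partition.field_with_name("part").unwrap().data_type(),
        &DataType::Int16
    );
}
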
13 changes: 12 additions & 1 deletion datafusion/proto/src/physical_plan/to_proto.rs
@@ -738,6 +738,17 @@ impl TryFrom<&FileScanConfig> for protobuf::FileScanExecConf {
output_orderings.push(expr_node_vec)
}

// The partition fields must be added to the schema so that they persist in the protobuf;
// they are then removed from the schema again in `parse_protobuf_file_scan_config`.
let mut fields = conf
.file_schema
.fields()
.iter()
.cloned()
.collect::<Vec<_>>();
fields.extend(conf.table_partition_cols.iter().cloned().map(Arc::new));
let schema = Arc::new(datafusion::arrow::datatypes::Schema::new(fields.clone()));

Ok(protobuf::FileScanExecConf {
file_groups,
statistics: Some((&conf.statistics).into()),
@@ -749,7 +760,7 @@ impl TryFrom<&FileScanConfig> for protobuf::FileScanExecConf {
.iter()
.map(|n| *n as u32)
.collect(),
schema: Some(conf.file_schema.as_ref().try_into()?),
schema: Some(schema.as_ref().try_into()?),
table_partition_cols: conf
.table_partition_cols
.iter()
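
A small design note on the `map(Arc::new)` in the change above: `conf.file_schema.fields()` already yields reference-counted `FieldRef`s, whereas `conf.table_partition_cols` holds plain `Field` values here, so the partition fields are wrapped in `Arc` before both sets are combined into the single `Schema` that gets serialized. A minimal sketch of that combination (illustrative field names and types, not the DataFusion code itself):

use std::sync::Arc;
use datafusion::arrow::datatypes::{DataType, Field, FieldRef, Schema};

fn main() {
    // Fields taken from an existing schema are already Arc-wrapped (FieldRef)...
    let file_fields: Vec<FieldRef> =
        vec![Arc::new(Field::new("col", DataType::Utf8, false))];
    // ...while the partition columns are plain `Field` values.
    let partition_cols = vec![Field::new("part", DataType::Int16, false)];

    // Wrap the plain fields in Arc so both kinds fit in one Vec<FieldRef>,
    // then build the combined schema to serialize.
    let mut fields = file_fields;
    fields.extend(partition_cols.into_iter().map(Arc::new));
    let combined = Schema::new(fields);

    assert_eq!(combined.fields().len(), 2);
}
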
30 changes: 29 additions & 1 deletion datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -18,6 +18,7 @@
use arrow::csv::WriterBuilder;
use std::ops::Deref;
use std::sync::Arc;
use std::vec;

use datafusion::arrow::array::ArrayRef;
use datafusion::arrow::compute::kernels::sort::SortOptions;
@@ -28,7 +29,8 @@ use datafusion::datasource::file_format::parquet::ParquetSink;
use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile};
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
FileScanConfig, FileSinkConfig, ParquetExec,
wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig,
FileSinkConfig, ParquetExec,
};
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_expr::{
@@ -561,6 +563,32 @@ fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> {
)))
}

#[tokio::test]
async fn roundtrip_parquet_exec_with_table_partition_cols() -> Result<()> {
let mut file_group =
PartitionedFile::new("/path/to/part=0/file.parquet".to_string(), 1024);
file_group.partition_values =
vec![wrap_partition_value_in_dict(ScalarValue::Int64(Some(0)))];
let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));

let scan_config = FileScanConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups: vec![vec![file_group]],
statistics: Statistics::new_unknown(&schema),
file_schema: schema,
projection: Some(vec![0, 1]),
limit: None,
table_partition_cols: vec![Field::new(
"part".to_string(),
wrap_partition_type_in_dict(DataType::Int16),
false,
)],
output_ordering: vec![],
};

roundtrip_test(Arc::new(ParquetExec::new(scan_config, None, None)))
}

#[test]
fn roundtrip_builtin_scalar_function() -> Result<()> {
let field_a = Field::new("a", DataType::Int64, false);
