diff --git a/.gitignore b/.gitignore index 8bdcd51d3..14e28d3f4 100644 --- a/.gitignore +++ b/.gitignore @@ -8,7 +8,7 @@ derby.log metastore_db/ spark-warehouse/ dependency-reduced-pom.xml -native/core/src/execution/generated +native/proto/src/generated prebuild .flattened-pom.xml rat.txt diff --git a/native/Cargo.lock b/native/Cargo.lock index 649e137f0..c3aae93af 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -864,6 +864,7 @@ dependencies = [ "crc32fast", "criterion", "datafusion", + "datafusion-comet-proto", "datafusion-comet-spark-expr", "datafusion-common", "datafusion-expr", @@ -886,7 +887,6 @@ dependencies = [ "paste", "pprof", "prost 0.12.6", - "prost-build", "rand", "regex", "serde", @@ -900,6 +900,14 @@ dependencies = [ "zstd", ] +[[package]] +name = "datafusion-comet-proto" +version = "0.1.0" +dependencies = [ + "prost 0.12.6", + "prost-build", +] + [[package]] name = "datafusion-comet-spark-expr" version = "0.1.0" diff --git a/native/Cargo.toml b/native/Cargo.toml index 4f306452a..c52d906bc 100644 --- a/native/Cargo.toml +++ b/native/Cargo.toml @@ -16,7 +16,7 @@ # under the License. 
[workspace] -members = ["core", "spark-expr"] +members = ["core", "spark-expr", "proto"] resolver = "2" [workspace.package] @@ -47,6 +47,7 @@ datafusion-physical-plan = { git = "https://github.com/apache/datafusion.git", r datafusion-physical-expr-common = { git = "https://github.com/apache/datafusion.git", rev = "40.0.0", default-features = false } datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", rev = "40.0.0", default-features = false } datafusion-comet-spark-expr = { path = "spark-expr", version = "0.1.0" } +datafusion-comet-proto = { path = "proto", version = "0.1.0" } chrono = { version = "0.4", default-features = false, features = ["clock"] } chrono-tz = { version = "0.8" } num = "0.4" diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index 90ead502f..158c26319 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -76,9 +76,7 @@ regex = { workspace = true } crc32fast = "1.3.2" simd-adler32 = "0.3.7" datafusion-comet-spark-expr = { workspace = true } - -[build-dependencies] -prost-build = "0.9.0" +datafusion-comet-proto = { workspace = true } [dev-dependencies] pprof = { version = "0.13.0", features = ["flamegraph"] } diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 7e6383059..a6fefba66 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -63,9 +63,6 @@ use itertools::Itertools; use jni::objects::GlobalRef; use num::{BigInt, ToPrimitive}; -use crate::execution::spark_operator::lower_window_frame_bound::LowerFrameBoundStruct; -use crate::execution::spark_operator::upper_window_frame_bound::UpperFrameBoundStruct; -use crate::execution::spark_operator::WindowFrameType; use crate::{ errors::ExpressionError, execution::{ @@ -94,17 +91,22 @@ use crate::{ }, operators::{CopyExec, ExecutionError, ScanExec}, serde::to_arrow_datatype, - spark_expression, - spark_expression::{ - 
agg_expr::ExprStruct as AggExprStruct, expr::ExprStruct, literal::Value, AggExpr, Expr, - ScalarFunc, - }, - spark_operator::{operator::OpStruct, BuildSide, JoinType, Operator}, - spark_partitioning::{partitioning::PartitioningStruct, Partitioning as SparkPartitioning}, }, }; use super::expressions::{create_named_struct::CreateNamedStruct, EvalMode}; +use datafusion_comet_proto::{ + spark_expression::{ + self, agg_expr::ExprStruct as AggExprStruct, expr::ExprStruct, literal::Value, AggExpr, + Expr, ScalarFunc, + }, + spark_operator::{ + self, lower_window_frame_bound::LowerFrameBoundStruct, operator::OpStruct, + upper_window_frame_bound::UpperFrameBoundStruct, BuildSide, JoinType, Operator, + WindowFrameType, + }, + spark_partitioning::{partitioning::PartitioningStruct, Partitioning as SparkPartitioning}, +}; use datafusion_comet_spark_expr::{ Abs, Cast, DateTruncExec, HourExec, IfExpr, MinuteExec, SecondExec, TimestampTruncExec, }; @@ -1452,7 +1454,7 @@ impl PhysicalPlanner { /// Create a DataFusion windows physical expression from Spark physical expression fn create_window_expr<'a>( &'a self, - spark_expr: &'a crate::execution::spark_operator::WindowExpr, + spark_expr: &'a spark_operator::WindowExpr, input_schema: SchemaRef, partition_by: &[Arc], sort_exprs: &[PhysicalSortExpr], @@ -1833,17 +1835,16 @@ mod tests { use datafusion::{physical_plan::common::collect, prelude::SessionContext}; use tokio::sync::mpsc; - use crate::execution::{ - datafusion::planner::PhysicalPlanner, - operators::InputBatch, + use crate::execution::{datafusion::planner::PhysicalPlanner, operators::InputBatch}; + + use crate::execution::operators::ExecutionError; + use datafusion_comet_proto::{ + spark_expression::expr::ExprStruct::*, spark_expression::{self, literal}, spark_operator, + spark_operator::{operator::OpStruct, Operator}, }; - use crate::execution::operators::ExecutionError; - use spark_expression::expr::ExprStruct::*; - use spark_operator::{operator::OpStruct, Operator}; - 
#[test] fn test_unpack_dictionary_primitive() { let op_scan = Operator { diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index bc194238b..d326b4f39 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -49,10 +49,10 @@ use crate::{ execution::{ datafusion::planner::PhysicalPlanner, metrics::utils::update_comet_metric, serde::to_arrow_datatype, shuffle::row::process_sorted_row_partition, sort::RdxSort, - spark_operator::Operator, }, jvm_bridge::{jni_new_global_ref, JVMClasses}, }; +use datafusion_comet_proto::spark_operator::Operator; use futures::stream::StreamExt; use jni::{ objects::GlobalRef, diff --git a/native/core/src/execution/mod.rs b/native/core/src/execution/mod.rs index cdd429231..f17935702 100644 --- a/native/core/src/execution/mod.rs +++ b/native/core/src/execution/mod.rs @@ -32,24 +32,6 @@ pub(crate) mod utils; mod memory_pool; pub use memory_pool::*; -// Include generated modules from .proto files. -#[allow(missing_docs)] -pub mod spark_expression { - include!(concat!("generated", "/spark.spark_expression.rs")); -} - -// Include generated modules from .proto files. -#[allow(missing_docs)] -pub mod spark_partitioning { - include!(concat!("generated", "/spark.spark_partitioning.rs")); -} - -// Include generated modules from .proto files. -#[allow(missing_docs)] -pub mod spark_operator { - include!(concat!("generated", "/spark.spark_operator.rs")); -} - #[cfg(test)] mod tests { #[test] diff --git a/native/core/src/execution/serde.rs b/native/core/src/execution/serde.rs index b88e3d651..659ce41f0 100644 --- a/native/core/src/execution/serde.rs +++ b/native/core/src/execution/serde.rs @@ -17,19 +17,20 @@ //! Ser/De for expression/operators. 
-use super::{ - operators::ExecutionError, spark_expression, spark_expression::DataType, spark_operator, -}; -use crate::{ - errors::ExpressionError, - execution::spark_expression::data_type::{ +use super::operators::ExecutionError; +use crate::errors::ExpressionError; +use arrow::datatypes::{DataType as ArrowDataType, TimeUnit}; +use arrow_schema::{Field, Fields}; +use datafusion_comet_proto::{ + spark_expression, + spark_expression::data_type::{ data_type_info::DatatypeStruct, DataTypeId, DataTypeId::{Bool, Bytes, Decimal, Double, Float, Int16, Int32, Int64, Int8, String}, }, + spark_expression::DataType, + spark_operator, }; -use arrow::datatypes::{DataType as ArrowDataType, TimeUnit}; -use arrow_schema::{Field, Fields}; use prost::Message; use std::{io::Cursor, sync::Arc}; diff --git a/native/proto/Cargo.toml b/native/proto/Cargo.toml new file mode 100644 index 000000000..29aba6396 --- /dev/null +++ b/native/proto/Cargo.toml @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +[package] +name = "datafusion-comet-proto" +version = "0.1.0" +homepage = "https://datafusion.apache.org/comet" +repository = "https://github.com/apache/datafusion-comet" +authors = ["Apache DataFusion <dev@datafusion.apache.org>"] +description = "Apache DataFusion Comet: High performance accelerator for Apache Spark" +readme = "README.md" +license = "Apache-2.0" +edition = "2021" + +[dependencies] +prost = "0.12.1" + +[build-dependencies] +prost-build = "0.9.0" + +[features] +default = [] + +[lib] +name = "datafusion_comet_proto" +path = "src/lib.rs" + + + + diff --git a/native/proto/README.md b/native/proto/README.md new file mode 100644 index 000000000..34bd79c64 --- /dev/null +++ b/native/proto/README.md @@ -0,0 +1,23 @@ + + +# Apache DataFusion Comet: Intermediate Representation of Query Plan + +This crate contains the protocol buffer definitions of Spark physical query plans +and is intended to be used as part of the Apache DataFusion Comet project. \ No newline at end of file diff --git a/native/core/build.rs b/native/proto/build.rs similarity index 79% rename from native/core/build.rs rename to native/proto/build.rs index 14ff0c110..e707f0c3b 100644 --- a/native/core/build.rs +++ b/native/proto/build.rs @@ -20,20 +20,20 @@ use std::{fs, io::Result, path::Path} fn main() -> Result<()> { - println!("cargo:rerun-if-changed=src/execution/proto/"); + println!("cargo:rerun-if-changed=src/proto/"); - let out_dir = "src/execution/generated"; + let out_dir = "src/generated"; if !Path::new(out_dir).is_dir() { fs::create_dir(out_dir)?; } prost_build::Config::new().out_dir(out_dir).compile_protos( &[ - "src/execution/proto/expr.proto", - "src/execution/proto/partitioning.proto", - "src/execution/proto/operator.proto", + "src/proto/expr.proto", + "src/proto/partitioning.proto", + "src/proto/operator.proto", ], - &["src/execution/proto"], + &["src/proto"], )?; Ok(()) } diff --git a/native/proto/src/lib.rs b/native/proto/src/lib.rs new file mode 100644 index 000000000..eaf1253cc --- /dev/null
+++ b/native/proto/src/lib.rs @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Include generated modules from .proto files. +#[allow(missing_docs)] +pub mod spark_expression { + include!(concat!("generated", "/spark.spark_expression.rs")); +} + +// Include generated modules from .proto files. +#[allow(missing_docs)] +pub mod spark_partitioning { + include!(concat!("generated", "/spark.spark_partitioning.rs")); +} + +// Include generated modules from .proto files. 
+#[allow(missing_docs)] +pub mod spark_operator { + include!(concat!("generated", "/spark.spark_operator.rs")); +} diff --git a/native/core/src/execution/proto/expr.proto b/native/proto/src/proto/expr.proto similarity index 100% rename from native/core/src/execution/proto/expr.proto rename to native/proto/src/proto/expr.proto diff --git a/native/core/src/execution/proto/operator.proto b/native/proto/src/proto/operator.proto similarity index 100% rename from native/core/src/execution/proto/operator.proto rename to native/proto/src/proto/operator.proto diff --git a/native/core/src/execution/proto/partitioning.proto b/native/proto/src/proto/partitioning.proto similarity index 100% rename from native/core/src/execution/proto/partitioning.proto rename to native/proto/src/proto/partitioning.proto diff --git a/spark/pom.xml b/spark/pom.xml index 49672e0bb..70ea32187 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -152,7 +152,7 @@ under the License. com.google.protobuf:protoc:${protobuf.version} - ../native/core/src/execution/proto + ../native/proto/src/proto