diff --git a/native/Cargo.lock b/native/Cargo.lock
index 9a8eab83e..7966bb80b 100644
--- a/native/Cargo.lock
+++ b/native/Cargo.lock
@@ -931,6 +931,7 @@ version = "0.5.0"
 dependencies = [
  "arrow",
  "arrow-array",
+ "arrow-buffer",
  "arrow-data",
  "arrow-schema",
  "chrono",
diff --git a/native/core/benches/bloom_filter_agg.rs b/native/core/benches/bloom_filter_agg.rs
index 25d27d174..b83ff3fad 100644
--- a/native/core/benches/bloom_filter_agg.rs
+++ b/native/core/benches/bloom_filter_agg.rs
@@ -19,7 +19,7 @@ use arrow::datatypes::{DataType, Field, Schema};
 use arrow_array::builder::Int64Builder;
 use arrow_array::{ArrayRef, RecordBatch};
 use arrow_schema::SchemaRef;
-use comet::execution::datafusion::expressions::bloom_filter_agg::BloomFilterAgg;
+use comet::execution::expressions::bloom_filter_agg::BloomFilterAgg;
 use criterion::{black_box, criterion_group, criterion_main, Criterion};
 use datafusion::physical_expr::PhysicalExpr;
 use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
diff --git a/native/core/benches/shuffle_writer.rs b/native/core/benches/shuffle_writer.rs
index 6f2871861..272887238 100644
--- a/native/core/benches/shuffle_writer.rs
+++ b/native/core/benches/shuffle_writer.rs
@@ -17,7 +17,7 @@
 
 use arrow_array::{builder::StringBuilder, RecordBatch};
 use arrow_schema::{DataType, Field, Schema};
-use comet::execution::datafusion::shuffle_writer::ShuffleWriterExec;
+use comet::execution::shuffle::ShuffleWriterExec;
 use criterion::{criterion_group, criterion_main, Criterion};
 use datafusion::{
     physical_plan::{common::collect, memory::MemoryExec, ExecutionPlan},
diff --git a/native/core/src/execution/datafusion/expressions/mod.rs b/native/core/src/execution/datafusion/expressions/mod.rs
deleted file mode 100644
index 5f9f322b2..000000000
--- a/native/core/src/execution/datafusion/expressions/mod.rs
+++ /dev/null
@@ -1,35 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Native DataFusion expressions
-
-pub mod checkoverflow;
-
-use crate::errors::CometError;
-pub mod bloom_filter_agg;
-pub mod bloom_filter_might_contain;
-pub mod negative;
-pub mod subquery;
-pub mod unbound;
-
-pub use datafusion_comet_spark_expr::{EvalMode, SparkError};
-
-fn arithmetic_overflow_error(from_type: &str) -> CometError {
-    CometError::Spark(SparkError::ArithmeticOverflow {
-        from_type: from_type.to_string(),
-    })
-}
diff --git a/native/core/src/execution/datafusion/operators/mod.rs b/native/core/src/execution/datafusion/operators/mod.rs
deleted file mode 100644
index 3d28a266a..000000000
--- a/native/core/src/execution/datafusion/operators/mod.rs
+++ /dev/null
@@ -1,18 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-pub mod expand;
diff --git a/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs b/native/core/src/execution/expressions/bloom_filter_agg.rs
similarity index 97%
rename from native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs
rename to native/core/src/execution/expressions/bloom_filter_agg.rs
index 1300e08c2..ea8bb3647 100644
--- a/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs
+++ b/native/core/src/execution/expressions/bloom_filter_agg.rs
@@ -19,8 +19,8 @@ use arrow_schema::Field;
 use datafusion::{arrow::datatypes::DataType, logical_expr::Volatility};
 use std::{any::Any, sync::Arc};
 
-use crate::execution::datafusion::util::spark_bloom_filter;
-use crate::execution::datafusion::util::spark_bloom_filter::SparkBloomFilter;
+use crate::execution::util::spark_bloom_filter;
+use crate::execution::util::spark_bloom_filter::SparkBloomFilter;
 use arrow::array::ArrayRef;
 use arrow_array::BinaryArray;
 use datafusion::error::Result;
diff --git a/native/core/src/execution/datafusion/expressions/bloom_filter_might_contain.rs b/native/core/src/execution/expressions/bloom_filter_might_contain.rs
similarity index 97%
rename from native/core/src/execution/datafusion/expressions/bloom_filter_might_contain.rs
rename to native/core/src/execution/expressions/bloom_filter_might_contain.rs
index de922d831..af6a5a47a 100644
--- a/native/core/src/execution/datafusion/expressions/bloom_filter_might_contain.rs
+++ b/native/core/src/execution/expressions/bloom_filter_might_contain.rs
@@ -15,9 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::{
-    execution::datafusion::util::spark_bloom_filter::SparkBloomFilter, parquet::data_type::AsBytes,
-};
+use crate::{execution::util::spark_bloom_filter::SparkBloomFilter, parquet::data_type::AsBytes};
 use arrow::record_batch::RecordBatch;
 use arrow_array::cast::as_primitive_array;
 use arrow_schema::{DataType, Schema};
diff --git a/native/core/src/execution/datafusion/mod.rs b/native/core/src/execution/expressions/mod.rs
similarity index 83%
rename from native/core/src/execution/datafusion/mod.rs
rename to native/core/src/execution/expressions/mod.rs
index ca41fa0aa..e2f811fa2 100644
--- a/native/core/src/execution/datafusion/mod.rs
+++ b/native/core/src/execution/expressions/mod.rs
@@ -15,11 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Native execution through DataFusion
+//! Native DataFusion expressions
 
-pub mod expressions;
-mod operators;
-pub mod planner;
-pub mod shuffle_writer;
-pub(crate) mod spark_plan;
-mod util;
+pub mod bloom_filter_agg;
+pub mod bloom_filter_might_contain;
+pub mod subquery;
+
+pub use datafusion_comet_spark_expr::EvalMode;
diff --git a/native/core/src/execution/datafusion/expressions/subquery.rs b/native/core/src/execution/expressions/subquery.rs
similarity index 100%
rename from native/core/src/execution/datafusion/expressions/subquery.rs
rename to native/core/src/execution/expressions/subquery.rs
diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index 5103f5ce4..491b389c9 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -45,8 +45,8 @@ use super::{serde, utils::SparkArrowConvert, CometMemoryPool};
 use crate::{
     errors::{try_unwrap_or_throw, CometError, CometResult},
     execution::{
-        datafusion::planner::PhysicalPlanner, metrics::utils::update_comet_metric,
-        serde::to_arrow_datatype, shuffle::row::process_sorted_row_partition, sort::RdxSort,
+        metrics::utils::update_comet_metric, planner::PhysicalPlanner, serde::to_arrow_datatype,
+        shuffle::row::process_sorted_row_partition, sort::RdxSort,
     },
     jvm_bridge::{jni_new_global_ref, JVMClasses},
 };
@@ -59,8 +59,8 @@ use jni::{
 };
 use tokio::runtime::Runtime;
 
-use crate::execution::datafusion::spark_plan::SparkPlan;
 use crate::execution::operators::ScanExec;
+use crate::execution::spark_plan::SparkPlan;
 use log::info;
 
 /// Comet native execution context. Kept alive across JNI calls.
diff --git a/native/core/src/execution/metrics/utils.rs b/native/core/src/execution/metrics/utils.rs
index 4bb1c4474..0eb4b631d 100644
--- a/native/core/src/execution/metrics/utils.rs
+++ b/native/core/src/execution/metrics/utils.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::execution::datafusion::spark_plan::SparkPlan;
+use crate::execution::spark_plan::SparkPlan;
 use crate::jvm_bridge::jni_new_global_ref;
 use crate::{
     errors::CometError,
diff --git a/native/core/src/execution/mod.rs b/native/core/src/execution/mod.rs
index 3dba747f2..a74ec3017 100644
--- a/native/core/src/execution/mod.rs
+++ b/native/core/src/execution/mod.rs
@@ -16,13 +16,16 @@
 // under the License.
 
 //! PoC of vectorization execution through JNI to Rust.
-pub mod datafusion;
+pub mod expressions;
 pub mod jni_api;
 mod metrics;
 pub mod operators;
+pub(crate) mod planner;
 pub mod serde;
 pub mod shuffle;
 pub(crate) mod sort;
+pub(crate) mod spark_plan;
+pub(crate) mod util;
 
 pub use datafusion_comet_spark_expr::timezone;
 pub(crate) mod utils;
diff --git a/native/core/src/execution/datafusion/operators/expand.rs b/native/core/src/execution/operators/expand.rs
similarity index 97%
rename from native/core/src/execution/datafusion/operators/expand.rs
rename to native/core/src/execution/operators/expand.rs
index a3dd06507..fb43a6e49 100644
--- a/native/core/src/execution/datafusion/operators/expand.rs
+++ b/native/core/src/execution/operators/expand.rs
@@ -37,14 +37,14 @@ use std::{
 /// A Comet native operator that expands a single row into multiple rows. This behaves as same as
 /// Spark Expand operator.
 #[derive(Debug)]
-pub struct CometExpandExec {
+pub struct ExpandExec {
     projections: Vec<Vec<Arc<dyn PhysicalExpr>>>,
     child: Arc<dyn ExecutionPlan>,
     schema: SchemaRef,
     cache: PlanProperties,
 }
 
-impl CometExpandExec {
+impl ExpandExec {
     /// Create a new ExpandExec
     pub fn new(
         projections: Vec<Vec<Arc<dyn PhysicalExpr>>>,
         child: Arc<dyn ExecutionPlan>,
         schema: SchemaRef,
@@ -66,7 +66,7 @@ impl CometExpandExec {
     }
 }
 
-impl DisplayAs for CometExpandExec {
+impl DisplayAs for ExpandExec {
     fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
         match t {
             DisplayFormatType::Default | DisplayFormatType::Verbose => {
@@ -87,7 +87,7 @@ impl DisplayAs for CometExpandExec {
     }
 }
 
-impl ExecutionPlan for CometExpandExec {
+impl ExecutionPlan for ExpandExec {
     fn as_any(&self) -> &dyn Any {
         self
     }
@@ -104,7 +104,7 @@ impl ExecutionPlan for CometExpandExec {
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
-        let new_expand = CometExpandExec::new(
+        let new_expand = ExpandExec::new(
             self.projections.clone(),
             Arc::clone(&children[0]),
             Arc::clone(&self.schema),
diff --git a/native/core/src/execution/operators/mod.rs b/native/core/src/execution/operators/mod.rs
index bdc233e94..4e15e4341 100644
--- a/native/core/src/execution/operators/mod.rs
+++ b/native/core/src/execution/operators/mod.rs
@@ -27,6 +27,8 @@
 pub use filter::FilterExec;
 pub use scan::*;
 
 mod copy;
+mod expand;
+pub use expand::ExpandExec;
 mod filter;
 mod scan;
diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs
index 0d35859df..a297f87c1 100644
--- a/native/core/src/execution/operators/scan.rs
+++ b/native/core/src/execution/operators/scan.rs
@@ -18,8 +18,7 @@
 use crate::{
     errors::CometError,
     execution::{
-        datafusion::planner::TEST_EXEC_CONTEXT_ID, operators::ExecutionError,
-        utils::SparkArrowConvert,
+        operators::ExecutionError, planner::TEST_EXEC_CONTEXT_ID, utils::SparkArrowConvert,
     },
     jvm_bridge::{jni_call, JVMClasses},
 };
diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/planner.rs
similarity index 98%
rename from native/core/src/execution/datafusion/planner.rs
rename to native/core/src/execution/planner.rs
index 0e64ed6af..3ac830c04 100644
--- a/native/core/src/execution/datafusion/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -22,17 +22,13 @@ use crate::execution::operators::{CopyMode, FilterExec};
 use crate::{
     errors::ExpressionError,
     execution::{
-        datafusion::{
-            expressions::{
-                bloom_filter_agg::BloomFilterAgg,
-                bloom_filter_might_contain::BloomFilterMightContain, checkoverflow::CheckOverflow,
-                negative, subquery::Subquery, unbound::UnboundColumn,
-            },
-            operators::expand::CometExpandExec,
-            shuffle_writer::ShuffleWriterExec,
+        expressions::{
+            bloom_filter_agg::BloomFilterAgg, bloom_filter_might_contain::BloomFilterMightContain,
+            subquery::Subquery,
         },
-        operators::{CopyExec, ExecutionError, ScanExec},
+        operators::{CopyExec, ExecutionError, ExpandExec, ScanExec},
         serde::to_arrow_datatype,
+        shuffle::ShuffleWriterExec,
     },
 };
 use arrow::compute::CastOptions;
@@ -68,11 +64,11 @@ use datafusion::{
     },
     prelude::SessionContext,
 };
-use datafusion_comet_spark_expr::create_comet_physical_fun;
+use datafusion_comet_spark_expr::{create_comet_physical_fun, create_negate_expr};
 use datafusion_functions_nested::concat::ArrayAppend;
 use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
 
-use crate::execution::datafusion::spark_plan::SparkPlan;
+use crate::execution::spark_plan::SparkPlan;
 use datafusion_comet_proto::{
     spark_expression::{
         self, agg_expr::ExprStruct as AggExprStruct, expr::ExprStruct, literal::Value, AggExpr,
@@ -86,11 +82,11 @@ use datafusion_comet_proto::{
     spark_partitioning::{partitioning::PartitioningStruct, Partitioning as SparkPartitioning},
 };
 use datafusion_comet_spark_expr::{
-    ArrayInsert, Avg, AvgDecimal, BitwiseNotExpr, Cast, Contains, Correlation, Covariance,
-    CreateNamedStruct, DateTruncExpr, EndsWith, GetArrayStructFields, GetStructField, HourExpr,
-    IfExpr, Like, ListExtract, MinuteExpr, NormalizeNaNAndZero, RLike, SecondExpr,
+    ArrayInsert, Avg, AvgDecimal, BitwiseNotExpr, Cast, CheckOverflow, Contains, Correlation,
+    Covariance, CreateNamedStruct, DateTruncExpr, EndsWith, GetArrayStructFields, GetStructField,
+    HourExpr, IfExpr, Like, ListExtract, MinuteExpr, NormalizeNaNAndZero, RLike, SecondExpr,
     SparkCastOptions, StartsWith, Stddev, StringSpaceExpr, SubstringExpr, SumDecimal,
-    TimestampTruncExpr, ToJson, Variance,
+    TimestampTruncExpr, ToJson, UnboundColumn, Variance,
 };
 use datafusion_common::scalar::ScalarStructBuilder;
 use datafusion_common::{
@@ -611,7 +607,7 @@ impl PhysicalPlanner {
             ExprStruct::UnaryMinus(expr) => {
                 let child: Arc<dyn PhysicalExpr> =
                     self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
-                let result = negative::create_negate_expr(child, expr.fail_on_error);
+                let result = create_negate_expr(child, expr.fail_on_error);
                 result.map_err(|e| ExecutionError::GeneralError(e.to_string()))
             }
             ExprStruct::NormalizeNanAndZero(expr) => {
@@ -1118,7 +1114,7 @@ impl PhysicalPlanner {
                 } else {
                     Arc::clone(&child.native_plan)
                 };
-                let expand = Arc::new(CometExpandExec::new(projections, input, schema));
+                let expand = Arc::new(ExpandExec::new(projections, input, schema));
                 Ok((
                     scans,
                     Arc::new(SparkPlan::new(spark_plan.plan_id, expand, vec![child])),
@@ -2270,7 +2266,7 @@ mod tests {
     use datafusion::{physical_plan::common::collect, prelude::SessionContext};
     use tokio::sync::mpsc;
 
-    use crate::execution::{datafusion::planner::PhysicalPlanner, operators::InputBatch};
+    use crate::execution::{operators::InputBatch, planner::PhysicalPlanner};
 
     use crate::execution::operators::ExecutionError;
     use datafusion_comet_proto::{
diff --git a/native/core/src/execution/shuffle/mod.rs b/native/core/src/execution/shuffle/mod.rs
index b052df29b..8721ead74 100644
--- a/native/core/src/execution/shuffle/mod.rs
+++ b/native/core/src/execution/shuffle/mod.rs
@@ -18,3 +18,5 @@
 mod list;
 mod map;
 pub mod row;
+mod shuffle_writer;
+pub use shuffle_writer::ShuffleWriterExec;
diff --git a/native/core/src/execution/shuffle/row.rs b/native/core/src/execution/shuffle/row.rs
index 17b180e9d..ce752e68a 100644
--- a/native/core/src/execution/shuffle/row.rs
+++ b/native/core/src/execution/shuffle/row.rs
@@ -20,10 +20,10 @@
 use crate::{
     errors::CometError,
     execution::{
-        datafusion::shuffle_writer::{write_ipc_compressed, Checksum},
         shuffle::{
             list::{append_list_element, SparkUnsafeArray},
             map::{append_map_elements, get_map_key_value_dt, SparkUnsafeMap},
+            shuffle_writer::{write_ipc_compressed, Checksum},
         },
         utils::bytes_to_i128,
     },
diff --git a/native/core/src/execution/datafusion/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs
similarity index 100%
rename from native/core/src/execution/datafusion/shuffle_writer.rs
rename to native/core/src/execution/shuffle/shuffle_writer.rs
diff --git a/native/core/src/execution/datafusion/spark_plan.rs b/native/core/src/execution/spark_plan.rs
similarity index 100%
rename from native/core/src/execution/datafusion/spark_plan.rs
rename to native/core/src/execution/spark_plan.rs
diff --git a/native/core/src/execution/datafusion/util/mod.rs b/native/core/src/execution/util/mod.rs
similarity index 100%
rename from native/core/src/execution/datafusion/util/mod.rs
rename to native/core/src/execution/util/mod.rs
diff --git a/native/core/src/execution/datafusion/util/spark_bit_array.rs b/native/core/src/execution/util/spark_bit_array.rs
similarity index 100%
rename from native/core/src/execution/datafusion/util/spark_bit_array.rs
rename to native/core/src/execution/util/spark_bit_array.rs
diff --git a/native/core/src/execution/datafusion/util/spark_bloom_filter.rs b/native/core/src/execution/util/spark_bloom_filter.rs
similarity index 98%
rename from native/core/src/execution/datafusion/util/spark_bloom_filter.rs
rename to native/core/src/execution/util/spark_bloom_filter.rs
index 35fa23b46..2c3af1691 100644
--- a/native/core/src/execution/datafusion/util/spark_bloom_filter.rs
+++ b/native/core/src/execution/util/spark_bloom_filter.rs
@@ -15,8 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::execution::datafusion::util::spark_bit_array;
-use crate::execution::datafusion::util::spark_bit_array::SparkBitArray;
+use crate::execution::util::spark_bit_array;
+use crate::execution::util::spark_bit_array::SparkBitArray;
 use arrow_array::{ArrowNativeTypeOp, BooleanArray, Int64Array};
 use arrow_buffer::ToByteSlice;
 use datafusion_comet_spark_expr::spark_hash::spark_compatible_murmur3_hash;
diff --git a/native/spark-expr/Cargo.toml b/native/spark-expr/Cargo.toml
index 65517431d..d0bc2fd9d 100644
--- a/native/spark-expr/Cargo.toml
+++ b/native/spark-expr/Cargo.toml
@@ -29,6 +29,7 @@ edition = { workspace = true }
 [dependencies]
 arrow = { workspace = true }
 arrow-array = { workspace = true }
+arrow-buffer = { workspace = true }
 arrow-data = { workspace = true }
 arrow-schema = { workspace = true }
 chrono = { workspace = true }
diff --git a/native/core/src/execution/datafusion/expressions/checkoverflow.rs b/native/spark-expr/src/checkoverflow.rs
similarity index 100%
rename from native/core/src/execution/datafusion/expressions/checkoverflow.rs
rename to native/spark-expr/src/checkoverflow.rs
diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs
index 5dff6e0b8..8a5748058 100644
--- a/native/spark-expr/src/lib.rs
+++ b/native/spark-expr/src/lib.rs
@@ -29,6 +29,8 @@ mod bitwise_not;
 pub use bitwise_not::{bitwise_not, BitwiseNotExpr};
 mod avg_decimal;
 pub use avg_decimal::AvgDecimal;
+mod checkoverflow;
+pub use checkoverflow::CheckOverflow;
 mod correlation;
 pub use correlation::Correlation;
 mod covariance;
@@ -45,10 +47,14 @@ pub use stddev::Stddev;
 mod structs;
 mod sum_decimal;
 pub use sum_decimal::SumDecimal;
+mod negative;
+pub use negative::{create_negate_expr, NegativeExpr};
 mod normalize_nan;
 mod temporal;
 pub mod timezone;
 mod to_json;
+mod unbound;
+pub use unbound::UnboundColumn;
 pub mod utils;
 
 pub use normalize_nan::NormalizeNaNAndZero;
@@ -83,3 +89,9 @@ pub enum EvalMode {
     /// failing the entire query.
     Try,
 }
+
+pub(crate) fn arithmetic_overflow_error(from_type: &str) -> SparkError {
+    SparkError::ArithmeticOverflow {
+        from_type: from_type.to_string(),
+    }
+}
diff --git a/native/core/src/execution/datafusion/expressions/negative.rs b/native/spark-expr/src/negative.rs
similarity index 98%
rename from native/core/src/execution/datafusion/expressions/negative.rs
rename to native/spark-expr/src/negative.rs
index 8dfe71742..3d9063e78 100644
--- a/native/core/src/execution/datafusion/expressions/negative.rs
+++ b/native/spark-expr/src/negative.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use super::arithmetic_overflow_error;
-use crate::errors::CometError;
+use crate::SparkError;
 use arrow::{compute::kernels::numeric::neg_wrapping, datatypes::IntervalDayTimeType};
 use arrow_array::RecordBatch;
 use arrow_buffer::IntervalDayTime;
@@ -26,8 +26,7 @@ use datafusion::{
     logical_expr::{interval_arithmetic::Interval, ColumnarValue},
     physical_expr::PhysicalExpr,
 };
-use datafusion_comet_spark_expr::SparkError;
-use datafusion_common::{Result, ScalarValue};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
 use datafusion_expr::sort_properties::ExprProperties;
 use std::{
     any::Any,
@@ -38,7 +37,7 @@ use std::{
 
 pub fn create_negate_expr(
     expr: Arc<dyn PhysicalExpr>,
     fail_on_error: bool,
-) -> Result<Arc<dyn PhysicalExpr>, CometError> {
+) -> Result<Arc<dyn PhysicalExpr>, DataFusionError> {
     Ok(Arc::new(NegativeExpr::new(expr, fail_on_error)))
 }
diff --git a/native/core/src/execution/datafusion/expressions/unbound.rs b/native/spark-expr/src/unbound.rs
similarity index 100%
rename from native/core/src/execution/datafusion/expressions/unbound.rs
rename to native/spark-expr/src/unbound.rs
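
Illustrative note (not part of the patch): the renames above change where downstream code imports these types from. A minimal sketch of the new paths, assuming the `comet` and `datafusion_comet_spark_expr` crates as laid out after this change; only the paths that appear in the hunks above are used.

```rust
// Old paths (pre-patch), shown for comparison:
//   comet::execution::datafusion::expressions::bloom_filter_agg::BloomFilterAgg
//   comet::execution::datafusion::shuffle_writer::ShuffleWriterExec

// New paths once the `datafusion` module layer is removed:
use comet::execution::expressions::bloom_filter_agg::BloomFilterAgg;
use comet::execution::shuffle::ShuffleWriterExec;

// Expressions moved into the spark-expr crate are re-exported from its root:
use datafusion_comet_spark_expr::{create_negate_expr, CheckOverflow, NegativeExpr, UnboundColumn};
```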