From 6b9e606c73f84d6fc27cbddd746bbc2bb16a0105 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 12 Dec 2024 07:48:24 -0700 Subject: [PATCH 1/8] move CheckOverflow to spark-expr crate --- native/core/src/execution/datafusion/expressions/mod.rs | 2 -- native/core/src/execution/datafusion/planner.rs | 4 ++-- .../expressions => spark-expr/src}/checkoverflow.rs | 0 native/spark-expr/src/lib.rs | 2 ++ 4 files changed, 4 insertions(+), 4 deletions(-) rename native/{core/src/execution/datafusion/expressions => spark-expr/src}/checkoverflow.rs (100%) diff --git a/native/core/src/execution/datafusion/expressions/mod.rs b/native/core/src/execution/datafusion/expressions/mod.rs index 5f9f322b2..c88753022 100644 --- a/native/core/src/execution/datafusion/expressions/mod.rs +++ b/native/core/src/execution/datafusion/expressions/mod.rs @@ -17,8 +17,6 @@ //! Native DataFusion expressions -pub mod checkoverflow; - use crate::errors::CometError; pub mod bloom_filter_agg; pub mod bloom_filter_might_contain; diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 0e64ed6af..266786f72 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -25,7 +25,7 @@ use crate::{ datafusion::{ expressions::{ bloom_filter_agg::BloomFilterAgg, - bloom_filter_might_contain::BloomFilterMightContain, checkoverflow::CheckOverflow, + bloom_filter_might_contain::BloomFilterMightContain, negative, subquery::Subquery, unbound::UnboundColumn, }, operators::expand::CometExpandExec, @@ -90,7 +90,7 @@ use datafusion_comet_spark_expr::{ CreateNamedStruct, DateTruncExpr, EndsWith, GetArrayStructFields, GetStructField, HourExpr, IfExpr, Like, ListExtract, MinuteExpr, NormalizeNaNAndZero, RLike, SecondExpr, SparkCastOptions, StartsWith, Stddev, StringSpaceExpr, SubstringExpr, SumDecimal, - TimestampTruncExpr, ToJson, Variance, + TimestampTruncExpr, ToJson, Variance,CheckOverflow }; use datafusion_common::scalar::ScalarStructBuilder; use datafusion_common::{ diff --git a/native/core/src/execution/datafusion/expressions/checkoverflow.rs b/native/spark-expr/src/checkoverflow.rs similarity index 100% rename from native/core/src/execution/datafusion/expressions/checkoverflow.rs rename to native/spark-expr/src/checkoverflow.rs diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs index 5dff6e0b8..f94b55173 100644 --- a/native/spark-expr/src/lib.rs +++ b/native/spark-expr/src/lib.rs @@ -29,6 +29,8 @@ mod bitwise_not; pub use bitwise_not::{bitwise_not, BitwiseNotExpr}; mod avg_decimal; pub use avg_decimal::AvgDecimal; +mod checkoverflow; +pub use checkoverflow::CheckOverflow; mod correlation; pub use correlation::Correlation; mod covariance; From 58dad48255c6330cc467044edbce9b60e603a31e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 12 Dec 2024 07:52:15 -0700 Subject: [PATCH 2/8] move NegativeExpr to spark-expr crate --- native/Cargo.lock | 1 + .../src/execution/datafusion/expressions/mod.rs | 8 -------- native/core/src/execution/datafusion/planner.rs | 16 ++++++++-------- native/spark-expr/Cargo.toml | 1 + native/spark-expr/src/lib.rs | 8 ++++++++ .../expressions => spark-expr/src}/negative.rs | 7 +++---- 6 files changed, 21 insertions(+), 20 deletions(-) rename native/{core/src/execution/datafusion/expressions => spark-expr/src}/negative.rs (98%) diff --git a/native/Cargo.lock b/native/Cargo.lock index 9a8eab83e..7966bb80b 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -931,6 +931,7 @@ version = "0.5.0" dependencies = [ "arrow", "arrow-array", + "arrow-buffer", "arrow-data", "arrow-schema", "chrono", diff --git a/native/core/src/execution/datafusion/expressions/mod.rs b/native/core/src/execution/datafusion/expressions/mod.rs index c88753022..d4839dc95 100644 --- a/native/core/src/execution/datafusion/expressions/mod.rs +++ b/native/core/src/execution/datafusion/expressions/mod.rs @@ -17,17 +17,9 @@ //! Native DataFusion expressions -use crate::errors::CometError; pub mod bloom_filter_agg; pub mod bloom_filter_might_contain; -pub mod negative; pub mod subquery; pub mod unbound; pub use datafusion_comet_spark_expr::{EvalMode, SparkError}; - -fn arithmetic_overflow_error(from_type: &str) -> CometError { - CometError::Spark(SparkError::ArithmeticOverflow { - from_type: from_type.to_string(), - }) -} diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 266786f72..6449959de 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -25,8 +25,8 @@ use crate::{ datafusion::{ expressions::{ bloom_filter_agg::BloomFilterAgg, - bloom_filter_might_contain::BloomFilterMightContain, - negative, subquery::Subquery, unbound::UnboundColumn, + bloom_filter_might_contain::BloomFilterMightContain, subquery::Subquery, + unbound::UnboundColumn, }, operators::expand::CometExpandExec, shuffle_writer::ShuffleWriterExec, @@ -68,7 +68,7 @@ use datafusion::{ }, prelude::SessionContext, }; -use datafusion_comet_spark_expr::create_comet_physical_fun; +use datafusion_comet_spark_expr::{create_comet_physical_fun, create_negate_expr}; use datafusion_functions_nested::concat::ArrayAppend; use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; @@ -86,11 +86,11 @@ use datafusion_comet_proto::{ spark_partitioning::{partitioning::PartitioningStruct, Partitioning as SparkPartitioning}, }; use datafusion_comet_spark_expr::{ - ArrayInsert, Avg, AvgDecimal, BitwiseNotExpr, Cast, Contains, Correlation, Covariance, - CreateNamedStruct, DateTruncExpr, EndsWith, GetArrayStructFields, GetStructField, HourExpr, - IfExpr, Like, ListExtract, MinuteExpr, NormalizeNaNAndZero, RLike, SecondExpr, + ArrayInsert, Avg, AvgDecimal, BitwiseNotExpr, Cast, CheckOverflow, Contains, Correlation, + Covariance, CreateNamedStruct, DateTruncExpr, EndsWith, GetArrayStructFields, GetStructField, + HourExpr, IfExpr, Like, ListExtract, MinuteExpr, NormalizeNaNAndZero, RLike, SecondExpr, SparkCastOptions, StartsWith, Stddev, StringSpaceExpr, SubstringExpr, SumDecimal, - TimestampTruncExpr, ToJson, Variance,CheckOverflow + TimestampTruncExpr, ToJson, Variance, }; use datafusion_common::scalar::ScalarStructBuilder; use datafusion_common::{ @@ -611,7 +611,7 @@ impl PhysicalPlanner { ExprStruct::UnaryMinus(expr) => { let child: Arc = self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?; - let result = negative::create_negate_expr(child, expr.fail_on_error); + let result = create_negate_expr(child, expr.fail_on_error); result.map_err(|e| ExecutionError::GeneralError(e.to_string())) } ExprStruct::NormalizeNanAndZero(expr) => { diff --git a/native/spark-expr/Cargo.toml b/native/spark-expr/Cargo.toml index 65517431d..d0bc2fd9d 100644 --- a/native/spark-expr/Cargo.toml +++ b/native/spark-expr/Cargo.toml @@ -29,6 +29,7 @@ edition = { workspace = true } [dependencies] arrow = { workspace = true } arrow-array = { workspace = true } +arrow-buffer = { workspace = true } arrow-data = { workspace = true } arrow-schema = { workspace = true } chrono = { workspace = true } diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs index f94b55173..1a742ec86 100644 --- a/native/spark-expr/src/lib.rs +++ b/native/spark-expr/src/lib.rs @@ -47,6 +47,8 @@ pub use stddev::Stddev; mod structs; mod sum_decimal; pub use sum_decimal::SumDecimal; +mod negative; +pub use negative::{create_negate_expr, NegativeExpr}; mod normalize_nan; mod temporal; pub mod timezone; @@ -85,3 +87,9 @@ pub enum EvalMode { /// failing the entire query. Try, } + +pub(crate) fn arithmetic_overflow_error(from_type: &str) -> SparkError { + SparkError::ArithmeticOverflow { + from_type: from_type.to_string(), + } +} diff --git a/native/core/src/execution/datafusion/expressions/negative.rs b/native/spark-expr/src/negative.rs similarity index 98% rename from native/core/src/execution/datafusion/expressions/negative.rs rename to native/spark-expr/src/negative.rs index 8dfe71742..3d9063e78 100644 --- a/native/core/src/execution/datafusion/expressions/negative.rs +++ b/native/spark-expr/src/negative.rs @@ -16,7 +16,7 @@ // under the License. use super::arithmetic_overflow_error; -use crate::errors::CometError; +use crate::SparkError; use arrow::{compute::kernels::numeric::neg_wrapping, datatypes::IntervalDayTimeType}; use arrow_array::RecordBatch; use arrow_buffer::IntervalDayTime; @@ -26,8 +26,7 @@ use datafusion::{ logical_expr::{interval_arithmetic::Interval, ColumnarValue}, physical_expr::PhysicalExpr, }; -use datafusion_comet_spark_expr::SparkError; -use datafusion_common::{Result, ScalarValue}; +use datafusion_common::{DataFusionError, Result, ScalarValue}; use datafusion_expr::sort_properties::ExprProperties; use std::{ any::Any, @@ -38,7 +37,7 @@ use std::{ pub fn create_negate_expr( expr: Arc, fail_on_error: bool, -) -> Result, CometError> { +) -> Result, DataFusionError> { Ok(Arc::new(NegativeExpr::new(expr, fail_on_error))) } From d4c2ca5d34277ed31d81ebc576da8f5d0160c03f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 12 Dec 2024 07:53:42 -0700 Subject: [PATCH 3/8] move UnboundColumn to spark-expr crate --- native/core/src/execution/datafusion/expressions/mod.rs | 1 - native/core/src/execution/datafusion/planner.rs | 3 +-- native/spark-expr/src/lib.rs | 2 ++ .../datafusion/expressions => spark-expr/src}/unbound.rs | 0 4 files changed, 3 insertions(+), 3 deletions(-) rename native/{core/src/execution/datafusion/expressions => spark-expr/src}/unbound.rs (100%) diff --git a/native/core/src/execution/datafusion/expressions/mod.rs b/native/core/src/execution/datafusion/expressions/mod.rs index d4839dc95..da533d8f5 100644 --- a/native/core/src/execution/datafusion/expressions/mod.rs +++ b/native/core/src/execution/datafusion/expressions/mod.rs @@ -20,6 +20,5 @@ pub mod bloom_filter_agg; pub mod bloom_filter_might_contain; pub mod subquery; -pub mod unbound; pub use datafusion_comet_spark_expr::{EvalMode, SparkError}; diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 6449959de..6bffda046 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -26,7 +26,6 @@ use crate::{ expressions::{ bloom_filter_agg::BloomFilterAgg, bloom_filter_might_contain::BloomFilterMightContain, subquery::Subquery, - unbound::UnboundColumn, }, operators::expand::CometExpandExec, shuffle_writer::ShuffleWriterExec, @@ -90,7 +89,7 @@ use datafusion_comet_spark_expr::{ Covariance, CreateNamedStruct, DateTruncExpr, EndsWith, GetArrayStructFields, GetStructField, HourExpr, IfExpr, Like, ListExtract, MinuteExpr, NormalizeNaNAndZero, RLike, SecondExpr, SparkCastOptions, StartsWith, Stddev, StringSpaceExpr, SubstringExpr, SumDecimal, - TimestampTruncExpr, ToJson, Variance, + TimestampTruncExpr, ToJson, UnboundColumn, Variance, }; use datafusion_common::scalar::ScalarStructBuilder; use datafusion_common::{ diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs index 1a742ec86..8a5748058 100644 --- a/native/spark-expr/src/lib.rs +++ b/native/spark-expr/src/lib.rs @@ -53,6 +53,8 @@ mod normalize_nan; mod temporal; pub mod timezone; mod to_json; +mod unbound; +pub use unbound::UnboundColumn; pub mod utils; pub use normalize_nan::NormalizeNaNAndZero; diff --git a/native/core/src/execution/datafusion/expressions/unbound.rs b/native/spark-expr/src/unbound.rs similarity index 100% rename from native/core/src/execution/datafusion/expressions/unbound.rs rename to native/spark-expr/src/unbound.rs From bcd108f9776b4ad731948c5d14b2267d8a7318af Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 12 Dec 2024 08:04:35 -0700 Subject: [PATCH 4/8] move ExpandExec from execution::datafusion::operators to execution::operators --- native/core/src/execution/datafusion/mod.rs | 1 - .../src/execution/datafusion/operators/mod.rs | 18 ------------------ .../core/src/execution/datafusion/planner.rs | 5 ++--- .../{datafusion => }/operators/expand.rs | 10 +++++----- native/core/src/execution/operators/mod.rs | 2 ++ 5 files changed, 9 insertions(+), 27 deletions(-) delete mode 100644 native/core/src/execution/datafusion/operators/mod.rs rename native/core/src/execution/{datafusion => }/operators/expand.rs (97%) diff --git a/native/core/src/execution/datafusion/mod.rs b/native/core/src/execution/datafusion/mod.rs index ca41fa0aa..dede2f723 100644 --- a/native/core/src/execution/datafusion/mod.rs +++ b/native/core/src/execution/datafusion/mod.rs @@ -18,7 +18,6 @@ //! Native execution through DataFusion pub mod expressions; -mod operators; pub mod planner; pub mod shuffle_writer; pub(crate) mod spark_plan; diff --git a/native/core/src/execution/datafusion/operators/mod.rs b/native/core/src/execution/datafusion/operators/mod.rs deleted file mode 100644 index 3d28a266a..000000000 --- a/native/core/src/execution/datafusion/operators/mod.rs +++ /dev/null @@ -1,18 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -pub mod expand; diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 6bffda046..221ae622b 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -27,10 +27,9 @@ use crate::{ bloom_filter_agg::BloomFilterAgg, bloom_filter_might_contain::BloomFilterMightContain, subquery::Subquery, }, - operators::expand::CometExpandExec, shuffle_writer::ShuffleWriterExec, }, - operators::{CopyExec, ExecutionError, ScanExec}, + operators::{CopyExec, ExecutionError, ExpandExec, ScanExec}, serde::to_arrow_datatype, }, }; @@ -1117,7 +1116,7 @@ impl PhysicalPlanner { } else { Arc::clone(&child.native_plan) }; - let expand = Arc::new(CometExpandExec::new(projections, input, schema)); + let expand = Arc::new(ExpandExec::new(projections, input, schema)); Ok(( scans, Arc::new(SparkPlan::new(spark_plan.plan_id, expand, vec![child])), diff --git a/native/core/src/execution/datafusion/operators/expand.rs b/native/core/src/execution/operators/expand.rs similarity index 97% rename from native/core/src/execution/datafusion/operators/expand.rs rename to native/core/src/execution/operators/expand.rs index a3dd06507..fb43a6e49 100644 --- a/native/core/src/execution/datafusion/operators/expand.rs +++ b/native/core/src/execution/operators/expand.rs @@ -37,14 +37,14 @@ use std::{ /// A Comet native operator that expands a single row into multiple rows. This behaves as same as /// Spark Expand operator. #[derive(Debug)] -pub struct CometExpandExec { +pub struct ExpandExec { projections: Vec>>, child: Arc, schema: SchemaRef, cache: PlanProperties, } -impl CometExpandExec { +impl ExpandExec { /// Create a new ExpandExec pub fn new( projections: Vec>>, @@ -66,7 +66,7 @@ impl CometExpandExec { } } -impl DisplayAs for CometExpandExec { +impl DisplayAs for ExpandExec { fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { @@ -87,7 +87,7 @@ impl DisplayAs for CometExpandExec { } } -impl ExecutionPlan for CometExpandExec { +impl ExecutionPlan for ExpandExec { fn as_any(&self) -> &dyn Any { self } @@ -104,7 +104,7 @@ impl ExecutionPlan for CometExpandExec { self: Arc, children: Vec>, ) -> datafusion_common::Result> { - let new_expand = CometExpandExec::new( + let new_expand = ExpandExec::new( self.projections.clone(), Arc::clone(&children[0]), Arc::clone(&self.schema), diff --git a/native/core/src/execution/operators/mod.rs b/native/core/src/execution/operators/mod.rs index bdc233e94..4e15e4341 100644 --- a/native/core/src/execution/operators/mod.rs +++ b/native/core/src/execution/operators/mod.rs @@ -27,6 +27,8 @@ pub use filter::FilterExec; pub use scan::*; mod copy; +mod expand; +pub use expand::ExpandExec; mod filter; mod scan; From fe430442a39e01eac6292eeeb69ee8c655f6dad1 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 12 Dec 2024 08:09:21 -0700 Subject: [PATCH 5/8] refactoring to remove datafusion subpackage --- native/core/src/execution/datafusion/mod.rs | 24 ------------------- .../expressions/bloom_filter_agg.rs | 4 ++-- .../expressions/bloom_filter_might_contain.rs | 4 +--- .../{datafusion => }/expressions/mod.rs | 2 +- .../{datafusion => }/expressions/subquery.rs | 0 native/core/src/execution/jni_api.rs | 6 ++--- native/core/src/execution/metrics/utils.rs | 2 +- native/core/src/execution/mod.rs | 5 +++- native/core/src/execution/operators/scan.rs | 3 +-- .../src/execution/{datafusion => }/planner.rs | 14 +++++------ native/core/src/execution/shuffle/mod.rs | 2 ++ native/core/src/execution/shuffle/row.rs | 2 +- .../{datafusion => shuffle}/shuffle_writer.rs | 0 .../execution/{datafusion => }/spark_plan.rs | 0 .../execution/{datafusion => }/util/mod.rs | 0 .../{datafusion => }/util/spark_bit_array.rs | 0 .../util/spark_bloom_filter.rs | 4 ++-- 17 files changed, 24 insertions(+), 48 deletions(-) delete mode 100644 native/core/src/execution/datafusion/mod.rs rename native/core/src/execution/{datafusion => }/expressions/bloom_filter_agg.rs (97%) rename native/core/src/execution/{datafusion => }/expressions/bloom_filter_might_contain.rs (97%) rename native/core/src/execution/{datafusion => }/expressions/mod.rs (93%) rename native/core/src/execution/{datafusion => }/expressions/subquery.rs (100%) rename native/core/src/execution/{datafusion => }/planner.rs (99%) rename native/core/src/execution/{datafusion => shuffle}/shuffle_writer.rs (100%) rename native/core/src/execution/{datafusion => }/spark_plan.rs (100%) rename native/core/src/execution/{datafusion => }/util/mod.rs (100%) rename native/core/src/execution/{datafusion => }/util/spark_bit_array.rs (100%) rename native/core/src/execution/{datafusion => }/util/spark_bloom_filter.rs (98%) diff --git a/native/core/src/execution/datafusion/mod.rs b/native/core/src/execution/datafusion/mod.rs deleted file mode 100644 index dede2f723..000000000 --- a/native/core/src/execution/datafusion/mod.rs +++ /dev/null @@ -1,24 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Native execution through DataFusion - -pub mod expressions; -pub mod planner; -pub mod shuffle_writer; -pub(crate) mod spark_plan; -mod util; diff --git a/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs b/native/core/src/execution/expressions/bloom_filter_agg.rs similarity index 97% rename from native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs rename to native/core/src/execution/expressions/bloom_filter_agg.rs index 1300e08c2..ea8bb3647 100644 --- a/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs +++ b/native/core/src/execution/expressions/bloom_filter_agg.rs @@ -19,8 +19,8 @@ use arrow_schema::Field; use datafusion::{arrow::datatypes::DataType, logical_expr::Volatility}; use std::{any::Any, sync::Arc}; -use crate::execution::datafusion::util::spark_bloom_filter; -use crate::execution::datafusion::util::spark_bloom_filter::SparkBloomFilter; +use crate::execution::util::spark_bloom_filter; +use crate::execution::util::spark_bloom_filter::SparkBloomFilter; use arrow::array::ArrayRef; use arrow_array::BinaryArray; use datafusion::error::Result; diff --git a/native/core/src/execution/datafusion/expressions/bloom_filter_might_contain.rs b/native/core/src/execution/expressions/bloom_filter_might_contain.rs similarity index 97% rename from native/core/src/execution/datafusion/expressions/bloom_filter_might_contain.rs rename to native/core/src/execution/expressions/bloom_filter_might_contain.rs index de922d831..af6a5a47a 100644 --- a/native/core/src/execution/datafusion/expressions/bloom_filter_might_contain.rs +++ b/native/core/src/execution/expressions/bloom_filter_might_contain.rs @@ -15,9 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::{ - execution::datafusion::util::spark_bloom_filter::SparkBloomFilter, parquet::data_type::AsBytes, -}; +use crate::{execution::util::spark_bloom_filter::SparkBloomFilter, parquet::data_type::AsBytes}; use arrow::record_batch::RecordBatch; use arrow_array::cast::as_primitive_array; use arrow_schema::{DataType, Schema}; diff --git a/native/core/src/execution/datafusion/expressions/mod.rs b/native/core/src/execution/expressions/mod.rs similarity index 93% rename from native/core/src/execution/datafusion/expressions/mod.rs rename to native/core/src/execution/expressions/mod.rs index da533d8f5..e2f811fa2 100644 --- a/native/core/src/execution/datafusion/expressions/mod.rs +++ b/native/core/src/execution/expressions/mod.rs @@ -21,4 +21,4 @@ pub mod bloom_filter_agg; pub mod bloom_filter_might_contain; pub mod subquery; -pub use datafusion_comet_spark_expr::{EvalMode, SparkError}; +pub use datafusion_comet_spark_expr::EvalMode; diff --git a/native/core/src/execution/datafusion/expressions/subquery.rs b/native/core/src/execution/expressions/subquery.rs similarity index 100% rename from native/core/src/execution/datafusion/expressions/subquery.rs rename to native/core/src/execution/expressions/subquery.rs diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 5103f5ce4..491b389c9 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -45,8 +45,8 @@ use super::{serde, utils::SparkArrowConvert, CometMemoryPool}; use crate::{ errors::{try_unwrap_or_throw, CometError, CometResult}, execution::{ - datafusion::planner::PhysicalPlanner, metrics::utils::update_comet_metric, - serde::to_arrow_datatype, shuffle::row::process_sorted_row_partition, sort::RdxSort, + metrics::utils::update_comet_metric, planner::PhysicalPlanner, serde::to_arrow_datatype, + shuffle::row::process_sorted_row_partition, sort::RdxSort, }, jvm_bridge::{jni_new_global_ref, JVMClasses}, }; @@ -59,8 +59,8 @@ use jni::{ }; use tokio::runtime::Runtime; -use crate::execution::datafusion::spark_plan::SparkPlan; use crate::execution::operators::ScanExec; +use crate::execution::spark_plan::SparkPlan; use log::info; /// Comet native execution context. Kept alive across JNI calls. diff --git a/native/core/src/execution/metrics/utils.rs b/native/core/src/execution/metrics/utils.rs index 4bb1c4474..0eb4b631d 100644 --- a/native/core/src/execution/metrics/utils.rs +++ b/native/core/src/execution/metrics/utils.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::execution::datafusion::spark_plan::SparkPlan; +use crate::execution::spark_plan::SparkPlan; use crate::jvm_bridge::jni_new_global_ref; use crate::{ errors::CometError, diff --git a/native/core/src/execution/mod.rs b/native/core/src/execution/mod.rs index 3dba747f2..df1877323 100644 --- a/native/core/src/execution/mod.rs +++ b/native/core/src/execution/mod.rs @@ -16,13 +16,16 @@ // under the License. //! PoC of vectorization execution through JNI to Rust. -pub mod datafusion; +pub(crate) mod expressions; pub mod jni_api; mod metrics; pub mod operators; +pub(crate) mod planner; pub mod serde; pub mod shuffle; pub(crate) mod sort; +pub(crate) mod spark_plan; +pub(crate) mod util; pub use datafusion_comet_spark_expr::timezone; pub(crate) mod utils; diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs index 0d35859df..a297f87c1 100644 --- a/native/core/src/execution/operators/scan.rs +++ b/native/core/src/execution/operators/scan.rs @@ -18,8 +18,7 @@ use crate::{ errors::CometError, execution::{ - datafusion::planner::TEST_EXEC_CONTEXT_ID, operators::ExecutionError, - utils::SparkArrowConvert, + operators::ExecutionError, planner::TEST_EXEC_CONTEXT_ID, utils::SparkArrowConvert, }, jvm_bridge::{jni_call, JVMClasses}, }; diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/planner.rs similarity index 99% rename from native/core/src/execution/datafusion/planner.rs rename to native/core/src/execution/planner.rs index 221ae622b..3ac830c04 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/planner.rs @@ -22,15 +22,13 @@ use crate::execution::operators::{CopyMode, FilterExec}; use crate::{ errors::ExpressionError, execution::{ - datafusion::{ - expressions::{ - bloom_filter_agg::BloomFilterAgg, - bloom_filter_might_contain::BloomFilterMightContain, subquery::Subquery, - }, - shuffle_writer::ShuffleWriterExec, + expressions::{ + bloom_filter_agg::BloomFilterAgg, bloom_filter_might_contain::BloomFilterMightContain, + subquery::Subquery, }, operators::{CopyExec, ExecutionError, ExpandExec, ScanExec}, serde::to_arrow_datatype, + shuffle::ShuffleWriterExec, }, }; use arrow::compute::CastOptions; @@ -70,7 +68,7 @@ use datafusion_comet_spark_expr::{create_comet_physical_fun, create_negate_expr} use datafusion_functions_nested::concat::ArrayAppend; use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; -use crate::execution::datafusion::spark_plan::SparkPlan; +use crate::execution::spark_plan::SparkPlan; use datafusion_comet_proto::{ spark_expression::{ self, agg_expr::ExprStruct as AggExprStruct, expr::ExprStruct, literal::Value, AggExpr, @@ -2268,7 +2266,7 @@ mod tests { use datafusion::{physical_plan::common::collect, prelude::SessionContext}; use tokio::sync::mpsc; - use crate::execution::{datafusion::planner::PhysicalPlanner, operators::InputBatch}; + use crate::execution::{operators::InputBatch, planner::PhysicalPlanner}; use crate::execution::operators::ExecutionError; use datafusion_comet_proto::{ diff --git a/native/core/src/execution/shuffle/mod.rs b/native/core/src/execution/shuffle/mod.rs index b052df29b..8721ead74 100644 --- a/native/core/src/execution/shuffle/mod.rs +++ b/native/core/src/execution/shuffle/mod.rs @@ -18,3 +18,5 @@ mod list; mod map; pub mod row; +mod shuffle_writer; +pub use shuffle_writer::ShuffleWriterExec; diff --git a/native/core/src/execution/shuffle/row.rs b/native/core/src/execution/shuffle/row.rs index 17b180e9d..ce752e68a 100644 --- a/native/core/src/execution/shuffle/row.rs +++ b/native/core/src/execution/shuffle/row.rs @@ -20,10 +20,10 @@ use crate::{ errors::CometError, execution::{ - datafusion::shuffle_writer::{write_ipc_compressed, Checksum}, shuffle::{ list::{append_list_element, SparkUnsafeArray}, map::{append_map_elements, get_map_key_value_dt, SparkUnsafeMap}, + shuffle_writer::{write_ipc_compressed, Checksum}, }, utils::bytes_to_i128, }, diff --git a/native/core/src/execution/datafusion/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs similarity index 100% rename from native/core/src/execution/datafusion/shuffle_writer.rs rename to native/core/src/execution/shuffle/shuffle_writer.rs diff --git a/native/core/src/execution/datafusion/spark_plan.rs b/native/core/src/execution/spark_plan.rs similarity index 100% rename from native/core/src/execution/datafusion/spark_plan.rs rename to native/core/src/execution/spark_plan.rs diff --git a/native/core/src/execution/datafusion/util/mod.rs b/native/core/src/execution/util/mod.rs similarity index 100% rename from native/core/src/execution/datafusion/util/mod.rs rename to native/core/src/execution/util/mod.rs diff --git a/native/core/src/execution/datafusion/util/spark_bit_array.rs b/native/core/src/execution/util/spark_bit_array.rs similarity index 100% rename from native/core/src/execution/datafusion/util/spark_bit_array.rs rename to native/core/src/execution/util/spark_bit_array.rs diff --git a/native/core/src/execution/datafusion/util/spark_bloom_filter.rs b/native/core/src/execution/util/spark_bloom_filter.rs similarity index 98% rename from native/core/src/execution/datafusion/util/spark_bloom_filter.rs rename to native/core/src/execution/util/spark_bloom_filter.rs index 35fa23b46..2c3af1691 100644 --- a/native/core/src/execution/datafusion/util/spark_bloom_filter.rs +++ b/native/core/src/execution/util/spark_bloom_filter.rs @@ -15,8 +15,8 @@ // specific language governing permissions and limitations // under the License. -use crate::execution::datafusion::util::spark_bit_array; -use crate::execution::datafusion::util::spark_bit_array::SparkBitArray; +use crate::execution::util::spark_bit_array; +use crate::execution::util::spark_bit_array::SparkBitArray; use arrow_array::{ArrowNativeTypeOp, BooleanArray, Int64Array}; use arrow_buffer::ToByteSlice; use datafusion_comet_spark_expr::spark_hash::spark_compatible_murmur3_hash; From 1630b5829c598a4abf95f70da18080baff29ae9a Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 12 Dec 2024 08:23:44 -0700 Subject: [PATCH 6/8] update imports in benches --- native/core/benches/bloom_filter_agg.rs | 2 +- native/core/benches/shuffle_writer.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/native/core/benches/bloom_filter_agg.rs b/native/core/benches/bloom_filter_agg.rs index 25d27d174..b83ff3fad 100644 --- a/native/core/benches/bloom_filter_agg.rs +++ b/native/core/benches/bloom_filter_agg.rs @@ -19,7 +19,7 @@ use arrow::datatypes::{DataType, Field, Schema}; use arrow_array::builder::Int64Builder; use arrow_array::{ArrayRef, RecordBatch}; use arrow_schema::SchemaRef; -use comet::execution::datafusion::expressions::bloom_filter_agg::BloomFilterAgg; +use comet::execution::expressions::bloom_filter_agg::BloomFilterAgg; use criterion::{black_box, criterion_group, criterion_main, Criterion}; use datafusion::physical_expr::PhysicalExpr; use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; diff --git a/native/core/benches/shuffle_writer.rs b/native/core/benches/shuffle_writer.rs index 6f2871861..64a750b81 100644 --- a/native/core/benches/shuffle_writer.rs +++ b/native/core/benches/shuffle_writer.rs @@ -17,7 +17,7 @@ use arrow_array::{builder::StringBuilder, RecordBatch}; use arrow_schema::{DataType, Field, Schema}; -use comet::execution::datafusion::shuffle_writer::ShuffleWriterExec; +use comet::execution::shuffle_writer::ShuffleWriterExec; use criterion::{criterion_group, criterion_main, Criterion}; use datafusion::{ physical_plan::{common::collect, memory::MemoryExec, ExecutionPlan}, From adbcf3c7c342139fda3549aa123ca3790ed924d5 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 12 Dec 2024 09:22:36 -0700 Subject: [PATCH 7/8] fix --- native/core/src/execution/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/native/core/src/execution/mod.rs b/native/core/src/execution/mod.rs index df1877323..a74ec3017 100644 --- a/native/core/src/execution/mod.rs +++ b/native/core/src/execution/mod.rs @@ -16,7 +16,7 @@ // under the License. //! PoC of vectorization execution through JNI to Rust. -pub(crate) mod expressions; +pub mod expressions; pub mod jni_api; mod metrics; pub mod operators; From 01c85d2ffa8bbcfb94d9692de6146667159506ea Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 12 Dec 2024 12:47:27 -0700 Subject: [PATCH 8/8] fix --- native/core/benches/shuffle_writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/native/core/benches/shuffle_writer.rs b/native/core/benches/shuffle_writer.rs index 64a750b81..272887238 100644 --- a/native/core/benches/shuffle_writer.rs +++ b/native/core/benches/shuffle_writer.rs @@ -17,7 +17,7 @@ use arrow_array::{builder::StringBuilder, RecordBatch}; use arrow_schema::{DataType, Field, Schema}; -use comet::execution::shuffle_writer::ShuffleWriterExec; +use comet::execution::shuffle::ShuffleWriterExec; use criterion::{criterion_group, criterion_main, Criterion}; use datafusion::{ physical_plan::{common::collect, memory::MemoryExec, ExecutionPlan},