Skip to content

Commit

Permalink
chore: Move remaining expressions to spark-expr crate + some minor re…
Browse files Browse the repository at this point in the history
…factoring (apache#1165)

* move CheckOverflow to spark-expr crate

* move NegativeExpr to spark-expr crate

* move UnboundColumn to spark-expr crate

* move ExpandExec from execution::datafusion::operators to execution::operators

* refactoring to remove datafusion subpackage

* update imports in benches

* fix

* fix
  • Loading branch information
andygrove authored Dec 12, 2024
1 parent 49cf0d7 commit 7db9aa6
Show file tree
Hide file tree
Showing 28 changed files with 63 additions and 104 deletions.
1 change: 1 addition & 0 deletions native/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion native/core/benches/bloom_filter_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use arrow::datatypes::{DataType, Field, Schema};
use arrow_array::builder::Int64Builder;
use arrow_array::{ArrayRef, RecordBatch};
use arrow_schema::SchemaRef;
use comet::execution::datafusion::expressions::bloom_filter_agg::BloomFilterAgg;
use comet::execution::expressions::bloom_filter_agg::BloomFilterAgg;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
Expand Down
2 changes: 1 addition & 1 deletion native/core/benches/shuffle_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use arrow_array::{builder::StringBuilder, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use comet::execution::datafusion::shuffle_writer::ShuffleWriterExec;
use comet::execution::shuffle::ShuffleWriterExec;
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion::{
physical_plan::{common::collect, memory::MemoryExec, ExecutionPlan},
Expand Down
35 changes: 0 additions & 35 deletions native/core/src/execution/datafusion/expressions/mod.rs

This file was deleted.

18 changes: 0 additions & 18 deletions native/core/src/execution/datafusion/operators/mod.rs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use arrow_schema::Field;
use datafusion::{arrow::datatypes::DataType, logical_expr::Volatility};
use std::{any::Any, sync::Arc};

use crate::execution::datafusion::util::spark_bloom_filter;
use crate::execution::datafusion::util::spark_bloom_filter::SparkBloomFilter;
use crate::execution::util::spark_bloom_filter;
use crate::execution::util::spark_bloom_filter::SparkBloomFilter;
use arrow::array::ArrayRef;
use arrow_array::BinaryArray;
use datafusion::error::Result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::{
execution::datafusion::util::spark_bloom_filter::SparkBloomFilter, parquet::data_type::AsBytes,
};
use crate::{execution::util::spark_bloom_filter::SparkBloomFilter, parquet::data_type::AsBytes};
use arrow::record_batch::RecordBatch;
use arrow_array::cast::as_primitive_array;
use arrow_schema::{DataType, Schema};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@
// specific language governing permissions and limitations
// under the License.

//! Native execution through DataFusion
//! Native DataFusion expressions
pub mod expressions;
mod operators;
pub mod planner;
pub mod shuffle_writer;
pub(crate) mod spark_plan;
mod util;
pub mod bloom_filter_agg;
pub mod bloom_filter_might_contain;
pub mod subquery;

pub use datafusion_comet_spark_expr::EvalMode;
6 changes: 3 additions & 3 deletions native/core/src/execution/jni_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ use super::{serde, utils::SparkArrowConvert, CometMemoryPool};
use crate::{
errors::{try_unwrap_or_throw, CometError, CometResult},
execution::{
datafusion::planner::PhysicalPlanner, metrics::utils::update_comet_metric,
serde::to_arrow_datatype, shuffle::row::process_sorted_row_partition, sort::RdxSort,
metrics::utils::update_comet_metric, planner::PhysicalPlanner, serde::to_arrow_datatype,
shuffle::row::process_sorted_row_partition, sort::RdxSort,
},
jvm_bridge::{jni_new_global_ref, JVMClasses},
};
Expand All @@ -59,8 +59,8 @@ use jni::{
};
use tokio::runtime::Runtime;

use crate::execution::datafusion::spark_plan::SparkPlan;
use crate::execution::operators::ScanExec;
use crate::execution::spark_plan::SparkPlan;
use log::info;

/// Comet native execution context. Kept alive across JNI calls.
Expand Down
2 changes: 1 addition & 1 deletion native/core/src/execution/metrics/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::execution::datafusion::spark_plan::SparkPlan;
use crate::execution::spark_plan::SparkPlan;
use crate::jvm_bridge::jni_new_global_ref;
use crate::{
errors::CometError,
Expand Down
5 changes: 4 additions & 1 deletion native/core/src/execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
// under the License.

//! PoC of vectorized execution through JNI to Rust.
pub mod datafusion;
pub mod expressions;
pub mod jni_api;
mod metrics;
pub mod operators;
pub(crate) mod planner;
pub mod serde;
pub mod shuffle;
pub(crate) mod sort;
pub(crate) mod spark_plan;
pub(crate) mod util;
pub use datafusion_comet_spark_expr::timezone;
pub(crate) mod utils;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ use std::{
/// A Comet native operator that expands a single row into multiple rows. This behaves the same
/// as the Spark Expand operator.
#[derive(Debug)]
pub struct CometExpandExec {
pub struct ExpandExec {
projections: Vec<Vec<Arc<dyn PhysicalExpr>>>,
child: Arc<dyn ExecutionPlan>,
schema: SchemaRef,
cache: PlanProperties,
}

impl CometExpandExec {
impl ExpandExec {
/// Create a new ExpandExec
pub fn new(
projections: Vec<Vec<Arc<dyn PhysicalExpr>>>,
Expand All @@ -66,7 +66,7 @@ impl CometExpandExec {
}
}

impl DisplayAs for CometExpandExec {
impl DisplayAs for ExpandExec {
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
Expand All @@ -87,7 +87,7 @@ impl DisplayAs for CometExpandExec {
}
}

impl ExecutionPlan for CometExpandExec {
impl ExecutionPlan for ExpandExec {
fn as_any(&self) -> &dyn Any {
self
}
Expand All @@ -104,7 +104,7 @@ impl ExecutionPlan for CometExpandExec {
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
let new_expand = CometExpandExec::new(
let new_expand = ExpandExec::new(
self.projections.clone(),
Arc::clone(&children[0]),
Arc::clone(&self.schema),
Expand Down
2 changes: 2 additions & 0 deletions native/core/src/execution/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub use filter::FilterExec;
pub use scan::*;

mod copy;
mod expand;
pub use expand::ExpandExec;
mod filter;
mod scan;

Expand Down
3 changes: 1 addition & 2 deletions native/core/src/execution/operators/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
use crate::{
errors::CometError,
execution::{
datafusion::planner::TEST_EXEC_CONTEXT_ID, operators::ExecutionError,
utils::SparkArrowConvert,
operators::ExecutionError, planner::TEST_EXEC_CONTEXT_ID, utils::SparkArrowConvert,
},
jvm_bridge::{jni_call, JVMClasses},
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,13 @@ use crate::execution::operators::{CopyMode, FilterExec};
use crate::{
errors::ExpressionError,
execution::{
datafusion::{
expressions::{
bloom_filter_agg::BloomFilterAgg,
bloom_filter_might_contain::BloomFilterMightContain, checkoverflow::CheckOverflow,
negative, subquery::Subquery, unbound::UnboundColumn,
},
operators::expand::CometExpandExec,
shuffle_writer::ShuffleWriterExec,
expressions::{
bloom_filter_agg::BloomFilterAgg, bloom_filter_might_contain::BloomFilterMightContain,
subquery::Subquery,
},
operators::{CopyExec, ExecutionError, ScanExec},
operators::{CopyExec, ExecutionError, ExpandExec, ScanExec},
serde::to_arrow_datatype,
shuffle::ShuffleWriterExec,
},
};
use arrow::compute::CastOptions;
Expand Down Expand Up @@ -68,11 +64,11 @@ use datafusion::{
},
prelude::SessionContext,
};
use datafusion_comet_spark_expr::create_comet_physical_fun;
use datafusion_comet_spark_expr::{create_comet_physical_fun, create_negate_expr};
use datafusion_functions_nested::concat::ArrayAppend;
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};

use crate::execution::datafusion::spark_plan::SparkPlan;
use crate::execution::spark_plan::SparkPlan;
use datafusion_comet_proto::{
spark_expression::{
self, agg_expr::ExprStruct as AggExprStruct, expr::ExprStruct, literal::Value, AggExpr,
Expand All @@ -86,11 +82,11 @@ use datafusion_comet_proto::{
spark_partitioning::{partitioning::PartitioningStruct, Partitioning as SparkPartitioning},
};
use datafusion_comet_spark_expr::{
ArrayInsert, Avg, AvgDecimal, BitwiseNotExpr, Cast, Contains, Correlation, Covariance,
CreateNamedStruct, DateTruncExpr, EndsWith, GetArrayStructFields, GetStructField, HourExpr,
IfExpr, Like, ListExtract, MinuteExpr, NormalizeNaNAndZero, RLike, SecondExpr,
ArrayInsert, Avg, AvgDecimal, BitwiseNotExpr, Cast, CheckOverflow, Contains, Correlation,
Covariance, CreateNamedStruct, DateTruncExpr, EndsWith, GetArrayStructFields, GetStructField,
HourExpr, IfExpr, Like, ListExtract, MinuteExpr, NormalizeNaNAndZero, RLike, SecondExpr,
SparkCastOptions, StartsWith, Stddev, StringSpaceExpr, SubstringExpr, SumDecimal,
TimestampTruncExpr, ToJson, Variance,
TimestampTruncExpr, ToJson, UnboundColumn, Variance,
};
use datafusion_common::scalar::ScalarStructBuilder;
use datafusion_common::{
Expand Down Expand Up @@ -611,7 +607,7 @@ impl PhysicalPlanner {
ExprStruct::UnaryMinus(expr) => {
let child: Arc<dyn PhysicalExpr> =
self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
let result = negative::create_negate_expr(child, expr.fail_on_error);
let result = create_negate_expr(child, expr.fail_on_error);
result.map_err(|e| ExecutionError::GeneralError(e.to_string()))
}
ExprStruct::NormalizeNanAndZero(expr) => {
Expand Down Expand Up @@ -1118,7 +1114,7 @@ impl PhysicalPlanner {
} else {
Arc::clone(&child.native_plan)
};
let expand = Arc::new(CometExpandExec::new(projections, input, schema));
let expand = Arc::new(ExpandExec::new(projections, input, schema));
Ok((
scans,
Arc::new(SparkPlan::new(spark_plan.plan_id, expand, vec![child])),
Expand Down Expand Up @@ -2270,7 +2266,7 @@ mod tests {
use datafusion::{physical_plan::common::collect, prelude::SessionContext};
use tokio::sync::mpsc;

use crate::execution::{datafusion::planner::PhysicalPlanner, operators::InputBatch};
use crate::execution::{operators::InputBatch, planner::PhysicalPlanner};

use crate::execution::operators::ExecutionError;
use datafusion_comet_proto::{
Expand Down
2 changes: 2 additions & 0 deletions native/core/src/execution/shuffle/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,5 @@
mod list;
mod map;
pub mod row;
mod shuffle_writer;
pub use shuffle_writer::ShuffleWriterExec;
2 changes: 1 addition & 1 deletion native/core/src/execution/shuffle/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
use crate::{
errors::CometError,
execution::{
datafusion::shuffle_writer::{write_ipc_compressed, Checksum},
shuffle::{
list::{append_list_element, SparkUnsafeArray},
map::{append_map_elements, get_map_key_value_dt, SparkUnsafeMap},
shuffle_writer::{write_ipc_compressed, Checksum},
},
utils::bytes_to_i128,
},
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use crate::execution::datafusion::util::spark_bit_array;
use crate::execution::datafusion::util::spark_bit_array::SparkBitArray;
use crate::execution::util::spark_bit_array;
use crate::execution::util::spark_bit_array::SparkBitArray;
use arrow_array::{ArrowNativeTypeOp, BooleanArray, Int64Array};
use arrow_buffer::ToByteSlice;
use datafusion_comet_spark_expr::spark_hash::spark_compatible_murmur3_hash;
Expand Down
1 change: 1 addition & 0 deletions native/spark-expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ edition = { workspace = true }
[dependencies]
arrow = { workspace = true }
arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
arrow-data = { workspace = true }
arrow-schema = { workspace = true }
chrono = { workspace = true }
Expand Down
File renamed without changes.
12 changes: 12 additions & 0 deletions native/spark-expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ mod bitwise_not;
pub use bitwise_not::{bitwise_not, BitwiseNotExpr};
mod avg_decimal;
pub use avg_decimal::AvgDecimal;
mod checkoverflow;
pub use checkoverflow::CheckOverflow;
mod correlation;
pub use correlation::Correlation;
mod covariance;
Expand All @@ -45,10 +47,14 @@ pub use stddev::Stddev;
mod structs;
mod sum_decimal;
pub use sum_decimal::SumDecimal;
mod negative;
pub use negative::{create_negate_expr, NegativeExpr};
mod normalize_nan;
mod temporal;
pub mod timezone;
mod to_json;
mod unbound;
pub use unbound::UnboundColumn;
pub mod utils;
pub use normalize_nan::NormalizeNaNAndZero;

Expand Down Expand Up @@ -83,3 +89,9 @@ pub enum EvalMode {
/// failing the entire query.
Try,
}

/// Builds a `SparkError::ArithmeticOverflow` whose `from_type` field is an
/// owned copy of the given `from_type` string.
pub(crate) fn arithmetic_overflow_error(from_type: &str) -> SparkError {
    let from_type = from_type.to_owned();
    SparkError::ArithmeticOverflow { from_type }
}
Loading

0 comments on commit 7db9aa6

Please sign in to comment.