chore: Move remaining expressions to spark-expr crate + some minor refactoring #1165

Merged · 8 commits · Dec 12, 2024
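This PR flattens the `execution::datafusion` module layer into `execution` and moves the remaining shared expressions into the `datafusion_comet_spark_expr` crate. A minimal before/after sketch of the resulting import changes, drawn from the diffs below:

```rust
// Before: expression and operator types sat behind the extra `datafusion` layer.
use comet::execution::datafusion::expressions::bloom_filter_agg::BloomFilterAgg;
use comet::execution::datafusion::shuffle_writer::ShuffleWriterExec;

// After: the layer is flattened, and shared expressions come from spark-expr.
use comet::execution::expressions::bloom_filter_agg::BloomFilterAgg;
use comet::execution::shuffle::ShuffleWriterExec;
use datafusion_comet_spark_expr::{create_negate_expr, CheckOverflow, UnboundColumn};
```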
1 change: 1 addition & 0 deletions native/Cargo.lock

(Generated file; diff not rendered by default.)

2 changes: 1 addition & 1 deletion native/core/benches/bloom_filter_agg.rs
@@ -19,7 +19,7 @@ use arrow::datatypes::{DataType, Field, Schema};
use arrow_array::builder::Int64Builder;
use arrow_array::{ArrayRef, RecordBatch};
use arrow_schema::SchemaRef;
-use comet::execution::datafusion::expressions::bloom_filter_agg::BloomFilterAgg;
+use comet::execution::expressions::bloom_filter_agg::BloomFilterAgg;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
2 changes: 1 addition & 1 deletion native/core/benches/shuffle_writer.rs
@@ -17,7 +17,7 @@

use arrow_array::{builder::StringBuilder, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
-use comet::execution::datafusion::shuffle_writer::ShuffleWriterExec;
+use comet::execution::shuffle::ShuffleWriterExec;
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion::{
physical_plan::{common::collect, memory::MemoryExec, ExecutionPlan},
35 changes: 0 additions & 35 deletions native/core/src/execution/datafusion/expressions/mod.rs

This file was deleted.

18 changes: 0 additions & 18 deletions native/core/src/execution/datafusion/operators/mod.rs

This file was deleted.

native/core/src/execution/expressions/bloom_filter_agg.rs (renamed from native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs)
@@ -19,8 +19,8 @@ use arrow_schema::Field;
use datafusion::{arrow::datatypes::DataType, logical_expr::Volatility};
use std::{any::Any, sync::Arc};

-use crate::execution::datafusion::util::spark_bloom_filter;
-use crate::execution::datafusion::util::spark_bloom_filter::SparkBloomFilter;
+use crate::execution::util::spark_bloom_filter;
+use crate::execution::util::spark_bloom_filter::SparkBloomFilter;
use arrow::array::ArrayRef;
use arrow_array::BinaryArray;
use datafusion::error::Result;
native/core/src/execution/expressions/bloom_filter_might_contain.rs (renamed from native/core/src/execution/datafusion/expressions/bloom_filter_might_contain.rs)
@@ -15,9 +15,7 @@
// specific language governing permissions and limitations
// under the License.

-use crate::{
-    execution::datafusion::util::spark_bloom_filter::SparkBloomFilter, parquet::data_type::AsBytes,
-};
+use crate::{execution::util::spark_bloom_filter::SparkBloomFilter, parquet::data_type::AsBytes};
use arrow::record_batch::RecordBatch;
use arrow_array::cast::as_primitive_array;
use arrow_schema::{DataType, Schema};
native/core/src/execution/expressions/mod.rs (renamed from native/core/src/execution/datafusion/mod.rs)
@@ -15,11 +15,10 @@
// specific language governing permissions and limitations
// under the License.

-//! Native execution through DataFusion
+//! Native DataFusion expressions

-pub mod expressions;
-mod operators;
-pub mod planner;
-pub mod shuffle_writer;
-pub(crate) mod spark_plan;
-mod util;
+pub mod bloom_filter_agg;
+pub mod bloom_filter_might_contain;
+pub mod subquery;

+pub use datafusion_comet_spark_expr::EvalMode;
6 changes: 3 additions & 3 deletions native/core/src/execution/jni_api.rs
@@ -45,8 +45,8 @@ use super::{serde, utils::SparkArrowConvert, CometMemoryPool};
use crate::{
errors::{try_unwrap_or_throw, CometError, CometResult},
execution::{
-        datafusion::planner::PhysicalPlanner, metrics::utils::update_comet_metric,
-        serde::to_arrow_datatype, shuffle::row::process_sorted_row_partition, sort::RdxSort,
+        metrics::utils::update_comet_metric, planner::PhysicalPlanner, serde::to_arrow_datatype,
+        shuffle::row::process_sorted_row_partition, sort::RdxSort,
},
jvm_bridge::{jni_new_global_ref, JVMClasses},
};
@@ -59,8 +59,8 @@ use jni::{
};
use tokio::runtime::Runtime;

-use crate::execution::datafusion::spark_plan::SparkPlan;
use crate::execution::operators::ScanExec;
+use crate::execution::spark_plan::SparkPlan;
use log::info;

/// Comet native execution context. Kept alive across JNI calls.
2 changes: 1 addition & 1 deletion native/core/src/execution/metrics/utils.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

-use crate::execution::datafusion::spark_plan::SparkPlan;
+use crate::execution::spark_plan::SparkPlan;
use crate::jvm_bridge::jni_new_global_ref;
use crate::{
errors::CometError,
5 changes: 4 additions & 1 deletion native/core/src/execution/mod.rs
@@ -16,13 +16,16 @@
// under the License.

//! PoC of vectorization execution through JNI to Rust.
-pub mod datafusion;
+pub mod expressions;
pub mod jni_api;
mod metrics;
pub mod operators;
+pub(crate) mod planner;
pub mod serde;
pub mod shuffle;
pub(crate) mod sort;
+pub(crate) mod spark_plan;
+pub(crate) mod util;
pub use datafusion_comet_spark_expr::timezone;
pub(crate) mod utils;

native/core/src/execution/operators/expand.rs (renamed from native/core/src/execution/datafusion/operators/expand.rs)
@@ -37,14 +37,14 @@ use std::{
/// A Comet native operator that expands a single row into multiple rows. This behaves as same as
/// Spark Expand operator.
#[derive(Debug)]
-pub struct CometExpandExec {
+pub struct ExpandExec {
projections: Vec<Vec<Arc<dyn PhysicalExpr>>>,
child: Arc<dyn ExecutionPlan>,
schema: SchemaRef,
cache: PlanProperties,
}

-impl CometExpandExec {
+impl ExpandExec {
/// Create a new ExpandExec
pub fn new(
projections: Vec<Vec<Arc<dyn PhysicalExpr>>>,
@@ -66,7 +66,7 @@ impl CometExpandExec {
}
}

-impl DisplayAs for CometExpandExec {
+impl DisplayAs for ExpandExec {
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
@@ -87,7 +87,7 @@ impl DisplayAs for CometExpandExec {
}
}

-impl ExecutionPlan for CometExpandExec {
+impl ExecutionPlan for ExpandExec {
fn as_any(&self) -> &dyn Any {
self
}
@@ -104,7 +104,7 @@ impl ExecutionPlan for CometExpandExec {
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
-        let new_expand = CometExpandExec::new(
+        let new_expand = ExpandExec::new(
self.projections.clone(),
Arc::clone(&children[0]),
Arc::clone(&self.schema),
2 changes: 2 additions & 0 deletions native/core/src/execution/operators/mod.rs
@@ -27,6 +27,8 @@ pub use filter::FilterExec;
pub use scan::*;

mod copy;
+mod expand;
+pub use expand::ExpandExec;
mod filter;
mod scan;

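With the `Comet` prefix dropped and the re-export in place, downstream code reaches the operator through `operators::ExpandExec`. A minimal construction sketch, assuming the three-argument `ExpandExec::new` shown in the expand.rs diff above (the empty `MemoryExec` source and one-column schema are illustrative only):

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use comet::execution::operators::ExpandExec;
use datafusion::physical_expr::{expressions::Column, PhysicalExpr};
use datafusion::physical_plan::{memory::MemoryExec, ExecutionPlan};

fn build_expand() -> Arc<dyn ExecutionPlan> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
    // An empty single-partition source is enough to construct the operator.
    let child: Arc<dyn ExecutionPlan> =
        Arc::new(MemoryExec::try_new(&[vec![]], Arc::clone(&schema), None).unwrap());
    // Two projections: each input row is expanded into two output rows.
    let col: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0));
    let projections = vec![vec![Arc::clone(&col)], vec![col]];
    Arc::new(ExpandExec::new(projections, child, schema))
}
```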
3 changes: 1 addition & 2 deletions native/core/src/execution/operators/scan.rs
@@ -18,8 +18,7 @@
use crate::{
errors::CometError,
execution::{
-        datafusion::planner::TEST_EXEC_CONTEXT_ID, operators::ExecutionError,
-        utils::SparkArrowConvert,
+        operators::ExecutionError, planner::TEST_EXEC_CONTEXT_ID, utils::SparkArrowConvert,
},
jvm_bridge::{jni_call, JVMClasses},
};
native/core/src/execution/planner.rs (renamed from native/core/src/execution/datafusion/planner.rs)
@@ -22,17 +22,13 @@ use crate::execution::operators::{CopyMode, FilterExec};
use crate::{
errors::ExpressionError,
execution::{
-        datafusion::{
-            expressions::{
-                bloom_filter_agg::BloomFilterAgg,
-                bloom_filter_might_contain::BloomFilterMightContain, checkoverflow::CheckOverflow,
-                negative, subquery::Subquery, unbound::UnboundColumn,
-            },
-            operators::expand::CometExpandExec,
-            shuffle_writer::ShuffleWriterExec,
+        expressions::{
+            bloom_filter_agg::BloomFilterAgg, bloom_filter_might_contain::BloomFilterMightContain,
+            subquery::Subquery,
+        },
-        operators::{CopyExec, ExecutionError, ScanExec},
+        operators::{CopyExec, ExecutionError, ExpandExec, ScanExec},
        serde::to_arrow_datatype,
+        shuffle::ShuffleWriterExec,
},
};
use arrow::compute::CastOptions;
@@ -68,11 +64,11 @@ use datafusion::{
},
prelude::SessionContext,
};
-use datafusion_comet_spark_expr::create_comet_physical_fun;
+use datafusion_comet_spark_expr::{create_comet_physical_fun, create_negate_expr};
use datafusion_functions_nested::concat::ArrayAppend;
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};

-use crate::execution::datafusion::spark_plan::SparkPlan;
+use crate::execution::spark_plan::SparkPlan;
use datafusion_comet_proto::{
spark_expression::{
self, agg_expr::ExprStruct as AggExprStruct, expr::ExprStruct, literal::Value, AggExpr,
@@ -86,11 +82,11 @@ use datafusion_comet_proto::{
spark_partitioning::{partitioning::PartitioningStruct, Partitioning as SparkPartitioning},
};
use datafusion_comet_spark_expr::{
-    ArrayInsert, Avg, AvgDecimal, BitwiseNotExpr, Cast, Contains, Correlation, Covariance,
-    CreateNamedStruct, DateTruncExpr, EndsWith, GetArrayStructFields, GetStructField, HourExpr,
-    IfExpr, Like, ListExtract, MinuteExpr, NormalizeNaNAndZero, RLike, SecondExpr,
+    ArrayInsert, Avg, AvgDecimal, BitwiseNotExpr, Cast, CheckOverflow, Contains, Correlation,
+    Covariance, CreateNamedStruct, DateTruncExpr, EndsWith, GetArrayStructFields, GetStructField,
+    HourExpr, IfExpr, Like, ListExtract, MinuteExpr, NormalizeNaNAndZero, RLike, SecondExpr,
    SparkCastOptions, StartsWith, Stddev, StringSpaceExpr, SubstringExpr, SumDecimal,
-    TimestampTruncExpr, ToJson, Variance,
+    TimestampTruncExpr, ToJson, UnboundColumn, Variance,
};
use datafusion_common::scalar::ScalarStructBuilder;
use datafusion_common::{
@@ -611,7 +607,7 @@ impl PhysicalPlanner {
ExprStruct::UnaryMinus(expr) => {
let child: Arc<dyn PhysicalExpr> =
self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
-                let result = negative::create_negate_expr(child, expr.fail_on_error);
+                let result = create_negate_expr(child, expr.fail_on_error);
result.map_err(|e| ExecutionError::GeneralError(e.to_string()))
}
ExprStruct::NormalizeNanAndZero(expr) => {
@@ -1118,7 +1114,7 @@ impl PhysicalPlanner {
} else {
Arc::clone(&child.native_plan)
};
-                let expand = Arc::new(CometExpandExec::new(projections, input, schema));
+                let expand = Arc::new(ExpandExec::new(projections, input, schema));
Ok((
scans,
Arc::new(SparkPlan::new(spark_plan.plan_id, expand, vec![child])),
@@ -2270,7 +2266,7 @@ mod tests {
use datafusion::{physical_plan::common::collect, prelude::SessionContext};
use tokio::sync::mpsc;

-    use crate::execution::{datafusion::planner::PhysicalPlanner, operators::InputBatch};
+    use crate::execution::{operators::InputBatch, planner::PhysicalPlanner};

use crate::execution::operators::ExecutionError;
use datafusion_comet_proto::{
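`create_negate_expr` is now consumed from the spark-expr crate rather than the local `negative` module. A sketch of the call, assuming it returns a DataFusion `Result<Arc<dyn PhysicalExpr>>` as the planner's error-mapping above suggests:

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_expr::{expressions::Column, PhysicalExpr};
use datafusion_comet_spark_expr::create_negate_expr;

fn negate_column() -> Result<Arc<dyn PhysicalExpr>> {
    let child: Arc<dyn PhysicalExpr> = Arc::new(Column::new("x", 0));
    // fail_on_error mirrors Spark's ANSI mode: overflow during negation
    // becomes an error instead of wrapping around.
    create_negate_expr(child, true)
}
```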
2 changes: 2 additions & 0 deletions native/core/src/execution/shuffle/mod.rs
@@ -18,3 +18,5 @@
mod list;
mod map;
pub mod row;
+mod shuffle_writer;
+pub use shuffle_writer::ShuffleWriterExec;
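The `shuffle_writer` module stays private; only the operator type is re-exported, so callers such as the shuffle_writer benchmark above depend on the `shuffle` path rather than on the module layout:

```rust
use comet::execution::shuffle::ShuffleWriterExec; // public re-export
// `comet::execution::shuffle::shuffle_writer::ShuffleWriterExec` would not
// compile: the module itself is private.
```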
2 changes: 1 addition & 1 deletion native/core/src/execution/shuffle/row.rs
@@ -20,10 +20,10 @@
use crate::{
errors::CometError,
execution::{
-        datafusion::shuffle_writer::{write_ipc_compressed, Checksum},
        shuffle::{
            list::{append_list_element, SparkUnsafeArray},
            map::{append_map_elements, get_map_key_value_dt, SparkUnsafeMap},
+            shuffle_writer::{write_ipc_compressed, Checksum},
},
utils::bytes_to_i128,
},
native/core/src/execution/util/spark_bit_array.rs (renamed from native/core/src/execution/datafusion/util/spark_bit_array.rs)
@@ -15,8 +15,8 @@
// specific language governing permissions and limitations
// under the License.

-use crate::execution::datafusion::util::spark_bit_array;
-use crate::execution::datafusion::util::spark_bit_array::SparkBitArray;
+use crate::execution::util::spark_bit_array;
+use crate::execution::util::spark_bit_array::SparkBitArray;
use arrow_array::{ArrowNativeTypeOp, BooleanArray, Int64Array};
use arrow_buffer::ToByteSlice;
use datafusion_comet_spark_expr::spark_hash::spark_compatible_murmur3_hash;
1 change: 1 addition & 0 deletions native/spark-expr/Cargo.toml
@@ -29,6 +29,7 @@ edition = { workspace = true }
[dependencies]
arrow = { workspace = true }
arrow-array = { workspace = true }
+arrow-buffer = { workspace = true }
arrow-data = { workspace = true }
arrow-schema = { workspace = true }
chrono = { workspace = true }
12 changes: 12 additions & 0 deletions native/spark-expr/src/lib.rs
@@ -29,6 +29,8 @@ mod bitwise_not;
pub use bitwise_not::{bitwise_not, BitwiseNotExpr};
mod avg_decimal;
pub use avg_decimal::AvgDecimal;
+mod checkoverflow;
+pub use checkoverflow::CheckOverflow;
mod correlation;
pub use correlation::Correlation;
mod covariance;
@@ -45,10 +45,14 @@ pub use stddev::Stddev;
mod structs;
mod sum_decimal;
pub use sum_decimal::SumDecimal;
+mod negative;
+pub use negative::{create_negate_expr, NegativeExpr};
mod normalize_nan;
mod temporal;
pub mod timezone;
mod to_json;
+mod unbound;
+pub use unbound::UnboundColumn;
pub mod utils;
pub use normalize_nan::NormalizeNaNAndZero;

@@ -83,3 +89,9 @@ pub enum EvalMode {
/// failing the entire query.
Try,
}

+pub(crate) fn arithmetic_overflow_error(from_type: &str) -> SparkError {
+    SparkError::ArithmeticOverflow {
+        from_type: from_type.to_string(),
+    }
+}
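The new crate-private helper gives the moved expressions a uniform way to construct ANSI overflow errors. A hypothetical call site inside spark-expr (`negate_checked` is illustrative, not part of this PR):

```rust
use crate::{arithmetic_overflow_error, SparkError};

// Checked negation in the style of Spark's ANSI mode: i64::MIN has no
// representable negation, so report an arithmetic overflow.
fn negate_checked(v: i64) -> Result<i64, SparkError> {
    v.checked_neg()
        .ok_or_else(|| arithmetic_overflow_error("integer"))
}
```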