Skip to content

Commit

Permalink
fix style
Browse files Browse the repository at this point in the history
  • Loading branch information
huaxingao committed Jun 27, 2024
1 parent d3b48cb commit 688d150
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 18 deletions.
38 changes: 21 additions & 17 deletions core/src/execution/datafusion/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
use std::{collections::HashMap, sync::Arc};

use arrow_schema::{DataType, Field, Schema, TimeUnit};
use datafusion::physical_plan::windows::BoundedWindowAggExec;
use datafusion::physical_plan::InputOrderMode;
use datafusion::{
arrow::{compute::SortOptions, datatypes::SchemaRef},
common::DataFusionError,
Expand All @@ -46,20 +48,23 @@ use datafusion::{
},
prelude::SessionContext,
};
use datafusion::physical_plan::InputOrderMode;
use datafusion::physical_plan::windows::BoundedWindowAggExec;
use datafusion_common::{
tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter},
JoinType as DFJoinType, ScalarValue,
};
use datafusion_expr::expr::find_df_window_func;
use datafusion_expr::{ScalarUDF, WindowFrame, WindowFrameBound, WindowFrameUnits};
use datafusion_expr::{
ScalarUDF, WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition,
};
use datafusion_physical_expr::window::WindowExpr;
use datafusion_physical_expr_common::aggregate::create_aggregate_expr;
use itertools::Itertools;
use jni::objects::GlobalRef;
use num::{BigInt, ToPrimitive};

use crate::execution::spark_operator::lower_window_frame_bound::LowerFrameBoundStruct;
use crate::execution::spark_operator::upper_window_frame_bound::UpperFrameBoundStruct;
use crate::execution::spark_operator::WindowFrameType;
use crate::{
errors::ExpressionError,
execution::{
Expand Down Expand Up @@ -100,9 +105,6 @@ use crate::{
spark_partitioning::{partitioning::PartitioningStruct, Partitioning as SparkPartitioning},
},
};
use crate::execution::spark_operator::lower_window_frame_bound::LowerFrameBoundStruct;
use crate::execution::spark_operator::upper_window_frame_bound::UpperFrameBoundStruct;
use crate::execution::spark_operator::WindowFrameType;

use super::expressions::{abs::CometAbsFunc, EvalMode};

Expand Down Expand Up @@ -1384,7 +1386,7 @@ impl PhysicalPlanner {
Some(ExprStruct::ScalarFunc(f)) => {
window_func_name = f.func.clone();
window_func_args = f.args.clone();
},
}
other => {
return Err(ExecutionError::GeneralError(format!(
"{other:?} not supported for window function"
Expand All @@ -1396,8 +1398,9 @@ impl PhysicalPlanner {
window_func_name = result.0;
window_func_args = result.1;
} else {
// Handle the case where neither func nor agg_func is set.
return Err(ExecutionError::GeneralError("Both func and agg_func are not set".to_string()));
return Err(ExecutionError::GeneralError(
"Both func and agg_func are not set".to_string(),
));
}

let window_func = match find_df_window_func(&window_func_name) {
Expand Down Expand Up @@ -1484,15 +1487,16 @@ impl PhysicalPlanner {
&input_schema,
false, // TODO: Ignore nulls
)
.map_err(|e| ExecutionError::DataFusionError(e.to_string()))
.map_err(|e| ExecutionError::DataFusionError(e.to_string()))
}

fn process_agg_func(
agg_func: &AggExpr
) -> Result<(String, Vec<Expr>), ExecutionError> {
fn process_agg_func(agg_func: &AggExpr) -> Result<(String, Vec<Expr>), ExecutionError> {

fn optional_expr_to_vec(expr_option: &Option<Expr>) -> Vec<Expr> {
expr_option.as_ref().cloned().map_or_else(Vec::new, |e| vec![e])
expr_option
.as_ref()
.cloned()
.map_or_else(Vec::new, |e| vec![e])
}

fn int_to_stats_type(value: i32) -> Option<StatsType> {
Expand All @@ -1507,13 +1511,13 @@ impl PhysicalPlanner {
Some(AggExprStruct::Count(expr)) => {
let args = &expr.children;
Ok(("count".to_string(), args.to_vec()))
},
}
Some(AggExprStruct::Min(expr)) => {
Ok(("min".to_string(), optional_expr_to_vec(&expr.child)))
},
}
Some(AggExprStruct::Max(expr)) => {
Ok(("max".to_string(), optional_expr_to_vec(&expr.child)))
},
}
other => {
return Err(ExecutionError::GeneralError(format!(
"{other:?} not supported for window function"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import scala.collection.JavaConverters._

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateMode, Average, BitAndAgg, BitOrAgg, BitXorAgg, Complete, Corr, Count, CovPopulation, CovSample, Final, First, Last, Max, Min, Partial, StddevPop, StddevSamp, Sum, VariancePop, VarianceSamp}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average, BitAndAgg, BitOrAgg, BitXorAgg, Complete, Corr, Count, CovPopulation, CovSample, Final, First, Last, Max, Min, Partial, StddevPop, StddevSamp, Sum, VariancePop, VarianceSamp}
import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, NormalizeNaNAndZero}
import org.apache.spark.sql.catalyst.plans._
Expand Down

0 comments on commit 688d150

Please sign in to comment.