diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index c2468b9be..a9c900b4f 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -1566,6 +1566,16 @@ impl PhysicalPlanner { )); } + // Cast sort expressions to Int64 to match DataFusion's window frame (offsets are Int64) + let cast_sort_exprs: Vec = sort_exprs.iter().map(|sort_expr| { + let expr = sort_expr.expr.clone(); + let cast_expr = Arc::new(CastExpr::new(expr, DataType::Int64, None)); + PhysicalSortExpr { + expr: cast_expr, + options: sort_expr.options.clone(), + } + }).collect(); + let window_func = match self.find_df_window_function(&window_func_name) { Some(f) => f, _ => { @@ -1668,7 +1678,7 @@ impl PhysicalPlanner { &window_args, &[], partition_by, - sort_exprs, + &cast_sort_exprs, window_frame.into(), &input_schema, false, // TODO: Ignore nulls diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index 1258a9d16..3839e8a3c 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -113,11 +113,17 @@ class CometExecSuite extends CometTestBase { |(3, 1L, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), null) |AS testData(val, val_long, val_double, val_date, val_timestamp, cate) |""".stripMargin) - val df = sql(""" + val df1 = sql(""" |SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val ROWS CURRENT ROW) |FROM testData ORDER BY cate, val |""".stripMargin) - checkSparkAnswer(df) + checkSparkAnswer(df1) + + // ORDER BY with RANGE frame + val df2 = sql(""" + |SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val RANGE 1 PRECEDING) FROM testData + |""".stripMargin) + checkSparkAnswer(df2) } }