From 11046975c5f083d1e64cc344c07e36a34d476d90 Mon Sep 17 00:00:00 2001 From: huaxingao Date: Thu, 25 Jul 2024 17:25:22 -0700 Subject: [PATCH 01/10] fix: window function range offset should be long instead of int --- native/proto/src/proto/operator.proto | 4 +-- .../apache/comet/serde/QueryPlanSerde.scala | 27 +++++++++++++++---- 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index 6a29e6330..533d504c4 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -161,11 +161,11 @@ message UpperWindowFrameBound { } message Preceding { - int32 offset = 1; + int64 offset = 1; } message Following { - int32 offset = 1; + int64 offset = 1; } message UnboundedPreceding {} diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 0ee05d82c..8f9a5c546 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -255,15 +255,17 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim (None, exprToProto(windowExpr.windowFunction, output)) } + if (aggExpr.isEmpty && builtinFunc.isEmpty) { + return None + } + val f = windowExpr.windowSpec.frameSpecification val (frameType, lowerBound, upperBound) = f match { case SpecifiedWindowFrame(frameType, lBound, uBound) => val frameProto = frameType match { case RowFrame => OperatorOuterClass.WindowFrameType.Rows - case RangeFrame => - withInfo(windowExpr, "Range frame is not supported") - return None + case RangeFrame => OperatorOuterClass.WindowFrameType.Range } val lBoundProto = lBound match { @@ -278,12 +280,19 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim .setCurrentRow(OperatorOuterClass.CurrentRow.newBuilder().build()) .build() case e => + val offset = e.eval() match { + case i: Integer => i.toLong + case l: Long => l + case _ => + throw new IllegalArgumentException( + "Unsupported data type for window function row/range offset") + } OperatorOuterClass.LowerWindowFrameBound .newBuilder() .setPreceding( OperatorOuterClass.Preceding .newBuilder() - .setOffset(e.eval().asInstanceOf[Int]) + .setOffset(offset) .build()) .build() } @@ -300,12 +309,20 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim .setCurrentRow(OperatorOuterClass.CurrentRow.newBuilder().build()) .build() case e => + val offset = e.eval() match { + case i: Integer => i.toLong + case l: Long => l + case _ => + throw new IllegalArgumentException( + "Unsupported data type for window function row/range offset") + } + OperatorOuterClass.UpperWindowFrameBound .newBuilder() .setFollowing( OperatorOuterClass.Following .newBuilder() - .setOffset(e.eval().asInstanceOf[Int]) + .setOffset(offset) .build()) .build() } From 423bcc39b651eb97ec28e2808a95720dd5921630 Mon Sep 17 00:00:00 2001 From: huaxingao Date: Fri, 26 Jul 2024 18:10:40 -0700 Subject: [PATCH 02/10] fix error --- native/core/src/execution/datafusion/planner.rs | 2 +- spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index d7c8d7459..4a9a10fb5 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -1696,7 +1696,7 @@ impl PhysicalPlanner { WindowFrameBound::Preceding(ScalarValue::UInt64(None)) } LowerFrameBoundStruct::Preceding(offset) => { - let offset_value = offset.offset.unsigned_abs() as u64; + let offset_value = offset.offset.unsigned_abs(); WindowFrameBound::Preceding(ScalarValue::UInt64(Some(offset_value))) } LowerFrameBoundStruct::CurrentRow(_) => WindowFrameBound::CurrentRow, diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index 7e6334e0e..e02c55df5 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -44,7 +44,7 @@ import org.apache.spark.sql.execution.joins.{BroadcastNestedLoopJoinExec, Cartes import org.apache.spark.sql.execution.reuse.ReuseExchangeAndSubquery import org.apache.spark.sql.execution.window.WindowExec import org.apache.spark.sql.expressions.Window -import org.apache.spark.sql.functions.{col, count, date_add, expr, lead, sum} +import org.apache.spark.sql.functions.{col, date_add, expr, lead, sum} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE import org.apache.spark.unsafe.types.UTF8String From 63822c1df0a168a52a213845c6977b2dbd6f47fe Mon Sep 17 00:00:00 2001 From: huaxingao Date: Sat, 27 Jul 2024 08:52:00 -0700 Subject: [PATCH 03/10] fall back to Spark if range offset is not int or long --- .../scala/org/apache/comet/serde/QueryPlanSerde.scala | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 8f9a5c546..749bc9130 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -283,9 +283,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim val offset = e.eval() match { case i: Integer => i.toLong case l: Long => l - case _ => - throw new IllegalArgumentException( - "Unsupported data type for window function row/range offset") + case _ => return None } OperatorOuterClass.LowerWindowFrameBound .newBuilder() @@ -312,9 +310,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim val offset = e.eval() match { case i: Integer => i.toLong case l: Long => l - case _ => - throw new IllegalArgumentException( - "Unsupported data type for window function row/range offset") + case _ => return None } OperatorOuterClass.UpperWindowFrameBound From a1e829c73126a004f21061b6f486096adda11258 Mon Sep 17 00:00:00 2001 From: huaxingao Date: Mon, 29 Jul 2024 11:03:33 -0700 Subject: [PATCH 04/10] uncomment tests --- .../test/scala/org/apache/comet/exec/CometExecSuite.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index e02c55df5..ab306aeeb 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -1769,10 +1769,9 @@ class CometExecSuite extends CometTestBase { aggregateFunctions.foreach { function => val queries = Seq( s"SELECT $function OVER() FROM t1", - // TODO: Range frame is not supported yet. - // s"SELECT $function OVER(order by _2) FROM t1", - // s"SELECT $function OVER(order by _2 desc) FROM t1", - // s"SELECT $function OVER(partition by _2 order by _2) FROM t1", + s"SELECT $function OVER(order by _2) FROM t1", + s"SELECT $function OVER(order by _2 desc) FROM t1", + s"SELECT $function OVER(partition by _2 order by _2) FROM t1", s"SELECT $function OVER(rows between 1 preceding and 1 following) FROM t1", s"SELECT $function OVER(order by _2 rows between 1 preceding and current row) FROM t1", s"SELECT $function OVER(order by _2 rows between current row and 1 following) FROM t1") From e164d11ef24b5886ff696cb9cd990cb6471c5bef Mon Sep 17 00:00:00 2001 From: huaxingao Date: Mon, 12 Aug 2024 14:36:49 -0700 Subject: [PATCH 05/10] rebase --- spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index ab306aeeb..6c3d68a82 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -44,7 +44,7 @@ import org.apache.spark.sql.execution.joins.{BroadcastNestedLoopJoinExec, Cartes import org.apache.spark.sql.execution.reuse.ReuseExchangeAndSubquery import org.apache.spark.sql.execution.window.WindowExec import org.apache.spark.sql.expressions.Window -import org.apache.spark.sql.functions.{col, date_add, expr, lead, sum} +import org.apache.spark.sql.functions.{col, count, date_add, expr, lead, sum} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE import org.apache.spark.unsafe.types.UTF8String From 8cc5772224c30de43543a4388b527adbbb9184dd Mon Sep 17 00:00:00 2001 From: huaxingao Date: Mon, 19 Aug 2024 14:22:03 -0700 Subject: [PATCH 06/10] fix offset datatype --- .../core/src/execution/datafusion/planner.rs | 32 +++++++++++++++---- .../apache/comet/exec/CometExecSuite.scala | 12 +++---- 2 files changed, 30 insertions(+), 14 deletions(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 4a9a10fb5..485df8f8e 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -1693,15 +1693,24 @@ impl PhysicalPlanner { { Some(l) => match l { LowerFrameBoundStruct::UnboundedPreceding(_) => { - WindowFrameBound::Preceding(ScalarValue::UInt64(None)) + match units { + WindowFrameUnits::Rows => WindowFrameBound::Preceding(ScalarValue::UInt64(None)), + WindowFrameUnits::Range | WindowFrameUnits::Groups => WindowFrameBound::Preceding(ScalarValue::Int64(None)), + } } LowerFrameBoundStruct::Preceding(offset) => { - let offset_value = offset.offset.unsigned_abs(); - WindowFrameBound::Preceding(ScalarValue::UInt64(Some(offset_value))) + let offset_value = offset.offset.abs() as i64; + match units { + WindowFrameUnits::Rows => WindowFrameBound::Preceding(ScalarValue::UInt64(Some(offset_value as u64))), + WindowFrameUnits::Range | WindowFrameUnits::Groups => WindowFrameBound::Preceding(ScalarValue::Int64(Some(offset_value))), + } } LowerFrameBoundStruct::CurrentRow(_) => WindowFrameBound::CurrentRow, }, - None => WindowFrameBound::Preceding(ScalarValue::UInt64(None)), + None => match units { + WindowFrameUnits::Rows => WindowFrameBound::Preceding(ScalarValue::UInt64(None)), + WindowFrameUnits::Range | WindowFrameUnits::Groups => WindowFrameBound::Preceding(ScalarValue::Int64(None)), + }, }; let upper_bound: WindowFrameBound = match spark_window_frame @@ -1711,14 +1720,23 @@ impl PhysicalPlanner { { Some(u) => match u { UpperFrameBoundStruct::UnboundedFollowing(_) => { - WindowFrameBound::Following(ScalarValue::UInt64(None)) + match units { + WindowFrameUnits::Rows => WindowFrameBound::Following(ScalarValue::UInt64(None)), + WindowFrameUnits::Range | WindowFrameUnits::Groups => WindowFrameBound::Following(ScalarValue::Int64(None)), + } } UpperFrameBoundStruct::Following(offset) => { - WindowFrameBound::Following(ScalarValue::UInt64(Some(offset.offset as u64))) + match units { + WindowFrameUnits::Rows => WindowFrameBound::Following(ScalarValue::UInt64(Some(offset.offset as u64))), + WindowFrameUnits::Range | WindowFrameUnits::Groups => WindowFrameBound::Following(ScalarValue::Int64(Some(offset.offset as i64))), + } } UpperFrameBoundStruct::CurrentRow(_) => WindowFrameBound::CurrentRow, }, - None => WindowFrameBound::Following(ScalarValue::UInt64(None)), + None => match units { + WindowFrameUnits::Rows => WindowFrameBound::Following(ScalarValue::UInt64(None)), + WindowFrameUnits::Range | WindowFrameUnits::Groups => WindowFrameBound::Following(ScalarValue::Int64(None)), + }, }; let window_frame = WindowFrame::new_bounds(units, lower_bound, upper_bound); diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index 6c3d68a82..5a7c88b5a 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -193,23 +193,21 @@ class CometExecSuite extends CometTestBase { } } - test("Window range frame should fall back to Spark") { + test("Window range frame with long boundary should not fail") { val df = Seq((1L, "1"), (1L, "1"), (2147483650L, "1"), (3L, "2"), (2L, "1"), (2147483650L, "2")) .toDF("key", "value") - checkAnswer( + checkSparkAnswer( df.select( $"key", count("key").over( - Window.partitionBy($"value").orderBy($"key").rangeBetween(0, 2147483648L))), - Seq(Row(1, 3), Row(1, 3), Row(2, 2), Row(3, 2), Row(2147483650L, 1), Row(2147483650L, 1))) - checkAnswer( + Window.partitionBy($"value").orderBy($"key").rangeBetween(0, 2147483648L)))) + checkSparkAnswer( df.select( $"key", count("key").over( - Window.partitionBy($"value").orderBy($"key").rangeBetween(-2147483649L, 0))), - Seq(Row(1, 2), Row(1, 2), Row(2, 3), Row(2147483650L, 2), Row(2147483650L, 4), Row(3, 1))) + Window.partitionBy($"value").orderBy($"key").rangeBetween(-2147483649L, 0)))) } test("Unsupported window expression should fall back to Spark") { From 0d84fd20fc9194d67c109b303e1b5d53ada7900c Mon Sep 17 00:00:00 2001 From: huaxingao Date: Wed, 21 Aug 2024 17:38:16 -0700 Subject: [PATCH 07/10] fix data type --- .../core/src/execution/datafusion/planner.rs | 54 ++++++++++++------- .../apache/comet/serde/QueryPlanSerde.scala | 42 +++++++++++++++ .../apache/comet/exec/CometExecSuite.scala | 20 ++++++- 3 files changed, 94 insertions(+), 22 deletions(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 485df8f8e..89719859f 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -1692,24 +1692,32 @@ impl PhysicalPlanner { .and_then(|inner| inner.lower_frame_bound_struct.as_ref()) { Some(l) => match l { - LowerFrameBoundStruct::UnboundedPreceding(_) => { - match units { - WindowFrameUnits::Rows => WindowFrameBound::Preceding(ScalarValue::UInt64(None)), - WindowFrameUnits::Range | WindowFrameUnits::Groups => WindowFrameBound::Preceding(ScalarValue::Int64(None)), + LowerFrameBoundStruct::UnboundedPreceding(_) => match units { + WindowFrameUnits::Rows => { + WindowFrameBound::Preceding(ScalarValue::UInt64(None)) } - } + WindowFrameUnits::Range | WindowFrameUnits::Groups => { + WindowFrameBound::Preceding(ScalarValue::Int64(None)) + } + }, LowerFrameBoundStruct::Preceding(offset) => { - let offset_value = offset.offset.abs() as i64; + let offset_value = offset.offset.abs(); match units { - WindowFrameUnits::Rows => WindowFrameBound::Preceding(ScalarValue::UInt64(Some(offset_value as u64))), - WindowFrameUnits::Range | WindowFrameUnits::Groups => WindowFrameBound::Preceding(ScalarValue::Int64(Some(offset_value))), + WindowFrameUnits::Rows => WindowFrameBound::Preceding(ScalarValue::UInt64( + Some(offset_value as u64), + )), + WindowFrameUnits::Range | WindowFrameUnits::Groups => { + WindowFrameBound::Preceding(ScalarValue::Int64(Some(offset_value))) + } } } LowerFrameBoundStruct::CurrentRow(_) => WindowFrameBound::CurrentRow, }, None => match units { WindowFrameUnits::Rows => WindowFrameBound::Preceding(ScalarValue::UInt64(None)), - WindowFrameUnits::Range | WindowFrameUnits::Groups => WindowFrameBound::Preceding(ScalarValue::Int64(None)), + WindowFrameUnits::Range | WindowFrameUnits::Groups => { + WindowFrameBound::Preceding(ScalarValue::Int64(None)) + } }, }; @@ -1719,23 +1727,29 @@ impl PhysicalPlanner { .and_then(|inner| inner.upper_frame_bound_struct.as_ref()) { Some(u) => match u { - UpperFrameBoundStruct::UnboundedFollowing(_) => { - match units { - WindowFrameUnits::Rows => WindowFrameBound::Following(ScalarValue::UInt64(None)), - WindowFrameUnits::Range | WindowFrameUnits::Groups => WindowFrameBound::Following(ScalarValue::Int64(None)), + UpperFrameBoundStruct::UnboundedFollowing(_) => match units { + WindowFrameUnits::Rows => { + WindowFrameBound::Following(ScalarValue::UInt64(None)) } - } - UpperFrameBoundStruct::Following(offset) => { - match units { - WindowFrameUnits::Rows => WindowFrameBound::Following(ScalarValue::UInt64(Some(offset.offset as u64))), - WindowFrameUnits::Range | WindowFrameUnits::Groups => WindowFrameBound::Following(ScalarValue::Int64(Some(offset.offset as i64))), + WindowFrameUnits::Range | WindowFrameUnits::Groups => { + WindowFrameBound::Following(ScalarValue::Int64(None)) } - } + }, + UpperFrameBoundStruct::Following(offset) => match units { + WindowFrameUnits::Rows => { + WindowFrameBound::Following(ScalarValue::UInt64(Some(offset.offset as u64))) + } + WindowFrameUnits::Range | WindowFrameUnits::Groups => { + WindowFrameBound::Following(ScalarValue::Int64(Some(offset.offset))) + } + }, UpperFrameBoundStruct::CurrentRow(_) => WindowFrameBound::CurrentRow, }, None => match units { WindowFrameUnits::Rows => WindowFrameBound::Following(ScalarValue::UInt64(None)), - WindowFrameUnits::Range | WindowFrameUnits::Groups => WindowFrameBound::Following(ScalarValue::Int64(None)), + WindowFrameUnits::Range | WindowFrameUnits::Groups => { + WindowFrameBound::Following(ScalarValue::Int64(None)) + } }, }; diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 749bc9130..460c80924 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -2787,6 +2787,11 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim return None } + if (partitionSpec.nonEmpty && orderSpec.nonEmpty && + !validatePartitionAndSortSpecsForWindowFunc(partitionSpec, orderSpec, op)) { + return None + } + val windowExprProto = winExprs.map(windowExprToProto(_, output, op.conf)) val partitionExprs = partitionSpec.map(exprToProto(_, child.output)) @@ -3290,4 +3295,41 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim true } } + + private def validatePartitionAndSortSpecsForWindowFunc( + partitionSpec: Seq[Expression], + orderSpec: Seq[SortOrder], + op: SparkPlan): Boolean = { + if (partitionSpec.length != orderSpec.length) { + withInfo(op, "Partitioning and sorting specifications do not match") + return false + } else { + val partitionColumnNames = partitionSpec.collect { case a: AttributeReference => + a.name + } + + if (partitionColumnNames.length != partitionSpec.length) { + withInfo(op, "Unsupported partitioning specification") + return false + } + + val orderColumnNames = orderSpec.collect { case s: SortOrder => + s.child match { + case a: AttributeReference => a.name + } + } + + if (orderColumnNames.length != orderSpec.length) { + withInfo(op, "Unsupported SortOrder") + return false + } + + if (partitionColumnNames.toSet != orderColumnNames.toSet) { + withInfo(op, "Partitioning and sorting specifications do not match") + return false + } + + true + } + } } diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index 5a7c88b5a..e1dde458c 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -149,6 +149,22 @@ class CometExecSuite extends CometTestBase { } } + test( + "fall back to Spark when the partition spec and order spec are not the same for window function") { + withTempView("test") { + sql(""" + |CREATE OR REPLACE TEMPORARY VIEW test_agg AS SELECT * FROM VALUES + | (1, true), (1, false), + |(2, true), (3, false), (4, true) AS test(k, v) + |""".stripMargin) + + val df = sql(""" + SELECT k, v, every(v) OVER (PARTITION BY k ORDER BY v) FROM test_agg + |""".stripMargin) + checkSparkAnswer(df) + } + } + test("Native window operator should be CometUnaryExec") { withTempView("testData") { sql(""" @@ -164,11 +180,11 @@ class CometExecSuite extends CometTestBase { |(3, 1L, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), null) |AS testData(val, val_long, val_double, val_date, val_timestamp, cate) |""".stripMargin) - val df = sql(""" + val df1 = sql(""" |SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val ROWS CURRENT ROW) |FROM testData ORDER BY cate, val |""".stripMargin) - checkSparkAnswer(df) + checkSparkAnswer(df1) } } From 2de759364655b263c176dffc4966a50c95a87440 Mon Sep 17 00:00:00 2001 From: huaxingao Date: Mon, 23 Sep 2024 10:06:59 -0700 Subject: [PATCH 08/10] address comments --- .../apache/comet/serde/QueryPlanSerde.scala | 38 ++++++++----------- 1 file changed, 15 insertions(+), 23 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 460c80924..4744c2a1f 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -3303,33 +3303,25 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim if (partitionSpec.length != orderSpec.length) { withInfo(op, "Partitioning and sorting specifications do not match") return false - } else { - val partitionColumnNames = partitionSpec.collect { case a: AttributeReference => - a.name - } - - if (partitionColumnNames.length != partitionSpec.length) { - withInfo(op, "Unsupported partitioning specification") - return false - } - - val orderColumnNames = orderSpec.collect { case s: SortOrder => - s.child match { - case a: AttributeReference => a.name - } - } + } - if (orderColumnNames.length != orderSpec.length) { - withInfo(op, "Unsupported SortOrder") - return false - } + val partitionColumnNames = partitionSpec.collect { case a: AttributeReference => + a.name + } - if (partitionColumnNames.toSet != orderColumnNames.toSet) { - withInfo(op, "Partitioning and sorting specifications do not match") - return false + val orderColumnNames = orderSpec.collect { case s: SortOrder => + s.child match { + case a: AttributeReference => a.name } + } - true + if (partitionColumnNames.zip(orderColumnNames).exists { case (partCol, orderCol) => + partCol != orderCol + }) { + withInfo(op, "Partitioning and sorting specifications must be the same.") + return false } + + true } } From 78f18f67b492707d267a955afb7bf3ee44e25092 Mon Sep 17 00:00:00 2001 From: huaxingao Date: Mon, 23 Sep 2024 11:51:41 -0700 Subject: [PATCH 09/10] throw Err for WindowFrameUnits::Groups --- .../core/src/execution/datafusion/planner.rs | 44 ++++++++++++++----- 1 file changed, 34 insertions(+), 10 deletions(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 89719859f..64adf2258 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -1696,9 +1696,14 @@ impl PhysicalPlanner { WindowFrameUnits::Rows => { WindowFrameBound::Preceding(ScalarValue::UInt64(None)) } - WindowFrameUnits::Range | WindowFrameUnits::Groups => { + WindowFrameUnits::Range => { WindowFrameBound::Preceding(ScalarValue::Int64(None)) } + WindowFrameUnits::Groups => { + return Err(ExecutionError::GeneralError(format!( + "WindowFrameUnits::Groups is not supported." + ))) + } }, LowerFrameBoundStruct::Preceding(offset) => { let offset_value = offset.offset.abs(); @@ -1706,8 +1711,13 @@ impl PhysicalPlanner { WindowFrameUnits::Rows => WindowFrameBound::Preceding(ScalarValue::UInt64( Some(offset_value as u64), )), - WindowFrameUnits::Range | WindowFrameUnits::Groups => { + WindowFrameUnits::Range => { WindowFrameBound::Preceding(ScalarValue::Int64(Some(offset_value))) + }, + WindowFrameUnits::Groups => { + return Err(ExecutionError::GeneralError(format!( + "WindowFrameUnits::Groups is not supported." + ))) } } } @@ -1715,9 +1725,11 @@ impl PhysicalPlanner { }, None => match units { WindowFrameUnits::Rows => WindowFrameBound::Preceding(ScalarValue::UInt64(None)), - WindowFrameUnits::Range | WindowFrameUnits::Groups => { - WindowFrameBound::Preceding(ScalarValue::Int64(None)) - } + WindowFrameUnits::Range => WindowFrameBound::Preceding(ScalarValue::Int64(None)), + WindowFrameUnits::Groups => + return Err(ExecutionError::GeneralError(format!( + "WindowFrameUnits::Groups is not supported." + ))) }, }; @@ -1731,25 +1743,37 @@ impl PhysicalPlanner { WindowFrameUnits::Rows => { WindowFrameBound::Following(ScalarValue::UInt64(None)) } - WindowFrameUnits::Range | WindowFrameUnits::Groups => { + WindowFrameUnits::Range => { WindowFrameBound::Following(ScalarValue::Int64(None)) } + WindowFrameUnits::Groups => { + return Err(ExecutionError::GeneralError(format!( + "WindowFrameUnits::Groups is not supported." + ))) + } }, UpperFrameBoundStruct::Following(offset) => match units { WindowFrameUnits::Rows => { WindowFrameBound::Following(ScalarValue::UInt64(Some(offset.offset as u64))) } - WindowFrameUnits::Range | WindowFrameUnits::Groups => { + WindowFrameUnits::Range => { WindowFrameBound::Following(ScalarValue::Int64(Some(offset.offset))) } + WindowFrameUnits::Groups => { + return Err(ExecutionError::GeneralError(format!( + "WindowFrameUnits::Groups is not supported." + ))) + } }, UpperFrameBoundStruct::CurrentRow(_) => WindowFrameBound::CurrentRow, }, None => match units { WindowFrameUnits::Rows => WindowFrameBound::Following(ScalarValue::UInt64(None)), - WindowFrameUnits::Range | WindowFrameUnits::Groups => { - WindowFrameBound::Following(ScalarValue::Int64(None)) - } + WindowFrameUnits::Range => WindowFrameBound::Following(ScalarValue::Int64(None)), + WindowFrameUnits::Groups => + return Err(ExecutionError::GeneralError(format!( + "WindowFrameUnits::Groups is not supported." + ))) }, }; From 92b5c588f0804a4a910d34cedce3d0a29167931a Mon Sep 17 00:00:00 2001 From: huaxingao Date: Mon, 23 Sep 2024 12:47:45 -0700 Subject: [PATCH 10/10] formatting --- .../core/src/execution/datafusion/planner.rs | 44 ++++++++++--------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 64adf2258..663db0d1b 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -1700,9 +1700,9 @@ impl PhysicalPlanner { WindowFrameBound::Preceding(ScalarValue::Int64(None)) } WindowFrameUnits::Groups => { - return Err(ExecutionError::GeneralError(format!( - "WindowFrameUnits::Groups is not supported." - ))) + return Err(ExecutionError::GeneralError( + "WindowFrameUnits::Groups is not supported.".to_string(), + )); } }, LowerFrameBoundStruct::Preceding(offset) => { @@ -1713,11 +1713,11 @@ impl PhysicalPlanner { )), WindowFrameUnits::Range => { WindowFrameBound::Preceding(ScalarValue::Int64(Some(offset_value))) - }, + } WindowFrameUnits::Groups => { - return Err(ExecutionError::GeneralError(format!( - "WindowFrameUnits::Groups is not supported." - ))) + return Err(ExecutionError::GeneralError( + "WindowFrameUnits::Groups is not supported.".to_string(), + )); } } } @@ -1726,10 +1726,11 @@ impl PhysicalPlanner { None => match units { WindowFrameUnits::Rows => WindowFrameBound::Preceding(ScalarValue::UInt64(None)), WindowFrameUnits::Range => WindowFrameBound::Preceding(ScalarValue::Int64(None)), - WindowFrameUnits::Groups => - return Err(ExecutionError::GeneralError(format!( - "WindowFrameUnits::Groups is not supported." - ))) + WindowFrameUnits::Groups => { + return Err(ExecutionError::GeneralError( + "WindowFrameUnits::Groups is not supported.".to_string(), + )); + } }, }; @@ -1747,9 +1748,9 @@ impl PhysicalPlanner { WindowFrameBound::Following(ScalarValue::Int64(None)) } WindowFrameUnits::Groups => { - return Err(ExecutionError::GeneralError(format!( - "WindowFrameUnits::Groups is not supported." - ))) + return Err(ExecutionError::GeneralError( + "WindowFrameUnits::Groups is not supported.".to_string(), + )); } }, UpperFrameBoundStruct::Following(offset) => match units { @@ -1760,9 +1761,9 @@ impl PhysicalPlanner { WindowFrameBound::Following(ScalarValue::Int64(Some(offset.offset))) } WindowFrameUnits::Groups => { - return Err(ExecutionError::GeneralError(format!( - "WindowFrameUnits::Groups is not supported." - ))) + return Err(ExecutionError::GeneralError( + "WindowFrameUnits::Groups is not supported.".to_string(), + )); } }, UpperFrameBoundStruct::CurrentRow(_) => WindowFrameBound::CurrentRow, @@ -1770,10 +1771,11 @@ impl PhysicalPlanner { None => match units { WindowFrameUnits::Rows => WindowFrameBound::Following(ScalarValue::UInt64(None)), WindowFrameUnits::Range => WindowFrameBound::Following(ScalarValue::Int64(None)), - WindowFrameUnits::Groups => - return Err(ExecutionError::GeneralError(format!( - "WindowFrameUnits::Groups is not supported." - ))) + WindowFrameUnits::Groups => { + return Err(ExecutionError::GeneralError( + "WindowFrameUnits::Groups is not supported.".to_string(), + )); + } }, };