From 6b7684bcdf6aeb85f301f4502d416196b6d4936f Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 18 Apr 2024 14:36:44 -0700 Subject: [PATCH 1/2] fix: Comet should not fail on negative limit parameter --- .../scala/org/apache/comet/serde/QueryPlanSerde.scala | 4 ++-- .../scala/org/apache/comet/exec/CometExecSuite.scala | 9 +++++++++ 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index a0c17fc9d..555ab4084 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -1837,14 +1837,14 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { } case globalLimitExec: GlobalLimitExec if isCometOperatorEnabled(op.conf, "global_limit") => - if (childOp.nonEmpty) { + // TODO: We don't support negative limit for now. + if (childOp.nonEmpty && globalLimitExec.limit >= 0) { val limitBuilder = OperatorOuterClass.Limit.newBuilder() // Spark 3.2 doesn't support offset for GlobalLimit, but newer Spark versions // support it. Before we upgrade to Spark 3.3, just set it zero. // TODO: Spark 3.3 might have negative limit (-1) for Offset usage. // When we upgrade to Spark 3.3., we need to address it here. 
- assert(globalLimitExec.limit >= 0, "limit should be greater or equal to zero") limitBuilder.setLimit(globalLimitExec.limit) limitBuilder.setOffset(0) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index a8b05cc98..170ee5e4b 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -60,6 +60,15 @@ class CometExecSuite extends CometTestBase { } } + test("offset") { + assume(isSpark34Plus, "Dataset.offset is not available before Spark 3.4") + withSQLConf(CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + checkSparkAnswer(testData.offset(90)) + checkSparkAnswer(arrayData.toDF().offset(99)) + checkSparkAnswer(mapData.toDF().offset(99)) + } + } + test("try_sum should return null if overflow happens before merging") { assume(isSpark33Plus, "try_sum is available in Spark 3.3+") val longDf = Seq(Long.MaxValue, Long.MaxValue, 2).toDF("v") From 153dcc1d6ab50d1d7b5f7309e4f035c2e97631ba Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 18 Apr 2024 18:45:38 -0700 Subject: [PATCH 2/2] Move test --- pom.xml | 12 +++-- spark/pom.xml | 3 +- .../apache/comet/exec/CometExecSuite.scala | 9 ---- .../apache/comet/exec/CometExec3_4Suite.scala | 52 +++++++++++++++++++ 4 files changed, 62 insertions(+), 14 deletions(-) create mode 100644 spark/src/test/spark-3.4/org/apache/comet/exec/CometExec3_4Suite.scala diff --git a/pom.xml b/pom.xml index c7e417dd1..f5f9bf26f 100644 --- a/pom.xml +++ b/pom.xml @@ -86,7 +86,8 @@ under the License. -Djdk.reflect.useDirectMethodHandle=false -ea -Xmx4g -Xss4m ${extraJavaTestArgs} - <additional.test.source>spark-3.3-plus</additional.test.source> + <additional.3_3.test.source>spark-3.3-plus</additional.3_3.test.source> + <additional.3_4.test.source>spark-3.4</additional.3_4.test.source> @@ -496,7 +497,8 @@ under the License. 3.2 1.12.0 - <additional.test.source>not-needed-yet</additional.test.source> + <additional.3_3.test.source>not-needed-yet</additional.3_3.test.source> + <additional.3_4.test.source>not-needed-yet</additional.3_4.test.source> @@ -507,7 +509,8 @@ under the License. 
3.3.2 3.3 1.12.0 - <additional.test.source>spark-3.3-plus</additional.test.source> + <additional.3_3.test.source>spark-3.3-plus</additional.3_3.test.source> + <additional.3_4.test.source>not-needed-yet</additional.3_4.test.source> @@ -517,7 +520,8 @@ under the License. 2.12.17 3.4 1.13.1 - <additional.test.source>spark-3.3-plus</additional.test.source> + <additional.3_3.test.source>spark-3.3-plus</additional.3_3.test.source> + <additional.3_4.test.source>spark-3.4</additional.3_4.test.source> diff --git a/spark/pom.xml b/spark/pom.xml index 31d80bbe6..e0be7c656 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -245,7 +245,8 @@ under the License. - src/test/${additional.test.source} + src/test/${additional.3_3.test.source} + src/test/${additional.3_4.test.source} diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index 170ee5e4b..a8b05cc98 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -60,15 +60,6 @@ class CometExecSuite extends CometTestBase { } } - test("offset") { - assume(isSpark34Plus, "Dataset.offset is not available before Spark 3.4") - withSQLConf(CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { - checkSparkAnswer(testData.offset(90)) - checkSparkAnswer(arrayData.toDF().offset(99)) - checkSparkAnswer(mapData.toDF().offset(99)) - } - } - test("try_sum should return null if overflow happens before merging") { assume(isSpark33Plus, "try_sum is available in Spark 3.3+") val longDf = Seq(Long.MaxValue, Long.MaxValue, 2).toDF("v") diff --git a/spark/src/test/spark-3.4/org/apache/comet/exec/CometExec3_4Suite.scala b/spark/src/test/spark-3.4/org/apache/comet/exec/CometExec3_4Suite.scala new file mode 100644 index 000000000..7644d54be --- /dev/null +++ b/spark/src/test/spark-3.4/org/apache/comet/exec/CometExec3_4Suite.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.exec + +import org.scalactic.source.Position +import org.scalatest.Tag + +import org.apache.spark.sql.CometTestBase + +import org.apache.comet.CometConf + +/** + * This test suite contains tests for only Spark 3.4. + */ +class CometExec3_4Suite extends CometTestBase { + import testImplicits._ + + override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit + pos: Position): Unit = { + super.test(testName, testTags: _*) { + withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { + testFun + } + } + } + + // Dataset.offset API is not available before Spark 3.4 + test("offset") { + withSQLConf(CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + checkSparkAnswer(testData.offset(90)) + checkSparkAnswer(arrayData.toDF().offset(99)) + checkSparkAnswer(mapData.toDF().offset(99)) + } + } +}