Skip to content

Commit

Permalink
fix: Comet should not fail on negative limit parameter (#288)
Browse files Browse the repository at this point in the history
* fix: Comet should not fail on negative limit parameter

* Move test
  • Loading branch information
viirya authored Apr 19, 2024
1 parent 4da74d8 commit c698d82
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 7 deletions.
12 changes: 8 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ under the License.
-Djdk.reflect.useDirectMethodHandle=false
</extraJavaTestArgs>
<argLine>-ea -Xmx4g -Xss4m ${extraJavaTestArgs}</argLine>
<additional.test.source>spark-3.3-plus</additional.test.source>
<additional.3_3.test.source>spark-3.3-plus</additional.3_3.test.source>
<additional.3_4.test.source>spark-3.4</additional.3_4.test.source>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -496,7 +497,8 @@ under the License.
<spark.version.short>3.2</spark.version.short>
<parquet.version>1.12.0</parquet.version>
<!-- we don't add special test suits for spark-3.2, so a not existed dir is specified-->
<additional.test.source>not-needed-yet</additional.test.source>
<additional.3_3.test.source>not-needed-yet</additional.3_3.test.source>
<additional.3_4.test.source>not-needed-yet</additional.3_4.test.source>
</properties>
</profile>

Expand All @@ -507,7 +509,8 @@ under the License.
<spark.version>3.3.2</spark.version>
<spark.version.short>3.3</spark.version.short>
<parquet.version>1.12.0</parquet.version>
<additional.test.source>spark-3.3-plus</additional.test.source>
<additional.3_3.test.source>spark-3.3-plus</additional.3_3.test.source>
<additional.3_4.test.source>not-needed-yet</additional.3_4.test.source>
</properties>
</profile>

Expand All @@ -517,7 +520,8 @@ under the License.
<scala.version>2.12.17</scala.version>
<spark.version.short>3.4</spark.version.short>
<parquet.version>1.13.1</parquet.version>
<additional.test.source>spark-3.3-plus</additional.test.source>
<additional.3_3.test.source>spark-3.3-plus</additional.3_3.test.source>
<additional.3_4.test.source>spark-3.4</additional.3_4.test.source>
</properties>
</profile>

Expand Down
3 changes: 2 additions & 1 deletion spark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,8 @@ under the License.
</goals>
<configuration>
<sources>
<source>src/test/${additional.test.source}</source>
<source>src/test/${additional.3_3.test.source}</source>
<source>src/test/${additional.3_4.test.source}</source>
</sources>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1837,14 +1837,14 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde {
}

case globalLimitExec: GlobalLimitExec if isCometOperatorEnabled(op.conf, "global_limit") =>
if (childOp.nonEmpty) {
// TODO: We don't support negative limit for now.
if (childOp.nonEmpty && globalLimitExec.limit >= 0) {
val limitBuilder = OperatorOuterClass.Limit.newBuilder()

// Spark 3.2 doesn't support offset for GlobalLimit, but newer Spark versions
// support it. Before we upgrade to Spark 3.3, just set it zero.
// TODO: Spark 3.3 might have negative limit (-1) for Offset usage.
// When we upgrade to Spark 3.3., we need to address it here.
assert(globalLimitExec.limit >= 0, "limit should be greater or equal to zero")
limitBuilder.setLimit(globalLimitExec.limit)
limitBuilder.setOffset(0)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.exec

import org.scalactic.source.Position
import org.scalatest.Tag

import org.apache.spark.sql.CometTestBase

import org.apache.comet.CometConf

/**
* This test suite contains tests for only Spark 3.4.
*/
class CometExec3_4Suite extends CometTestBase {
import testImplicits._

override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
pos: Position): Unit = {
super.test(testName, testTags: _*) {
withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
testFun
}
}
}

// Dataset.offset API is not available before Spark 3.4
test("offset") {
withSQLConf(CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
checkSparkAnswer(testData.offset(90))
checkSparkAnswer(arrayData.toDF().offset(99))
checkSparkAnswer(mapData.toDF().offset(99))
}
}
}

0 comments on commit c698d82

Please sign in to comment.