From 7757e1cee351aae8e8e0cfebc3261e62ab24856c Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Thu, 18 Apr 2024 14:36:44 -0700
Subject: [PATCH] fix: Comet should not fail on negative limit parameter

---
 .../scala/org/apache/comet/serde/QueryPlanSerde.scala     | 4 ++--
 .../test/scala/org/apache/comet/exec/CometExecSuite.scala | 8 ++++++++
 2 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index a0c17fc9de..555ab4084c 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -1837,14 +1837,14 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde {
         }
 
       case globalLimitExec: GlobalLimitExec if isCometOperatorEnabled(op.conf, "global_limit") =>
-        if (childOp.nonEmpty) {
+        // TODO: We don't support negative limit for now.
+        if (childOp.nonEmpty && globalLimitExec.limit >= 0) {
           val limitBuilder = OperatorOuterClass.Limit.newBuilder()
 
           // Spark 3.2 doesn't support offset for GlobalLimit, but newer Spark versions
           // support it. Before we upgrade to Spark 3.3, just set it zero.
           // TODO: Spark 3.3 might have negative limit (-1) for Offset usage.
           // When we upgrade to Spark 3.3., we need to address it here.
-          assert(globalLimitExec.limit >= 0, "limit should be greater or equal to zero")
           limitBuilder.setLimit(globalLimitExec.limit)
           limitBuilder.setOffset(0)
 
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index a8b05cc984..eefd99840d 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -60,6 +60,14 @@ class CometExecSuite extends CometTestBase {
     }
   }
 
+  test("offset") {
+    withSQLConf(CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
+      checkSparkAnswer(testData.offset(90))
+      checkSparkAnswer(arrayData.toDF().offset(99))
+      checkSparkAnswer(mapData.toDF().offset(99))
+    }
+  }
+
   test("try_sum should return null if overflow happens before merging") {
     assume(isSpark33Plus, "try_sum is available in Spark 3.3+")
     val longDf = Seq(Long.MaxValue, Long.MaxValue, 2).toDF("v")