Skip to content

Commit

Permalink
fix: Comet should not fail on negative limit parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Apr 18, 2024
1 parent 4da74d8 commit 7757e1c
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1837,14 +1837,14 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde {
}

case globalLimitExec: GlobalLimitExec if isCometOperatorEnabled(op.conf, "global_limit") =>
if (childOp.nonEmpty) {
// TODO: We don't support negative limit for now.
if (childOp.nonEmpty && globalLimitExec.limit >= 0) {
val limitBuilder = OperatorOuterClass.Limit.newBuilder()

// Spark 3.2 doesn't support offset for GlobalLimit, but newer Spark versions
// support it. Before we upgrade to Spark 3.3, just set it to zero.
// TODO: Spark 3.3 might have negative limit (-1) for Offset usage.
// When we upgrade to Spark 3.3, we need to address it here.
assert(globalLimitExec.limit >= 0, "limit should be greater or equal to zero")
limitBuilder.setLimit(globalLimitExec.limit)
limitBuilder.setOffset(0)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ class CometExecSuite extends CometTestBase {
}
}

test("offset") {
withSQLConf(CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
checkSparkAnswer(testData.offset(90))
checkSparkAnswer(arrayData.toDF().offset(99))
checkSparkAnswer(mapData.toDF().offset(99))
}
}

test("try_sum should return null if overflow happens before merging") {
assume(isSpark33Plus, "try_sum is available in Spark 3.3+")
val longDf = Seq(Long.MaxValue, Long.MaxValue, 2).toDF("v")
Expand Down

0 comments on commit 7757e1c

Please sign in to comment.