diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index 589a1d4fd..1a7b30613 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -323,7 +323,8 @@ class CometSparkSessionExtensions case s: TakeOrderedAndProjectExec if isCometNative(s.child) && isCometOperatorEnabled(conf, "takeOrderedAndProjectExec") && isCometShuffleEnabled(conf) && - CometTakeOrderedAndProjectExec.isSupported(s.projectList, s.sortOrder, s.child) => + CometTakeOrderedAndProjectExec.isSupported(s.projectList, s.sortOrder, s.child) && + s.offset == 0 => // TODO: support offset for Spark 3.4 QueryPlanSerde.operator2Proto(s) match { case Some(nativeOp) => diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index e675026a0..b922e6f79 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -1033,6 +1033,24 @@ class CometExecSuite extends CometTestBase { } }) } + + test("Fallback to Spark for TakeOrderedAndProjectExec with offset") { + Seq("true", "false").foreach(aqeEnabled => + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) { + withTable("t1") { + val numRows = 10 + spark + .range(numRows) + .selectExpr("if (id % 2 = 0, null, id) AS a", s"$numRows - id AS b") + .repartition(3) // Force repartition to test data will come to single partition + .write + .saveAsTable("t1") + + val df = sql("SELECT * FROM t1 ORDER BY a, b LIMIT 3").offset(1).groupBy($"a").sum("b") + checkSparkAnswer(df) + } + }) + } } case class BucketedTableTestSpec(