From 4da74d86478191ff8ddb7ecf0ce38887461feda6 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Wed, 17 Apr 2024 20:11:28 -0700
Subject: [PATCH] fix: incorrect result with aggregate expression with filter (#284)

---
 .../org/apache/comet/serde/QueryPlanSerde.scala     |  5 +++++
 .../org/apache/comet/exec/CometAggregateSuite.scala | 13 +++++++++++++
 2 files changed, 18 insertions(+)

diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index b62c2223e..a0c17fc9d 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -1880,6 +1880,11 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde {
           return None
         }
 
+        // Aggregate expressions with filter are not supported yet.
+        if (aggregateExpressions.exists(_.filter.isDefined)) {
+          return None
+        }
+
         val groupingExprs = groupingExpressions.map(exprToProto(_, child.output))
 
         // In some of the cases, the aggregateExpressions could be empty.
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
index 09c7151d1..f6415cbfc 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
@@ -40,6 +40,19 @@ import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus
 class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   import testImplicits._
 
+  test("count with aggregation filter") {
+    withSQLConf(
+      CometConf.COMET_ENABLED.key -> "true",
+      CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
+      CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
+      val df1 = sql("SELECT count(DISTINCT 2), count(DISTINCT 2,3)")
+      checkSparkAnswer(df1)
+
+      val df2 = sql("SELECT count(DISTINCT 2), count(DISTINCT 3,2)")
+      checkSparkAnswer(df2)
+    }
+  }
+
   test("lead/lag should return the default value if the offset row does not exist") {
     withSQLConf(
       CometConf.COMET_ENABLED.key -> "true",