Skip to content

Commit

Permalink
fix: incorrect result on Comet multiple column distinct count (#268)
Browse files Browse the repository at this point in the history
* fix: incorrect result on Comet multiple column distinct count

* Update core/src/execution/datafusion/planner.rs

Co-authored-by: Andy Grove <[email protected]>

---------

Co-authored-by: Andy Grove <[email protected]>
  • Loading branch information
viirya and andygrove authored Apr 15, 2024
1 parent 9ab6c75 commit c1a2746
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 3 deletions.
12 changes: 10 additions & 2 deletions core/src/execution/datafusion/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1094,8 +1094,16 @@ impl PhysicalPlanner {
) -> Result<Arc<dyn AggregateExpr>, ExecutionError> {
match spark_expr.expr_struct.as_ref().unwrap() {
AggExprStruct::Count(expr) => {
let child = self.create_expr(&expr.children[0], schema)?;
Ok(Arc::new(Count::new(child, "count", DataType::Int64)))
let children = expr
.children
.iter()
.map(|child| self.create_expr(child, schema.clone()))
.collect::<Result<Vec<_>, _>>()?;
Ok(Arc::new(Count::new_with_multiple_exprs(
children,
"count",
DataType::Int64,
)))
}
AggExprStruct::Min(expr) => {
let child = self.create_expr(expr.child.as_ref().unwrap(), schema)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.spark.sql.{CometTestBase, DataFrame, Row}
import org.apache.spark.sql.catalyst.optimizer.EliminateSorts
import org.apache.spark.sql.comet.CometHashAggregateExec
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.functions.sum
import org.apache.spark.sql.functions.{count_distinct, sum}
import org.apache.spark.sql.internal.SQLConf

import org.apache.comet.CometConf
Expand All @@ -40,6 +40,25 @@ import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus
class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
import testImplicits._

test("multiple column distinct count") {
  // Conf overrides applied for the duration of the test body: the three Comet
  // keys below are all set to "true".
  val cometSettings = Seq(
    CometConf.COMET_ENABLED.key -> "true",
    CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
    CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true")

  withSQLConf(cometSettings: _*) {
    // Input rows: one exact duplicate, rows that differ only in the last
    // column, and one row whose last column is null.
    val rows = Seq(
      ("a", "b", "c"),
      ("a", "b", "c"),
      ("a", "b", "d"),
      ("x", "y", "z"),
      ("x", "q", null.asInstanceOf[String]))
    val df1 = rows.toDF("key1", "key2", "key3")

    // Ungrouped distinct counts over two columns, then over all three.
    checkSparkAnswer(df1.agg(count_distinct($"key1", $"key2")))
    checkSparkAnswer(df1.agg(count_distinct($"key1", $"key2", $"key3")))

    // Distinct count over the remaining two columns, grouped by the first.
    checkSparkAnswer(df1.groupBy($"key1").agg(count_distinct($"key2", $"key3")))
  }
}

test("Only trigger Comet Final aggregation on Comet partial aggregation") {
withTempView("lowerCaseData") {
lowerCaseData.createOrReplaceTempView("lowerCaseData")
Expand Down

0 comments on commit c1a2746

Please sign in to comment.