feat: Support First/Last aggregate functions (#97)
Co-authored-by: Huaxin Gao <[email protected]>
huaxingao and Huaxin Gao authored Feb 23, 2024
1 parent 48741c0 commit fbe7f80
Showing 4 changed files with 156 additions and 16 deletions.
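At a glance, this commit lets Comet offload Spark's FIRST and LAST aggregates to DataFusion's FirstValue/LastValue physical expressions, but only when ignoreNulls is false; the ignoreNulls = true forms still fall back to Spark. A minimal usage sketch, assuming a local SparkSession with Comet enabled (the config keys shown are illustrative assumptions, not taken from this diff):

import org.apache.spark.sql.SparkSession

object FirstLastDemo {
  def main(args: Array[String]): Unit = {
    // Assumed config keys for enabling Comet; adjust to your deployment.
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .config("spark.comet.enabled", "true")
      .config("spark.comet.exec.enabled", "true")
      .getOrCreate()

    spark.range(0, 10).selectExpr("id % 3 AS g", "id AS v").createOrReplaceTempView("t")

    // ignoreNulls = false (the default): eligible for Comet offload.
    spark.sql("SELECT g, FIRST(v), LAST(v) FROM t GROUP BY g").show()

    // ignoreNulls = true: not serialized by this commit, so Spark runs it.
    spark.sql("SELECT g, FIRST(v, true), LAST(v, true) FROM t GROUP BY g").show()

    spark.stop()
  }
}

FIRST and LAST are only deterministic over ordered input, which is why the test changes below route queries through ORDER BY views before aggregating.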
25 changes: 24 additions & 1 deletion core/src/execution/datafusion/planner.rs
@@ -42,7 +42,8 @@ use datafusion_common::ScalarValue;
use datafusion_physical_expr::{
execution_props::ExecutionProps,
expressions::{
CaseExpr, CastExpr, Count, InListExpr, IsNullExpr, Max, Min, NegativeExpr, NotExpr, Sum,
CaseExpr, CastExpr, Count, FirstValue, InListExpr, IsNullExpr, LastValue, Max, Min,
NegativeExpr, NotExpr, Sum,
},
AggregateExpr, ScalarFunctionExpr,
};
@@ -900,6 +901,28 @@ impl PhysicalPlanner {
}
}
}
AggExprStruct::First(expr) => {
let child = self.create_expr(expr.child.as_ref().unwrap(), schema)?;
let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap());
// Map onto DataFusion's FirstValue. The two empty vectors are the ordering
// requirement and its data types: with no ordering, the value reflects
// arrival order.
Ok(Arc::new(FirstValue::new(
child,
"first",
datatype,
vec![],
vec![],
)))
}
AggExprStruct::Last(expr) => {
let child = self.create_expr(expr.child.as_ref().unwrap(), schema)?;
let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap());
// Same shape as First: no ordering requirement, so LastValue sees rows in
// arrival order.
Ok(Arc::new(LastValue::new(
child,
"last",
datatype,
vec![],
vec![],
)))
}
}
}

14 changes: 14 additions & 0 deletions core/src/execution/proto/expr.proto
@@ -85,6 +85,8 @@ message AggExpr {
Min min = 4;
Max max = 5;
Avg avg = 6;
First first = 7;
Last last = 8;
}
}

@@ -115,6 +117,18 @@ message Avg {
bool fail_on_error = 4; // currently unused (useful for deciding Ansi vs Legacy mode)
}

message First {
Expr child = 1;
DataType datatype = 2;
bool ignore_nulls = 3;
}

message Last {
Expr child = 1;
DataType datatype = 2;
bool ignore_nulls = 3;
}

message Literal {
oneof value {
bool bool_val = 1;
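For reference, the generated bindings expose these messages through standard protobuf builders. A hand-rolled sketch of what the serde change below produces, assuming the generated classes live alongside QueryPlanSerde as org.apache.comet.serde.ExprOuterClass; setChild/setDatatype appear in the diff, while setIgnoreNulls is the accessor protobuf codegen would derive from ignore_nulls and is an assumption, since the serde never calls it:

import org.apache.comet.serde.ExprOuterClass

object AggExprSketch {
  def firstAggExpr(
      child: ExprOuterClass.Expr,
      dt: ExprOuterClass.DataType): ExprOuterClass.AggExpr = {
    // setIgnoreNulls is assumed; the serde in this commit relies on the
    // proto default (false) and never sets it.
    val first = ExprOuterClass.First
      .newBuilder()
      .setChild(child)
      .setDatatype(dt)
      .setIgnoreNulls(false)
      .build()
    ExprOuterClass.AggExpr.newBuilder().setFirst(first).build()
  }
}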
38 changes: 37 additions & 1 deletion spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average, Count, Final, Max, Min, Partial, Sum}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average, Count, Final, First, Last, Max, Min, Partial, Sum}
import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, SinglePartition}
@@ -287,6 +287,42 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde {
} else {
None
}
case first @ First(child, ignoreNulls)
if !ignoreNulls => // DataFusion doesn't support ignoreNulls = true
val childExpr = exprToProto(child, inputs)
val dataType = serializeDataType(first.dataType)

if (childExpr.isDefined && dataType.isDefined) {
val firstBuilder = ExprOuterClass.First.newBuilder()
firstBuilder.setChild(childExpr.get)
firstBuilder.setDatatype(dataType.get)

Some(
ExprOuterClass.AggExpr
.newBuilder()
.setFirst(firstBuilder)
.build())
} else {
None
}
case last @ Last(child, ignoreNulls)
if !ignoreNulls => // DataFusion doesn't support ignoreNulls = true
val childExpr = exprToProto(child, inputs)
val dataType = serializeDataType(last.dataType)

if (childExpr.isDefined && dataType.isDefined) {
val lastBuilder = ExprOuterClass.Last.newBuilder()
lastBuilder.setChild(childExpr.get)
lastBuilder.setDatatype(dataType.get)

Some(
ExprOuterClass.AggExpr
.newBuilder()
.setLast(lastBuilder)
.build())
} else {
None
}

case fn =>
emitWarning(s"unsupported Spark aggregate function: $fn")
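The !ignoreNulls guards above draw the fallback boundary: FIRST(x) and LAST(x) parse with ignoreNulls = false and are serialized, while FIRST(x, true) and LAST(x, true) fail the guard, hit the "unsupported Spark aggregate function" warning above, and stay on Spark. A hedged illustration, assuming a table tbl with columns _1 and _2:

spark.sql("SELECT _2, FIRST(_1) FROM tbl GROUP BY _2")       // serialized, runs in Comet
spark.sql("SELECT _2, FIRST(_1, true) FROM tbl GROUP BY _2") // guard fails, Spark runs it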
95 changes: 81 additions & 14 deletions spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
@@ -438,8 +438,12 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
(0 until numValues).map(i => (i, Random.nextInt() % numGroups)),
"tbl",
dictionaryEnabled) {
checkSparkAnswer(
"SELECT _2, SUM(_1), SUM(DISTINCT _1), MIN(_1), MAX(_1), COUNT(_1), COUNT(DISTINCT _1), AVG(_1) FROM tbl GROUP BY _2")
withView("v") {
sql("CREATE TEMP VIEW v AS SELECT _1, _2 FROM tbl ORDER BY _1")
checkSparkAnswer(
"SELECT _2, SUM(_1), SUM(DISTINCT _1), MIN(_1), MAX(_1), COUNT(_1)," +
" COUNT(DISTINCT _1), AVG(_1), FIRST(_1), LAST(_1) FROM v GROUP BY _2")
}
}
}
}
@@ -458,6 +462,11 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
val path = new Path(dir.toURI.toString, "test.parquet")
makeParquetFile(path, numValues, numGroups, dictionaryEnabled)
withParquetTable(path.toUri.toString, "tbl") {
withView("v") {
sql("CREATE TEMP VIEW v AS SELECT _g1, _g2, _3 FROM tbl ORDER BY _3")
checkSparkAnswer("SELECT _g1, _g2, FIRST(_3) FROM v GROUP BY _g1, _g2")
checkSparkAnswer("SELECT _g1, _g2, LAST(_3) FROM v GROUP BY _g1, _g2")
}
checkSparkAnswer("SELECT _g1, _g2, SUM(_3) FROM tbl GROUP BY _g1, _g2")
checkSparkAnswer("SELECT _g1, _g2, COUNT(_3) FROM tbl GROUP BY _g1, _g2")
checkSparkAnswer("SELECT _g1, _g2, SUM(DISTINCT _3) FROM tbl GROUP BY _g1, _g2")
@@ -491,6 +500,12 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
val path = new Path(dir.toURI.toString, "test.parquet")
makeParquetFile(path, numValues, numGroups, dictionaryEnabled)
withParquetTable(path.toUri.toString, "tbl") {
withView("v") {
sql("CREATE TEMP VIEW v AS SELECT _g3, _g4, _3, _4 FROM tbl ORDER BY _3, _4")
checkSparkAnswer(
"SELECT _g3, _g4, FIRST(_3), FIRST(_4) FROM v GROUP BY _g3, _g4")
checkSparkAnswer("SELECT _g3, _g4, LAST(_3), LAST(_4) FROM v GROUP BY _g3, _g4")
}
checkSparkAnswer("SELECT _g3, _g4, SUM(_3), SUM(_4) FROM tbl GROUP BY _g3, _g4")
checkSparkAnswer(
"SELECT _g3, _g4, SUM(DISTINCT _3), SUM(DISTINCT _4) FROM tbl GROUP BY _g3, _g4")
@@ -524,6 +539,11 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
// Test all combinations of different aggregation & group-by types
(1 to 4).foreach { col =>
(1 to 14).foreach { gCol =>
withView("v") {
sql(s"CREATE TEMP VIEW v AS SELECT _g$gCol, _$col FROM tbl ORDER BY _$col")
checkSparkAnswer(s"SELECT _g$gCol, FIRST(_$col) FROM v GROUP BY _g$gCol")
checkSparkAnswer(s"SELECT _g$gCol, LAST(_$col) FROM v GROUP BY _g$gCol")
}
checkSparkAnswer(s"SELECT _g$gCol, SUM(_$col) FROM tbl GROUP BY _g$gCol")
checkSparkAnswer(
s"SELECT _g$gCol, SUM(DISTINCT _$col) FROM tbl GROUP BY _g$gCol")
@@ -771,9 +791,9 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {

test("distinct") {
withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
Seq(true, false).foreach { bosonColumnShuffleEnabled =>
Seq(true, false).foreach { cometColumnShuffleEnabled =>
withSQLConf(
CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> bosonColumnShuffleEnabled.toString) {
CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> cometColumnShuffleEnabled.toString) {
Seq(true, false).foreach { dictionary =>
withSQLConf("parquet.enable.dictionary" -> dictionary.toString) {
val table = "test"
@@ -782,40 +802,40 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
sql(
s"insert into $table values(1, 1, 1), (1, 1, 1), (1, 3, 1), (1, 4, 2), (5, 3, 2)")

var expectedNumOfBosonAggregates = 2
var expectedNumOfCometAggregates = 2

checkSparkAnswerAndNumOfAggregates(
s"SELECT DISTINCT(col2) FROM $table",
expectedNumOfBosonAggregates)
expectedNumOfCometAggregates)

expectedNumOfBosonAggregates = 4
expectedNumOfCometAggregates = 4

checkSparkAnswerAndNumOfAggregates(
s"SELECT COUNT(distinct col2) FROM $table",
expectedNumOfBosonAggregates)
expectedNumOfCometAggregates)

checkSparkAnswerAndNumOfAggregates(
s"SELECT COUNT(distinct col2), col1 FROM $table group by col1",
expectedNumOfBosonAggregates)
expectedNumOfCometAggregates)

checkSparkAnswerAndNumOfAggregates(
s"SELECT SUM(distinct col2) FROM $table",
expectedNumOfBosonAggregates)
expectedNumOfCometAggregates)

checkSparkAnswerAndNumOfAggregates(
s"SELECT SUM(distinct col2), col1 FROM $table group by col1",
expectedNumOfBosonAggregates)
expectedNumOfCometAggregates)

checkSparkAnswerAndNumOfAggregates(
"SELECT COUNT(distinct col2), SUM(distinct col2), col1, COUNT(distinct col2)," +
s" SUM(distinct col2) FROM $table group by col1",
expectedNumOfBosonAggregates)
expectedNumOfCometAggregates)

expectedNumOfBosonAggregates = 1
expectedNumOfCometAggregates = 1
checkSparkAnswerAndNumOfAggregates(
"SELECT COUNT(col2), MIN(col2), COUNT(DISTINCT col2), SUM(col2)," +
s" SUM(DISTINCT col2), COUNT(DISTINCT col2), col1 FROM $table group by col1",
expectedNumOfBosonAggregates)
expectedNumOfCometAggregates)
}
}
}
@@ -824,6 +844,53 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
}

test("first/last") {
withSQLConf(
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
Seq(true, false).foreach { dictionary =>
withSQLConf("parquet.enable.dictionary" -> dictionary.toString) {
val table = "test"
withTable(table) {
sql(s"create table $table(col1 int, col2 int, col3 int) using parquet")
sql(
s"insert into $table values(4, 1, 1), (4, 1, 1), (3, 3, 1)," +
" (2, 4, 2), (1, 3, 2), (null, 1, 1)")
withView("t") {
sql("CREATE VIEW t AS SELECT col1, col3 FROM test ORDER BY col1")

var expectedNumOfCometAggregates = 2
checkSparkAnswerAndNumOfAggregates(
"SELECT FIRST(col1), LAST(col1) FROM t",
expectedNumOfCometAggregates)

checkSparkAnswerAndNumOfAggregates(
"SELECT FIRST(col1), LAST(col1), MIN(col1), COUNT(col1) FROM t",
expectedNumOfCometAggregates)

checkSparkAnswerAndNumOfAggregates(
"SELECT FIRST(col1), LAST(col1), col3 FROM t GROUP BY col3",
expectedNumOfCometAggregates)

checkSparkAnswerAndNumOfAggregates(
"SELECT FIRST(col1), LAST(col1), MIN(col1), COUNT(col1), col3 FROM t GROUP BY col3",
expectedNumOfCometAggregates)

expectedNumOfCometAggregates = 0
checkSparkAnswerAndNumOfAggregates(
"SELECT FIRST(col1, true), LAST(col1) FROM t",
expectedNumOfCometAggregates)

checkSparkAnswerAndNumOfAggregates(
"SELECT FIRST(col1), LAST(col1, true), col3 FROM t GROUP BY col3",
expectedNumOfCometAggregates)
}
}
}
}
}
}

protected def checkSparkAnswerAndNumOfAggregates(query: String, numAggregates: Int): Unit = {
val df = sql(query)
checkSparkAnswer(df)
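// The remainder of checkSparkAnswerAndNumOfAggregates is truncated in this
// view. A plausible continuation, assuming Comet's aggregate operator is
// CometHashAggregateExec and using AdaptiveSparkPlanHelper.collect (both
// assumptions, not shown in this diff):
val numActualAggregates = collect(df.queryExecution.executedPlan) {
  case agg: CometHashAggregateExec => agg
}.size
assert(
  numActualAggregates == numAggregates,
  s"Expected $numAggregates Comet aggregates but found $numActualAggregates")
}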
