Skip to content

Commit

Permalink
minor: Remove unnecessary logic
Browse files Browse the repository at this point in the history
  • Loading branch information
sunchao committed Mar 5, 2024
1 parent 56ccf6a commit 3139f32
Showing 1 changed file with 7 additions and 37 deletions.
44 changes: 7 additions & 37 deletions spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1790,43 +1790,13 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde {
case _ => return None
}

val output = mode match {
case CometAggregateMode.Partial => child.output
case CometAggregateMode.Final =>
// Assuming `Final` always follows `Partial` aggregation, this find the first
// `Partial` aggregation and get the input attributes from it.
// During finding partial aggregation, we must ensure all traversed op are
// native operators. If not, we should fallback to Spark.
var seenNonNativeOp = false
var partialAggInput: Option[Seq[Attribute]] = None
child.transformDown {
case op if !op.isInstanceOf[CometPlan] =>
seenNonNativeOp = true
op
case op @ CometHashAggregateExec(_, _, _, _, input, Some(Partial), _, _) =>
if (!seenNonNativeOp && partialAggInput.isEmpty) {
partialAggInput = Some(input)
}
op
}

if (partialAggInput.isDefined) {
partialAggInput.get
} else {
return None
}
case _ => return None
}

val binding = if (mode == CometAggregateMode.Final) {
// In final mode, the aggregate expressions are bound to the output of the
// child and partial aggregate expressions buffer attributes produced by partial
// aggregation. This is done in Spark `HashAggregateExec` internally. In Comet,
// we don't have to do this because we don't use the merging expression.
false
} else {
true
}
// In final mode, the aggregate expressions are bound to the output of the
// child and partial aggregate expressions buffer attributes produced by partial
// aggregation. This is done in Spark `HashAggregateExec` internally. In Comet,
// we don't have to do this because we don't use the merging expression.
val binding = mode != CometAggregateMode.Final
// `output` is only used when `binding` is true (i.e., non-Final)
val output = child.output

val aggExprs = aggregateExpressions.map(aggExprToProto(_, output, binding))
if (childOp.nonEmpty && groupingExprs.forall(_.isDefined) &&
Expand Down

0 comments on commit 3139f32

Please sign in to comment.