From 0ed183ae56e7e2e3054621cc06b308d512d2c9b4 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Thu, 22 Feb 2024 11:26:39 -0800
Subject: [PATCH] feat: Support multiple input sources for CometNativeExec (#87)

CometNativeExec currently limits the number of input sources. That blocks operators with multiple input sources, such as join operators. This patch generalizes the input source handling to remove the limitation.
---
 .../comet/CometSparkSessionExtensions.scala | 20 ++-
 .../apache/comet/serde/QueryPlanSerde.scala | 2 +-
 .../spark/sql/comet/ZippedPartitionsRDD.scala | 62 +++++++
 .../apache/spark/sql/comet/operators.scala | 170 ++++++++++++++----
 .../approved-plans-v1_4/q28/explain.txt | 18 +-
 .../approved-plans-v1_4/q41/explain.txt | 6 +-
 .../approved-plans-v1_4/q44/explain.txt | 12 +-
 .../approved-plans-v1_4/q54/explain.txt | 12 +-
 .../approved-plans-v1_4/q6/explain.txt | 12 +-
 .../approved-plans-v1_4/q9/explain.txt | 30 ++--
 .../approved-plans-v2_7/q6/explain.txt | 12 +-
 11 files changed, 279 insertions(+), 77 deletions(-)
 create mode 100644 spark/src/main/scala/org/apache/spark/sql/comet/ZippedPartitionsRDD.scala

diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index b8b1a7a2f..ddd5f57ff 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -237,7 +237,7 @@ class CometSparkSessionExtensions val newOp = transform1(op) newOp match { case Some(nativeOp) => - CometProjectExec(nativeOp, op, op.projectList, op.output, op.child) + CometProjectExec(nativeOp, op, op.projectList, op.output, op.child, None) case None => op } @@ -246,7 +246,7 @@ class CometSparkSessionExtensions val newOp = transform1(op) newOp match { case Some(nativeOp) => - CometFilterExec(nativeOp, op, op.condition, op.child) + CometFilterExec(nativeOp, op, op.condition, op.child, None) case None => op } @@ -255,7 +255,7 @@ class CometSparkSessionExtensions val newOp = transform1(op) newOp match { case Some(nativeOp) => - CometSortExec(nativeOp, op, op.sortOrder, op.child) + CometSortExec(nativeOp, op, op.sortOrder, op.child, None) case None => op } @@ -264,7 +264,7 @@ class CometSparkSessionExtensions val newOp = transform1(op) newOp match { case Some(nativeOp) => - CometLocalLimitExec(nativeOp, op, op.limit, op.child) + CometLocalLimitExec(nativeOp, op, op.limit, op.child, None) case None => op } @@ -273,7 +273,7 @@ class CometSparkSessionExtensions val newOp = transform1(op) newOp match { case Some(nativeOp) => - CometGlobalLimitExec(nativeOp, op, op.limit, op.child) + CometGlobalLimitExec(nativeOp, op, op.limit, op.child, None) case None => op } @@ -282,7 +282,7 @@ class CometSparkSessionExtensions val newOp = transform1(op) newOp match { case Some(nativeOp) => - CometExpandExec(nativeOp, op, op.projections, op.child) + CometExpandExec(nativeOp, op, op.projections, op.child, None) case None => op } @@ -304,7 +304,8 @@ class CometSparkSessionExtensions aggExprs, child.output, if (modes.nonEmpty) Some(modes.head) else None, - child) + child, + None) case None => op } @@ -425,10 +426,11 @@ class CometSparkSessionExtensions newPlan.transformDown { case op: CometNativeExec => if (firstNativeOp) { - op.convertBlock() firstNativeOp = false + op.convertBlock() + } else { + op } - op case op => firstNativeOp = true op diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index a365e7543..938e49f73 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -1718,7 +1718,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { case op if !op.isInstanceOf[CometPlan] => seenNonNativeOp = true op - case op @ CometHashAggregateExec(_, _, _, _, input, Some(Partial), _) => + case op @ CometHashAggregateExec(_, _, _, _, input, Some(Partial), _, _) => if (!seenNonNativeOp && partialAggInput.isEmpty) { partialAggInput = Some(input) } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/ZippedPartitionsRDD.scala b/spark/src/main/scala/org/apache/spark/sql/comet/ZippedPartitionsRDD.scala new file mode 100644 index 000000000..6db8c67d5 --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/comet/ZippedPartitionsRDD.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.comet + +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.rdd.{RDD, RDDOperationScope, ZippedPartitionsBaseRDD, ZippedPartitionsPartition} +import org.apache.spark.sql.vectorized.ColumnarBatch + +/** + * Similar to Spark `ZippedPartitionsRDD[1-4]` classes, this class is used to zip partitions of + * the multiple RDDs into a single RDD. Spark `ZippedPartitionsRDD[1-4]` classes only support at + * most 4 RDDs. This class is used to support more than 4 RDDs. This ZipPartitionsRDD is used to + * zip the input sources of the Comet physical plan. So it only zips partitions of ColumnarBatch. 
+ */ +private[spark] class ZippedPartitionsRDD( + sc: SparkContext, + var f: (Seq[Iterator[ColumnarBatch]]) => Iterator[ColumnarBatch], + var zipRdds: Seq[RDD[ColumnarBatch]], + preservesPartitioning: Boolean = false) + extends ZippedPartitionsBaseRDD[ColumnarBatch](sc, zipRdds, preservesPartitioning) { + + override def compute(s: Partition, context: TaskContext): Iterator[ColumnarBatch] = { + val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions + val iterators = + zipRdds.zipWithIndex.map(pair => pair._1.iterator(partitions(pair._2), context)) + f(iterators) + } + + override def clearDependencies(): Unit = { + super.clearDependencies() + zipRdds = null + f = null + } +} + +object ZippedPartitionsRDD { + def apply(sc: SparkContext, rdds: Seq[RDD[ColumnarBatch]])( + f: (Seq[Iterator[ColumnarBatch]]) => Iterator[ColumnarBatch]): RDD[ColumnarBatch] = + withScope(sc) { + new ZippedPartitionsRDD(sc, f, rdds) + } + + private[spark] def withScope[U](sc: SparkContext)(body: => U): U = + RDDOperationScope.withScope[U](sc)(body) +} diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index d1d6f8f20..07b8d5ccc 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -32,8 +32,9 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, NamedExpression, SortOrder} import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateMode} import org.apache.spark.sql.catalyst.plans.physical.Partitioning -import org.apache.spark.sql.comet.execution.shuffle.ArrowReaderIterator +import org.apache.spark.sql.comet.execution.shuffle.{ArrowReaderIterator, CometShuffleExchangeExec} import org.apache.spark.sql.execution.{ColumnarToRowExec, ExecSubqueryExpression, ExplainUtils, LeafExecNode, ScalarSubquery, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, ShuffleQueryStageExec} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.vectorized.ColumnarBatch @@ -121,7 +122,7 @@ object CometExec { } /** - * Decodes the byte arrays back to ColumnarBatches and put them into buffer. + * Decodes the byte arrays back to ColumnarBatchs and put them into buffer. */ def decodeBatches(bytes: ChunkedByteBuffer): Iterator[ColumnarBatch] = { if (bytes.size == 0) { @@ -145,7 +146,7 @@ abstract class CometNativeExec extends CometExec { * The serialized native query plan, optional. This is only defined when the current node is the * "boundary" node between native and Spark. */ - private var serializedPlanOpt: Option[Array[Byte]] = None + def serializedPlanOpt: Option[Array[Byte]] /** The Comet native operator */ def nativeOp: Operator @@ -197,8 +198,9 @@ abstract class CometNativeExec extends CometExec { override def doExecuteColumnar(): RDD[ColumnarBatch] = { serializedPlanOpt match { case None => - assert(children.length == 1) // TODO: fix this! - children.head.executeColumnar() + // This is in the middle of a native execution, it should not be executed directly. 
+ throw new CometRuntimeException( + s"CometNativeExec should not be executed directly without a serialized plan: $this") case Some(serializedPlan) => // Switch to use Decimal128 regardless of precision, since Arrow native execution // doesn't support Decimal32 and Decimal64 yet. @@ -224,17 +226,49 @@ abstract class CometNativeExec extends CometExec { it } - children.map(_.executeColumnar) match { - case Seq(child) => - child.mapPartitionsInternal(iter => createCometExecIter(Seq(iter))) - case Seq(first, second) => - first.zipPartitions(second) { (iter1, iter2) => - createCometExecIter(Seq(iter1, iter2)) - } - case _ => - throw new CometRuntimeException( - s"Expected only two children but got s${children.size}") + // Collect the input ColumnarBatches from the child operators and create a CometExecIterator + // to execute the native plan. + val inputs = ArrayBuffer.empty[RDD[ColumnarBatch]] + + foreachUntilCometInput(this)(inputs += _.executeColumnar()) + + if (inputs.isEmpty) { + throw new CometRuntimeException(s"No input for CometNativeExec: $this") } + + ZippedPartitionsRDD(sparkContext, inputs.toSeq)(createCometExecIter(_)) + } + } + + /** + * Traverse the tree of Comet physical operators until reaching the input sources operators and + * apply the given function to each operator. + * + * The input sources include the following operators: + * - CometScanExec - Comet scan node + * - CometBatchScanExec - Comet scan node + * - ShuffleQueryStageExec - AQE shuffle stage node on top of Comet shuffle + * - AQEShuffleReadExec - AQE shuffle read node on top of Comet shuffle + * - CometShuffleExchangeExec - Comet shuffle exchange node + * - CometUnionExec, etc. which executes its children native plan and produces ColumnarBatches + * + * @param plan + * the root of the Comet physical plan tree (e.g., the root of the SparkPlan tree of a query) + * to traverse + * @param func + * the function to apply to each Comet physical operator + */ + def foreachUntilCometInput(plan: SparkPlan)(func: SparkPlan => Unit): Unit = { + plan match { + case _: CometScanExec | _: CometBatchScanExec | _: ShuffleQueryStageExec | + _: AQEShuffleReadExec | _: CometShuffleExchangeExec | _: CometUnionExec | + _: CometCoalesceExec => + func(plan) + case _: CometPlan => + // Other Comet operators, continue to traverse the tree. + plan.children.foreach(foreachUntilCometInput(_)(func)) + case _ => + // no op } } @@ -242,11 +276,44 @@ abstract class CometNativeExec extends CometExec { * Converts this native Comet operator and its children into a native block which can be * executed as a whole (i.e., in a single JNI call) from the native side. */ - def convertBlock(): Unit = { - val out = new ByteArrayOutputStream() - nativeOp.writeTo(out) - out.close() - serializedPlanOpt = Some(out.toByteArray) + def convertBlock(): CometNativeExec = { + def transform(arg: Any): AnyRef = arg match { + case serializedPlan: Option[Array[Byte]] if serializedPlan.isEmpty => + val out = new ByteArrayOutputStream() + nativeOp.writeTo(out) + out.close() + Some(out.toByteArray) + case other: AnyRef => other + case null => null + } + + val newArgs = mapProductIterator(transform) + makeCopy(newArgs).asInstanceOf[CometNativeExec] + } + + /** + * Cleans the serialized plan from this native Comet operator. Used to canonicalize the plan. 
+ */ + def cleanBlock(): CometNativeExec = { + def transform(arg: Any): AnyRef = arg match { + case serializedPlan: Option[Array[Byte]] if serializedPlan.isDefined => + None + case other: AnyRef => other + case null => null + } + + val newArgs = mapProductIterator(transform) + makeCopy(newArgs).asInstanceOf[CometNativeExec] + } + + override protected def doCanonicalize(): SparkPlan = { + val canonicalizedPlan = super.doCanonicalize().asInstanceOf[CometNativeExec] + if (serializedPlanOpt.isDefined) { + // If the plan is a boundary node, we should remove the serialized plan. + canonicalizedPlan.cleanBlock() + } else { + canonicalizedPlan + } } } @@ -257,7 +324,8 @@ case class CometProjectExec( override val originalPlan: SparkPlan, projectList: Seq[NamedExpression], override val output: Seq[Attribute], - child: SparkPlan) + child: SparkPlan, + override val serializedPlanOpt: Option[Array[Byte]]) extends CometUnaryExec { override def producedAttributes: AttributeSet = outputSet override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = @@ -269,7 +337,8 @@ case class CometProjectExec( obj match { case other: CometProjectExec => this.projectList == other.projectList && - this.output == other.output && this.child == other.child + this.output == other.output && this.child == other.child && + this.serializedPlanOpt == other.serializedPlanOpt case _ => false } @@ -282,7 +351,8 @@ case class CometFilterExec( override val nativeOp: Operator, override val originalPlan: SparkPlan, condition: Expression, - child: SparkPlan) + child: SparkPlan, + override val serializedPlanOpt: Option[Array[Byte]]) extends CometUnaryExec { override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = this.copy(child = newChild) @@ -293,7 +363,8 @@ case class CometFilterExec( override def equals(obj: Any): Boolean = { obj match { case other: CometFilterExec => - this.condition == other.condition && this.child == other.child + this.condition == other.condition && this.child == other.child && + this.serializedPlanOpt == other.serializedPlanOpt case _ => false } @@ -314,7 +385,8 @@ case class CometSortExec( override val nativeOp: Operator, override val originalPlan: SparkPlan, sortOrder: Seq[SortOrder], - child: SparkPlan) + child: SparkPlan, + override val serializedPlanOpt: Option[Array[Byte]]) extends CometUnaryExec { override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = this.copy(child = newChild) @@ -325,7 +397,8 @@ case class CometSortExec( override def equals(obj: Any): Boolean = { obj match { case other: CometSortExec => - this.sortOrder == other.sortOrder && this.child == other.child + this.sortOrder == other.sortOrder && this.child == other.child && + this.serializedPlanOpt == other.serializedPlanOpt case _ => false } @@ -344,7 +417,8 @@ case class CometLocalLimitExec( override val nativeOp: Operator, override val originalPlan: SparkPlan, limit: Int, - child: SparkPlan) + child: SparkPlan, + override val serializedPlanOpt: Option[Array[Byte]]) extends CometUnaryExec { override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = this.copy(child = newChild) @@ -352,13 +426,26 @@ case class CometLocalLimitExec( override def stringArgs: Iterator[Any] = Iterator(limit, child) override lazy val metrics: Map[String, SQLMetric] = Map.empty + + override def equals(obj: Any): Boolean = { + obj match { + case other: CometLocalLimitExec => + this.limit == other.limit && this.child == other.child && + this.serializedPlanOpt == other.serializedPlanOpt + 
case _ => + false + } + } + + override def hashCode(): Int = Objects.hashCode(limit: java.lang.Integer, child) } case class CometGlobalLimitExec( override val nativeOp: Operator, override val originalPlan: SparkPlan, limit: Int, - child: SparkPlan) + child: SparkPlan, + override val serializedPlanOpt: Option[Array[Byte]]) extends CometUnaryExec { override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = this.copy(child = newChild) @@ -368,7 +455,8 @@ case class CometGlobalLimitExec( override def equals(obj: Any): Boolean = { obj match { case other: CometGlobalLimitExec => - this.limit == other.limit && this.child == other.child + this.limit == other.limit && this.child == other.child && + this.serializedPlanOpt == other.serializedPlanOpt case _ => false } @@ -381,7 +469,8 @@ case class CometExpandExec( override val nativeOp: Operator, override val originalPlan: SparkPlan, projections: Seq[Seq[Expression]], - child: SparkPlan) + child: SparkPlan, + override val serializedPlanOpt: Option[Array[Byte]]) extends CometUnaryExec { override def producedAttributes: AttributeSet = outputSet @@ -393,7 +482,8 @@ case class CometExpandExec( override def equals(obj: Any): Boolean = { obj match { case other: CometExpandExec => - this.projections == other.projections && this.child == other.child + this.projections == other.projections && this.child == other.child && + this.serializedPlanOpt == other.serializedPlanOpt case _ => false } @@ -443,11 +533,21 @@ case class CometHashAggregateExec( aggregateExpressions: Seq[AggregateExpression], input: Seq[Attribute], mode: Option[AggregateMode], - child: SparkPlan) + child: SparkPlan, + override val serializedPlanOpt: Option[Array[Byte]]) extends CometUnaryExec { override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = this.copy(child = newChild) + override def verboseStringWithOperatorId(): String = { + s""" + |$formattedNodeName + |${ExplainUtils.generateFieldString("Input", child.output)} + |${ExplainUtils.generateFieldString("Keys", groupingExpressions)} + |${ExplainUtils.generateFieldString("Functions", aggregateExpressions)} + |""".stripMargin + } + override def stringArgs: Iterator[Any] = Iterator(input, mode, groupingExpressions, aggregateExpressions, child) @@ -458,7 +558,8 @@ case class CometHashAggregateExec( this.aggregateExpressions == other.aggregateExpressions && this.input == other.input && this.mode == other.mode && - this.child == other.child + this.child == other.child && + this.serializedPlanOpt == other.serializedPlanOpt case _ => false } @@ -471,6 +572,7 @@ case class CometHashAggregateExec( case class CometScanWrapper(override val nativeOp: Operator, override val originalPlan: SparkPlan) extends CometNativeExec with LeafExecNode { + override val serializedPlanOpt: Option[Array[Byte]] = None override def stringArgs: Iterator[Any] = Iterator(originalPlan.output, originalPlan) } @@ -486,6 +588,8 @@ case class CometSinkPlaceHolder( override val originalPlan: SparkPlan, child: SparkPlan) extends CometUnaryExec { + override val serializedPlanOpt: Option[Array[Byte]] = None + override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = { this.copy(child = newChild) } diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q28/explain.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q28/explain.txt index 690d530b0..db6d112e6 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q28/explain.txt +++ 
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q28/explain.txt @@ -88,7 +88,8 @@ Arguments: [ss_list_price#3], [ss_list_price#3] (4) CometHashAggregate Input [1]: [ss_list_price#3] -Arguments: [ss_list_price#3], Partial, [ss_list_price#3], [partial_avg(UnscaledValue(ss_list_price#3)), partial_count(ss_list_price#3)] +Keys [1]: [ss_list_price#3] +Functions [2]: [partial_avg(UnscaledValue(ss_list_price#3)), partial_count(ss_list_price#3)] (5) CometExchange Input [4]: [ss_list_price#3, sum#6, count#7, count#8] @@ -139,7 +140,8 @@ Arguments: [ss_list_price#18], [ss_list_price#18] (14) CometHashAggregate Input [1]: [ss_list_price#18] -Arguments: [ss_list_price#18], Partial, [ss_list_price#18], [partial_avg(UnscaledValue(ss_list_price#18)), partial_count(ss_list_price#18)] +Keys [1]: [ss_list_price#18] +Functions [2]: [partial_avg(UnscaledValue(ss_list_price#18)), partial_count(ss_list_price#18)] (15) CometExchange Input [4]: [ss_list_price#18, sum#21, count#22, count#23] @@ -198,7 +200,8 @@ Arguments: [ss_list_price#33], [ss_list_price#33] (26) CometHashAggregate Input [1]: [ss_list_price#33] -Arguments: [ss_list_price#33], Partial, [ss_list_price#33], [partial_avg(UnscaledValue(ss_list_price#33)), partial_count(ss_list_price#33)] +Keys [1]: [ss_list_price#33] +Functions [2]: [partial_avg(UnscaledValue(ss_list_price#33)), partial_count(ss_list_price#33)] (27) CometExchange Input [4]: [ss_list_price#33, sum#36, count#37, count#38] @@ -257,7 +260,8 @@ Arguments: [ss_list_price#48], [ss_list_price#48] (38) CometHashAggregate Input [1]: [ss_list_price#48] -Arguments: [ss_list_price#48], Partial, [ss_list_price#48], [partial_avg(UnscaledValue(ss_list_price#48)), partial_count(ss_list_price#48)] +Keys [1]: [ss_list_price#48] +Functions [2]: [partial_avg(UnscaledValue(ss_list_price#48)), partial_count(ss_list_price#48)] (39) CometExchange Input [4]: [ss_list_price#48, sum#51, count#52, count#53] @@ -316,7 +320,8 @@ Arguments: [ss_list_price#63], [ss_list_price#63] (50) CometHashAggregate Input [1]: [ss_list_price#63] -Arguments: [ss_list_price#63], Partial, [ss_list_price#63], [partial_avg(UnscaledValue(ss_list_price#63)), partial_count(ss_list_price#63)] +Keys [1]: [ss_list_price#63] +Functions [2]: [partial_avg(UnscaledValue(ss_list_price#63)), partial_count(ss_list_price#63)] (51) CometExchange Input [4]: [ss_list_price#63, sum#66, count#67, count#68] @@ -375,7 +380,8 @@ Arguments: [ss_list_price#78], [ss_list_price#78] (62) CometHashAggregate Input [1]: [ss_list_price#78] -Arguments: [ss_list_price#78], Partial, [ss_list_price#78], [partial_avg(UnscaledValue(ss_list_price#78)), partial_count(ss_list_price#78)] +Keys [1]: [ss_list_price#78] +Functions [2]: [partial_avg(UnscaledValue(ss_list_price#78)), partial_count(ss_list_price#78)] (63) CometExchange Input [4]: [ss_list_price#78, sum#81, count#82, count#83] diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q41/explain.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q41/explain.txt index 871f89b0a..230899077 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q41/explain.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q41/explain.txt @@ -56,7 +56,8 @@ Arguments: [i_manufact#5], [i_manufact#5] (8) CometHashAggregate Input [1]: [i_manufact#5] -Arguments: [i_manufact#5], Partial, [i_manufact#5], [partial_count(1)] +Keys [1]: [i_manufact#5] +Functions [1]: [partial_count(1)] (9) CometExchange Input [2]: [i_manufact#5, 
count#9] @@ -64,7 +65,8 @@ Arguments: hashpartitioning(i_manufact#5, 5), ENSURE_REQUIREMENTS, CometNativeSh (10) CometHashAggregate Input [2]: [i_manufact#5, count#9] -Arguments: [i_manufact#5, count#9], Final, [i_manufact#5], [count(1)] +Keys [1]: [i_manufact#5] +Functions [1]: [count(1)] (11) CometFilter Input [2]: [item_cnt#10, i_manufact#5] diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q44/explain.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q44/explain.txt index 7fc6deacb..d7f13b274 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q44/explain.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q44/explain.txt @@ -51,7 +51,8 @@ Arguments: [ss_item_sk#1, ss_net_profit#3], [ss_item_sk#1, ss_net_profit#3] (4) CometHashAggregate Input [2]: [ss_item_sk#1, ss_net_profit#3] -Arguments: [ss_item_sk#1, ss_net_profit#3], Partial, [ss_item_sk#1], [partial_avg(UnscaledValue(ss_net_profit#3))] +Keys [1]: [ss_item_sk#1] +Functions [1]: [partial_avg(UnscaledValue(ss_net_profit#3))] (5) CometExchange Input [3]: [ss_item_sk#1, sum#5, count#6] @@ -59,7 +60,8 @@ Arguments: hashpartitioning(ss_item_sk#1, 5), ENSURE_REQUIREMENTS, CometNativeSh (6) CometHashAggregate Input [3]: [ss_item_sk#1, sum#5, count#6] -Arguments: [ss_item_sk#1, sum#5, count#6], Final, [ss_item_sk#1], [avg(UnscaledValue(ss_net_profit#3))] +Keys [1]: [ss_item_sk#1] +Functions [1]: [avg(UnscaledValue(ss_net_profit#3))] (7) CometFilter Input [2]: [item_sk#7, rank_col#8] @@ -202,7 +204,8 @@ Arguments: [ss_store_sk#22, ss_net_profit#23], [ss_store_sk#22, ss_net_profit#23 (37) CometHashAggregate Input [2]: [ss_store_sk#22, ss_net_profit#23] -Arguments: [ss_store_sk#22, ss_net_profit#23], Partial, [ss_store_sk#22], [partial_avg(UnscaledValue(ss_net_profit#23))] +Keys [1]: [ss_store_sk#22] +Functions [1]: [partial_avg(UnscaledValue(ss_net_profit#23))] (38) CometExchange Input [3]: [ss_store_sk#22, sum#25, count#26] @@ -210,7 +213,8 @@ Arguments: hashpartitioning(ss_store_sk#22, 5), ENSURE_REQUIREMENTS, CometNative (39) CometHashAggregate Input [3]: [ss_store_sk#22, sum#25, count#26] -Arguments: [ss_store_sk#22, sum#25, count#26], Final, [ss_store_sk#22], [avg(UnscaledValue(ss_net_profit#23))] +Keys [1]: [ss_store_sk#22] +Functions [1]: [avg(UnscaledValue(ss_net_profit#23))] (40) ColumnarToRow [codegen id : 1] Input [1]: [rank_col#27] diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q54/explain.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q54/explain.txt index 0f75e3530..00c8f9c9b 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q54/explain.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q54/explain.txt @@ -415,7 +415,8 @@ Arguments: [(d_month_seq + 1)#49], [(d_month_seq#46 + 1) AS (d_month_seq + 1)#49 (69) CometHashAggregate Input [1]: [(d_month_seq + 1)#49] -Arguments: [(d_month_seq + 1)#49], [(d_month_seq + 1)#49] +Keys [1]: [(d_month_seq + 1)#49] +Functions: [] (70) CometExchange Input [1]: [(d_month_seq + 1)#49] @@ -423,7 +424,8 @@ Arguments: hashpartitioning((d_month_seq + 1)#49, 5), ENSURE_REQUIREMENTS, Comet (71) CometHashAggregate Input [1]: [(d_month_seq + 1)#49] -Arguments: [(d_month_seq + 1)#49], [(d_month_seq + 1)#49] +Keys [1]: [(d_month_seq + 1)#49] +Functions: [] (72) ColumnarToRow [codegen id : 1] Input [1]: [(d_month_seq + 1)#49] @@ -455,7 +457,8 @@ Arguments: [(d_month_seq + 3)#53], [(d_month_seq#50 
+ 3) AS (d_month_seq + 3)#53 (76) CometHashAggregate Input [1]: [(d_month_seq + 3)#53] -Arguments: [(d_month_seq + 3)#53], [(d_month_seq + 3)#53] +Keys [1]: [(d_month_seq + 3)#53] +Functions: [] (77) CometExchange Input [1]: [(d_month_seq + 3)#53] @@ -463,7 +466,8 @@ Arguments: hashpartitioning((d_month_seq + 3)#53, 5), ENSURE_REQUIREMENTS, Comet (78) CometHashAggregate Input [1]: [(d_month_seq + 3)#53] -Arguments: [(d_month_seq + 3)#53], [(d_month_seq + 3)#53] +Keys [1]: [(d_month_seq + 3)#53] +Functions: [] (79) ColumnarToRow [codegen id : 1] Input [1]: [(d_month_seq + 3)#53] diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q6/explain.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q6/explain.txt index bb79f9aa7..5f34aa3df 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q6/explain.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q6/explain.txt @@ -151,7 +151,8 @@ Condition : isnotnull(i_category#14) (24) CometHashAggregate Input [2]: [i_current_price#13, i_category#14] -Arguments: [i_current_price#13, i_category#14], Partial, [i_category#14], [partial_avg(UnscaledValue(i_current_price#13))] +Keys [1]: [i_category#14] +Functions [1]: [partial_avg(UnscaledValue(i_current_price#13))] (25) CometExchange Input [3]: [i_category#14, sum#15, count#16] @@ -159,7 +160,8 @@ Arguments: hashpartitioning(i_category#14, 5), ENSURE_REQUIREMENTS, CometNativeS (26) CometHashAggregate Input [3]: [i_category#14, sum#15, count#16] -Arguments: [i_category#14, sum#15, count#16], Final, [i_category#14], [avg(UnscaledValue(i_current_price#13))] +Keys [1]: [i_category#14] +Functions [1]: [avg(UnscaledValue(i_current_price#13))] (27) CometFilter Input [2]: [avg(i_current_price)#17, i_category#14] @@ -281,7 +283,8 @@ Arguments: [d_month_seq#26], [d_month_seq#26] (48) CometHashAggregate Input [1]: [d_month_seq#26] -Arguments: [d_month_seq#26], [d_month_seq#26] +Keys [1]: [d_month_seq#26] +Functions: [] (49) CometExchange Input [1]: [d_month_seq#26] @@ -289,7 +292,8 @@ Arguments: hashpartitioning(d_month_seq#26, 5), ENSURE_REQUIREMENTS, CometNative (50) CometHashAggregate Input [1]: [d_month_seq#26] -Arguments: [d_month_seq#26], [d_month_seq#26] +Keys [1]: [d_month_seq#26] +Functions: [] (51) ColumnarToRow [codegen id : 1] Input [1]: [d_month_seq#26] diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q9/explain.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q9/explain.txt index 05e559935..167181142 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q9/explain.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q9/explain.txt @@ -53,7 +53,8 @@ Arguments: [ss_ext_discount_amt#18, ss_net_paid#19], [ss_ext_discount_amt#18, ss (8) CometHashAggregate Input [2]: [ss_ext_discount_amt#18, ss_net_paid#19] -Arguments: [ss_ext_discount_amt#18, ss_net_paid#19], Partial, [partial_count(1), partial_avg(UnscaledValue(ss_ext_discount_amt#18)), partial_avg(UnscaledValue(ss_net_paid#19))] +Keys: [] +Functions [3]: [partial_count(1), partial_avg(UnscaledValue(ss_ext_discount_amt#18)), partial_avg(UnscaledValue(ss_net_paid#19))] (9) CometExchange Input [5]: [count#21, sum#22, count#23, sum#24, count#25] @@ -61,7 +62,8 @@ Arguments: SinglePartition, ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=1] (10) CometHashAggregate Input [5]: [count#21, sum#22, count#23, sum#24, count#25] -Arguments: [count#21, sum#22, count#23, 
sum#24, count#25], Final, [count(1), avg(UnscaledValue(ss_ext_discount_amt#18)), avg(UnscaledValue(ss_net_paid#19))] +Keys: [] +Functions [3]: [count(1), avg(UnscaledValue(ss_ext_discount_amt#18)), avg(UnscaledValue(ss_net_paid#19))] (11) ColumnarToRow [codegen id : 1] Input [3]: [count(1)#26, avg(ss_ext_discount_amt)#27, avg(ss_net_paid)#28] @@ -102,7 +104,8 @@ Arguments: [ss_ext_discount_amt#31, ss_net_paid#32], [ss_ext_discount_amt#31, ss (16) CometHashAggregate Input [2]: [ss_ext_discount_amt#31, ss_net_paid#32] -Arguments: [ss_ext_discount_amt#31, ss_net_paid#32], Partial, [partial_count(1), partial_avg(UnscaledValue(ss_ext_discount_amt#31)), partial_avg(UnscaledValue(ss_net_paid#32))] +Keys: [] +Functions [3]: [partial_count(1), partial_avg(UnscaledValue(ss_ext_discount_amt#31)), partial_avg(UnscaledValue(ss_net_paid#32))] (17) CometExchange Input [5]: [count#34, sum#35, count#36, sum#37, count#38] @@ -110,7 +113,8 @@ Arguments: SinglePartition, ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=2] (18) CometHashAggregate Input [5]: [count#34, sum#35, count#36, sum#37, count#38] -Arguments: [count#34, sum#35, count#36, sum#37, count#38], Final, [count(1), avg(UnscaledValue(ss_ext_discount_amt#31)), avg(UnscaledValue(ss_net_paid#32))] +Keys: [] +Functions [3]: [count(1), avg(UnscaledValue(ss_ext_discount_amt#31)), avg(UnscaledValue(ss_net_paid#32))] (19) ColumnarToRow [codegen id : 1] Input [3]: [count(1)#39, avg(ss_ext_discount_amt)#40, avg(ss_net_paid)#41] @@ -151,7 +155,8 @@ Arguments: [ss_ext_discount_amt#44, ss_net_paid#45], [ss_ext_discount_amt#44, ss (24) CometHashAggregate Input [2]: [ss_ext_discount_amt#44, ss_net_paid#45] -Arguments: [ss_ext_discount_amt#44, ss_net_paid#45], Partial, [partial_count(1), partial_avg(UnscaledValue(ss_ext_discount_amt#44)), partial_avg(UnscaledValue(ss_net_paid#45))] +Keys: [] +Functions [3]: [partial_count(1), partial_avg(UnscaledValue(ss_ext_discount_amt#44)), partial_avg(UnscaledValue(ss_net_paid#45))] (25) CometExchange Input [5]: [count#47, sum#48, count#49, sum#50, count#51] @@ -159,7 +164,8 @@ Arguments: SinglePartition, ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=3] (26) CometHashAggregate Input [5]: [count#47, sum#48, count#49, sum#50, count#51] -Arguments: [count#47, sum#48, count#49, sum#50, count#51], Final, [count(1), avg(UnscaledValue(ss_ext_discount_amt#44)), avg(UnscaledValue(ss_net_paid#45))] +Keys: [] +Functions [3]: [count(1), avg(UnscaledValue(ss_ext_discount_amt#44)), avg(UnscaledValue(ss_net_paid#45))] (27) ColumnarToRow [codegen id : 1] Input [3]: [count(1)#52, avg(ss_ext_discount_amt)#53, avg(ss_net_paid)#54] @@ -200,7 +206,8 @@ Arguments: [ss_ext_discount_amt#57, ss_net_paid#58], [ss_ext_discount_amt#57, ss (32) CometHashAggregate Input [2]: [ss_ext_discount_amt#57, ss_net_paid#58] -Arguments: [ss_ext_discount_amt#57, ss_net_paid#58], Partial, [partial_count(1), partial_avg(UnscaledValue(ss_ext_discount_amt#57)), partial_avg(UnscaledValue(ss_net_paid#58))] +Keys: [] +Functions [3]: [partial_count(1), partial_avg(UnscaledValue(ss_ext_discount_amt#57)), partial_avg(UnscaledValue(ss_net_paid#58))] (33) CometExchange Input [5]: [count#60, sum#61, count#62, sum#63, count#64] @@ -208,7 +215,8 @@ Arguments: SinglePartition, ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=4] (34) CometHashAggregate Input [5]: [count#60, sum#61, count#62, sum#63, count#64] -Arguments: [count#60, sum#61, count#62, sum#63, count#64], Final, [count(1), avg(UnscaledValue(ss_ext_discount_amt#57)), avg(UnscaledValue(ss_net_paid#58))] +Keys: [] 
+Functions [3]: [count(1), avg(UnscaledValue(ss_ext_discount_amt#57)), avg(UnscaledValue(ss_net_paid#58))] (35) ColumnarToRow [codegen id : 1] Input [3]: [count(1)#65, avg(ss_ext_discount_amt)#66, avg(ss_net_paid)#67] @@ -249,7 +257,8 @@ Arguments: [ss_ext_discount_amt#70, ss_net_paid#71], [ss_ext_discount_amt#70, ss (40) CometHashAggregate Input [2]: [ss_ext_discount_amt#70, ss_net_paid#71] -Arguments: [ss_ext_discount_amt#70, ss_net_paid#71], Partial, [partial_count(1), partial_avg(UnscaledValue(ss_ext_discount_amt#70)), partial_avg(UnscaledValue(ss_net_paid#71))] +Keys: [] +Functions [3]: [partial_count(1), partial_avg(UnscaledValue(ss_ext_discount_amt#70)), partial_avg(UnscaledValue(ss_net_paid#71))] (41) CometExchange Input [5]: [count#73, sum#74, count#75, sum#76, count#77] @@ -257,7 +266,8 @@ Arguments: SinglePartition, ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=5] (42) CometHashAggregate Input [5]: [count#73, sum#74, count#75, sum#76, count#77] -Arguments: [count#73, sum#74, count#75, sum#76, count#77], Final, [count(1), avg(UnscaledValue(ss_ext_discount_amt#70)), avg(UnscaledValue(ss_net_paid#71))] +Keys: [] +Functions [3]: [count(1), avg(UnscaledValue(ss_ext_discount_amt#70)), avg(UnscaledValue(ss_net_paid#71))] (43) ColumnarToRow [codegen id : 1] Input [3]: [count(1)#78, avg(ss_ext_discount_amt)#79, avg(ss_net_paid)#80] diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/explain.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/explain.txt index 7ed8c5393..fcaa84890 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/explain.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/explain.txt @@ -151,7 +151,8 @@ Condition : isnotnull(i_category#14) (24) CometHashAggregate Input [2]: [i_current_price#13, i_category#14] -Arguments: [i_current_price#13, i_category#14], Partial, [i_category#14], [partial_avg(UnscaledValue(i_current_price#13))] +Keys [1]: [i_category#14] +Functions [1]: [partial_avg(UnscaledValue(i_current_price#13))] (25) CometExchange Input [3]: [i_category#14, sum#15, count#16] @@ -159,7 +160,8 @@ Arguments: hashpartitioning(i_category#14, 5), ENSURE_REQUIREMENTS, CometNativeS (26) CometHashAggregate Input [3]: [i_category#14, sum#15, count#16] -Arguments: [i_category#14, sum#15, count#16], Final, [i_category#14], [avg(UnscaledValue(i_current_price#13))] +Keys [1]: [i_category#14] +Functions [1]: [avg(UnscaledValue(i_current_price#13))] (27) CometFilter Input [2]: [avg(i_current_price)#17, i_category#14] @@ -281,7 +283,8 @@ Arguments: [d_month_seq#26], [d_month_seq#26] (48) CometHashAggregate Input [1]: [d_month_seq#26] -Arguments: [d_month_seq#26], [d_month_seq#26] +Keys [1]: [d_month_seq#26] +Functions: [] (49) CometExchange Input [1]: [d_month_seq#26] @@ -289,7 +292,8 @@ Arguments: hashpartitioning(d_month_seq#26, 5), ENSURE_REQUIREMENTS, CometNative (50) CometHashAggregate Input [1]: [d_month_seq#26] -Arguments: [d_month_seq#26], [d_month_seq#26] +Keys [1]: [d_month_seq#26] +Functions: [] (51) ColumnarToRow [codegen id : 1] Input [1]: [d_month_seq#26]
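
A minimal sketch of the execution path this patch introduces: CometNativeExec.doExecuteColumnar now collects one RDD[ColumnarBatch] per input source (via foreachUntilCometInput) and zips them with the new ZippedPartitionsRDD, so a native block can consume any number of inputs instead of at most two. The helper and names below (zipNativeInputs, nativeIter) are hypothetical stand-ins for the logic inside doExecuteColumnar and the CometExecIterator it builds; this is an illustration under those assumptions, not code from the patch.

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.comet.ZippedPartitionsRDD
import org.apache.spark.sql.vectorized.ColumnarBatch

// Zip an arbitrary number of columnar input RDDs and hand the per-partition
// iterators to the native plan, abstracted here as `nativeIter`.
def zipNativeInputs(
    sc: SparkContext,
    inputs: Seq[RDD[ColumnarBatch]])(
    nativeIter: Seq[Iterator[ColumnarBatch]] => Iterator[ColumnarBatch]): RDD[ColumnarBatch] = {
  require(inputs.nonEmpty, "No input for the native block")
  // ZippedPartitionsBaseRDD requires all zipped RDDs to have the same number
  // of partitions; each output partition then sees one iterator per input source.
  ZippedPartitionsRDD(sc, inputs)(nativeIter)
}

A binary join would call this with two input RDDs (for example, two shuffle reads) and a unary operator with one, so the previous "expected only two children" restriction no longer applies.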