From fa2da47477efd8df2cde8b4a7d9e80310ae80b92 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Mon, 26 Feb 2024 16:18:51 -0800
Subject: [PATCH 1/5] fix: Fix corrupted AggregateMode when transforming plan
 parameters

---
 .../apache/spark/sql/comet/operators.scala    | 28 ++++++++++++++++---
 1 file changed, 24 insertions(+), 4 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index 0298bc643..1a8fcb2b9 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -149,6 +149,9 @@ abstract class CometNativeExec extends CometExec {
   /**
    * The serialized native query plan, optional. This is only defined when the current node is the
    * "boundary" node between native and Spark.
+   *
+   * Note that derived classes of `BosonNativeExec` must have `serializedPlanOpt` as the last
+   * product parameter.
    */
   def serializedPlanOpt: Option[Array[Byte]]
 
@@ -276,13 +279,30 @@ abstract class CometNativeExec extends CometExec {
     }
   }
 
+  /**
+   * Maps through product elements except the last one. The last element will be transformed
+   * using the provided function. This is used to transform the `serializedPlanOpt` parameter in
+   * case classes of Boson native operators, where `serializedPlanOpt` is always the last product
+   * element. That is because we cannot match `Option[Array[Byte]]` due to type erasure.
+   */
+  private def mapProduct(f: Any => AnyRef): Array[AnyRef] = {
+    val arr = Array.ofDim[AnyRef](productArity)
+    var i = 0
+    while (i < arr.length - 1) {
+      arr(i) = productElement(i).asInstanceOf[AnyRef]
+      i += 1
+    }
+    arr(arr.length - 1) = f(productElement(arr.length - 1))
+    arr
+  }
+
   /**
    * Converts this native Comet operator and its children into a native block which can be
    * executed as a whole (i.e., in a single JNI call) from the native side.
    */
   def convertBlock(): CometNativeExec = {
     def transform(arg: Any): AnyRef = arg match {
-      case serializedPlan: Option[Array[Byte]] if serializedPlan.isEmpty =>
+      case serializedPlan: Option[_] if serializedPlan.isEmpty =>
         val out = new ByteArrayOutputStream()
         nativeOp.writeTo(out)
         out.close()
@@ -291,7 +311,7 @@ abstract class CometNativeExec extends CometExec {
       case null => null
     }
 
-    val newArgs = mapProductIterator(transform)
+    val newArgs = mapProduct(transform)
     makeCopy(newArgs).asInstanceOf[CometNativeExec]
   }
 
@@ -300,13 +320,13 @@
    */
   def cleanBlock(): CometNativeExec = {
     def transform(arg: Any): AnyRef = arg match {
-      case serializedPlan: Option[Array[Byte]] if serializedPlan.isDefined =>
+      case serializedPlan: Option[_] if serializedPlan.isDefined =>
         None
       case other: AnyRef => other
       case null => null
     }
 
-    val newArgs = mapProductIterator(transform)
+    val newArgs = mapProduct(transform)
     makeCopy(newArgs).asInstanceOf[CometNativeExec]
   }

From e7aedca3e95bd18417662a62f1a6602781b25a34 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Mon, 26 Feb 2024 16:50:16 -0800
Subject: [PATCH 2/5] Add test

---
 .../apache/comet/exec/CometExecSuite.scala    | 19 +++++++++++++++----
 1 file changed, 15 insertions(+), 4 deletions(-)

diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 0414671c2..872bbff37 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -22,26 +22,24 @@ package org.apache.comet.exec
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.Random
-
 import org.scalactic.source.Position
 import org.scalatest.Tag
-
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.{AnalysisException, Column, CometTestBase, DataFrame, DataFrameWriter, Row, SaveMode}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStatistics, CatalogTable}
 import org.apache.spark.sql.catalyst.expressions.Hex
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateMode
 import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometFilterExec, CometHashAggregateExec, CometProjectExec, CometScanExec, CometTakeOrderedAndProjectExec}
 import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometShuffleExchangeExec}
 import org.apache.spark.sql.execution.{CollectLimitExec, ProjectExec, UnionExec}
 import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
 import org.apache.spark.sql.execution.joins.{BroadcastNestedLoopJoinExec, CartesianProductExec, SortMergeJoinExec}
 import org.apache.spark.sql.execution.window.WindowExec
-import org.apache.spark.sql.functions.{date_add, expr}
+import org.apache.spark.sql.functions.{date_add, expr, sum}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE
 import org.apache.spark.unsafe.types.UTF8String
-
 import org.apache.comet.CometConf
 
 class CometExecSuite extends CometTestBase {
@@ -56,6 +54,19 @@ class CometExecSuite extends CometTestBase {
     }
   }
 
+  test("Fix corrupted AggregateMode when transforming plan parameters") {
+    withParquetTable((0 until 5).map(i => (i, i + 1)), "table") {
+      val df = sql("SELECT * FROM table").groupBy($"_1").agg(sum("_2"))
+      val agg = stripAQEPlan(df.queryExecution.executedPlan).collectFirst {
+        case s: CometHashAggregateExec => s
+      }.get
+
+      assert(agg.mode.isDefined && agg.mode.get.isInstanceOf[AggregateMode])
+      val newAgg = agg.cleanBlock().asInstanceOf[CometHashAggregateExec]
+      assert(newAgg.mode.isDefined && newAgg.mode.get.isInstanceOf[AggregateMode])
+    }
+  }
+
   test("CometBroadcastExchangeExec") {
     withSQLConf(CometConf.COMET_EXEC_BROADCAST_ENABLED.key -> "true") {
       withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl_a") {
From 354c0c2777c71025ace267492b5157cd02137fdb Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Mon, 26 Feb 2024 18:25:03 -0800
Subject: [PATCH 3/5] Fix style

---
 .../src/test/scala/org/apache/comet/exec/CometExecSuite.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 872bbff37..6dafb2792 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -22,8 +22,10 @@ package org.apache.comet.exec
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.Random
+
 import org.scalactic.source.Position
 import org.scalatest.Tag
+
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.{AnalysisException, Column, CometTestBase, DataFrame, DataFrameWriter, Row, SaveMode}
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -40,6 +42,7 @@ import org.apache.spark.sql.functions.{date_add, expr, sum}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE
 import org.apache.spark.unsafe.types.UTF8String
+
 import org.apache.comet.CometConf
 
 class CometExecSuite extends CometTestBase {

From e45433bdcc926320534b8b0085a2d06067beae30 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Mon, 26 Feb 2024 22:24:32 -0800
Subject: [PATCH 4/5] For review

---
 .../scala/org/apache/spark/sql/comet/operators.scala  | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index 1a8fcb2b9..08f339c1a 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -150,7 +150,7 @@ abstract class CometNativeExec extends CometExec {
    * The serialized native query plan, optional. This is only defined when the current node is the
    * "boundary" node between native and Spark.
    *
-   * Note that derived classes of `BosonNativeExec` must have `serializedPlanOpt` as the last
+   * Note that derived classes of `CometNativeExec` must have `serializedPlanOpt` as the last
    * product parameter.
    */
   def serializedPlanOpt: Option[Array[Byte]]
@@ -280,10 +280,11 @@ abstract class CometNativeExec extends CometExec {
   }
 
   /**
-   * Maps through product elements except the last one. The last element will be transformed
-   * using the provided function. This is used to transform the `serializedPlanOpt` parameter in
-   * case classes of Boson native operators, where `serializedPlanOpt` is always the last product
-   * element. That is because we cannot match `Option[Array[Byte]]` due to type erasure.
+   * Copies product elements to the output array except the last one. The last element will be
+   * transformed using the provided function. This is used to transform the `serializedPlanOpt`
+   * parameter in case classes of Comet native operators, where `serializedPlanOpt` is always
+   * the last product element. That is because we cannot match `Option[Array[Byte]]` due to type
+   * erasure.
    */
   private def mapProduct(f: Any => AnyRef): Array[AnyRef] = {
     val arr = Array.ofDim[AnyRef](productArity)
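
The "type erasure" note above is the heart of the bug: in a JVM pattern match the element type of an `Option` is erased, so `case x: Option[Array[Byte]]` only checks that the value is an `Option` at runtime and therefore also matches fields such as `mode: Option[AggregateMode]` on `CometHashAggregateExec`. A minimal standalone sketch of the pitfall, using a hypothetical `Node` class in place of a real Comet operator:

    object ErasureSketch extends App {
      // Hypothetical stand-in for a Comet operator's constructor parameters:
      // an Option-typed aggregate mode and an Option-typed serialized plan.
      case class Node(mode: Option[String], serializedPlan: Option[Array[Byte]])

      def transform(arg: Any): AnyRef = arg match {
        // `Array[Byte]` is erased at runtime, so this case matches ANY
        // defined Option -- including `mode` (the compiler flags this line
        // with an "unchecked" warning for exactly that reason).
        case serialized: Option[Array[Byte]] if serialized.isDefined => None
        case other: AnyRef => other
      }

      val node = Node(Some("Partial"), Some(Array[Byte](1, 2, 3)))
      // Both fields match the first case, so `mode` is wiped along with the plan:
      println(node.productIterator.map(transform).toList) // List(None, None)
    }

This mirrors what `cleanBlock()` did before patch 1. The `mapProduct` helper sidesteps the broken match by position, transforming only the last product element, and the next patch removes the need for positional tricks altogether.
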
From 353dead6b2b98207c6ec7361cd599b52128f75f7 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Tue, 27 Feb 2024 22:15:46 -0800
Subject: [PATCH 5/5] For review

---
 .../comet/CometSparkSessionExtensions.scala   | 20 ++++--
 .../apache/spark/sql/comet/operators.scala    | 65 ++++++++-----------
 2 files changed, 40 insertions(+), 45 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index f2aba74a0..10c332801 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -237,7 +237,13 @@ class CometSparkSessionExtensions
         val newOp = transform1(op)
         newOp match {
           case Some(nativeOp) =>
-            CometProjectExec(nativeOp, op, op.projectList, op.output, op.child, None)
+            CometProjectExec(
+              nativeOp,
+              op,
+              op.projectList,
+              op.output,
+              op.child,
+              SerializedPlan(None))
           case None =>
             op
         }
@@ -246,7 +252,7 @@ class CometSparkSessionExtensions
         val newOp = transform1(op)
         newOp match {
           case Some(nativeOp) =>
-            CometFilterExec(nativeOp, op, op.condition, op.child, None)
+            CometFilterExec(nativeOp, op, op.condition, op.child, SerializedPlan(None))
           case None =>
             op
         }
@@ -255,7 +261,7 @@ class CometSparkSessionExtensions
         val newOp = transform1(op)
         newOp match {
           case Some(nativeOp) =>
-            CometSortExec(nativeOp, op, op.sortOrder, op.child, None)
+            CometSortExec(nativeOp, op, op.sortOrder, op.child, SerializedPlan(None))
           case None =>
             op
         }
@@ -264,7 +270,7 @@ class CometSparkSessionExtensions
         val newOp = transform1(op)
         newOp match {
           case Some(nativeOp) =>
-            CometLocalLimitExec(nativeOp, op, op.limit, op.child, None)
+            CometLocalLimitExec(nativeOp, op, op.limit, op.child, SerializedPlan(None))
           case None =>
             op
         }
@@ -273,7 +279,7 @@ class CometSparkSessionExtensions
         val newOp = transform1(op)
         newOp match {
           case Some(nativeOp) =>
-            CometGlobalLimitExec(nativeOp, op, op.limit, op.child, None)
+            CometGlobalLimitExec(nativeOp, op, op.limit, op.child, SerializedPlan(None))
           case None =>
             op
         }
@@ -282,7 +288,7 @@ class CometSparkSessionExtensions
         val newOp = transform1(op)
         newOp match {
           case Some(nativeOp) =>
-            CometExpandExec(nativeOp, op, op.projections, op.child, None)
+            CometExpandExec(nativeOp, op, op.projections, op.child, SerializedPlan(None))
           case None =>
             op
         }
@@ -305,7 +311,7 @@ class CometSparkSessionExtensions
               child.output,
               if (modes.nonEmpty) Some(modes.head) else None,
               child,
-              None)
+              SerializedPlan(None))
           case None =>
             op
         }

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index 08f339c1a..e75f9a4a5 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -149,11 +149,8 @@ abstract class CometNativeExec extends CometExec {
   /**
    * The serialized native query plan, optional. This is only defined when the current node is the
    * "boundary" node between native and Spark.
-   *
-   * Note that derived classes of `CometNativeExec` must have `serializedPlanOpt` as the last
-   * product parameter.
    */
-  def serializedPlanOpt: Option[Array[Byte]]
+  def serializedPlanOpt: SerializedPlan
 
   /** The Comet native operator */
   def nativeOp: Operator
@@ -203,7 +200,7 @@ abstract class CometNativeExec extends CometExec {
   }
 
   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
-    serializedPlanOpt match {
+    serializedPlanOpt.plan match {
       case None =>
         // This is in the middle of a native execution, it should not be executed directly.
         throw new CometRuntimeException(
@@ -279,40 +276,22 @@ abstract class CometNativeExec extends CometExec {
     }
   }
 
-  /**
-   * Copies product elements to the output array except the last one. The last element will be
-   * transformed using the provided function. This is used to transform the `serializedPlanOpt`
-   * parameter in case classes of Comet native operators, where `serializedPlanOpt` is always
-   * the last product element. That is because we cannot match `Option[Array[Byte]]` due to type
-   * erasure.
-   */
-  private def mapProduct(f: Any => AnyRef): Array[AnyRef] = {
-    val arr = Array.ofDim[AnyRef](productArity)
-    var i = 0
-    while (i < arr.length - 1) {
-      arr(i) = productElement(i).asInstanceOf[AnyRef]
-      i += 1
-    }
-    arr(arr.length - 1) = f(productElement(arr.length - 1))
-    arr
-  }
-
   /**
    * Converts this native Comet operator and its children into a native block which can be
    * executed as a whole (i.e., in a single JNI call) from the native side.
    */
   def convertBlock(): CometNativeExec = {
     def transform(arg: Any): AnyRef = arg match {
-      case serializedPlan: Option[_] if serializedPlan.isEmpty =>
+      case serializedPlan: SerializedPlan if serializedPlan.isEmpty =>
         val out = new ByteArrayOutputStream()
         nativeOp.writeTo(out)
         out.close()
-        Some(out.toByteArray)
+        SerializedPlan(Some(out.toByteArray))
       case other: AnyRef => other
       case null => null
     }
 
-    val newArgs = mapProduct(transform)
+    val newArgs = mapProductIterator(transform)
     makeCopy(newArgs).asInstanceOf[CometNativeExec]
   }
 
@@ -321,13 +300,13 @@
    */
   def cleanBlock(): CometNativeExec = {
     def transform(arg: Any): AnyRef = arg match {
-      case serializedPlan: Option[_] if serializedPlan.isDefined =>
-        None
+      case serializedPlan: SerializedPlan if serializedPlan.isDefined =>
+        SerializedPlan(None)
      case other: AnyRef => other
       case null => null
     }
 
-    val newArgs = mapProduct(transform)
+    val newArgs = mapProductIterator(transform)
     makeCopy(newArgs).asInstanceOf[CometNativeExec]
   }
 
 abstract class CometUnaryExec extends CometNativeExec with UnaryExecNode
 
+/**
+ * Represents the serialized plan of Comet native operators. Only the first operator in a block of
+ * contiguous Comet native operators has defined plan bytes, which contain the serialization of
+ * the plan tree of the block.
+ */
+case class SerializedPlan(plan: Option[Array[Byte]]) {
+  def isDefined: Boolean = plan.isDefined
+  def isEmpty: Boolean = plan.isEmpty
+}
+
 case class CometProjectExec(
     override val nativeOp: Operator,
     override val originalPlan: SparkPlan,
     projectList: Seq[NamedExpression],
     override val output: Seq[Attribute],
     child: SparkPlan,
-    override val serializedPlanOpt: Option[Array[Byte]])
+    override val serializedPlanOpt: SerializedPlan)
     extends CometUnaryExec {
   override def producedAttributes: AttributeSet = outputSet
   override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
     this.copy(child = newChild)
@@ -377,7 +366,7 @@ case class CometFilterExec(
     override val originalPlan: SparkPlan,
     condition: Expression,
     child: SparkPlan,
-    override val serializedPlanOpt: Option[Array[Byte]])
+    override val serializedPlanOpt: SerializedPlan)
     extends CometUnaryExec {
   override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
     this.copy(child = newChild)
@@ -411,7 +400,7 @@ case class CometSortExec(
     override val originalPlan: SparkPlan,
     sortOrder: Seq[SortOrder],
     child: SparkPlan,
-    override val serializedPlanOpt: Option[Array[Byte]])
+    override val serializedPlanOpt: SerializedPlan)
     extends CometUnaryExec {
   override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
     this.copy(child = newChild)
@@ -443,7 +432,7 @@ case class CometLocalLimitExec(
     override val originalPlan: SparkPlan,
     limit: Int,
     child: SparkPlan,
-    override val serializedPlanOpt: Option[Array[Byte]])
+    override val serializedPlanOpt: SerializedPlan)
     extends CometUnaryExec {
   override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
     this.copy(child = newChild)
@@ -470,7 +459,7 @@ case class CometGlobalLimitExec(
     override val originalPlan: SparkPlan,
     limit: Int,
     child: SparkPlan,
-    override val serializedPlanOpt: Option[Array[Byte]])
+    override val serializedPlanOpt: SerializedPlan)
     extends CometUnaryExec {
   override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
     this.copy(child = newChild)
@@ -495,7 +484,7 @@ case class CometExpandExec(
     override val originalPlan: SparkPlan,
     projections: Seq[Seq[Expression]],
     child: SparkPlan,
-    override val serializedPlanOpt: Option[Array[Byte]])
+    override val serializedPlanOpt: SerializedPlan)
     extends CometUnaryExec {
   override def producedAttributes: AttributeSet = outputSet
@@ -559,7 +548,7 @@ case class CometHashAggregateExec(
     input: Seq[Attribute],
     mode: Option[AggregateMode],
     child: SparkPlan,
-    override val serializedPlanOpt: Option[Array[Byte]])
+    override val serializedPlanOpt: SerializedPlan)
     extends CometUnaryExec {
   override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
     this.copy(child = newChild)
@@ -597,7 +586,7 @@ case class CometScanWrapper(override val nativeOp: Operator, override val originalPlan: SparkPlan)
     extends CometNativeExec
     with LeafExecNode {
-  override val serializedPlanOpt: Option[Array[Byte]] = None
+  override val serializedPlanOpt: SerializedPlan = SerializedPlan(None)
 
   override def stringArgs: Iterator[Any] = Iterator(originalPlan.output, originalPlan)
 }
@@ -613,7 +602,7 @@ case class CometSinkPlaceHolder(
     override val originalPlan: SparkPlan,
     child: SparkPlan)
     extends CometUnaryExec {
-  override val serializedPlanOpt: Option[Array[Byte]] = None
+  override val serializedPlanOpt: SerializedPlan = SerializedPlan(None)
   override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = {
     this.copy(child = newChild)
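
Where patch 1 relied on the position of the last product element, this final patch makes the match itself reliable: `SerializedPlan` is a concrete wrapper class, so checking against it at runtime is exact, and the stock `mapProductIterator` can be used again. A companion sketch under the same hypothetical `Node` setup as before (`SerializedPlan` itself is copied from the patch):

    object WrapperSketch extends App {
      // Copied from the patch: a concrete wrapper around the plan bytes.
      case class SerializedPlan(plan: Option[Array[Byte]]) {
        def isDefined: Boolean = plan.isDefined
        def isEmpty: Boolean = plan.isEmpty
      }

      // Hypothetical stand-in for a Comet operator's constructor parameters.
      case class Node(mode: Option[String], serializedPlan: SerializedPlan)

      def transform(arg: Any): AnyRef = arg match {
        // A runtime class check against a concrete type is exact:
        // `mode` can no longer match this case.
        case s: SerializedPlan if s.isDefined => SerializedPlan(None)
        case other: AnyRef => other
      }

      val node = Node(Some("Partial"), SerializedPlan(Some(Array[Byte](1, 2, 3))))
      println(node.productIterator.map(transform).toList)
      // List(Some(Partial), SerializedPlan(None)) -- `mode` survives intact.
    }

The test added in patch 2 asserts exactly this invariant on a real plan: after `cleanBlock()`, the `mode` of `CometHashAggregateExec` must still be an `AggregateMode`.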