From 472c37fda3d75e5f02b52a87c4ed3ac3114352de Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Sat, 18 May 2024 14:48:13 -0700
Subject: [PATCH] fix: fix CometNativeExec.doCanonicalize for ReusedExchangeExec

---
 .../apache/spark/sql/comet/operators.scala |   20 ++++++++++++-
 .../apache/comet/exec/CometExecSuite.scala |   29 +++++++++++++++++++
 2 files changed, 48 insertions(+), 1 deletion(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index 63587af32b..bb17442a59 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -363,7 +363,11 @@ abstract class CometNativeExec extends CometExec {
   }
 
   override protected def doCanonicalize(): SparkPlan = {
-    val canonicalizedPlan = super.doCanonicalize().asInstanceOf[CometNativeExec]
+    val canonicalizedPlan = super
+      .doCanonicalize()
+      .asInstanceOf[CometNativeExec]
+      .canonicalizePlans()
+
     if (serializedPlanOpt.isDefined) {
       // If the plan is a boundary node, we should remove the serialized plan.
       canonicalizedPlan.cleanBlock()
@@ -371,6 +375,20 @@ abstract class CometNativeExec extends CometExec {
       canonicalizedPlan
     }
   }
+
+  /**
+   * Canonicalizes the plans of Product parameters in Comet native operators.
+   */
+  protected def canonicalizePlans(): CometNativeExec = {
+    def transform(arg: Any): AnyRef = arg match {
+      case sparkPlan: SparkPlan => sparkPlan.canonicalized
+      case other: AnyRef => other
+      case null => null
+    }
+
+    val newArgs = mapProductIterator(transform)
+    makeCopy(newArgs).asInstanceOf[CometNativeExec]
+  }
 }
 
 abstract class CometUnaryExec extends CometNativeExec with UnaryExecNode
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 2e14442815..3dfb55052d 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -63,6 +63,35 @@ class CometExecSuite extends CometTestBase {
     }
   }
 
+  test("fix CometNativeExec.doCanonicalize for ReusedExchangeExec") {
+    withSQLConf(
+      CometConf.COMET_EXEC_BROADCAST_FORCE_ENABLED.key -> "true",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+      withTable("td") {
+        testData
+          .withColumn("bucket", $"key" % 3)
+          .write
+          .mode(SaveMode.Overwrite)
+          .bucketBy(2, "bucket")
+          .format("parquet")
+          .saveAsTable("td")
+        val df = sql("""
+            |SELECT t1.key, t2.key, t3.key
+            |FROM td AS t1
+            |JOIN td AS t2 ON t2.key = t1.key
+            |JOIN td AS t3 ON t3.key = t2.key
+            |WHERE t1.bucket = 1 AND t2.bucket = 1 AND t3.bucket = 1
+            |""".stripMargin)
+        val reusedPlan = ReuseExchangeAndSubquery.apply(df.queryExecution.executedPlan)
+        val reusedExchanges = collect(reusedPlan) { case r: ReusedExchangeExec =>
+          r
+        }
+        assert(reusedExchanges.size == 1)
+        assert(reusedExchanges.head.child.isInstanceOf[CometBroadcastExchangeExec])
+      }
+    }
+  }
+
   test("ReusedExchangeExec should work on CometBroadcastExchangeExec") {
     assume(isSpark34Plus, "ChunkedByteBuffer is not serializable before Spark 3.4+")
     withSQLConf(