From b37070dd275de3c2e4c58ff2d8d6ae82be9ad1d1 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 4 May 2024 14:19:57 -0700 Subject: [PATCH] Fix memory leak --- dev/diffs/3.4.2.diff | 11 +---------- .../apache/comet/CometSparkSessionExtensions.scala | 4 +++- .../scala/org/apache/spark/sql/CometTestBase.scala | 1 + 3 files changed, 5 insertions(+), 11 deletions(-) diff --git a/dev/diffs/3.4.2.diff b/dev/diffs/3.4.2.diff index e33c9ecd6..86a28c255 100644 --- a/dev/diffs/3.4.2.diff +++ b/dev/diffs/3.4.2.diff @@ -1149,7 +1149,7 @@ index ac710c32296..88a5329e74e 100644 val df = spark.read.parquet(path).selectExpr(projection: _*) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala -index 593bd7bb4ba..f39fe59f36b 100644 +index 593bd7bb4ba..2518d715154 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -26,9 +26,11 @@ import org.scalatest.time.SpanSugar._ @@ -1268,15 +1268,6 @@ index 593bd7bb4ba..f39fe59f36b 100644 val localShuffleRDD0 = localReads(0).execute().asInstanceOf[ShuffledRowRDD] val localShuffleRDD1 = localReads(1).execute().asInstanceOf[ShuffledRowRDD] // the final parallelism is math.max(1, numReduces / numMappers): math.max(1, 5/2) = 2 -@@ -298,7 +315,7 @@ class AdaptiveQueryExecSuite - .groupBy($"a").count() - checkAnswer(testDf, Seq()) - val plan = testDf.queryExecution.executedPlan -- assert(find(plan)(_.isInstanceOf[SortMergeJoinExec]).isDefined) -+ assert(find(plan)(_.isInstanceOf[CometSortMergeJoinExec]).isDefined) - val coalescedReads = collect(plan) { - case r: AQEShuffleReadExec => r - } @@ -322,7 +339,7 @@ class AdaptiveQueryExecSuite } } diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index 1e78013c7..083c1062a 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -916,7 +916,9 @@ object CometSparkSessionExtensions extends Logging { private[comet] def isCometShuffleEnabled(conf: SQLConf): Boolean = COMET_EXEC_SHUFFLE_ENABLED.get(conf) && (conf.contains("spark.shuffle.manager") && conf.getConfString("spark.shuffle.manager") == - "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") + "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") && + // TODO: AQE coalesce partitions feature causes Comet columnar shuffle memory leak + !conf.coalesceShufflePartitionsEnabled private[comet] def isCometScanEnabled(conf: SQLConf): Boolean = { COMET_SCAN_ENABLED.get(conf) diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala index 8fda13617..1ed447dc3 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala @@ -76,6 +76,7 @@ abstract class CometTestBase conf.set(MEMORY_OFFHEAP_SIZE.key, "2g") conf.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "1g") conf.set(SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key, "1g") + conf.set(SQLConf.COALESCE_PARTITIONS_ENABLED.key, "false") conf.set(CometConf.COMET_ENABLED.key, "true") conf.set(CometConf.COMET_EXEC_ENABLED.key, "true") conf.set(CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key, "true")