From 46f215d4a058c4e2cd636d52387850ba5b55a79d Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Tue, 17 Dec 2024 10:52:33 -0800 Subject: [PATCH] fix: enabling Spark tests with offHeap requirement --- .../org/apache/comet/CometSparkSessionExtensions.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index 61c45daff..5304ce5d6 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -53,7 +53,7 @@ import org.apache.spark.sql.types.{DoubleType, FloatType} import org.apache.comet.CometConf._ import org.apache.comet.CometExplainInfo.getActualPlan -import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometNativeShuffleMode, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSpark34Plus, isSpark40Plus, shouldApplySparkToColumnar, withInfo, withInfos} +import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometNativeShuffleMode, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isOffHeapEnabled, isSpark34Plus, isSpark40Plus, isTesting, shouldApplySparkToColumnar, withInfo, withInfos} import org.apache.comet.parquet.{CometParquetScan, SupportsComet} import org.apache.comet.rules.RewriteJoin import org.apache.comet.serde.OperatorOuterClass.Operator @@ -921,8 +921,8 @@ class CometSparkSessionExtensions override def apply(plan: SparkPlan): SparkPlan = { // Comet required off-heap memory to be enabled - if ("true" != conf.getConfString("spark.memory.offHeap.enabled", "false")) { - logInfo("Comet extension disabled because spark.memory.offHeap.enabled=false") + if (!isOffHeapEnabled(conf) && !isTesting) { + logWarning("Comet extension disabled because spark.memory.offHeap.enabled=false") return plan } @@ -1174,8 +1174,7 @@ object CometSparkSessionExtensions extends Logging { } private[comet] def isOffHeapEnabled(conf: SQLConf): Boolean = - conf.contains("spark.memory.offHeap.enabled") && - conf.getConfString("spark.memory.offHeap.enabled").toBoolean + conf.getConfString("spark.memory.offHeap.enabled", "false").toBoolean // Copied from org.apache.spark.util.Utils which is private to Spark. private[comet] def isTesting: Boolean = {