Skip to content

Commit

Permalink
fix: enabling Spark tests with offHeap requirement
Browse files Browse the repository at this point in the history
  • Loading branch information
kazuyukitanimura committed Dec 17, 2024
1 parent f1d0879 commit 46f215d
Showing 1 changed file with 4 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ import org.apache.spark.sql.types.{DoubleType, FloatType}

import org.apache.comet.CometConf._
import org.apache.comet.CometExplainInfo.getActualPlan
import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometNativeShuffleMode, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSpark34Plus, isSpark40Plus, shouldApplySparkToColumnar, withInfo, withInfos}
import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometNativeShuffleMode, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isOffHeapEnabled, isSpark34Plus, isSpark40Plus, isTesting, shouldApplySparkToColumnar, withInfo, withInfos}
import org.apache.comet.parquet.{CometParquetScan, SupportsComet}
import org.apache.comet.rules.RewriteJoin
import org.apache.comet.serde.OperatorOuterClass.Operator
Expand Down Expand Up @@ -921,8 +921,8 @@ class CometSparkSessionExtensions
override def apply(plan: SparkPlan): SparkPlan = {

// Comet required off-heap memory to be enabled
if ("true" != conf.getConfString("spark.memory.offHeap.enabled", "false")) {
logInfo("Comet extension disabled because spark.memory.offHeap.enabled=false")
if (!isOffHeapEnabled(conf) && !isTesting) {
logWarning("Comet extension disabled because spark.memory.offHeap.enabled=false")
return plan
}

Expand Down Expand Up @@ -1174,8 +1174,7 @@ object CometSparkSessionExtensions extends Logging {
}

private[comet] def isOffHeapEnabled(conf: SQLConf): Boolean =
conf.contains("spark.memory.offHeap.enabled") &&
conf.getConfString("spark.memory.offHeap.enabled").toBoolean
conf.getConfString("spark.memory.offHeap.enabled", "false").toBoolean

// Copied from org.apache.spark.util.Utils which is private to Spark.
private[comet] def isTesting: Boolean = {
Expand Down

0 comments on commit 46f215d

Please sign in to comment.