Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Mar 29, 2024
1 parent 5fa8781 commit c804574
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -446,9 +446,7 @@ class CometSparkSessionExtensions
// exchange. It is only used for Comet native execution. We only transform Spark broadcast
// exchange to Comet broadcast exchange if its downstream is a Comet native plan or if the
// broadcast exchange is forced to be enabled by Comet config.
case plan
if (isCometNative(plan) || isCometBroadCastForceEnabled(conf)) &&
plan.children.exists(_.isInstanceOf[BroadcastExchangeExec]) =>
case plan if plan.children.exists(_.isInstanceOf[BroadcastExchangeExec]) =>
val newChildren = plan.children.map {
case b: BroadcastExchangeExec
if isCometNative(b.child) &&
Expand All @@ -461,7 +459,12 @@ class CometSparkSessionExtensions
}
case other => other
}
plan.withNewChildren(newChildren)
val newPlan = transform(plan.withNewChildren(newChildren))
if (isCometNative(newPlan) || isCometBroadCastForceEnabled(conf)) {
newPlan
} else {
plan
}

// For AQE shuffle stage on a Comet shuffle exchange
case s @ ShuffleQueryStageExec(_, _: CometShuffleExchangeExec, _) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ class CometJoinSuite extends CometTestBase {
withSQLConf(
CometConf.COMET_BATCH_SIZE.key -> "100",
SQLConf.PREFER_SORTMERGEJOIN.key -> "false",
"spark.comet.exec.broadcast.enabled" -> "true",
"spark.sql.join.forceApplyShuffledHashJoin" -> "true",
SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
Expand Down Expand Up @@ -74,7 +73,6 @@ class CometJoinSuite extends CometTestBase {
withSQLConf(
CometConf.COMET_BATCH_SIZE.key -> "100",
SQLConf.PREFER_SORTMERGEJOIN.key -> "false",
"spark.comet.exec.broadcast.enabled" -> "true",
"spark.sql.join.forceApplyShuffledHashJoin" -> "true",
SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
Expand Down

0 comments on commit c804574

Please sign in to comment.