Skip to content

Commit

Permalink
fix ci
Browse files Browse the repository at this point in the history
  • Loading branch information
parthchandra committed Apr 19, 2024
1 parent 852168b commit a8a228c
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -462,10 +462,6 @@ class CometSparkSessionExtensions
withInfo(op, "BroadcastHashJoin is not enabled")
op

case op: BroadcastHashJoinExec if !op.children.forall(isCometNative(_)) =>
withInfo(op, "BroadcastHashJoin disabled because not all child plans are native")
op

case op: SortMergeJoinExec
if isCometOperatorEnabled(conf, "sort_merge_join") &&
op.children.forall(isCometNative(_)) =>
Expand Down Expand Up @@ -573,8 +569,7 @@ class CometSparkSessionExtensions
case b: BroadcastExchangeExec
if isCometNative(b.child) &&
isCometOperatorEnabled(conf, "broadcastExchangeExec") =>
val newOp = QueryPlanSerde.operator2Proto(b)
newOp match {
QueryPlanSerde.operator2Proto(b) match {
case Some(nativeOp) =>
val cometOp = CometBroadcastExchangeExec(b, b.child)
CometSinkPlaceHolder(nativeOp, b, cometOp)
Expand All @@ -589,7 +584,7 @@ class CometSparkSessionExtensions
} else {
if (!isCometOperatorEnabled(
conf,
"broadcastExchangeExec") || !isCometBroadCastForceEnabled(conf)) {
"broadcastExchangeExec") && !isCometBroadCastForceEnabled(conf)) {
withInfo(plan, "Native Broadcast is not enabled")
}
plan
Expand All @@ -598,6 +593,13 @@ class CometSparkSessionExtensions
plan
}

// this case should be checked only after the previous case checking for a
// child BroadcastExchange has been applied, otherwise that transform
// never gets applied
case op: BroadcastHashJoinExec if !op.children.forall(isCometNative(_)) =>
withInfo(op, "BroadcastHashJoin disabled because not all child plans are native")
op

// For AQE shuffle stage on a Comet shuffle exchange
case s @ ShuffleQueryStageExec(_, _: CometShuffleExchangeExec, _) =>
val newOp = transform1(s)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1296,6 +1296,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde {
} else {
val unboundRef = ExprOuterClass.UnboundReference
.newBuilder()
.setName(attr.name)
.setDatatype(dataType.get)
.build()

Expand Down

0 comments on commit a8a228c

Please sign in to comment.