Skip to content

Commit

Permalink
Fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Apr 26, 2024
1 parent 87f50dc commit cf7b382
Showing 1 changed file with 8 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ abstract class CometNativeExec extends CometExec {
val firstNonBroadcastPlan = sparkPlans.zipWithIndex.find {
case (_: CometBroadcastExchangeExec, _) => false
case (BroadcastQueryStageExec(_, _: CometBroadcastExchangeExec, _), _) => false
case (BroadcastQueryStageExec(_, _: ReusedExchangeExec, _), _) => false
case _ => true
}

Expand All @@ -263,6 +264,13 @@ abstract class CometNativeExec extends CometExec {
inputs += c.setNumPartitions(firstNonBroadcastPlanNumPartitions).executeColumnar()
case BroadcastQueryStageExec(_, c: CometBroadcastExchangeExec, _) =>
inputs += c.setNumPartitions(firstNonBroadcastPlanNumPartitions).executeColumnar()
case ReusedExchangeExec(_, c: CometBroadcastExchangeExec) =>
inputs += c.setNumPartitions(firstNonBroadcastPlanNumPartitions).executeColumnar()
case BroadcastQueryStageExec(
_,
ReusedExchangeExec(_, c: CometBroadcastExchangeExec),
_) =>
inputs += c.setNumPartitions(firstNonBroadcastPlanNumPartitions).executeColumnar()
case _ if idx == firstNonBroadcastPlan.get._2 =>
inputs += firstNonBroadcastPlanRDD
case _ =>
Expand Down

0 comments on commit cf7b382

Please sign in to comment.