From cf7b38209713519a3686c3ad8b9e30d32101b662 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 25 Apr 2024 17:58:55 -0700 Subject: [PATCH] Fix test --- .../main/scala/org/apache/spark/sql/comet/operators.scala | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index a8579757d..adbe412de 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -239,6 +239,7 @@ abstract class CometNativeExec extends CometExec { val firstNonBroadcastPlan = sparkPlans.zipWithIndex.find { case (_: CometBroadcastExchangeExec, _) => false case (BroadcastQueryStageExec(_, _: CometBroadcastExchangeExec, _), _) => false + case (BroadcastQueryStageExec(_, _: ReusedExchangeExec, _), _) => false case _ => true } @@ -263,6 +264,13 @@ abstract class CometNativeExec extends CometExec { inputs += c.setNumPartitions(firstNonBroadcastPlanNumPartitions).executeColumnar() case BroadcastQueryStageExec(_, c: CometBroadcastExchangeExec, _) => inputs += c.setNumPartitions(firstNonBroadcastPlanNumPartitions).executeColumnar() + case ReusedExchangeExec(_, c: CometBroadcastExchangeExec) => + inputs += c.setNumPartitions(firstNonBroadcastPlanNumPartitions).executeColumnar() + case BroadcastQueryStageExec( + _, + ReusedExchangeExec(_, c: CometBroadcastExchangeExec), + _) => + inputs += c.setNumPartitions(firstNonBroadcastPlanNumPartitions).executeColumnar() case _ if idx == firstNonBroadcastPlan.get._2 => inputs += firstNonBroadcastPlanRDD case _ =>