From ebcf14cad4b14958fe1867ed7e58588b22782465 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 18 May 2024 14:48:13 -0700 Subject: [PATCH] fix: fix CometNativeExec.doCanonicalize for ReusedExchangeExec --- .../apache/spark/sql/comet/operators.scala | 20 +- .../approved-plans-v2_7/q5a/explain.txt | 472 +++--------------- .../approved-plans-v2_7/q5a/simplified.txt | 88 +--- .../apache/comet/exec/CometExecSuite.scala | 30 ++ 4 files changed, 125 insertions(+), 485 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index 63587af32..bb17442a5 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -363,7 +363,11 @@ abstract class CometNativeExec extends CometExec { } override protected def doCanonicalize(): SparkPlan = { - val canonicalizedPlan = super.doCanonicalize().asInstanceOf[CometNativeExec] + val canonicalizedPlan = super + .doCanonicalize() + .asInstanceOf[CometNativeExec] + .canonicalizePlans() + if (serializedPlanOpt.isDefined) { // If the plan is a boundary node, we should remove the serialized plan. canonicalizedPlan.cleanBlock() @@ -371,6 +375,20 @@ abstract class CometNativeExec extends CometExec { canonicalizedPlan } } + + /** + * Canonicalizes the plans of Product parameters in Comet native operators. + */ + protected def canonicalizePlans(): CometNativeExec = { + def transform(arg: Any): AnyRef = arg match { + case sparkPlan: SparkPlan => sparkPlan.canonicalized + case other: AnyRef => other + case null => null + } + + val newArgs = mapProductIterator(transform) + makeCopy(newArgs).asInstanceOf[CometNativeExec] + } } abstract class CometUnaryExec extends CometNativeExec with UnaryExecNode diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q5a/explain.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q5a/explain.txt index 2345d02e7..2b45472ff 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q5a/explain.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q5a/explain.txt @@ -1,9 +1,9 @@ == Physical Plan == -TakeOrderedAndProject (137) -+- * HashAggregate (136) - +- Exchange (135) - +- * HashAggregate (134) - +- Union (133) +TakeOrderedAndProject (83) ++- * HashAggregate (82) + +- Exchange (81) + +- * HashAggregate (80) + +- Union (79) :- * HashAggregate (68) : +- Exchange (67) : +- * HashAggregate (66) @@ -72,70 +72,16 @@ TakeOrderedAndProject (137) : +- * ColumnarToRow (58) : +- CometFilter (57) : +- CometScan parquet spark_catalog.default.web_site (56) - :- * HashAggregate (100) - : +- Exchange (99) - : +- * HashAggregate (98) - : +- * HashAggregate (97) - : +- Exchange (96) - : +- * HashAggregate (95) - : +- Union (94) - : :- * HashAggregate (70) - : : +- ReusedExchange (69) - : :- * HashAggregate (72) - : : +- ReusedExchange (71) - : +- * HashAggregate (93) - : +- Exchange (92) - : +- * HashAggregate (91) - : +- * Project (90) - : +- * BroadcastHashJoin Inner BuildRight (89) - : :- * Project (87) - : : +- * BroadcastHashJoin Inner BuildRight (86) - : : :- * ColumnarToRow (84) - : : : +- CometUnion (83) - : : : :- CometProject (75) - : : : : +- CometFilter (74) - : : : : +- CometScan parquet spark_catalog.default.web_sales (73) - : : : +- CometProject (82) - : : : +- CometBroadcastHashJoin (81) - : : : :- CometBroadcastExchange (77) - : : : : +- CometScan parquet spark_catalog.default.web_returns (76) - : : : +- CometProject (80) - : : : +- CometFilter (79) - : : : +- CometScan parquet spark_catalog.default.web_sales (78) - : : +- ReusedExchange (85) - : +- ReusedExchange (88) - +- * HashAggregate (132) - +- Exchange (131) - +- * HashAggregate (130) - +- * HashAggregate (129) - +- Exchange (128) - +- * HashAggregate (127) - +- Union (126) - :- * HashAggregate (102) - : +- ReusedExchange (101) - :- * HashAggregate (104) - : +- ReusedExchange (103) - +- * HashAggregate (125) - +- Exchange (124) - +- * HashAggregate (123) - +- * Project (122) - +- * BroadcastHashJoin Inner BuildRight (121) - :- * Project (119) - : +- * BroadcastHashJoin Inner BuildRight (118) - : :- * ColumnarToRow (116) - : : +- CometUnion (115) - : : :- CometProject (107) - : : : +- CometFilter (106) - : : : +- CometScan parquet spark_catalog.default.web_sales (105) - : : +- CometProject (114) - : : +- CometBroadcastHashJoin (113) - : : :- CometBroadcastExchange (109) - : : : +- CometScan parquet spark_catalog.default.web_returns (108) - : : +- CometProject (112) - : : +- CometFilter (111) - : : +- CometScan parquet spark_catalog.default.web_sales (110) - : +- ReusedExchange (117) - +- ReusedExchange (120) + :- * HashAggregate (73) + : +- Exchange (72) + : +- * HashAggregate (71) + : +- * HashAggregate (70) + : +- ReusedExchange (69) + +- * HashAggregate (78) + +- Exchange (77) + +- * HashAggregate (76) + +- * HashAggregate (75) + +- ReusedExchange (74) (1) Scan parquet spark_catalog.default.store_sales @@ -177,7 +123,7 @@ Child 1 Input [6]: [store_sk#16, date_sk#17, sales_price#18, profit#19, return_a (8) ColumnarToRow [codegen id : 3] Input [6]: [store_sk#6, date_sk#7, sales_price#8, profit#9, return_amt#10, net_loss#11] -(9) ReusedExchange [Reuses operator id: 142] +(9) ReusedExchange [Reuses operator id: 88] Output [1]: [d_date_sk#22] (10) BroadcastHashJoin [codegen id : 3] @@ -275,7 +221,7 @@ Child 1 Input [6]: [page_sk#57, date_sk#58, sales_price#59, profit#60, return_am (28) ColumnarToRow [codegen id : 7] Input [6]: [page_sk#47, date_sk#48, sales_price#49, profit#50, return_amt#51, net_loss#52] -(29) ReusedExchange [Reuses operator id: 142] +(29) ReusedExchange [Reuses operator id: 88] Output [1]: [d_date_sk#63] (30) BroadcastHashJoin [codegen id : 7] @@ -392,7 +338,7 @@ Child 1 Input [6]: [wsr_web_site_sk#103, date_sk#104, sales_price#105, profit#10 (52) ColumnarToRow [codegen id : 11] Input [6]: [wsr_web_site_sk#88, date_sk#89, sales_price#90, profit#91, return_amt#92, net_loss#93] -(53) ReusedExchange [Reuses operator id: 142] +(53) ReusedExchange [Reuses operator id: 88] Output [1]: [d_date_sk#109] (54) BroadcastHashJoin [codegen id : 11] @@ -471,381 +417,117 @@ Functions [3]: [sum(sales#39), sum(returns#40), sum(profit#41)] Aggregate Attributes [3]: [sum(sales#39)#141, sum(returns#40)#142, sum(profit#41)#143] Results [5]: [channel#37, id#38, cast(sum(sales#39)#141 as decimal(37,2)) AS sales#144, cast(sum(returns#40)#142 as decimal(37,2)) AS returns#145, cast(sum(profit#41)#143 as decimal(38,2)) AS profit#146] -(69) ReusedExchange [Reuses operator id: 19] -Output [5]: [s_store_id#24, sum#147, sum#148, sum#149, sum#150] +(69) ReusedExchange [Reuses operator id: 67] +Output [8]: [channel#37, id#38, sum#135, isEmpty#136, sum#137, isEmpty#138, sum#139, isEmpty#140] -(70) HashAggregate [codegen id : 18] -Input [5]: [s_store_id#24, sum#147, sum#148, sum#149, sum#150] -Keys [1]: [s_store_id#24] -Functions [4]: [sum(UnscaledValue(sales_price#8)), sum(UnscaledValue(return_amt#10)), sum(UnscaledValue(profit#9)), sum(UnscaledValue(net_loss#11))] -Aggregate Attributes [4]: [sum(UnscaledValue(sales_price#8))#33, sum(UnscaledValue(return_amt#10))#34, sum(UnscaledValue(profit#9))#35, sum(UnscaledValue(net_loss#11))#36] -Results [5]: [store channel AS channel#37, concat(store, s_store_id#24) AS id#38, MakeDecimal(sum(UnscaledValue(sales_price#8))#33,17,2) AS sales#39, MakeDecimal(sum(UnscaledValue(return_amt#10))#34,17,2) AS returns#40, (MakeDecimal(sum(UnscaledValue(profit#9))#35,17,2) - MakeDecimal(sum(UnscaledValue(net_loss#11))#36,17,2)) AS profit#41] - -(71) ReusedExchange [Reuses operator id: 39] -Output [5]: [cp_catalog_page_id#65, sum#151, sum#152, sum#153, sum#154] - -(72) HashAggregate [codegen id : 22] -Input [5]: [cp_catalog_page_id#65, sum#151, sum#152, sum#153, sum#154] -Keys [1]: [cp_catalog_page_id#65] -Functions [4]: [sum(UnscaledValue(sales_price#49)), sum(UnscaledValue(return_amt#51)), sum(UnscaledValue(profit#50)), sum(UnscaledValue(net_loss#52))] -Aggregate Attributes [4]: [sum(UnscaledValue(sales_price#49))#74, sum(UnscaledValue(return_amt#51))#75, sum(UnscaledValue(profit#50))#76, sum(UnscaledValue(net_loss#52))#77] -Results [5]: [catalog channel AS channel#78, concat(catalog_page, cp_catalog_page_id#65) AS id#79, MakeDecimal(sum(UnscaledValue(sales_price#49))#74,17,2) AS sales#80, MakeDecimal(sum(UnscaledValue(return_amt#51))#75,17,2) AS returns#81, (MakeDecimal(sum(UnscaledValue(profit#50))#76,17,2) - MakeDecimal(sum(UnscaledValue(net_loss#52))#77,17,2)) AS profit#82] - -(73) Scan parquet spark_catalog.default.web_sales -Output [4]: [ws_web_site_sk#83, ws_ext_sales_price#84, ws_net_profit#85, ws_sold_date_sk#86] -Batched: true -Location: InMemoryFileIndex [] -PartitionFilters: [isnotnull(ws_sold_date_sk#86), dynamicpruningexpression(ws_sold_date_sk#86 IN dynamicpruning#155)] -PushedFilters: [IsNotNull(ws_web_site_sk)] -ReadSchema: struct - -(74) CometFilter -Input [4]: [ws_web_site_sk#83, ws_ext_sales_price#84, ws_net_profit#85, ws_sold_date_sk#86] -Condition : isnotnull(ws_web_site_sk#83) - -(75) CometProject -Input [4]: [ws_web_site_sk#83, ws_ext_sales_price#84, ws_net_profit#85, ws_sold_date_sk#86] -Arguments: [wsr_web_site_sk#88, date_sk#89, sales_price#90, profit#91, return_amt#92, net_loss#93], [ws_web_site_sk#83 AS wsr_web_site_sk#88, ws_sold_date_sk#86 AS date_sk#89, ws_ext_sales_price#84 AS sales_price#90, ws_net_profit#85 AS profit#91, 0.00 AS return_amt#92, 0.00 AS net_loss#93] - -(76) Scan parquet spark_catalog.default.web_returns -Output [5]: [wr_item_sk#94, wr_order_number#95, wr_return_amt#96, wr_net_loss#97, wr_returned_date_sk#98] -Batched: true -Location: InMemoryFileIndex [] -PartitionFilters: [isnotnull(wr_returned_date_sk#98), dynamicpruningexpression(wr_returned_date_sk#98 IN dynamicpruning#155)] -ReadSchema: struct - -(77) CometBroadcastExchange -Input [5]: [wr_item_sk#94, wr_order_number#95, wr_return_amt#96, wr_net_loss#97, wr_returned_date_sk#98] -Arguments: [wr_item_sk#94, wr_order_number#95, wr_return_amt#96, wr_net_loss#97, wr_returned_date_sk#98] - -(78) Scan parquet spark_catalog.default.web_sales -Output [4]: [ws_item_sk#99, ws_web_site_sk#100, ws_order_number#101, ws_sold_date_sk#102] -Batched: true -Location [not included in comparison]/{warehouse_dir}/web_sales] -PushedFilters: [IsNotNull(ws_item_sk), IsNotNull(ws_order_number), IsNotNull(ws_web_site_sk)] -ReadSchema: struct - -(79) CometFilter -Input [4]: [ws_item_sk#99, ws_web_site_sk#100, ws_order_number#101, ws_sold_date_sk#102] -Condition : ((isnotnull(ws_item_sk#99) AND isnotnull(ws_order_number#101)) AND isnotnull(ws_web_site_sk#100)) - -(80) CometProject -Input [4]: [ws_item_sk#99, ws_web_site_sk#100, ws_order_number#101, ws_sold_date_sk#102] -Arguments: [ws_item_sk#99, ws_web_site_sk#100, ws_order_number#101], [ws_item_sk#99, ws_web_site_sk#100, ws_order_number#101] - -(81) CometBroadcastHashJoin -Left output [5]: [wr_item_sk#94, wr_order_number#95, wr_return_amt#96, wr_net_loss#97, wr_returned_date_sk#98] -Right output [3]: [ws_item_sk#99, ws_web_site_sk#100, ws_order_number#101] -Arguments: [wr_item_sk#94, wr_order_number#95], [ws_item_sk#99, ws_order_number#101], Inner - -(82) CometProject -Input [8]: [wr_item_sk#94, wr_order_number#95, wr_return_amt#96, wr_net_loss#97, wr_returned_date_sk#98, ws_item_sk#99, ws_web_site_sk#100, ws_order_number#101] -Arguments: [wsr_web_site_sk#103, date_sk#104, sales_price#105, profit#106, return_amt#107, net_loss#108], [ws_web_site_sk#100 AS wsr_web_site_sk#103, wr_returned_date_sk#98 AS date_sk#104, 0.00 AS sales_price#105, 0.00 AS profit#106, wr_return_amt#96 AS return_amt#107, wr_net_loss#97 AS net_loss#108] - -(83) CometUnion -Child 0 Input [6]: [wsr_web_site_sk#88, date_sk#89, sales_price#90, profit#91, return_amt#92, net_loss#93] -Child 1 Input [6]: [wsr_web_site_sk#103, date_sk#104, sales_price#105, profit#106, return_amt#107, net_loss#108] - -(84) ColumnarToRow [codegen id : 25] -Input [6]: [wsr_web_site_sk#88, date_sk#89, sales_price#90, profit#91, return_amt#92, net_loss#93] - -(85) ReusedExchange [Reuses operator id: 142] -Output [1]: [d_date_sk#109] - -(86) BroadcastHashJoin [codegen id : 25] -Left keys [1]: [date_sk#89] -Right keys [1]: [d_date_sk#109] -Join type: Inner -Join condition: None - -(87) Project [codegen id : 25] -Output [5]: [wsr_web_site_sk#88, sales_price#90, profit#91, return_amt#92, net_loss#93] -Input [7]: [wsr_web_site_sk#88, date_sk#89, sales_price#90, profit#91, return_amt#92, net_loss#93, d_date_sk#109] - -(88) ReusedExchange [Reuses operator id: 59] -Output [2]: [web_site_sk#110, web_site_id#111] - -(89) BroadcastHashJoin [codegen id : 25] -Left keys [1]: [wsr_web_site_sk#88] -Right keys [1]: [web_site_sk#110] -Join type: Inner -Join condition: None - -(90) Project [codegen id : 25] -Output [5]: [sales_price#90, profit#91, return_amt#92, net_loss#93, web_site_id#111] -Input [7]: [wsr_web_site_sk#88, sales_price#90, profit#91, return_amt#92, net_loss#93, web_site_sk#110, web_site_id#111] - -(91) HashAggregate [codegen id : 25] -Input [5]: [sales_price#90, profit#91, return_amt#92, net_loss#93, web_site_id#111] -Keys [1]: [web_site_id#111] -Functions [4]: [partial_sum(UnscaledValue(sales_price#90)), partial_sum(UnscaledValue(return_amt#92)), partial_sum(UnscaledValue(profit#91)), partial_sum(UnscaledValue(net_loss#93))] -Aggregate Attributes [4]: [sum#156, sum#157, sum#158, sum#159] -Results [5]: [web_site_id#111, sum#160, sum#161, sum#162, sum#163] - -(92) Exchange -Input [5]: [web_site_id#111, sum#160, sum#161, sum#162, sum#163] -Arguments: hashpartitioning(web_site_id#111, 5), ENSURE_REQUIREMENTS, [plan_id=8] - -(93) HashAggregate [codegen id : 26] -Input [5]: [web_site_id#111, sum#160, sum#161, sum#162, sum#163] -Keys [1]: [web_site_id#111] -Functions [4]: [sum(UnscaledValue(sales_price#90)), sum(UnscaledValue(return_amt#92)), sum(UnscaledValue(profit#91)), sum(UnscaledValue(net_loss#93))] -Aggregate Attributes [4]: [sum(UnscaledValue(sales_price#90))#120, sum(UnscaledValue(return_amt#92))#121, sum(UnscaledValue(profit#91))#122, sum(UnscaledValue(net_loss#93))#123] -Results [5]: [web channel AS channel#124, concat(web_site, web_site_id#111) AS id#125, MakeDecimal(sum(UnscaledValue(sales_price#90))#120,17,2) AS sales#126, MakeDecimal(sum(UnscaledValue(return_amt#92))#121,17,2) AS returns#127, (MakeDecimal(sum(UnscaledValue(profit#91))#122,17,2) - MakeDecimal(sum(UnscaledValue(net_loss#93))#123,17,2)) AS profit#128] - -(94) Union - -(95) HashAggregate [codegen id : 27] -Input [5]: [channel#37, id#38, sales#39, returns#40, profit#41] -Keys [2]: [channel#37, id#38] -Functions [3]: [partial_sum(sales#39), partial_sum(returns#40), partial_sum(profit#41)] -Aggregate Attributes [6]: [sum#129, isEmpty#130, sum#131, isEmpty#132, sum#133, isEmpty#134] -Results [8]: [channel#37, id#38, sum#135, isEmpty#136, sum#137, isEmpty#138, sum#139, isEmpty#140] - -(96) Exchange -Input [8]: [channel#37, id#38, sum#135, isEmpty#136, sum#137, isEmpty#138, sum#139, isEmpty#140] -Arguments: hashpartitioning(channel#37, id#38, 5), ENSURE_REQUIREMENTS, [plan_id=9] - -(97) HashAggregate [codegen id : 28] +(70) HashAggregate [codegen id : 28] Input [8]: [channel#37, id#38, sum#135, isEmpty#136, sum#137, isEmpty#138, sum#139, isEmpty#140] Keys [2]: [channel#37, id#38] Functions [3]: [sum(sales#39), sum(returns#40), sum(profit#41)] Aggregate Attributes [3]: [sum(sales#39)#141, sum(returns#40)#142, sum(profit#41)#143] -Results [4]: [channel#37, sum(sales#39)#141 AS sales#164, sum(returns#40)#142 AS returns#165, sum(profit#41)#143 AS profit#166] +Results [4]: [channel#37, sum(sales#39)#141 AS sales#147, sum(returns#40)#142 AS returns#148, sum(profit#41)#143 AS profit#149] -(98) HashAggregate [codegen id : 28] -Input [4]: [channel#37, sales#164, returns#165, profit#166] +(71) HashAggregate [codegen id : 28] +Input [4]: [channel#37, sales#147, returns#148, profit#149] Keys [1]: [channel#37] -Functions [3]: [partial_sum(sales#164), partial_sum(returns#165), partial_sum(profit#166)] -Aggregate Attributes [6]: [sum#167, isEmpty#168, sum#169, isEmpty#170, sum#171, isEmpty#172] -Results [7]: [channel#37, sum#173, isEmpty#174, sum#175, isEmpty#176, sum#177, isEmpty#178] +Functions [3]: [partial_sum(sales#147), partial_sum(returns#148), partial_sum(profit#149)] +Aggregate Attributes [6]: [sum#150, isEmpty#151, sum#152, isEmpty#153, sum#154, isEmpty#155] +Results [7]: [channel#37, sum#156, isEmpty#157, sum#158, isEmpty#159, sum#160, isEmpty#161] -(99) Exchange -Input [7]: [channel#37, sum#173, isEmpty#174, sum#175, isEmpty#176, sum#177, isEmpty#178] -Arguments: hashpartitioning(channel#37, 5), ENSURE_REQUIREMENTS, [plan_id=10] +(72) Exchange +Input [7]: [channel#37, sum#156, isEmpty#157, sum#158, isEmpty#159, sum#160, isEmpty#161] +Arguments: hashpartitioning(channel#37, 5), ENSURE_REQUIREMENTS, [plan_id=8] -(100) HashAggregate [codegen id : 29] -Input [7]: [channel#37, sum#173, isEmpty#174, sum#175, isEmpty#176, sum#177, isEmpty#178] +(73) HashAggregate [codegen id : 29] +Input [7]: [channel#37, sum#156, isEmpty#157, sum#158, isEmpty#159, sum#160, isEmpty#161] Keys [1]: [channel#37] -Functions [3]: [sum(sales#164), sum(returns#165), sum(profit#166)] -Aggregate Attributes [3]: [sum(sales#164)#179, sum(returns#165)#180, sum(profit#166)#181] -Results [5]: [channel#37, null AS id#182, sum(sales#164)#179 AS sum(sales)#183, sum(returns#165)#180 AS sum(returns)#184, sum(profit#166)#181 AS sum(profit)#185] - -(101) ReusedExchange [Reuses operator id: 19] -Output [5]: [s_store_id#24, sum#186, sum#187, sum#188, sum#189] - -(102) HashAggregate [codegen id : 33] -Input [5]: [s_store_id#24, sum#186, sum#187, sum#188, sum#189] -Keys [1]: [s_store_id#24] -Functions [4]: [sum(UnscaledValue(sales_price#8)), sum(UnscaledValue(return_amt#10)), sum(UnscaledValue(profit#9)), sum(UnscaledValue(net_loss#11))] -Aggregate Attributes [4]: [sum(UnscaledValue(sales_price#8))#33, sum(UnscaledValue(return_amt#10))#34, sum(UnscaledValue(profit#9))#35, sum(UnscaledValue(net_loss#11))#36] -Results [5]: [store channel AS channel#37, concat(store, s_store_id#24) AS id#38, MakeDecimal(sum(UnscaledValue(sales_price#8))#33,17,2) AS sales#39, MakeDecimal(sum(UnscaledValue(return_amt#10))#34,17,2) AS returns#40, (MakeDecimal(sum(UnscaledValue(profit#9))#35,17,2) - MakeDecimal(sum(UnscaledValue(net_loss#11))#36,17,2)) AS profit#41] - -(103) ReusedExchange [Reuses operator id: 39] -Output [5]: [cp_catalog_page_id#65, sum#190, sum#191, sum#192, sum#193] - -(104) HashAggregate [codegen id : 37] -Input [5]: [cp_catalog_page_id#65, sum#190, sum#191, sum#192, sum#193] -Keys [1]: [cp_catalog_page_id#65] -Functions [4]: [sum(UnscaledValue(sales_price#49)), sum(UnscaledValue(return_amt#51)), sum(UnscaledValue(profit#50)), sum(UnscaledValue(net_loss#52))] -Aggregate Attributes [4]: [sum(UnscaledValue(sales_price#49))#74, sum(UnscaledValue(return_amt#51))#75, sum(UnscaledValue(profit#50))#76, sum(UnscaledValue(net_loss#52))#77] -Results [5]: [catalog channel AS channel#78, concat(catalog_page, cp_catalog_page_id#65) AS id#79, MakeDecimal(sum(UnscaledValue(sales_price#49))#74,17,2) AS sales#80, MakeDecimal(sum(UnscaledValue(return_amt#51))#75,17,2) AS returns#81, (MakeDecimal(sum(UnscaledValue(profit#50))#76,17,2) - MakeDecimal(sum(UnscaledValue(net_loss#52))#77,17,2)) AS profit#82] - -(105) Scan parquet spark_catalog.default.web_sales -Output [4]: [ws_web_site_sk#83, ws_ext_sales_price#84, ws_net_profit#85, ws_sold_date_sk#86] -Batched: true -Location: InMemoryFileIndex [] -PartitionFilters: [isnotnull(ws_sold_date_sk#86), dynamicpruningexpression(ws_sold_date_sk#86 IN dynamicpruning#194)] -PushedFilters: [IsNotNull(ws_web_site_sk)] -ReadSchema: struct - -(106) CometFilter -Input [4]: [ws_web_site_sk#83, ws_ext_sales_price#84, ws_net_profit#85, ws_sold_date_sk#86] -Condition : isnotnull(ws_web_site_sk#83) - -(107) CometProject -Input [4]: [ws_web_site_sk#83, ws_ext_sales_price#84, ws_net_profit#85, ws_sold_date_sk#86] -Arguments: [wsr_web_site_sk#88, date_sk#89, sales_price#90, profit#91, return_amt#92, net_loss#93], [ws_web_site_sk#83 AS wsr_web_site_sk#88, ws_sold_date_sk#86 AS date_sk#89, ws_ext_sales_price#84 AS sales_price#90, ws_net_profit#85 AS profit#91, 0.00 AS return_amt#92, 0.00 AS net_loss#93] - -(108) Scan parquet spark_catalog.default.web_returns -Output [5]: [wr_item_sk#94, wr_order_number#95, wr_return_amt#96, wr_net_loss#97, wr_returned_date_sk#98] -Batched: true -Location: InMemoryFileIndex [] -PartitionFilters: [isnotnull(wr_returned_date_sk#98), dynamicpruningexpression(wr_returned_date_sk#98 IN dynamicpruning#194)] -ReadSchema: struct - -(109) CometBroadcastExchange -Input [5]: [wr_item_sk#94, wr_order_number#95, wr_return_amt#96, wr_net_loss#97, wr_returned_date_sk#98] -Arguments: [wr_item_sk#94, wr_order_number#95, wr_return_amt#96, wr_net_loss#97, wr_returned_date_sk#98] - -(110) Scan parquet spark_catalog.default.web_sales -Output [4]: [ws_item_sk#99, ws_web_site_sk#100, ws_order_number#101, ws_sold_date_sk#102] -Batched: true -Location [not included in comparison]/{warehouse_dir}/web_sales] -PushedFilters: [IsNotNull(ws_item_sk), IsNotNull(ws_order_number), IsNotNull(ws_web_site_sk)] -ReadSchema: struct - -(111) CometFilter -Input [4]: [ws_item_sk#99, ws_web_site_sk#100, ws_order_number#101, ws_sold_date_sk#102] -Condition : ((isnotnull(ws_item_sk#99) AND isnotnull(ws_order_number#101)) AND isnotnull(ws_web_site_sk#100)) - -(112) CometProject -Input [4]: [ws_item_sk#99, ws_web_site_sk#100, ws_order_number#101, ws_sold_date_sk#102] -Arguments: [ws_item_sk#99, ws_web_site_sk#100, ws_order_number#101], [ws_item_sk#99, ws_web_site_sk#100, ws_order_number#101] - -(113) CometBroadcastHashJoin -Left output [5]: [wr_item_sk#94, wr_order_number#95, wr_return_amt#96, wr_net_loss#97, wr_returned_date_sk#98] -Right output [3]: [ws_item_sk#99, ws_web_site_sk#100, ws_order_number#101] -Arguments: [wr_item_sk#94, wr_order_number#95], [ws_item_sk#99, ws_order_number#101], Inner - -(114) CometProject -Input [8]: [wr_item_sk#94, wr_order_number#95, wr_return_amt#96, wr_net_loss#97, wr_returned_date_sk#98, ws_item_sk#99, ws_web_site_sk#100, ws_order_number#101] -Arguments: [wsr_web_site_sk#103, date_sk#104, sales_price#105, profit#106, return_amt#107, net_loss#108], [ws_web_site_sk#100 AS wsr_web_site_sk#103, wr_returned_date_sk#98 AS date_sk#104, 0.00 AS sales_price#105, 0.00 AS profit#106, wr_return_amt#96 AS return_amt#107, wr_net_loss#97 AS net_loss#108] - -(115) CometUnion -Child 0 Input [6]: [wsr_web_site_sk#88, date_sk#89, sales_price#90, profit#91, return_amt#92, net_loss#93] -Child 1 Input [6]: [wsr_web_site_sk#103, date_sk#104, sales_price#105, profit#106, return_amt#107, net_loss#108] - -(116) ColumnarToRow [codegen id : 40] -Input [6]: [wsr_web_site_sk#88, date_sk#89, sales_price#90, profit#91, return_amt#92, net_loss#93] - -(117) ReusedExchange [Reuses operator id: 142] -Output [1]: [d_date_sk#109] - -(118) BroadcastHashJoin [codegen id : 40] -Left keys [1]: [date_sk#89] -Right keys [1]: [d_date_sk#109] -Join type: Inner -Join condition: None - -(119) Project [codegen id : 40] -Output [5]: [wsr_web_site_sk#88, sales_price#90, profit#91, return_amt#92, net_loss#93] -Input [7]: [wsr_web_site_sk#88, date_sk#89, sales_price#90, profit#91, return_amt#92, net_loss#93, d_date_sk#109] - -(120) ReusedExchange [Reuses operator id: 59] -Output [2]: [web_site_sk#110, web_site_id#111] - -(121) BroadcastHashJoin [codegen id : 40] -Left keys [1]: [wsr_web_site_sk#88] -Right keys [1]: [web_site_sk#110] -Join type: Inner -Join condition: None - -(122) Project [codegen id : 40] -Output [5]: [sales_price#90, profit#91, return_amt#92, net_loss#93, web_site_id#111] -Input [7]: [wsr_web_site_sk#88, sales_price#90, profit#91, return_amt#92, net_loss#93, web_site_sk#110, web_site_id#111] - -(123) HashAggregate [codegen id : 40] -Input [5]: [sales_price#90, profit#91, return_amt#92, net_loss#93, web_site_id#111] -Keys [1]: [web_site_id#111] -Functions [4]: [partial_sum(UnscaledValue(sales_price#90)), partial_sum(UnscaledValue(return_amt#92)), partial_sum(UnscaledValue(profit#91)), partial_sum(UnscaledValue(net_loss#93))] -Aggregate Attributes [4]: [sum#195, sum#196, sum#197, sum#198] -Results [5]: [web_site_id#111, sum#199, sum#200, sum#201, sum#202] - -(124) Exchange -Input [5]: [web_site_id#111, sum#199, sum#200, sum#201, sum#202] -Arguments: hashpartitioning(web_site_id#111, 5), ENSURE_REQUIREMENTS, [plan_id=11] - -(125) HashAggregate [codegen id : 41] -Input [5]: [web_site_id#111, sum#199, sum#200, sum#201, sum#202] -Keys [1]: [web_site_id#111] -Functions [4]: [sum(UnscaledValue(sales_price#90)), sum(UnscaledValue(return_amt#92)), sum(UnscaledValue(profit#91)), sum(UnscaledValue(net_loss#93))] -Aggregate Attributes [4]: [sum(UnscaledValue(sales_price#90))#120, sum(UnscaledValue(return_amt#92))#121, sum(UnscaledValue(profit#91))#122, sum(UnscaledValue(net_loss#93))#123] -Results [5]: [web channel AS channel#124, concat(web_site, web_site_id#111) AS id#125, MakeDecimal(sum(UnscaledValue(sales_price#90))#120,17,2) AS sales#126, MakeDecimal(sum(UnscaledValue(return_amt#92))#121,17,2) AS returns#127, (MakeDecimal(sum(UnscaledValue(profit#91))#122,17,2) - MakeDecimal(sum(UnscaledValue(net_loss#93))#123,17,2)) AS profit#128] +Functions [3]: [sum(sales#147), sum(returns#148), sum(profit#149)] +Aggregate Attributes [3]: [sum(sales#147)#162, sum(returns#148)#163, sum(profit#149)#164] +Results [5]: [channel#37, null AS id#165, sum(sales#147)#162 AS sum(sales)#166, sum(returns#148)#163 AS sum(returns)#167, sum(profit#149)#164 AS sum(profit)#168] -(126) Union +(74) ReusedExchange [Reuses operator id: 67] +Output [8]: [channel#37, id#38, sum#135, isEmpty#136, sum#137, isEmpty#138, sum#139, isEmpty#140] -(127) HashAggregate [codegen id : 42] -Input [5]: [channel#37, id#38, sales#39, returns#40, profit#41] -Keys [2]: [channel#37, id#38] -Functions [3]: [partial_sum(sales#39), partial_sum(returns#40), partial_sum(profit#41)] -Aggregate Attributes [6]: [sum#129, isEmpty#130, sum#131, isEmpty#132, sum#133, isEmpty#134] -Results [8]: [channel#37, id#38, sum#135, isEmpty#136, sum#137, isEmpty#138, sum#139, isEmpty#140] - -(128) Exchange -Input [8]: [channel#37, id#38, sum#135, isEmpty#136, sum#137, isEmpty#138, sum#139, isEmpty#140] -Arguments: hashpartitioning(channel#37, id#38, 5), ENSURE_REQUIREMENTS, [plan_id=12] - -(129) HashAggregate [codegen id : 43] +(75) HashAggregate [codegen id : 43] Input [8]: [channel#37, id#38, sum#135, isEmpty#136, sum#137, isEmpty#138, sum#139, isEmpty#140] Keys [2]: [channel#37, id#38] Functions [3]: [sum(sales#39), sum(returns#40), sum(profit#41)] Aggregate Attributes [3]: [sum(sales#39)#141, sum(returns#40)#142, sum(profit#41)#143] -Results [3]: [sum(sales#39)#141 AS sales#164, sum(returns#40)#142 AS returns#165, sum(profit#41)#143 AS profit#166] +Results [3]: [sum(sales#39)#141 AS sales#147, sum(returns#40)#142 AS returns#148, sum(profit#41)#143 AS profit#149] -(130) HashAggregate [codegen id : 43] -Input [3]: [sales#164, returns#165, profit#166] +(76) HashAggregate [codegen id : 43] +Input [3]: [sales#147, returns#148, profit#149] Keys: [] -Functions [3]: [partial_sum(sales#164), partial_sum(returns#165), partial_sum(profit#166)] -Aggregate Attributes [6]: [sum#203, isEmpty#204, sum#205, isEmpty#206, sum#207, isEmpty#208] -Results [6]: [sum#209, isEmpty#210, sum#211, isEmpty#212, sum#213, isEmpty#214] +Functions [3]: [partial_sum(sales#147), partial_sum(returns#148), partial_sum(profit#149)] +Aggregate Attributes [6]: [sum#169, isEmpty#170, sum#171, isEmpty#172, sum#173, isEmpty#174] +Results [6]: [sum#175, isEmpty#176, sum#177, isEmpty#178, sum#179, isEmpty#180] -(131) Exchange -Input [6]: [sum#209, isEmpty#210, sum#211, isEmpty#212, sum#213, isEmpty#214] -Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=13] +(77) Exchange +Input [6]: [sum#175, isEmpty#176, sum#177, isEmpty#178, sum#179, isEmpty#180] +Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=9] -(132) HashAggregate [codegen id : 44] -Input [6]: [sum#209, isEmpty#210, sum#211, isEmpty#212, sum#213, isEmpty#214] +(78) HashAggregate [codegen id : 44] +Input [6]: [sum#175, isEmpty#176, sum#177, isEmpty#178, sum#179, isEmpty#180] Keys: [] -Functions [3]: [sum(sales#164), sum(returns#165), sum(profit#166)] -Aggregate Attributes [3]: [sum(sales#164)#215, sum(returns#165)#216, sum(profit#166)#217] -Results [5]: [null AS channel#218, null AS id#219, sum(sales#164)#215 AS sum(sales)#220, sum(returns#165)#216 AS sum(returns)#221, sum(profit#166)#217 AS sum(profit)#222] +Functions [3]: [sum(sales#147), sum(returns#148), sum(profit#149)] +Aggregate Attributes [3]: [sum(sales#147)#181, sum(returns#148)#182, sum(profit#149)#183] +Results [5]: [null AS channel#184, null AS id#185, sum(sales#147)#181 AS sum(sales)#186, sum(returns#148)#182 AS sum(returns)#187, sum(profit#149)#183 AS sum(profit)#188] -(133) Union +(79) Union -(134) HashAggregate [codegen id : 45] +(80) HashAggregate [codegen id : 45] Input [5]: [channel#37, id#38, sales#144, returns#145, profit#146] Keys [5]: [channel#37, id#38, sales#144, returns#145, profit#146] Functions: [] Aggregate Attributes: [] Results [5]: [channel#37, id#38, sales#144, returns#145, profit#146] -(135) Exchange +(81) Exchange Input [5]: [channel#37, id#38, sales#144, returns#145, profit#146] -Arguments: hashpartitioning(channel#37, id#38, sales#144, returns#145, profit#146, 5), ENSURE_REQUIREMENTS, [plan_id=14] +Arguments: hashpartitioning(channel#37, id#38, sales#144, returns#145, profit#146, 5), ENSURE_REQUIREMENTS, [plan_id=10] -(136) HashAggregate [codegen id : 46] +(82) HashAggregate [codegen id : 46] Input [5]: [channel#37, id#38, sales#144, returns#145, profit#146] Keys [5]: [channel#37, id#38, sales#144, returns#145, profit#146] Functions: [] Aggregate Attributes: [] Results [5]: [channel#37, id#38, sales#144, returns#145, profit#146] -(137) TakeOrderedAndProject +(83) TakeOrderedAndProject Input [5]: [channel#37, id#38, sales#144, returns#145, profit#146] Arguments: 100, [channel#37 ASC NULLS FIRST, id#38 ASC NULLS FIRST], [channel#37, id#38, sales#144, returns#145, profit#146] ===== Subqueries ===== Subquery:1 Hosting operator id = 1 Hosting Expression = ss_sold_date_sk#4 IN dynamicpruning#5 -BroadcastExchange (142) -+- * ColumnarToRow (141) - +- CometProject (140) - +- CometFilter (139) - +- CometScan parquet spark_catalog.default.date_dim (138) +BroadcastExchange (88) ++- * ColumnarToRow (87) + +- CometProject (86) + +- CometFilter (85) + +- CometScan parquet spark_catalog.default.date_dim (84) -(138) Scan parquet spark_catalog.default.date_dim -Output [2]: [d_date_sk#22, d_date#223] +(84) Scan parquet spark_catalog.default.date_dim +Output [2]: [d_date_sk#22, d_date#189] Batched: true Location [not included in comparison]/{warehouse_dir}/date_dim] PushedFilters: [IsNotNull(d_date), GreaterThanOrEqual(d_date,1998-08-04), LessThanOrEqual(d_date,1998-08-18), IsNotNull(d_date_sk)] ReadSchema: struct -(139) CometFilter -Input [2]: [d_date_sk#22, d_date#223] -Condition : (((isnotnull(d_date#223) AND (d_date#223 >= 1998-08-04)) AND (d_date#223 <= 1998-08-18)) AND isnotnull(d_date_sk#22)) +(85) CometFilter +Input [2]: [d_date_sk#22, d_date#189] +Condition : (((isnotnull(d_date#189) AND (d_date#189 >= 1998-08-04)) AND (d_date#189 <= 1998-08-18)) AND isnotnull(d_date_sk#22)) -(140) CometProject -Input [2]: [d_date_sk#22, d_date#223] +(86) CometProject +Input [2]: [d_date_sk#22, d_date#189] Arguments: [d_date_sk#22], [d_date_sk#22] -(141) ColumnarToRow [codegen id : 1] +(87) ColumnarToRow [codegen id : 1] Input [1]: [d_date_sk#22] -(142) BroadcastExchange +(88) BroadcastExchange Input [1]: [d_date_sk#22] -Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=15] +Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=11] Subquery:2 Hosting operator id = 4 Hosting Expression = sr_returned_date_sk#15 IN dynamicpruning#5 @@ -857,12 +539,4 @@ Subquery:5 Hosting operator id = 41 Hosting Expression = ws_sold_date_sk#86 IN d Subquery:6 Hosting operator id = 44 Hosting Expression = wr_returned_date_sk#98 IN dynamicpruning#5 -Subquery:7 Hosting operator id = 73 Hosting Expression = ws_sold_date_sk#86 IN dynamicpruning#5 - -Subquery:8 Hosting operator id = 76 Hosting Expression = wr_returned_date_sk#98 IN dynamicpruning#5 - -Subquery:9 Hosting operator id = 105 Hosting Expression = ws_sold_date_sk#86 IN dynamicpruning#5 - -Subquery:10 Hosting operator id = 108 Hosting Expression = wr_returned_date_sk#98 IN dynamicpruning#5 - diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q5a/simplified.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q5a/simplified.txt index ff62cb5c2..aaec304fd 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q5a/simplified.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q5a/simplified.txt @@ -124,95 +124,13 @@ TakeOrderedAndProject [channel,id,sales,returns,profit] HashAggregate [channel,sales,returns,profit] [sum,isEmpty,sum,isEmpty,sum,isEmpty,sum,isEmpty,sum,isEmpty,sum,isEmpty] HashAggregate [channel,id,sum,isEmpty,sum,isEmpty,sum,isEmpty] [sum(sales),sum(returns),sum(profit),sales,returns,profit,sum,isEmpty,sum,isEmpty,sum,isEmpty] InputAdapter - Exchange [channel,id] #12 - WholeStageCodegen (27) - HashAggregate [channel,id,sales,returns,profit] [sum,isEmpty,sum,isEmpty,sum,isEmpty,sum,isEmpty,sum,isEmpty,sum,isEmpty] - InputAdapter - Union - WholeStageCodegen (18) - HashAggregate [s_store_id,sum,sum,sum,sum] [sum(UnscaledValue(sales_price)),sum(UnscaledValue(return_amt)),sum(UnscaledValue(profit)),sum(UnscaledValue(net_loss)),channel,id,sales,returns,profit,sum,sum,sum,sum] - InputAdapter - ReusedExchange [s_store_id,sum,sum,sum,sum] #3 - WholeStageCodegen (22) - HashAggregate [cp_catalog_page_id,sum,sum,sum,sum] [sum(UnscaledValue(sales_price)),sum(UnscaledValue(return_amt)),sum(UnscaledValue(profit)),sum(UnscaledValue(net_loss)),channel,id,sales,returns,profit,sum,sum,sum,sum] - InputAdapter - ReusedExchange [cp_catalog_page_id,sum,sum,sum,sum] #6 - WholeStageCodegen (26) - HashAggregate [web_site_id,sum,sum,sum,sum] [sum(UnscaledValue(sales_price)),sum(UnscaledValue(return_amt)),sum(UnscaledValue(profit)),sum(UnscaledValue(net_loss)),channel,id,sales,returns,profit,sum,sum,sum,sum] - InputAdapter - Exchange [web_site_id] #13 - WholeStageCodegen (25) - HashAggregate [web_site_id,sales_price,return_amt,profit,net_loss] [sum,sum,sum,sum,sum,sum,sum,sum] - Project [sales_price,profit,return_amt,net_loss,web_site_id] - BroadcastHashJoin [wsr_web_site_sk,web_site_sk] - Project [wsr_web_site_sk,sales_price,profit,return_amt,net_loss] - BroadcastHashJoin [date_sk,d_date_sk] - ColumnarToRow - InputAdapter - CometUnion - CometProject [ws_web_site_sk,ws_sold_date_sk,ws_ext_sales_price,ws_net_profit] [wsr_web_site_sk,date_sk,sales_price,profit,return_amt,net_loss] - CometFilter [ws_web_site_sk] - CometScan parquet spark_catalog.default.web_sales [ws_web_site_sk,ws_ext_sales_price,ws_net_profit,ws_sold_date_sk] - ReusedSubquery [d_date_sk] #1 - CometProject [ws_web_site_sk,wr_returned_date_sk,wr_return_amt,wr_net_loss] [wsr_web_site_sk,date_sk,sales_price,profit,return_amt,net_loss] - CometBroadcastHashJoin [wr_item_sk,wr_order_number,ws_item_sk,ws_order_number] - CometBroadcastExchange #14 - CometScan parquet spark_catalog.default.web_returns [wr_item_sk,wr_order_number,wr_return_amt,wr_net_loss,wr_returned_date_sk] - ReusedSubquery [d_date_sk] #1 - CometProject [ws_item_sk,ws_web_site_sk,ws_order_number] - CometFilter [ws_item_sk,ws_order_number,ws_web_site_sk] - CometScan parquet spark_catalog.default.web_sales [ws_item_sk,ws_web_site_sk,ws_order_number,ws_sold_date_sk] - InputAdapter - ReusedExchange [d_date_sk] #4 - InputAdapter - ReusedExchange [web_site_sk,web_site_id] #10 + ReusedExchange [channel,id,sum,isEmpty,sum,isEmpty,sum,isEmpty] #2 WholeStageCodegen (44) HashAggregate [sum,isEmpty,sum,isEmpty,sum,isEmpty] [sum(sales),sum(returns),sum(profit),channel,id,sum(sales),sum(returns),sum(profit),sum,isEmpty,sum,isEmpty,sum,isEmpty] InputAdapter - Exchange #15 + Exchange #12 WholeStageCodegen (43) HashAggregate [sales,returns,profit] [sum,isEmpty,sum,isEmpty,sum,isEmpty,sum,isEmpty,sum,isEmpty,sum,isEmpty] HashAggregate [channel,id,sum,isEmpty,sum,isEmpty,sum,isEmpty] [sum(sales),sum(returns),sum(profit),sales,returns,profit,sum,isEmpty,sum,isEmpty,sum,isEmpty] InputAdapter - Exchange [channel,id] #16 - WholeStageCodegen (42) - HashAggregate [channel,id,sales,returns,profit] [sum,isEmpty,sum,isEmpty,sum,isEmpty,sum,isEmpty,sum,isEmpty,sum,isEmpty] - InputAdapter - Union - WholeStageCodegen (33) - HashAggregate [s_store_id,sum,sum,sum,sum] [sum(UnscaledValue(sales_price)),sum(UnscaledValue(return_amt)),sum(UnscaledValue(profit)),sum(UnscaledValue(net_loss)),channel,id,sales,returns,profit,sum,sum,sum,sum] - InputAdapter - ReusedExchange [s_store_id,sum,sum,sum,sum] #3 - WholeStageCodegen (37) - HashAggregate [cp_catalog_page_id,sum,sum,sum,sum] [sum(UnscaledValue(sales_price)),sum(UnscaledValue(return_amt)),sum(UnscaledValue(profit)),sum(UnscaledValue(net_loss)),channel,id,sales,returns,profit,sum,sum,sum,sum] - InputAdapter - ReusedExchange [cp_catalog_page_id,sum,sum,sum,sum] #6 - WholeStageCodegen (41) - HashAggregate [web_site_id,sum,sum,sum,sum] [sum(UnscaledValue(sales_price)),sum(UnscaledValue(return_amt)),sum(UnscaledValue(profit)),sum(UnscaledValue(net_loss)),channel,id,sales,returns,profit,sum,sum,sum,sum] - InputAdapter - Exchange [web_site_id] #17 - WholeStageCodegen (40) - HashAggregate [web_site_id,sales_price,return_amt,profit,net_loss] [sum,sum,sum,sum,sum,sum,sum,sum] - Project [sales_price,profit,return_amt,net_loss,web_site_id] - BroadcastHashJoin [wsr_web_site_sk,web_site_sk] - Project [wsr_web_site_sk,sales_price,profit,return_amt,net_loss] - BroadcastHashJoin [date_sk,d_date_sk] - ColumnarToRow - InputAdapter - CometUnion - CometProject [ws_web_site_sk,ws_sold_date_sk,ws_ext_sales_price,ws_net_profit] [wsr_web_site_sk,date_sk,sales_price,profit,return_amt,net_loss] - CometFilter [ws_web_site_sk] - CometScan parquet spark_catalog.default.web_sales [ws_web_site_sk,ws_ext_sales_price,ws_net_profit,ws_sold_date_sk] - ReusedSubquery [d_date_sk] #1 - CometProject [ws_web_site_sk,wr_returned_date_sk,wr_return_amt,wr_net_loss] [wsr_web_site_sk,date_sk,sales_price,profit,return_amt,net_loss] - CometBroadcastHashJoin [wr_item_sk,wr_order_number,ws_item_sk,ws_order_number] - CometBroadcastExchange #18 - CometScan parquet spark_catalog.default.web_returns [wr_item_sk,wr_order_number,wr_return_amt,wr_net_loss,wr_returned_date_sk] - ReusedSubquery [d_date_sk] #1 - CometProject [ws_item_sk,ws_web_site_sk,ws_order_number] - CometFilter [ws_item_sk,ws_order_number,ws_web_site_sk] - CometScan parquet spark_catalog.default.web_sales [ws_item_sk,ws_web_site_sk,ws_order_number,ws_sold_date_sk] - InputAdapter - ReusedExchange [d_date_sk] #4 - InputAdapter - ReusedExchange [web_site_sk,web_site_id] #10 + ReusedExchange [channel,id,sum,isEmpty,sum,isEmpty,sum,isEmpty] #2 diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index 2e1444281..c5fef022c 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -63,6 +63,36 @@ class CometExecSuite extends CometTestBase { } } + test("fix CometNativeExec.doCanonicalize for ReusedExchangeExec") { + assume(isSpark34Plus, "ChunkedByteBuffer is not serializable before Spark 3.4+") + withSQLConf( + CometConf.COMET_EXEC_BROADCAST_FORCE_ENABLED.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { + withTable("td") { + testData + .withColumn("bucket", $"key" % 3) + .write + .mode(SaveMode.Overwrite) + .bucketBy(2, "bucket") + .format("parquet") + .saveAsTable("td") + val df = sql(""" + |SELECT t1.key, t2.key, t3.key + |FROM td AS t1 + |JOIN td AS t2 ON t2.key = t1.key + |JOIN td AS t3 ON t3.key = t2.key + |WHERE t1.bucket = 1 AND t2.bucket = 1 AND t3.bucket = 1 + |""".stripMargin) + val reusedPlan = ReuseExchangeAndSubquery.apply(df.queryExecution.executedPlan) + val reusedExchanges = collect(reusedPlan) { case r: ReusedExchangeExec => + r + } + assert(reusedExchanges.size == 1) + assert(reusedExchanges.head.child.isInstanceOf[CometBroadcastExchangeExec]) + } + } + } + test("ReusedExchangeExec should work on CometBroadcastExchangeExec") { assume(isSpark34Plus, "ChunkedByteBuffer is not serializable before Spark 3.4+") withSQLConf(