Skip to content

Commit

Permalink
fix: Disable Comet shuffle with AQE coalesce partitions enabled (#380)
Browse files Browse the repository at this point in the history
* fix: Disable Comet shuffle with AQE coalesce partitions enabled

* Update plan stability

* Fix

* Fix

* Remove debug info

* Refine test
  • Loading branch information
viirya authored May 6, 2024
1 parent 76330e9 commit 19379a3
Show file tree
Hide file tree
Showing 59 changed files with 5,176 additions and 4,725 deletions.
11 changes: 11 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,17 @@ object CometConf {
.booleanConf
.createWithDefault(false)

/**
 * Internal boolean switch, registered under `spark.comet.shuffle.enforceMode.enabled` and
 * defaulting to `false`. Per its description it forces Comet shuffle to be used even when
 * Spark AQE partition coalescing is enabled, and is intended for tests only.
 */
val COMET_SHUFFLE_ENFORCE_MODE_ENABLED: ConfigEntry[Boolean] = {
  // Description shown for the entry; split at sentence boundaries for readability.
  val description =
    "Comet shuffle doesn't support Spark AQE coalesce partitions. " +
      "If AQE coalesce partitions is enabled, Comet shuffle won't be triggered even enabled. " +
      "This config is used to enforce Comet to trigger shuffle even if AQE coalesce " +
      "partitions is enabled. " +
      "This is for testing purpose only."

  conf("spark.comet.shuffle.enforceMode.enabled")
    .doc(description)
    .internal()
    .booleanConf
    .createWithDefault(false)
}

val COMET_EXEC_BROADCAST_FORCE_ENABLED: ConfigEntry[Boolean] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.broadcast.enabled")
.doc(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -916,7 +916,10 @@ object CometSparkSessionExtensions extends Logging {
/**
 * Reports whether Comet shuffle should be used under the given SQL configuration.
 *
 * All of the following must hold:
 *   - `COMET_EXEC_SHUFFLE_ENABLED` is true in `conf`;
 *   - `spark.shuffle.manager` is set in `conf` and names the Comet shuffle manager class;
 *   - AQE partition coalescing is disabled in `conf`, or `COMET_SHUFFLE_ENFORCE_MODE_ENABLED`
 *     is true in `conf`.
 *
 * Every setting is read from the `conf` argument, so the answer always describes the
 * configuration the caller passed in.
 *
 * @param conf the SQL configuration to inspect
 * @return true when Comet shuffle may be used for `conf`
 */
private[comet] def isCometShuffleEnabled(conf: SQLConf): Boolean =
  COMET_EXEC_SHUFFLE_ENABLED.get(conf) &&
    (conf.contains("spark.shuffle.manager") && conf.getConfString("spark.shuffle.manager") ==
      "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") &&
    // TODO: AQE coalesce partitions feature causes Comet shuffle memory leak.
    // We should disable Comet shuffle when AQE coalesce partitions is enabled.
    // The enforce flag is looked up in `conf`, like the enable flag above.
    (!conf.coalesceShufflePartitionsEnabled || COMET_SHUFFLE_ENFORCE_MODE_ENABLED.get(conf))

private[comet] def isCometScanEnabled(conf: SQLConf): Boolean = {
COMET_SCAN_ENABLED.get(conf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,20 @@
: : :- * SortMergeJoin LeftAnti (19)
: : : :- * Project (13)
: : : : +- * SortMergeJoin LeftSemi (12)
: : : : :- * ColumnarToRow (6)
: : : : : +- CometSort (5)
: : : : : +- CometExchange (4)
: : : : :- * Sort (6)
: : : : : +- Exchange (5)
: : : : : +- * ColumnarToRow (4)
: : : : : +- CometProject (3)
: : : : : +- CometFilter (2)
: : : : : +- CometScan parquet spark_catalog.default.catalog_sales (1)
: : : : +- * ColumnarToRow (11)
: : : : +- CometSort (10)
: : : : +- CometExchange (9)
: : : : +- * Sort (11)
: : : : +- Exchange (10)
: : : : +- * ColumnarToRow (9)
: : : : +- CometProject (8)
: : : : +- CometScan parquet spark_catalog.default.catalog_sales (7)
: : : +- * ColumnarToRow (18)
: : : +- CometSort (17)
: : : +- CometExchange (16)
: : : +- * Sort (18)
: : : +- Exchange (17)
: : : +- * ColumnarToRow (16)
: : : +- CometProject (15)
: : : +- CometScan parquet spark_catalog.default.catalog_returns (14)
: : +- BroadcastExchange (24)
Expand Down Expand Up @@ -61,16 +61,16 @@ Condition : ((isnotnull(cs_ship_date_sk#1) AND isnotnull(cs_ship_addr_sk#2)) AND
Input [8]: [cs_ship_date_sk#1, cs_ship_addr_sk#2, cs_call_center_sk#3, cs_warehouse_sk#4, cs_order_number#5, cs_ext_ship_cost#6, cs_net_profit#7, cs_sold_date_sk#8]
Arguments: [cs_ship_date_sk#1, cs_ship_addr_sk#2, cs_call_center_sk#3, cs_warehouse_sk#4, cs_order_number#5, cs_ext_ship_cost#6, cs_net_profit#7], [cs_ship_date_sk#1, cs_ship_addr_sk#2, cs_call_center_sk#3, cs_warehouse_sk#4, cs_order_number#5, cs_ext_ship_cost#6, cs_net_profit#7]

(4) CometExchange
(4) ColumnarToRow [codegen id : 1]
Input [7]: [cs_ship_date_sk#1, cs_ship_addr_sk#2, cs_call_center_sk#3, cs_warehouse_sk#4, cs_order_number#5, cs_ext_ship_cost#6, cs_net_profit#7]
Arguments: hashpartitioning(cs_order_number#5, 5), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=1]

(5) CometSort
(5) Exchange
Input [7]: [cs_ship_date_sk#1, cs_ship_addr_sk#2, cs_call_center_sk#3, cs_warehouse_sk#4, cs_order_number#5, cs_ext_ship_cost#6, cs_net_profit#7]
Arguments: [cs_ship_date_sk#1, cs_ship_addr_sk#2, cs_call_center_sk#3, cs_warehouse_sk#4, cs_order_number#5, cs_ext_ship_cost#6, cs_net_profit#7], [cs_order_number#5 ASC NULLS FIRST]
Arguments: hashpartitioning(cs_order_number#5, 5), ENSURE_REQUIREMENTS, [plan_id=1]

(6) ColumnarToRow [codegen id : 1]
(6) Sort [codegen id : 2]
Input [7]: [cs_ship_date_sk#1, cs_ship_addr_sk#2, cs_call_center_sk#3, cs_warehouse_sk#4, cs_order_number#5, cs_ext_ship_cost#6, cs_net_profit#7]
Arguments: [cs_order_number#5 ASC NULLS FIRST], false, 0

(unknown) Scan parquet spark_catalog.default.catalog_sales
Output [3]: [cs_warehouse_sk#9, cs_order_number#10, cs_sold_date_sk#11]
Expand All @@ -82,24 +82,24 @@ ReadSchema: struct<cs_warehouse_sk:int,cs_order_number:int>
Input [3]: [cs_warehouse_sk#9, cs_order_number#10, cs_sold_date_sk#11]
Arguments: [cs_warehouse_sk#9, cs_order_number#10], [cs_warehouse_sk#9, cs_order_number#10]

(9) CometExchange
(9) ColumnarToRow [codegen id : 3]
Input [2]: [cs_warehouse_sk#9, cs_order_number#10]
Arguments: hashpartitioning(cs_order_number#10, 5), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=2]

(10) CometSort
(10) Exchange
Input [2]: [cs_warehouse_sk#9, cs_order_number#10]
Arguments: [cs_warehouse_sk#9, cs_order_number#10], [cs_order_number#10 ASC NULLS FIRST]
Arguments: hashpartitioning(cs_order_number#10, 5), ENSURE_REQUIREMENTS, [plan_id=2]

(11) ColumnarToRow [codegen id : 2]
(11) Sort [codegen id : 4]
Input [2]: [cs_warehouse_sk#9, cs_order_number#10]
Arguments: [cs_order_number#10 ASC NULLS FIRST], false, 0

(12) SortMergeJoin [codegen id : 3]
(12) SortMergeJoin [codegen id : 5]
Left keys [1]: [cs_order_number#5]
Right keys [1]: [cs_order_number#10]
Join type: LeftSemi
Join condition: NOT (cs_warehouse_sk#4 = cs_warehouse_sk#9)

(13) Project [codegen id : 3]
(13) Project [codegen id : 5]
Output [6]: [cs_ship_date_sk#1, cs_ship_addr_sk#2, cs_call_center_sk#3, cs_order_number#5, cs_ext_ship_cost#6, cs_net_profit#7]
Input [7]: [cs_ship_date_sk#1, cs_ship_addr_sk#2, cs_call_center_sk#3, cs_warehouse_sk#4, cs_order_number#5, cs_ext_ship_cost#6, cs_net_profit#7]

Expand All @@ -113,18 +113,18 @@ ReadSchema: struct<cr_order_number:int>
Input [2]: [cr_order_number#12, cr_returned_date_sk#13]
Arguments: [cr_order_number#12], [cr_order_number#12]

(16) CometExchange
(16) ColumnarToRow [codegen id : 6]
Input [1]: [cr_order_number#12]
Arguments: hashpartitioning(cr_order_number#12, 5), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=3]

(17) CometSort
(17) Exchange
Input [1]: [cr_order_number#12]
Arguments: [cr_order_number#12], [cr_order_number#12 ASC NULLS FIRST]
Arguments: hashpartitioning(cr_order_number#12, 5), ENSURE_REQUIREMENTS, [plan_id=3]

(18) ColumnarToRow [codegen id : 4]
(18) Sort [codegen id : 7]
Input [1]: [cr_order_number#12]
Arguments: [cr_order_number#12 ASC NULLS FIRST], false, 0

(19) SortMergeJoin [codegen id : 8]
(19) SortMergeJoin [codegen id : 11]
Left keys [1]: [cs_order_number#5]
Right keys [1]: [cr_order_number#12]
Join type: LeftAnti
Expand All @@ -145,20 +145,20 @@ Condition : (((isnotnull(d_date#15) AND (d_date#15 >= 2002-02-01)) AND (d_date#1
Input [2]: [d_date_sk#14, d_date#15]
Arguments: [d_date_sk#14], [d_date_sk#14]

(23) ColumnarToRow [codegen id : 5]
(23) ColumnarToRow [codegen id : 8]
Input [1]: [d_date_sk#14]

(24) BroadcastExchange
Input [1]: [d_date_sk#14]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=4]

(25) BroadcastHashJoin [codegen id : 8]
(25) BroadcastHashJoin [codegen id : 11]
Left keys [1]: [cs_ship_date_sk#1]
Right keys [1]: [d_date_sk#14]
Join type: Inner
Join condition: None

(26) Project [codegen id : 8]
(26) Project [codegen id : 11]
Output [5]: [cs_ship_addr_sk#2, cs_call_center_sk#3, cs_order_number#5, cs_ext_ship_cost#6, cs_net_profit#7]
Input [7]: [cs_ship_date_sk#1, cs_ship_addr_sk#2, cs_call_center_sk#3, cs_order_number#5, cs_ext_ship_cost#6, cs_net_profit#7, d_date_sk#14]

Expand All @@ -177,20 +177,20 @@ Condition : ((isnotnull(ca_state#17) AND (ca_state#17 = GA)) AND isnotnull(ca_ad
Input [2]: [ca_address_sk#16, ca_state#17]
Arguments: [ca_address_sk#16], [ca_address_sk#16]

(30) ColumnarToRow [codegen id : 6]
(30) ColumnarToRow [codegen id : 9]
Input [1]: [ca_address_sk#16]

(31) BroadcastExchange
Input [1]: [ca_address_sk#16]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=5]

(32) BroadcastHashJoin [codegen id : 8]
(32) BroadcastHashJoin [codegen id : 11]
Left keys [1]: [cs_ship_addr_sk#2]
Right keys [1]: [ca_address_sk#16]
Join type: Inner
Join condition: None

(33) Project [codegen id : 8]
(33) Project [codegen id : 11]
Output [4]: [cs_call_center_sk#3, cs_order_number#5, cs_ext_ship_cost#6, cs_net_profit#7]
Input [6]: [cs_ship_addr_sk#2, cs_call_center_sk#3, cs_order_number#5, cs_ext_ship_cost#6, cs_net_profit#7, ca_address_sk#16]

Expand All @@ -209,38 +209,38 @@ Condition : ((isnotnull(cc_county#19) AND (cc_county#19 = Williamson County)) AN
Input [2]: [cc_call_center_sk#18, cc_county#19]
Arguments: [cc_call_center_sk#18], [cc_call_center_sk#18]

(37) ColumnarToRow [codegen id : 7]
(37) ColumnarToRow [codegen id : 10]
Input [1]: [cc_call_center_sk#18]

(38) BroadcastExchange
Input [1]: [cc_call_center_sk#18]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=6]

(39) BroadcastHashJoin [codegen id : 8]
(39) BroadcastHashJoin [codegen id : 11]
Left keys [1]: [cs_call_center_sk#3]
Right keys [1]: [cc_call_center_sk#18]
Join type: Inner
Join condition: None

(40) Project [codegen id : 8]
(40) Project [codegen id : 11]
Output [3]: [cs_order_number#5, cs_ext_ship_cost#6, cs_net_profit#7]
Input [5]: [cs_call_center_sk#3, cs_order_number#5, cs_ext_ship_cost#6, cs_net_profit#7, cc_call_center_sk#18]

(41) HashAggregate [codegen id : 8]
(41) HashAggregate [codegen id : 11]
Input [3]: [cs_order_number#5, cs_ext_ship_cost#6, cs_net_profit#7]
Keys [1]: [cs_order_number#5]
Functions [2]: [partial_sum(UnscaledValue(cs_ext_ship_cost#6)), partial_sum(UnscaledValue(cs_net_profit#7))]
Aggregate Attributes [2]: [sum(UnscaledValue(cs_ext_ship_cost#6))#20, sum(UnscaledValue(cs_net_profit#7))#21]
Results [3]: [cs_order_number#5, sum#22, sum#23]

(42) HashAggregate [codegen id : 8]
(42) HashAggregate [codegen id : 11]
Input [3]: [cs_order_number#5, sum#22, sum#23]
Keys [1]: [cs_order_number#5]
Functions [2]: [merge_sum(UnscaledValue(cs_ext_ship_cost#6)), merge_sum(UnscaledValue(cs_net_profit#7))]
Aggregate Attributes [2]: [sum(UnscaledValue(cs_ext_ship_cost#6))#20, sum(UnscaledValue(cs_net_profit#7))#21]
Results [3]: [cs_order_number#5, sum#22, sum#23]

(43) HashAggregate [codegen id : 8]
(43) HashAggregate [codegen id : 11]
Input [3]: [cs_order_number#5, sum#22, sum#23]
Keys: []
Functions [3]: [merge_sum(UnscaledValue(cs_ext_ship_cost#6)), merge_sum(UnscaledValue(cs_net_profit#7)), partial_count(distinct cs_order_number#5)]
Expand All @@ -251,7 +251,7 @@ Results [3]: [sum#22, sum#23, count#25]
Input [3]: [sum#22, sum#23, count#25]
Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=7]

(45) HashAggregate [codegen id : 9]
(45) HashAggregate [codegen id : 12]
Input [3]: [sum#22, sum#23, count#25]
Keys: []
Functions [3]: [sum(UnscaledValue(cs_ext_ship_cost#6)), sum(UnscaledValue(cs_net_profit#7)), count(distinct cs_order_number#5)]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
WholeStageCodegen (9)
WholeStageCodegen (12)
HashAggregate [sum,sum,count] [sum(UnscaledValue(cs_ext_ship_cost)),sum(UnscaledValue(cs_net_profit)),count(cs_order_number),order count ,total shipping cost ,total net profit ,sum,sum,count]
InputAdapter
Exchange #1
WholeStageCodegen (8)
WholeStageCodegen (11)
HashAggregate [cs_order_number] [sum(UnscaledValue(cs_ext_ship_cost)),sum(UnscaledValue(cs_net_profit)),count(cs_order_number),sum,sum,count,sum,sum,count]
HashAggregate [cs_order_number] [sum(UnscaledValue(cs_ext_ship_cost)),sum(UnscaledValue(cs_net_profit)),sum,sum,sum,sum]
HashAggregate [cs_order_number,cs_ext_ship_cost,cs_net_profit] [sum(UnscaledValue(cs_ext_ship_cost)),sum(UnscaledValue(cs_net_profit)),sum,sum,sum,sum]
Expand All @@ -14,53 +14,59 @@ WholeStageCodegen (9)
BroadcastHashJoin [cs_ship_date_sk,d_date_sk]
SortMergeJoin [cs_order_number,cr_order_number]
InputAdapter
WholeStageCodegen (3)
WholeStageCodegen (5)
Project [cs_ship_date_sk,cs_ship_addr_sk,cs_call_center_sk,cs_order_number,cs_ext_ship_cost,cs_net_profit]
SortMergeJoin [cs_order_number,cs_order_number,cs_warehouse_sk,cs_warehouse_sk]
InputAdapter
WholeStageCodegen (1)
ColumnarToRow
WholeStageCodegen (2)
Sort [cs_order_number]
InputAdapter
CometSort [cs_order_number]
CometExchange [cs_order_number] #2
CometProject [cs_ship_date_sk,cs_ship_addr_sk,cs_call_center_sk,cs_warehouse_sk,cs_order_number,cs_ext_ship_cost,cs_net_profit]
CometFilter [cs_ship_date_sk,cs_ship_addr_sk,cs_call_center_sk]
CometScan parquet spark_catalog.default.catalog_sales [cs_ship_date_sk,cs_ship_addr_sk,cs_call_center_sk,cs_warehouse_sk,cs_order_number,cs_ext_ship_cost,cs_net_profit,cs_sold_date_sk]
Exchange [cs_order_number] #2
WholeStageCodegen (1)
ColumnarToRow
InputAdapter
CometProject [cs_ship_date_sk,cs_ship_addr_sk,cs_call_center_sk,cs_warehouse_sk,cs_order_number,cs_ext_ship_cost,cs_net_profit]
CometFilter [cs_ship_date_sk,cs_ship_addr_sk,cs_call_center_sk]
CometScan parquet spark_catalog.default.catalog_sales [cs_ship_date_sk,cs_ship_addr_sk,cs_call_center_sk,cs_warehouse_sk,cs_order_number,cs_ext_ship_cost,cs_net_profit,cs_sold_date_sk]
InputAdapter
WholeStageCodegen (2)
ColumnarToRow
WholeStageCodegen (4)
Sort [cs_order_number]
InputAdapter
CometSort [cs_order_number]
CometExchange [cs_order_number] #3
CometProject [cs_warehouse_sk,cs_order_number]
CometScan parquet spark_catalog.default.catalog_sales [cs_warehouse_sk,cs_order_number,cs_sold_date_sk]
Exchange [cs_order_number] #3
WholeStageCodegen (3)
ColumnarToRow
InputAdapter
CometProject [cs_warehouse_sk,cs_order_number]
CometScan parquet spark_catalog.default.catalog_sales [cs_warehouse_sk,cs_order_number,cs_sold_date_sk]
InputAdapter
WholeStageCodegen (4)
ColumnarToRow
WholeStageCodegen (7)
Sort [cr_order_number]
InputAdapter
CometSort [cr_order_number]
CometExchange [cr_order_number] #4
CometProject [cr_order_number]
CometScan parquet spark_catalog.default.catalog_returns [cr_order_number,cr_returned_date_sk]
Exchange [cr_order_number] #4
WholeStageCodegen (6)
ColumnarToRow
InputAdapter
CometProject [cr_order_number]
CometScan parquet spark_catalog.default.catalog_returns [cr_order_number,cr_returned_date_sk]
InputAdapter
BroadcastExchange #5
WholeStageCodegen (5)
WholeStageCodegen (8)
ColumnarToRow
InputAdapter
CometProject [d_date_sk]
CometFilter [d_date,d_date_sk]
CometScan parquet spark_catalog.default.date_dim [d_date_sk,d_date]
InputAdapter
BroadcastExchange #6
WholeStageCodegen (6)
WholeStageCodegen (9)
ColumnarToRow
InputAdapter
CometProject [ca_address_sk]
CometFilter [ca_state,ca_address_sk]
CometScan parquet spark_catalog.default.customer_address [ca_address_sk,ca_state]
InputAdapter
BroadcastExchange #7
WholeStageCodegen (7)
WholeStageCodegen (10)
ColumnarToRow
InputAdapter
CometProject [cc_call_center_sk]
Expand Down
Loading

0 comments on commit 19379a3

Please sign in to comment.