Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into hash_join_build_right
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed May 19, 2024
2 parents f1d5aeb + b4c2dc2 commit 6dc9745
Show file tree
Hide file tree
Showing 109 changed files with 8,947 additions and 15,285 deletions.
20 changes: 19 additions & 1 deletion spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
Original file line number Diff line number Diff line change
Expand Up @@ -363,14 +363,32 @@ abstract class CometNativeExec extends CometExec {
}

/**
 * Produces the canonical form of this native operator.
 *
 * The result of the parent canonicalization is cast to [[CometNativeExec]] and passed through
 * `canonicalizePlans()`. When this node carries a serialized plan (`serializedPlanOpt` is
 * defined), the result of `cleanBlock()` on the canonicalized plan is returned instead.
 */
override protected def doCanonicalize(): SparkPlan = {
  // Single definition: parent canonicalization followed by canonicalization of any SparkPlan
  // held as a constructor argument.
  val canonicalizedPlan = super
    .doCanonicalize()
    .asInstanceOf[CometNativeExec]
    .canonicalizePlans()

  if (serializedPlanOpt.isDefined) {
    // If the plan is a boundary node, we should remove the serialized plan.
    canonicalizedPlan.cleanBlock()
  } else {
    canonicalizedPlan
  }
}

/**
 * Builds a copy of this operator in which every constructor argument that is a [[SparkPlan]]
 * has been replaced by its canonicalized form. All other arguments, including `null`, are
 * passed through unchanged.
 */
protected def canonicalizePlans(): CometNativeExec = {
  val canonicalArgs = mapProductIterator[AnyRef] {
    case plan: SparkPlan => plan.canonicalized
    case ref: AnyRef => ref
    case null => null
  }

  makeCopy(canonicalArgs).asInstanceOf[CometNativeExec]
}
}

abstract class CometUnaryExec extends CometNativeExec with UnaryExecNode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ trait ShimCometScanExec {
readSchema: StructType,
options: ParquetOptions): FileScanRDD =
classOf[FileScanRDD].getDeclaredConstructors
// Avoid picking up incorrect constructors from custom Spark forks.
.filter(c => List(3, 5, 6).contains(c.getParameterCount()) )
.map { c =>
c.getParameterCount match {
case 3 => c.newInstance(sparkSession, readFunction, filePartitions)
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -43,33 +43,27 @@ TakeOrderedAndProject [cd_gender,cd_marital_status,cd_education_status,cd_purcha
CometBroadcastHashJoin [ws_sold_date_sk,d_date_sk]
CometScan parquet spark_catalog.default.web_sales [ws_bill_customer_sk,ws_sold_date_sk]
ReusedSubquery [d_date_sk] #1
CometBroadcastExchange #6
CometProject [d_date_sk]
CometFilter [d_year,d_moy,d_date_sk]
CometScan parquet spark_catalog.default.date_dim [d_date_sk,d_year,d_moy]
ReusedExchange [d_date_sk] #4
InputAdapter
BroadcastExchange #7
BroadcastExchange #6
WholeStageCodegen (2)
ColumnarToRow
InputAdapter
CometProject [cs_ship_customer_sk]
CometBroadcastHashJoin [cs_sold_date_sk,d_date_sk]
CometScan parquet spark_catalog.default.catalog_sales [cs_ship_customer_sk,cs_sold_date_sk]
ReusedSubquery [d_date_sk] #1
CometBroadcastExchange #8
CometProject [d_date_sk]
CometFilter [d_year,d_moy,d_date_sk]
CometScan parquet spark_catalog.default.date_dim [d_date_sk,d_year,d_moy]
ReusedExchange [d_date_sk] #4
InputAdapter
BroadcastExchange #9
BroadcastExchange #7
WholeStageCodegen (3)
ColumnarToRow
InputAdapter
CometProject [ca_address_sk]
CometFilter [ca_county,ca_address_sk]
CometScan parquet spark_catalog.default.customer_address [ca_address_sk,ca_county]
InputAdapter
BroadcastExchange #10
BroadcastExchange #8
WholeStageCodegen (4)
ColumnarToRow
InputAdapter
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,13 @@ TakeOrderedAndProject [customer_preferred_cust_flag]
CometFilter [ws_bill_customer_sk]
CometScan parquet spark_catalog.default.web_sales [ws_bill_customer_sk,ws_ext_discount_amt,ws_ext_list_price,ws_sold_date_sk]
ReusedSubquery [d_date_sk] #1
CometBroadcastExchange #13
CometFilter [d_year,d_date_sk]
CometScan parquet spark_catalog.default.date_dim [d_date_sk,d_year]
ReusedExchange [d_date_sk,d_year] #4
InputAdapter
BroadcastExchange #14
BroadcastExchange #13
WholeStageCodegen (7)
HashAggregate [c_customer_id,c_first_name,c_last_name,c_preferred_cust_flag,c_birth_country,c_login,c_email_address,d_year,sum] [sum(UnscaledValue((ws_ext_list_price - ws_ext_discount_amt))),customer_id,year_total,sum]
InputAdapter
Exchange [c_customer_id,c_first_name,c_last_name,c_preferred_cust_flag,c_birth_country,c_login,c_email_address,d_year] #15
Exchange [c_customer_id,c_first_name,c_last_name,c_preferred_cust_flag,c_birth_country,c_login,c_email_address,d_year] #14
WholeStageCodegen (6)
ColumnarToRow
InputAdapter
Expand All @@ -102,10 +100,8 @@ TakeOrderedAndProject [customer_preferred_cust_flag]
CometBroadcastHashJoin [c_customer_sk,ws_bill_customer_sk]
CometFilter [c_customer_sk,c_customer_id]
CometScan parquet spark_catalog.default.customer [c_customer_sk,c_customer_id,c_first_name,c_last_name,c_preferred_cust_flag,c_birth_country,c_login,c_email_address]
CometBroadcastExchange #16
CometBroadcastExchange #15
CometFilter [ws_bill_customer_sk]
CometScan parquet spark_catalog.default.web_sales [ws_bill_customer_sk,ws_ext_discount_amt,ws_ext_list_price,ws_sold_date_sk]
ReusedSubquery [d_date_sk] #2
CometBroadcastExchange #17
CometFilter [d_year,d_date_sk]
CometScan parquet spark_catalog.default.date_dim [d_date_sk,d_year]
ReusedExchange [d_date_sk,d_year] #9
Loading

0 comments on commit 6dc9745

Please sign in to comment.