
feat: Support HashJoin operator #194

Merged: 12 commits merged into apache:main on Mar 20, 2024
Conversation

@viirya (Member) commented Mar 12, 2024:

Which issue does this PR close?

Closes #193.

Rationale for this change

What changes are included in this PR?

How are these changes tested?

Comment on lines 73 to 82
// TODO: Spark 3.4 returns SortMergeJoin for this query even with SHUFFLE_HASH hint.
// We need to investigate why this happens and fix it.
/*
val df2 =
sql("SELECT /*+ SHUFFLE_HASH(tbl_a) */ * FROM tbl_a LEFT JOIN tbl_b ON tbl_a._2 = tbl_b._1")
checkSparkAnswerAndOperator(df2)

val df3 =
sql("SELECT /*+ SHUFFLE_HASH(tbl_b) */ * FROM tbl_b LEFT JOIN tbl_a ON tbl_a._2 = tbl_b._1")
checkSparkAnswerAndOperator(df3)
*/

You can use spark.sql.join.forceApplyShuffledHashJoin to force SHJ; it was added to Spark specifically to test these joins:
apache/spark#33182

@viirya (Member Author):

Let me try. I just wonder why it is not planned as HashJoin by Spark. For right join it works as expected; only left join failed.

@viirya (Member Author):

No, it doesn't work. That's weird. I'll need to look at it further.

:( This is strange. Could SPARK_TESTING for some reason not be set in the env?

@viirya (Member Author):

Oh, this is because left join with build left, and right join with build right, are supported in hash join only in Spark 3.5 or above, via apache/spark#41398. Since Comet currently uses 3.4, it isn't supported yet.

Comment on lines +1843 to +1846
// DataFusion HashJoin assumes build side is always left.
// TODO: support BuildRight

Do we want to support BuildRight on the DataFusion side, or do something smart here, like flipping the join sides for InnerLike joins, as in https://github.com/apache/spark/pull/29097/files?

@viirya wdyt?

@viirya (Member Author) Mar 12, 2024:

This is for when the Spark planner decides to use the right side as the build side for HashJoin. I don't think we should flip join sides in Comet. We may need to update DataFusion HashJoin to support the right side as the build side.

ACK, that makes sense. So Comet shouldn't make any decisions that change the Spark plan itself, and should focus on execution rather than planning :) !

Just reading along: won't BuildRight just be swapping build and probe? I think the left/right naming in DataFusion is only there for historical reasons; in practice there is only build vs. probe in the operator.

@viirya (Member Author):

Yeah, in DataFusion only the left side can be the build side. But in Spark, the HashJoin operator has a build-side parameter indicating which side is the build side, and the operator internally does the right thing accordingly. So currently we cannot simply create a DataFusion HashJoin operator with the right side as the build side.

The sides can be swapped only if we also swap the outputs, as well as the column bindings in the join keys and join filter. I'd rather relax the build-side constraint in DataFusion than do the swap in Comet.
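For readers following along, here is a minimal, language-neutral sketch (in Python, with hypothetical names; this is not Comet or DataFusion code) of the bookkeeping such a swap involves: the inputs and join keys trade places, and a projection restores the original `[left ++ right]` output column order.

```python
def swap_join_sides(n_left_cols, n_right_cols, left_keys, right_keys):
    """Illustrative sketch of flipping a hash join's build/probe sides.

    After the swap the join emits [right ++ left] columns, so a projection
    is needed to restore the original [left ++ right] output order.
    Key lists hold column indices within each input's schema.
    """
    # The join keys simply trade places.
    swapped_keys = (right_keys, left_keys)
    # In the swapped output, the original left columns start at index
    # n_right_cols and the original right columns start at index 0;
    # project back to [left ++ right].
    projection = (list(range(n_right_cols, n_right_cols + n_left_cols))
                  + list(range(n_right_cols)))
    return swapped_keys, projection

# Example: 2 left columns, 3 right columns, join condition left[1] == right[0].
keys, proj = swap_join_sides(2, 3, left_keys=[1], right_keys=[0])
swapped_output = ["r0", "r1", "r2", "l0", "l1"]  # what the swapped join emits
restored = [swapped_output[i] for i in proj]     # back to [left ++ right] order
```

A join filter would need the same index remapping applied to each column reference bound to the intermediate schema, which is why relaxing the build-side constraint inside DataFusion is the simpler option.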


Makes sense, thanks for explaining

@viirya (Member Author):

Created apache/datafusion#9603 to track it.

@viirya (Member Author) commented Mar 12, 2024:

There are still two TPCDS query failures: q72 and q72-v2.7. I will investigate this.

EDIT: Fixed.

@codecov-commenter commented Mar 14, 2024:

Codecov Report

Attention: Patch coverage is 61.11111% with 28 lines in your changes missing coverage. Please review.

Project coverage is 33.32%. Comparing base (8aab44c) to head (75065cc).

Files Patch % Lines
.../scala/org/apache/comet/serde/QueryPlanSerde.scala 53.57% 5 Missing and 8 partials ⚠️
...n/scala/org/apache/spark/sql/comet/operators.scala 64.00% 1 Missing and 8 partials ⚠️
...org/apache/comet/CometSparkSessionExtensions.scala 68.42% 2 Missing and 4 partials ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main     #194      +/-   ##
============================================
- Coverage     33.41%   33.32%   -0.10%     
  Complexity      768      768              
============================================
  Files           107      107              
  Lines         36329    37037     +708     
  Branches       7935     8106     +171     
============================================
+ Hits          12138    12341     +203     
- Misses        21643    22099     +456     
- Partials       2548     2597      +49     


@@ -87,3 +88,21 @@ message Expand {
repeated spark.spark_expression.Expr project_list = 1;
int32 num_expr_per_project = 3;
}

message HashJoin {

Can we name it Join so that we can use it for SMJ as well as SHJ/BHJ? I will also use it in the BNLJ change I am working on.

@viirya (Member Author):

They are different join operators. I'm not sure how we would use the same Join to represent them?

@singhpk234 commented Mar 15, 2024:

Apologies, I wasn't clear in the comment above. I was thinking of something like this:

message Join {
  repeated spark.spark_expression.Expr left_join_keys = 1;
  repeated spark.spark_expression.Expr right_join_keys = 2;
  JoinType join_type = 3;
  // can serve as the condition in SHJ and sort_options in SMJ
  repeated spark.spark_expression.Expr join_exprs = 4;
  JoinExec join_exec = 5;
}

enum JoinExec {
  HashJoin = 0;
  SMJ = 1;
  ...
}

Maybe it's too much, though, and having a different proto message for each join may be the right thing to do!

@viirya (Member Author):

Yeah, that looks more complicated to me, and one message per join operator looks simpler.
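For context, the approach that was kept gives each join operator its own small message. A hypothetical sketch of what a standalone HashJoin message could look like (field names are purely illustrative; the actual definition is in the PR's proto diff, which is truncated above):

```proto
// Illustrative only; not the actual Comet definition.
message HashJoin {
  repeated spark.spark_expression.Expr left_join_keys = 1;
  repeated spark.spark_expression.Expr right_join_keys = 2;
  JoinType join_type = 3;
  // Optional non-equi condition evaluated after the key match.
  optional spark.spark_expression.Expr condition = 4;
}
```

The trade-off discussed here is flatter messages and no operator-discriminator enum, at the cost of one message type per join flavor.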

@comphead (Contributor) left a comment:

LGTM, thanks @viirya. I think we can let it move. I'm thinking of another thing: Spark most likely can figure out the optimal join order. Does that mean that if DataFusion has a fixed order, the same join query run in Spark and Comet could have a drastic performance downside? If so, we probably want to document it somewhere.

@viirya (Member Author) commented Mar 16, 2024:

Hmm, I don't understand the question. Do you mean that if DataFusion has a different join order than Spark, then Comet could have much better performance?

We don't rely on the DataFusion query optimizer; we use Spark's optimization. Comet's join order should be the same as Spark's.

@viirya (Member Author) commented Mar 20, 2024:

I will merge this today if there are no more comments. cc @sunchao

@sunchao (Member) left a comment:

LGTM too, thanks @viirya!

@viirya merged commit ce38812 into apache:main on Mar 20, 2024 (26 checks passed)
@viirya (Member Author) commented Mar 20, 2024:

Merged. Thanks.
