feat: Support sort merge join #178

Merged
merged 9 commits into from
Mar 18, 2024

@viirya viirya commented Mar 9, 2024

Which issue does this PR close?

Closes #177.

Rationale for this change

What changes are included in this PR?

How are these changes tested?

@viirya viirya force-pushed the sort_merge_join branch from 70fc8c7 to 0b7f600 Compare March 9, 2024 08:56

codecov-commenter commented Mar 9, 2024

Codecov Report

Attention: Patch coverage is 69.73684%, with 23 lines in your changes missing coverage. Please review.

Project coverage is 33.40%. Comparing base (81a641f) to head (637ff19).
Report is 4 commits behind head on main.

Files                                                    Patch %   Lines
.../scala/org/apache/comet/serde/QueryPlanSerde.scala    66.66%    4 Missing and 4 partials ⚠️
...n/scala/org/apache/spark/sql/comet/operators.scala    66.66%    0 Missing and 8 partials ⚠️
...org/apache/comet/CometSparkSessionExtensions.scala    84.00%    0 Missing and 4 partials ⚠️
...java/org/apache/comet/vector/CometPlainVector.java     0.00%    3 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main     #178      +/-   ##
============================================
+ Coverage     33.32%   33.40%   +0.08%     
+ Complexity      769      768       -1     
============================================
  Files           107      107              
  Lines         35395    36331     +936     
  Branches       7669     7936     +267     
============================================
+ Hits          11795    12137     +342     
- Misses        21146    21645     +499     
- Partials       2454     2549      +95     

viirya commented Mar 9, 2024

cc @sunchao @kazuyukitanimura

Comment on lines +574 to +599
val operatorDisabledFlag = s"$COMET_EXEC_CONFIG_PREFIX.$operator.disabled"
conf.getConfString(operatorFlag, "false").toBoolean || isCometAllOperatorEnabled(conf) &&
!conf.getConfString(operatorDisabledFlag, "false").toBoolean
Member Author

This "disabled" flag is useful for turning off a particular operator in a unit test. For example, I disable sort merge join in one existing test below.
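As a rough illustration of how the excerpt above evaluates, here is a minimal, self-contained sketch. It is not the Comet implementation: a plain `Map` stands in for `SQLConf`, the prefix `spark.comet.exec` is inferred from the key used in the test below, and the `.enabled` and `all.enabled` key names are assumptions made for the example.

```scala
object OperatorFlagSketch {
  // Assumed prefix, inferred from "spark.comet.exec.sort_merge_join.disabled".
  val Prefix = "spark.comet.exec"

  // Hypothetical stand-in for SQLConf: a plain immutable map of settings.
  def isOperatorEnabled(conf: Map[String, String], operator: String): Boolean = {
    // Missing keys default to "false", as in the excerpt.
    def flag(key: String): Boolean = conf.getOrElse(key, "false").toBoolean

    val enabled = flag(s"$Prefix.$operator.enabled")   // assumed key name
    val allEnabled = flag(s"$Prefix.all.enabled")      // assumed key name
    val disabled = flag(s"$Prefix.$operator.disabled")

    // In Scala, && binds tighter than ||, so the excerpt reads as
    // enabled || (allEnabled && !disabled).
    enabled || (allEnabled && !disabled)
  }
}
```

Under these assumptions the "disabled" flag only overrides the all-operators switch; an operator that is enabled explicitly stays enabled.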

Comment on lines +1861 to +1941
// TODO: Support SortMergeJoin with join condition after new DataFusion release
if (join.condition.isDefined) {
return None
}
Member Author

I haven't added join filter support in this PR; I will do it in a follow-up.
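The fallback pattern in the excerpt above (return `None` so the plan stays on the Spark operator) can be sketched without an early `return`. The case classes here are hypothetical, simplified stand-ins, not the actual Spark or Comet types.

```scala
object JoinSerdeSketch {
  // Hypothetical stand-in for the Spark sort merge join plan node.
  final case class SortMergeJoin(
      leftKeys: Seq[String],
      rightKeys: Seq[String],
      condition: Option[String])

  // Hypothetical stand-in for the serialized native operator.
  final case class NativeJoin(leftKeys: Seq[String], rightKeys: Seq[String])

  // None means "not supported natively", so the caller keeps the Spark operator.
  def toNative(join: SortMergeJoin): Option[NativeJoin] =
    if (join.condition.isDefined) None // join filters are not handled yet
    else Some(NativeJoin(join.leftKeys, join.rightKeys))
}
```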

@@ -859,6 +906,7 @@ class CometExecSuite extends CometTestBase {
.saveAsTable("bucketed_table2")

withSQLConf(
"spark.comet.exec.sort_merge_join.disabled" -> "true",
Member Author

This test explicitly checks for Spark sort merge join. I'd like to keep what it is meant to test, so I disable Comet sort merge join here.

viirya commented Mar 12, 2024

I will update the diff for the failed Spark tests.

core/src/execution/datafusion/planner.rs (outdated, resolved)
withParquetTable((0 until 10).map(i => (i, i % 5)), "tbl_a") {
withParquetTable((0 until 10).map(i => (i % 10, i + 2)), "tbl_b") {
val df1 = sql("SELECT * FROM tbl_a JOIN tbl_b ON tbl_a._2 = tbl_b._1")
checkSparkAnswerAndOperator(df1)
Contributor

Should we add checks to make sure the plan includes the Comet SMJ, i.e. stripAQEPlan(df.queryExecution.executedPlan).collectFirst...

Member Author

checkSparkAnswerAndOperator already does the check. If the plan contains a Spark SMJ or any other Spark join operator, it will report an error.

Contributor

I see. How does checkSparkAnswerAndOperator check whether it is Comet SMJ?
Unless we provide includeClasses, does it only check the classes below?
https://github.com/apache/arrow-datafusion-comet/blob/main/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala#L156

      case _: CometScanExec | _: CometBatchScanExec => true
      case _: CometSinkPlaceHolder | _: CometScanWrapper => false
      case _: CometExec | _: CometShuffleExchangeExec => true
      case _: CometBroadcastExchangeExec => true
      case _: WholeStageCodegenExec | _: ColumnarToRowExec | _: InputAdapter => true

Member Author

CometSortMergeJoinExec, like the other native operators, is a CometExec, so it is covered by checkCometOperators. We don't whitelist every native operator individually, only their common base class.

Contributor

Got it, thanks. I think it is good for now. Ideally we would check for CometSortMergeJoinExec specifically, since we will add other joins. That way, we can make sure that we are not accidentally testing a different join.

Member Author

Ah, that makes sense. I can add something to verify the join type (sort merge join, hash join, or others) in a follow-up. Thanks for the suggestion.
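The difference between the two checks discussed in this thread can be sketched on a miniature plan tree. Everything here is hypothetical and for illustration only: the node types are simplified stand-ins for Spark's `SparkPlan` and Comet's operators, `NativeExec` plays the role of the common base class, and `HashJoinNode` stands for a join that might be added later.

```scala
object PlanCheckSketch {
  // Hypothetical miniature plan tree; the real tests operate on SparkPlan.
  sealed trait Plan { def children: Seq[Plan] }

  // Common base class for native operators, mirroring the role of CometExec.
  sealed trait NativeExec extends Plan

  final case class SortMergeJoinNode(children: Seq[Plan]) extends NativeExec
  final case class HashJoinNode(children: Seq[Plan]) extends NativeExec // hypothetical future join
  final case class ScanNode(table: String) extends NativeExec {
    val children: Seq[Plan] = Nil
  }
  final case class SparkOnlyNode(children: Seq[Plan]) extends Plan

  // Depth-first search for the first node the partial function accepts.
  def collectFirst[A](plan: Plan)(pf: PartialFunction[Plan, A]): Option[A] =
    pf.lift(plan).orElse(
      plan.children.iterator.map(c => collectFirst(c)(pf)).collectFirst { case Some(a) => a })

  // Base-class check: true when every node is some native operator.
  def allNative(plan: Plan): Boolean =
    plan.isInstanceOf[NativeExec] && plan.children.forall(allNative)

  // Specific check: true only when a sort merge join node is present.
  def hasSortMergeJoin(plan: Plan): Boolean =
    collectFirst(plan) { case j: SortMergeJoinNode => j }.isDefined
}
```

In this sketch a plan rooted at `HashJoinNode` passes `allNative` but fails `hasSortMergeJoin`, which is the accidental substitution a join-specific assertion would catch.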

Contributor

@kazuyukitanimura kazuyukitanimura left a comment

LGTM

Member

@sunchao sunchao left a comment

LGTM (pending CI)

@viirya viirya merged commit 8aab44c into apache:main Mar 18, 2024
26 checks passed
viirya commented Mar 18, 2024

Merged. Thanks.

Development

Successfully merging this pull request may close these issues.

Support sort merge join
5 participants