Skip to content

Commit

Permalink
feat: Improve CometHashJoin statistics (apache#309)
Browse files Browse the repository at this point in the history
* HashJoin metrics

* HashJoin metrics test

* Fix test

* Fix format

* Fix descriptions

* Fix imports

* Update spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala

Co-authored-by: Liang-Chi Hsieh <[email protected]>

* delete conf

* Fix

---------

Co-authored-by: Liang-Chi Hsieh <[email protected]>
  • Loading branch information
planga82 and viirya authored Apr 24, 2024
1 parent ae914c8 commit 869da2d
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 1 deletion.
20 changes: 20 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,26 @@ case class CometHashJoinExec(

// Hash over every field that defines this join operator, so hashCode stays
// consistent with equals across (keys, condition, build side, children).
// NOTE(review): multi-arg Objects.hashCode suggests Guava's
// com.google.common.base.Objects rather than java.util.Objects — confirm import.
override def hashCode(): Int =
  Objects.hashCode(leftKeys, rightKeys, condition, buildSide, left, right)

/**
 * SQL metrics reported by the native hash join: timings, batch/row counters and
 * memory usage, grouped by the side of the join they are collected on.
 */
override lazy val metrics: Map[String, SQLMetric] = {
  // Metrics gathered while collecting the build side of the join.
  val buildSideMetrics = Map(
    "build_time" ->
      SQLMetrics.createNanoTimingMetric(
        sparkContext,
        "Total time for collecting build-side of join"),
    "build_input_batches" ->
      SQLMetrics.createMetric(sparkContext, "Number of batches consumed by build-side"),
    "build_input_rows" ->
      SQLMetrics.createMetric(sparkContext, "Number of rows consumed by build-side"),
    "build_mem_used" ->
      SQLMetrics.createSizeMetric(sparkContext, "Memory used by build-side"))
  // Metrics gathered while probing and producing output.
  val probeSideMetrics = Map(
    "input_batches" ->
      SQLMetrics.createMetric(sparkContext, "Number of batches consumed by probe-side"),
    "input_rows" ->
      SQLMetrics.createMetric(sparkContext, "Number of rows consumed by probe-side"),
    "output_batches" -> SQLMetrics.createMetric(sparkContext, "Number of batches produced"),
    "output_rows" -> SQLMetrics.createMetric(sparkContext, "Number of rows produced"),
    "join_time" -> SQLMetrics.createNanoTimingMetric(sparkContext, "Total time for joining"))
  buildSideMetrics ++ probeSideMetrics
}
}

case class CometBroadcastHashJoinExec(
Expand Down
35 changes: 34 additions & 1 deletion spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStatistics, CatalogTable}
import org.apache.spark.sql.catalyst.expressions.Hex
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateMode
import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometCollectLimitExec, CometFilterExec, CometHashAggregateExec, CometProjectExec, CometRowToColumnarExec, CometScanExec, CometSortExec, CometSortMergeJoinExec, CometTakeOrderedAndProjectExec}
import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometCollectLimitExec, CometFilterExec, CometHashAggregateExec, CometHashJoinExec, CometProjectExec, CometRowToColumnarExec, CometScanExec, CometSortExec, CometSortMergeJoinExec, CometTakeOrderedAndProjectExec}
import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometShuffleExchangeExec}
import org.apache.spark.sql.execution.{CollectLimitExec, ProjectExec, SQLExecution, UnionExec}
import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
Expand Down Expand Up @@ -331,6 +331,39 @@ class CometExecSuite extends CometTestBase {
}
}

test("Comet native metrics: HashJoin") {
  // Two small 5-row tables joined on their first column; the SHUFFLE_HASH hint
  // forces a hash join so a CometHashJoinExec appears in the executed plan.
  withParquetTable((0 until 5).map(i => (i, i + 1)), "t1") {
    withParquetTable((0 until 5).map(i => (i, i + 1)), "t2") {
      val df = sql("SELECT /*+ SHUFFLE_HASH(t1) */ * FROM t1 INNER JOIN t2 ON t1._1 = t2._1")
      df.collect()

      // Locate the hash join node and pull its metrics map.
      val metrics = find(df.queryExecution.executedPlan) {
        case _: CometHashJoinExec => true
        case _ => false
      }.map(_.metrics).get

      // Counter metrics with exact expected values (5 rows/batches per side).
      val expectedCounters = Seq(
        "build_input_batches" -> 5L,
        "build_input_rows" -> 5L,
        "input_batches" -> 5L,
        "input_rows" -> 5L,
        "output_batches" -> 5L,
        "output_rows" -> 5L)
      expectedCounters.foreach { case (name, expected) =>
        assert(metrics.contains(name))
        assert(metrics(name).value == expected)
      }

      // Timing and size metrics only need to be strictly positive.
      Seq("build_time", "build_mem_used", "join_time").foreach { name =>
        assert(metrics.contains(name))
        assert(metrics(name).value > 1L)
      }
    }
  }
}

test(
"fix: ReusedExchangeExec + CometShuffleExchangeExec under QueryStageExec " +
"should be CometRoot") {
Expand Down

0 comments on commit 869da2d

Please sign in to comment.