
feat: Improve CometBroadcastHashJoin statistics (apache#339)
* broadcast hash join metrics

(cherry picked from commit 97a647a0757250f9feaea6571b8cb0738c6ec340)

* broadcast hash join test

(cherry picked from commit df418aeaf9f0923d17a69edf5829c8f77a1934c1)

* format

* add assume
planga82 authored and Steve Vaughan Jr committed Apr 30, 2024
1 parent 0413815 commit 4d9eb9c
Showing 3 changed files with 61 additions and 19 deletions.
@@ -79,6 +79,28 @@ object CometMetricNode {
        "total time (in ms) spent in this operator"))
  }

  /**
   * SQL Metrics for DataFusion HashJoin
   */
  def hashJoinMetrics(sc: SparkContext): Map[String, SQLMetric] = {
    Map(
      "build_time" ->
        SQLMetrics.createNanoTimingMetric(sc, "Total time for collecting build-side of join"),
      "build_input_batches" ->
        SQLMetrics.createMetric(sc, "Number of batches consumed by build-side"),
      "build_input_rows" ->
        SQLMetrics.createMetric(sc, "Number of rows consumed by build-side"),
      "build_mem_used" ->
        SQLMetrics.createSizeMetric(sc, "Memory used by build-side"),
      "input_batches" ->
        SQLMetrics.createMetric(sc, "Number of batches consumed by probe-side"),
      "input_rows" ->
        SQLMetrics.createMetric(sc, "Number of rows consumed by probe-side"),
      "output_batches" -> SQLMetrics.createMetric(sc, "Number of batches produced"),
      "output_rows" -> SQLMetrics.createMetric(sc, "Number of rows produced"),
      "join_time" -> SQLMetrics.createNanoTimingMetric(sc, "Total time for joining"))
  }

  /**
   * Creates a [[CometMetricNode]] from a [[CometPlan]].
   */
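For context, the three SQLMetrics factory methods used by hashJoinMetrics differ only in how the accumulated value is rendered in the Spark SQL UI: createMetric is a plain counter, createSizeMetric is shown as a byte size, and createNanoTimingMetric accumulates nanoseconds and is shown as a millisecond timing. A minimal, self-contained sketch of that behaviour using only Spark's public SQLMetrics API; the name demoMetrics and the sample values are illustrative and not part of this commit:

import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

// Illustrative only: the three metric kinds used by hashJoinMetrics.
def demoMetrics(sc: SparkContext): Map[String, SQLMetric] = {
  val rows = SQLMetrics.createMetric(sc, "Number of rows produced") // plain counter
  val size = SQLMetrics.createSizeMetric(sc, "Memory used by build-side") // rendered as a size
  val time = SQLMetrics.createNanoTimingMetric(sc, "Total time for joining") // nanoseconds, rendered as ms

  rows.add(5L) // accumulated from the driver or from tasks
  size.add(4L * 1024) // 4 KiB
  time.add(2L * 1000 * 1000) // 2 ms expressed in nanoseconds

  Map("output_rows" -> rows, "build_mem_used" -> size, "join_time" -> time)
}
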
22 changes: 4 additions & 18 deletions spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -697,24 +697,7 @@ case class CometHashJoinExec(
    Objects.hashCode(leftKeys, rightKeys, condition, buildSide, left, right)

  override lazy val metrics: Map[String, SQLMetric] =
    Map(
      "build_time" ->
        SQLMetrics.createNanoTimingMetric(
          sparkContext,
          "Total time for collecting build-side of join"),
      "build_input_batches" ->
        SQLMetrics.createMetric(sparkContext, "Number of batches consumed by build-side"),
      "build_input_rows" ->
        SQLMetrics.createMetric(sparkContext, "Number of rows consumed by build-side"),
      "build_mem_used" ->
        SQLMetrics.createSizeMetric(sparkContext, "Memory used by build-side"),
      "input_batches" ->
        SQLMetrics.createMetric(sparkContext, "Number of batches consumed by probe-side"),
      "input_rows" ->
        SQLMetrics.createMetric(sparkContext, "Number of rows consumed by probe-side"),
      "output_batches" -> SQLMetrics.createMetric(sparkContext, "Number of batches produced"),
      "output_rows" -> SQLMetrics.createMetric(sparkContext, "Number of rows produced"),
      "join_time" -> SQLMetrics.createNanoTimingMetric(sparkContext, "Total time for joining"))
    CometMetricNode.hashJoinMetrics(sparkContext)
}

case class CometBroadcastHashJoinExec(
@@ -846,6 +829,9 @@ case class CometBroadcastHashJoinExec(

  override def hashCode(): Int =
    Objects.hashCode(leftKeys, rightKeys, condition, buildSide, left, right)

  override lazy val metrics: Map[String, SQLMetric] =
    CometMetricNode.hashJoinMetrics(sparkContext)
}

case class CometSortMergeJoinExec(
36 changes: 35 additions & 1 deletion spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStatistics, CatalogTable}
import org.apache.spark.sql.catalyst.expressions.Hex
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateMode
import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometCollectLimitExec, CometFilterExec, CometHashAggregateExec, CometHashJoinExec, CometProjectExec, CometRowToColumnarExec, CometScanExec, CometSortExec, CometSortMergeJoinExec, CometTakeOrderedAndProjectExec}
import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometBroadcastHashJoinExec, CometCollectLimitExec, CometFilterExec, CometHashAggregateExec, CometHashJoinExec, CometProjectExec, CometRowToColumnarExec, CometScanExec, CometSortExec, CometSortMergeJoinExec, CometTakeOrderedAndProjectExec}
import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometShuffleExchangeExec}
import org.apache.spark.sql.execution.{CollectLimitExec, ProjectExec, SQLExecution, UnionExec}
import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
@@ -364,6 +364,40 @@ class CometExecSuite extends CometTestBase {
    }
  }

  test("Comet native metrics: BroadcastHashJoin") {
    assume(isSpark34Plus, "ChunkedByteBuffer is not serializable before Spark 3.4+")
    withParquetTable((0 until 5).map(i => (i, i + 1)), "t1") {
      withParquetTable((0 until 5).map(i => (i, i + 1)), "t2") {
        val df = sql("SELECT /*+ BROADCAST(t1) */ * FROM t1 INNER JOIN t2 ON t1._1 = t2._1")
        df.collect()

        val metrics = find(df.queryExecution.executedPlan) {
          case _: CometBroadcastHashJoinExec => true
          case _ => false
        }.map(_.metrics).get

        assert(metrics.contains("build_time"))
        assert(metrics("build_time").value > 1L)
        assert(metrics.contains("build_input_batches"))
        assert(metrics("build_input_batches").value == 25L)
        assert(metrics.contains("build_mem_used"))
        assert(metrics("build_mem_used").value > 1L)
        assert(metrics.contains("build_input_rows"))
        assert(metrics("build_input_rows").value == 25L)
        assert(metrics.contains("input_batches"))
        assert(metrics("input_batches").value == 5L)
        assert(metrics.contains("input_rows"))
        assert(metrics("input_rows").value == 5L)
        assert(metrics.contains("output_batches"))
        assert(metrics("output_batches").value == 5L)
        assert(metrics.contains("output_rows"))
        assert(metrics("output_rows").value == 5L)
        assert(metrics.contains("join_time"))
        assert(metrics("join_time").value > 1L)
      }
    }
  }
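
The assertion pattern above could be factored into a small suite-level helper. A sketch, assuming the suite mixes in Spark's AdaptiveSparkPlanHelper (which provides find) and ScalaTest assertions; the helper name assertBroadcastHashJoinMetrics is illustrative and not part of this commit:

  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.comet.CometBroadcastHashJoinExec

  // Illustrative helper, not part of this commit: runs the query, locates the first
  // CometBroadcastHashJoinExec in the executed plan, and checks the expected metric values.
  private def assertBroadcastHashJoinMetrics(df: DataFrame, expected: Map[String, Long]): Unit = {
    df.collect() // execute so the native metrics are populated
    val metrics = find(df.queryExecution.executedPlan) {
      case _: CometBroadcastHashJoinExec => true
      case _ => false
    }.map(_.metrics).getOrElse(fail("no CometBroadcastHashJoinExec in the plan"))
    expected.foreach { case (name, value) =>
      assert(metrics.contains(name), s"missing metric: $name")
      assert(metrics(name).value == value, s"unexpected value for $name: ${metrics(name).value}")
    }
  }

With such a helper, the exact-count checks above could be written as assertBroadcastHashJoinMetrics(df, Map("build_input_rows" -> 25L, "output_rows" -> 5L)), keeping the timing metrics as separate lower-bound assertions.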

  test(
    "fix: ReusedExchangeExec + CometShuffleExchangeExec under QueryStageExec " +
      "should be CometRoot") {