From 4d9eb9ce83d235094ef9095555cc30ac7e64920f Mon Sep 17 00:00:00 2001
From: Pablo Langa
Date: Tue, 30 Apr 2024 01:26:56 +0000
Subject: [PATCH] feat: Improve CometBroadcastHashJoin statistics (#339)

* broadcast hash join metrics

(cherry picked from commit 97a647a0757250f9feaea6571b8cb0738c6ec340)

* broadcast hash join test

(cherry picked from commit df418aeaf9f0923d17a69edf5829c8f77a1934c1)

* format

* add assume
---
 .../spark/sql/comet/CometMetricNode.scala   | 22 ++++++++++++
 .../apache/spark/sql/comet/operators.scala  | 22 +++---------
 .../apache/comet/exec/CometExecSuite.scala  | 36 ++++++++++++++++++-
 3 files changed, 61 insertions(+), 19 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala
index 60b26ca00c..7288455f50 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala
@@ -79,6 +79,28 @@ object CometMetricNode {
         "total time (in ms) spent in this operator"))
   }
 
+  /**
+   * SQL Metrics for DataFusion HashJoin
+   */
+  def hashJoinMetrics(sc: SparkContext): Map[String, SQLMetric] = {
+    Map(
+      "build_time" ->
+        SQLMetrics.createNanoTimingMetric(sc, "Total time for collecting build-side of join"),
+      "build_input_batches" ->
+        SQLMetrics.createMetric(sc, "Number of batches consumed by build-side"),
+      "build_input_rows" ->
+        SQLMetrics.createMetric(sc, "Number of rows consumed by build-side"),
+      "build_mem_used" ->
+        SQLMetrics.createSizeMetric(sc, "Memory used by build-side"),
+      "input_batches" ->
+        SQLMetrics.createMetric(sc, "Number of batches consumed by probe-side"),
+      "input_rows" ->
+        SQLMetrics.createMetric(sc, "Number of rows consumed by probe-side"),
+      "output_batches" -> SQLMetrics.createMetric(sc, "Number of batches produced"),
+      "output_rows" -> SQLMetrics.createMetric(sc, "Number of rows produced"),
+      "join_time" -> SQLMetrics.createNanoTimingMetric(sc, "Total time for joining"))
+  }
+
   /**
    * Creates a [[CometMetricNode]] from a [[CometPlan]].
    */
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index 39ffef140f..4e6d997324 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -697,24 +697,7 @@ case class CometHashJoinExec(
     Objects.hashCode(leftKeys, rightKeys, condition, buildSide, left, right)
 
   override lazy val metrics: Map[String, SQLMetric] =
-    Map(
-      "build_time" ->
-        SQLMetrics.createNanoTimingMetric(
-          sparkContext,
-          "Total time for collecting build-side of join"),
-      "build_input_batches" ->
-        SQLMetrics.createMetric(sparkContext, "Number of batches consumed by build-side"),
-      "build_input_rows" ->
-        SQLMetrics.createMetric(sparkContext, "Number of rows consumed by build-side"),
-      "build_mem_used" ->
-        SQLMetrics.createSizeMetric(sparkContext, "Memory used by build-side"),
-      "input_batches" ->
-        SQLMetrics.createMetric(sparkContext, "Number of batches consumed by probe-side"),
-      "input_rows" ->
-        SQLMetrics.createMetric(sparkContext, "Number of rows consumed by probe-side"),
-      "output_batches" -> SQLMetrics.createMetric(sparkContext, "Number of batches produced"),
-      "output_rows" -> SQLMetrics.createMetric(sparkContext, "Number of rows produced"),
-      "join_time" -> SQLMetrics.createNanoTimingMetric(sparkContext, "Total time for joining"))
+    CometMetricNode.hashJoinMetrics(sparkContext)
 }
 
 case class CometBroadcastHashJoinExec(
@@ -846,6 +829,9 @@ case class CometBroadcastHashJoinExec(
 
   override def hashCode(): Int =
     Objects.hashCode(leftKeys, rightKeys, condition, buildSide, left, right)
+
+  override lazy val metrics: Map[String, SQLMetric] =
+    CometMetricNode.hashJoinMetrics(sparkContext)
 }
 
 case class CometSortMergeJoinExec(
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 9d003476d4..03b5882fa8 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStatistics, CatalogTable}
 import org.apache.spark.sql.catalyst.expressions.Hex
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateMode
-import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometCollectLimitExec, CometFilterExec, CometHashAggregateExec, CometHashJoinExec, CometProjectExec, CometRowToColumnarExec, CometScanExec, CometSortExec, CometSortMergeJoinExec, CometTakeOrderedAndProjectExec}
+import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometBroadcastHashJoinExec, CometCollectLimitExec, CometFilterExec, CometHashAggregateExec, CometHashJoinExec, CometProjectExec, CometRowToColumnarExec, CometScanExec, CometSortExec, CometSortMergeJoinExec, CometTakeOrderedAndProjectExec}
 import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometShuffleExchangeExec}
 import org.apache.spark.sql.execution.{CollectLimitExec, ProjectExec, SQLExecution, UnionExec}
 import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
@@ -364,6 +364,40 @@ class CometExecSuite extends CometTestBase {
     }
   }
 
+  test("Comet native metrics: BroadcastHashJoin") {
+    assume(isSpark34Plus, "ChunkedByteBuffer is not serializable before Spark 3.4+")
+    withParquetTable((0 until 5).map(i => (i, i + 1)), "t1") {
+      withParquetTable((0 until 5).map(i => (i, i + 1)), "t2") {
+        val df = sql("SELECT /*+ BROADCAST(t1) */ * FROM t1 INNER JOIN t2 ON t1._1 = t2._1")
+        df.collect()
+
+        val metrics = find(df.queryExecution.executedPlan) {
+          case _: CometBroadcastHashJoinExec => true
+          case _ => false
+        }.map(_.metrics).get
+
+        assert(metrics.contains("build_time"))
+        assert(metrics("build_time").value > 1L)
+        assert(metrics.contains("build_input_batches"))
+        assert(metrics("build_input_batches").value == 25L)
+        assert(metrics.contains("build_mem_used"))
+        assert(metrics("build_mem_used").value > 1L)
+        assert(metrics.contains("build_input_rows"))
+        assert(metrics("build_input_rows").value == 25L)
+        assert(metrics.contains("input_batches"))
+        assert(metrics("input_batches").value == 5L)
+        assert(metrics.contains("input_rows"))
+        assert(metrics("input_rows").value == 5L)
+        assert(metrics.contains("output_batches"))
+        assert(metrics("output_batches").value == 5L)
+        assert(metrics.contains("output_rows"))
+        assert(metrics("output_rows").value == 5L)
+        assert(metrics.contains("join_time"))
+        assert(metrics("join_time").value > 1L)
+      }
+    }
+  }
+
   test(
     "fix: ReusedExchangeExec + CometShuffleExchangeExec under QueryStageExec " +
       "should be CometRoot") {