feat: Improve CometSortMergeJoin statistics (#304)
* Improve CometSortMergeJoin statistics

* Add tests

---------

Co-authored-by: Pablo Langa <[email protected]>
planga82 and pablolanga-stratio authored Apr 23, 2024
1 parent de42975 commit 646f9a0
Showing 2 changed files with 44 additions and 1 deletion.
10 changes: 10 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -727,6 +727,16 @@ case class CometSortMergeJoinExec(

  override def hashCode(): Int =
    Objects.hashCode(leftKeys, rightKeys, condition, left, right)

  override lazy val metrics: Map[String, SQLMetric] =
    Map(
      "input_batches" -> SQLMetrics.createMetric(sparkContext, "Number of batches consumed"),
      "input_rows" -> SQLMetrics.createMetric(sparkContext, "Number of rows consumed"),
      "output_batches" -> SQLMetrics.createMetric(sparkContext, "Number of batches produced"),
      "output_rows" -> SQLMetrics.createMetric(sparkContext, "Number of rows produced"),
      "peak_mem_used" ->
        SQLMetrics.createSizeMetric(sparkContext, "Peak memory used for buffered data"),
      "join_time" -> SQLMetrics.createNanoTimingMetric(sparkContext, "Total time for joining"))
}
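For context, a minimal sketch (not part of this commit) of how these metric values can be read once a query has run. It assumes a DataFrame `df` that has already been materialized and whose executed plan contains a CometSortMergeJoinExec; the lookup keys are exactly the ones declared in the map above, and the test added in CometExecSuite below performs the same kind of lookup.

// Sketch only: `df` is an assumed, already-collected DataFrame from a join query.
import org.apache.spark.sql.comet.CometSortMergeJoinExec

val smjMetrics = df.queryExecution.executedPlan.collectFirst {
  case smj: CometSortMergeJoinExec => smj.metrics
}.get

println(s"rows consumed:  ${smjMetrics("input_rows").value}")
println(s"rows produced:  ${smjMetrics("output_rows").value}")
println(s"join time (ns): ${smjMetrics("join_time").value}")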

case class CometScanWrapper(override val nativeOp: Operator, override val originalPlan: SparkPlan)
35 changes: 34 additions & 1 deletion spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStatistics, CatalogTable}
import org.apache.spark.sql.catalyst.expressions.Hex
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateMode
import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometCollectLimitExec, CometFilterExec, CometHashAggregateExec, CometProjectExec, CometRowToColumnarExec, CometScanExec, CometTakeOrderedAndProjectExec}
import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometCollectLimitExec, CometFilterExec, CometHashAggregateExec, CometProjectExec, CometRowToColumnarExec, CometScanExec, CometSortMergeJoinExec, CometTakeOrderedAndProjectExec}
import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometShuffleExchangeExec}
import org.apache.spark.sql.execution.{CollectLimitExec, ProjectExec, SQLExecution, UnionExec}
import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
@@ -275,6 +275,39 @@ class CometExecSuite extends CometTestBase {
}
}

test("Comet native metrics: SortMergeJoin") {
withSQLConf(
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true",
"spark.sql.autoBroadcastJoinThreshold" -> "0",
"spark.sql.join.preferSortMergeJoin" -> "true") {
withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl1") {
withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl2") {
val df = sql("SELECT * FROM tbl1 INNER JOIN tbl2 ON tbl1._1 = tbl2._1")
df.collect()

val metrics = find(df.queryExecution.executedPlan) {
case _: CometSortMergeJoinExec => true
case _ => false
}.map(_.metrics).get

assert(metrics.contains("input_batches"))
assert(metrics("input_batches").value == 2L)
assert(metrics.contains("input_rows"))
assert(metrics("input_rows").value == 10L)
assert(metrics.contains("output_batches"))
assert(metrics("output_batches").value == 1L)
assert(metrics.contains("output_rows"))
assert(metrics("output_rows").value == 5L)
assert(metrics.contains("peak_mem_used"))
assert(metrics("peak_mem_used").value > 1L)
assert(metrics.contains("join_time"))
assert(metrics("join_time").value > 1L)
}
}
}
}

  test(
    "fix: ReusedExchangeExec + CometShuffleExchangeExec under QueryStageExec " +
      "should be CometRoot") {
