From d5f707d5e0f7e93578ddd373326d22fe434d7b75 Mon Sep 17 00:00:00 2001 From: Pablo Langa Date: Mon, 22 Apr 2024 13:28:35 -0400 Subject: [PATCH] Add tests --- .../apache/comet/exec/CometExecSuite.scala | 35 ++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index a8b05cc98..115b30295 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStatistics, CatalogTable} import org.apache.spark.sql.catalyst.expressions.Hex import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateMode -import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometCollectLimitExec, CometFilterExec, CometHashAggregateExec, CometProjectExec, CometRowToColumnarExec, CometScanExec, CometTakeOrderedAndProjectExec} +import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometCollectLimitExec, CometFilterExec, CometHashAggregateExec, CometProjectExec, CometRowToColumnarExec, CometScanExec, CometSortMergeJoinExec, CometTakeOrderedAndProjectExec} import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometShuffleExchangeExec} import org.apache.spark.sql.execution.{CollectLimitExec, ProjectExec, SQLExecution, UnionExec} import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec @@ -252,6 +252,39 @@ class CometExecSuite extends CometTestBase { } } + test("Comet native metrics: SortMergeJoin") { + withSQLConf( + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", + "spark.sql.autoBroadcastJoinThreshold" -> "0", + "spark.sql.join.preferSortMergeJoin" -> "true") { + withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl1") { + withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl2") { + val df = sql("SELECT * FROM tbl1 INNER JOIN tbl2 ON tbl1._1 = tbl2._1") + df.collect() + + val metrics = find(df.queryExecution.executedPlan) { + case _: CometSortMergeJoinExec => true + case _ => false + }.map(_.metrics).get + + assert(metrics.contains("input_batches")) + assert(metrics("input_batches").value == 2L) + assert(metrics.contains("input_rows")) + assert(metrics("input_rows").value == 10L) + assert(metrics.contains("output_batches")) + assert(metrics("output_batches").value == 1L) + assert(metrics.contains("output_rows")) + assert(metrics("output_rows").value == 5L) + assert(metrics.contains("peak_mem_used")) + assert(metrics("peak_mem_used").value > 1L) + assert(metrics.contains("join_time")) + assert(metrics("join_time").value > 1L) + } + } + } + } + test( "fix: ReusedExchangeExec + CometShuffleExchangeExec under QueryStageExec " + "should be CometRoot") {