Skip to content

Commit

Permalink
Add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pablolanga-stratio committed Apr 22, 2024
1 parent 3f884b9 commit d5f707d
Showing 1 changed file with 34 additions and 1 deletion.
35 changes: 34 additions & 1 deletion spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStatistics, CatalogTable}
import org.apache.spark.sql.catalyst.expressions.Hex
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateMode
import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometCollectLimitExec, CometFilterExec, CometHashAggregateExec, CometProjectExec, CometRowToColumnarExec, CometScanExec, CometTakeOrderedAndProjectExec}
import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometCollectLimitExec, CometFilterExec, CometHashAggregateExec, CometProjectExec, CometRowToColumnarExec, CometScanExec, CometSortMergeJoinExec, CometTakeOrderedAndProjectExec}
import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometShuffleExchangeExec}
import org.apache.spark.sql.execution.{CollectLimitExec, ProjectExec, SQLExecution, UnionExec}
import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
Expand Down Expand Up @@ -252,6 +252,39 @@ class CometExecSuite extends CometTestBase {
}
}

test("Comet native metrics: SortMergeJoin") {
withSQLConf(
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true",
"spark.sql.autoBroadcastJoinThreshold" -> "0",
"spark.sql.join.preferSortMergeJoin" -> "true") {
withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl1") {
withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl2") {
val df = sql("SELECT * FROM tbl1 INNER JOIN tbl2 ON tbl1._1 = tbl2._1")
df.collect()

val metrics = find(df.queryExecution.executedPlan) {
case _: CometSortMergeJoinExec => true
case _ => false
}.map(_.metrics).get

assert(metrics.contains("input_batches"))
assert(metrics("input_batches").value == 2L)
assert(metrics.contains("input_rows"))
assert(metrics("input_rows").value == 10L)
assert(metrics.contains("output_batches"))
assert(metrics("output_batches").value == 1L)
assert(metrics.contains("output_rows"))
assert(metrics("output_rows").value == 5L)
assert(metrics.contains("peak_mem_used"))
assert(metrics("peak_mem_used").value > 1L)
assert(metrics.contains("join_time"))
assert(metrics("join_time").value > 1L)
}
}
}
}

test(
"fix: ReusedExchangeExec + CometShuffleExchangeExec under QueryStageExec " +
"should be CometRoot") {
Expand Down

0 comments on commit d5f707d

Please sign in to comment.