From 1b6953f1f90c6364ec8e63bea32c7d13735ccbba Mon Sep 17 00:00:00 2001
From: Pablo Langa
Date: Tue, 23 Apr 2024 12:40:40 -0400
Subject: [PATCH 1/9] HashMergeJoin metrics

---
 .../org/apache/spark/sql/comet/operators.scala | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index 1065367c2..2ec821517 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -655,6 +655,23 @@ case class CometHashJoinExec(
 
   override def hashCode(): Int =
     Objects.hashCode(leftKeys, rightKeys, condition, buildSide, left, right)
+
+  override lazy val metrics: Map[String, SQLMetric] =
+    Map(
+      "build_time" ->
+        SQLMetrics.createNanoTimingMetric(sparkContext,
+          "Total time for collecting build-side of join"),
+      "build_input_batches" ->
+        SQLMetrics.createMetric(sparkContext, "Number of batches consumed by build-side"),
+      "build_input_rows" ->
+        SQLMetrics.createMetric(sparkContext, "Number of rows consumed by build-side"),
+      "build_mem_used" ->
+        SQLMetrics.createSizeMetric(sparkContext, "Number of rows consumed by build-side"),
+      "input_batches" -> SQLMetrics.createMetric(sparkContext, "Number of batches consumed"),
+      "input_rows" -> SQLMetrics.createMetric(sparkContext, "Number of rows consumed"),
+      "output_batches" -> SQLMetrics.createMetric(sparkContext, "Number of batches produced"),
+      "output_rows" -> SQLMetrics.createMetric(sparkContext, "Number of rows produced"),
+      "join_time" -> SQLMetrics.createNanoTimingMetric(sparkContext, "Total time for joining"))
 }
 
 case class CometBroadcastHashJoinExec(

From feeed874c0fe06bbc81be63ca16da99ac080fedc Mon Sep 17 00:00:00 2001
From: Pablo Langa
Date: Tue, 23 Apr 2024 12:40:48 -0400
Subject: [PATCH 2/9] HashMergeJoin metrics test

---
 .../apache/comet/exec/CometExecSuite.scala | 39 ++++++++++++++++++-
 1 file changed, 38 insertions(+), 1 deletion(-)

diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index cc968a638..161f7caae 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStatistics, CatalogTable}
 import org.apache.spark.sql.catalyst.expressions.Hex
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateMode
-import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometCollectLimitExec, CometFilterExec, CometHashAggregateExec, CometProjectExec, CometRowToColumnarExec, CometScanExec, CometSortMergeJoinExec, CometTakeOrderedAndProjectExec}
+import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometCollectLimitExec, CometFilterExec, CometHashAggregateExec, CometHashJoinExec, CometProjectExec, CometRowToColumnarExec, CometScanExec, CometSortMergeJoinExec, CometTakeOrderedAndProjectExec}
 import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometShuffleExchangeExec}
 import org.apache.spark.sql.execution.{CollectLimitExec, ProjectExec, SQLExecution, UnionExec}
 import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
@@ -308,6 +308,43 @@ class CometExecSuite extends CometTestBase {
     }
   }
 
+  test("Comet native metrics: HashMergeJoin") {
+    withSQLConf(
+      CometConf.COMET_EXEC_ENABLED.key -> "true",
+      CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") {
+      withParquetTable((0 until 5).map(_.toString).map(i => (i, i + 1)), "t1") {
+        withParquetTable((0 until 5).map(_.toString).map(i => (i, i + 1)), "t2") {
+          val df = sql("SELECT /*+ SHUFFLE_HASH(t1) */ * FROM t1 INNER JOIN t2 ON t1._1 = t2._1")
+          df.collect()
+
+          val metrics = find(df.queryExecution.executedPlan) {
+            case _: CometHashJoinExec => true
+            case _ => false
+          }.map(_.metrics).get
+
+          assert(metrics.contains("build_time"))
+          assert(metrics("build_time").value > 1L)
+          assert(metrics.contains("build_input_batches"))
+          assert(metrics("build_input_batches").value == 5L)
+          assert(metrics.contains("build_mem_used"))
+          assert(metrics("build_mem_used").value > 1L)
+          assert(metrics.contains("build_input_rows"))
+          assert(metrics("build_input_rows").value == 5L)
+          assert(metrics.contains("input_batches"))
+          assert(metrics("input_batches").value == 5L)
+          assert(metrics.contains("input_rows"))
+          assert(metrics("input_rows").value == 5L)
+          assert(metrics.contains("output_batches"))
+          assert(metrics("output_batches").value == 5L)
+          assert(metrics.contains("output_rows"))
+          assert(metrics("output_rows").value == 5L)
+          assert(metrics.contains("join_time"))
+          assert(metrics("join_time").value > 1L)
+        }
+      }
+    }
+  }
+
   test(
     "fix: ReusedExchangeExec + CometShuffleExchangeExec under QueryStageExec " +
       "should be CometRoot") {

From 0691cd86dacf4620d838ba86f3f60330e2f64eb9 Mon Sep 17 00:00:00 2001
From: Pablo Langa
Date: Tue, 23 Apr 2024 12:54:09 -0400
Subject: [PATCH 3/9] Fix test

---
 .../src/test/scala/org/apache/comet/exec/CometExecSuite.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 161f7caae..557588208 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -312,8 +312,8 @@ class CometExecSuite extends CometTestBase {
     withSQLConf(
       CometConf.COMET_EXEC_ENABLED.key -> "true",
       CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") {
-      withParquetTable((0 until 5).map(_.toString).map(i => (i, i + 1)), "t1") {
-        withParquetTable((0 until 5).map(_.toString).map(i => (i, i + 1)), "t2") {
+      withParquetTable((0 until 5).map(i => (i, i + 1)), "t1") {
+        withParquetTable((0 until 5).map(i => (i, i + 1)), "t2") {
           val df = sql("SELECT /*+ SHUFFLE_HASH(t1) */ * FROM t1 INNER JOIN t2 ON t1._1 = t2._1")
           df.collect()
 

From c3d66b03c49a4dc9809d0f879db685a2a4785175 Mon Sep 17 00:00:00 2001
From: Pablo Langa
Date: Tue, 23 Apr 2024 12:58:03 -0400
Subject: [PATCH 4/9] Fix format

---
 .../src/main/scala/org/apache/spark/sql/comet/operators.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index 2ec821517..5c443888b 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -659,7 +659,8 @@ case class CometHashJoinExec(
   override lazy val metrics: Map[String, SQLMetric] =
     Map(
       "build_time" ->
-        SQLMetrics.createNanoTimingMetric(sparkContext,
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
           "Total time for collecting build-side of join"),
       "build_input_batches" ->
         SQLMetrics.createMetric(sparkContext, "Number of batches 
consumed by build-side"),

From e7bbd7c504c33b0d7189376a2ef594cfe3c0b746 Mon Sep 17 00:00:00 2001
From: Pablo Langa
Date: Tue, 23 Apr 2024 13:50:32 -0400
Subject: [PATCH 5/9] Fix descriptions

---
 .../main/scala/org/apache/spark/sql/comet/operators.scala | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index 5c443888b..03600417b 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -667,9 +667,11 @@ case class CometHashJoinExec(
       "build_input_rows" ->
         SQLMetrics.createMetric(sparkContext, "Number of rows consumed by build-side"),
       "build_mem_used" ->
-        SQLMetrics.createSizeMetric(sparkContext, "Number of rows consumed by build-side"),
-      "input_batches" -> SQLMetrics.createMetric(sparkContext, "Number of batches consumed"),
-      "input_rows" -> SQLMetrics.createMetric(sparkContext, "Number of rows consumed"),
+        SQLMetrics.createSizeMetric(sparkContext, "Memory used by build-side"),
+      "input_batches" ->
+        SQLMetrics.createMetric(sparkContext, "Number of batches consumed by probe-side"),
+      "input_rows" ->
+        SQLMetrics.createMetric(sparkContext, "Number of rows consumed by probe-side"),
       "output_batches" -> SQLMetrics.createMetric(sparkContext, "Number of batches produced"),
       "output_rows" -> SQLMetrics.createMetric(sparkContext, "Number of rows produced"),
       "join_time" -> SQLMetrics.createNanoTimingMetric(sparkContext, "Total time for joining"))

From 41619a57cc34c3cd6fab654e63f5c6bffcb5bb56 Mon Sep 17 00:00:00 2001
From: Pablo Langa
Date: Tue, 23 Apr 2024 15:03:40 -0400
Subject: [PATCH 6/9] Fix imports

---
 .../src/test/scala/org/apache/comet/exec/CometExecSuite.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 649896e15..49ed09bd5 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStatistics, CatalogTable}
 import org.apache.spark.sql.catalyst.expressions.Hex
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateMode
-import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometCollectLimitExec, CometFilterExec, CometHashAggregateExec, CometProjectExec, CometRowToColumnarExec, CometScanExec, CometSortExec, CometSortMergeJoinExec, CometTakeOrderedAndProjectExec}
+import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometCollectLimitExec, CometFilterExec, CometHashAggregateExec, CometHashJoinExec, CometProjectExec, CometRowToColumnarExec, CometScanExec, CometSortExec, CometSortMergeJoinExec, CometTakeOrderedAndProjectExec}
 import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometShuffleExchangeExec}
 import org.apache.spark.sql.execution.{CollectLimitExec, ProjectExec, SQLExecution, UnionExec}
 import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
@@ -1309,4 +1309,4 @@ case class BucketedTableTestSpec(
   numPartitions: Int = 10,
   expectedShuffle: Boolean = true,
   expectedSort: Boolean = true,
-  expectedNumOutputPartitions: Option[Int] = None)
\ No newline at end of file
+  expectedNumOutputPartitions: 
Option[Int] = None)

From 55c3b0fd0818fae3e13cad536fe3313d4d11d291 Mon Sep 17 00:00:00 2001
From: Pablo Langa
Date: Tue, 23 Apr 2024 19:00:59 -0400
Subject: [PATCH 7/9] Update
 spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala

Co-authored-by: Liang-Chi Hsieh
---
 spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 49ed09bd5..14d36180f 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -331,7 +331,7 @@ class CometExecSuite extends CometTestBase {
     }
   }
 
-  test("Comet native metrics: HashMergeJoin") {
+  test("Comet native metrics: HashJoin") {
     withSQLConf(
       CometConf.COMET_EXEC_ENABLED.key -> "true",
       CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") {

From a9d12de4e08204dbb55ce8b2a7207119b52a80c6 Mon Sep 17 00:00:00 2001
From: Pablo Langa
Date: Wed, 24 Apr 2024 07:57:59 -0400
Subject: [PATCH 8/9] delete conf

---
 .../apache/comet/exec/CometExecSuite.scala | 58 +++++++++----------
 1 file changed, 27 insertions(+), 31 deletions(-)

diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 14d36180f..9b830ca2c 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -332,38 +332,34 @@ class CometExecSuite extends CometTestBase {
   }
 
   test("Comet native metrics: HashJoin") {
-    withSQLConf(
-      CometConf.COMET_EXEC_ENABLED.key -> "true",
-      CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") {
-      withParquetTable((0 until 5).map(i => (i, i + 1)), "t1") {
-        withParquetTable((0 until 5).map(i => (i, i + 1)), "t2") {
-          val df = sql("SELECT /*+ SHUFFLE_HASH(t1) */ * FROM t1 INNER JOIN t2 ON t1._1 = t2._1")
-          df.collect()
+    withParquetTable((0 until 5).map(i => (i, i + 1)), "t1") {
+      withParquetTable((0 until 5).map(i => (i, i + 1)), "t2") {
+        val df = sql("SELECT /*+ SHUFFLE_HASH(t1) */ * FROM t1 INNER JOIN t2 ON t1._1 = t2._1")
+        df.collect()
 
-          val metrics = find(df.queryExecution.executedPlan) {
-            case _: CometHashJoinExec => true
-            case _ => false
-          }.map(_.metrics).get
+        val metrics = find(df.queryExecution.executedPlan) {
+          case _: CometHashJoinExec => true
+          case _ => false
+        }.map(_.metrics).get
 
-          assert(metrics.contains("build_time"))
-          assert(metrics("build_time").value > 1L)
-          assert(metrics.contains("build_input_batches"))
-          assert(metrics("build_input_batches").value == 5L)
-          assert(metrics.contains("build_mem_used"))
-          assert(metrics("build_mem_used").value > 1L)
-          assert(metrics.contains("build_input_rows"))
-          assert(metrics("build_input_rows").value == 5L)
-          assert(metrics.contains("input_batches"))
-          assert(metrics("input_batches").value == 5L)
-          assert(metrics.contains("input_rows"))
-          assert(metrics("input_rows").value == 5L)
-          assert(metrics.contains("output_batches"))
-          assert(metrics("output_batches").value == 5L)
-          assert(metrics.contains("output_rows"))
-          assert(metrics("output_rows").value == 5L)
-          assert(metrics.contains("join_time"))
-          assert(metrics("join_time").value > 1L)
-        }
+        assert(metrics.contains("build_time"))
+        assert(metrics("build_time").value > 1L)
+        assert(metrics.contains("build_input_batches"))
+        assert(metrics("build_input_batches").value == 5L)
+        assert(metrics.contains("build_mem_used"))
+        assert(metrics("build_mem_used").value > 1L)
+        assert(metrics.contains("build_input_rows"))
+        assert(metrics("build_input_rows").value == 5L)
+        assert(metrics.contains("input_batches"))
+        assert(metrics("input_batches").value == 5L)
+        assert(metrics.contains("input_rows"))
+        assert(metrics("input_rows").value == 5L)
+        assert(metrics.contains("output_batches"))
+        assert(metrics("output_batches").value == 5L)
+        assert(metrics.contains("output_rows"))
+        assert(metrics("output_rows").value == 5L)
+        assert(metrics.contains("join_time"))
+        assert(metrics("join_time").value > 1L)
       }
     }
   }
@@ -1309,4 +1305,4 @@ case class BucketedTableTestSpec(
   numPartitions: Int = 10,
   expectedShuffle: Boolean = true,
   expectedSort: Boolean = true,
-  expectedNumOutputPartitions: Option[Int] = None)
+  expectedNumOutputPartitions: Option[Int] = None)
\ No newline at end of file

From d3ea50bcbb2f3f079de13ff24d66f47d72570d96 Mon Sep 17 00:00:00 2001
From: Pablo Langa
Date: Wed, 24 Apr 2024 10:00:49 -0400
Subject: [PATCH 9/9] Fix

---
 spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 9b830ca2c..e5b3523dc 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -1305,4 +1305,4 @@ case class BucketedTableTestSpec(
   numPartitions: Int = 10,
   expectedShuffle: Boolean = true,
   expectedSort: Boolean = true,
-  expectedNumOutputPartitions: Option[Int] = None)
\ No newline at end of file
+  expectedNumOutputPartitions: Option[Int] = None)
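
A note on trying the new metrics outside the test suite: they surface through the standard SparkPlan.metrics map (and the SQL tab of the Spark UI). The sketch below is a hypothetical standalone driver program, not part of this patch series; the object name HashJoinMetricsDemo, the column names, and a Comet-enabled local session are all assumptions. It mirrors the test's pattern: force a hash join with the SHUFFLE_HASH hint, execute the query, then locate the CometHashJoinExec node in the executed plan and read its SQLMetric values.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.comet.CometHashJoinExec
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec

// Hypothetical demo (not part of the patch series): prints the HashJoin
// metrics added in PATCH 1/9 after running the same query as the test.
object HashJoinMetricsDemo {
  def main(args: Array[String]): Unit = {
    // Assumes Comet is on the classpath and enabled via the usual configs
    // (e.g. the CometConf.COMET_EXEC_ENABLED key used in PATCH 2/9).
    val spark = SparkSession.builder().appName("hash-join-metrics").getOrCreate()
    import spark.implicits._

    // Same fixtures as the test: two 5-row tables joined on _1.
    (0 until 5).map(i => (i, i + 1)).toDF("_1", "_2").createOrReplaceTempView("t1")
    (0 until 5).map(i => (i, i + 1)).toDF("_1", "_2").createOrReplaceTempView("t2")

    // The SHUFFLE_HASH hint steers the planner toward a shuffled hash join.
    val df = spark.sql(
      "SELECT /*+ SHUFFLE_HASH(t1) */ * FROM t1 INNER JOIN t2 ON t1._1 = t2._1")
    df.collect() // execute the query so the SQLMetrics are actually populated

    // With adaptive query execution on, the final plan sits inside an
    // AdaptiveSparkPlanExec, which is a leaf for tree traversal, so unwrap
    // it before searching for the join node.
    val plan: SparkPlan = df.queryExecution.executedPlan match {
      case a: AdaptiveSparkPlanExec => a.executedPlan
      case p => p
    }

    plan.collectFirst { case j: CometHashJoinExec => j }.foreach { join =>
      join.metrics.toSeq.sortBy(_._1).foreach { case (name, metric) =>
        println(s"$name = ${metric.value}")
      }
    }

    spark.stop()
  }
}

With the metric names from PATCH 5/9, the output distinguishes build-side consumption (build_input_rows, build_input_batches, build_mem_used, build_time) from probe-side consumption (input_rows, input_batches) and join output (output_rows, output_batches, join_time), which is why the earlier copy-pasted "Number of rows consumed by build-side" description for build_mem_used was worth fixing.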