From 247363f6b7c2e3b30d7d6337d4022a46d6977f56 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 3 Jul 2024 08:50:56 -0600
Subject: [PATCH 1/2] Rename shuffle write metric

---
 .../comet/execution/shuffle/CometShuffleExchangeExec.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
index aabe3c350..704a626fa 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
@@ -77,8 +77,8 @@ case class CometShuffleExchangeExec(
     SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
   override lazy val metrics: Map[String, SQLMetric] = Map(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
-    "shuffleReadElapsedCompute" ->
-      SQLMetrics.createNanoTimingMetric(sparkContext, "shuffle read elapsed compute at native"),
+    "shuffleNativeTotalTime" ->
+      SQLMetrics.createNanoTimingMetric(sparkContext, "shuffle native code time"),
     "numPartitions" -> SQLMetrics.createMetric(
       sparkContext,
       "number of partitions")) ++ readMetrics ++ writeMetrics
@@ -479,7 +479,7 @@ class CometShuffleWriteProcessor(
     // Maps native metrics to SQL metrics
     val nativeSQLMetrics = Map(
       "output_rows" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN),
-      "elapsed_compute" -> metrics("shuffleReadElapsedCompute"))
+      "elapsed_compute" -> metrics("shuffleNativeTotalTime"))
     val nativeMetrics = CometMetricNode(nativeSQLMetrics)

     // Getting rid of the fake partitionId

From 8f53d3049d8895a6564e8b866c1ba83ce4e5d106 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 3 Jul 2024 16:53:46 -0600
Subject: [PATCH 2/2] introduce constants for shuffle native write metric

---
 .../apache/spark/sql/comet/CometCollectLimitExec.scala | 5 +++--
 .../spark/sql/comet/CometTakeOrderedAndProjectExec.scala | 5 +++--
 .../org/apache/spark/sql/comet/CometWindowExec.scala | 5 +++--
 .../execution/shuffle/CometShuffleExchangeExec.scala | 8 ++++++--
 4 files changed, 15 insertions(+), 8 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala
index dd4855126..d2c9158ee 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala
@@ -23,6 +23,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.comet.execution.shuffle.{CometShuffledBatchRDD, CometShuffleExchangeExec}
+import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec.{METRIC_NATIVE_TIME_DESCRIPTION, METRIC_NATIVE_TIME_NAME}
 import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan, UnaryExecNode, UnsafeRowSerializer}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -51,8 +52,8 @@ case class CometCollectLimitExec(
     SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
   override lazy val metrics: Map[String, SQLMetric] = Map(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
-    "shuffleReadElapsedCompute" ->
-      SQLMetrics.createNanoTimingMetric(sparkContext, "shuffle read elapsed compute at native"),
+    METRIC_NATIVE_TIME_NAME ->
+      SQLMetrics.createNanoTimingMetric(sparkContext, METRIC_NATIVE_TIME_DESCRIPTION),
     "numPartitions" -> SQLMetrics.createMetric(
       sparkContext,
       "number of partitions")) ++ readMetrics ++ writeMetrics
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
index 26ec401ed..6e9bfe424 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
@@ -24,6 +24,7 @@ import org.apache.spark.serializer.Serializer
 import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression, SortOrder}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.comet.execution.shuffle.{CometShuffledBatchRDD, CometShuffleExchangeExec}
+import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec.{METRIC_NATIVE_TIME_DESCRIPTION, METRIC_NATIVE_TIME_NAME}
 import org.apache.spark.sql.execution.{SparkPlan, TakeOrderedAndProjectExec, UnaryExecNode, UnsafeRowSerializer}
 import org.apache.spark.sql.execution.metric.{SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
 import org.apache.spark.sql.execution.metric.SQLMetric
@@ -55,8 +56,8 @@ case class CometTakeOrderedAndProjectExec(
     SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
   override lazy val metrics: Map[String, SQLMetric] = Map(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
-    "shuffleReadElapsedCompute" ->
-      SQLMetrics.createNanoTimingMetric(sparkContext, "shuffle read elapsed compute at native"),
+    METRIC_NATIVE_TIME_NAME ->
+      SQLMetrics.createNanoTimingMetric(sparkContext, METRIC_NATIVE_TIME_DESCRIPTION),
     "numPartitions" -> SQLMetrics.createMetric(
       sparkContext,
       "number of partitions")) ++ readMetrics ++ writeMetrics
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometWindowExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometWindowExec.scala
index 9a1232f0c..9685e75e1 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometWindowExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometWindowExec.scala
@@ -25,6 +25,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, NamedExpression, SortOrder, WindowExpression}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.comet.CometWindowExec.getNativePlan
+import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec.{METRIC_NATIVE_TIME_DESCRIPTION, METRIC_NATIVE_TIME_NAME}
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -59,8 +60,8 @@ case class CometWindowExec(
     SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
   override lazy val metrics: Map[String, SQLMetric] = Map(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
-    "shuffleReadElapsedCompute" ->
-      SQLMetrics.createNanoTimingMetric(sparkContext, "shuffle read elapsed compute at native"),
+    METRIC_NATIVE_TIME_NAME ->
+      SQLMetrics.createNanoTimingMetric(sparkContext, METRIC_NATIVE_TIME_DESCRIPTION),
     "numPartitions" -> SQLMetrics.createMetric(
       sparkContext,
       "number of partitions")) ++ readMetrics ++ writeMetrics
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
index 704a626fa..cec64d7a8 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.comet.{CometExec, CometMetricNode, CometPlan}
+import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec.{METRIC_NATIVE_TIME_DESCRIPTION, METRIC_NATIVE_TIME_NAME}
 import org.apache.spark.sql.comet.shims.ShimCometShuffleWriteProcessor
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeLike, ShuffleOrigin}
@@ -77,8 +78,8 @@ case class CometShuffleExchangeExec(
     SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
   override lazy val metrics: Map[String, SQLMetric] = Map(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
-    "shuffleNativeTotalTime" ->
-      SQLMetrics.createNanoTimingMetric(sparkContext, "shuffle native code time"),
+    METRIC_NATIVE_TIME_NAME ->
+      SQLMetrics.createNanoTimingMetric(sparkContext, METRIC_NATIVE_TIME_DESCRIPTION),
     "numPartitions" -> SQLMetrics.createMetric(
       sparkContext,
       "number of partitions")) ++ readMetrics ++ writeMetrics
@@ -220,6 +221,9 @@ case class CometShuffleExchangeExec(
 }

 object CometShuffleExchangeExec extends ShimCometShuffleExchangeExec {
+  val METRIC_NATIVE_TIME_NAME = "shuffleNativeTotalTime"
+  val METRIC_NATIVE_TIME_DESCRIPTION = "shuffle native code time"
+
   def prepareShuffleDependency(
       rdd: RDD[ColumnarBatch],
       outputAttributes: Seq[Attribute],
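
Note: a minimal sketch, not part of the patches, of how another Comet operator could register the renamed metric once PATCH 2/2 is applied. The constants, `SQLMetrics.createNanoTimingMetric`, and the key/description values come from the hunks above; the `NativeShuffleTimeMetric` object and its `entry` method are hypothetical names used only for illustration.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec.{METRIC_NATIVE_TIME_DESCRIPTION, METRIC_NATIVE_TIME_NAME}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

// Hypothetical helper: builds the "shuffleNativeTotalTime" metrics-map entry the same
// way the four operators in PATCH 2/2 do, so the metric key and its UI description
// stay in sync with the constants in CometShuffleExchangeExec's companion object.
object NativeShuffleTimeMetric {
  def entry(sparkContext: SparkContext): (String, SQLMetric) =
    METRIC_NATIVE_TIME_NAME ->
      SQLMetrics.createNanoTimingMetric(sparkContext, METRIC_NATIVE_TIME_DESCRIPTION)
}
```

This is the same key that PATCH 1/2 maps the native `elapsed_compute` counter onto in `CometShuffleWriteProcessor`, so the time spent in native shuffle code is reported under this metric.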