From b3977cd6e663b03d1dd63cb8d3f6c66f19ad0795 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 4 Jul 2024 09:18:40 -0600
Subject: [PATCH] chore: Rename shuffle write metric (#624)

* Rename shuffle write metric

* introduce constants for shuffle native write metric
---
 .../apache/spark/sql/comet/CometCollectLimitExec.scala  |  5 +++--
 .../sql/comet/CometTakeOrderedAndProjectExec.scala      |  5 +++--
 .../org/apache/spark/sql/comet/CometWindowExec.scala    |  5 +++--
 .../execution/shuffle/CometShuffleExchangeExec.scala    | 10 +++++++---
 4 files changed, 16 insertions(+), 9 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala
index dd4855126..d2c9158ee 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala
@@ -23,6 +23,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.comet.execution.shuffle.{CometShuffledBatchRDD, CometShuffleExchangeExec}
+import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec.{METRIC_NATIVE_TIME_DESCRIPTION, METRIC_NATIVE_TIME_NAME}
 import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan, UnaryExecNode, UnsafeRowSerializer}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -51,8 +52,8 @@ case class CometCollectLimitExec(
     SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
   override lazy val metrics: Map[String, SQLMetric] = Map(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
-    "shuffleReadElapsedCompute" ->
-      SQLMetrics.createNanoTimingMetric(sparkContext, "shuffle read elapsed compute at native"),
+    METRIC_NATIVE_TIME_NAME ->
+      SQLMetrics.createNanoTimingMetric(sparkContext, METRIC_NATIVE_TIME_DESCRIPTION),
     "numPartitions" -> SQLMetrics.createMetric(
       sparkContext,
       "number of partitions")) ++ readMetrics ++ writeMetrics
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
index 26ec401ed..6e9bfe424 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
@@ -24,6 +24,7 @@ import org.apache.spark.serializer.Serializer
 import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression, SortOrder}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.comet.execution.shuffle.{CometShuffledBatchRDD, CometShuffleExchangeExec}
+import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec.{METRIC_NATIVE_TIME_DESCRIPTION, METRIC_NATIVE_TIME_NAME}
 import org.apache.spark.sql.execution.{SparkPlan, TakeOrderedAndProjectExec, UnaryExecNode, UnsafeRowSerializer}
 import org.apache.spark.sql.execution.metric.{SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
 import org.apache.spark.sql.execution.metric.SQLMetric
@@ -55,8 +56,8 @@ case class CometTakeOrderedAndProjectExec(
     SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
   override lazy val metrics: Map[String, SQLMetric] = Map(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
-    "shuffleReadElapsedCompute" ->
-      SQLMetrics.createNanoTimingMetric(sparkContext, "shuffle read elapsed compute at native"),
+    METRIC_NATIVE_TIME_NAME ->
+      SQLMetrics.createNanoTimingMetric(sparkContext, METRIC_NATIVE_TIME_DESCRIPTION),
     "numPartitions" -> SQLMetrics.createMetric(
       sparkContext,
       "number of partitions")) ++ readMetrics ++ writeMetrics
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometWindowExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometWindowExec.scala
index 9a1232f0c..9685e75e1 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometWindowExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometWindowExec.scala
@@ -25,6 +25,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, NamedExpression, SortOrder, WindowExpression}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.comet.CometWindowExec.getNativePlan
+import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec.{METRIC_NATIVE_TIME_DESCRIPTION, METRIC_NATIVE_TIME_NAME}
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -59,8 +60,8 @@ case class CometWindowExec(
     SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
   override lazy val metrics: Map[String, SQLMetric] = Map(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
-    "shuffleReadElapsedCompute" ->
-      SQLMetrics.createNanoTimingMetric(sparkContext, "shuffle read elapsed compute at native"),
+    METRIC_NATIVE_TIME_NAME ->
+      SQLMetrics.createNanoTimingMetric(sparkContext, METRIC_NATIVE_TIME_DESCRIPTION),
     "numPartitions" -> SQLMetrics.createMetric(
       sparkContext,
       "number of partitions")) ++ readMetrics ++ writeMetrics
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
index aabe3c350..cec64d7a8 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.comet.{CometExec, CometMetricNode, CometPlan}
+import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec.{METRIC_NATIVE_TIME_DESCRIPTION, METRIC_NATIVE_TIME_NAME}
 import org.apache.spark.sql.comet.shims.ShimCometShuffleWriteProcessor
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeLike, ShuffleOrigin}
@@ -77,8 +78,8 @@ case class CometShuffleExchangeExec(
     SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
   override lazy val metrics: Map[String, SQLMetric] = Map(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
-    "shuffleReadElapsedCompute" ->
-      SQLMetrics.createNanoTimingMetric(sparkContext, "shuffle read elapsed compute at native"),
+    METRIC_NATIVE_TIME_NAME ->
+      SQLMetrics.createNanoTimingMetric(sparkContext, METRIC_NATIVE_TIME_DESCRIPTION),
     "numPartitions" -> SQLMetrics.createMetric(
       sparkContext,
       "number of partitions")) ++ readMetrics ++ writeMetrics
@@ -220,6 +221,9 @@ case class CometShuffleExchangeExec(
 }
 
 object CometShuffleExchangeExec extends ShimCometShuffleExchangeExec {
+  val METRIC_NATIVE_TIME_NAME = "shuffleNativeTotalTime"
+  val METRIC_NATIVE_TIME_DESCRIPTION = "shuffle native code time"
+
   def prepareShuffleDependency(
       rdd: RDD[ColumnarBatch],
       outputAttributes: Seq[Attribute],
@@ -479,7 +483,7 @@ class CometShuffleWriteProcessor(
     // Maps native metrics to SQL metrics
     val nativeSQLMetrics = Map(
       "output_rows" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN),
-      "elapsed_compute" -> metrics("shuffleReadElapsedCompute"))
+      "elapsed_compute" -> metrics("shuffleNativeTotalTime"))
     val nativeMetrics = CometMetricNode(nativeSQLMetrics)
 
     // Getting rid of the fake partitionId
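Note on the last hunk: the metric is registered under METRIC_NATIVE_TIME_NAME, but CometShuffleWriteProcessor looks it up with the string literal "shuffleNativeTotalTime". The sketch below shows the constant-based form of the same pattern; it is illustrative only and not part of the patch. The object and helper names (NativeShuffleMetrics, buildShuffleMetrics, mapNativeMetrics) are hypothetical; only SQLMetrics.createNanoTimingMetric and the two constant values come from the diff above.

// Illustrative sketch, not Comet source: register and look up the native shuffle
// timing metric through one shared constant so the registration key and the
// lookup key cannot drift apart. Assumes a Spark 3.x classpath.
import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

object NativeShuffleMetrics {
  // Values introduced by this patch in the CometShuffleExchangeExec companion object.
  val METRIC_NATIVE_TIME_NAME = "shuffleNativeTotalTime"
  val METRIC_NATIVE_TIME_DESCRIPTION = "shuffle native code time"

  // Register the nano-timing metric under the shared key.
  def buildShuffleMetrics(sc: SparkContext): Map[String, SQLMetric] =
    Map(
      METRIC_NATIVE_TIME_NAME ->
        SQLMetrics.createNanoTimingMetric(sc, METRIC_NATIVE_TIME_DESCRIPTION))

  // Map the native elapsed_compute counter to the SQL metric via the same key.
  def mapNativeMetrics(metrics: Map[String, SQLMetric]): Map[String, SQLMetric] =
    Map("elapsed_compute" -> metrics(METRIC_NATIVE_TIME_NAME))
}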