chore: Rename shuffle write metric (#624)
* Rename shuffle write metric

* introduce constants for shuffle native write metric
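The commit applies a common pattern: hoist the metric key and its description into constants on a companion object and reference them from every operator that registers the metric, so the copies of the raw string cannot drift apart. A minimal sketch of that pattern follows; ShuffleMetricConstants and ExampleShuffleOperator are illustrative names only, while in the commit itself the constants live on the CometShuffleExchangeExec companion object and are consumed by CometCollectLimitExec, CometTakeOrderedAndProjectExec, CometWindowExec, and CometShuffleExchangeExec, as the diff below shows.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

// Illustrative stand-in for the CometShuffleExchangeExec companion object.
object ShuffleMetricConstants {
  // Values match the constants introduced by this commit.
  val METRIC_NATIVE_TIME_NAME = "shuffleNativeTotalTime"
  val METRIC_NATIVE_TIME_DESCRIPTION = "shuffle native code time"
}

// Illustrative stand-in for an operator such as CometCollectLimitExec:
// each operator keys the nano-timing metric by the shared constant instead
// of repeating the raw string.
class ExampleShuffleOperator(sparkContext: SparkContext) {
  import ShuffleMetricConstants._

  lazy val metrics: Map[String, SQLMetric] = Map(
    METRIC_NATIVE_TIME_NAME ->
      SQLMetrics.createNanoTimingMetric(sparkContext, METRIC_NATIVE_TIME_DESCRIPTION))
}
```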
andygrove authored Jul 4, 2024
1 parent 9fff887 commit b3977cd
Showing 4 changed files with 16 additions and 9 deletions.
CometCollectLimitExec.scala
@@ -23,6 +23,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.comet.execution.shuffle.{CometShuffledBatchRDD, CometShuffleExchangeExec}
+import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec.{METRIC_NATIVE_TIME_DESCRIPTION, METRIC_NATIVE_TIME_NAME}
 import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan, UnaryExecNode, UnsafeRowSerializer}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -51,8 +52,8 @@ case class CometCollectLimitExec(
     SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
   override lazy val metrics: Map[String, SQLMetric] = Map(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
-    "shuffleReadElapsedCompute" ->
-      SQLMetrics.createNanoTimingMetric(sparkContext, "shuffle read elapsed compute at native"),
+    METRIC_NATIVE_TIME_NAME ->
+      SQLMetrics.createNanoTimingMetric(sparkContext, METRIC_NATIVE_TIME_DESCRIPTION),
     "numPartitions" -> SQLMetrics.createMetric(
       sparkContext,
       "number of partitions")) ++ readMetrics ++ writeMetrics
CometTakeOrderedAndProjectExec.scala
@@ -24,6 +24,7 @@ import org.apache.spark.serializer.Serializer
 import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression, SortOrder}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.comet.execution.shuffle.{CometShuffledBatchRDD, CometShuffleExchangeExec}
+import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec.{METRIC_NATIVE_TIME_DESCRIPTION, METRIC_NATIVE_TIME_NAME}
 import org.apache.spark.sql.execution.{SparkPlan, TakeOrderedAndProjectExec, UnaryExecNode, UnsafeRowSerializer}
 import org.apache.spark.sql.execution.metric.{SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
 import org.apache.spark.sql.execution.metric.SQLMetric
@@ -55,8 +56,8 @@ case class CometTakeOrderedAndProjectExec(
     SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
   override lazy val metrics: Map[String, SQLMetric] = Map(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
-    "shuffleReadElapsedCompute" ->
-      SQLMetrics.createNanoTimingMetric(sparkContext, "shuffle read elapsed compute at native"),
+    METRIC_NATIVE_TIME_NAME ->
+      SQLMetrics.createNanoTimingMetric(sparkContext, METRIC_NATIVE_TIME_DESCRIPTION),
     "numPartitions" -> SQLMetrics.createMetric(
       sparkContext,
       "number of partitions")) ++ readMetrics ++ writeMetrics
CometWindowExec.scala
@@ -25,6 +25,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, NamedExpression, SortOrder, WindowExpression}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.comet.CometWindowExec.getNativePlan
+import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec.{METRIC_NATIVE_TIME_DESCRIPTION, METRIC_NATIVE_TIME_NAME}
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -59,8 +60,8 @@ case class CometWindowExec(
     SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
   override lazy val metrics: Map[String, SQLMetric] = Map(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
-    "shuffleReadElapsedCompute" ->
-      SQLMetrics.createNanoTimingMetric(sparkContext, "shuffle read elapsed compute at native"),
+    METRIC_NATIVE_TIME_NAME ->
+      SQLMetrics.createNanoTimingMetric(sparkContext, METRIC_NATIVE_TIME_DESCRIPTION),
     "numPartitions" -> SQLMetrics.createMetric(
       sparkContext,
       "number of partitions")) ++ readMetrics ++ writeMetrics
CometShuffleExchangeExec.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.comet.{CometExec, CometMetricNode, CometPlan}
+import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec.{METRIC_NATIVE_TIME_DESCRIPTION, METRIC_NATIVE_TIME_NAME}
 import org.apache.spark.sql.comet.shims.ShimCometShuffleWriteProcessor
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeLike, ShuffleOrigin}
@@ -77,8 +78,8 @@ case class CometShuffleExchangeExec(
     SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
   override lazy val metrics: Map[String, SQLMetric] = Map(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
-    "shuffleReadElapsedCompute" ->
-      SQLMetrics.createNanoTimingMetric(sparkContext, "shuffle read elapsed compute at native"),
+    METRIC_NATIVE_TIME_NAME ->
+      SQLMetrics.createNanoTimingMetric(sparkContext, METRIC_NATIVE_TIME_DESCRIPTION),
     "numPartitions" -> SQLMetrics.createMetric(
       sparkContext,
       "number of partitions")) ++ readMetrics ++ writeMetrics
@@ -220,6 +221,9 @@ case class CometShuffleExchangeExec(
 }
 
 object CometShuffleExchangeExec extends ShimCometShuffleExchangeExec {
+  val METRIC_NATIVE_TIME_NAME = "shuffleNativeTotalTime"
+  val METRIC_NATIVE_TIME_DESCRIPTION = "shuffle native code time"
+
   def prepareShuffleDependency(
       rdd: RDD[ColumnarBatch],
       outputAttributes: Seq[Attribute],
@@ -479,7 +483,7 @@ class CometShuffleWriteProcessor(
     // Maps native metrics to SQL metrics
     val nativeSQLMetrics = Map(
       "output_rows" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN),
-      "elapsed_compute" -> metrics("shuffleReadElapsedCompute"))
+      "elapsed_compute" -> metrics("shuffleNativeTotalTime"))
     val nativeMetrics = CometMetricNode(nativeSQLMetrics)
 
     // Getting rid of the fake partitionId
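Note that the lookup in CometShuffleWriteProcessor above still refers to the metric by the raw string "shuffleNativeTotalTime" rather than through the new constant, so the literal must stay in sync with METRIC_NATIVE_TIME_NAME. A possible follow-up, not part of this commit, would reuse the constant at the lookup site as well, roughly:

```scala
// Hypothetical follow-up (not in this commit): key the native-to-SQL metric
// mapping by the shared constant so the lookup cannot drift from the name
// declared in the operator's metrics map.
val nativeSQLMetrics = Map(
  "output_rows" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN),
  "elapsed_compute" -> metrics(CometShuffleExchangeExec.METRIC_NATIVE_TIME_NAME))
```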
