From 73b4f9becad8921a1c65fc35333c308a8f42a49d Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Mon, 16 Dec 2024 21:21:43 -0700
Subject: [PATCH] add input_batches metric

---
 .../core/src/execution/shuffle/shuffle_writer.rs  | 15 ++++++++-------
 .../apache/spark/sql/comet/CometMetricNode.scala  |  1 +
 .../shuffle/CometShuffleExchangeExec.scala        |  4 ++--
 3 files changed, 11 insertions(+), 9 deletions(-)

diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs
index ef56dd473..152e45a56 100644
--- a/native/core/src/execution/shuffle/shuffle_writer.rs
+++ b/native/core/src/execution/shuffle/shuffle_writer.rs
@@ -651,7 +651,9 @@ struct ShuffleRepartitionerMetrics {
     /// Time encoding batches to IPC format
     ipc_time: Time,
 
-    //other_time: Time,
+    /// Number of input batches
+    input_batches: Count,
+
     /// count of spills during the execution of the operator
     spill_count: Count,
 
@@ -664,14 +666,12 @@
 
 impl ShuffleRepartitionerMetrics {
     fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
-        let input_time = MetricBuilder::new(metrics).subset_time("input_time", partition);
-        let write_time = MetricBuilder::new(metrics).subset_time("write_time", partition);
-        let ipc_time = MetricBuilder::new(metrics).subset_time("ipc_time", partition);
         Self {
             baseline: BaselineMetrics::new(metrics, partition),
-            input_time,
-            write_time,
-            ipc_time,
+            input_time: MetricBuilder::new(metrics).subset_time("input_time", partition),
+            write_time: MetricBuilder::new(metrics).subset_time("write_time", partition),
+            ipc_time: MetricBuilder::new(metrics).subset_time("ipc_time", partition),
+            input_batches: MetricBuilder::new(metrics).counter("input_batches", partition),
             spill_count: MetricBuilder::new(metrics).spill_count(partition),
             spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition),
             data_size: MetricBuilder::new(metrics).counter("data_size", partition),
@@ -739,6 +739,7 @@ impl ShuffleRepartitioner {
             self.partitioning_batch(batch).await?;
             start = end;
         }
+        self.metrics.input_batches.add(1);
         self.metrics
             .baseline
             .elapsed_compute()
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala
index fcccfa227..4ac04993e 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala
@@ -138,6 +138,7 @@ object CometMetricNode {
       "elapsed_compute" -> SQLMetrics.createNanoTimingMetric(sc, "native shuffle time"),
       "ipc_time" -> SQLMetrics.createNanoTimingMetric(sc, "IPC encoding time"),
       "input_time" -> SQLMetrics.createNanoTimingMetric(sc, "native shuffle input time"),
+      "input_batches" -> SQLMetrics.createMetric(sc, "number of input batches"),
       "shuffleWallTime" -> SQLMetrics.createNanoTimingMetric(
         sc,
         "shuffle wall time (inclusive)"))
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
index 1aaafd00e..1b9dbeaed 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
@@ -482,12 +482,12 @@ class CometShuffleWriteProcessor(
     val nativePlan = getNativePlan(tempDataFilename, tempIndexFilename)
 
     // these metrics are only reported when detailed metrics are enabled via config
-    val detailedMetrics = Seq("elapsed_compute", "input_time", "ipc_time")
+    val detailedMetrics = Seq("elapsed_compute", "input_time", "ipc_time", "input_batches")
 
     // Maps native metrics to SQL metrics
     val nativeSQLMetrics = Map(
-      "data_size" -> metrics("dataSize"),
       "output_rows" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN),
+      "data_size" -> metrics("dataSize"),
       "write_time" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)) ++
       metrics.filterKeys(detailedMetrics.contains)
     val nativeMetrics = CometMetricNode(nativeSQLMetrics)
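
Reviewer note: the new counter uses DataFusion's stock metrics API rather than
anything Comet-specific. A minimal standalone sketch of the behavior the patch
relies on (the `sketch` function and partition index 0 are illustrative
assumptions; only the `counter`/`add` calls mirror the patch):

    use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder};

    fn sketch() {
        let metrics = ExecutionPlanMetricsSet::new();
        // Register a Count named "input_batches" for partition 0, as
        // ShuffleRepartitionerMetrics::new now does for each partition.
        let input_batches = MetricBuilder::new(&metrics).counter("input_batches", 0);
        // insert_batch bumps the counter once per incoming Arrow batch.
        input_batches.add(1);
        assert_eq!(input_batches.value(), 1);
    }

On the Spark side the counter surfaces as the "number of input batches" SQL
metric, and because it is listed in detailedMetrics it is only reported when
detailed metrics are enabled via config.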