diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index b602d7cf1..76551d05f 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -45,6 +45,9 @@ import org.apache.comet.shims.ShimCometConf */ object CometConf extends ShimCometConf { + private val METRICS_GUIDE = "For more information, refer to the Comet Metrics " + + "Guide (https://datafusion.apache.org/comet/user-guide/metrics.html)" + private val TUNING_GUIDE = "For more information, refer to the Comet Tuning " + "Guide (https://datafusion.apache.org/comet/user-guide/tuning.html)" @@ -414,6 +417,12 @@ object CometConf extends ShimCometConf { .booleanConf .createWithDefault(false) + val COMET_ENABLE_DETAILED_METRICS: ConfigEntry[Boolean] = + conf("spark.comet.metrics.detailed") + .doc(s"Enable this option to see additional SQL metrics. $METRICS_GUIDE.") + .booleanConf + .createWithDefault(false) + val COMET_EXPLAIN_FALLBACK_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.explainFallback.enabled") .doc( diff --git a/docs/source/index.rst b/docs/source/index.rst index 39ad27a57..21ec36ca9 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -51,6 +51,7 @@ as a native runtime to achieve improvement in terms of query efficiency and quer Configuration Settings Compatibility Guide Tuning Guide + Metrics Guide .. _toc.contributor-guide-links: .. toctree:: diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 69da79222..790c90019 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -64,6 +64,7 @@ Comet provides the following configuration settings. | spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. | false | | spark.comet.memory.overhead.factor | Fraction of executor memory to be allocated as additional non-heap memory per executor process for Comet. | 0.2 | | spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB. | 402653184b | +| spark.comet.metrics.detailed | Enable this option to see additional SQL metrics. For more information, refer to the Comet Metrics Guide (https://datafusion.apache.org/comet/user-guide/metrics.html). | false | | spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false | | spark.comet.parquet.enable.directBuffer | Whether to use Java direct byte buffer when reading Parquet. | false | | spark.comet.parquet.read.io.adjust.readRange.skew | In the parallel reader, if the read ranges submitted are skewed in sizes, this option will cause the reader to break up larger read ranges into smaller ranges to reduce the skew. This will result in a slightly larger number of connections opened to the file system but may give improved performance. 
| false | diff --git a/docs/source/user-guide/metrics.md b/docs/source/user-guide/metrics.md new file mode 100644 index 000000000..dbe74c66c --- /dev/null +++ b/docs/source/user-guide/metrics.md @@ -0,0 +1,64 @@ + + +# Comet Metrics + +## Spark SQL Metrics + +Set `spark.comet.metrics.detailed=true` to see all available Comet metrics. + +### CometScanExec + +| Metric | Description | | ----------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | | `scan time` | Total time to scan a Parquet file. This is not comparable to the same metric in Spark because Comet's scan metric is more accurate. Although both Comet and Spark measure the time in nanoseconds, Spark rounds this time to the nearest millisecond per batch and Comet does not. | + +### Exchange + +Comet adds some additional metrics: + +| Metric | Description | | ------------------------------- | ------------------------------------------------------------------------------------------ | | `ipc time` | Time to encode batches in IPC format. Includes compression time. | | `native shuffle time` | Total time spent in the native shuffle writer, excluding the execution time of the input plan. | | `native shuffle input time` | Time spent executing the shuffle input plan and fetching batches. | | `shuffle wall time (inclusive)` | Total time executing the shuffle write, inclusive of executing the input plan. | + +## Native Metrics + +Setting `spark.comet.explain.native.enabled=true` will cause native plans to be logged in each executor. Metrics are +logged for each native plan (and there is one plan per task, so this is very verbose). + +Here is a guide to some of the native metrics. + +### ScanExec + +| Metric | Description | | ----------------- | --------------------------------------------------------------------------------------------------- | | `elapsed_compute` | Total time spent in this operator, fetching batches from a JVM iterator. | | `jvm_fetch_time` | Time spent in the JVM fetching input batches to be read by this `ScanExec` instance. | | `arrow_ffi_time` | Time spent using Arrow FFI to create Arrow batches from the memory addresses returned from the JVM. | + +### ShuffleWriterExec + +| Metric | Description | | ----------------- | ----------------------------------------------------------- | | `elapsed_compute` | Total time spent in this operator, excluding time spent in any child operators. | | `input_time` | Time spent executing the input plan and fetching batches. | | `write_time` | Time spent writing bytes to disk. | diff --git a/docs/source/user-guide/tuning.md b/docs/source/user-guide/tuning.md index af722494f..56303909b 100644 --- a/docs/source/user-guide/tuning.md +++ b/docs/source/user-guide/tuning.md @@ -102,28 +102,3 @@ native shuffle currently only supports `HashPartitioning` and `SinglePartitionin To enable native shuffle, set `spark.comet.exec.shuffle.mode` to `native`. If this mode is explicitly set, then any shuffle operations that cannot be supported in this mode will fall back to Spark. - -## Metrics - -### Spark SQL Metrics - -Some Comet metrics are not directly comparable to Spark metrics in some cases: - -- `CometScanExec` uses nanoseconds for total scan time.
Spark also measures scan time in nanoseconds but converts to - milliseconds _per batch_ which can result in a large loss of precision, making it difficult to compare scan times - between Spark and Comet. - -### Native Metrics - -Setting `spark.comet.explain.native.enabled=true` will cause native plans to be logged in each executor. Metrics are -logged for each native plan (and there is one plan per task, so this is very verbose). - -Here is a guide to some of the native metrics. - -### ScanExec - -| Metric | Description | -| ----------------- | --------------------------------------------------------------------------------------------------- | -| `elapsed_compute` | Total time spent in this operator, fetching batches from a JVM iterator. | -| `jvm_fetch_time` | Time spent in the JVM fetching input batches to be read by this `ScanExec` instance. | -| `arrow_ffi_time` | Time spent using Arrow FFI to create Arrow batches from the memory addresses returned from the JVM. | diff --git a/native/core/benches/row_columnar.rs b/native/core/benches/row_columnar.rs index 60b41330e..28920440c 100644 --- a/native/core/benches/row_columnar.rs +++ b/native/core/benches/row_columnar.rs @@ -20,6 +20,7 @@ use comet::execution::shuffle::row::{ process_sorted_row_partition, SparkUnsafeObject, SparkUnsafeRow, }; use criterion::{criterion_group, criterion_main, Criterion}; +use datafusion::physical_plan::metrics::Time; use tempfile::Builder; const NUM_ROWS: usize = 10000; @@ -63,6 +64,8 @@ fn benchmark(c: &mut Criterion) { let row_size_ptr = row_sizes.as_mut_ptr(); let schema = vec![ArrowDataType::Int64; NUM_COLS]; + let ipc_time = Time::default(); + b.iter(|| { let tempfile = Builder::new().tempfile().unwrap(); @@ -77,6 +80,7 @@ fn benchmark(c: &mut Criterion) { false, 0, None, + &ipc_time, ) .unwrap(); }); diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 491b389c9..9f5adbeb9 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -17,8 +17,10 @@ //! Define JNI APIs which can be called from Java/Scala. 
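Note: the benchmark and JNI changes above thread a borrowed `Time` into the sorted-row write path. For reference, the sketch below shows how DataFusion's `Time` accumulates nanoseconds via scoped timers and `add_duration`; `encode_partition` is a hypothetical stand-in for the real encoding step, not Comet code.

```rust
use std::time::Duration;

use datafusion::physical_plan::metrics::Time;

// Hypothetical stand-in for the IPC encoding step; it only illustrates how a
// borrowed `Time` accumulates the duration of the work it wraps.
fn encode_partition(num_rows: usize, ipc_time: &Time) -> usize {
    let mut timer = ipc_time.timer(); // starts timing
    let encoded_bytes = num_rows * 8; // placeholder for real encoding work
    timer.stop(); // adds the elapsed nanoseconds to `ipc_time`
    encoded_bytes
}

fn main() {
    // The caller owns the metric and passes a reference, as the benchmark now
    // does with `let ipc_time = Time::default();`.
    let ipc_time = Time::default();
    let written = encode_partition(10_000, &ipc_time);

    // Durations measured separately (e.g. with Instant) can be folded in too.
    ipc_time.add_duration(Duration::from_micros(250));

    println!("wrote {} bytes, ipc_time = {} ns", written, ipc_time.value());
}
```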
+use super::{serde, utils::SparkArrowConvert, CometMemoryPool}; use arrow::datatypes::DataType as ArrowDataType; use arrow_array::RecordBatch; +use datafusion::physical_plan::metrics::Time; use datafusion::{ execution::{ disk_manager::DiskManagerConfig, @@ -40,8 +42,6 @@ use jni::{ use std::time::{Duration, Instant}; use std::{collections::HashMap, sync::Arc, task::Poll}; -use super::{serde, utils::SparkArrowConvert, CometMemoryPool}; - use crate::{ errors::{try_unwrap_or_throw, CometError, CometResult}, execution::{ @@ -455,6 +455,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_writeSortedFileNative checksum_enabled: jboolean, checksum_algo: jint, current_checksum: jlong, + ipc_time: &Time, ) -> jlongArray { try_unwrap_or_throw(&e, |mut env| unsafe { let data_types = convert_datatype_arrays(&mut env, serialized_datatypes)?; @@ -493,6 +494,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_writeSortedFileNative checksum_enabled, checksum_algo, current_checksum, + ipc_time, )?; let checksum = if let Some(checksum) = checksum { diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs index a297f87c1..007cdc1d2 100644 --- a/native/core/src/execution/operators/scan.rs +++ b/native/core/src/execution/operators/scan.rs @@ -75,7 +75,7 @@ pub struct ScanExec { /// Metrics collector metrics: ExecutionPlanMetricsSet, /// Baseline metrics - baseline_metrics: BaselineMetrics, + pub(crate) baseline_metrics: BaselineMetrics, /// Time waiting for JVM input plan to execute and return batches jvm_fetch_time: Time, /// Time spent in FFI diff --git a/native/core/src/execution/shuffle/row.rs b/native/core/src/execution/shuffle/row.rs index ce752e68a..cfaca8ce4 100644 --- a/native/core/src/execution/shuffle/row.rs +++ b/native/core/src/execution/shuffle/row.rs @@ -40,6 +40,7 @@ use arrow_array::{ Array, ArrayRef, RecordBatch, RecordBatchOptions, }; use arrow_schema::{ArrowError, DataType, Field, Schema, TimeUnit}; +use datafusion::physical_plan::metrics::Time; use jni::sys::{jint, jlong}; use std::{ fs::OpenOptions, @@ -3295,6 +3296,7 @@ pub fn process_sorted_row_partition( // this is the initial checksum for this method, as it also gets updated iteratively // inside the loop within the method across batches. initial_checksum: Option, + ipc_time: &Time, ) -> Result<(i64, Option), CometError> { // TODO: We can tune this parameter automatically based on row size and cache size. let row_step = 10; @@ -3354,7 +3356,7 @@ pub fn process_sorted_row_partition( let mut frozen: Vec = vec![]; let mut cursor = Cursor::new(&mut frozen); cursor.seek(SeekFrom::End(0))?; - written += write_ipc_compressed(&batch, &mut cursor)?; + written += write_ipc_compressed(&batch, &mut cursor, ipc_time)?; if let Some(checksum) = &mut current_checksum { checksum.update(&mut cursor)?; diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index 7587ff06d..152e45a56 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -17,17 +17,6 @@ //! Defines the External shuffle repartition plan. 
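Note: the shuffle writer below registers several named sub-metrics (`input_time`, `write_time`, `ipc_time`, `input_batches`). As a reference for that registration pattern, here is a trimmed, hedged sketch using DataFusion's `MetricBuilder`; the struct name and `main` are illustrative only, and the metric names match the diff.

```rust
use datafusion::physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder, Time};

// Trimmed-down sketch of the metric bundle the shuffle writer registers.
struct ShuffleMetricsSketch {
    input_time: Time,
    write_time: Time,
    ipc_time: Time,
    input_batches: Count,
}

impl ShuffleMetricsSketch {
    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        Self {
            input_time: MetricBuilder::new(metrics).subset_time("input_time", partition),
            write_time: MetricBuilder::new(metrics).subset_time("write_time", partition),
            ipc_time: MetricBuilder::new(metrics).subset_time("ipc_time", partition),
            input_batches: MetricBuilder::new(metrics).counter("input_batches", partition),
        }
    }
}

fn main() {
    let metrics = ExecutionPlanMetricsSet::new();
    let m = ShuffleMetricsSketch::new(&metrics, 0);

    m.input_batches.add(1);
    drop(m.ipc_time.timer()); // a scoped timer records its elapsed time when dropped

    // Named metrics registered this way are what the Scala side looks up by name
    // (e.g. "ipc_time") and maps onto Spark SQL metrics.
    println!("{}", metrics.clone_inner());
}
```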
-use std::{ - any::Any, - fmt, - fmt::{Debug, Formatter}, - fs::{File, OpenOptions}, - io::{BufReader, BufWriter, Cursor, Read, Seek, SeekFrom, Write}, - path::Path, - sync::Arc, - task::{Context, Poll}, -}; - use arrow::{datatypes::*, ipc::writer::StreamWriter}; use async_trait::async_trait; use bytes::Buf; @@ -59,7 +48,19 @@ use futures::executor::block_on; use futures::{lock::Mutex, Stream, StreamExt, TryFutureExt, TryStreamExt}; use itertools::Itertools; use simd_adler32::Adler32; +use std::time::Instant; +use std::{ + any::Any, + fmt, + fmt::{Debug, Formatter}, + fs::{File, OpenOptions}, + io::{BufReader, BufWriter, Cursor, Read, Seek, SeekFrom, Write}, + path::Path, + sync::Arc, + task::{Context, Poll}, +}; +use crate::execution::operators::ScanExec; use crate::{ common::bit::ceil, errors::{CometError, CometResult}, @@ -137,9 +138,21 @@ impl ExecutionPlan for ShuffleWriterExec { partition: usize, context: Arc, ) -> Result { - let input = self.input.execute(partition, Arc::clone(&context))?; let metrics = ShuffleRepartitionerMetrics::new(&self.metrics, 0); + if let Some(scan) = self.input.as_any().downcast_ref::() { + // ScanExec starts executing and fetching data during query planning time so we + // need to capture that time here to ensure that we have accurate metrics + metrics + .input_time + .add(scan.baseline_metrics.elapsed_compute()); + } + + // execute the child plan + let start_time = Instant::now(); + let input = self.input.execute(partition, Arc::clone(&context))?; + metrics.input_time.add_duration(start_time.elapsed()); + Ok(Box::pin(RecordBatchStreamAdapter::new( self.schema(), futures::stream::once( @@ -271,7 +284,8 @@ impl PartitionBuffer { columns: &[ArrayRef], indices: &[usize], start_index: usize, - time_metric: &Time, + write_time: &Time, + ipc_time: &Time, ) -> AppendRowStatus { let mut mem_diff = 0; let mut start = start_index; @@ -293,8 +307,8 @@ impl PartitionBuffer { }); self.num_active_rows += end - start; if self.num_active_rows >= self.batch_size { - let mut timer = time_metric.timer(); - let flush = self.flush(); + let mut timer = write_time.timer(); + let flush = self.flush(ipc_time); if let Err(e) = flush { return AppendRowStatus::MemDiff(Err(e)); } @@ -313,7 +327,7 @@ impl PartitionBuffer { } /// flush active data into frozen bytes - fn flush(&mut self) -> Result { + fn flush(&mut self, ipc_time: &Time) -> Result { if self.num_active_rows == 0 { return Ok(0); } @@ -330,7 +344,7 @@ impl PartitionBuffer { let frozen_capacity_old = self.frozen.capacity(); let mut cursor = Cursor::new(&mut self.frozen); cursor.seek(SeekFrom::End(0))?; - write_ipc_compressed(&frozen_batch, &mut cursor)?; + write_ipc_compressed(&frozen_batch, &mut cursor, ipc_time)?; mem_diff += (self.frozen.capacity() - frozen_capacity_old) as isize; Ok(mem_diff) @@ -628,6 +642,18 @@ struct ShuffleRepartitionerMetrics { /// metrics baseline: BaselineMetrics, + /// Time executing child plan and fetching batches + input_time: Time, + + /// Time spent writing to disk. Maps to "shuffleWriteTime" in Spark SQL Metrics. 
+ write_time: Time, + + /// Time encoding batches to IPC format + ipc_time: Time, + + /// Number of input batches + input_batches: Count, + /// count of spills during the execution of the operator spill_count: Count, @@ -642,6 +668,10 @@ impl ShuffleRepartitionerMetrics { fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self { Self { baseline: BaselineMetrics::new(metrics, partition), + input_time: MetricBuilder::new(metrics).subset_time("input_time", partition), + write_time: MetricBuilder::new(metrics).subset_time("write_time", partition), + ipc_time: MetricBuilder::new(metrics).subset_time("ipc_time", partition), + input_batches: MetricBuilder::new(metrics).counter("input_batches", partition), spill_count: MetricBuilder::new(metrics).spill_count(partition), spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition), data_size: MetricBuilder::new(metrics).counter("data_size", partition), @@ -701,6 +731,7 @@ impl ShuffleRepartitioner { /// This function will slice input batch according to configured batch size and then /// shuffle rows into corresponding partition buffer. async fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> { + let start_time = Instant::now(); let mut start = 0; while start < batch.num_rows() { let end = (start + self.batch_size).min(batch.num_rows()); @@ -708,6 +739,11 @@ impl ShuffleRepartitioner { self.partitioning_batch(batch).await?; start = end; } + self.metrics.input_batches.add(1); + self.metrics + .baseline + .elapsed_compute() + .add_duration(start_time.elapsed()); Ok(()) } @@ -848,12 +884,13 @@ impl ShuffleRepartitioner { /// Writes buffered shuffled record batches into Arrow IPC bytes. async fn shuffle_write(&mut self) -> Result { + let mut elapsed_compute = self.metrics.baseline.elapsed_compute().timer(); let num_output_partitions = self.num_output_partitions; let buffered_partitions = &mut self.buffered_partitions; let mut output_batches: Vec> = vec![vec![]; num_output_partitions]; for i in 0..num_output_partitions { - buffered_partitions[i].flush()?; + buffered_partitions[i].flush(&self.metrics.ipc_time)?; output_batches[i] = std::mem::take(&mut buffered_partitions[i].frozen); } @@ -872,7 +909,7 @@ impl ShuffleRepartitioner { .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {:?}", e)))?; for i in 0..num_output_partitions { - let mut timer = self.metrics.baseline.elapsed_compute().timer(); + let mut timer = self.metrics.write_time.timer(); offsets[i] = output_data.stream_position()?; output_data.write_all(&output_batches[i])?; @@ -885,7 +922,7 @@ impl ShuffleRepartitioner { for spill in &output_spills { let length = spill.offsets[i + 1] - spill.offsets[i]; if length > 0 { - let mut timer = self.metrics.baseline.elapsed_compute().timer(); + let mut timer = self.metrics.write_time.timer(); let mut spill_file = BufReader::new(File::open(spill.file.path()).map_err(|e| { @@ -900,7 +937,7 @@ impl ShuffleRepartitioner { } } } - let mut timer = self.metrics.baseline.elapsed_compute().timer(); + let mut timer = self.metrics.write_time.timer(); output_data.flush()?; timer.stop(); @@ -909,7 +946,7 @@ impl ShuffleRepartitioner { .stream_position() .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {:?}", e)))?; - let mut timer = self.metrics.baseline.elapsed_compute().timer(); + let mut timer = self.metrics.write_time.timer(); let mut output_index = BufWriter::new(File::create(index_file).map_err(|e| { @@ -927,6 +964,8 @@ impl ShuffleRepartitioner { let used = self.reservation.size(); 
self.reservation.shrink(used); + elapsed_compute.stop(); + // shuffle writer always has empty output Ok(Box::pin(EmptyStream::try_new(Arc::clone(&self.schema))?)) } @@ -959,7 +998,7 @@ impl ShuffleRepartitioner { return Ok(0); } - let mut timer = self.metrics.baseline.elapsed_compute().timer(); + let mut timer = self.metrics.write_time.timer(); let spillfile = self .runtime @@ -969,6 +1008,7 @@ impl ShuffleRepartitioner { &mut self.buffered_partitions, spillfile.path(), self.num_output_partitions, + &self.metrics.ipc_time, )?; timer.stop(); @@ -995,12 +1035,16 @@ impl ShuffleRepartitioner { let output = &mut self.buffered_partitions[partition_id]; - let time_metric = self.metrics.baseline.elapsed_compute(); - // If the range of indices is not big enough, just appending the rows into // active array builders instead of directly adding them as a record batch. let mut start_index: usize = 0; - let mut output_ret = output.append_rows(columns, indices, start_index, time_metric); + let mut output_ret = output.append_rows( + columns, + indices, + start_index, + &self.metrics.write_time, + &self.metrics.ipc_time, + ); loop { match output_ret { @@ -1017,10 +1061,14 @@ impl ShuffleRepartitioner { let output = &mut self.buffered_partitions[partition_id]; output.reservation.free(); - let time_metric = self.metrics.baseline.elapsed_compute(); - start_index = new_start; - output_ret = output.append_rows(columns, indices, start_index, time_metric); + output_ret = output.append_rows( + columns, + indices, + start_index, + &self.metrics.write_time, + &self.metrics.ipc_time, + ); if let AppendRowStatus::StartIndex(new_start) = output_ret { if new_start == start_index { @@ -1045,11 +1093,12 @@ fn spill_into( buffered_partitions: &mut [PartitionBuffer], path: &Path, num_output_partitions: usize, + ipc_time: &Time, ) -> Result> { let mut output_batches: Vec> = vec![vec![]; num_output_partitions]; for i in 0..num_output_partitions { - buffered_partitions[i].flush()?; + buffered_partitions[i].flush(ipc_time)?; output_batches[i] = std::mem::take(&mut buffered_partitions[i].frozen); } let path = path.to_owned(); @@ -1104,7 +1153,7 @@ async fn external_shuffle( context.session_config().batch_size(), ); - while let Some(batch) = input.next().await { + while let Some(batch) = fetch_next_batch(&mut input, &repartitioner.metrics.input_time).await { // Block on the repartitioner to insert the batch and shuffle the rows // into the corresponding partition buffer. 
// Otherwise, pull the next batch from the input stream might overwrite the @@ -1114,6 +1163,16 @@ async fn external_shuffle( repartitioner.shuffle_write().await } +async fn fetch_next_batch( + input: &mut SendableRecordBatchStream, + input_time: &Time, +) -> Option> { + let mut input_time = input_time.timer(); + let next_batch = input.next().await; + input_time.stop(); + next_batch +} + fn new_array_builders(schema: &SchemaRef, batch_size: usize) -> Vec> { schema .fields() @@ -1485,10 +1544,12 @@ impl Checksum { pub(crate) fn write_ipc_compressed( batch: &RecordBatch, output: &mut W, + time: &Time, ) -> Result { if batch.num_rows() == 0 { return Ok(0); } + let mut timer = time.timer(); let start_pos = output.stream_position()?; // write ipc_length placeholder @@ -1509,6 +1570,8 @@ pub(crate) fn write_ipc_compressed( output.seek(SeekFrom::Start(start_pos))?; output.write_all(&ipc_length.to_le_bytes()[..])?; + timer.stop(); + output.seek(SeekFrom::Start(end_pos))?; Ok((end_pos - start_pos) as usize) } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala index 8ea0b1765..f75af5076 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala @@ -57,7 +57,8 @@ case class CometCollectLimitExec( "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"), "numPartitions" -> SQLMetrics.createMetric( sparkContext, - "number of partitions")) ++ readMetrics ++ writeMetrics + "number of partitions")) ++ readMetrics ++ writeMetrics ++ CometMetricNode.shuffleMetrics( + sparkContext) private lazy val serializer: Serializer = new UnsafeRowSerializer(child.output.size, longMetric("dataSize")) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala index 47c89d943..4ac04993e 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala @@ -24,6 +24,8 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} +import org.apache.comet.CometConf + /** * A node carrying SQL metrics from SparkPlan, and metrics of its children. Native code will call * [[getChildNode]] and [[set]] to update the metrics. @@ -130,6 +132,21 @@ object CometMetricNode { "spilled_rows" -> SQLMetrics.createMetric(sc, "Total spilled rows")) } + def shuffleMetrics(sc: SparkContext): Map[String, SQLMetric] = { + if (CometConf.COMET_ENABLE_DETAILED_METRICS.get()) { + Map( + "elapsed_compute" -> SQLMetrics.createNanoTimingMetric(sc, "native shuffle time"), + "ipc_time" -> SQLMetrics.createNanoTimingMetric(sc, "IPC encoding time"), + "input_time" -> SQLMetrics.createNanoTimingMetric(sc, "native shuffle input time"), + "input_batches" -> SQLMetrics.createMetric(sc, "number of input batches"), + "shuffleWallTime" -> SQLMetrics.createNanoTimingMetric( + sc, + "shuffle wall time (inclusive)")) + } else { + Map("elapsed_compute" -> SQLMetrics.createNanoTimingMetric(sc, "native shuffle time")) + } + } + /** * Creates a [[CometMetricNode]] from a [[CometPlan]]. 
*/ diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala index 5582f4d68..19586628a 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala @@ -57,7 +57,8 @@ case class CometTakeOrderedAndProjectExec( "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"), "numPartitions" -> SQLMetrics.createMetric( sparkContext, - "number of partitions")) ++ readMetrics ++ writeMetrics + "number of partitions")) ++ readMetrics ++ writeMetrics ++ CometMetricNode.shuffleMetrics( + sparkContext) private lazy val serializer: Serializer = new UnsafeRowSerializer(child.output.size, longMetric("dataSize")) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala index b1dd9ac83..1b9dbeaed 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala @@ -79,7 +79,8 @@ case class CometShuffleExchangeExec( "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"), "numPartitions" -> SQLMetrics.createMetric( sparkContext, - "number of partitions")) ++ readMetrics ++ writeMetrics + "number of partitions")) ++ readMetrics ++ writeMetrics ++ CometMetricNode.shuffleMetrics( + sparkContext) override def nodeName: String = if (shuffleType == CometNativeShuffle) { "CometExchange" @@ -464,6 +465,9 @@ class CometShuffleWriteProcessor( mapId: Long, mapIndex: Int, context: TaskContext): MapStatus = { + + val startTime = System.nanoTime() + val metricsReporter = createMetricsReporter(context) val shuffleBlockResolver = SparkEnv.get.shuffleManager.shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver] @@ -477,16 +481,21 @@ class CometShuffleWriteProcessor( // Call native shuffle write val nativePlan = getNativePlan(tempDataFilename, tempIndexFilename) + // these metrics are only reported when detailed metrics are enabled via config + val detailedMetrics = Seq("elapsed_compute", "input_time", "ipc_time", "input_batches") + // Maps native metrics to SQL metrics val nativeSQLMetrics = Map( "output_rows" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN), "data_size" -> metrics("dataSize"), - "elapsed_compute" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)) + "write_time" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)) ++ + metrics.filterKeys(detailedMetrics.contains) val nativeMetrics = CometMetricNode(nativeSQLMetrics) // Getting rid of the fake partitionId val newInputs = inputs.asInstanceOf[Iterator[_ <: Product2[Any, Any]]].map(_._2) + // create the native plan for the ShuffleWriterExec val cometIter = CometExec.getCometIterator( Seq(newInputs.asInstanceOf[Iterator[ColumnarBatch]]), outputAttributes.length, @@ -524,6 +533,10 @@ class CometShuffleWriteProcessor( partitionLengths, Array.empty, // TODO: add checksums tempDataFilePath.toFile) + + // update wall time metric if available + metrics.get("shuffleWallTime").foreach(_.add(System.nanoTime() - startTime)) + MapStatus.apply(SparkEnv.get.blockManager.shuffleServerId, partitionLengths, mapId) }
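To make the relationship between the new shuffle metrics concrete, here is a small hedged sketch (not Comet code) that measures an inclusive wall-clock time around component timers, mirroring the semantics documented in metrics.md: `shuffle wall time (inclusive)` wraps input fetching, repartitioning, and writing, while `native shuffle time` (the `elapsed_compute` baseline) excludes the input plan. The sleeps are placeholders for real work.

```rust
use std::time::{Duration, Instant};

use datafusion::physical_plan::metrics::Time;

fn main() {
    // Component metrics named as in this diff.
    let input_time = Time::default();      // fetching batches from the input plan
    let elapsed_compute = Time::default(); // repartitioning work, excluding input
    let write_time = Time::default();      // writing the data/index files

    // "shuffle wall time (inclusive)" is a plain wall-clock measurement around
    // the whole write, as the Scala side does with System.nanoTime().
    let wall = Instant::now();

    for (metric, millis) in [(&input_time, 3u64), (&elapsed_compute, 2), (&write_time, 1)] {
        let mut timer = metric.timer();
        std::thread::sleep(Duration::from_millis(millis)); // placeholder work
        timer.stop();
    }

    // The inclusive wall time covers everything above, so it is at least the sum
    // of input and compute time (in the real writer, write_time largely overlaps
    // elapsed_compute; it is kept separate here only for clarity).
    println!(
        "wall={} ns, input={} ns, compute={} ns, write={} ns",
        wall.elapsed().as_nanos(),
        input_time.value(),
        elapsed_compute.value(),
        write_time.value()
    );
}
```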