From 7150c87598bcad430b4674c8fab5deff44cb0cd6 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 17 Dec 2024 09:46:37 -0700 Subject: [PATCH 1/5] improve shuffle metrics --- native/core/src/execution/shuffle/row.rs | 6 +- .../src/execution/shuffle/shuffle_writer.rs | 125 ++++++++++-------- .../sql/comet/CometCollectLimitExec.scala | 3 +- .../spark/sql/comet/CometMetricNode.scala | 7 + .../CometTakeOrderedAndProjectExec.scala | 3 +- .../shuffle/CometShuffleExchangeExec.scala | 9 +- 6 files changed, 92 insertions(+), 61 deletions(-) diff --git a/native/core/src/execution/shuffle/row.rs b/native/core/src/execution/shuffle/row.rs index ce752e68a..ecab77d96 100644 --- a/native/core/src/execution/shuffle/row.rs +++ b/native/core/src/execution/shuffle/row.rs @@ -40,6 +40,7 @@ use arrow_array::{ Array, ArrayRef, RecordBatch, RecordBatchOptions, }; use arrow_schema::{ArrowError, DataType, Field, Schema, TimeUnit}; +use datafusion::physical_plan::metrics::Time; use jni::sys::{jint, jlong}; use std::{ fs::OpenOptions, @@ -3354,7 +3355,10 @@ pub fn process_sorted_row_partition( let mut frozen: Vec = vec![]; let mut cursor = Cursor::new(&mut frozen); cursor.seek(SeekFrom::End(0))?; - written += write_ipc_compressed(&batch, &mut cursor)?; + + // we do not collect metrics in Native_writeSortedFileNative + let ipc_time = Time::default(); + written += write_ipc_compressed(&batch, &mut cursor, &ipc_time)?; if let Some(checksum) = &mut current_checksum { checksum.update(&mut cursor)?; diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index 7587ff06d..69b0d1f6c 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -17,17 +17,10 @@ //! Defines the External shuffle repartition plan. -use std::{ - any::Any, - fmt, - fmt::{Debug, Formatter}, - fs::{File, OpenOptions}, - io::{BufReader, BufWriter, Cursor, Read, Seek, SeekFrom, Write}, - path::Path, - sync::Arc, - task::{Context, Poll}, +use crate::{ + common::bit::ceil, + errors::{CometError, CometResult}, }; - use arrow::{datatypes::*, ipc::writer::StreamWriter}; use async_trait::async_trait; use bytes::Buf; @@ -54,17 +47,24 @@ use datafusion::{ RecordBatchStream, SendableRecordBatchStream, Statistics, }, }; +use datafusion_comet_spark_expr::spark_hash::create_murmur3_hashes; use datafusion_physical_expr::EquivalenceProperties; use futures::executor::block_on; use futures::{lock::Mutex, Stream, StreamExt, TryFutureExt, TryStreamExt}; use itertools::Itertools; use simd_adler32::Adler32; - -use crate::{ - common::bit::ceil, - errors::{CometError, CometResult}, +use std::io::Error; +use std::{ + any::Any, + fmt, + fmt::{Debug, Formatter}, + fs::{File, OpenOptions}, + io::{BufReader, BufWriter, Cursor, Read, Seek, SeekFrom, Write}, + path::Path, + sync::Arc, + task::{Context, Poll}, }; -use datafusion_comet_spark_expr::spark_hash::create_murmur3_hashes; +use tokio::time::Instant; /// The status of appending rows to a partition buffer. 
enum AppendRowStatus { @@ -271,7 +271,7 @@ impl PartitionBuffer { columns: &[ArrayRef], indices: &[usize], start_index: usize, - time_metric: &Time, + ipc_time: &Time, ) -> AppendRowStatus { let mut mem_diff = 0; let mut start = start_index; @@ -293,13 +293,11 @@ impl PartitionBuffer { }); self.num_active_rows += end - start; if self.num_active_rows >= self.batch_size { - let mut timer = time_metric.timer(); - let flush = self.flush(); + let flush = self.flush(ipc_time); if let Err(e) = flush { return AppendRowStatus::MemDiff(Err(e)); } mem_diff += flush.unwrap(); - timer.stop(); let init = self.init_active_if_necessary(); if init.is_err() { @@ -313,7 +311,7 @@ impl PartitionBuffer { } /// flush active data into frozen bytes - fn flush(&mut self) -> Result { + fn flush(&mut self, ipc_time: &Time) -> Result { if self.num_active_rows == 0 { return Ok(0); } @@ -330,7 +328,7 @@ impl PartitionBuffer { let frozen_capacity_old = self.frozen.capacity(); let mut cursor = Cursor::new(&mut self.frozen); cursor.seek(SeekFrom::End(0))?; - write_ipc_compressed(&frozen_batch, &mut cursor)?; + write_ipc_compressed(&frozen_batch, &mut cursor, ipc_time)?; mem_diff += (self.frozen.capacity() - frozen_capacity_old) as isize; Ok(mem_diff) @@ -628,6 +626,15 @@ struct ShuffleRepartitionerMetrics { /// metrics baseline: BaselineMetrics, + /// Time spent writing to disk. Maps to "shuffleWriteTime" in Spark SQL Metrics. + write_time: Time, + + /// Time encoding batches to IPC format + ipc_time: Time, + + /// Number of input batches + input_batches: Count, + /// count of spills during the execution of the operator spill_count: Count, @@ -642,6 +649,9 @@ impl ShuffleRepartitionerMetrics { fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self { Self { baseline: BaselineMetrics::new(metrics, partition), + write_time: MetricBuilder::new(metrics).subset_time("write_time", partition), + ipc_time: MetricBuilder::new(metrics).subset_time("ipc_time", partition), + input_batches: MetricBuilder::new(metrics).counter("input_batches", partition), spill_count: MetricBuilder::new(metrics).spill_count(partition), spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition), data_size: MetricBuilder::new(metrics).counter("data_size", partition), @@ -701,6 +711,7 @@ impl ShuffleRepartitioner { /// This function will slice input batch according to configured batch size and then /// shuffle rows into corresponding partition buffer. async fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> { + let start_time = Instant::now(); let mut start = 0; while start < batch.num_rows() { let end = (start + self.batch_size).min(batch.num_rows()); @@ -708,6 +719,11 @@ impl ShuffleRepartitioner { self.partitioning_batch(batch).await?; start = end; } + self.metrics.input_batches.add(1); + self.metrics + .baseline + .elapsed_compute() + .add_duration(start_time.elapsed()); Ok(()) } @@ -848,12 +864,13 @@ impl ShuffleRepartitioner { /// Writes buffered shuffled record batches into Arrow IPC bytes. 
async fn shuffle_write(&mut self) -> Result { + let mut elapsed_compute = self.metrics.baseline.elapsed_compute().timer(); let num_output_partitions = self.num_output_partitions; let buffered_partitions = &mut self.buffered_partitions; let mut output_batches: Vec> = vec![vec![]; num_output_partitions]; for i in 0..num_output_partitions { - buffered_partitions[i].flush()?; + buffered_partitions[i].flush(&self.metrics.ipc_time)?; output_batches[i] = std::mem::take(&mut buffered_partitions[i].frozen); } @@ -864,52 +881,37 @@ impl ShuffleRepartitioner { let index_file = self.output_index_file.clone(); let mut offsets = vec![0; num_output_partitions + 1]; - let mut output_data = OpenOptions::new() + let output_data = OpenOptions::new() .write(true) .create(true) .truncate(true) .open(data_file) .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {:?}", e)))?; - for i in 0..num_output_partitions { - let mut timer = self.metrics.baseline.elapsed_compute().timer(); + let mut output_data = BufWriter::new(output_data); + let mut write_time = self.metrics.write_time.timer(); + for i in 0..num_output_partitions { offsets[i] = output_data.stream_position()?; output_data.write_all(&output_batches[i])?; - - timer.stop(); - output_batches[i].clear(); // append partition in each spills for spill in &output_spills { let length = spill.offsets[i + 1] - spill.offsets[i]; if length > 0 { - let mut timer = self.metrics.baseline.elapsed_compute().timer(); - let mut spill_file = - BufReader::new(File::open(spill.file.path()).map_err(|e| { - DataFusionError::Execution(format!("shuffle write error: {:?}", e)) - })?); + BufReader::new(File::open(spill.file.path()).map_err(Self::to_df_err)?); spill_file.seek(SeekFrom::Start(spill.offsets[i]))?; - std::io::copy(&mut spill_file.take(length), &mut output_data).map_err(|e| { - DataFusionError::Execution(format!("shuffle write error: {:?}", e)) - })?; - - timer.stop(); + std::io::copy(&mut spill_file.take(length), &mut output_data) + .map_err(Self::to_df_err)?; } } } - let mut timer = self.metrics.baseline.elapsed_compute().timer(); output_data.flush()?; - timer.stop(); // add one extra offset at last to ease partition length computation - offsets[num_output_partitions] = output_data - .stream_position() - .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {:?}", e)))?; - - let mut timer = self.metrics.baseline.elapsed_compute().timer(); + offsets[num_output_partitions] = output_data.stream_position().map_err(Self::to_df_err)?; let mut output_index = BufWriter::new(File::create(index_file).map_err(|e| { @@ -918,19 +920,25 @@ impl ShuffleRepartitioner { for offset in offsets { output_index .write_all(&(offset as i64).to_le_bytes()[..]) - .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {:?}", e)))?; + .map_err(Self::to_df_err)?; } output_index.flush()?; - timer.stop(); + write_time.stop(); let used = self.reservation.size(); self.reservation.shrink(used); + elapsed_compute.stop(); + // shuffle writer always has empty output Ok(Box::pin(EmptyStream::try_new(Arc::clone(&self.schema))?)) } + fn to_df_err(e: Error) -> DataFusionError { + DataFusionError::Execution(format!("shuffle write error: {:?}", e)) + } + fn used(&self) -> usize { self.reservation.size() } @@ -959,7 +967,7 @@ impl ShuffleRepartitioner { return Ok(0); } - let mut timer = self.metrics.baseline.elapsed_compute().timer(); + let mut timer = self.metrics.write_time.timer(); let spillfile = self .runtime @@ -969,6 +977,7 @@ impl ShuffleRepartitioner { 
&mut self.buffered_partitions, spillfile.path(), self.num_output_partitions, + &self.metrics.ipc_time, )?; timer.stop(); @@ -995,12 +1004,11 @@ impl ShuffleRepartitioner { let output = &mut self.buffered_partitions[partition_id]; - let time_metric = self.metrics.baseline.elapsed_compute(); - // If the range of indices is not big enough, just appending the rows into // active array builders instead of directly adding them as a record batch. let mut start_index: usize = 0; - let mut output_ret = output.append_rows(columns, indices, start_index, time_metric); + let mut output_ret = + output.append_rows(columns, indices, start_index, &self.metrics.ipc_time); loop { match output_ret { @@ -1017,10 +1025,9 @@ impl ShuffleRepartitioner { let output = &mut self.buffered_partitions[partition_id]; output.reservation.free(); - let time_metric = self.metrics.baseline.elapsed_compute(); - start_index = new_start; - output_ret = output.append_rows(columns, indices, start_index, time_metric); + output_ret = + output.append_rows(columns, indices, start_index, &self.metrics.ipc_time); if let AppendRowStatus::StartIndex(new_start) = output_ret { if new_start == start_index { @@ -1045,11 +1052,12 @@ fn spill_into( buffered_partitions: &mut [PartitionBuffer], path: &Path, num_output_partitions: usize, + ipc_time: &Time, ) -> Result> { let mut output_batches: Vec> = vec![vec![]; num_output_partitions]; for i in 0..num_output_partitions { - buffered_partitions[i].flush()?; + buffered_partitions[i].flush(ipc_time)?; output_batches[i] = std::mem::take(&mut buffered_partitions[i].frozen); } let path = path.to_owned(); @@ -1485,10 +1493,13 @@ impl Checksum { pub(crate) fn write_ipc_compressed( batch: &RecordBatch, output: &mut W, + ipc_time: &Time, ) -> Result { if batch.num_rows() == 0 { return Ok(0); } + + let mut timer = ipc_time.timer(); let start_pos = output.stream_position()?; // write ipc_length placeholder @@ -1508,8 +1519,10 @@ pub(crate) fn write_ipc_compressed( // fill ipc length output.seek(SeekFrom::Start(start_pos))?; output.write_all(&ipc_length.to_le_bytes()[..])?; - output.seek(SeekFrom::Start(end_pos))?; + + timer.stop(); + Ok((end_pos - start_pos) as usize) } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala index 8ea0b1765..f75af5076 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala @@ -57,7 +57,8 @@ case class CometCollectLimitExec( "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"), "numPartitions" -> SQLMetrics.createMetric( sparkContext, - "number of partitions")) ++ readMetrics ++ writeMetrics + "number of partitions")) ++ readMetrics ++ writeMetrics ++ CometMetricNode.shuffleMetrics( + sparkContext) private lazy val serializer: Serializer = new UnsafeRowSerializer(child.output.size, longMetric("dataSize")) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala index 47c89d943..5766c134f 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala @@ -130,6 +130,13 @@ object CometMetricNode { "spilled_rows" -> SQLMetrics.createMetric(sc, "Total spilled rows")) } + def shuffleMetrics(sc: SparkContext): Map[String, SQLMetric] = { + Map( + 
"elapsed_compute" -> SQLMetrics.createNanoTimingMetric(sc, "native shuffle time"), + "ipc_time" -> SQLMetrics.createNanoTimingMetric(sc, "Encoding and compression time"), + "input_batches" -> SQLMetrics.createMetric(sc, "number of input batches")) + } + /** * Creates a [[CometMetricNode]] from a [[CometPlan]]. */ diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala index 5582f4d68..19586628a 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala @@ -57,7 +57,8 @@ case class CometTakeOrderedAndProjectExec( "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"), "numPartitions" -> SQLMetrics.createMetric( sparkContext, - "number of partitions")) ++ readMetrics ++ writeMetrics + "number of partitions")) ++ readMetrics ++ writeMetrics ++ CometMetricNode.shuffleMetrics( + sparkContext) private lazy val serializer: Serializer = new UnsafeRowSerializer(child.output.size, longMetric("dataSize")) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala index b1dd9ac83..d04cc0e0f 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala @@ -79,7 +79,8 @@ case class CometShuffleExchangeExec( "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"), "numPartitions" -> SQLMetrics.createMetric( sparkContext, - "number of partitions")) ++ readMetrics ++ writeMetrics + "number of partitions")) ++ readMetrics ++ writeMetrics ++ CometMetricNode.shuffleMetrics( + sparkContext) override def nodeName: String = if (shuffleType == CometNativeShuffle) { "CometExchange" @@ -477,11 +478,15 @@ class CometShuffleWriteProcessor( // Call native shuffle write val nativePlan = getNativePlan(tempDataFilename, tempIndexFilename) + // these metrics are only reported when detailed metrics are enabled via config + val detailedMetrics = Seq("elapsed_compute", "ipc_time", "input_batches") + // Maps native metrics to SQL metrics val nativeSQLMetrics = Map( "output_rows" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN), "data_size" -> metrics("dataSize"), - "elapsed_compute" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)) + "write_time" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)) ++ + metrics.filterKeys(detailedMetrics.contains) val nativeMetrics = CometMetricNode(nativeSQLMetrics) // Getting rid of the fake partitionId From 6adb04c1954a74a1d47bd6f9021156f54a1ea38d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 17 Dec 2024 09:53:10 -0700 Subject: [PATCH 2/5] docs --- docs/source/index.rst | 1 + docs/source/user-guide/metrics.md | 62 +++++++++++++++++++ docs/source/user-guide/tuning.md | 25 -------- .../spark/sql/comet/CometMetricNode.scala | 2 +- 4 files changed, 64 insertions(+), 26 deletions(-) create mode 100644 docs/source/user-guide/metrics.md diff --git a/docs/source/index.rst b/docs/source/index.rst index 39ad27a57..21ec36ca9 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -51,6 +51,7 @@ as a native runtime to achieve improvement in terms of query efficiency 
and quer Configuration Settings Compatibility Guide Tuning Guide + Metrics Guide .. _toc.contributor-guide-links: .. toctree:: diff --git a/docs/source/user-guide/metrics.md b/docs/source/user-guide/metrics.md new file mode 100644 index 000000000..fe6aa04e5 --- /dev/null +++ b/docs/source/user-guide/metrics.md @@ -0,0 +1,62 @@ + + +# Comet Metrics + +## Spark SQL Metrics + +Set `spark.comet.metrics.detailed=true` to see all available Comet metrics. + +### CometScanExec + +| Metric | Description | +| ----------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `scan time` | Total time to scan a Parquet file. This is not comparable to the same metric in Spark because Comet's scan metric is more accurate. Although both Comet and Spark measure the time in nanoseconds, Spark rounds this time to the nearest millisecond per batch and Comet does not. | + +### Exchange + +Comet adds some additional metrics: + +| Metric | Description | +| ------------------------------- | ----------------------------------------------------------------------------------------- | +| `encoding and compression time` | Time to encode batches in IPC format and compress using ZSTD. | +| `native shuffle time` | Total time spent in native shuffle writer, excluding the execution time of the input plan | + +## Native Metrics + +Setting `spark.comet.explain.native.enabled=true` will cause native plans to be logged in each executor. Metrics are +logged for each native plan (and there is one plan per task, so this is very verbose). + +Here is a guide to some of the native metrics. + +### ScanExec + +| Metric | Description | +| ----------------- | --------------------------------------------------------------------------------------------------- | +| `elapsed_compute` | Total time spent in this operator, fetching batches from a JVM iterator. | +| `jvm_fetch_time` | Time spent in the JVM fetching input batches to be read by this `ScanExec` instance. | +| `arrow_ffi_time` | Time spent using Arrow FFI to create Arrow batches from the memory addresses returned from the JVM. | + +### ShuffleWriterExec + +| Metric | Description | +| ----------------- | ------------------------------------------------------------- | +| `elapsed_compute` | Total time excluding any child operators. | +| `ipc_time` | Time to encode batches in IPC format and compress using ZSTD. | +| `write_time` | Time spent writing bytes to disk. | diff --git a/docs/source/user-guide/tuning.md b/docs/source/user-guide/tuning.md index af722494f..56303909b 100644 --- a/docs/source/user-guide/tuning.md +++ b/docs/source/user-guide/tuning.md @@ -102,28 +102,3 @@ native shuffle currently only supports `HashPartitioning` and `SinglePartitionin To enable native shuffle, set `spark.comet.exec.shuffle.mode` to `native`. If this mode is explicitly set, then any shuffle operations that cannot be supported in this mode will fall back to Spark. - -## Metrics - -### Spark SQL Metrics - -Some Comet metrics are not directly comparable to Spark metrics in some cases: - -- `CometScanExec` uses nanoseconds for total scan time. Spark also measures scan time in nanoseconds but converts to - milliseconds _per batch_ which can result in a large loss of precision, making it difficult to compare scan times - between Spark and Comet. 
- -### Native Metrics - -Setting `spark.comet.explain.native.enabled=true` will cause native plans to be logged in each executor. Metrics are -logged for each native plan (and there is one plan per task, so this is very verbose). - -Here is a guide to some of the native metrics. - -### ScanExec - -| Metric | Description | -| ----------------- | --------------------------------------------------------------------------------------------------- | -| `elapsed_compute` | Total time spent in this operator, fetching batches from a JVM iterator. | -| `jvm_fetch_time` | Time spent in the JVM fetching input batches to be read by this `ScanExec` instance. | -| `arrow_ffi_time` | Time spent using Arrow FFI to create Arrow batches from the memory addresses returned from the JVM. | diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala index 5766c134f..fa967dcdf 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala @@ -133,7 +133,7 @@ object CometMetricNode { def shuffleMetrics(sc: SparkContext): Map[String, SQLMetric] = { Map( "elapsed_compute" -> SQLMetrics.createNanoTimingMetric(sc, "native shuffle time"), - "ipc_time" -> SQLMetrics.createNanoTimingMetric(sc, "Encoding and compression time"), + "ipc_time" -> SQLMetrics.createNanoTimingMetric(sc, "encoding and compression time"), "input_batches" -> SQLMetrics.createMetric(sc, "number of input batches")) } From 2782a9f48d999c9eb10d2e241c9429fdb81f175c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 17 Dec 2024 16:05:19 -0700 Subject: [PATCH 3/5] more metrics --- .../src/execution/shuffle/shuffle_writer.rs | 182 ++++++++++++------ .../spark/sql/comet/CometMetricNode.scala | 4 + .../shuffle/CometShuffleExchangeExec.scala | 10 +- 3 files changed, 132 insertions(+), 64 deletions(-) diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index 69b0d1f6c..3f8b79cfd 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -25,7 +25,6 @@ use arrow::{datatypes::*, ipc::writer::StreamWriter}; use async_trait::async_trait; use bytes::Buf; use crc32fast::Hasher; -use datafusion::physical_plan::metrics::Time; use datafusion::{ arrow::{ array::*, @@ -41,7 +40,9 @@ use datafusion::{ runtime_env::RuntimeEnv, }, physical_plan::{ - metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}, + metrics::{ + BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, Time, + }, stream::RecordBatchStreamAdapter, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, @@ -242,7 +243,11 @@ impl PartitionBuffer { /// Initializes active builders if necessary. /// Returns error if memory reservation fails. 
- fn init_active_if_necessary(&mut self) -> Result { + fn init_active_if_necessary( + &mut self, + mempool_time: &Time, + repart_time: &Time, + ) -> Result { let mut mem_diff = 0; if self.active.is_empty() { @@ -256,9 +261,13 @@ impl PartitionBuffer { .sum::(); } + let mut mempool_timer = mempool_time.timer(); self.reservation.try_grow(self.active_slots_mem_size)?; + mempool_timer.stop(); + let mut repart_timer = repart_time.timer(); self.active = new_array_builders(&self.schema, self.batch_size); + repart_timer.stop(); mem_diff += self.active_slots_mem_size as isize; } @@ -271,13 +280,15 @@ impl PartitionBuffer { columns: &[ArrayRef], indices: &[usize], start_index: usize, + mempool_time: &Time, + repart_time: &Time, ipc_time: &Time, ) -> AppendRowStatus { let mut mem_diff = 0; let mut start = start_index; // lazy init because some partition may be empty - let init = self.init_active_if_necessary(); + let init = self.init_active_if_necessary(mempool_time, repart_time); if init.is_err() { return AppendRowStatus::StartIndex(start); } @@ -285,6 +296,8 @@ impl PartitionBuffer { while start < indices.len() { let end = (start + self.batch_size).min(indices.len()); + + let mut repart_timer = repart_time.timer(); self.active .iter_mut() .zip(columns) @@ -292,6 +305,8 @@ impl PartitionBuffer { append_columns(builder, column, &indices[start..end], column.data_type()); }); self.num_active_rows += end - start; + repart_timer.stop(); + if self.num_active_rows >= self.batch_size { let flush = self.flush(ipc_time); if let Err(e) = flush { @@ -299,7 +314,7 @@ impl PartitionBuffer { } mem_diff += flush.unwrap(); - let init = self.init_active_if_necessary(); + let init = self.init_active_if_necessary(mempool_time, repart_time); if init.is_err() { return AppendRowStatus::StartIndex(end); } @@ -626,12 +641,18 @@ struct ShuffleRepartitionerMetrics { /// metrics baseline: BaselineMetrics, - /// Time spent writing to disk. Maps to "shuffleWriteTime" in Spark SQL Metrics. - write_time: Time, + /// Time to perform repartitioning + repart_time: Time, + + /// Time interacting with memory pool + mempool_time: Time, /// Time encoding batches to IPC format ipc_time: Time, + /// Time spent writing to disk. Maps to "shuffleWriteTime" in Spark SQL Metrics. 
+ write_time: Time, + /// Number of input batches input_batches: Count, @@ -649,8 +670,10 @@ impl ShuffleRepartitionerMetrics { fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self { Self { baseline: BaselineMetrics::new(metrics, partition), - write_time: MetricBuilder::new(metrics).subset_time("write_time", partition), + repart_time: MetricBuilder::new(metrics).subset_time("repart_time", partition), + mempool_time: MetricBuilder::new(metrics).subset_time("mempool_time", partition), ipc_time: MetricBuilder::new(metrics).subset_time("ipc_time", partition), + write_time: MetricBuilder::new(metrics).subset_time("write_time", partition), input_batches: MetricBuilder::new(metrics).counter("input_batches", partition), spill_count: MetricBuilder::new(metrics).spill_count(partition), spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition), @@ -754,53 +777,61 @@ impl ShuffleRepartitioner { let num_output_partitions = self.num_output_partitions; match &self.partitioning { Partitioning::Hash(exprs, _) => { - let arrays = exprs - .iter() - .map(|expr| expr.evaluate(&input)?.into_array(input.num_rows())) - .collect::>>()?; - - // use identical seed as spark hash partition - let hashes_buf = &mut self.hashes_buf[..arrays[0].len()]; - hashes_buf.fill(42_u32); - - // Hash arrays and compute buckets based on number of partitions - let partition_ids = &mut self.partition_ids[..arrays[0].len()]; - create_murmur3_hashes(&arrays, hashes_buf)? - .iter() - .enumerate() - .for_each(|(idx, hash)| { - partition_ids[idx] = pmod(*hash, num_output_partitions) as u64 + let (partition_starts, shuffled_partition_ids): (Vec, Vec) = { + let mut timer = self.metrics.repart_time.timer(); + let arrays = exprs + .iter() + .map(|expr| expr.evaluate(&input)?.into_array(input.num_rows())) + .collect::>>()?; + + // use identical seed as spark hash partition + let hashes_buf = &mut self.hashes_buf[..arrays[0].len()]; + hashes_buf.fill(42_u32); + + // Hash arrays and compute buckets based on number of partitions + let partition_ids = &mut self.partition_ids[..arrays[0].len()]; + create_murmur3_hashes(&arrays, hashes_buf)? + .iter() + .enumerate() + .for_each(|(idx, hash)| { + partition_ids[idx] = pmod(*hash, num_output_partitions) as u64 + }); + + // count each partition size + let mut partition_counters = vec![0usize; num_output_partitions]; + partition_ids + .iter() + .for_each(|partition_id| partition_counters[*partition_id as usize] += 1); + + // accumulate partition counters into partition ends + // e.g. partition counter: [1, 3, 2, 1] => [1, 4, 6, 7] + let mut partition_ends = partition_counters; + let mut accum = 0; + partition_ends.iter_mut().for_each(|v| { + *v += accum; + accum = *v; }); - // count each partition size - let mut partition_counters = vec![0usize; num_output_partitions]; - partition_ids - .iter() - .for_each(|partition_id| partition_counters[*partition_id as usize] += 1); - - // accumulate partition counters into partition ends - // e.g. partition counter: [1, 3, 2, 1] => [1, 4, 6, 7] - let mut partition_ends = partition_counters; - let mut accum = 0; - partition_ends.iter_mut().for_each(|v| { - *v += accum; - accum = *v; - }); - - // calculate shuffled partition ids - // e.g. partition ids: [3, 1, 1, 1, 2, 2, 0] => [6, 1, 2, 3, 4, 5, 0] which is the - // row indices for rows ordered by their partition id. For example, first partition - // 0 has one row index [6], partition 1 has row indices [1, 2, 3], etc. 
- let mut shuffled_partition_ids = vec![0usize; input.num_rows()]; - for (index, partition_id) in partition_ids.iter().enumerate().rev() { - partition_ends[*partition_id as usize] -= 1; - let end = partition_ends[*partition_id as usize]; - shuffled_partition_ids[end] = index; - } + // calculate shuffled partition ids + // e.g. partition ids: [3, 1, 1, 1, 2, 2, 0] => [6, 1, 2, 3, 4, 5, 0] which is the + // row indices for rows ordered by their partition id. For example, first partition + // 0 has one row index [6], partition 1 has row indices [1, 2, 3], etc. + let mut shuffled_partition_ids = vec![0usize; input.num_rows()]; + for (index, partition_id) in partition_ids.iter().enumerate().rev() { + partition_ends[*partition_id as usize] -= 1; + let end = partition_ends[*partition_id as usize]; + shuffled_partition_ids[end] = index; + } - // after calculating, partition ends become partition starts - let mut partition_starts = partition_ends; - partition_starts.push(input.num_rows()); + // after calculating, partition ends become partition starts + let mut partition_starts = partition_ends; + partition_starts.push(input.num_rows()); + timer.stop(); + Ok::<(Vec, Vec), DataFusionError>(( + partition_starts, + shuffled_partition_ids, + )) + }?; // For each interval of row indices of partition, taking rows from input batch and // appending into output buffer. @@ -820,11 +851,20 @@ impl ShuffleRepartitioner { if mem_diff > 0 { let mem_increase = mem_diff as usize; - if self.reservation.try_grow(mem_increase).is_err() { + + let try_grow = { + let mut mempool_timer = self.metrics.mempool_time.timer(); + let result = self.reservation.try_grow(mem_increase); + mempool_timer.stop(); + result + }; + + if try_grow.is_err() { self.spill().await?; + let mut mempool_timer = self.metrics.mempool_time.timer(); self.reservation.free(); self.reservation.try_grow(mem_increase)?; - + mempool_timer.stop(); mem_diff = 0; } } @@ -832,7 +872,9 @@ impl ShuffleRepartitioner { if mem_diff < 0 { let mem_used = self.reservation.size(); let mem_decrease = mem_used.min(-mem_diff as usize); + let mut mempool_timer = self.metrics.mempool_time.timer(); self.reservation.shrink(mem_decrease); + mempool_timer.stop(); } } } @@ -868,7 +910,7 @@ impl ShuffleRepartitioner { let num_output_partitions = self.num_output_partitions; let buffered_partitions = &mut self.buffered_partitions; let mut output_batches: Vec> = vec![vec![]; num_output_partitions]; - + let mut offsets = vec![0; num_output_partitions + 1]; for i in 0..num_output_partitions { buffered_partitions[i].flush(&self.metrics.ipc_time)?; output_batches[i] = std::mem::take(&mut buffered_partitions[i].frozen); @@ -880,7 +922,8 @@ impl ShuffleRepartitioner { let data_file = self.output_data_file.clone(); let index_file = self.output_index_file.clone(); - let mut offsets = vec![0; num_output_partitions + 1]; + let mut write_time = self.metrics.write_time.timer(); + let output_data = OpenOptions::new() .write(true) .create(true) @@ -890,7 +933,6 @@ impl ShuffleRepartitioner { let mut output_data = BufWriter::new(output_data); - let mut write_time = self.metrics.write_time.timer(); for i in 0..num_output_partitions { offsets[i] = output_data.stream_position()?; output_data.write_all(&output_batches[i])?; @@ -926,8 +968,10 @@ impl ShuffleRepartitioner { write_time.stop(); + let mut mempool_timer = self.metrics.mempool_time.timer(); let used = self.reservation.size(); self.reservation.shrink(used); + mempool_timer.stop(); elapsed_compute.stop(); @@ -1007,8 +1051,14 @@ impl 
ShuffleRepartitioner { // If the range of indices is not big enough, just appending the rows into // active array builders instead of directly adding them as a record batch. let mut start_index: usize = 0; - let mut output_ret = - output.append_rows(columns, indices, start_index, &self.metrics.ipc_time); + let mut output_ret = output.append_rows( + columns, + indices, + start_index, + &self.metrics.mempool_time, + &self.metrics.repart_time, + &self.metrics.ipc_time, + ); loop { match output_ret { @@ -1020,14 +1070,22 @@ impl ShuffleRepartitioner { // Cannot allocate enough memory for the array builders in the partition, // spill partitions and retry. self.spill().await?; - self.reservation.free(); + let mut mempool_timer = self.metrics.mempool_time.timer(); + self.reservation.free(); let output = &mut self.buffered_partitions[partition_id]; output.reservation.free(); + mempool_timer.stop(); start_index = new_start; - output_ret = - output.append_rows(columns, indices, start_index, &self.metrics.ipc_time); + output_ret = output.append_rows( + columns, + indices, + start_index, + &self.metrics.mempool_time, + &self.metrics.repart_time, + &self.metrics.ipc_time, + ); if let AppendRowStatus::StartIndex(new_start) = output_ret { if new_start == start_index { diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala index fa967dcdf..a26fa28c8 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala @@ -133,7 +133,11 @@ object CometMetricNode { def shuffleMetrics(sc: SparkContext): Map[String, SQLMetric] = { Map( "elapsed_compute" -> SQLMetrics.createNanoTimingMetric(sc, "native shuffle time"), + "mempool_time" -> SQLMetrics.createNanoTimingMetric(sc, "memory pool time"), + "repart_time" -> SQLMetrics.createNanoTimingMetric(sc, "repartition time"), "ipc_time" -> SQLMetrics.createNanoTimingMetric(sc, "encoding and compression time"), + "spill_count" -> SQLMetrics.createMetric(sc, "number of spills"), + "spilled_bytes" -> SQLMetrics.createMetric(sc, "spilled bytes"), "input_batches" -> SQLMetrics.createMetric(sc, "number of input batches")) } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala index d04cc0e0f..0cd8a9ce6 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala @@ -478,8 +478,14 @@ class CometShuffleWriteProcessor( // Call native shuffle write val nativePlan = getNativePlan(tempDataFilename, tempIndexFilename) - // these metrics are only reported when detailed metrics are enabled via config - val detailedMetrics = Seq("elapsed_compute", "ipc_time", "input_batches") + val detailedMetrics = Seq( + "elapsed_compute", + "ipc_time", + "repart_time", + "mempool_time", + "input_batches", + "spill_count", + "spilled_bytes") // Maps native metrics to SQL metrics val nativeSQLMetrics = Map( From e37d039a2c5bcdabda354a1ed30800360941547c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 17 Dec 2024 16:08:55 -0700 Subject: [PATCH 4/5] refactor --- .../src/execution/shuffle/shuffle_writer.rs | 40 +++++-------------- 1 file changed, 10 insertions(+), 30 deletions(-) diff --git 
a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index 3f8b79cfd..fcc8c51f6 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -243,11 +243,7 @@ impl PartitionBuffer { /// Initializes active builders if necessary. /// Returns error if memory reservation fails. - fn init_active_if_necessary( - &mut self, - mempool_time: &Time, - repart_time: &Time, - ) -> Result { + fn init_active_if_necessary(&mut self, metrics: &ShuffleRepartitionerMetrics) -> Result { let mut mem_diff = 0; if self.active.is_empty() { @@ -261,11 +257,11 @@ impl PartitionBuffer { .sum::(); } - let mut mempool_timer = mempool_time.timer(); + let mut mempool_timer = metrics.mempool_time.timer(); self.reservation.try_grow(self.active_slots_mem_size)?; mempool_timer.stop(); - let mut repart_timer = repart_time.timer(); + let mut repart_timer = metrics.repart_time.timer(); self.active = new_array_builders(&self.schema, self.batch_size); repart_timer.stop(); @@ -280,15 +276,13 @@ impl PartitionBuffer { columns: &[ArrayRef], indices: &[usize], start_index: usize, - mempool_time: &Time, - repart_time: &Time, - ipc_time: &Time, + metrics: &ShuffleRepartitionerMetrics, ) -> AppendRowStatus { let mut mem_diff = 0; let mut start = start_index; // lazy init because some partition may be empty - let init = self.init_active_if_necessary(mempool_time, repart_time); + let init = self.init_active_if_necessary(metrics); if init.is_err() { return AppendRowStatus::StartIndex(start); } @@ -297,7 +291,7 @@ impl PartitionBuffer { while start < indices.len() { let end = (start + self.batch_size).min(indices.len()); - let mut repart_timer = repart_time.timer(); + let mut repart_timer = metrics.repart_time.timer(); self.active .iter_mut() .zip(columns) @@ -308,13 +302,13 @@ impl PartitionBuffer { repart_timer.stop(); if self.num_active_rows >= self.batch_size { - let flush = self.flush(ipc_time); + let flush = self.flush(&metrics.ipc_time); if let Err(e) = flush { return AppendRowStatus::MemDiff(Err(e)); } mem_diff += flush.unwrap(); - let init = self.init_active_if_necessary(mempool_time, repart_time); + let init = self.init_active_if_necessary(metrics); if init.is_err() { return AppendRowStatus::StartIndex(end); } @@ -1051,14 +1045,7 @@ impl ShuffleRepartitioner { // If the range of indices is not big enough, just appending the rows into // active array builders instead of directly adding them as a record batch. 
let mut start_index: usize = 0; - let mut output_ret = output.append_rows( - columns, - indices, - start_index, - &self.metrics.mempool_time, - &self.metrics.repart_time, - &self.metrics.ipc_time, - ); + let mut output_ret = output.append_rows(columns, indices, start_index, &self.metrics); loop { match output_ret { @@ -1078,14 +1065,7 @@ impl ShuffleRepartitioner { mempool_timer.stop(); start_index = new_start; - output_ret = output.append_rows( - columns, - indices, - start_index, - &self.metrics.mempool_time, - &self.metrics.repart_time, - &self.metrics.ipc_time, - ); + output_ret = output.append_rows(columns, indices, start_index, &self.metrics); if let AppendRowStatus::StartIndex(new_start) = output_ret { if new_start == start_index { From bdcec72845ec7cc23d2ccb2a5431c1381ffd18af Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 18 Dec 2024 09:24:37 -0700 Subject: [PATCH 5/5] address feedback --- docs/source/user-guide/metrics.md | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/docs/source/user-guide/metrics.md b/docs/source/user-guide/metrics.md index fe6aa04e5..509d0ae8c 100644 --- a/docs/source/user-guide/metrics.md +++ b/docs/source/user-guide/metrics.md @@ -33,10 +33,12 @@ Set `spark.comet.metrics.detailed=true` to see all available Comet metrics. Comet adds some additional metrics: -| Metric | Description | -| ------------------------------- | ----------------------------------------------------------------------------------------- | -| `encoding and compression time` | Time to encode batches in IPC format and compress using ZSTD. | -| `native shuffle time` | Total time spent in native shuffle writer, excluding the execution time of the input plan | +| Metric | Description | +| ------------------------------- | ------------------------------------------------------------- | +| `native shuffle time` | Total time in native code excluding any child operators. | +| `repartition time` | Time to repartition batches. | +| `memory pool time` | Time interacting with memory pool. | +| `encoding and compression time` | Time to encode batches in IPC format and compress using ZSTD. | ## Native Metrics @@ -58,5 +60,7 @@ Here is a guide to some of the native metrics. | Metric | Description | | ----------------- | ------------------------------------------------------------- | | `elapsed_compute` | Total time excluding any child operators. | +| `repart_time` | Time to repartition batches. | | `ipc_time` | Time to encode batches in IPC format and compress using ZSTD. | +| `mempool_time` | Time interacting with memory pool. | | `write_time` | Time spent writing bytes to disk. |
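
As a minimal usage sketch of the metrics added in this series: the snippet below configures a Spark session with the keys documented in the tuning and metrics guides above (`spark.comet.exec.shuffle.mode`, `spark.comet.metrics.detailed`, `spark.comet.explain.native.enabled`). It assumes the Comet jars are already on the classpath and the plugin is enabled; the application name is illustrative only.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: assumes Comet is installed and enabled for this Spark application.
val spark = SparkSession
  .builder()
  .appName("comet-shuffle-metrics-example") // hypothetical application name
  .config("spark.comet.exec.shuffle.mode", "native") // use Comet native shuffle (see tuning guide)
  .config("spark.comet.metrics.detailed", "true") // surface detailed metrics such as ipc_time, repart_time, mempool_time
  .config("spark.comet.explain.native.enabled", "true") // log native plans and their metrics in each executor
  .getOrCreate()
```

With `spark.comet.metrics.detailed=true`, the timers registered in `CometMetricNode.shuffleMetrics` (for example `repart_time`, `mempool_time`, and `ipc_time`) appear in the Spark SQL UI for the Comet exchange operators, while `write_time` continues to feed Spark's standard `shuffle write time` metric.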