From f1ed927ce1095d7ced22ad6d135b74dddf57c301 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 27 Dec 2024 15:10:46 -0700 Subject: [PATCH] make metrics more accurate --- docs/source/user-guide/configs.md | 2 +- .../src/execution/shuffle/shuffle_writer.rs | 31 ++++++++++++------- .../spark/sql/comet/CometMetricNode.scala | 2 +- 3 files changed, 22 insertions(+), 13 deletions(-) diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 01b4cd1ff..1f70fa173 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -50,7 +50,7 @@ Comet provides the following configuration settings. | spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. | 0.7 | | spark.comet.exec.project.enabled | Whether to enable project by default. | true | | spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | false | -| spark.comet.exec.shuffle.compression.codec | The codec of Comet native shuffle used to compress shuffle data. lz4, zstd, and snappy are supported. Compression can be disabled by setting spark.shuffle.compress=false. | snappy | +| spark.comet.exec.shuffle.compression.codec | The codec of Comet native shuffle used to compress shuffle data. lz4, zstd, and snappy are supported. Compression can be disabled by setting spark.shuffle.compress=false. | lz4 | | spark.comet.exec.shuffle.compression.level | The compression level to use when compressing shuffle files with zstd. | 1 | | spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | true | | spark.comet.exec.sort.enabled | Whether to enable sort by default. | true | diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index 4c389f0f9..f1e39150c 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -311,7 +311,7 @@ impl PartitionBuffer { repart_timer.stop(); if self.num_active_rows >= self.batch_size { - let flush = self.flush(&metrics.ipc_time); + let flush = self.flush(&metrics); if let Err(e) = flush { return AppendRowStatus::MemDiff(Err(e)); } @@ -329,7 +329,7 @@ impl PartitionBuffer { } /// flush active data into frozen bytes - fn flush(&mut self, ipc_time: &Time) -> Result { + fn flush(&mut self, metrics: &ShuffleRepartitionerMetrics) -> Result { if self.num_active_rows == 0 { return Ok(0); } @@ -339,14 +339,19 @@ impl PartitionBuffer { let active = std::mem::take(&mut self.active); let num_rows = self.num_active_rows; self.num_active_rows = 0; + + let mut mempool_timer = metrics.mempool_time.timer(); self.reservation.try_shrink(self.active_slots_mem_size)?; + mempool_timer.stop(); + let mut repart_timer = metrics.repart_time.timer(); let frozen_batch = make_batch(Arc::clone(&self.schema), active, num_rows)?; + repart_timer.stop(); let frozen_capacity_old = self.frozen.capacity(); let mut cursor = Cursor::new(&mut self.frozen); cursor.seek(SeekFrom::End(0))?; - write_ipc_compressed(&frozen_batch, &mut cursor, &self.codec, ipc_time)?; + write_ipc_compressed(&frozen_batch, &mut cursor, &self.codec, &metrics.encode_time)?; mem_diff += (self.frozen.capacity() - frozen_capacity_old) as isize; Ok(mem_diff) @@ -651,7 +656,7 @@ struct ShuffleRepartitionerMetrics { mempool_time: Time, /// Time encoding batches to IPC format - ipc_time: Time, + encode_time: Time, /// Time spent writing to disk. Maps to "shuffleWriteTime" in Spark SQL Metrics. write_time: Time, @@ -675,7 +680,7 @@ impl ShuffleRepartitionerMetrics { baseline: BaselineMetrics::new(metrics, partition), repart_time: MetricBuilder::new(metrics).subset_time("repart_time", partition), mempool_time: MetricBuilder::new(metrics).subset_time("mempool_time", partition), - ipc_time: MetricBuilder::new(metrics).subset_time("ipc_time", partition), + encode_time: MetricBuilder::new(metrics).subset_time("encode_time", partition), write_time: MetricBuilder::new(metrics).subset_time("write_time", partition), input_batches: MetricBuilder::new(metrics).counter("input_batches", partition), spill_count: MetricBuilder::new(metrics).spill_count(partition), @@ -924,7 +929,7 @@ impl ShuffleRepartitioner { let mut output_batches: Vec> = vec![vec![]; num_output_partitions]; let mut offsets = vec![0; num_output_partitions + 1]; for i in 0..num_output_partitions { - buffered_partitions[i].flush(&self.metrics.ipc_time)?; + buffered_partitions[i].flush(&self.metrics)?; output_batches[i] = std::mem::take(&mut buffered_partitions[i].frozen); } @@ -1024,19 +1029,19 @@ impl ShuffleRepartitioner { } let mut timer = self.metrics.write_time.timer(); - let spillfile = self .runtime .disk_manager .create_tmp_file("shuffle writer spill")?; + timer.stop(); + let offsets = spill_into( &mut self.buffered_partitions, spillfile.path(), self.num_output_partitions, - &self.metrics.ipc_time, + &self.metrics, )?; - timer.stop(); let mut spills = self.spills.lock().await; let used = self.reservation.size(); @@ -1108,16 +1113,18 @@ fn spill_into( buffered_partitions: &mut [PartitionBuffer], path: &Path, num_output_partitions: usize, - ipc_time: &Time, + metrics: &ShuffleRepartitionerMetrics, ) -> Result> { let mut output_batches: Vec> = vec![vec![]; num_output_partitions]; for i in 0..num_output_partitions { - buffered_partitions[i].flush(ipc_time)?; + buffered_partitions[i].flush(metrics)?; output_batches[i] = std::mem::take(&mut buffered_partitions[i].frozen); } let path = path.to_owned(); + let mut write_timer = metrics.write_time.timer(); + let mut offsets = vec![0; num_output_partitions + 1]; let mut spill_data = OpenOptions::new() .write(true) @@ -1131,6 +1138,8 @@ fn spill_into( spill_data.write_all(&output_batches[i])?; output_batches[i].clear(); } + write_timer.stop(); + // add one extra offset at last to ease partition length computation offsets[num_output_partitions] = spill_data.stream_position()?; Ok(offsets) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala index 11a39c40b..53370a03b 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala @@ -132,7 +132,7 @@ object CometMetricNode { def shuffleMetrics(sc: SparkContext): Map[String, SQLMetric] = { Map( - "elapsed_compute" -> SQLMetrics.createNanoTimingMetric(sc, "native shuffle time"), + "elapsed_compute" -> SQLMetrics.createNanoTimingMetric(sc, "native shuffle writer time"), "mempool_time" -> SQLMetrics.createNanoTimingMetric(sc, "memory pool time"), "repart_time" -> SQLMetrics.createNanoTimingMetric(sc, "repartition time"), "encode_time" -> SQLMetrics.createNanoTimingMetric(sc, "encoding and compression time"),