
Commit

make metrics more accurate
andygrove committed Dec 27, 2024
1 parent e13d72f commit f1ed927
Showing 3 changed files with 22 additions and 13 deletions.
docs/source/user-guide/configs.md (2 changes: 1 addition & 1 deletion)
@@ -50,7 +50,7 @@ Comet provides the following configuration settings.
| spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. | 0.7 |
| spark.comet.exec.project.enabled | Whether to enable project by default. | true |
| spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | false |
| spark.comet.exec.shuffle.compression.codec | The codec of Comet native shuffle used to compress shuffle data. lz4, zstd, and snappy are supported. Compression can be disabled by setting spark.shuffle.compress=false. | snappy |
| spark.comet.exec.shuffle.compression.codec | The codec of Comet native shuffle used to compress shuffle data. lz4, zstd, and snappy are supported. Compression can be disabled by setting spark.shuffle.compress=false. | lz4 |
| spark.comet.exec.shuffle.compression.level | The compression level to use when compressing shuffle files with zstd. | 1 |
| spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | true |
| spark.comet.exec.sort.enabled | Whether to enable sort by default. | true |
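The codec row above lists the accepted values for spark.comet.exec.shuffle.compression.codec (lz4 as the default, with zstd and snappy also supported). As a minimal illustration only, not Comet's actual configuration code, a validator for that setting might look like the sketch below; the enum and function names are hypothetical:

```rust
// Hypothetical helper mirroring the documented values of
// spark.comet.exec.shuffle.compression.codec; not Comet's actual code.
#[derive(Debug, PartialEq)]
enum ShuffleCodec {
    Lz4,
    Zstd,
    Snappy,
}

fn parse_codec(value: &str) -> Result<ShuffleCodec, String> {
    match value.to_ascii_lowercase().as_str() {
        "lz4" => Ok(ShuffleCodec::Lz4),
        "zstd" => Ok(ShuffleCodec::Zstd),
        "snappy" => Ok(ShuffleCodec::Snappy),
        other => Err(format!("unsupported shuffle compression codec: {other}")),
    }
}

fn main() {
    assert_eq!(parse_codec("lz4"), Ok(ShuffleCodec::Lz4)); // the documented default
    assert!(parse_codec("gzip").is_err()); // not among the supported codecs
}
```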
native/core/src/execution/shuffle/shuffle_writer.rs (31 changes: 20 additions & 11 deletions)
@@ -311,7 +311,7 @@ impl PartitionBuffer {
repart_timer.stop();

if self.num_active_rows >= self.batch_size {
let flush = self.flush(&metrics.ipc_time);
let flush = self.flush(&metrics);
if let Err(e) = flush {
return AppendRowStatus::MemDiff(Err(e));
}
@@ -329,7 +329,7 @@ }
}

/// flush active data into frozen bytes
fn flush(&mut self, ipc_time: &Time) -> Result<isize> {
fn flush(&mut self, metrics: &ShuffleRepartitionerMetrics) -> Result<isize> {
if self.num_active_rows == 0 {
return Ok(0);
}
@@ -339,14 +339,19 @@
let active = std::mem::take(&mut self.active);
let num_rows = self.num_active_rows;
self.num_active_rows = 0;

let mut mempool_timer = metrics.mempool_time.timer();
self.reservation.try_shrink(self.active_slots_mem_size)?;
mempool_timer.stop();

let mut repart_timer = metrics.repart_time.timer();
let frozen_batch = make_batch(Arc::clone(&self.schema), active, num_rows)?;
repart_timer.stop();

let frozen_capacity_old = self.frozen.capacity();
let mut cursor = Cursor::new(&mut self.frozen);
cursor.seek(SeekFrom::End(0))?;
write_ipc_compressed(&frozen_batch, &mut cursor, &self.codec, ipc_time)?;
write_ipc_compressed(&frozen_batch, &mut cursor, &self.codec, &metrics.encode_time)?;

mem_diff += (self.frozen.capacity() - frozen_capacity_old) as isize;
Ok(mem_diff)
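In the updated flush above, each phase is wrapped in its own scoped timer: shrinking the reservation under mempool_time, rebuilding the batch under repart_time, and IPC encoding and compression under encode_time. For reference, a self-contained sketch of that scoped-timer pattern with DataFusion's metrics API; the function and its two-phase body are illustrative, not Comet's code:

```rust
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};

// Illustrative only: attribute two phases of work to separate Time metrics,
// mirroring how flush() scopes mempool_time, repart_time and encode_time.
fn do_timed_work(mempool_time: &Time, encode_time: &Time) {
    let mut mempool_timer = mempool_time.timer();
    // ... adjust the memory reservation here ...
    mempool_timer.stop();

    let mut encode_timer = encode_time.timer();
    // ... encode and compress the batch here ...
    encode_timer.stop();
}

fn main() {
    let metrics = ExecutionPlanMetricsSet::new();
    let partition = 0;
    let mempool_time = MetricBuilder::new(&metrics).subset_time("mempool_time", partition);
    let encode_time = MetricBuilder::new(&metrics).subset_time("encode_time", partition);
    do_timed_work(&mempool_time, &encode_time);
    // Each Time now holds only the nanoseconds spent inside its own scope.
}
```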
@@ -651,7 +656,7 @@ struct ShuffleRepartitionerMetrics {
mempool_time: Time,

/// Time encoding batches to IPC format
ipc_time: Time,
encode_time: Time,

/// Time spent writing to disk. Maps to "shuffleWriteTime" in Spark SQL Metrics.
write_time: Time,
@@ -675,7 +680,7 @@ impl ShuffleRepartitionerMetrics {
baseline: BaselineMetrics::new(metrics, partition),
repart_time: MetricBuilder::new(metrics).subset_time("repart_time", partition),
mempool_time: MetricBuilder::new(metrics).subset_time("mempool_time", partition),
ipc_time: MetricBuilder::new(metrics).subset_time("ipc_time", partition),
encode_time: MetricBuilder::new(metrics).subset_time("encode_time", partition),
write_time: MetricBuilder::new(metrics).subset_time("write_time", partition),
input_batches: MetricBuilder::new(metrics).counter("input_batches", partition),
spill_count: MetricBuilder::new(metrics).spill_count(partition),
@@ -924,7 +929,7 @@ impl ShuffleRepartitioner {
let mut output_batches: Vec<Vec<u8>> = vec![vec![]; num_output_partitions];
let mut offsets = vec![0; num_output_partitions + 1];
for i in 0..num_output_partitions {
buffered_partitions[i].flush(&self.metrics.ipc_time)?;
buffered_partitions[i].flush(&self.metrics)?;
output_batches[i] = std::mem::take(&mut buffered_partitions[i].frozen);
}
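The loop above drains each partition's frozen buffer into output_batches; the offsets vector then records where each partition's bytes begin in the final output. A tiny illustrative sketch (not Comet's code) of gluing per-partition buffers together while recording where each one starts:

```rust
// Illustrative only: concatenate per-partition buffers into one output
// buffer, recording the start of each partition plus a trailing end offset.
fn concat_with_offsets(mut parts: Vec<Vec<u8>>) -> (Vec<u8>, Vec<u64>) {
    let mut out = Vec::new();
    let mut offsets = Vec::with_capacity(parts.len() + 1);
    for part in parts.iter_mut() {
        offsets.push(out.len() as u64);
        out.append(part); // leaves `part` empty, like std::mem::take above
    }
    offsets.push(out.len() as u64);
    (out, offsets)
}

fn main() {
    let parts = vec![vec![1u8, 2, 3], vec![], vec![4u8, 5]];
    let (out, offsets) = concat_with_offsets(parts);
    assert_eq!(out, vec![1, 2, 3, 4, 5]);
    assert_eq!(offsets, vec![0, 3, 3, 5]);
}
```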

@@ -1024,19 +1029,19 @@ impl ShuffleRepartitioner {
}

let mut timer = self.metrics.write_time.timer();

let spillfile = self
.runtime
.disk_manager
.create_tmp_file("shuffle writer spill")?;
timer.stop();

let offsets = spill_into(
&mut self.buffered_partitions,
spillfile.path(),
self.num_output_partitions,
&self.metrics.ipc_time,
&self.metrics,
)?;

timer.stop();

let mut spills = self.spills.lock().await;
let used = self.reservation.size();
@@ -1108,16 +1113,18 @@ fn spill_into(
buffered_partitions: &mut [PartitionBuffer],
path: &Path,
num_output_partitions: usize,
ipc_time: &Time,
metrics: &ShuffleRepartitionerMetrics,
) -> Result<Vec<u64>> {
let mut output_batches: Vec<Vec<u8>> = vec![vec![]; num_output_partitions];

for i in 0..num_output_partitions {
buffered_partitions[i].flush(ipc_time)?;
buffered_partitions[i].flush(metrics)?;
output_batches[i] = std::mem::take(&mut buffered_partitions[i].frozen);
}
let path = path.to_owned();

let mut write_timer = metrics.write_time.timer();

let mut offsets = vec![0; num_output_partitions + 1];
let mut spill_data = OpenOptions::new()
.write(true)
Expand All @@ -1131,6 +1138,8 @@ fn spill_into(
spill_data.write_all(&output_batches[i])?;
output_batches[i].clear();
}
write_timer.stop();

// add one extra offset at last to ease partition length computation
offsets[num_output_partitions] = spill_data.stream_position()?;
Ok(offsets)
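The trailing offset appended above is what makes per-partition lengths trivial to recover when the spill file is read back: partition i occupies bytes offsets[i] .. offsets[i + 1]. A minimal sketch (not Comet's code) of that length computation:

```rust
// Illustrative only: with num_partitions + 1 offsets, the byte length of
// partition i's spilled data is offsets[i + 1] - offsets[i].
fn partition_lengths(offsets: &[u64]) -> Vec<u64> {
    offsets.windows(2).map(|w| w[1] - w[0]).collect()
}

fn main() {
    let offsets = vec![0u64, 128, 128, 512]; // hypothetical spill offsets
    assert_eq!(partition_lengths(&offsets), vec![128, 0, 384]);
}
```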
@@ -132,7 +132,7 @@ object CometMetricNode {

def shuffleMetrics(sc: SparkContext): Map[String, SQLMetric] = {
Map(
"elapsed_compute" -> SQLMetrics.createNanoTimingMetric(sc, "native shuffle time"),
"elapsed_compute" -> SQLMetrics.createNanoTimingMetric(sc, "native shuffle writer time"),
"mempool_time" -> SQLMetrics.createNanoTimingMetric(sc, "memory pool time"),
"repart_time" -> SQLMetrics.createNanoTimingMetric(sc, "repartition time"),
"encode_time" -> SQLMetrics.createNanoTimingMetric(sc, "encoding and compression time"),
