Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Improve shuffle metrics (second attempt) #1175

Merged
merged 6 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ as a native runtime to achieve improvement in terms of query efficiency and quer
Configuration Settings <user-guide/configs>
Compatibility Guide <user-guide/compatibility>
Tuning Guide <user-guide/tuning>
Metrics Guide <user-guide/metrics>

.. _toc.contributor-guide-links:
.. toctree::
Expand Down
66 changes: 66 additions & 0 deletions docs/source/user-guide/metrics.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Comet Metrics

## Spark SQL Metrics

Set `spark.comet.metrics.detailed=true` to see all available Comet metrics.

### CometScanExec

| Metric | Description |
| ----------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `scan time` | Total time to scan a Parquet file. This is not comparable to the same metric in Spark because Comet's scan metric is more accurate. Although both Comet and Spark measure the time in nanoseconds, Spark rounds this time to the nearest millisecond per batch and Comet does not. |

### Exchange

Comet adds some additional metrics:

| Metric | Description |
| ------------------------------- | ------------------------------------------------------------- |
| `native shuffle time` | Total time in native code excluding any child operators. |
| `repartition time` | Time to repartition batches. |
| `memory pool time` | Time interacting with memory pool. |
| `encoding and compression time` | Time to encode batches in IPC format and compress using ZSTD. |

## Native Metrics

Setting `spark.comet.explain.native.enabled=true` will cause native plans to be logged in each executor. Metrics are
logged for each native plan (and there is one plan per task, so this is very verbose).

Here is a guide to some of the native metrics.

### ScanExec

| Metric | Description |
| ----------------- | --------------------------------------------------------------------------------------------------- |
| `elapsed_compute` | Total time spent in this operator, fetching batches from a JVM iterator. |
| `jvm_fetch_time` | Time spent in the JVM fetching input batches to be read by this `ScanExec` instance. |
| `arrow_ffi_time` | Time spent using Arrow FFI to create Arrow batches from the memory addresses returned from the JVM. |

### ShuffleWriterExec

| Metric | Description |
| ----------------- | ------------------------------------------------------------- |
| `elapsed_compute` | Total time excluding any child operators. |
| `repart_time` | Time to repartition batches. |
| `ipc_time` | Time to encode batches in IPC format and compress using ZSTD. |
Copy link
Contributor

@mbutrovich mbutrovich Dec 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add mempool_time and repart_time?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added. Thanks.

| `mempool_time` | Time interacting with memory pool. |
| `write_time` | Time spent writing bytes to disk. |
25 changes: 0 additions & 25 deletions docs/source/user-guide/tuning.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,31 +103,6 @@ native shuffle currently only supports `HashPartitioning` and `SinglePartitionin
To enable native shuffle, set `spark.comet.exec.shuffle.mode` to `native`. If this mode is explicitly set,
then any shuffle operations that cannot be supported in this mode will fall back to Spark.

## Metrics

### Spark SQL Metrics

Some Comet metrics are not directly comparable to Spark metrics in some cases:

- `CometScanExec` uses nanoseconds for total scan time. Spark also measures scan time in nanoseconds but converts to
milliseconds _per batch_ which can result in a large loss of precision, making it difficult to compare scan times
between Spark and Comet.

### Native Metrics

Setting `spark.comet.explain.native.enabled=true` will cause native plans to be logged in each executor. Metrics are
logged for each native plan (and there is one plan per task, so this is very verbose).

Here is a guide to some of the native metrics.

### ScanExec

| Metric | Description |
| ----------------- | --------------------------------------------------------------------------------------------------- |
| `elapsed_compute` | Total time spent in this operator, fetching batches from a JVM iterator. |
| `jvm_fetch_time` | Time spent in the JVM fetching input batches to be read by this `ScanExec` instance. |
| `arrow_ffi_time` | Time spent using Arrow FFI to create Arrow batches from the memory addresses returned from the JVM. |

## Explain Plan
### Extended Explain
With Spark 4.0.0 and newer, Comet can provide extended explain plan information in the Spark UI. Currently this lists
Expand Down
6 changes: 5 additions & 1 deletion native/core/src/execution/shuffle/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use arrow_array::{
Array, ArrayRef, RecordBatch, RecordBatchOptions,
};
use arrow_schema::{ArrowError, DataType, Field, Schema, TimeUnit};
use datafusion::physical_plan::metrics::Time;
use jni::sys::{jint, jlong};
use std::{
fs::OpenOptions,
Expand Down Expand Up @@ -3354,7 +3355,10 @@ pub fn process_sorted_row_partition(
let mut frozen: Vec<u8> = vec![];
let mut cursor = Cursor::new(&mut frozen);
cursor.seek(SeekFrom::End(0))?;
written += write_ipc_compressed(&batch, &mut cursor)?;

// we do not collect metrics in Native_writeSortedFileNative
let ipc_time = Time::default();
written += write_ipc_compressed(&batch, &mut cursor, &ipc_time)?;

if let Some(checksum) = &mut current_checksum {
checksum.update(&mut cursor)?;
Expand Down
Loading
Loading