feat: Add additional metrics for shuffle write #1173

Closed · wants to merge 12 commits
Ready for review
andygrove committed Dec 16, 2024
commit b26322e1d15214a66dc95efc4b4a335a2879bde4
9 changes: 9 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -45,6 +45,9 @@ import org.apache.comet.shims.ShimCometConf
*/
object CometConf extends ShimCometConf {

private val METRICS_GUIDE = "For more information, refer to the Comet Metrics " +
"Guide (https://datafusion.apache.org/comet/user-guide/metrics.html)"

private val TUNING_GUIDE = "For more information, refer to the Comet Tuning " +
"Guide (https://datafusion.apache.org/comet/user-guide/tuning.html)"

@@ -414,6 +417,12 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)

val COMET_ENABLE_DETAILED_METRICS: ConfigEntry[Boolean] =
conf("spark.comet.metrics.detailed")
.doc(s"Enable this option to see additional SQL metrics. $METRICS_GUIDE.")
.booleanConf
.createWithDefault(false)

val COMET_EXPLAIN_FALLBACK_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.explainFallback.enabled")
.doc(
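
As a quick usage sketch (not part of this diff; assumes an active SparkSession with the Comet plugin loaded), the new flag can be set like any other Comet conf:

// Illustrative sketch: enable the additional shuffle metrics for the current session.
// `spark` is assumed to be an active SparkSession with Comet enabled.
spark.conf.set("spark.comet.metrics.detailed", "true")

// Or set it when building the session:
// SparkSession.builder().config("spark.comet.metrics.detailed", "true")
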
1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
@@ -64,6 +64,7 @@ Comet provides the following configuration settings.
| spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. | false |
| spark.comet.memory.overhead.factor | Fraction of executor memory to be allocated as additional non-heap memory per executor process for Comet. | 0.2 |
| spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB. | 402653184b |
| spark.comet.metrics.detailed | Enable this option to see additional SQL metrics. For more information, refer to the Comet Metrics Guide (https://datafusion.apache.org/comet/user-guide/metrics.html). | false |
| spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false |
| spark.comet.parquet.enable.directBuffer | Whether to use Java direct byte buffer when reading Parquet. | false |
| spark.comet.parquet.read.io.adjust.readRange.skew | In the parallel reader, if the read ranges submitted are skewed in sizes, this option will cause the reader to break up larger read ranges into smaller ranges to reduce the skew. This will result in a slightly larger number of connections opened to the file system but may give improved performance. | false |
2 changes: 1 addition & 1 deletion docs/source/user-guide/metrics.md
@@ -21,7 +21,7 @@ under the License.

## Spark SQL Metrics

Some Comet metrics are not directly comparable to Spark metrics in some cases:
Set `spark.comet.metrics.detailed=true` to see all available Comet metrics.

### CometScanExec

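To see where the extra metrics show up, here is a rough inspection sketch (not part of this diff; assumes Comet and its native shuffle are enabled for the session, and disables AQE so the executed plan tree is easy to walk):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("comet-metrics-demo").getOrCreate()
spark.conf.set("spark.sql.adaptive.enabled", "false") // keep the plan tree flat for traversal
spark.conf.set("spark.comet.metrics.detailed", "true")

// Run a query that forces a shuffle, then walk the executed plan and print
// every SQL metric registered on each operator; with detailed metrics enabled,
// the shuffle node also reports input time and wall time.
val df = spark.range(1000000L).repartition(8).groupBy("id").count()
df.collect()
df.queryExecution.executedPlan.foreach { node =>
  node.metrics.foreach { case (name, metric) =>
    println(s"${node.nodeName}: $name = ${metric.value}")
  }
}
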
1 change: 0 additions & 1 deletion native/core/src/execution/metrics/utils.rs
@@ -73,7 +73,6 @@ pub fn update_comet_metric(
comet_metric_node(metric_node).get_child_node(i as i32) -> JObject
)?;
if child_metric_node.is_null() {
println!("Missing JVM metric node for {}", child_plan.plan_id);
continue;
}
update_comet_metric(env, &child_metric_node, child_plan, metrics_jstrings)?;
2 changes: 1 addition & 1 deletion native/core/src/execution/operators/scan.rs
@@ -75,7 +75,7 @@ pub struct ScanExec {
/// Metrics collector
metrics: ExecutionPlanMetricsSet,
/// Baseline metrics
baseline_metrics: BaselineMetrics,
pub(crate) baseline_metrics: BaselineMetrics,
/// Time waiting for JVM input plan to execute and return batches
jvm_fetch_time: Time,
/// Time spent in FFI
9 changes: 9 additions & 0 deletions native/core/src/execution/shuffle/shuffle_writer.rs
@@ -60,6 +60,7 @@ use std::{
task::{Context, Poll},
};

use crate::execution::operators::ScanExec;
use crate::{
common::bit::ceil,
errors::{CometError, CometResult},
@@ -139,6 +140,14 @@ impl ExecutionPlan for ShuffleWriterExec {
) -> Result<SendableRecordBatchStream> {
let metrics = ShuffleRepartitionerMetrics::new(&self.metrics, 0);

if let Some(scan) = self.input.as_any().downcast_ref::<ScanExec>() {
// ScanExec starts executing and fetching data during query planning, so we
// need to capture that time here to ensure that the metrics are accurate
metrics
.input_time
.add(scan.baseline_metrics.elapsed_compute());
}

// execute the child plan
let start_time = Instant::now();
let input = self.input.execute(partition, Arc::clone(&context))?;
Changes to CometMetricNode.scala
@@ -24,6 +24,8 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

import org.apache.comet.CometConf

/**
* A node carrying SQL metrics from SparkPlan, and metrics of its children. Native code will call
* [[getChildNode]] and [[set]] to update the metrics.
@@ -131,12 +133,16 @@ object CometMetricNode {
}

def shuffleMetrics(sc: SparkContext): Map[String, SQLMetric] = {
Map(
"elapsed_compute" -> SQLMetrics.createNanoTimingMetric(
sc,
"native shuffle time"),
"input_time" -> SQLMetrics.createNanoTimingMetric(sc, "native shuffle input time"),
"shuffleWallTime" -> SQLMetrics.createNanoTimingMetric(sc, "shuffle wall time (inclusive)"))
if (CometConf.COMET_ENABLE_DETAILED_METRICS.get()) {
Map(
"elapsed_compute" -> SQLMetrics.createNanoTimingMetric(sc, "native shuffle time"),
"input_time" -> SQLMetrics.createNanoTimingMetric(sc, "native shuffle input time"),
"shuffleWallTime" -> SQLMetrics.createNanoTimingMetric(
sc,
"shuffle wall time (inclusive)"))
} else {
Map("elapsed_compute" -> SQLMetrics.createNanoTimingMetric(sc, "native shuffle time"))
}
}

/**
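
Because shuffleMetrics now registers input_time and shuffleWallTime only when detailed metrics are enabled, callers must treat those keys as optional, which is what the shuffle write processor change below does. A minimal illustrative helper (not part of this diff) for that pattern:

import org.apache.spark.sql.execution.metric.SQLMetric

// Illustrative helper: add to a metric only if it was registered, so the
// non-detailed configuration stays a silent no-op.
def addIfPresent(metrics: Map[String, SQLMetric], key: String, value: Long): Unit =
  metrics.get(key).foreach(_.add(value))

// e.g. addIfPresent(metrics, "shuffleWallTime", System.nanoTime() - startTime)
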
Changes to class CometShuffleWriteProcessor
@@ -483,11 +483,10 @@ class CometShuffleWriteProcessor(

// Maps native metrics to SQL metrics
val nativeSQLMetrics = Map(
"output_rows" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN),
"data_size" -> metrics("dataSize"),
"elapsed_compute" -> metrics("elapsed_compute"),
"input_time" -> metrics("input_time"),
"write_time" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME))
"output_rows" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN),
"write_time" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)) ++
metrics.filterKeys(Seq("elapsed_compute", "input_time").contains)
val nativeMetrics = CometMetricNode(nativeSQLMetrics)

// Getting rid of the fake partitionId
@@ -532,7 +531,8 @@ class CometShuffleWriteProcessor(
Array.empty, // TODO: add checksums
tempDataFilePath.toFile)

metrics("shuffleWallTime").add(System.nanoTime() - startTime)
// update wall time metric if available
metrics.get("shuffleWallTime").foreach(_.add(System.nanoTime() - startTime))

MapStatus.apply(SparkEnv.get.blockManager.shuffleServerId, partitionLengths, mapId)
}