Skip to content

Commit

Permalink
add input_batches metric
Browse files: browse the repository at this point in the history
  • Loading branch information
andygrove committed Dec 17, 2024
1 parent a317fc5 commit 73b4f9b
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 9 deletions.
15 changes: 8 additions & 7 deletions native/core/src/execution/shuffle/shuffle_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,9 @@ struct ShuffleRepartitionerMetrics {
/// Time encoding batches to IPC format
ipc_time: Time,

//other_time: Time,
/// Number of input batches
input_batches: Count,

/// Count of spills during the execution of the operator
spill_count: Count,

Expand All @@ -664,14 +666,12 @@ struct ShuffleRepartitionerMetrics {

impl ShuffleRepartitionerMetrics {
fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
let input_time = MetricBuilder::new(metrics).subset_time("input_time", partition);
let write_time = MetricBuilder::new(metrics).subset_time("write_time", partition);
let ipc_time = MetricBuilder::new(metrics).subset_time("ipc_time", partition);
Self {
baseline: BaselineMetrics::new(metrics, partition),
input_time,
write_time,
ipc_time,
input_time: MetricBuilder::new(metrics).subset_time("input_time", partition),
write_time: MetricBuilder::new(metrics).subset_time("write_time", partition),
ipc_time: MetricBuilder::new(metrics).subset_time("ipc_time", partition),
input_batches: MetricBuilder::new(metrics).counter("input_batches", partition),
spill_count: MetricBuilder::new(metrics).spill_count(partition),
spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition),
data_size: MetricBuilder::new(metrics).counter("data_size", partition),
Expand Down Expand Up @@ -739,6 +739,7 @@ impl ShuffleRepartitioner {
self.partitioning_batch(batch).await?;
start = end;
}
self.metrics.input_batches.add(1);
self.metrics
.baseline
.elapsed_compute()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ object CometMetricNode {
"elapsed_compute" -> SQLMetrics.createNanoTimingMetric(sc, "native shuffle time"),
"ipc_time" -> SQLMetrics.createNanoTimingMetric(sc, "IPC encoding time"),
"input_time" -> SQLMetrics.createNanoTimingMetric(sc, "native shuffle input time"),
"input_batches" -> SQLMetrics.createMetric(sc, "number of input batches"),
"shuffleWallTime" -> SQLMetrics.createNanoTimingMetric(
sc,
"shuffle wall time (inclusive)"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,12 +482,12 @@ class CometShuffleWriteProcessor(
val nativePlan = getNativePlan(tempDataFilename, tempIndexFilename)

// These metrics are only reported when detailed metrics are enabled via config
val detailedMetrics = Seq("elapsed_compute", "input_time", "ipc_time")
val detailedMetrics = Seq("elapsed_compute", "input_time", "ipc_time", "input_batches")

// Maps native metrics to SQL metrics
val nativeSQLMetrics = Map(
"data_size" -> metrics("dataSize"),
"output_rows" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN),
"data_size" -> metrics("dataSize"),
"write_time" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)) ++
metrics.filterKeys(detailedMetrics.contains)
val nativeMetrics = CometMetricNode(nativeSQLMetrics)
Expand Down

0 comments on commit 73b4f9b

Please sign in to comment.