From 3c34653cc453d5ce15ca05d74ffa0db4ffb7f98b Mon Sep 17 00:00:00 2001
From: Matt Butrovich
Date: Mon, 16 Dec 2024 13:00:17 -0500
Subject: [PATCH 1/4] Add metrics from ParquetFileMetrics and FileStreamMetrics
 on native to CometNativeScanExec.

---
 .../spark/sql/comet/CometNativeScanExec.scala | 76 +++++++++++++++++++
 1 file changed, 76 insertions(+)

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
index fd5afdb80..729778f1d 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.collection._

@@ -71,6 +72,81 @@ case class CometNativeScanExec(
   }

   override def hashCode(): Int = Objects.hashCode(output)
+
+  override lazy val metrics: Map[String, SQLMetric] = {
+    Map(
+      "time_elapsed_opening" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Wall clock time elapsed for file opening"),
+      "time_elapsed_scanning_until_data" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Wall clock time elapsed for file scanning + " +
+            "first record batch of decompression + decoding"),
+      "time_elapsed_scanning_total" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Total elapsed wall clock time for scanning + record batch decompression / decoding"),
+      "time_elapsed_processing" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Wall clock time elapsed for data decompression + decoding"),
+      "file_open_errors" ->
+        SQLMetrics.createMetric(sparkContext, "Count of errors opening file"),
+      "file_scan_errors" ->
+        SQLMetrics.createMetric(sparkContext, "Count of errors scanning file"),
+      "predicate_evaluation_errors" ->
+        SQLMetrics.createMetric(
+          sparkContext,
+          "Number of times the predicate could not be evaluated"),
+      "row_groups_matched_bloom_filter" ->
+        SQLMetrics.createMetric(
+          sparkContext,
+          "Number of row groups whose bloom filters were checked and matched (not pruned)"),
+      "row_groups_pruned_bloom_filter" ->
+        SQLMetrics.createMetric(sparkContext, "Number of row groups pruned by bloom filters"),
+      "row_groups_matched_statistics" ->
+        SQLMetrics.createMetric(
+          sparkContext,
+          "Number of row groups whose statistics were checked and matched (not pruned)"),
+      "row_groups_pruned_statistics" ->
+        SQLMetrics.createMetric(sparkContext, "Number of row groups pruned by statistics"),
+      "bytes_scanned" ->
+        SQLMetrics.createSizeMetric(sparkContext, "Total number of bytes scanned"),
+      "pushdown_rows_pruned" ->
+        SQLMetrics.createMetric(
+          sparkContext,
+          "Total rows filtered out by predicates pushed into parquet scan"),
+      "pushdown_rows_matched" ->
+        SQLMetrics.createMetric(
+          sparkContext,
+          "Total rows passed predicates pushed into parquet scan"),
+      "row_pushdown_eval_time" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Total time spent evaluating row-level pushdown filters"),
+      "statistics_eval_time" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Total time spent evaluating row group-level statistics filters"),
+      "bloom_filter_eval_time" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Total time spent evaluating row group Bloom Filters"),
+      "page_index_rows_pruned" ->
+        SQLMetrics.createMetric(sparkContext, "Total rows filtered out by parquet page index"),
+      "page_index_rows_matched" ->
+        SQLMetrics.createMetric(sparkContext, "Total rows passed through the parquet page index"),
+      "page_index_eval_time" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Total time spent evaluating parquet page index filters"),
+      "metadata_load_time" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Total time spent reading and parsing metadata from the footer"))
+  }
 }

 object CometNativeScanExec extends DataTypeSupport {

From 0680bc358333afeffefa9ad4395a022a8085f3f1 Mon Sep 17 00:00:00 2001
From: Matt Butrovich
Date: Mon, 16 Dec 2024 16:52:30 -0500
Subject: [PATCH 2/4] Append to base metrics.

---
 .../spark/sql/comet/CometNativeScanExec.scala | 148 +++++++++---------
 1 file changed, 76 insertions(+), 72 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
index 729778f1d..b0ab4fc2a 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
@@ -74,78 +74,82 @@ case class CometNativeScanExec(
   override def hashCode(): Int = Objects.hashCode(output)

   override lazy val metrics: Map[String, SQLMetric] = {
-    Map(
-      "time_elapsed_opening" ->
-        SQLMetrics.createNanoTimingMetric(
-          sparkContext,
-          "Wall clock time elapsed for file opening"),
-      "time_elapsed_scanning_until_data" ->
-        SQLMetrics.createNanoTimingMetric(
-          sparkContext,
-          "Wall clock time elapsed for file scanning + " +
-            "first record batch of decompression + decoding"),
-      "time_elapsed_scanning_total" ->
-        SQLMetrics.createNanoTimingMetric(
-          sparkContext,
-          "Total elapsed wall clock time for scanning + record batch decompression / decoding"),
-      "time_elapsed_processing" ->
-        SQLMetrics.createNanoTimingMetric(
-          sparkContext,
-          "Wall clock time elapsed for data decompression + decoding"),
-      "file_open_errors" ->
-        SQLMetrics.createMetric(sparkContext, "Count of errors opening file"),
-      "file_scan_errors" ->
-        SQLMetrics.createMetric(sparkContext, "Count of errors scanning file"),
-      "predicate_evaluation_errors" ->
-        SQLMetrics.createMetric(
-          sparkContext,
-          "Number of times the predicate could not be evaluated"),
-      "row_groups_matched_bloom_filter" ->
-        SQLMetrics.createMetric(
-          sparkContext,
-          "Number of row groups whose bloom filters were checked and matched (not pruned)"),
-      "row_groups_pruned_bloom_filter" ->
-        SQLMetrics.createMetric(sparkContext, "Number of row groups pruned by bloom filters"),
-      "row_groups_matched_statistics" ->
-        SQLMetrics.createMetric(
-          sparkContext,
-          "Number of row groups whose statistics were checked and matched (not pruned)"),
-      "row_groups_pruned_statistics" ->
-        SQLMetrics.createMetric(sparkContext, "Number of row groups pruned by statistics"),
-      "bytes_scanned" ->
-        SQLMetrics.createSizeMetric(sparkContext, "Total number of bytes scanned"),
-      "pushdown_rows_pruned" ->
-        SQLMetrics.createMetric(
-          sparkContext,
-          "Total rows filtered out by predicates pushed into parquet scan"),
-      "pushdown_rows_matched" ->
-        SQLMetrics.createMetric(
-          sparkContext,
-          "Total rows passed predicates pushed into parquet scan"),
-      "row_pushdown_eval_time" ->
-        SQLMetrics.createNanoTimingMetric(
-          sparkContext,
-          "Total time spent evaluating row-level pushdown filters"),
-      "statistics_eval_time" ->
-        SQLMetrics.createNanoTimingMetric(
-          sparkContext,
-          "Total time spent evaluating row group-level statistics filters"),
-      "bloom_filter_eval_time" ->
-        SQLMetrics.createNanoTimingMetric(
-          sparkContext,
-          "Total time spent evaluating row group Bloom Filters"),
-      "page_index_rows_pruned" ->
-        SQLMetrics.createMetric(sparkContext, "Total rows filtered out by parquet page index"),
-      "page_index_rows_matched" ->
-        SQLMetrics.createMetric(sparkContext, "Total rows passed through the parquet page index"),
-      "page_index_eval_time" ->
-        SQLMetrics.createNanoTimingMetric(
-          sparkContext,
-          "Total time spent evaluating parquet page index filters"),
-      "metadata_load_time" ->
-        SQLMetrics.createNanoTimingMetric(
-          sparkContext,
-          "Total time spent reading and parsing metadata from the footer"))
+    CometMetricNode.baselineMetrics(sparkContext) ++
+      Map(
+        "time_elapsed_opening" ->
+          SQLMetrics.createNanoTimingMetric(
+            sparkContext,
+            "Wall clock time elapsed for file opening"),
+        "time_elapsed_scanning_until_data" ->
+          SQLMetrics.createNanoTimingMetric(
+            sparkContext,
+            "Wall clock time elapsed for file scanning + " +
+              "first record batch of decompression + decoding"),
+        "time_elapsed_scanning_total" ->
+          SQLMetrics.createNanoTimingMetric(
+            sparkContext,
+            "Total elapsed wall clock time for scanning " +
+              "+ record batch decompression / decoding"),
+        "time_elapsed_processing" ->
+          SQLMetrics.createNanoTimingMetric(
+            sparkContext,
+            "Wall clock time elapsed for data decompression + decoding"),
+        "file_open_errors" ->
+          SQLMetrics.createMetric(sparkContext, "Count of errors opening file"),
+        "file_scan_errors" ->
+          SQLMetrics.createMetric(sparkContext, "Count of errors scanning file"),
+        "predicate_evaluation_errors" ->
+          SQLMetrics.createMetric(
+            sparkContext,
+            "Number of times the predicate could not be evaluated"),
+        "row_groups_matched_bloom_filter" ->
+          SQLMetrics.createMetric(
+            sparkContext,
+            "Number of row groups whose bloom filters were checked and matched (not pruned)"),
+        "row_groups_pruned_bloom_filter" ->
+          SQLMetrics.createMetric(sparkContext, "Number of row groups pruned by bloom filters"),
+        "row_groups_matched_statistics" ->
+          SQLMetrics.createMetric(
+            sparkContext,
+            "Number of row groups whose statistics were checked and matched (not pruned)"),
+        "row_groups_pruned_statistics" ->
+          SQLMetrics.createMetric(sparkContext, "Number of row groups pruned by statistics"),
+        "bytes_scanned" ->
+          SQLMetrics.createSizeMetric(sparkContext, "Total number of bytes scanned"),
+        "pushdown_rows_pruned" ->
+          SQLMetrics.createMetric(
+            sparkContext,
+            "Total rows filtered out by predicates pushed into parquet scan"),
+        "pushdown_rows_matched" ->
+          SQLMetrics.createMetric(
+            sparkContext,
+            "Total rows passed predicates pushed into parquet scan"),
+        "row_pushdown_eval_time" ->
+          SQLMetrics.createNanoTimingMetric(
+            sparkContext,
+            "Total time spent evaluating row-level pushdown filters"),
+        "statistics_eval_time" ->
+          SQLMetrics.createNanoTimingMetric(
+            sparkContext,
+            "Total time spent evaluating row group-level statistics filters"),
+        "bloom_filter_eval_time" ->
+          SQLMetrics.createNanoTimingMetric(
+            sparkContext,
+            "Total time spent evaluating row group Bloom Filters"),
+        "page_index_rows_pruned" ->
+          SQLMetrics.createMetric(sparkContext, "Total rows filtered out by parquet page index"),
+        "page_index_rows_matched" ->
+          SQLMetrics.createMetric(
+            sparkContext,
+            "Total rows passed through the parquet page index"),
+        "page_index_eval_time" ->
+          SQLMetrics.createNanoTimingMetric(
+            sparkContext,
+            "Total time spent evaluating parquet page index filters"),
+        "metadata_load_time" ->
+          SQLMetrics.createNanoTimingMetric(
+            sparkContext,
+            "Total time spent reading and parsing metadata from the footer"))
   }
 }


From b9170e5dbdb37c1c5ff7db2e9485319f7888320d Mon Sep 17 00:00:00 2001
From: Matt Butrovich
Date: Tue, 17 Dec 2024 10:09:18 -0500
Subject: [PATCH 3/4] Remove elapsed_compute from NativeScan metrics.

---
 .../spark/sql/comet/CometNativeScanExec.scala | 152 +++++++++---------
 1 file changed, 76 insertions(+), 76 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
index b0ab4fc2a..4685bd13a 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
@@ -74,82 +74,82 @@ case class CometNativeScanExec(
   override def hashCode(): Int = Objects.hashCode(output)

   override lazy val metrics: Map[String, SQLMetric] = {
-    CometMetricNode.baselineMetrics(sparkContext) ++
-      Map(
-        "time_elapsed_opening" ->
-          SQLMetrics.createNanoTimingMetric(
-            sparkContext,
-            "Wall clock time elapsed for file opening"),
-        "time_elapsed_scanning_until_data" ->
-          SQLMetrics.createNanoTimingMetric(
-            sparkContext,
-            "Wall clock time elapsed for file scanning + " +
-              "first record batch of decompression + decoding"),
-        "time_elapsed_scanning_total" ->
-          SQLMetrics.createNanoTimingMetric(
-            sparkContext,
-            "Total elapsed wall clock time for scanning " +
-              "+ record batch decompression / decoding"),
-        "time_elapsed_processing" ->
-          SQLMetrics.createNanoTimingMetric(
-            sparkContext,
-            "Wall clock time elapsed for data decompression + decoding"),
-        "file_open_errors" ->
-          SQLMetrics.createMetric(sparkContext, "Count of errors opening file"),
-        "file_scan_errors" ->
-          SQLMetrics.createMetric(sparkContext, "Count of errors scanning file"),
-        "predicate_evaluation_errors" ->
-          SQLMetrics.createMetric(
-            sparkContext,
-            "Number of times the predicate could not be evaluated"),
-        "row_groups_matched_bloom_filter" ->
-          SQLMetrics.createMetric(
-            sparkContext,
-            "Number of row groups whose bloom filters were checked and matched (not pruned)"),
-        "row_groups_pruned_bloom_filter" ->
-          SQLMetrics.createMetric(sparkContext, "Number of row groups pruned by bloom filters"),
-        "row_groups_matched_statistics" ->
-          SQLMetrics.createMetric(
-            sparkContext,
-            "Number of row groups whose statistics were checked and matched (not pruned)"),
-        "row_groups_pruned_statistics" ->
-          SQLMetrics.createMetric(sparkContext, "Number of row groups pruned by statistics"),
-        "bytes_scanned" ->
-          SQLMetrics.createSizeMetric(sparkContext, "Total number of bytes scanned"),
-        "pushdown_rows_pruned" ->
-          SQLMetrics.createMetric(
-            sparkContext,
-            "Total rows filtered out by predicates pushed into parquet scan"),
-        "pushdown_rows_matched" ->
-          SQLMetrics.createMetric(
-            sparkContext,
-            "Total rows passed predicates pushed into parquet scan"),
-        "row_pushdown_eval_time" ->
-          SQLMetrics.createNanoTimingMetric(
-            sparkContext,
-            "Total time spent evaluating row-level pushdown filters"),
-        "statistics_eval_time" ->
-          SQLMetrics.createNanoTimingMetric(
-            sparkContext,
-            "Total time spent evaluating row group-level statistics filters"),
-        "bloom_filter_eval_time" ->
-          SQLMetrics.createNanoTimingMetric(
-            sparkContext,
-            "Total time spent evaluating row group Bloom Filters"),
-        "page_index_rows_pruned" ->
-          SQLMetrics.createMetric(sparkContext, "Total rows filtered out by parquet page index"),
-        "page_index_rows_matched" ->
-          SQLMetrics.createMetric(
-            sparkContext,
-            "Total rows passed through the parquet page index"),
-        "page_index_eval_time" ->
-          SQLMetrics.createNanoTimingMetric(
-            sparkContext,
-            "Total time spent evaluating parquet page index filters"),
-        "metadata_load_time" ->
-          SQLMetrics.createNanoTimingMetric(
-            sparkContext,
-            "Total time spent reading and parsing metadata from the footer"))
+    // We don't append CometMetricNode.baselineMetrics because
+    // elapsed_compute has no counterpart on the native side.
+    Map(
+      "output_rows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+      "time_elapsed_opening" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Wall clock time elapsed for file opening"),
+      "time_elapsed_scanning_until_data" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Wall clock time elapsed for file scanning + " +
+            "first record batch of decompression + decoding"),
+      "time_elapsed_scanning_total" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Total elapsed wall clock time for scanning " +
+            "+ record batch decompression / decoding"),
+      "time_elapsed_processing" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Wall clock time elapsed for data decompression + decoding"),
+      "file_open_errors" ->
+        SQLMetrics.createMetric(sparkContext, "Count of errors opening file"),
+      "file_scan_errors" ->
+        SQLMetrics.createMetric(sparkContext, "Count of errors scanning file"),
+      "predicate_evaluation_errors" ->
+        SQLMetrics.createMetric(
+          sparkContext,
+          "Number of times the predicate could not be evaluated"),
+      "row_groups_matched_bloom_filter" ->
+        SQLMetrics.createMetric(
+          sparkContext,
+          "Number of row groups whose bloom filters were checked and matched (not pruned)"),
+      "row_groups_pruned_bloom_filter" ->
+        SQLMetrics.createMetric(sparkContext, "Number of row groups pruned by bloom filters"),
+      "row_groups_matched_statistics" ->
+        SQLMetrics.createMetric(
+          sparkContext,
+          "Number of row groups whose statistics were checked and matched (not pruned)"),
+      "row_groups_pruned_statistics" ->
+        SQLMetrics.createMetric(sparkContext, "Number of row groups pruned by statistics"),
+      "bytes_scanned" ->
+        SQLMetrics.createSizeMetric(sparkContext, "Total number of bytes scanned"),
+      "pushdown_rows_pruned" ->
+        SQLMetrics.createMetric(
+          sparkContext,
+          "Total rows filtered out by predicates pushed into parquet scan"),
+      "pushdown_rows_matched" ->
+        SQLMetrics.createMetric(
+          sparkContext,
+          "Total rows passed predicates pushed into parquet scan"),
+      "row_pushdown_eval_time" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Total time spent evaluating row-level pushdown filters"),
+      "statistics_eval_time" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Total time spent evaluating row group-level statistics filters"),
+      "bloom_filter_eval_time" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Total time spent evaluating row group Bloom Filters"),
+      "page_index_rows_pruned" ->
+        SQLMetrics.createMetric(sparkContext, "Total rows filtered out by parquet page index"),
+      "page_index_rows_matched" ->
+        SQLMetrics.createMetric(sparkContext, "Total rows passed through the parquet page index"),
+      "page_index_eval_time" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Total time spent evaluating parquet page index filters"),
+      "metadata_load_time" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Total time spent reading and parsing metadata from the footer"))
   }
 }


From d4f359f4e58dc2e0479b418211f9a1c3c765ec97 Mon Sep 17 00:00:00 2001
From: Matt Butrovich
Date: Tue, 17 Dec 2024 10:29:15 -0500
Subject: [PATCH 4/4] Add Native prefix for relevant metrics.

---
 .../spark/sql/comet/CometNativeScanExec.scala | 50 +++++++++++--------
 1 file changed, 29 insertions(+), 21 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
index 4685bd13a..2e5251398 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
@@ -81,75 +81,83 @@ case class CometNativeScanExec(
       "time_elapsed_opening" ->
         SQLMetrics.createNanoTimingMetric(
           sparkContext,
-          "Wall clock time elapsed for file opening"),
+          "Native: Wall clock time elapsed for file opening"),
       "time_elapsed_scanning_until_data" ->
         SQLMetrics.createNanoTimingMetric(
           sparkContext,
-          "Wall clock time elapsed for file scanning + " +
+          "Native: Wall clock time elapsed for file scanning + " +
             "first record batch of decompression + decoding"),
       "time_elapsed_scanning_total" ->
         SQLMetrics.createNanoTimingMetric(
           sparkContext,
-          "Total elapsed wall clock time for scanning " +
+          "Native: Total elapsed wall clock time for scanning " +
             "+ record batch decompression / decoding"),
       "time_elapsed_processing" ->
         SQLMetrics.createNanoTimingMetric(
           sparkContext,
-          "Wall clock time elapsed for data decompression + decoding"),
+          "Native: Wall clock time elapsed for data decompression + decoding"),
       "file_open_errors" ->
-        SQLMetrics.createMetric(sparkContext, "Count of errors opening file"),
+        SQLMetrics.createMetric(sparkContext, "Native: Count of errors opening file"),
       "file_scan_errors" ->
-        SQLMetrics.createMetric(sparkContext, "Count of errors scanning file"),
+        SQLMetrics.createMetric(sparkContext, "Native: Count of errors scanning file"),
       "predicate_evaluation_errors" ->
         SQLMetrics.createMetric(
           sparkContext,
-          "Number of times the predicate could not be evaluated"),
+          "Native: Number of times the predicate could not be evaluated"),
       "row_groups_matched_bloom_filter" ->
         SQLMetrics.createMetric(
           sparkContext,
-          "Number of row groups whose bloom filters were checked and matched (not pruned)"),
+          "Native: Number of row groups whose bloom filters were checked and matched (not pruned)"),
       "row_groups_pruned_bloom_filter" ->
-        SQLMetrics.createMetric(sparkContext, "Number of row groups pruned by bloom filters"),
+        SQLMetrics.createMetric(
+          sparkContext,
+          "Native: Number of row groups pruned by bloom filters"),
       "row_groups_matched_statistics" ->
         SQLMetrics.createMetric(
           sparkContext,
-          "Number of row groups whose statistics were checked and matched (not pruned)"),
+          "Native: Number of row groups whose statistics were checked and matched (not pruned)"),
       "row_groups_pruned_statistics" ->
-        SQLMetrics.createMetric(sparkContext, "Number of row groups pruned by statistics"),
+        SQLMetrics.createMetric(
+          sparkContext,
+          "Native: Number of row groups pruned by statistics"),
       "bytes_scanned" ->
-        SQLMetrics.createSizeMetric(sparkContext, "Total number of bytes scanned"),
+        SQLMetrics.createSizeMetric(sparkContext, "Native: Total number of bytes scanned"),
       "pushdown_rows_pruned" ->
         SQLMetrics.createMetric(
           sparkContext,
-          "Total rows filtered out by predicates pushed into parquet scan"),
+          "Native: Total rows filtered out by predicates pushed into parquet scan"),
       "pushdown_rows_matched" ->
         SQLMetrics.createMetric(
           sparkContext,
-          "Total rows passed predicates pushed into parquet scan"),
+          "Native: Total rows passed predicates pushed into parquet scan"),
       "row_pushdown_eval_time" ->
         SQLMetrics.createNanoTimingMetric(
           sparkContext,
-          "Total time spent evaluating row-level pushdown filters"),
+          "Native: Total time spent evaluating row-level pushdown filters"),
       "statistics_eval_time" ->
         SQLMetrics.createNanoTimingMetric(
           sparkContext,
-          "Total time spent evaluating row group-level statistics filters"),
+          "Native: Total time spent evaluating row group-level statistics filters"),
       "bloom_filter_eval_time" ->
         SQLMetrics.createNanoTimingMetric(
           sparkContext,
-          "Total time spent evaluating row group Bloom Filters"),
+          "Native: Total time spent evaluating row group Bloom Filters"),
       "page_index_rows_pruned" ->
-        SQLMetrics.createMetric(sparkContext, "Total rows filtered out by parquet page index"),
+        SQLMetrics.createMetric(
+          sparkContext,
+          "Native: Total rows filtered out by parquet page index"),
       "page_index_rows_matched" ->
-        SQLMetrics.createMetric(sparkContext, "Total rows passed through the parquet page index"),
+        SQLMetrics.createMetric(
+          sparkContext,
+          "Native: Total rows passed through the parquet page index"),
       "page_index_eval_time" ->
         SQLMetrics.createNanoTimingMetric(
           sparkContext,
-          "Total time spent evaluating parquet page index filters"),
+          "Native: Total time spent evaluating parquet page index filters"),
       "metadata_load_time" ->
         SQLMetrics.createNanoTimingMetric(
           sparkContext,
-          "Total time spent reading and parsing metadata from the footer"))
+          "Native: Total time spent reading and parsing metadata from the footer"))
   }
 }