From 3c34653cc453d5ce15ca05d74ffa0db4ffb7f98b Mon Sep 17 00:00:00 2001
From: Matt Butrovich
Date: Mon, 16 Dec 2024 13:00:17 -0500
Subject: [PATCH] Add metrics from ParquetFileMetrics and FileStreamMetrics on
 native to CometNativeScanExec.

---
 .../spark/sql/comet/CometNativeScanExec.scala | 76 +++++++++++++++++++
 1 file changed, 76 insertions(+)

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
index fd5afdb80..729778f1d 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.collection._
 
@@ -71,6 +72,81 @@ case class CometNativeScanExec(
   }
 
   override def hashCode(): Int = Objects.hashCode(output)
+
+  override lazy val metrics: Map[String, SQLMetric] = {
+    Map(
+      "time_elapsed_opening" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Wall clock time elapsed for file opening"),
+      "time_elapsed_scanning_until_data" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Wall clock time elapsed for file scanning + " +
+            "first record batch of decompression + decoding"),
+      "time_elapsed_scanning_total" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Total elapsed wall clock time for scanning + record batch decompression / decoding"),
+      "time_elapsed_processing" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Wall clock time elapsed for data decompression + decoding"),
+      "file_open_errors" ->
+        SQLMetrics.createMetric(sparkContext, "Count of errors opening file"),
+      "file_scan_errors" ->
+        SQLMetrics.createMetric(sparkContext, "Count of errors scanning file"),
+      "predicate_evaluation_errors" ->
+        SQLMetrics.createMetric(
+          sparkContext,
+          "Number of times the predicate could not be evaluated"),
+      "row_groups_matched_bloom_filter" ->
+        SQLMetrics.createMetric(
+          sparkContext,
+          "Number of row groups whose bloom filters were checked and matched (not pruned)"),
+      "row_groups_pruned_bloom_filter" ->
+        SQLMetrics.createMetric(sparkContext, "Number of row groups pruned by bloom filters"),
+      "row_groups_matched_statistics" ->
+        SQLMetrics.createMetric(
+          sparkContext,
+          "Number of row groups whose statistics were checked and matched (not pruned)"),
+      "row_groups_pruned_statistics" ->
+        SQLMetrics.createMetric(sparkContext, "Number of row groups pruned by statistics"),
+      "bytes_scanned" ->
+        SQLMetrics.createSizeMetric(sparkContext, "Total number of bytes scanned"),
+      "pushdown_rows_pruned" ->
+        SQLMetrics.createMetric(
+          sparkContext,
+          "Total rows filtered out by predicates pushed into parquet scan"),
+      "pushdown_rows_matched" ->
+        SQLMetrics.createMetric(
+          sparkContext,
+          "Total rows passed predicates pushed into parquet scan"),
+      "row_pushdown_eval_time" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Total time spent evaluating row-level pushdown filters"),
+      "statistics_eval_time" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Total time spent evaluating row group-level statistics filters"),
+      "bloom_filter_eval_time" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Total time spent evaluating row group Bloom Filters"),
+      "page_index_rows_pruned" ->
+        SQLMetrics.createMetric(sparkContext, "Total rows filtered out by parquet page index"),
+      "page_index_rows_matched" ->
+        SQLMetrics.createMetric(sparkContext, "Total rows passed through the parquet page index"),
+      "page_index_eval_time" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Total time spent evaluating parquet page index filters"),
+      "metadata_load_time" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Total time spent reading and parsing metadata from the footer"))
+  }
 }
 
 object CometNativeScanExec extends DataTypeSupport {
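
For reviewers: the metric keys above deliberately mirror the names DataFusion uses
in ParquetFileMetrics and FileStreamMetrics, so the native side only needs to report
raw (name, value) pairs and the JVM side can fold them into the SQLMetrics declared
in the diff. Below is a minimal sketch of that fold, not part of this patch; it
assumes the native scan surfaces its metrics as a Map[String, Long], and
`updateNativeMetrics` / `nativeMetrics` are illustrative names only.

    import org.apache.spark.sql.execution.metric.SQLMetric

    // Sketch only: fold raw (name, value) pairs reported by the native scan
    // into the SQLMetrics declared in the patch. `nativeMetrics` is a
    // hypothetical Map[String, Long] decoded from the native side.
    def updateNativeMetrics(
        nativeMetrics: Map[String, Long],
        metrics: Map[String, SQLMetric]): Unit = {
      nativeMetrics.foreach { case (name, value) =>
        // SQLMetric is a Spark accumulator; values added on executors are
        // merged on the driver and surface in the Spark UI's SQL tab.
        metrics.get(name).foreach(_.add(value))
      }
    }

Matching on the metric name keeps the mapping table-free: any metric the native side
emits that has no SQLMetric counterpart is silently dropped rather than failing the task.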