diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
index fd5afdb80..2e5251398 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.collection._
@@ -71,6 +72,93 @@ case class CometNativeScanExec(
   }

   override def hashCode(): Int = Objects.hashCode(output)
+
+  override lazy val metrics: Map[String, SQLMetric] = {
+    // We don't append CometMetricNode.baselineMetrics because
+    // elapsed_compute has no counterpart on the native side.
+    Map(
+      "output_rows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+      "time_elapsed_opening" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Native: Wall clock time elapsed for file opening"),
+      "time_elapsed_scanning_until_data" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Native: Wall clock time elapsed for file scanning + " +
+            "first record batch of decompression + decoding"),
+      "time_elapsed_scanning_total" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Native: Total elapsed wall clock time for scanning " +
+            "+ record batch decompression / decoding"),
+      "time_elapsed_processing" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Native: Wall clock time elapsed for data decompression + decoding"),
+      "file_open_errors" ->
+        SQLMetrics.createMetric(sparkContext, "Native: Count of errors opening file"),
+      "file_scan_errors" ->
+        SQLMetrics.createMetric(sparkContext, "Native: Count of errors scanning file"),
+      "predicate_evaluation_errors" ->
+        SQLMetrics.createMetric(
+          sparkContext,
+          "Native: Number of times the predicate could not be evaluated"),
+      "row_groups_matched_bloom_filter" ->
+        SQLMetrics.createMetric(
+          sparkContext,
+          "Native: Number of row groups whose bloom filters were checked and matched (not pruned)"),
+      "row_groups_pruned_bloom_filter" ->
+        SQLMetrics.createMetric(
+          sparkContext,
+          "Native: Number of row groups pruned by bloom filters"),
+      "row_groups_matched_statistics" ->
+        SQLMetrics.createMetric(
+          sparkContext,
+          "Native: Number of row groups whose statistics were checked and matched (not pruned)"),
+      "row_groups_pruned_statistics" ->
+        SQLMetrics.createMetric(
+          sparkContext,
+          "Native: Number of row groups pruned by statistics"),
+      "bytes_scanned" ->
+        SQLMetrics.createSizeMetric(sparkContext, "Native: Total number of bytes scanned"),
+      "pushdown_rows_pruned" ->
+        SQLMetrics.createMetric(
+          sparkContext,
+          "Native: Total rows filtered out by predicates pushed into parquet scan"),
+      "pushdown_rows_matched" ->
+        SQLMetrics.createMetric(
+          sparkContext,
+          "Native: Total rows passed predicates pushed into parquet scan"),
+      "row_pushdown_eval_time" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Native: Total time spent evaluating row-level pushdown filters"),
+      "statistics_eval_time" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Native: Total time spent evaluating row group-level statistics filters"),
+      "bloom_filter_eval_time" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Native: Total time spent evaluating row group Bloom Filters"),
+      "page_index_rows_pruned" ->
+        SQLMetrics.createMetric(
+          sparkContext,
+          "Native: Total rows filtered out by parquet page index"),
+      "page_index_rows_matched" ->
+        SQLMetrics.createMetric(
+          sparkContext,
+          "Native: Total rows passed through the parquet page index"),
+      "page_index_eval_time" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Native: Total time spent evaluating parquet page index filters"),
+      "metadata_load_time" ->
+        SQLMetrics.createNanoTimingMetric(
+          sparkContext,
+          "Native: Total time spent reading and parsing metadata from the footer"))
+  }
 }

 object CometNativeScanExec extends DataTypeSupport {
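For context, a minimal, self-contained sketch (not part of this patch, and not Comet's actual wiring) of how a `Map[String, SQLMetric]` like the one registered above behaves: the `SQLMetrics.create*Metric` constructors register accumulators with the `SparkContext`, the producer of the numbers (for `CometNativeScanExec`, the native scan reporting through `CometMetricNode`) adds deltas, and Spark aggregates them per plan node for the SQL UI. The object name and the sample values below are illustrative only.

```scala
// Illustrative sketch of Spark's SQLMetric API as used in the patch; not Comet code.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

object SqlMetricSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("sql-metric-sketch").getOrCreate()
    val sc = spark.sparkContext

    // The same three constructors the patch uses: plain counters, byte sizes, nano timings.
    val metrics: Map[String, SQLMetric] = Map(
      "output_rows" -> SQLMetrics.createMetric(sc, "number of output rows"),
      "bytes_scanned" -> SQLMetrics.createSizeMetric(sc, "Native: Total number of bytes scanned"),
      "metadata_load_time" -> SQLMetrics.createNanoTimingMetric(
        sc,
        "Native: Total time spent reading and parsing metadata from the footer"))

    // Producers add deltas; Spark aggregates them across tasks for display in the SQL UI.
    metrics("output_rows").add(1024L)
    metrics("bytes_scanned").add(4L * 1024 * 1024)
    metrics("metadata_load_time").add(5000000L) // nano timing metrics are in nanoseconds

    metrics.foreach { case (name, m) => println(s"$name = ${m.value}") }
    spark.stop()
  }
}
```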