Add metrics from ParquetFileMetrics and FileStreamMetrics on native to CometNativeScanExec.
mbutrovich committed Dec 16, 2024
1 parent 8563edf commit 3c34653
Showing 1 changed file with 76 additions and 0 deletions.
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.types._
import org.apache.spark.util.collection._

@@ -71,6 +72,81 @@ case class CometNativeScanExec(
}

override def hashCode(): Int = Objects.hashCode(output)

override lazy val metrics: Map[String, SQLMetric] = {
Map(
"time_elapsed_opening" ->
SQLMetrics.createNanoTimingMetric(
sparkContext,
"Wall clock time elapsed for file opening"),
"time_elapsed_scanning_until_data" ->
SQLMetrics.createNanoTimingMetric(
sparkContext,
"Wall clock time elapsed for file scanning +" +
"first record batch of decompression + decoding"),
"time_elapsed_scanning_total" ->
SQLMetrics.createNanoTimingMetric(
sparkContext,
"Total elapsed wall clock time for for scanning + record batch decompression / decoding"),
"time_elapsed_processing" ->
SQLMetrics.createNanoTimingMetric(
sparkContext,
"Wall clock time elapsed for data decompression + decoding"),
"file_open_errors" ->
SQLMetrics.createMetric(sparkContext, "Count of errors opening file"),
"file_scan_errors" ->
SQLMetrics.createMetric(sparkContext, "Count of errors scanning file"),
"predicate_evaluation_errors" ->
SQLMetrics.createMetric(
sparkContext,
"Number of times the predicate could not be evaluated"),
"row_groups_matched_bloom_filter" ->
SQLMetrics.createMetric(
sparkContext,
"Number of row groups whose bloom filters were checked and matched (not pruned)"),
"row_groups_pruned_bloom_filter" ->
SQLMetrics.createMetric(sparkContext, "Number of row groups pruned by bloom filters"),
"row_groups_matched_statistics" ->
SQLMetrics.createMetric(
sparkContext,
"Number of row groups whose statistics were checked and matched (not pruned)"),
"row_groups_pruned_statistics" ->
SQLMetrics.createMetric(sparkContext, "Number of row groups pruned by statistics"),
"bytes_scanned" ->
SQLMetrics.createSizeMetric(sparkContext, "Total number of bytes scanned"),
"pushdown_rows_pruned" ->
SQLMetrics.createMetric(
sparkContext,
"Total rows filtered out by predicates pushed into parquet scan"),
"pushdown_rows_matched" ->
SQLMetrics.createMetric(
sparkContext,
"Total rows passed predicates pushed into parquet scan"),
"row_pushdown_eval_time" ->
SQLMetrics.createNanoTimingMetric(
sparkContext,
"Total time spent evaluating row-level pushdown filters"),
"statistics_eval_time" ->
SQLMetrics.createNanoTimingMetric(
sparkContext,
"Total time spent evaluating row group-level statistics filters"),
"bloom_filter_eval_time" ->
SQLMetrics.createNanoTimingMetric(
sparkContext,
"Total time spent evaluating row group Bloom Filters"),
"page_index_rows_pruned" ->
SQLMetrics.createMetric(sparkContext, "Total rows filtered out by parquet page index"),
"page_index_rows_matched" ->
SQLMetrics.createMetric(sparkContext, "Total rows passed through the parquet page index"),
"page_index_eval_time" ->
SQLMetrics.createNanoTimingMetric(
sparkContext,
"Total time spent evaluating parquet page index filters"),
"metadata_load_time" ->
SQLMetrics.createNanoTimingMetric(
sparkContext,
"Total time spent reading and parsing metadata from the footer"))
}
}

object CometNativeScanExec extends DataTypeSupport {

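The metric names above mirror the counters exposed by DataFusion's ParquetFileMetrics and FileStreamMetrics. As a rough illustration only (the wiring from the native side is outside this diff), a minimal sketch of how reported native values could be accumulated into these SQLMetrics, assuming a hypothetical nativeValues map keyed by the same metric names:

import org.apache.spark.sql.execution.metric.SQLMetric

// Hypothetical helper, not part of this commit: fold native counter values
// into the matching Spark SQLMetric declared in CometNativeScanExec.metrics.
def updateFromNative(metrics: Map[String, SQLMetric], nativeValues: Map[String, Long]): Unit = {
  nativeValues.foreach { case (name, value) =>
    // SQLMetric.add is the standard Spark accumulator API for metric updates.
    metrics.get(name).foreach(_.add(value))
  }
}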