[comet-parquet-exec] CometNativeScan metrics from ParquetFileMetrics and FileStreamMetrics #1172

Open · wants to merge 4 commits into comet-parquet-exec
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.types._
import org.apache.spark.util.collection._

@@ -71,6 +72,93 @@ case class CometNativeScanExec(
}

override def hashCode(): Int = Objects.hashCode(output)

override lazy val metrics: Map[String, SQLMetric] = {
// We don't append CometMetricNode.baselineMetrics because
// elapsed_compute has no counterpart on the native side.
Map(
"output_rows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"time_elapsed_opening" ->
SQLMetrics.createNanoTimingMetric(
sparkContext,
"Native: Wall clock time elapsed for file opening"),
"time_elapsed_scanning_until_data" ->
SQLMetrics.createNanoTimingMetric(
sparkContext,
"Native: Wall clock time elapsed for file scanning +" +
"first record batch of decompression + decoding"),
"time_elapsed_scanning_total" ->
SQLMetrics.createNanoTimingMetric(
sparkContext,
"Native: Total elapsed wall clock time for for scanning " +
"+ record batch decompression / decoding"),
"time_elapsed_processing" ->
SQLMetrics.createNanoTimingMetric(
sparkContext,
"Native: Wall clock time elapsed for data decompression + decoding"),
"file_open_errors" ->
SQLMetrics.createMetric(sparkContext, "Native: Count of errors opening file"),
"file_scan_errors" ->
SQLMetrics.createMetric(sparkContext, "Native: Count of errors scanning file"),
"predicate_evaluation_errors" ->
SQLMetrics.createMetric(
sparkContext,
"Native: Number of times the predicate could not be evaluated"),
"row_groups_matched_bloom_filter" ->
SQLMetrics.createMetric(
sparkContext,
"Native: Number of row groups whose bloom filters were checked and matched (not pruned)"),
"row_groups_pruned_bloom_filter" ->
SQLMetrics.createMetric(
sparkContext,
"Native: Number of row groups pruned by bloom filters"),
"row_groups_matched_statistics" ->
SQLMetrics.createMetric(
sparkContext,
"Native: Number of row groups whose statistics were checked and matched (not pruned)"),
"row_groups_pruned_statistics" ->
SQLMetrics.createMetric(
sparkContext,
"Native: Number of row groups pruned by statistics"),
"bytes_scanned" ->
SQLMetrics.createSizeMetric(sparkContext, "Native: Total number of bytes scanned"),
"pushdown_rows_pruned" ->
SQLMetrics.createMetric(
sparkContext,
"Native: Total rows filtered out by predicates pushed into parquet scan"),
"pushdown_rows_matched" ->
SQLMetrics.createMetric(
sparkContext,
"Native: Total rows passed predicates pushed into parquet scan"),
"row_pushdown_eval_time" ->
SQLMetrics.createNanoTimingMetric(
sparkContext,
"Native: Total time spent evaluating row-level pushdown filters"),
"statistics_eval_time" ->
SQLMetrics.createNanoTimingMetric(
sparkContext,
"Native: Total time spent evaluating row group-level statistics filters"),
"bloom_filter_eval_time" ->
SQLMetrics.createNanoTimingMetric(
sparkContext,
"Native: Total time spent evaluating row group Bloom Filters"),
"page_index_rows_pruned" ->
SQLMetrics.createMetric(
sparkContext,
"Native: Total rows filtered out by parquet page index"),
"page_index_rows_matched" ->
SQLMetrics.createMetric(
sparkContext,
"Native: Total rows passed through the parquet page index"),
"page_index_eval_time" ->
SQLMetrics.createNanoTimingMetric(
sparkContext,
"Native: Total time spent evaluating parquet page index filters"),
"metadata_load_time" ->
SQLMetrics.createNanoTimingMetric(
sparkContext,
"Native: Total time spent reading and parsing metadata from the footer"))
}
}

object CometNativeScanExec extends DataTypeSupport {
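
Outside the diff: a minimal sketch, assuming the planner leaves a CometNativeScanExec leaf in the executed plan for the Parquet read, of how the metrics registered above surface through the standard SparkPlan.metrics map. The input path, session setup, and the particular metric keys printed are illustrative only; the metric names themselves are the keys added in this PR.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("native-scan-metrics-sketch").getOrCreate()

// Hypothetical Parquet input; any table read through the native scan would do.
val df = spark.read.parquet("/tmp/example_table").filter("id > 100")
df.collect() // run the query so the native scan populates its SQLMetrics

// Every SparkPlan node exposes its SQLMetrics by name; print a few of the
// keys registered in the metrics map added by this PR.
df.queryExecution.executedPlan.collectLeaves().foreach { leaf =>
  Seq("bytes_scanned", "row_groups_pruned_statistics", "time_elapsed_opening")
    .flatMap(name => leaf.metrics.get(name).map(m => name -> m.value))
    .foreach { case (name, value) => println(s"${leaf.nodeName}: $name = $value") }
}

Since the timing entries are created with SQLMetrics.createNanoTimingMetric, their raw values are nanoseconds; the Spark UI formats them as durations.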