diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index 8545eee90..1065367c2 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -727,6 +727,16 @@ case class CometSortMergeJoinExec( override def hashCode(): Int = Objects.hashCode(leftKeys, rightKeys, condition, left, right) + + override lazy val metrics: Map[String, SQLMetric] = + Map( + "input_batches" -> SQLMetrics.createMetric(sparkContext, "Number of batches consumed"), + "input_rows" -> SQLMetrics.createMetric(sparkContext, "Number of rows consumed"), + "output_batches" -> SQLMetrics.createMetric(sparkContext, "Number of batches produced"), + "output_rows" -> SQLMetrics.createMetric(sparkContext, "Number of rows produced"), + "peak_mem_used" -> + SQLMetrics.createSizeMetric(sparkContext, "Peak memory used for buffered data"), + "join_time" -> SQLMetrics.createNanoTimingMetric(sparkContext, "Total time for joining")) } case class CometScanWrapper(override val nativeOp: Operator, override val originalPlan: SparkPlan)