diff --git a/spark/src/test/resources/micro-benchmark-sql/cometbench.py b/spark/src/test/resources/micro-benchmark-sql/cometbench.py
deleted file mode 100644
index 4e42d1592..000000000
--- a/spark/src/test/resources/micro-benchmark-sql/cometbench.py
+++ /dev/null
@@ -1,75 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import argparse
-import os
-from pyspark.sql import SparkSession
-import time
-
-def main(data_path: str, query_path: str, iterations: int):
-
-    # Initialize a SparkSession
-    spark = SparkSession.builder \
-        .appName("DataFusion Microbenchmarks: " + os.path.basename(query_path)) \
-        .getOrCreate()
-
-    # Register the tables
-    table_names = ["call_center", "catalog_page", "catalog_returns", "catalog_sales", "customer",
-                   "customer_address", "customer_demographics", "date_dim", "time_dim", "household_demographics",
-                   "income_band", "inventory", "item", "promotion", "reason", "ship_mode", "store", "store_returns",
-                   "store_sales", "warehouse", "web_page", "web_returns", "web_sales", "web_site"]
-
-    for table in table_names:
-        path = f"{data_path}/{table}.parquet"
-        print(f"Registering table {table} using path {path}")
-        df = spark.read.parquet(path)
-        df.createOrReplaceTempView(table)
-
-    # read sql file
-    print(f"Reading query from path {query_path}")
-    with open(query_path, "r") as f:
-        sql = f.read().strip()
-
-
-    durations = []
-    for iteration in range(0, iterations):
-        print(f"Starting iteration {iteration} of {iterations}")
-
-        start_time = time.time()
-        df = spark.sql(sql)
-        rows = df.collect()
-
-        print(f"Query returned {len(rows)} rows")
-        end_time = time.time()
-        duration = end_time - start_time
-        print(f"Query took {duration} seconds")
-
-        durations.append(duration)
-
-    # Stop the SparkSession
-    spark.stop()
-
-    print(durations)
-
-if __name__ == "__main__":
-    parser = argparse.ArgumentParser(description="DataFusion benchmark derived from TPC-H / TPC-DS")
-    parser.add_argument("--data", required=True, help="Path to data files")
-    parser.add_argument("--query", required=True, help="Path to query file")
-    parser.add_argument("--iterations", required=False, default="1", help="How many iterations to run")
-    args = parser.parse_args()
-
-    main(args.data, args.query, int(args.iterations))
\ No newline at end of file
diff --git a/spark/src/test/resources/micro-benchmark-sql/README.md b/spark/src/test/resources/tpcds-micro-benchmarks/README.md
similarity index 100%
rename from spark/src/test/resources/micro-benchmark-sql/README.md
rename to spark/src/test/resources/tpcds-micro-benchmarks/README.md
diff --git a/spark/src/test/resources/micro-benchmark-sql/add_many_decimals.sql b/spark/src/test/resources/tpcds-micro-benchmarks/add_many_decimals.sql
similarity index 100%
rename from spark/src/test/resources/micro-benchmark-sql/add_many_decimals.sql
rename to spark/src/test/resources/tpcds-micro-benchmarks/add_many_decimals.sql
diff --git a/spark/src/test/resources/micro-benchmark-sql/add_many_integers.sql b/spark/src/test/resources/tpcds-micro-benchmarks/add_many_integers.sql
similarity index 100%
rename from spark/src/test/resources/micro-benchmark-sql/add_many_integers.sql
rename to spark/src/test/resources/tpcds-micro-benchmarks/add_many_integers.sql
diff --git a/spark/src/test/resources/micro-benchmark-sql/agg_high_cardinality.sql b/spark/src/test/resources/tpcds-micro-benchmarks/agg_high_cardinality.sql
similarity index 100%
rename from spark/src/test/resources/micro-benchmark-sql/agg_high_cardinality.sql
rename to spark/src/test/resources/tpcds-micro-benchmarks/agg_high_cardinality.sql
diff --git a/spark/src/test/resources/micro-benchmark-sql/agg_low_cardinality.sql b/spark/src/test/resources/tpcds-micro-benchmarks/agg_low_cardinality.sql
similarity index 100%
rename from spark/src/test/resources/micro-benchmark-sql/agg_low_cardinality.sql
rename to spark/src/test/resources/tpcds-micro-benchmarks/agg_low_cardinality.sql
diff --git a/spark/src/test/resources/micro-benchmark-sql/agg_sum_decimals_no_grouping.sql b/spark/src/test/resources/tpcds-micro-benchmarks/agg_sum_decimals_no_grouping.sql
similarity index 100%
rename from spark/src/test/resources/micro-benchmark-sql/agg_sum_decimals_no_grouping.sql
rename to spark/src/test/resources/tpcds-micro-benchmarks/agg_sum_decimals_no_grouping.sql
diff --git a/spark/src/test/resources/micro-benchmark-sql/agg_sum_integers_no_grouping.sql b/spark/src/test/resources/tpcds-micro-benchmarks/agg_sum_integers_no_grouping.sql
similarity index 100%
rename from spark/src/test/resources/micro-benchmark-sql/agg_sum_integers_no_grouping.sql
rename to spark/src/test/resources/tpcds-micro-benchmarks/agg_sum_integers_no_grouping.sql
diff --git a/spark/src/test/resources/micro-benchmark-sql/case_when_column_or_null.sql b/spark/src/test/resources/tpcds-micro-benchmarks/case_when_column_or_null.sql
similarity index 100%
rename from spark/src/test/resources/micro-benchmark-sql/case_when_column_or_null.sql
rename to spark/src/test/resources/tpcds-micro-benchmarks/case_when_column_or_null.sql
diff --git a/spark/src/test/resources/micro-benchmark-sql/case_when_scalar.sql b/spark/src/test/resources/tpcds-micro-benchmarks/case_when_scalar.sql
similarity index 100%
rename from spark/src/test/resources/micro-benchmark-sql/case_when_scalar.sql
rename to spark/src/test/resources/tpcds-micro-benchmarks/case_when_scalar.sql
diff --git a/spark/src/test/resources/micro-benchmark-sql/filter_highly_selective.sql b/spark/src/test/resources/tpcds-micro-benchmarks/filter_highly_selective.sql
similarity index 100%
rename from spark/src/test/resources/micro-benchmark-sql/filter_highly_selective.sql
rename to spark/src/test/resources/tpcds-micro-benchmarks/filter_highly_selective.sql
diff --git a/spark/src/test/resources/micro-benchmark-sql/filter_less_selective.sql b/spark/src/test/resources/tpcds-micro-benchmarks/filter_less_selective.sql
similarity index 100%
rename from spark/src/test/resources/micro-benchmark-sql/filter_less_selective.sql
rename to spark/src/test/resources/tpcds-micro-benchmarks/filter_less_selective.sql
diff --git a/spark/src/test/resources/micro-benchmark-sql/if_column_or_null.sql b/spark/src/test/resources/tpcds-micro-benchmarks/if_column_or_null.sql
similarity index 100%
rename from spark/src/test/resources/micro-benchmark-sql/if_column_or_null.sql
rename to spark/src/test/resources/tpcds-micro-benchmarks/if_column_or_null.sql
diff --git a/spark/src/test/resources/micro-benchmark-sql/join_anti.sql b/spark/src/test/resources/tpcds-micro-benchmarks/join_anti.sql
similarity index 100%
rename from spark/src/test/resources/micro-benchmark-sql/join_anti.sql
rename to spark/src/test/resources/tpcds-micro-benchmarks/join_anti.sql
diff --git a/spark/src/test/resources/micro-benchmark-sql/join_condition.sql b/spark/src/test/resources/tpcds-micro-benchmarks/join_condition.sql
similarity index 100%
rename from spark/src/test/resources/micro-benchmark-sql/join_condition.sql
rename to spark/src/test/resources/tpcds-micro-benchmarks/join_condition.sql
diff --git a/spark/src/test/resources/micro-benchmark-sql/join_exploding_output.sql b/spark/src/test/resources/tpcds-micro-benchmarks/join_exploding_output.sql
similarity index 100%
rename from spark/src/test/resources/micro-benchmark-sql/join_exploding_output.sql
rename to spark/src/test/resources/tpcds-micro-benchmarks/join_exploding_output.sql
diff --git a/spark/src/test/resources/micro-benchmark-sql/join_inner.sql b/spark/src/test/resources/tpcds-micro-benchmarks/join_inner.sql
similarity index 100%
rename from spark/src/test/resources/micro-benchmark-sql/join_inner.sql
rename to spark/src/test/resources/tpcds-micro-benchmarks/join_inner.sql
diff --git a/spark/src/test/resources/micro-benchmark-sql/join_left_outer.sql b/spark/src/test/resources/tpcds-micro-benchmarks/join_left_outer.sql
similarity index 100%
rename from spark/src/test/resources/micro-benchmark-sql/join_left_outer.sql
rename to spark/src/test/resources/tpcds-micro-benchmarks/join_left_outer.sql
diff --git a/spark/src/test/resources/micro-benchmark-sql/join_semi.sql b/spark/src/test/resources/tpcds-micro-benchmarks/join_semi.sql
similarity index 100%
rename from spark/src/test/resources/micro-benchmark-sql/join_semi.sql
rename to spark/src/test/resources/tpcds-micro-benchmarks/join_semi.sql
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala
new file mode 100644
index 000000000..909369d5f
--- /dev/null
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.benchmark
+
+import scala.io.Source
+
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.TPCDSQueries
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+import org.apache.spark.sql.execution.benchmark.TPCDSQueryBenchmark.tables
+import org.apache.spark.sql.execution.benchmark.TPCDSQueryBenchmarkArguments
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+
+import org.apache.comet.CometConf
+
+/**
+ * Benchmark to measure Comet query performance with micro benchmarks that use TPC-DS data. These
+ * queries each exercise a narrow subset of the operations found in the full TPC-DS queries.
+ *
+ * To run this benchmark:
+ * {{{
+ * // Build [tpcds-kit](https://github.com/databricks/tpcds-kit)
+ * cd /tmp && git clone https://github.com/databricks/tpcds-kit.git
+ * cd tpcds-kit/tools && make OS=MACOS
+ *
+ * // GenTPCDSData
+ * cd $COMET_HOME && mkdir /tmp/tpcds
+ * make benchmark-org.apache.spark.sql.GenTPCDSData -- --dsdgenDir /tmp/tpcds-kit/tools --location /tmp/tpcds --scaleFactor 1
+ *
+ * // CometTPCDSMicroBenchmark
+ * SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometTPCDSMicroBenchmark -- --data-location /tmp/tpcds
+ * }}}
+ *
+ * Results will be written to "spark/benchmarks/CometTPCDSMicroBenchmark-**results.txt".
+ */
+object CometTPCDSMicroBenchmark extends CometTPCQueryBenchmarkBase with TPCDSQueries {
+
+  val queries: Seq[String] = Seq(
+    "add_many_decimals",
+    "add_many_integers",
+    "agg_high_cardinality",
+    "agg_low_cardinality",
+    "agg_sum_decimals_no_grouping",
+    "agg_sum_integers_no_grouping",
+    "case_when_column_or_null",
+    "case_when_scalar",
+    "filter_highly_selective",
+    "filter_less_selective",
+    "if_column_or_null",
+    "join_anti",
+    "join_condition",
+    "join_exploding_output",
+    "join_inner",
+    "join_left_outer",
+    "join_semi")
+
+  override def runQueries(
+      queryLocation: String,
+      queries: Seq[String],
+      tableSizes: Map[String, Long],
+      benchmarkName: String,
+      nameSuffix: String = ""): Unit = {
+    queries.foreach { name =>
+      val source = Source.fromFile(s"spark/src/test/resources/tpcds-micro-benchmarks/$name.sql")
+      val queryString = source.getLines().mkString("\n")
+      source.close()
+
+      // This is an indirect hack to estimate the size of each query's input by traversing the
+      // logical plan and adding up the sizes of all tables that appear in the plan.
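+      // The total is passed to Benchmark as valuesPerIteration below, so the reported
+      // throughput is relative to this estimate rather than to an exact input row count.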
+      val queryRelations = scala.collection.mutable.HashSet[String]()
+      cometSpark.sql(queryString).queryExecution.analyzed.foreach {
+        case SubqueryAlias(alias, _: LogicalRelation) =>
+          queryRelations.add(alias.name)
+        case LogicalRelation(_, _, Some(catalogTable), _) =>
+          queryRelations.add(catalogTable.identifier.table)
+        case HiveTableRelation(tableMeta, _, _, _, _) =>
+          queryRelations.add(tableMeta.identifier.table)
+        case _ =>
+      }
+      val numRows = queryRelations.map(tableSizes.getOrElse(_, 0L)).sum
+      val benchmark = new Benchmark(benchmarkName, numRows, 2, output = output)
+      benchmark.addCase(s"$name$nameSuffix") { _ =>
+        cometSpark.sql(queryString).noop()
+      }
+      benchmark.addCase(s"$name$nameSuffix: Comet (Scan)") { _ =>
+        withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
+          cometSpark.sql(queryString).noop()
+        }
+      }
+      benchmark.addCase(s"$name$nameSuffix: Comet (Scan, Exec)") { _ =>
+        withSQLConf(
+          CometConf.COMET_ENABLED.key -> "true",
+          CometConf.COMET_EXEC_ENABLED.key -> "true",
+          CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") {
+          cometSpark.sql(queryString).noop()
+        }
+      }
+      benchmark.run()
+    }
+  }
+
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    val benchmarkArgs = new TPCDSQueryBenchmarkArguments(mainArgs)
+
+    // If `--query-filter` is defined, run only the queries that it selects
+    val queriesToRun = filterQueries(queries, benchmarkArgs.queryFilter)
+
+    val tableSizes = setupTables(
+      benchmarkArgs.dataLocation,
+      createTempView = false,
+      tables,
+      TPCDSSchemaHelper.getTableColumns)
+
+    setupCBO(cometSpark, benchmarkArgs.cboEnabled, tables)
+
+    runQueries("tpcdsmicro", queries = queriesToRun, tableSizes, "TPCDS Micro Benchmarks")
+  }
+}