diff --git a/fuzz-testing/.gitignore b/fuzz-testing/.gitignore
new file mode 100644
index 000000000..72ca04e9f
--- /dev/null
+++ b/fuzz-testing/.gitignore
@@ -0,0 +1,6 @@
+.idea
+target
+spark-warehouse
+queries.sql
+results*.md
+test*.parquet
\ No newline at end of file
diff --git a/fuzz-testing/README.md b/fuzz-testing/README.md
new file mode 100644
index 000000000..702fbc47d
--- /dev/null
+++ b/fuzz-testing/README.md
@@ -0,0 +1,64 @@
+# Comet Fuzz
+
+Comet Fuzz is a standalone project for generating random data and queries, executing the queries against Spark
+with Comet disabled and then enabled, and checking for incompatibilities.
+
+Although it is a simple tool, it has already been useful in finding many bugs.
+
+Comet Fuzz is inspired by the [SparkFuzz](https://ir.cwi.nl/pub/30222) paper from Databricks and CWI.
+
+## Usage
+
+Build the jar file first.
+
+```shell
+mvn package
+```
+
+Set appropriate values for the `SPARK_HOME`, `SPARK_MASTER`, and `COMET_JAR` environment variables, then use
+`spark-submit` to run CometFuzz against a Spark cluster.
+
+### Generating Data Files
+
+```shell
+$SPARK_HOME/bin/spark-submit \
+    --master $SPARK_MASTER \
+    --class org.apache.comet.fuzz.Main \
+    target/cometfuzz-0.1.0-SNAPSHOT-jar-with-dependencies.jar \
+    data --num-files=2 --num-rows=200 --num-columns=100
+```
+
+### Generating Queries
+
+Generate random queries that are based on the available test files.
+
+```shell
+$SPARK_HOME/bin/spark-submit \
+    --master $SPARK_MASTER \
+    --class org.apache.comet.fuzz.Main \
+    target/cometfuzz-0.1.0-SNAPSHOT-jar-with-dependencies.jar \
+    queries --num-files=2 --num-queries=500
+```
+
+Note that the output filename is currently hard-coded as `queries.sql`.
+
+### Executing Queries
+
+```shell
+$SPARK_HOME/bin/spark-submit \
+    --master $SPARK_MASTER \
+    --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
+    --conf spark.comet.enabled=true \
+    --conf spark.comet.exec.enabled=true \
+    --conf spark.comet.exec.all.enabled=true \
+    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
+    --conf spark.comet.exec.shuffle.enabled=true \
+    --conf spark.comet.columnar.shuffle.enabled=true \
+    --jars $COMET_JAR \
+    --driver-class-path $COMET_JAR \
+    --class org.apache.comet.fuzz.Main \
+    target/cometfuzz-0.1.0-SNAPSHOT-jar-with-dependencies.jar \
+    run --num-files=2 --filename=queries.sql
+```
+
+Note that results are written to a timestamped file named `results-<timestamp>.md` in the current directory.
diff --git a/fuzz-testing/pom.xml b/fuzz-testing/pom.xml
new file mode 100644
index 000000000..9f1352410
--- /dev/null
+++ b/fuzz-testing/pom.xml
@@ -0,0 +1,87 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>org.apache.comet.fuzz</groupId>
+  <artifactId>cometfuzz</artifactId>
+  <version>0.1.0-SNAPSHOT</version>
+  <packaging>jar</packaging>
+
+  <name>comet-fuzz</name>
+  <url>http://maven.apache.org</url>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <java.version>11</java.version>
+    <scala.version>2.12.7</scala.version>
+    <scala.binary.version>2.12</scala.binary.version>
+    <spark.version>3.4.2</spark.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <version>${scala.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.rogach</groupId>
+      <artifactId>scallop_${scala.binary.version}</artifactId>
+      <version>3.5.1</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <sourceDirectory>src/main/scala</sourceDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.1</version>
+        <configuration>
+          <source>${java.version}</source>
+          <target>${java.version}</target>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+        <version>4.7.2</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>compile</goal>
+              <goal>testCompile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <version>3.3.0</version>
+        <configuration>
+          <descriptorRefs>
+            <descriptorRef>jar-with-dependencies</descriptorRef>
+          </descriptorRefs>
+        </configuration>
+        <executions>
+          <execution>
+            <id>make-assembly</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
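Before the source files, a note on how the pieces fit together: the `data` and `queries` subcommands can also be driven programmatically, which is convenient when iterating on the fuzzer itself. A minimal local sketch (the object name, row counts, and `local[*]` master are illustrative assumptions, not part of the project):

```scala
import scala.util.Random

import org.apache.spark.sql.SparkSession

import org.apache.comet.fuzz.{DataGen, QueryGen}

object LocalFuzzSmokeTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .getOrCreate()
    // fixed seed, mirroring Main, so that any failure is reproducible
    val r = new Random(42)

    // writes test0.parquet and test1.parquet to the working directory
    DataGen.generateRandomFiles(r, spark, numFiles = 2, numRows = 100, numColumns = 10)
    // writes queries.sql referencing the views test0 and test1
    QueryGen.generateRandomQueries(r, spark, numFiles = 2, numQueries = 50)
    spark.stop()
  }
}
```

`QueryRunner.runQueries` is best invoked from a separate, fresh session, since it registers the same `test0`/`test1` temp view names that `QueryGen` registers.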
diff --git a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/DataGen.scala b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/DataGen.scala
new file mode 100644
index 000000000..1bd9ee9ea
--- /dev/null
+++ b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/DataGen.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.fuzz
+
+import org.apache.spark.sql.{Row, SaveMode, SparkSession}
+import org.apache.spark.sql.types.{DataType, DataTypes, StructField, StructType}
+
+import java.sql.Timestamp
+import scala.util.Random
+
+object DataGen {
+
+  def generateRandomFiles(r: Random, spark: SparkSession, numFiles: Int, numRows: Int,
+      numColumns: Int): Unit = {
+    for (i <- 0 until numFiles) {
+      generateRandomParquetFile(r, spark, s"test$i.parquet", numRows, numColumns)
+    }
+  }
+
+  def generateRandomParquetFile(r: Random, spark: SparkSession, filename: String, numRows: Int,
+      numColumns: Int): Unit = {
+
+    // TODO add examples of all supported types, including complex types
+    val dataTypes = Seq(
+      (DataTypes.ByteType, 0.2),
+      (DataTypes.ShortType, 0.2),
+      (DataTypes.IntegerType, 0.2),
+      (DataTypes.LongType, 0.2),
+      (DataTypes.FloatType, 0.2),
+      (DataTypes.DoubleType, 0.2),
+      // TODO add support for all Comet supported types
+//      (DataTypes.createDecimalType(10, 2), 0.2),
+//      (DataTypes.createDecimalType(10, 0), 0.2),
+//      (DataTypes.createDecimalType(4, 0), 0.2),
+      (DataTypes.DateType, 0.2),
+      (DataTypes.TimestampType, 0.2),
+//      (DataTypes.TimestampNTZType, 0.2),
+      (DataTypes.StringType, 0.2))
+
+    // generate schema using random data types
+    val fields = Range(0, numColumns)
+      .map(i => StructField(s"c$i", Utils.randomWeightedChoice(dataTypes, r), nullable = true))
+    val schema = StructType(fields)
+
+    // generate columnar data
+    val cols: Seq[Seq[Any]] = fields.map(f => generateColumn(r, f.dataType, numRows))
+
+    // convert to rows
+    val rows = Range(0, numRows).map(rowIndex => {
+      Row.fromSeq(cols.map(_(rowIndex)))
+    })
+
+    // TODO random partitioning and bucketing
+    // TODO random parquet write options
+    val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
+    df.write.mode(SaveMode.Overwrite).parquet(filename)
+  }
+
+  def generateColumn(r: Random, dataType: DataType, numRows: Int): Seq[Any] = {
+    dataType match {
+      case DataTypes.ByteType =>
+        // reuse the Long generator, taking care to preserve nulls rather than
+        // letting the cast unbox them to zero
+        generateColumn(r, DataTypes.LongType, numRows)
+          .map(v => if (v == null) null else v.asInstanceOf[Long].toByte)
+      case DataTypes.ShortType =>
+        generateColumn(r, DataTypes.LongType, numRows)
+          .map(v => if (v == null) null else v.asInstanceOf[Long].toShort)
+      case DataTypes.IntegerType =>
+        generateColumn(r, DataTypes.LongType, numRows)
+          .map(v => if (v == null) null else v.asInstanceOf[Long].toInt)
+      case DataTypes.LongType =>
+        Range(0, numRows).map(_ => {
+          r.nextInt(50) match {
+            case 0 => null
+            case 1 => 0L
+            case 2 => Byte.MinValue.toLong
+            case 3 => Byte.MaxValue.toLong
+            case 4 => Short.MinValue.toLong
+            case 5 => Short.MaxValue.toLong
+            case 6 => Int.MinValue.toLong
+            case 7 => Int.MaxValue.toLong
+            case 8 => Long.MinValue
+            case 9 => Long.MaxValue
+            case _ => r.nextLong()
+          }
+        })
+      case DataTypes.FloatType =>
+        Range(0, numRows).map(_ => {
+          r.nextInt(20) match {
+            case 0 => null
+            case 1 => Float.NegativeInfinity
+            case 2 => Float.PositiveInfinity
+            case 3 => Float.MinValue
+            case 4 => Float.MaxValue
+            case 5 => 0.0f
+            case 6 => -0.0f
+            case _ => r.nextFloat()
+          }
+        })
+      case DataTypes.DoubleType =>
+        Range(0, numRows).map(_ => {
+          r.nextInt(20) match {
+            case 0 => null
+            case 1 => Double.NegativeInfinity
+            case 2 => Double.PositiveInfinity
+            case 3 => Double.MinValue
+            case 4 => Double.MaxValue
+            case 5 => 0.0
+            case 6 => -0.0
+            case _ => r.nextDouble()
+          }
+        })
+      case DataTypes.StringType =>
+        Range(0, numRows).map(_ => {
+          r.nextInt(10) match {
+            case 0 => null
+            case 1 => r.nextInt().toByte.toString
+            case 2 => r.nextLong().toString
+            case 3 => r.nextDouble().toString
+            case _ => r.nextString(8)
+          }
+        })
+      case DataTypes.DateType =>
+        Range(0, numRows).map(_ => new java.sql.Date(1716645600011L + r.nextInt()))
+      case DataTypes.TimestampType =>
+        Range(0, numRows).map(_ => new Timestamp(1716645600011L + r.nextInt()))
+      case _ => throw new IllegalStateException(s"Cannot generate data for $dataType yet")
+    }
+  }
+
+}
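The commented-out decimal entries above hint at the next step. One hedged way to implement them, should they be enabled, is to draw a random unscaled value within the type's precision and attach the scale (a sketch; `generateDecimalColumn` is a hypothetical helper, not part of `DataGen`):

```scala
import java.math.{BigDecimal => JBigDecimal}

import scala.util.Random

import org.apache.spark.sql.types.DecimalType

def generateDecimalColumn(r: Random, dt: DecimalType, numRows: Int): Seq[Any] = {
  // largest unscaled value that fits in the declared precision
  val maxUnscaled = math.pow(10, dt.precision).toLong - 1
  Range(0, numRows).map { _ =>
    r.nextInt(20) match {
      case 0 => null
      case _ => JBigDecimal.valueOf(r.nextLong() % maxUnscaled, dt.scale)
    }
  }
}
```

Spark accepts `java.math.BigDecimal` values in `Row`s for `DecimalType` columns, so the result can be dropped straight into the existing row-building code.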
diff --git a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Main.scala b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Main.scala
new file mode 100644
index 000000000..4cc23d5c9
--- /dev/null
+++ b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Main.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.fuzz
+
+import org.apache.spark.sql.SparkSession
+import org.rogach.scallop.{ScallopConf, Subcommand}
+
+import scala.util.Random
+
+class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
+  val generateData = new Subcommand("data") {
+    val numFiles = opt[Int](required = true)
+    val numRows = opt[Int](required = true)
+    val numColumns = opt[Int](required = true)
+  }
+  val generateQueries = new Subcommand("queries") {
+    val numFiles = opt[Int](required = false)
+    val numQueries = opt[Int](required = true)
+  }
+  val runQueries = new Subcommand("run") {
+    val filename = opt[String](required = true)
+    val numFiles = opt[Int](required = false)
+    val showMatchingResults = opt[Boolean](required = false)
+  }
+  addSubcommand(generateData)
+  addSubcommand(generateQueries)
+  addSubcommand(runQueries)
+  verify()
+}
+
+object Main {
+
+  lazy val spark = SparkSession.builder()
+    .master("local[*]")
+    .getOrCreate()
+
+  def main(args: Array[String]): Unit = {
+    val r = new Random(42)
+
+    val conf = new Conf(args)
+    conf.subcommand match {
+      case Some(opt @ conf.generateData) =>
+        DataGen.generateRandomFiles(r, spark, numFiles = opt.numFiles(), numRows = opt.numRows(),
+          numColumns = opt.numColumns())
+      case Some(opt @ conf.generateQueries) =>
+        QueryGen.generateRandomQueries(r, spark, numFiles = opt.numFiles(),
+          numQueries = opt.numQueries())
+      case Some(opt @ conf.runQueries) =>
+        QueryRunner.runQueries(spark, opt.numFiles(), opt.filename(), opt.showMatchingResults())
+      case _ =>
+        println("Invalid subcommand")
+        sys.exit(-1)
+    }
+  }
+}
\ No newline at end of file
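scallop derives the hyphenated flags shown in the README (`--num-files`, `--num-rows`, and so on) from the camelCase `opt` names above. A quick parser sanity check, shown only for illustration and not shipped with the tool:

```scala
// parse a "data" command line and confirm the subcommand and option values
val conf = new Conf(Seq("data", "--num-files", "2", "--num-rows", "200", "--num-columns", "100"))
assert(conf.subcommand.contains(conf.generateData))
assert(conf.generateData.numFiles() == 2)
assert(conf.generateData.numRows() == 200)
```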
diff --git a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryGen.scala b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryGen.scala
new file mode 100644
index 000000000..5a3eeda7a
--- /dev/null
+++ b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryGen.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.fuzz
+
+import org.apache.spark.sql.SparkSession
+
+import java.io.{BufferedWriter, FileWriter}
+import scala.collection.mutable
+import scala.util.Random
+
+object QueryGen {
+
+  def generateRandomQueries(r: Random, spark: SparkSession, numFiles: Int,
+      numQueries: Int): Unit = {
+    for (i <- 0 until numFiles) {
+      spark.read.parquet(s"test$i.parquet").createTempView(s"test$i")
+    }
+
+    val w = new BufferedWriter(new FileWriter("queries.sql"))
+
+    val uniqueQueries = mutable.HashSet[String]()
+
+    for (_ <- 0 until numQueries) {
+      val sql = r.nextInt(3) match {
+        case 0 => generateJoin(r, spark, numFiles)
+        case 1 => generateAggregate(r, spark, numFiles)
+        case 2 => generateScalar(r, spark, numFiles)
+        // TODO add explicit casts
+        // TODO add unary and binary arithmetic expressions
+        // TODO add IF and CASE WHEN expressions
+      }
+      if (!uniqueQueries.contains(sql)) {
+        uniqueQueries += sql
+        w.write(sql + "\n")
+      }
+    }
+    w.close()
+  }
+
+  val scalarFunc = Seq(
+    // string
+    Function("substring", 3),
+    Function("coalesce", 1),
+    Function("startswith", 2),
+    Function("endswith", 2),
+    Function("contains", 2),
+    Function("ascii", 1),
+    Function("bit_length", 1),
+    Function("octet_length", 1),
+    Function("upper", 1),
+    Function("lower", 1),
+    Function("chr", 1),
+    Function("initcap", 1),
+    Function("trim", 1),
+    Function("ltrim", 1),
+    Function("rtrim", 1),
+    Function("btrim", 1),
+    Function("concat_ws", 2),
+    Function("repeat", 2),
+    Function("length", 1),
+    Function("reverse", 1),
+    Function("instr", 2),
+    Function("replace", 2),
+    Function("translate", 3),
+    // date
+    Function("year", 1),
+    Function("hour", 1),
+    Function("minute", 1),
+    Function("second", 1),
+    // math
+    Function("abs", 1),
+    Function("acos", 1),
+    Function("asin", 1),
+    Function("atan", 1),
+    Function("atan2", 2),
+    Function("cos", 1),
+    Function("exp", 1),
+    Function("ln", 1),
+    Function("log10", 1),
+    Function("log2", 1),
+    Function("pow", 2),
+    Function("round", 1),
+    Function("signum", 1),
+    Function("sin", 1),
+    Function("sqrt", 1),
+    Function("tan", 1),
+    Function("ceil", 1),
+    Function("floor", 1))
+
+  val aggFunc = Seq(
+    Function("min", 1),
+    Function("max", 1),
+    Function("count", 1),
+    Function("avg", 1),
+    Function("sum", 1),
+    Function("first", 1),
+    Function("last", 1),
+    Function("var_pop", 1),
+    Function("var_samp", 1),
+    Function("covar_pop", 2),
+    Function("covar_samp", 2),
+    Function("stddev_pop", 1),
+    Function("stddev_samp", 1),
+    Function("corr", 2))
+
+  private def generateAggregate(r: Random, spark: SparkSession, numFiles: Int): String = {
+    val tableName = s"test${r.nextInt(numFiles)}"
+    val table = spark.table(tableName)
+
+    val func = Utils.randomChoice(aggFunc, r)
+    val args = Range(0, func.num_args)
+      // TODO support using literals as well as columns
+      .map(_ => Utils.randomChoice(table.columns, r))
+    // generate between zero and two grouping columns so that both grouped and
+    // ungrouped aggregates are exercised
+    val groupingCols = Range(0, r.nextInt(3)).map(_ => Utils.randomChoice(table.columns, r))
+    if (groupingCols.isEmpty) {
+      s"SELECT ${func.name}(${args.mkString(", ")}) FROM $tableName"
+    } else {
+      s"SELECT ${groupingCols.mkString(", ")}, ${func.name}(${args.mkString(", ")}) " +
+        s"FROM $tableName GROUP BY ${groupingCols.mkString(", ")}"
+    }
+  }
+
+  private def generateScalar(r: Random, spark: SparkSession, numFiles: Int): String = {
+    val tableName = s"test${r.nextInt(numFiles)}"
+    val table = spark.table(tableName)
+
+    val func = Utils.randomChoice(scalarFunc, r)
+    val args = Range(0, func.num_args)
+      // TODO support using literals as well as columns
+      .map(_ => Utils.randomChoice(table.columns, r))
+
+    s"SELECT ${func.name}(${args.mkString(", ")}) FROM $tableName"
+  }
+
+  private def generateJoin(r: Random, spark: SparkSession, numFiles: Int): String = {
+    val leftTableName = s"test${r.nextInt(numFiles)}"
+    val rightTableName = s"test${r.nextInt(numFiles)}"
+    val leftTable = spark.table(leftTableName)
+    val rightTable = spark.table(rightTableName)
+
+    // TODO support no join keys
+    // TODO support multiple join keys
+    // TODO support join conditions that use expressions
+    val leftCol = Utils.randomChoice(leftTable.columns, r)
+    val rightCol = Utils.randomChoice(rightTable.columns, r)
+
+    val joinTypes = Seq(
+      ("INNER", 0.4),
+      ("LEFT", 0.3),
+      ("RIGHT", 0.3))
+    val joinType = Utils.randomWeightedChoice(joinTypes, r)
+
+    s"SELECT * " +
+      s"FROM $leftTableName " +
+      s"$joinType JOIN $rightTableName " +
+      s"ON $leftTableName.$leftCol = $rightTableName.$rightCol"
+  }
+
+}
+
+case class Function(name: String, num_args: Int)
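As a concrete example of how the "add explicit casts" TODO above might be filled in, the sketch below follows the same shape as `generateScalar` (hypothetical; `generateCast` and its list of target types are not part of the current generator):

```scala
private def generateCast(r: Random, spark: SparkSession, numFiles: Int): String = {
  val tableName = s"test${r.nextInt(numFiles)}"
  val table = spark.table(tableName)

  val col = Utils.randomChoice(table.columns, r)
  // candidate target types; an assumed list chosen for illustration
  val targetTypes = Seq("byte", "short", "int", "long", "float", "double", "string")
  val targetType = Utils.randomChoice(targetTypes, r)

  s"SELECT $col, CAST($col AS $targetType) FROM $tableName"
}
```

Wiring it in would just mean widening the `r.nextInt(3)` draw in `generateRandomQueries` to include a fourth case.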
diff --git a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryRunner.scala b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryRunner.scala
new file mode 100644
index 000000000..a724ce885
--- /dev/null
+++ b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryRunner.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.fuzz
+
+import org.apache.spark.sql.SparkSession
+
+import java.io.{BufferedWriter, FileWriter}
+import scala.io.Source
+
+object QueryRunner {
+
+  def runQueries(
+      spark: SparkSession,
+      numFiles: Int,
+      filename: String,
+      showMatchingResults: Boolean,
+      showFailedSparkQueries: Boolean = false): Unit = {
+
+    val outputFilename = s"results-${System.currentTimeMillis()}.md"
+    println(s"Writing results to $outputFilename")
+
+    val w = new BufferedWriter(new FileWriter(outputFilename))
+
+    // register input files
+    for (i <- 0 until numFiles) {
+      val table = spark.read.parquet(s"test$i.parquet")
+      val tableName = s"test$i"
+      table.createTempView(tableName)
+      w.write(s"Created table $tableName with schema:\n\t" +
+        s"${table.schema.fields.map(f => s"${f.name}: ${f.dataType}").mkString("\n\t")}\n\n")
+    }
+
+    val querySource = Source.fromFile(filename)
+    try {
+      querySource.getLines().foreach(sql => {
+
+        try {
+          // execute with Spark
+          spark.conf.set("spark.comet.enabled", "false")
+          val df = spark.sql(sql)
+          val sparkRows = df.collect()
+
+          // TODO for now we sort the output to make this deterministic, but this means
+          // that we are never testing Comet's sort for correctness
+          val sparkRowsAsStrings = sparkRows.map(_.toString()).sorted
+          val sparkResult = sparkRowsAsStrings.mkString("\n")
+
+          val sparkPlan = df.queryExecution.executedPlan.toString
+
+          w.write(s"## $sql\n\n")
+
+          try {
+            spark.conf.set("spark.comet.enabled", "true")
+            val df = spark.sql(sql)
+            val cometRows = df.collect()
+            // TODO for now we sort the output to make this deterministic, but this means
+            // that we are never testing Comet's sort for correctness
+            val cometRowsAsStrings = cometRows.map(_.toString()).sorted
+            val cometResult = cometRowsAsStrings.mkString("\n")
+            val cometPlan = df.queryExecution.executedPlan.toString
+
+            if (sparkResult == cometResult) {
+              w.write(s"Spark and Comet produce the same results (${cometRows.length} rows).\n")
+              if (showMatchingResults) {
+                w.write("### Spark Plan\n")
+                w.write(s"```\n$sparkPlan\n```\n")
+
+                w.write("### Comet Plan\n")
+                w.write(s"```\n$cometPlan\n```\n")
+
+                w.write("### Query Result\n")
+                w.write("```\n")
+                w.write(s"$cometResult\n")
+                w.write("```\n\n")
+              }
+            } else {
+              w.write("[ERROR] Spark and Comet produced different results.\n")
+
+              w.write("### Spark Plan\n")
+              w.write(s"```\n$sparkPlan\n```\n")
+
+              w.write("### Comet Plan\n")
+              w.write(s"```\n$cometPlan\n```\n")
+
+              w.write("### Results\n")
+
+              w.write(s"Spark produced ${sparkRows.length} rows and " +
+                s"Comet produced ${cometRows.length} rows.\n")
+
+              if (sparkRows.length == cometRows.length) {
+                var i = 0
+                while (i < sparkRows.length) {
+                  if (sparkRowsAsStrings(i) != cometRowsAsStrings(i)) {
+                    w.write(s"First difference at row $i:\n")
+                    w.write("Spark: `" + sparkRowsAsStrings(i) + "`\n")
+                    w.write("Comet: `" + cometRowsAsStrings(i) + "`\n")
+                    // stop after reporting the first mismatch
+                    i = sparkRows.length
+                  }
+                  i += 1
+                }
+              }
+            }
+          } catch {
+            case e: Exception =>
+              // the query worked in Spark but failed in Comet, so this is likely a bug in Comet
+              w.write(s"Query failed in Comet: ${e.getMessage}\n")
+          }
+
+          // flush after every query so that results are saved in the event of the driver crashing
+          w.flush()
+
+        } catch {
+          case e: Exception =>
+            // we expect many generated queries to be invalid
+            if (showFailedSparkQueries) {
+              w.write(s"## $sql\n\n")
+              w.write(s"Query failed in Spark: ${e.getMessage}\n")
+            }
+        }
+      })
+
+    } finally {
+      w.close()
+      querySource.close()
+    }
+  }
+
+}
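The correctness check in `QueryRunner` reduces to sorting the stringified rows from both engines and reporting the first mismatch. Distilled into a standalone helper for clarity (`firstDifference` is a hypothetical name, not part of the runner):

```scala
import org.apache.spark.sql.Row

def firstDifference(
    sparkRows: Array[Row],
    cometRows: Array[Row]): Option[(Int, String, String)] = {
  // sort both sides so the comparison does not depend on output order
  val sparkSorted = sparkRows.map(_.toString()).sorted
  val cometSorted = cometRows.map(_.toString()).sorted
  sparkSorted.zip(cometSorted).zipWithIndex.collectFirst {
    case ((s, c), i) if s != c => (i, s, c)
  }
}
```

Note that exact string equality is strict: rows that differ only in floating-point rendering are flagged, which may be a tolerable difference rather than a Comet bug.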
diff --git a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Utils.scala b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Utils.scala
new file mode 100644
index 000000000..19f9695a9
--- /dev/null
+++ b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Utils.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.fuzz
+
+import scala.util.Random
+
+object Utils {
+
+  def randomChoice[T](list: Seq[T], r: Random): T = {
+    list(r.nextInt(list.length))
+  }
+
+  // use the caller's seeded Random rather than the global one so that runs
+  // remain reproducible from the fixed seed in Main
+  def randomWeightedChoice[T](valuesWithWeights: Seq[(T, Double)], r: Random): T = {
+    val totalWeight = valuesWithWeights.map(_._2).sum
+    val randomValue = r.nextDouble() * totalWeight
+    var cumulativeWeight = 0.0
+
+    for ((value, weight) <- valuesWithWeights) {
+      cumulativeWeight += weight
+      if (cumulativeWeight >= randomValue) {
+        return value
+      }
+    }
+
+    // if for some reason the loop doesn't return, return the last value
+    valuesWithWeights.last._1
+  }
+
+}
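For intuition, `randomWeightedChoice` samples values in proportion to their weights, so the equal 0.2 weights in `DataGen` amount to a uniform draw. A usage sketch against the signature above (the draw count is arbitrary):

```scala
import scala.util.Random

import org.apache.comet.fuzz.Utils

val r = new Random(42)
val joinTypes = Seq(("INNER", 0.4), ("LEFT", 0.3), ("RIGHT", 0.3))
// over many draws the observed frequencies approach the weights:
// roughly 40% INNER, 30% LEFT, and 30% RIGHT
val counts = Seq.fill(10000)(Utils.randomWeightedChoice(joinTypes, r))
  .groupBy(identity)
  .mapValues(_.size)
println(counts)
```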