Add Comet Fuzz utility
andygrove committed May 25, 2024
1 parent 93af704 commit fbf074d
Showing 8 changed files with 741 additions and 0 deletions.
6 changes: 6 additions & 0 deletions fuzz-testing/.gitignore
@@ -0,0 +1,6 @@
.idea
target
spark-warehouse
queries.sql
results.md
test*.parquet
64 changes: 64 additions & 0 deletions fuzz-testing/README.md
@@ -0,0 +1,64 @@
# Comet Fuzz

Comet Fuzz is a standalone project that generates random data and queries, executes the queries against Spark with
Comet disabled and then enabled, and checks the results for incompatibilities.

Although it is a simple tool, it has already proven useful in finding many bugs.

Comet Fuzz is inspired by the [SparkFuzz](https://ir.cwi.nl/pub/30222) paper from Databricks and CWI.

## Usage

Build the jar file first.

```shell
mvn package
```

Set appropriate values for the `SPARK_HOME`, `SPARK_MASTER`, and `COMET_JAR` environment variables, then use
`spark-submit` to run Comet Fuzz against a Spark cluster.
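
For example (the values below are placeholders; adjust them for your environment):

```shell
export SPARK_HOME=/opt/spark
export SPARK_MASTER=local[*]
export COMET_JAR=/path/to/comet.jar
```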

### Generating Data Files

```shell
$SPARK_HOME/bin/spark-submit \
--master $SPARK_MASTER \
--class org.apache.comet.fuzz.Main \
target/cometfuzz-0.1.0-SNAPSHOT-jar-with-dependencies.jar \
data --num-files=2 --num-rows=200 --num-columns=100
```

### Generating Queries

Generate random queries that are based on the available test files.

```shell
$SPARK_HOME/bin/spark-submit \
--master $SPARK_MASTER \
--class org.apache.comet.fuzz.Main \
target/cometfuzz-0.1.0-SNAPSHOT-jar-with-dependencies.jar \
queries --num-files=2 --num-queries=500
```

Note that the output filename is currently hard-coded as `queries.sql`.

### Executing Queries

```shell
$SPARK_HOME/bin/spark-submit \
--master $SPARK_MASTER \
--conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
--conf spark.comet.enabled=true \
--conf spark.comet.exec.enabled=true \
--conf spark.comet.exec.all.enabled=true \
--conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
--conf spark.comet.exec.shuffle.enabled=true \
--conf spark.comet.columnar.shuffle.enabled=true \
--jars $COMET_JAR \
--driver-class-path $COMET_JAR \
--class org.apache.comet.fuzz.Main \
target/cometfuzz-0.1.0-SNAPSHOT-jar-with-dependencies.jar \
run --num-files=2 --filename=queries.sql
```

Note that the output filename is currently hard-coded as `results.md`.
87 changes: 87 additions & 0 deletions fuzz-testing/pom.xml
@@ -0,0 +1,87 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.apache.comet.fuzz</groupId>
  <artifactId>cometfuzz</artifactId>
  <version>0.1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>comet-fuzz</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>11</java.version>
    <scala.version>2.12.7</scala.version>
    <scala.binary.version>2.12</scala.binary.version>
    <spark.version>3.4.2</spark.version>
  </properties>

  <dependencies>
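    <!-- scala-library and spark-sql use provided scope because they are supplied by the
         Spark runtime when the fuzzer is launched via spark-submit -->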
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.rogach</groupId>
      <artifactId>scallop_${scala.binary.version}</artifactId>
      <version>3.5.1</version>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>${java.version}</source>
          <target>${java.version}</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>4.7.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.3.0</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
143 changes: 143 additions & 0 deletions fuzz-testing/src/main/scala/org/apache/comet/fuzz/DataGen.scala
@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.fuzz

import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{DataType, DataTypes, StructField, StructType}

import java.sql.Timestamp
import scala.util.Random

object DataGen {

  def generateRandomFiles(r: Random, spark: SparkSession, numFiles: Int, numRows: Int, numColumns: Int): Unit = {
    for (i <- 0 until numFiles) {
      generateRandomParquetFile(r, spark, s"test$i.parquet", numRows, numColumns)
    }
  }

  def generateRandomParquetFile(r: Random, spark: SparkSession, filename: String, numRows: Int, numColumns: Int): Unit = {

    // TODO add examples of all supported types, including complex types
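    // candidate data types and their relative weights, passed to Utils.randomWeightedChoice
    // (defined in Utils.scala, which is not shown in this commit view)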
    val dataTypes = Seq(
      (DataTypes.ByteType, 0.2),
      (DataTypes.ShortType, 0.2),
      (DataTypes.IntegerType, 0.2),
      (DataTypes.LongType, 0.2),
      (DataTypes.FloatType, 0.2),
      (DataTypes.DoubleType, 0.2),
      // TODO add support for all Comet supported types
      // (DataTypes.createDecimalType(10,2), 0.2),
      // (DataTypes.createDecimalType(10,0), 0.2),
      // (DataTypes.createDecimalType(4,0), 0.2),
      (DataTypes.DateType, 0.2),
      (DataTypes.TimestampType, 0.2),
      // (DataTypes.TimestampNTZType, 0.2),
      (DataTypes.StringType, 0.2))

    // generate schema using random data types
    val fields = Range(0, numColumns)
      .map(i => StructField(s"c$i", Utils.randomWeightedChoice(dataTypes), nullable = true))
    val schema = StructType(fields)

    // generate columnar data
    val cols: Seq[Seq[Any]] = fields.map(f => generateColumn(r, f.dataType, numRows))

    // convert to rows
    val rows = Range(0, numRows).map(rowIndex => {
      Row.fromSeq(cols.map(_(rowIndex)))
    })

    // TODO random partitioning and bucketing
    // TODO random parquet write options
    val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
    df.write.mode(SaveMode.Overwrite).parquet(filename)
  }

  def generateColumn(r: Random, dataType: DataType, numRows: Int): Seq[Any] = {
    dataType match {
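      // ByteType, ShortType, and IntegerType reuse the LongType generator and narrow the result;
      // note that a generated null unboxes to 0L under asInstanceOf[Long], so these narrower
      // columns contain 0 where the Long generator produced null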
      case DataTypes.ByteType =>
        generateColumn(r, DataTypes.LongType, numRows).map(_.asInstanceOf[Long].toByte)
      case DataTypes.ShortType =>
        generateColumn(r, DataTypes.LongType, numRows).map(_.asInstanceOf[Long].toShort)
      case DataTypes.IntegerType =>
        generateColumn(r, DataTypes.LongType, numRows).map(_.asInstanceOf[Long].toInt)
      case DataTypes.LongType =>
        Range(0, numRows).map(_ => {
          r.nextInt(50) match {
            case 0 => null
            case 1 => 0L
            case 2 => Byte.MinValue.toLong
            case 3 => Byte.MaxValue.toLong
            case 4 => Short.MinValue.toLong
            case 5 => Short.MaxValue.toLong
            case 6 => Int.MinValue.toLong
            case 7 => Int.MaxValue.toLong
            case 8 => Long.MinValue
            case 9 => Long.MaxValue
            case _ => r.nextLong()
          }
        })
      case DataTypes.FloatType =>
        Range(0, numRows).map(_ => {
          r.nextInt(20) match {
            case 0 => null
            case 1 => Float.NegativeInfinity
            case 2 => Float.PositiveInfinity
            case 3 => Float.MinValue
            case 4 => Float.MaxValue
            case 5 => 0.0f
            case 6 => -0.0f
            case _ => r.nextFloat()
          }
        })
      case DataTypes.DoubleType =>
        Range(0, numRows).map(_ => {
          r.nextInt(20) match {
            case 0 => null
            case 1 => Double.NegativeInfinity
            case 2 => Double.PositiveInfinity
            case 3 => Double.MinValue
            case 4 => Double.MaxValue
            case 5 => 0.0
            case 6 => -0.0
            case _ => r.nextDouble()
          }
        })
      case DataTypes.StringType =>
        Range(0, numRows).map(_ => {
          r.nextInt(10) match {
            case 0 => null
            case 1 => r.nextInt().toByte.toString
            case 2 => r.nextLong().toString
            case 3 => r.nextDouble().toString
            case _ => r.nextString(8)
          }
        })
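      // 1716645600011L is a fixed base timestamp in epoch milliseconds (late May 2024);
      // r.nextInt() adds a random offset of roughly +/- 24 days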
      case DataTypes.DateType =>
        Range(0, numRows).map(_ => new java.sql.Date(1716645600011L + r.nextInt()))
      case DataTypes.TimestampType =>
        Range(0, numRows).map(_ => new Timestamp(1716645600011L + r.nextInt()))
      case _ => throw new IllegalStateException(s"Cannot generate data for $dataType yet")
    }
  }

}
71 changes: 71 additions & 0 deletions fuzz-testing/src/main/scala/org/apache/comet/fuzz/Main.scala
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.fuzz

import org.apache.spark.sql.SparkSession
import org.rogach.scallop.{ScallopConf, Subcommand}

import scala.util.Random

class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
  val generateData = new Subcommand("data") {
    val numFiles = opt[Int](required = true)
    val numRows = opt[Int](required = true)
    val numColumns = opt[Int](required = true)
  }
  val generateQueries = new Subcommand("queries") {
    val numFiles = opt[Int](required = false)
    val numQueries = opt[Int](required = true)
  }
  val runQueries = new Subcommand("run") {
    val filename = opt[String](required = true)
    val numFiles = opt[Int](required = false)
    val showMatchingResults = opt[Boolean](required = false)
  }
  addSubcommand(generateData)
  addSubcommand(generateQueries)
  addSubcommand(runQueries)
  verify()
}

object Main {

  lazy val spark = SparkSession.builder()
    .master("local[*]")
    .getOrCreate()

  def main(args: Array[String]): Unit = {
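    // fixed seed so that the generated data and queries are reproducible from run to run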
    val r = new Random(42)

    val conf = new Conf(args)
    conf.subcommand match {
      case Some(opt @ conf.generateData) =>
        DataGen.generateRandomFiles(r, spark, numFiles = opt.numFiles(), numRows = opt.numRows(),
          numColumns = opt.numColumns())
      case Some(opt @ conf.generateQueries) =>
        QueryGen.generateRandomQueries(r, spark, numFiles = opt.numFiles(), opt.numQueries())
      case Some(opt @ conf.runQueries) =>
        QueryRunner.runQueries(spark, opt.numFiles(), opt.filename(), opt.showMatchingResults())
      case _ =>
        println("Invalid subcommand")
        sys.exit(-1)
    }
  }
}