Skip to content

Commit

Permalink
feat: Add "Comet Fuzz" fuzz-testing utility (#472)
Browse files Browse the repository at this point in the history
* Add Comet Fuzz utility

* update git ignore

* asf headers

* make comet-fuzz a submodule

* spotless

* improve comparison logic, make queries deterministic

* make tests more deterministic

* lint

* fix for scala 2.13

* Move TODO comments to README and address review comments

* refactor and formatting

* more refactoring

* fix decimal

* comment out some types for now

* fix CI failure

* enable boolean and binary types

* improve formatting of SQL in report

* enable decimal types

* trigger tests

* trigger CI

* remove hard-coded master
  • Loading branch information
andygrove authored Jun 3, 2024
1 parent a71f68b commit 8e01149
Show file tree
Hide file tree
Showing 11 changed files with 903 additions and 1 deletion.
2 changes: 1 addition & 1 deletion dev/scalastyle-config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ This file is divided into 3 sections:
<parameter name="groups">java,scala,org,apache,3rdParty,comet</parameter>
<parameter name="group.java">javax?\..*</parameter>
<parameter name="group.scala">scala\..*</parameter>
<parameter name="group.org">org\.(?!apache\.comet).*</parameter>
<parameter name="group.org">org\.(?!apache).*</parameter>
<parameter name="group.apache">org\.apache\.(?!comet).*</parameter>
<parameter name="group.3rdParty">(?!(javax?\.|scala\.|org\.apache\.comet\.)).*</parameter>
<parameter name="group.comet">org\.apache\.comet\..*</parameter>
Expand Down
6 changes: 6 additions & 0 deletions fuzz-testing/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
.idea
target
spark-warehouse
queries.sql
results*.md
test*.parquet
100 changes: 100 additions & 0 deletions fuzz-testing/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Comet Fuzz

Comet Fuzz is a standalone project for generating random data and queries and executing queries against Spark
with Comet disabled and enabled and checking for incompatibilities.

Although it is a simple tool it has already been useful in finding many bugs.

Comet Fuzz is inspired by the [SparkFuzz](https://ir.cwi.nl/pub/30222) paper from Databricks and CWI.

## Roadmap

Planned areas of improvement:

- Support for all data types, expressions, and operators supported by Comet
- Explicit casts
- Unary and binary arithmetic expressions
- IF and CASE WHEN expressions
- Complex (nested) expressions
- Literal scalar values in queries
- Add option to avoid grouping and sorting on floating-point columns
- Improve join query support:
- Support joins without join keys
- Support composite join keys
- Support multiple join keys
- Support join conditions that use expressions

## Usage

Build the jar file first.

```shell
mvn package
```

Set appropriate values for `SPARK_HOME`, `SPARK_MASTER`, and `COMET_JAR` environment variables and then use
`spark-submit` to run CometFuzz against a Spark cluster.

### Generating Data Files

```shell
$SPARK_HOME/bin/spark-submit \
--master $SPARK_MASTER \
--class org.apache.comet.fuzz.Main \
target/comet-fuzz-spark3.4_2.12-0.1.0-SNAPSHOT-jar-with-dependencies.jar \
data --num-files=2 --num-rows=200 --num-columns=100
```

### Generating Queries

Generate random queries that are based on the available test files.

```shell
$SPARK_HOME/bin/spark-submit \
--master $SPARK_MASTER \
--class org.apache.comet.fuzz.Main \
target/comet-fuzz-spark3.4_2.12-0.1.0-SNAPSHOT-jar-with-dependencies.jar \
queries --num-files=2 --num-queries=500
```

Note that the output filename is currently hard-coded as `queries.sql`

### Execute Queries

```shell
$SPARK_HOME/bin/spark-submit \
--master $SPARK_MASTER \
--conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
--conf spark.comet.enabled=true \
--conf spark.comet.exec.enabled=true \
--conf spark.comet.exec.all.enabled=true \
--conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
--conf spark.comet.exec.shuffle.enabled=true \
--conf spark.comet.exec.shuffle.mode=auto \
--jars $COMET_JAR \
--driver-class-path $COMET_JAR \
--class org.apache.comet.fuzz.Main \
target/comet-fuzz-spark3.4_2.12-0.1.0-SNAPSHOT-jar-with-dependencies.jar \
run --num-files=2 --filename=queries.sql
```

Note that the output filename is currently hard-coded as `results-${System.currentTimeMillis()}.md`
105 changes: 105 additions & 0 deletions fuzz-testing/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.apache.comet</groupId>
<artifactId>comet-parent-spark${spark.version.short}_${scala.binary.version}</artifactId>
<version>0.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>comet-fuzz-spark${spark.version.short}_${scala.binary.version}</artifactId>
<name>comet-fuzz</name>
<url>http://maven.apache.org</url>
<packaging>jar</packaging>

<properties>
<!-- Reverse default (skip installation), and then enable only for child modules -->
<maven.deploy.skip>false</maven.deploy.skip>
</properties>

<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.rogach</groupId>
<artifactId>scallop_${scala.binary.version}</artifactId>
</dependency>
</dependencies>

<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>4.7.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
151 changes: 151 additions & 0 deletions fuzz-testing/src/main/scala/org/apache/comet/fuzz/DataGen.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.fuzz

import java.math.{BigDecimal, RoundingMode}
import java.nio.charset.Charset
import java.sql.Timestamp

import scala.util.Random

import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{DataType, DataTypes, DecimalType, StructField, StructType}

object DataGen {

def generateRandomFiles(
r: Random,
spark: SparkSession,
numFiles: Int,
numRows: Int,
numColumns: Int): Unit = {
for (i <- 0 until numFiles) {
generateRandomParquetFile(r, spark, s"test$i.parquet", numRows, numColumns)
}
}

def generateRandomParquetFile(
r: Random,
spark: SparkSession,
filename: String,
numRows: Int,
numColumns: Int): Unit = {

// generate schema using random data types
val fields = Range(0, numColumns)
.map(i => StructField(s"c$i", Utils.randomWeightedChoice(Meta.dataTypes), nullable = true))
val schema = StructType(fields)

// generate columnar data
val cols: Seq[Seq[Any]] = fields.map(f => generateColumn(r, f.dataType, numRows))

// convert to rows
val rows = Range(0, numRows).map(rowIndex => {
Row.fromSeq(cols.map(_(rowIndex)))
})

val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
df.write.mode(SaveMode.Overwrite).parquet(filename)
}

def generateColumn(r: Random, dataType: DataType, numRows: Int): Seq[Any] = {
dataType match {
case DataTypes.BooleanType =>
generateColumn(r, DataTypes.LongType, numRows)
.map(_.asInstanceOf[Long].toShort)
.map(s => s % 2 == 0)
case DataTypes.ByteType =>
generateColumn(r, DataTypes.LongType, numRows).map(_.asInstanceOf[Long].toByte)
case DataTypes.ShortType =>
generateColumn(r, DataTypes.LongType, numRows).map(_.asInstanceOf[Long].toShort)
case DataTypes.IntegerType =>
generateColumn(r, DataTypes.LongType, numRows).map(_.asInstanceOf[Long].toInt)
case DataTypes.LongType =>
Range(0, numRows).map(_ => {
r.nextInt(50) match {
case 0 => null
case 1 => 0L
case 2 => Byte.MinValue.toLong
case 3 => Byte.MaxValue.toLong
case 4 => Short.MinValue.toLong
case 5 => Short.MaxValue.toLong
case 6 => Int.MinValue.toLong
case 7 => Int.MaxValue.toLong
case 8 => Long.MinValue
case 9 => Long.MaxValue
case _ => r.nextLong()
}
})
case DataTypes.FloatType =>
Range(0, numRows).map(_ => {
r.nextInt(20) match {
case 0 => null
case 1 => Float.NegativeInfinity
case 2 => Float.PositiveInfinity
case 3 => Float.MinValue
case 4 => Float.MaxValue
case 5 => 0.0f
case 6 => -0.0f
case _ => r.nextFloat()
}
})
case DataTypes.DoubleType =>
Range(0, numRows).map(_ => {
r.nextInt(20) match {
case 0 => null
case 1 => Double.NegativeInfinity
case 2 => Double.PositiveInfinity
case 3 => Double.MinValue
case 4 => Double.MaxValue
case 5 => 0.0
case 6 => -0.0
case _ => r.nextDouble()
}
})
case dt: DecimalType =>
Range(0, numRows).map(_ =>
new BigDecimal(r.nextDouble()).setScale(dt.scale, RoundingMode.HALF_UP))
case DataTypes.StringType =>
Range(0, numRows).map(_ => {
r.nextInt(10) match {
case 0 => null
case 1 => r.nextInt().toByte.toString
case 2 => r.nextLong().toString
case 3 => r.nextDouble().toString
case _ => r.nextString(8)
}
})
case DataTypes.BinaryType =>
generateColumn(r, DataTypes.StringType, numRows)
.map {
case x: String =>
x.getBytes(Charset.defaultCharset())
case _ =>
null
}
case DataTypes.DateType =>
Range(0, numRows).map(_ => new java.sql.Date(1716645600011L + r.nextInt()))
case DataTypes.TimestampType =>
Range(0, numRows).map(_ => new Timestamp(1716645600011L + r.nextInt()))
case _ => throw new IllegalStateException(s"Cannot generate data for $dataType yet")
}
}

}
Loading

0 comments on commit 8e01149

Please sign in to comment.