chore: Add ignored tests for reading complex types from Parquet (apache#1167)

* Add ignored tests for reading structs from Parquet

* add basic map test

* add tests for Map and Array
andygrove authored Dec 12, 2024
1 parent 7db9aa6 commit f1d0879
Showing 1 changed file with 127 additions and 0 deletions.
spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala (127 additions, 0 deletions)
@@ -2195,6 +2195,133 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
}

ignore("get_struct_field - select primitive fields") {
withTempPath { dir =>
// create input file with Comet disabled
withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
val df = spark
.range(5)
// Add both a null struct and null inner value
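          // Resulting rows: id <= 1 -> nested1 = null; id = 2 -> {id: null};
          // id = 3 -> {id: 3}; id = 4 -> {id: 4}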
.select(when(col("id") > 1, struct(when(col("id") > 2, col("id")).alias("id")))
.alias("nested1"))

df.write.parquet(dir.toString())
}

Seq("", "parquet").foreach { v1List =>
withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> v1List) {
val df = spark.read.parquet(dir.toString())
checkSparkAnswerAndOperator(df.select("nested1.id"))
}
}
}
}

ignore("get_struct_field - select subset of struct") {
withTempPath { dir =>
// create input file with Comet disabled
withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
val df = spark
.range(5)
// Add both a null struct and null inner value
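          // Resulting rows: id <= 1 -> nested1 = null;
          // id = 2 -> {id: null, nested2: null};
          // id = 3 -> {id: 3, nested2: {id: null}};
          // id = 4 -> {id: 4, nested2: {id: 4}}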
.select(
when(
col("id") > 1,
struct(
when(col("id") > 2, col("id")).alias("id"),
when(col("id") > 2, struct(when(col("id") > 3, col("id")).alias("id")))
.as("nested2")))
.alias("nested1"))

df.write.parquet(dir.toString())
}

Seq("", "parquet").foreach { v1List =>
withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> v1List) {
val df = spark.read.parquet(dir.toString())
checkSparkAnswerAndOperator(df.select("nested1.id"))
checkSparkAnswerAndOperator(df.select("nested1.nested2"))
checkSparkAnswerAndOperator(df.select("nested1.nested2.id"))
checkSparkAnswerAndOperator(df.select("nested1.id", "nested1.nested2.id"))
}
}
}
}

ignore("get_struct_field - read entire struct") {
withTempPath { dir =>
// create input file with Comet disabled
withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
val df = spark
.range(5)
// Add both a null struct and null inner value
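          // Same nested test data as the previous test; here the entire
          // struct column is read back rather than individual fields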
.select(
when(
col("id") > 1,
struct(
when(col("id") > 2, col("id")).alias("id"),
when(col("id") > 2, struct(when(col("id") > 3, col("id")).alias("id")))
.as("nested2")))
.alias("nested1"))

df.write.parquet(dir.toString())
}

Seq("", "parquet").foreach { v1List =>
withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> v1List) {
val df = spark.read.parquet(dir.toString())
checkSparkAnswerAndOperator(df.select("nested1"))
}
}
}
}

ignore("read map[int, int] from parquet") {
withTempPath { dir =>
// create input file with Comet disabled
withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
val df = spark
.range(5)
          // Spark does not allow null as a key but does allow null as a
          // value, and the entire map can be null
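          // Resulting rows: id <= 1 -> map1 = null; id = 2 -> Map(2 -> null);
          // id = 3 -> Map(3 -> 3); id = 4 -> Map(4 -> 4)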
.select(
when(col("id") > 1, map(col("id"), when(col("id") > 2, col("id")))).alias("map1"))
df.write.parquet(dir.toString())
}

Seq("", "parquet").foreach { v1List =>
withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> v1List) {
val df = spark.read.parquet(dir.toString())
checkSparkAnswerAndOperator(df.select("map1"))
checkSparkAnswerAndOperator(df.select(map_keys(col("map1"))))
checkSparkAnswerAndOperator(df.select(map_values(col("map1"))))
}
}
}
}

ignore("read array[int] from parquet") {
withTempPath { dir =>
// create input file with Comet disabled
withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
val df = spark
.range(5)
          // The entire array can be null (it is null for id <= 1)
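          // Resulting rows: id <= 1 -> array1 = null; id = 2 -> [0, 1, 2, 3, 4];
          // id = 3 -> [0, 1, ..., 6]; id = 4 -> [0, 1, ..., 8]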
.select(when(col("id") > 1, sequence(lit(0), col("id") * 2)).alias("array1"))
df.write.parquet(dir.toString())
}

Seq("", "parquet").foreach { v1List =>
withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> v1List) {
val df = spark.read.parquet(dir.toString())
checkSparkAnswerAndOperator(df.select("array1"))
checkSparkAnswerAndOperator(df.select(element_at(col("array1"), lit(1))))
}
}
}
}

test("CreateArray") {
Seq(true, false).foreach { dictionaryEnabled =>
withTempDir { dir =>