diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 49920ab31..645f5f848 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -50,818 +50,815 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } - test("decimals divide by zero") { - // TODO: enable Spark 3.2 & 3.3 tests after supporting decimal divide operation - assume(isSpark34Plus) - - Seq(true, false).foreach { dictionary => - withSQLConf( - SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "false", - "parquet.enable.dictionary" -> dictionary.toString) { - withTempPath { dir => - val data = makeDecimalRDD(10, DecimalType(18, 10), dictionary) - data.write.parquet(dir.getCanonicalPath) - readParquetFile(dir.getCanonicalPath) { df => - { - val decimalLiteral = Decimal(0.00) - val cometDf = df.select($"dec" / decimalLiteral, $"dec" % decimalLiteral) - checkSparkAnswerAndOperator(cometDf) - } - } - } - } - } - } - - test("bitwise shift with different left/right types") { - Seq(false, true).foreach { dictionary => - withSQLConf("parquet.enable.dictionary" -> dictionary.toString) { - val table = "test" - withTable(table) { - sql(s"create table $table(col1 long, col2 int) using parquet") - sql(s"insert into $table values(1111, 2)") - sql(s"insert into $table values(1111, 2)") - sql(s"insert into $table values(3333, 4)") - sql(s"insert into $table values(5555, 6)") - - checkSparkAnswerAndOperator( - s"SELECT shiftright(col1, 2), shiftright(col1, col2) FROM $table") - checkSparkAnswerAndOperator( - s"SELECT shiftleft(col1, 2), shiftleft(col1, col2) FROM $table") - } - } - } - } - - test("basic data type support") { - Seq(true, false).foreach { dictionaryEnabled => - withTempDir { dir => - val path = new Path(dir.toURI.toString, "test.parquet") - makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000) - withParquetTable(path.toString, "tbl") { - // TODO: enable test for unsigned ints - checkSparkAnswerAndOperator( - "select _1, _2, _3, _4, _5, _6, _7, _8, _13, _14, _15, _16, _17, " + - "_18, _19, _20 FROM tbl WHERE _2 > 100") - } - } - } - } - - test("null literals") { - val batchSize = 1000 - Seq(true, false).foreach { dictionaryEnabled => - withTempDir { dir => - val path = new Path(dir.toURI.toString, "test.parquet") - makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, batchSize) - withParquetTable(path.toString, "tbl") { - val sqlString = "SELECT _4 + null, _15 - null, _16 * null FROM tbl" - val df2 = sql(sqlString) - val rows = df2.collect() - assert(rows.length == batchSize) - assert(rows.forall(_ == Row(null, null, null))) - - checkSparkAnswerAndOperator(sqlString) - } - } - } - } - - test("date and timestamp type literals") { - Seq(true, false).foreach { dictionaryEnabled => - withTempDir { dir => - val path = new Path(dir.toURI.toString, "test.parquet") - makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000) - withParquetTable(path.toString, "tbl") { - checkSparkAnswerAndOperator( - "SELECT _4 FROM tbl WHERE " + - "_20 > CAST('2020-01-01' AS DATE) AND _18 < CAST('2020-01-01' AS TIMESTAMP)") - } - } - } - } - - test("dictionary arithmetic") { - // TODO: test ANSI mode - withSQLConf(SQLConf.ANSI_ENABLED.key -> "false", "parquet.enable.dictionary" -> "true") { - withParquetTable((0 until 10).map(i => (i % 5, i % 3)), "tbl") { - 
checkSparkAnswerAndOperator("SELECT _1 + _2, _1 - _2, _1 * _2, _1 / _2, _1 % _2 FROM tbl") - } - } - } - - test("dictionary arithmetic with scalar") { - withSQLConf("parquet.enable.dictionary" -> "true") { - withParquetTable((0 until 10).map(i => (i % 5, i % 3)), "tbl") { - checkSparkAnswerAndOperator("SELECT _1 + 1, _1 - 1, _1 * 2, _1 / 2, _1 % 2 FROM tbl") - } - } - } - - test("string type and substring") { - withParquetTable((0 until 5).map(i => (i.toString, (i + 100).toString)), "tbl") { - checkSparkAnswerAndOperator("SELECT _1, substring(_2, 2, 2) FROM tbl") - checkSparkAnswerAndOperator("SELECT _1, substring(_2, 2, -2) FROM tbl") - checkSparkAnswerAndOperator("SELECT _1, substring(_2, -2, 2) FROM tbl") - checkSparkAnswerAndOperator("SELECT _1, substring(_2, -2, -2) FROM tbl") - checkSparkAnswerAndOperator("SELECT _1, substring(_2, -2, 10) FROM tbl") - checkSparkAnswerAndOperator("SELECT _1, substring(_2, 0, 0) FROM tbl") - checkSparkAnswerAndOperator("SELECT _1, substring(_2, 1, 0) FROM tbl") - } - } - - test("substring with start < 1") { - withTempPath { _ => - withTable("t") { - sql("create table t (col string) using parquet") - sql("insert into t values('123456')") - checkSparkAnswerAndOperator(sql("select substring(col, 0) from t")) - checkSparkAnswerAndOperator(sql("select substring(col, -1) from t")) - } - } - } - - test("string with coalesce") { - withParquetTable( - (0 until 10).map(i => (i.toString, if (i > 5) None else Some((i + 100).toString))), - "tbl") { - checkSparkAnswerAndOperator( - "SELECT coalesce(_1), coalesce(_1, 1), coalesce(null, _1), coalesce(null, 1), coalesce(_2, _1), coalesce(null) FROM tbl") - } - } - - test("substring with dictionary") { - val data = (0 until 1000) - .map(_ % 5) // reduce value space to trigger dictionary encoding - .map(i => (i.toString, (i + 100).toString)) - withParquetTable(data, "tbl") { - checkSparkAnswerAndOperator("SELECT _1, substring(_2, 2, 2) FROM tbl") - } - } - - test("string_space") { - withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl") { - checkSparkAnswerAndOperator("SELECT space(_1), space(_2) FROM tbl") - } - } - - test("string_space with dictionary") { - val data = (0 until 1000).map(i => Tuple1(i % 5)) - - withSQLConf("parquet.enable.dictionary" -> "true") { - withParquetTable(data, "tbl") { - checkSparkAnswerAndOperator("SELECT space(_1) FROM tbl") - } - } - } - - test("hour, minute, second") { - Seq(true, false).foreach { dictionaryEnabled => - withTempDir { dir => - val path = new Path(dir.toURI.toString, "part-r-0.parquet") - val expected = makeRawTimeParquetFile(path, dictionaryEnabled = dictionaryEnabled, 10000) - readParquetFile(path.toString) { df => - val query = df.select(expr("hour(_1)"), expr("minute(_1)"), expr("second(_1)")) - - checkAnswer( - query, - expected.map { - case None => - Row(null, null, null) - case Some(i) => - val timestamp = new java.sql.Timestamp(i).toLocalDateTime - val hour = timestamp.getHour - val minute = timestamp.getMinute - val second = timestamp.getSecond - - Row(hour, minute, second) - }) - } - } - } - } - - test("hour on int96 timestamp column") { - import testImplicits._ - - val N = 100 - val ts = "2020-01-01 01:02:03.123456" - Seq(true, false).foreach { dictionaryEnabled => - Seq(false, true).foreach { conversionEnabled => - withSQLConf( - SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "INT96", - SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key -> conversionEnabled.toString) { - withTempPath { path => - Seq - .tabulate(N)(_ => ts) - .toDF("ts1") - 
.select($"ts1".cast("timestamp").as("ts")) - .repartition(1) - .write - .option("parquet.enable.dictionary", dictionaryEnabled) - .parquet(path.getCanonicalPath) - - checkAnswer( - spark.read.parquet(path.getCanonicalPath).select(expr("hour(ts)")), - Seq.tabulate(N)(_ => Row(1))) - } - } - } - } - } - - test("cast timestamp and timestamp_ntz") { - withSQLConf( - SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu", - CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { - Seq(true, false).foreach { dictionaryEnabled => - withTempDir { dir => - val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet") - makeRawTimeParquetFile(path, dictionaryEnabled = dictionaryEnabled, 10000) - withParquetTable(path.toString, "timetbl") { - checkSparkAnswerAndOperator( - "SELECT " + - "cast(_2 as timestamp) tz_millis, " + - "cast(_3 as timestamp) ntz_millis, " + - "cast(_4 as timestamp) tz_micros, " + - "cast(_5 as timestamp) ntz_micros " + - " from timetbl") - } - } - } - } - } - - test("cast timestamp and timestamp_ntz to string") { - // TODO: make the test pass for Spark 3.2 & 3.3 - assume(isSpark34Plus) - - withSQLConf( - SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu", - CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { - Seq(true, false).foreach { dictionaryEnabled => - withTempDir { dir => - val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet") - makeRawTimeParquetFile(path, dictionaryEnabled = dictionaryEnabled, 2001) - withParquetTable(path.toString, "timetbl") { - checkSparkAnswerAndOperator( - "SELECT " + - "cast(_2 as string) tz_millis, " + - "cast(_3 as string) ntz_millis, " + - "cast(_4 as string) tz_micros, " + - "cast(_5 as string) ntz_micros " + - " from timetbl") - } - } - } - } - } - - test("cast timestamp and timestamp_ntz to long, date") { - // TODO: make the test pass for Spark 3.2 & 3.3 - assume(isSpark34Plus) - - withSQLConf( - SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu", - CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { - Seq(true, false).foreach { dictionaryEnabled => - withTempDir { dir => - val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet") - makeRawTimeParquetFile(path, dictionaryEnabled = dictionaryEnabled, 10000) - withParquetTable(path.toString, "timetbl") { - checkSparkAnswerAndOperator( - "SELECT " + - "cast(_2 as long) tz_millis, " + - "cast(_4 as long) tz_micros, " + - "cast(_2 as date) tz_millis_to_date, " + - "cast(_3 as date) ntz_millis_to_date, " + - "cast(_4 as date) tz_micros_to_date, " + - "cast(_5 as date) ntz_micros_to_date " + - " from timetbl") - } - } - } - } - } - - test("trunc") { - Seq(true, false).foreach { dictionaryEnabled => - withTempDir { dir => - val path = new Path(dir.toURI.toString, "date_trunc.parquet") - makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000) - withParquetTable(path.toString, "tbl") { - Seq("YEAR", "YYYY", "YY", "QUARTER", "MON", "MONTH", "MM", "WEEK").foreach { format => - checkSparkAnswerAndOperator(s"SELECT trunc(_20, '$format') from tbl") - } - } - } - } - } - - test("trunc with format array") { - val numRows = 1000 - Seq(true, false).foreach { dictionaryEnabled => - withTempDir { dir => - val path = new Path(dir.toURI.toString, "date_trunc_with_format.parquet") - makeDateTimeWithFormatTable(path, dictionaryEnabled = dictionaryEnabled, numRows) - withParquetTable(path.toString, "dateformattbl") { - checkSparkAnswerAndOperator( - "SELECT " + - "dateformat, _7, " + - "trunc(_7, dateformat) " + - " from dateformattbl ") - } - } - } - } - - 
test("date_trunc") { - Seq(true, false).foreach { dictionaryEnabled => - withTempDir { dir => - val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet") - makeRawTimeParquetFile(path, dictionaryEnabled = dictionaryEnabled, 10000) - withParquetTable(path.toString, "timetbl") { - Seq( - "YEAR", - "YYYY", - "YY", - "MON", - "MONTH", - "MM", - "QUARTER", - "WEEK", - "DAY", - "DD", - "HOUR", - "MINUTE", - "SECOND", - "MILLISECOND", - "MICROSECOND").foreach { format => - checkSparkAnswerAndOperator( - "SELECT " + - s"date_trunc('$format', _0), " + - s"date_trunc('$format', _1), " + - s"date_trunc('$format', _2), " + - s"date_trunc('$format', _4) " + - " from timetbl") - } - } - } - } - } - - test("date_trunc with timestamp_ntz") { - assume(!isSpark32, "timestamp functions for timestamp_ntz have incorrect behavior in 3.2") - withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { - Seq(true, false).foreach { dictionaryEnabled => - withTempDir { dir => - val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet") - makeRawTimeParquetFile(path, dictionaryEnabled = dictionaryEnabled, 10000) - withParquetTable(path.toString, "timetbl") { - Seq( - "YEAR", - "YYYY", - "YY", - "MON", - "MONTH", - "MM", - "QUARTER", - "WEEK", - "DAY", - "DD", - "HOUR", - "MINUTE", - "SECOND", - "MILLISECOND", - "MICROSECOND").foreach { format => - checkSparkAnswerAndOperator( - "SELECT " + - s"date_trunc('$format', _3), " + - s"date_trunc('$format', _5) " + - " from timetbl") - } - } - } - } - } - } - - test("date_trunc with format array") { - assume(isSpark33Plus, "TimestampNTZ is supported in Spark 3.3+, See SPARK-36182") - withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { - val numRows = 1000 - Seq(true, false).foreach { dictionaryEnabled => - withTempDir { dir => - val path = new Path(dir.toURI.toString, "timestamp_trunc_with_format.parquet") - makeDateTimeWithFormatTable(path, dictionaryEnabled = dictionaryEnabled, numRows) - withParquetTable(path.toString, "timeformattbl") { - checkSparkAnswerAndOperator( - "SELECT " + - "format, _0, _1, _2, _3, _4, _5, " + - "date_trunc(format, _0), " + - "date_trunc(format, _1), " + - "date_trunc(format, _2), " + - "date_trunc(format, _3), " + - "date_trunc(format, _4), " + - "date_trunc(format, _5) " + - " from timeformattbl ") - } - } - } - } - } - - test("date_trunc on int96 timestamp column") { - import testImplicits._ - - val N = 100 - val ts = "2020-01-01 01:02:03.123456" - Seq(true, false).foreach { dictionaryEnabled => - Seq(false, true).foreach { conversionEnabled => - withSQLConf( - SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "INT96", - SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key -> conversionEnabled.toString) { - withTempPath { path => - Seq - .tabulate(N)(_ => ts) - .toDF("ts1") - .select($"ts1".cast("timestamp").as("ts")) - .repartition(1) - .write - .option("parquet.enable.dictionary", dictionaryEnabled) - .parquet(path.getCanonicalPath) - - withParquetTable(path.toString, "int96timetbl") { - Seq( - "YEAR", - "YYYY", - "YY", - "MON", - "MONTH", - "MM", - "QUARTER", - "WEEK", - "DAY", - "DD", - "HOUR", - "MINUTE", - "SECOND", - "MILLISECOND", - "MICROSECOND").foreach { format => - checkSparkAnswer( - "SELECT " + - s"date_trunc('$format', ts )" + - " from int96timetbl") - } - } - } - } - } - } - } - - test("charvarchar") { - Seq(false, true).foreach { dictionary => - withSQLConf("parquet.enable.dictionary" -> dictionary.toString) { - val table = "char_tbl4" - withTable(table) { - val view = "str_view" - 
withView(view) { - sql(s"""create temporary view $view as select c, v from values - | (null, null), (null, null), - | (null, 'S'), (null, 'S'), - | ('N', 'N '), ('N', 'N '), - | ('Ne', 'Sp'), ('Ne', 'Sp'), - | ('Net ', 'Spa '), ('Net ', 'Spa '), - | ('NetE', 'Spar'), ('NetE', 'Spar'), - | ('NetEa ', 'Spark '), ('NetEa ', 'Spark '), - | ('NetEas ', 'Spark'), ('NetEas ', 'Spark'), - | ('NetEase', 'Spark-'), ('NetEase', 'Spark-') t(c, v);""".stripMargin) - sql( - s"create table $table(c7 char(7), c8 char(8), v varchar(6), s string) using parquet;") - sql(s"insert into $table select c, c, v, c from $view;") - val df = sql(s"""select substring(c7, 2), substring(c8, 2), - | substring(v, 3), substring(s, 2) from $table;""".stripMargin) - - val expected = Row(" ", " ", "", "") :: - Row(null, null, "", null) :: Row(null, null, null, null) :: - Row("e ", "e ", "", "e") :: Row("et ", "et ", "a ", "et ") :: - Row("etE ", "etE ", "ar", "etE") :: - Row("etEa ", "etEa ", "ark ", "etEa ") :: - Row("etEas ", "etEas ", "ark", "etEas ") :: - Row("etEase", "etEase ", "ark-", "etEase") :: Nil - checkAnswer(df, expected ::: expected) - } - } - } - } - } - - test("char varchar over length values") { - Seq("char", "varchar").foreach { typ => - withTempPath { dir => - withTable("t") { - sql("select '123456' as col").write.format("parquet").save(dir.toString) - sql(s"create table t (col $typ(2)) using parquet location '$dir'") - sql("insert into t values('1')") - checkSparkAnswerAndOperator(sql("select substring(col, 1) from t")) - checkSparkAnswerAndOperator(sql("select substring(col, 0) from t")) - checkSparkAnswerAndOperator(sql("select substring(col, -1) from t")) - } - } - } - } - - test("like (LikeSimplification enabled)") { - val table = "names" - withTable(table) { - sql(s"create table $table(id int, name varchar(20)) using parquet") - sql(s"insert into $table values(1,'James Smith')") - sql(s"insert into $table values(2,'Michael Rose')") - sql(s"insert into $table values(3,'Robert Williams')") - sql(s"insert into $table values(4,'Rames Rose')") - sql(s"insert into $table values(5,'Rames rose')") - - // Filter column having values 'Rames _ose', where any character matches for '_' - val query = sql(s"select id from $table where name like 'Rames _ose'") - checkAnswer(query, Row(4) :: Row(5) :: Nil) - - // Filter rows that contains 'rose' in 'name' column - val queryContains = sql(s"select id from $table where name like '%rose%'") - checkAnswer(queryContains, Row(5) :: Nil) - - // Filter rows that starts with 'R' following by any characters - val queryStartsWith = sql(s"select id from $table where name like 'R%'") - checkAnswer(queryStartsWith, Row(3) :: Row(4) :: Row(5) :: Nil) - - // Filter rows that ends with 's' following by any characters - val queryEndsWith = sql(s"select id from $table where name like '%s'") - checkAnswer(queryEndsWith, Row(3) :: Nil) - } - } - - test("like with custom escape") { - val table = "names" - withTable(table) { - sql(s"create table $table(id int, name varchar(20)) using parquet") - sql(s"insert into $table values(1,'James Smith')") - sql(s"insert into $table values(2,'Michael_Rose')") - sql(s"insert into $table values(3,'Robert_R_Williams')") - - // Filter column having values that include underscores - val queryDefaultEscape = sql("select id from names where name like '%\\_%'") - checkSparkAnswerAndOperator(queryDefaultEscape) - - val queryCustomEscape = sql("select id from names where name like '%$_%' escape '$'") - checkAnswer(queryCustomEscape, Row(2) :: Row(3) :: Nil) - 
- } - } - - test("contains") { - assume(!isSpark32) - - val table = "names" - withTable(table) { - sql(s"create table $table(id int, name varchar(20)) using parquet") - sql(s"insert into $table values(1,'James Smith')") - sql(s"insert into $table values(2,'Michael Rose')") - sql(s"insert into $table values(3,'Robert Williams')") - sql(s"insert into $table values(4,'Rames Rose')") - sql(s"insert into $table values(5,'Rames rose')") - - // Filter rows that contains 'rose' in 'name' column - val queryContains = sql(s"select id from $table where contains (name, 'rose')") - checkAnswer(queryContains, Row(5) :: Nil) - } - } - - test("startswith") { - assume(!isSpark32) - - val table = "names" - withTable(table) { - sql(s"create table $table(id int, name varchar(20)) using parquet") - sql(s"insert into $table values(1,'James Smith')") - sql(s"insert into $table values(2,'Michael Rose')") - sql(s"insert into $table values(3,'Robert Williams')") - sql(s"insert into $table values(4,'Rames Rose')") - sql(s"insert into $table values(5,'Rames rose')") - - // Filter rows that starts with 'R' following by any characters - val queryStartsWith = sql(s"select id from $table where startswith (name, 'R')") - checkAnswer(queryStartsWith, Row(3) :: Row(4) :: Row(5) :: Nil) - } - } - - test("endswith") { - assume(!isSpark32) - - val table = "names" - withTable(table) { - sql(s"create table $table(id int, name varchar(20)) using parquet") - sql(s"insert into $table values(1,'James Smith')") - sql(s"insert into $table values(2,'Michael Rose')") - sql(s"insert into $table values(3,'Robert Williams')") - sql(s"insert into $table values(4,'Rames Rose')") - sql(s"insert into $table values(5,'Rames rose')") - - // Filter rows that ends with 's' following by any characters - val queryEndsWith = sql(s"select id from $table where endswith (name, 's')") - checkAnswer(queryEndsWith, Row(3) :: Nil) - } - } - - test("add overflow (ANSI disable)") { - // Enabling ANSI will cause native engine failure, but as we cannot catch - // native error now, we cannot test it here. - withSQLConf(SQLConf.ANSI_ENABLED.key -> "false") { - withParquetTable(Seq((Int.MaxValue, 1)), "tbl") { - checkSparkAnswerAndOperator("SELECT _1 + _2 FROM tbl") - } - } - } - - test("divide by zero (ANSI disable)") { - // Enabling ANSI will cause native engine failure, but as we cannot catch - // native error now, we cannot test it here. - withSQLConf(SQLConf.ANSI_ENABLED.key -> "false") { - withParquetTable(Seq((1, 0, 1.0, 0.0)), "tbl") { - checkSparkAnswerAndOperator("SELECT _1 / _2, _3 / _4 FROM tbl") - } - } - } - - test("decimals arithmetic and comparison") { - // TODO: enable Spark 3.2 & 3.3 tests after supporting decimal reminder operation - assume(isSpark34Plus) - - def makeDecimalRDD(num: Int, decimal: DecimalType, useDictionary: Boolean): DataFrame = { - val div = if (useDictionary) 5 else num // narrow the space to make it dictionary encoded - spark - .range(num) - .map(_ % div) - // Parquet doesn't allow column names with spaces, have to add an alias here. - // Minus 500 here so that negative decimals are also tested. 
- .select( - (($"value" - 500) / 100.0) cast decimal as Symbol("dec1"), - (($"value" - 600) / 100.0) cast decimal as Symbol("dec2")) - .coalesce(1) - } - - Seq(true, false).foreach { dictionary => - Seq(16, 1024).foreach { batchSize => - withSQLConf( - CometConf.COMET_BATCH_SIZE.key -> batchSize.toString, - SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "false", - "parquet.enable.dictionary" -> dictionary.toString) { - var combinations = Seq((5, 2), (1, 0), (18, 10), (18, 17), (19, 0), (38, 37)) - // If ANSI mode is on, the combination (1, 1) will cause a runtime error. Otherwise, the - // decimal RDD contains all null values and should be able to read back from Parquet. - - if (!SQLConf.get.ansiEnabled) { - combinations = combinations ++ Seq((1, 1)) - } - - for ((precision, scale) <- combinations) { - withTempPath { dir => - val data = makeDecimalRDD(10, DecimalType(precision, scale), dictionary) - data.write.parquet(dir.getCanonicalPath) - readParquetFile(dir.getCanonicalPath) { df => - { - val decimalLiteral1 = Decimal(1.00) - val decimalLiteral2 = Decimal(123.456789) - val cometDf = df.select( - $"dec1" + $"dec2", - $"dec1" - $"dec2", - $"dec1" % $"dec2", - $"dec1" >= $"dec1", - $"dec1" === "1.0", - $"dec1" + decimalLiteral1, - $"dec1" - decimalLiteral1, - $"dec1" + decimalLiteral2, - $"dec1" - decimalLiteral2) - - checkAnswer( - cometDf, - data - .select( - $"dec1" + $"dec2", - $"dec1" - $"dec2", - $"dec1" % $"dec2", - $"dec1" >= $"dec1", - $"dec1" === "1.0", - $"dec1" + decimalLiteral1, - $"dec1" - decimalLiteral1, - $"dec1" + decimalLiteral2, - $"dec1" - decimalLiteral2) - .collect() - .toSeq) - } - } - } - } - } - } - } - } - - test("scalar decimal arithmetic operations") { - assume(isSpark34Plus) - withTable("tbl") { - withSQLConf(CometConf.COMET_ENABLED.key -> "true") { - sql("CREATE TABLE tbl (a INT) USING PARQUET") - sql("INSERT INTO tbl VALUES (0)") - - val combinations = Seq((7, 3), (18, 10), (38, 4)) - for ((precision, scale) <- combinations) { - for (op <- Seq("+", "-", "*", "/", "%")) { - val left = s"CAST(1.00 AS DECIMAL($precision, $scale))" - val right = s"CAST(123.45 AS DECIMAL($precision, $scale))" - - withSQLConf( - "spark.sql.optimizer.excludedRules" -> - "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") { - - checkSparkAnswerAndOperator(s"SELECT $left $op $right FROM tbl") - } - } - } - } - } - } - - test("cast decimals to int") { - Seq(16, 1024).foreach { batchSize => - withSQLConf( - CometConf.COMET_BATCH_SIZE.key -> batchSize.toString, - SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "false") { - var combinations = Seq((5, 2), (1, 0), (18, 10), (18, 17), (19, 0), (38, 37)) - // If ANSI mode is on, the combination (1, 1) will cause a runtime error. Otherwise, the - // decimal RDD contains all null values and should be able to read back from Parquet. - - if (!SQLConf.get.ansiEnabled) { - combinations = combinations ++ Seq((1, 1)) - } - - for ((precision, scale) <- combinations; useDictionary <- Seq(false)) { - withTempPath { dir => - val data = makeDecimalRDD(10, DecimalType(precision, scale), useDictionary) - data.write.parquet(dir.getCanonicalPath) - readParquetFile(dir.getCanonicalPath) { df => - { - val cometDf = df.select($"dec".cast("int")) - - // `data` is not read from Parquet, so it doesn't go Comet exec. 
- checkAnswer(cometDf, data.select($"dec".cast("int")).collect().toSeq) - } - } - } - } - } - } - } - - test("various math scalar functions") { - Seq("true", "false").foreach { dictionary => - withSQLConf("parquet.enable.dictionary" -> dictionary) { - withParquetTable( - (0 until 5).map(i => (i.toDouble + 0.3, i.toDouble + 0.8)), - "tbl", - withDictionary = dictionary.toBoolean) { - checkSparkAnswerWithTol( - "SELECT abs(_1), acos(_2), asin(_1), atan(_2), atan2(_1, _2), cos(_1) FROM tbl") - checkSparkAnswerWithTol( - "SELECT exp(_1), ln(_2), log10(_1), log2(_1), pow(_1, _2) FROM tbl") - // TODO: comment in the round tests once supported - // checkSparkAnswerWithTol("SELECT round(_1), round(_2) FROM tbl") - checkSparkAnswerWithTol("SELECT signum(_1), sin(_1), sqrt(_1) FROM tbl") - checkSparkAnswerWithTol("SELECT tan(_1) FROM tbl") - } - } - } - } - - test("abs") { - Seq(true, false).foreach { dictionaryEnabled => - withTempDir { dir => - val path = new Path(dir.toURI.toString, "test.parquet") - makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 100) - withParquetTable(path.toString, "tbl") { - Seq(2, 3, 4, 5, 6, 7, 15, 16, 17).foreach { col => - checkSparkAnswerAndOperator(s"SELECT abs(_${col}) FROM tbl") - } - } - } - } - } + // test("decimals divide by zero") { + // // TODO: enable Spark 3.2 & 3.3 tests after supporting decimal divide operation + // assume(isSpark34Plus) + + // Seq(true, false).foreach { dictionary => + // withSQLConf( + // SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "false", + // "parquet.enable.dictionary" -> dictionary.toString) { + // withTempPath { dir => + // val data = makeDecimalRDD(10, DecimalType(18, 10), dictionary) + // data.write.parquet(dir.getCanonicalPath) + // readParquetFile(dir.getCanonicalPath) { df => + // { + // val decimalLiteral = Decimal(0.00) + // val cometDf = df.select($"dec" / decimalLiteral, $"dec" % decimalLiteral) + // checkSparkAnswerAndOperator(cometDf) + // } + // } + // } + // } + // } + // } + + // test("bitwise shift with different left/right types") { + // Seq(false, true).foreach { dictionary => + // withSQLConf("parquet.enable.dictionary" -> dictionary.toString) { + // val table = "test" + // withTable(table) { + // sql(s"create table $table(col1 long, col2 int) using parquet") + // sql(s"insert into $table values(1111, 2)") + // sql(s"insert into $table values(1111, 2)") + // sql(s"insert into $table values(3333, 4)") + // sql(s"insert into $table values(5555, 6)") + + // checkSparkAnswerAndOperator( + // s"SELECT shiftright(col1, 2), shiftright(col1, col2) FROM $table") + // checkSparkAnswerAndOperator( + // s"SELECT shiftleft(col1, 2), shiftleft(col1, col2) FROM $table") + // } + // } + // } + // } + + // test("basic data type support") { + // Seq(true, false).foreach { dictionaryEnabled => + // withTempDir { dir => + // val path = new Path(dir.toURI.toString, "test.parquet") + // makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000) + // withParquetTable(path.toString, "tbl") { + // // TODO: enable test for unsigned ints + // checkSparkAnswerAndOperator( + // "select _1, _2, _3, _4, _5, _6, _7, _8, _13, _14, _15, _16, _17, " + + // "_18, _19, _20 FROM tbl WHERE _2 > 100") + // } + // } + // } + // } + + // test("null literals") { + // val batchSize = 1000 + // Seq(true, false).foreach { dictionaryEnabled => + // withTempDir { dir => + // val path = new Path(dir.toURI.toString, "test.parquet") + // makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, batchSize) + // 
withParquetTable(path.toString, "tbl") { + // val sqlString = "SELECT _4 + null, _15 - null, _16 * null FROM tbl" + // val df2 = sql(sqlString) + // val rows = df2.collect() + // assert(rows.length == batchSize) + // assert(rows.forall(_ == Row(null, null, null))) + + // checkSparkAnswerAndOperator(sqlString) + // } + // } + // } + // } + + // test("date and timestamp type literals") { + // Seq(true, false).foreach { dictionaryEnabled => + // withTempDir { dir => + // val path = new Path(dir.toURI.toString, "test.parquet") + // makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000) + // withParquetTable(path.toString, "tbl") { + // checkSparkAnswerAndOperator( + // "SELECT _4 FROM tbl WHERE " + + // "_20 > CAST('2020-01-01' AS DATE) AND _18 < CAST('2020-01-01' AS TIMESTAMP)") + // } + // } + // } + // } + + // test("dictionary arithmetic") { + // // TODO: test ANSI mode + // withSQLConf(SQLConf.ANSI_ENABLED.key -> "false", "parquet.enable.dictionary" -> "true") { + // withParquetTable((0 until 10).map(i => (i % 5, i % 3)), "tbl") { + // checkSparkAnswerAndOperator("SELECT _1 + _2, _1 - _2, _1 * _2, _1 / _2, _1 % _2 FROM tbl") + // } + // } + // } + + // test("dictionary arithmetic with scalar") { + // withSQLConf("parquet.enable.dictionary" -> "true") { + // withParquetTable((0 until 10).map(i => (i % 5, i % 3)), "tbl") { + // checkSparkAnswerAndOperator("SELECT _1 + 1, _1 - 1, _1 * 2, _1 / 2, _1 % 2 FROM tbl") + // } + // } + // } + + // test("string type and substring") { + // withParquetTable((0 until 5).map(i => (i.toString, (i + 100).toString)), "tbl") { + // checkSparkAnswerAndOperator("SELECT _1, substring(_2, 2, 2) FROM tbl") + // checkSparkAnswerAndOperator("SELECT _1, substring(_2, 2, -2) FROM tbl") + // checkSparkAnswerAndOperator("SELECT _1, substring(_2, -2, 2) FROM tbl") + // checkSparkAnswerAndOperator("SELECT _1, substring(_2, -2, -2) FROM tbl") + // checkSparkAnswerAndOperator("SELECT _1, substring(_2, -2, 10) FROM tbl") + // checkSparkAnswerAndOperator("SELECT _1, substring(_2, 0, 0) FROM tbl") + // checkSparkAnswerAndOperator("SELECT _1, substring(_2, 1, 0) FROM tbl") + // } + // } + + // test("substring with start < 1") { + // withTempPath { _ => + // withTable("t") { + // sql("create table t (col string) using parquet") + // sql("insert into t values('123456')") + // checkSparkAnswerAndOperator(sql("select substring(col, 0) from t")) + // checkSparkAnswerAndOperator(sql("select substring(col, -1) from t")) + // } + // } + // } + + // test("string with coalesce") { + // withParquetTable( + // (0 until 10).map(i => (i.toString, if (i > 5) None else Some((i + 100).toString))), + // "tbl") { + // checkSparkAnswerAndOperator( + // "SELECT coalesce(_1), coalesce(_1, 1), coalesce(null, _1), coalesce(null, 1), coalesce(_2, _1), coalesce(null) FROM tbl") + // } + // } + + // test("substring with dictionary") { + // val data = (0 until 1000) + // .map(_ % 5) // reduce value space to trigger dictionary encoding + // .map(i => (i.toString, (i + 100).toString)) + // withParquetTable(data, "tbl") { + // checkSparkAnswerAndOperator("SELECT _1, substring(_2, 2, 2) FROM tbl") + // } + // } + + // test("string_space") { + // withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl") { + // checkSparkAnswerAndOperator("SELECT space(_1), space(_2) FROM tbl") + // } + // } + + // test("string_space with dictionary") { + // val data = (0 until 1000).map(i => Tuple1(i % 5)) + + // withSQLConf("parquet.enable.dictionary" -> "true") { + // withParquetTable(data, "tbl") { + 
// checkSparkAnswerAndOperator("SELECT space(_1) FROM tbl") + // } + // } + // } + + // test("hour, minute, second") { + // Seq(true, false).foreach { dictionaryEnabled => + // withTempDir { dir => + // val path = new Path(dir.toURI.toString, "part-r-0.parquet") + // val expected = makeRawTimeParquetFile(path, dictionaryEnabled = dictionaryEnabled, 10000) + // readParquetFile(path.toString) { df => + // val query = df.select(expr("hour(_1)"), expr("minute(_1)"), expr("second(_1)")) + + // checkAnswer( + // query, + // expected.map { + // case None => + // Row(null, null, null) + // case Some(i) => + // val timestamp = new java.sql.Timestamp(i).toLocalDateTime + // val hour = timestamp.getHour + // val minute = timestamp.getMinute + // val second = timestamp.getSecond + + // Row(hour, minute, second) + // }) + // } + // } + // } + // } + + // test("hour on int96 timestamp column") { + // import testImplicits._ + + // val N = 100 + // val ts = "2020-01-01 01:02:03.123456" + // Seq(true, false).foreach { dictionaryEnabled => + // Seq(false, true).foreach { conversionEnabled => + // withSQLConf( + // SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "INT96", + // SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key -> conversionEnabled.toString) { + // withTempPath { path => + // Seq + // .tabulate(N)(_ => ts) + // .toDF("ts1") + // .select($"ts1".cast("timestamp").as("ts")) + // .repartition(1) + // .write + // .option("parquet.enable.dictionary", dictionaryEnabled) + // .parquet(path.getCanonicalPath) + + // checkAnswer( + // spark.read.parquet(path.getCanonicalPath).select(expr("hour(ts)")), + // Seq.tabulate(N)(_ => Row(1))) + // } + // } + // } + // } + // } + + // test("cast timestamp and timestamp_ntz") { + // withSQLConf( + // SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu", + // CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + // Seq(true, false).foreach { dictionaryEnabled => + // withTempDir { dir => + // val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet") + // makeRawTimeParquetFile(path, dictionaryEnabled = dictionaryEnabled, 10000) + // withParquetTable(path.toString, "timetbl") { + // checkSparkAnswerAndOperator( + // "SELECT " + + // "cast(_2 as timestamp) tz_millis, " + + // "cast(_3 as timestamp) ntz_millis, " + + // "cast(_4 as timestamp) tz_micros, " + + // "cast(_5 as timestamp) ntz_micros " + + // " from timetbl") + // } + // } + // } + // } + // } + + // test("cast timestamp and timestamp_ntz to string") { + // // TODO: make the test pass for Spark 3.2 & 3.3 + // assume(isSpark34Plus) + + // withSQLConf( + // SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu", + // CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + // Seq(true, false).foreach { dictionaryEnabled => + // withTempDir { dir => + // val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet") + // makeRawTimeParquetFile(path, dictionaryEnabled = dictionaryEnabled, 2001) + // withParquetTable(path.toString, "timetbl") { + // checkSparkAnswerAndOperator( + // "SELECT " + + // "cast(_2 as string) tz_millis, " + + // "cast(_3 as string) ntz_millis, " + + // "cast(_4 as string) tz_micros, " + + // "cast(_5 as string) ntz_micros " + + // " from timetbl") + // } + // } + // } + // } + // } + + // test("cast timestamp and timestamp_ntz to long, date") { + // // TODO: make the test pass for Spark 3.2 & 3.3 + // assume(isSpark34Plus) + + // withSQLConf( + // SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu", + // CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + // Seq(true, false).foreach 
{ dictionaryEnabled => + // withTempDir { dir => + // val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet") + // makeRawTimeParquetFile(path, dictionaryEnabled = dictionaryEnabled, 10000) + // withParquetTable(path.toString, "timetbl") { + // checkSparkAnswerAndOperator( + // "SELECT " + + // "cast(_2 as long) tz_millis, " + + // "cast(_4 as long) tz_micros, " + + // "cast(_2 as date) tz_millis_to_date, " + + // "cast(_3 as date) ntz_millis_to_date, " + + // "cast(_4 as date) tz_micros_to_date, " + + // "cast(_5 as date) ntz_micros_to_date " + + // " from timetbl") + // } + // } + // } + // } + // } + + // test("trunc") { + // Seq(true, false).foreach { dictionaryEnabled => + // withTempDir { dir => + // val path = new Path(dir.toURI.toString, "date_trunc.parquet") + // makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000) + // withParquetTable(path.toString, "tbl") { + // Seq("YEAR", "YYYY", "YY", "QUARTER", "MON", "MONTH", "MM", "WEEK").foreach { format => + // checkSparkAnswerAndOperator(s"SELECT trunc(_20, '$format') from tbl") + // } + // } + // } + // } + // } + + // test("trunc with format array") { + // val numRows = 1000 + // Seq(true, false).foreach { dictionaryEnabled => + // withTempDir { dir => + // val path = new Path(dir.toURI.toString, "date_trunc_with_format.parquet") + // makeDateTimeWithFormatTable(path, dictionaryEnabled = dictionaryEnabled, numRows) + // withParquetTable(path.toString, "dateformattbl") { + // checkSparkAnswerAndOperator( + // "SELECT " + + // "dateformat, _7, " + + // "trunc(_7, dateformat) " + + // " from dateformattbl ") + // } + // } + // } + // } + + // test("date_trunc") { + // Seq(true, false).foreach { dictionaryEnabled => + // withTempDir { dir => + // val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet") + // makeRawTimeParquetFile(path, dictionaryEnabled = dictionaryEnabled, 10000) + // withParquetTable(path.toString, "timetbl") { + // Seq( + // "YEAR", + // "YYYY", + // "YY", + // "MON", + // "MONTH", + // "MM", + // "QUARTER", + // "WEEK", + // "DAY", + // "DD", + // "HOUR", + // "MINUTE", + // "SECOND", + // "MILLISECOND", + // "MICROSECOND").foreach { format => + // checkSparkAnswerAndOperator( + // "SELECT " + + // s"date_trunc('$format', _0), " + + // s"date_trunc('$format', _1), " + + // s"date_trunc('$format', _2), " + + // s"date_trunc('$format', _4) " + + // " from timetbl") + // } + // } + // } + // } + // } + + // test("date_trunc with timestamp_ntz") { + // assume(!isSpark32, "timestamp functions for timestamp_ntz have incorrect behavior in 3.2") + // withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + // Seq(true, false).foreach { dictionaryEnabled => + // withTempDir { dir => + // val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet") + // makeRawTimeParquetFile(path, dictionaryEnabled = dictionaryEnabled, 10000) + // withParquetTable(path.toString, "timetbl") { + // Seq( + // "YEAR", + // "YYYY", + // "YY", + // "MON", + // "MONTH", + // "MM", + // "QUARTER", + // "WEEK", + // "DAY", + // "DD", + // "HOUR", + // "MINUTE", + // "SECOND", + // "MILLISECOND", + // "MICROSECOND").foreach { format => + // checkSparkAnswerAndOperator( + // "SELECT " + + // s"date_trunc('$format', _3), " + + // s"date_trunc('$format', _5) " + + // " from timetbl") + // } + // } + // } + // } + // } + // } + + // test("date_trunc with format array") { + // assume(isSpark33Plus, "TimestampNTZ is supported in Spark 3.3+, See SPARK-36182") + // 
withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + // val numRows = 1000 + // Seq(true, false).foreach { dictionaryEnabled => + // withTempDir { dir => + // val path = new Path(dir.toURI.toString, "timestamp_trunc_with_format.parquet") + // makeDateTimeWithFormatTable(path, dictionaryEnabled = dictionaryEnabled, numRows) + // withParquetTable(path.toString, "timeformattbl") { + // checkSparkAnswerAndOperator( + // "SELECT " + + // "format, _0, _1, _2, _3, _4, _5, " + + // "date_trunc(format, _0), " + + // "date_trunc(format, _1), " + + // "date_trunc(format, _2), " + + // "date_trunc(format, _3), " + + // "date_trunc(format, _4), " + + // "date_trunc(format, _5) " + + // " from timeformattbl ") + // } + // } + // } + // } + // } + + // test("date_trunc on int96 timestamp column") { + // import testImplicits._ + + // val N = 100 + // val ts = "2020-01-01 01:02:03.123456" + // Seq(true, false).foreach { dictionaryEnabled => + // Seq(false, true).foreach { conversionEnabled => + // withSQLConf( + // SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "INT96", + // SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key -> conversionEnabled.toString) { + // withTempPath { path => + // Seq + // .tabulate(N)(_ => ts) + // .toDF("ts1") + // .select($"ts1".cast("timestamp").as("ts")) + // .repartition(1) + // .write + // .option("parquet.enable.dictionary", dictionaryEnabled) + // .parquet(path.getCanonicalPath) + + // withParquetTable(path.toString, "int96timetbl") { + // Seq( + // "YEAR", + // "YYYY", + // "YY", + // "MON", + // "MONTH", + // "MM", + // "QUARTER", + // "WEEK", + // "DAY", + // "DD", + // "HOUR", + // "MINUTE", + // "SECOND", + // "MILLISECOND", + // "MICROSECOND").foreach { format => + // checkSparkAnswer( + // "SELECT " + + // s"date_trunc('$format', ts )" + + // " from int96timetbl") + // } + // } + // } + // } + // } + // } + // } + + // test("charvarchar") { + // Seq(false, true).foreach { dictionary => + // withSQLConf("parquet.enable.dictionary" -> dictionary.toString) { + // val table = "char_tbl4" + // withTable(table) { + // val view = "str_view" + // withView(view) { + // sql(s"""create temporary view $view as select c, v from values + // | (null, null), (null, null), + // | (null, 'S'), (null, 'S'), + // | ('N', 'N '), ('N', 'N '), + // | ('Ne', 'Sp'), ('Ne', 'Sp'), + // | ('Net ', 'Spa '), ('Net ', 'Spa '), + // | ('NetE', 'Spar'), ('NetE', 'Spar'), + // | ('NetEa ', 'Spark '), ('NetEa ', 'Spark '), + // | ('NetEas ', 'Spark'), ('NetEas ', 'Spark'), + // | ('NetEase', 'Spark-'), ('NetEase', 'Spark-') t(c, v);""".stripMargin) + // sql( + // s"create table $table(c7 char(7), c8 char(8), v varchar(6), s string) using parquet;") + // sql(s"insert into $table select c, c, v, c from $view;") + // val df = sql(s"""select substring(c7, 2), substring(c8, 2), + // | substring(v, 3), substring(s, 2) from $table;""".stripMargin) + + // val expected = Row(" ", " ", "", "") :: + // Row(null, null, "", null) :: Row(null, null, null, null) :: + // Row("e ", "e ", "", "e") :: Row("et ", "et ", "a ", "et ") :: + // Row("etE ", "etE ", "ar", "etE") :: + // Row("etEa ", "etEa ", "ark ", "etEa ") :: + // Row("etEas ", "etEas ", "ark", "etEas ") :: + // Row("etEase", "etEase ", "ark-", "etEase") :: Nil + // checkAnswer(df, expected ::: expected) + // } + // } + // } + // } + // } + + // test("char varchar over length values") { + // Seq("char", "varchar").foreach { typ => + // withTempPath { dir => + // withTable("t") { + // sql("select '123456' as 
col").write.format("parquet").save(dir.toString) + // sql(s"create table t (col $typ(2)) using parquet location '$dir'") + // sql("insert into t values('1')") + // checkSparkAnswerAndOperator(sql("select substring(col, 1) from t")) + // checkSparkAnswerAndOperator(sql("select substring(col, 0) from t")) + // checkSparkAnswerAndOperator(sql("select substring(col, -1) from t")) + // } + // } + // } + // } + + // test("like (LikeSimplification enabled)") { + // val table = "names" + // withTable(table) { + // sql(s"create table $table(id int, name varchar(20)) using parquet") + // sql(s"insert into $table values(1,'James Smith')") + // sql(s"insert into $table values(2,'Michael Rose')") + // sql(s"insert into $table values(3,'Robert Williams')") + // sql(s"insert into $table values(4,'Rames Rose')") + // sql(s"insert into $table values(5,'Rames rose')") + + // // Filter column having values 'Rames _ose', where any character matches for '_' + // val query = sql(s"select id from $table where name like 'Rames _ose'") + // checkAnswer(query, Row(4) :: Row(5) :: Nil) + + // // Filter rows that contains 'rose' in 'name' column + // val queryContains = sql(s"select id from $table where name like '%rose%'") + // checkAnswer(queryContains, Row(5) :: Nil) + + // // Filter rows that starts with 'R' following by any characters + // val queryStartsWith = sql(s"select id from $table where name like 'R%'") + // checkAnswer(queryStartsWith, Row(3) :: Row(4) :: Row(5) :: Nil) + + // // Filter rows that ends with 's' following by any characters + // val queryEndsWith = sql(s"select id from $table where name like '%s'") + // checkAnswer(queryEndsWith, Row(3) :: Nil) + // } + // } + + // test("like with custom escape") { + // val table = "names" + // withTable(table) { + // sql(s"create table $table(id int, name varchar(20)) using parquet") + // sql(s"insert into $table values(1,'James Smith')") + // sql(s"insert into $table values(2,'Michael_Rose')") + // sql(s"insert into $table values(3,'Robert_R_Williams')") + + // // Filter column having values that include underscores + // val queryDefaultEscape = sql("select id from names where name like '%\\_%'") + // checkSparkAnswerAndOperator(queryDefaultEscape) + + // val queryCustomEscape = sql("select id from names where name like '%$_%' escape '$'") + // checkAnswer(queryCustomEscape, Row(2) :: Row(3) :: Nil) + + // } + // } + + // test("contains") { + // assume(!isSpark32) + + // val table = "names" + // withTable(table) { + // sql(s"create table $table(id int, name varchar(20)) using parquet") + // sql(s"insert into $table values(1,'James Smith')") + // sql(s"insert into $table values(2,'Michael Rose')") + // sql(s"insert into $table values(3,'Robert Williams')") + // sql(s"insert into $table values(4,'Rames Rose')") + // sql(s"insert into $table values(5,'Rames rose')") + + // // Filter rows that contains 'rose' in 'name' column + // val queryContains = sql(s"select id from $table where contains (name, 'rose')") + // checkAnswer(queryContains, Row(5) :: Nil) + // } + // } + + // test("startswith") { + // assume(!isSpark32) + + // val table = "names" + // withTable(table) { + // sql(s"create table $table(id int, name varchar(20)) using parquet") + // sql(s"insert into $table values(1,'James Smith')") + // sql(s"insert into $table values(2,'Michael Rose')") + // sql(s"insert into $table values(3,'Robert Williams')") + // sql(s"insert into $table values(4,'Rames Rose')") + // sql(s"insert into $table values(5,'Rames rose')") + + // // Filter rows that starts with 
'R' following by any characters + // val queryStartsWith = sql(s"select id from $table where startswith (name, 'R')") + // checkAnswer(queryStartsWith, Row(3) :: Row(4) :: Row(5) :: Nil) + // } + // } + + // test("endswith") { + // assume(!isSpark32) + + // val table = "names" + // withTable(table) { + // sql(s"create table $table(id int, name varchar(20)) using parquet") + // sql(s"insert into $table values(1,'James Smith')") + // sql(s"insert into $table values(2,'Michael Rose')") + // sql(s"insert into $table values(3,'Robert Williams')") + // sql(s"insert into $table values(4,'Rames Rose')") + // sql(s"insert into $table values(5,'Rames rose')") + + // // Filter rows that ends with 's' following by any characters + // val queryEndsWith = sql(s"select id from $table where endswith (name, 's')") + // checkAnswer(queryEndsWith, Row(3) :: Nil) + // } + // } + + // test("add overflow (ANSI disable)") { + // // Enabling ANSI will cause native engine failure, but as we cannot catch + // // native error now, we cannot test it here. + // withSQLConf(SQLConf.ANSI_ENABLED.key -> "false") { + // withParquetTable(Seq((Int.MaxValue, 1)), "tbl") { + // checkSparkAnswerAndOperator("SELECT _1 + _2 FROM tbl") + // } + // } + // } + + // test("divide by zero (ANSI disable)") { + // // Enabling ANSI will cause native engine failure, but as we cannot catch + // // native error now, we cannot test it here. + // withSQLConf(SQLConf.ANSI_ENABLED.key -> "false") { + // withParquetTable(Seq((1, 0, 1.0, 0.0)), "tbl") { + // checkSparkAnswerAndOperator("SELECT _1 / _2, _3 / _4 FROM tbl") + // } + // } + // } + + // test("decimals arithmetic and comparison") { + // // TODO: enable Spark 3.2 & 3.3 tests after supporting decimal reminder operation + // assume(isSpark34Plus) + + // def makeDecimalRDD(num: Int, decimal: DecimalType, useDictionary: Boolean): DataFrame = { + // val div = if (useDictionary) 5 else num // narrow the space to make it dictionary encoded + // spark + // .range(num) + // .map(_ % div) + // // Parquet doesn't allow column names with spaces, have to add an alias here. + // // Minus 500 here so that negative decimals are also tested. + // .select( + // (($"value" - 500) / 100.0) cast decimal as Symbol("dec1"), + // (($"value" - 600) / 100.0) cast decimal as Symbol("dec2")) + // .coalesce(1) + // } + + // Seq(true, false).foreach { dictionary => + // Seq(16, 1024).foreach { batchSize => + // withSQLConf( + // CometConf.COMET_BATCH_SIZE.key -> batchSize.toString, + // SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "false", + // "parquet.enable.dictionary" -> dictionary.toString) { + // var combinations = Seq((5, 2), (1, 0), (18, 10), (18, 17), (19, 0), (38, 37)) + // // If ANSI mode is on, the combination (1, 1) will cause a runtime error. Otherwise, the + // // decimal RDD contains all null values and should be able to read back from Parquet. 
+
+  //         if (!SQLConf.get.ansiEnabled) {
+  //           combinations = combinations ++ Seq((1, 1))
+  //         }
+
+  //         for ((precision, scale) <- combinations) {
+  //           withTempPath { dir =>
+  //             val data = makeDecimalRDD(10, DecimalType(precision, scale), dictionary)
+  //             data.write.parquet(dir.getCanonicalPath)
+  //             readParquetFile(dir.getCanonicalPath) { df =>
+  //               {
+  //                 val decimalLiteral1 = Decimal(1.00)
+  //                 val decimalLiteral2 = Decimal(123.456789)
+  //                 val cometDf = df.select(
+  //                   $"dec1" + $"dec2",
+  //                   $"dec1" - $"dec2",
+  //                   $"dec1" % $"dec2",
+  //                   $"dec1" >= $"dec1",
+  //                   $"dec1" === "1.0",
+  //                   $"dec1" + decimalLiteral1,
+  //                   $"dec1" - decimalLiteral1,
+  //                   $"dec1" + decimalLiteral2,
+  //                   $"dec1" - decimalLiteral2)
+
+  //                 checkAnswer(
+  //                   cometDf,
+  //                   data
+  //                     .select(
+  //                       $"dec1" + $"dec2",
+  //                       $"dec1" - $"dec2",
+  //                       $"dec1" % $"dec2",
+  //                       $"dec1" >= $"dec1",
+  //                       $"dec1" === "1.0",
+  //                       $"dec1" + decimalLiteral1,
+  //                       $"dec1" - decimalLiteral1,
+  //                       $"dec1" + decimalLiteral2,
+  //                       $"dec1" - decimalLiteral2)
+  //                     .collect()
+  //                     .toSeq)
+  //               }
+  //             }
+  //           }
+  //         }
+  //       }
+  //     }
+  //   }
+  // }
+
+  // test("scalar decimal arithmetic operations") {
+  //   assume(isSpark34Plus)
+  //   withTable("tbl") {
+  //     withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
+  //       sql("CREATE TABLE tbl (a INT) USING PARQUET")
+  //       sql("INSERT INTO tbl VALUES (0)")
+
+  //       val combinations = Seq((7, 3), (18, 10), (38, 4))
+  //       for ((precision, scale) <- combinations) {
+  //         for (op <- Seq("+", "-", "*", "/", "%")) {
+  //           val left = s"CAST(1.00 AS DECIMAL($precision, $scale))"
+  //           val right = s"CAST(123.45 AS DECIMAL($precision, $scale))"
+
+  //           withSQLConf(
+  //             "spark.sql.optimizer.excludedRules" ->
+  //               "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") {
+
+  //             checkSparkAnswerAndOperator(s"SELECT $left $op $right FROM tbl")
+  //           }
+  //         }
+  //       }
+  //     }
+  //   }
+  // }
+
+  // test("cast decimals to int") {
+  //   Seq(16, 1024).foreach { batchSize =>
+  //     withSQLConf(
+  //       CometConf.COMET_BATCH_SIZE.key -> batchSize.toString,
+  //       SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "false") {
+  //       var combinations = Seq((5, 2), (1, 0), (18, 10), (18, 17), (19, 0), (38, 37))
+  //       // If ANSI mode is on, the combination (1, 1) will cause a runtime error. Otherwise, the
+  //       // decimal RDD contains all null values and should be able to read back from Parquet.
+
+  //       if (!SQLConf.get.ansiEnabled) {
+  //         combinations = combinations ++ Seq((1, 1))
+  //       }
+
+  //       for ((precision, scale) <- combinations; useDictionary <- Seq(false)) {
+  //         withTempPath { dir =>
+  //           val data = makeDecimalRDD(10, DecimalType(precision, scale), useDictionary)
+  //           data.write.parquet(dir.getCanonicalPath)
+  //           readParquetFile(dir.getCanonicalPath) { df =>
+  //             {
+  //               val cometDf = df.select($"dec".cast("int"))
+
+  //               // `data` is not read from Parquet, so it doesn't go Comet exec.
+  //               checkAnswer(cometDf, data.select($"dec".cast("int")).collect().toSeq)
+  //             }
+  //           }
+  //         }
+  //       }
+  //     }
+  //   }
+  // }
+
+  // test("various math scalar functions") {
+  //   Seq("true", "false").foreach { dictionary =>
+  //     withSQLConf("parquet.enable.dictionary" -> dictionary) {
+  //       withParquetTable(
+  //         (0 until 5).map(i => (i.toDouble + 0.3, i.toDouble + 0.8)),
+  //         "tbl",
+  //         withDictionary = dictionary.toBoolean) {
+  //         checkSparkAnswerWithTol(
+  //           "SELECT abs(_1), acos(_2), asin(_1), atan(_2), atan2(_1, _2), cos(_1) FROM tbl")
+  //         checkSparkAnswerWithTol(
+  //           "SELECT exp(_1), ln(_2), log10(_1), log2(_1), pow(_1, _2) FROM tbl")
+  //         // TODO: comment in the round tests once supported
+  //         // checkSparkAnswerWithTol("SELECT round(_1), round(_2) FROM tbl")
+  //         checkSparkAnswerWithTol("SELECT signum(_1), sin(_1), sqrt(_1) FROM tbl")
+  //         checkSparkAnswerWithTol("SELECT tan(_1) FROM tbl")
+  //       }
+  //     }
+  //   }
+  // }
+
+  // test("abs") {
+  //   Seq(true, false).foreach { dictionaryEnabled =>
+  //     withTempDir { dir =>
+  //       val path = new Path(dir.toURI.toString, "test.parquet")
+  //       makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 100)
+  //       withParquetTable(path.toString, "tbl") {
+  //         Seq(2, 3, 4, 5, 6, 7, 15, 16, 17).foreach { col =>
+  //           checkSparkAnswerAndOperator(s"SELECT abs(_${col}) FROM tbl")
+  //         }
+  //       }
+  //     }
+  //   }
+  // }

   test("remainder") {
     withTempDir { dir =>
-      // Create a DataFrame with null values
       val df = Seq((-21840, -0.0)).toDF("c90", "c1")
-
-      // Write the DataFrame to a Parquet file
       val path = new Path(dir.toURI.toString, "remainder_test.parquet").toString
       df.write.mode("overwrite").parquet(path)