diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 645f5f848..2b7f8cd9d 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -50,811 +50,811 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } - // test("decimals divide by zero") { - // // TODO: enable Spark 3.2 & 3.3 tests after supporting decimal divide operation - // assume(isSpark34Plus) - - // Seq(true, false).foreach { dictionary => - // withSQLConf( - // SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "false", - // "parquet.enable.dictionary" -> dictionary.toString) { - // withTempPath { dir => - // val data = makeDecimalRDD(10, DecimalType(18, 10), dictionary) - // data.write.parquet(dir.getCanonicalPath) - // readParquetFile(dir.getCanonicalPath) { df => - // { - // val decimalLiteral = Decimal(0.00) - // val cometDf = df.select($"dec" / decimalLiteral, $"dec" % decimalLiteral) - // checkSparkAnswerAndOperator(cometDf) - // } - // } - // } - // } - // } - // } - - // test("bitwise shift with different left/right types") { - // Seq(false, true).foreach { dictionary => - // withSQLConf("parquet.enable.dictionary" -> dictionary.toString) { - // val table = "test" - // withTable(table) { - // sql(s"create table $table(col1 long, col2 int) using parquet") - // sql(s"insert into $table values(1111, 2)") - // sql(s"insert into $table values(1111, 2)") - // sql(s"insert into $table values(3333, 4)") - // sql(s"insert into $table values(5555, 6)") - - // checkSparkAnswerAndOperator( - // s"SELECT shiftright(col1, 2), shiftright(col1, col2) FROM $table") - // checkSparkAnswerAndOperator( - // s"SELECT shiftleft(col1, 2), shiftleft(col1, col2) FROM $table") - // } - // } - // } - // } - - // test("basic data type support") { - // Seq(true, false).foreach { dictionaryEnabled => - // 
withTempDir { dir => - // val path = new Path(dir.toURI.toString, "test.parquet") - // makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000) - // withParquetTable(path.toString, "tbl") { - // // TODO: enable test for unsigned ints - // checkSparkAnswerAndOperator( - // "select _1, _2, _3, _4, _5, _6, _7, _8, _13, _14, _15, _16, _17, " + - // "_18, _19, _20 FROM tbl WHERE _2 > 100") - // } - // } - // } - // } - - // test("null literals") { - // val batchSize = 1000 - // Seq(true, false).foreach { dictionaryEnabled => - // withTempDir { dir => - // val path = new Path(dir.toURI.toString, "test.parquet") - // makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, batchSize) - // withParquetTable(path.toString, "tbl") { - // val sqlString = "SELECT _4 + null, _15 - null, _16 * null FROM tbl" - // val df2 = sql(sqlString) - // val rows = df2.collect() - // assert(rows.length == batchSize) - // assert(rows.forall(_ == Row(null, null, null))) - - // checkSparkAnswerAndOperator(sqlString) - // } - // } - // } - // } - - // test("date and timestamp type literals") { - // Seq(true, false).foreach { dictionaryEnabled => - // withTempDir { dir => - // val path = new Path(dir.toURI.toString, "test.parquet") - // makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000) - // withParquetTable(path.toString, "tbl") { - // checkSparkAnswerAndOperator( - // "SELECT _4 FROM tbl WHERE " + - // "_20 > CAST('2020-01-01' AS DATE) AND _18 < CAST('2020-01-01' AS TIMESTAMP)") - // } - // } - // } - // } - - // test("dictionary arithmetic") { - // // TODO: test ANSI mode - // withSQLConf(SQLConf.ANSI_ENABLED.key -> "false", "parquet.enable.dictionary" -> "true") { - // withParquetTable((0 until 10).map(i => (i % 5, i % 3)), "tbl") { - // checkSparkAnswerAndOperator("SELECT _1 + _2, _1 - _2, _1 * _2, _1 / _2, _1 % _2 FROM tbl") - // } - // } - // } - - // test("dictionary arithmetic with scalar") { - // 
withSQLConf("parquet.enable.dictionary" -> "true") { - // withParquetTable((0 until 10).map(i => (i % 5, i % 3)), "tbl") { - // checkSparkAnswerAndOperator("SELECT _1 + 1, _1 - 1, _1 * 2, _1 / 2, _1 % 2 FROM tbl") - // } - // } - // } - - // test("string type and substring") { - // withParquetTable((0 until 5).map(i => (i.toString, (i + 100).toString)), "tbl") { - // checkSparkAnswerAndOperator("SELECT _1, substring(_2, 2, 2) FROM tbl") - // checkSparkAnswerAndOperator("SELECT _1, substring(_2, 2, -2) FROM tbl") - // checkSparkAnswerAndOperator("SELECT _1, substring(_2, -2, 2) FROM tbl") - // checkSparkAnswerAndOperator("SELECT _1, substring(_2, -2, -2) FROM tbl") - // checkSparkAnswerAndOperator("SELECT _1, substring(_2, -2, 10) FROM tbl") - // checkSparkAnswerAndOperator("SELECT _1, substring(_2, 0, 0) FROM tbl") - // checkSparkAnswerAndOperator("SELECT _1, substring(_2, 1, 0) FROM tbl") - // } - // } - - // test("substring with start < 1") { - // withTempPath { _ => - // withTable("t") { - // sql("create table t (col string) using parquet") - // sql("insert into t values('123456')") - // checkSparkAnswerAndOperator(sql("select substring(col, 0) from t")) - // checkSparkAnswerAndOperator(sql("select substring(col, -1) from t")) - // } - // } - // } - - // test("string with coalesce") { - // withParquetTable( - // (0 until 10).map(i => (i.toString, if (i > 5) None else Some((i + 100).toString))), - // "tbl") { - // checkSparkAnswerAndOperator( - // "SELECT coalesce(_1), coalesce(_1, 1), coalesce(null, _1), coalesce(null, 1), coalesce(_2, _1), coalesce(null) FROM tbl") - // } - // } - - // test("substring with dictionary") { - // val data = (0 until 1000) - // .map(_ % 5) // reduce value space to trigger dictionary encoding - // .map(i => (i.toString, (i + 100).toString)) - // withParquetTable(data, "tbl") { - // checkSparkAnswerAndOperator("SELECT _1, substring(_2, 2, 2) FROM tbl") - // } - // } - - // test("string_space") { - // withParquetTable((0 until 5).map(i 
=> (i, i + 1)), "tbl") { - // checkSparkAnswerAndOperator("SELECT space(_1), space(_2) FROM tbl") - // } - // } - - // test("string_space with dictionary") { - // val data = (0 until 1000).map(i => Tuple1(i % 5)) - - // withSQLConf("parquet.enable.dictionary" -> "true") { - // withParquetTable(data, "tbl") { - // checkSparkAnswerAndOperator("SELECT space(_1) FROM tbl") - // } - // } - // } - - // test("hour, minute, second") { - // Seq(true, false).foreach { dictionaryEnabled => - // withTempDir { dir => - // val path = new Path(dir.toURI.toString, "part-r-0.parquet") - // val expected = makeRawTimeParquetFile(path, dictionaryEnabled = dictionaryEnabled, 10000) - // readParquetFile(path.toString) { df => - // val query = df.select(expr("hour(_1)"), expr("minute(_1)"), expr("second(_1)")) - - // checkAnswer( - // query, - // expected.map { - // case None => - // Row(null, null, null) - // case Some(i) => - // val timestamp = new java.sql.Timestamp(i).toLocalDateTime - // val hour = timestamp.getHour - // val minute = timestamp.getMinute - // val second = timestamp.getSecond - - // Row(hour, minute, second) - // }) - // } - // } - // } - // } - - // test("hour on int96 timestamp column") { - // import testImplicits._ - - // val N = 100 - // val ts = "2020-01-01 01:02:03.123456" - // Seq(true, false).foreach { dictionaryEnabled => - // Seq(false, true).foreach { conversionEnabled => - // withSQLConf( - // SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "INT96", - // SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key -> conversionEnabled.toString) { - // withTempPath { path => - // Seq - // .tabulate(N)(_ => ts) - // .toDF("ts1") - // .select($"ts1".cast("timestamp").as("ts")) - // .repartition(1) - // .write - // .option("parquet.enable.dictionary", dictionaryEnabled) - // .parquet(path.getCanonicalPath) - - // checkAnswer( - // spark.read.parquet(path.getCanonicalPath).select(expr("hour(ts)")), - // Seq.tabulate(N)(_ => Row(1))) - // } - // } - // } - // } - // } - - // 
test("cast timestamp and timestamp_ntz") { - // withSQLConf( - // SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu", - // CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { - // Seq(true, false).foreach { dictionaryEnabled => - // withTempDir { dir => - // val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet") - // makeRawTimeParquetFile(path, dictionaryEnabled = dictionaryEnabled, 10000) - // withParquetTable(path.toString, "timetbl") { - // checkSparkAnswerAndOperator( - // "SELECT " + - // "cast(_2 as timestamp) tz_millis, " + - // "cast(_3 as timestamp) ntz_millis, " + - // "cast(_4 as timestamp) tz_micros, " + - // "cast(_5 as timestamp) ntz_micros " + - // " from timetbl") - // } - // } - // } - // } - // } - - // test("cast timestamp and timestamp_ntz to string") { - // // TODO: make the test pass for Spark 3.2 & 3.3 - // assume(isSpark34Plus) - - // withSQLConf( - // SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu", - // CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { - // Seq(true, false).foreach { dictionaryEnabled => - // withTempDir { dir => - // val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet") - // makeRawTimeParquetFile(path, dictionaryEnabled = dictionaryEnabled, 2001) - // withParquetTable(path.toString, "timetbl") { - // checkSparkAnswerAndOperator( - // "SELECT " + - // "cast(_2 as string) tz_millis, " + - // "cast(_3 as string) ntz_millis, " + - // "cast(_4 as string) tz_micros, " + - // "cast(_5 as string) ntz_micros " + - // " from timetbl") - // } - // } - // } - // } - // } - - // test("cast timestamp and timestamp_ntz to long, date") { - // // TODO: make the test pass for Spark 3.2 & 3.3 - // assume(isSpark34Plus) - - // withSQLConf( - // SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu", - // CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { - // Seq(true, false).foreach { dictionaryEnabled => - // withTempDir { dir => - // val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet") - // 
makeRawTimeParquetFile(path, dictionaryEnabled = dictionaryEnabled, 10000) - // withParquetTable(path.toString, "timetbl") { - // checkSparkAnswerAndOperator( - // "SELECT " + - // "cast(_2 as long) tz_millis, " + - // "cast(_4 as long) tz_micros, " + - // "cast(_2 as date) tz_millis_to_date, " + - // "cast(_3 as date) ntz_millis_to_date, " + - // "cast(_4 as date) tz_micros_to_date, " + - // "cast(_5 as date) ntz_micros_to_date " + - // " from timetbl") - // } - // } - // } - // } - // } - - // test("trunc") { - // Seq(true, false).foreach { dictionaryEnabled => - // withTempDir { dir => - // val path = new Path(dir.toURI.toString, "date_trunc.parquet") - // makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000) - // withParquetTable(path.toString, "tbl") { - // Seq("YEAR", "YYYY", "YY", "QUARTER", "MON", "MONTH", "MM", "WEEK").foreach { format => - // checkSparkAnswerAndOperator(s"SELECT trunc(_20, '$format') from tbl") - // } - // } - // } - // } - // } - - // test("trunc with format array") { - // val numRows = 1000 - // Seq(true, false).foreach { dictionaryEnabled => - // withTempDir { dir => - // val path = new Path(dir.toURI.toString, "date_trunc_with_format.parquet") - // makeDateTimeWithFormatTable(path, dictionaryEnabled = dictionaryEnabled, numRows) - // withParquetTable(path.toString, "dateformattbl") { - // checkSparkAnswerAndOperator( - // "SELECT " + - // "dateformat, _7, " + - // "trunc(_7, dateformat) " + - // " from dateformattbl ") - // } - // } - // } - // } - - // test("date_trunc") { - // Seq(true, false).foreach { dictionaryEnabled => - // withTempDir { dir => - // val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet") - // makeRawTimeParquetFile(path, dictionaryEnabled = dictionaryEnabled, 10000) - // withParquetTable(path.toString, "timetbl") { - // Seq( - // "YEAR", - // "YYYY", - // "YY", - // "MON", - // "MONTH", - // "MM", - // "QUARTER", - // "WEEK", - // "DAY", - // "DD", - // "HOUR", - // "MINUTE", - 
// "SECOND", - // "MILLISECOND", - // "MICROSECOND").foreach { format => - // checkSparkAnswerAndOperator( - // "SELECT " + - // s"date_trunc('$format', _0), " + - // s"date_trunc('$format', _1), " + - // s"date_trunc('$format', _2), " + - // s"date_trunc('$format', _4) " + - // " from timetbl") - // } - // } - // } - // } - // } - - // test("date_trunc with timestamp_ntz") { - // assume(!isSpark32, "timestamp functions for timestamp_ntz have incorrect behavior in 3.2") - // withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { - // Seq(true, false).foreach { dictionaryEnabled => - // withTempDir { dir => - // val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet") - // makeRawTimeParquetFile(path, dictionaryEnabled = dictionaryEnabled, 10000) - // withParquetTable(path.toString, "timetbl") { - // Seq( - // "YEAR", - // "YYYY", - // "YY", - // "MON", - // "MONTH", - // "MM", - // "QUARTER", - // "WEEK", - // "DAY", - // "DD", - // "HOUR", - // "MINUTE", - // "SECOND", - // "MILLISECOND", - // "MICROSECOND").foreach { format => - // checkSparkAnswerAndOperator( - // "SELECT " + - // s"date_trunc('$format', _3), " + - // s"date_trunc('$format', _5) " + - // " from timetbl") - // } - // } - // } - // } - // } - // } - - // test("date_trunc with format array") { - // assume(isSpark33Plus, "TimestampNTZ is supported in Spark 3.3+, See SPARK-36182") - // withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { - // val numRows = 1000 - // Seq(true, false).foreach { dictionaryEnabled => - // withTempDir { dir => - // val path = new Path(dir.toURI.toString, "timestamp_trunc_with_format.parquet") - // makeDateTimeWithFormatTable(path, dictionaryEnabled = dictionaryEnabled, numRows) - // withParquetTable(path.toString, "timeformattbl") { - // checkSparkAnswerAndOperator( - // "SELECT " + - // "format, _0, _1, _2, _3, _4, _5, " + - // "date_trunc(format, _0), " + - // "date_trunc(format, _1), " + - // "date_trunc(format, _2), " + - // 
"date_trunc(format, _3), " + - // "date_trunc(format, _4), " + - // "date_trunc(format, _5) " + - // " from timeformattbl ") - // } - // } - // } - // } - // } - - // test("date_trunc on int96 timestamp column") { - // import testImplicits._ - - // val N = 100 - // val ts = "2020-01-01 01:02:03.123456" - // Seq(true, false).foreach { dictionaryEnabled => - // Seq(false, true).foreach { conversionEnabled => - // withSQLConf( - // SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "INT96", - // SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key -> conversionEnabled.toString) { - // withTempPath { path => - // Seq - // .tabulate(N)(_ => ts) - // .toDF("ts1") - // .select($"ts1".cast("timestamp").as("ts")) - // .repartition(1) - // .write - // .option("parquet.enable.dictionary", dictionaryEnabled) - // .parquet(path.getCanonicalPath) - - // withParquetTable(path.toString, "int96timetbl") { - // Seq( - // "YEAR", - // "YYYY", - // "YY", - // "MON", - // "MONTH", - // "MM", - // "QUARTER", - // "WEEK", - // "DAY", - // "DD", - // "HOUR", - // "MINUTE", - // "SECOND", - // "MILLISECOND", - // "MICROSECOND").foreach { format => - // checkSparkAnswer( - // "SELECT " + - // s"date_trunc('$format', ts )" + - // " from int96timetbl") - // } - // } - // } - // } - // } - // } - // } - - // test("charvarchar") { - // Seq(false, true).foreach { dictionary => - // withSQLConf("parquet.enable.dictionary" -> dictionary.toString) { - // val table = "char_tbl4" - // withTable(table) { - // val view = "str_view" - // withView(view) { - // sql(s"""create temporary view $view as select c, v from values - // | (null, null), (null, null), - // | (null, 'S'), (null, 'S'), - // | ('N', 'N '), ('N', 'N '), - // | ('Ne', 'Sp'), ('Ne', 'Sp'), - // | ('Net ', 'Spa '), ('Net ', 'Spa '), - // | ('NetE', 'Spar'), ('NetE', 'Spar'), - // | ('NetEa ', 'Spark '), ('NetEa ', 'Spark '), - // | ('NetEas ', 'Spark'), ('NetEas ', 'Spark'), - // | ('NetEase', 'Spark-'), ('NetEase', 'Spark-') t(c, v);""".stripMargin) 
- // sql( - // s"create table $table(c7 char(7), c8 char(8), v varchar(6), s string) using parquet;") - // sql(s"insert into $table select c, c, v, c from $view;") - // val df = sql(s"""select substring(c7, 2), substring(c8, 2), - // | substring(v, 3), substring(s, 2) from $table;""".stripMargin) - - // val expected = Row(" ", " ", "", "") :: - // Row(null, null, "", null) :: Row(null, null, null, null) :: - // Row("e ", "e ", "", "e") :: Row("et ", "et ", "a ", "et ") :: - // Row("etE ", "etE ", "ar", "etE") :: - // Row("etEa ", "etEa ", "ark ", "etEa ") :: - // Row("etEas ", "etEas ", "ark", "etEas ") :: - // Row("etEase", "etEase ", "ark-", "etEase") :: Nil - // checkAnswer(df, expected ::: expected) - // } - // } - // } - // } - // } - - // test("char varchar over length values") { - // Seq("char", "varchar").foreach { typ => - // withTempPath { dir => - // withTable("t") { - // sql("select '123456' as col").write.format("parquet").save(dir.toString) - // sql(s"create table t (col $typ(2)) using parquet location '$dir'") - // sql("insert into t values('1')") - // checkSparkAnswerAndOperator(sql("select substring(col, 1) from t")) - // checkSparkAnswerAndOperator(sql("select substring(col, 0) from t")) - // checkSparkAnswerAndOperator(sql("select substring(col, -1) from t")) - // } - // } - // } - // } - - // test("like (LikeSimplification enabled)") { - // val table = "names" - // withTable(table) { - // sql(s"create table $table(id int, name varchar(20)) using parquet") - // sql(s"insert into $table values(1,'James Smith')") - // sql(s"insert into $table values(2,'Michael Rose')") - // sql(s"insert into $table values(3,'Robert Williams')") - // sql(s"insert into $table values(4,'Rames Rose')") - // sql(s"insert into $table values(5,'Rames rose')") - - // // Filter column having values 'Rames _ose', where any character matches for '_' - // val query = sql(s"select id from $table where name like 'Rames _ose'") - // checkAnswer(query, Row(4) :: Row(5) :: Nil) - - 
// // Filter rows that contains 'rose' in 'name' column - // val queryContains = sql(s"select id from $table where name like '%rose%'") - // checkAnswer(queryContains, Row(5) :: Nil) - - // // Filter rows that starts with 'R' following by any characters - // val queryStartsWith = sql(s"select id from $table where name like 'R%'") - // checkAnswer(queryStartsWith, Row(3) :: Row(4) :: Row(5) :: Nil) - - // // Filter rows that ends with 's' following by any characters - // val queryEndsWith = sql(s"select id from $table where name like '%s'") - // checkAnswer(queryEndsWith, Row(3) :: Nil) - // } - // } - - // test("like with custom escape") { - // val table = "names" - // withTable(table) { - // sql(s"create table $table(id int, name varchar(20)) using parquet") - // sql(s"insert into $table values(1,'James Smith')") - // sql(s"insert into $table values(2,'Michael_Rose')") - // sql(s"insert into $table values(3,'Robert_R_Williams')") - - // // Filter column having values that include underscores - // val queryDefaultEscape = sql("select id from names where name like '%\\_%'") - // checkSparkAnswerAndOperator(queryDefaultEscape) - - // val queryCustomEscape = sql("select id from names where name like '%$_%' escape '$'") - // checkAnswer(queryCustomEscape, Row(2) :: Row(3) :: Nil) - - // } - // } - - // test("contains") { - // assume(!isSpark32) - - // val table = "names" - // withTable(table) { - // sql(s"create table $table(id int, name varchar(20)) using parquet") - // sql(s"insert into $table values(1,'James Smith')") - // sql(s"insert into $table values(2,'Michael Rose')") - // sql(s"insert into $table values(3,'Robert Williams')") - // sql(s"insert into $table values(4,'Rames Rose')") - // sql(s"insert into $table values(5,'Rames rose')") - - // // Filter rows that contains 'rose' in 'name' column - // val queryContains = sql(s"select id from $table where contains (name, 'rose')") - // checkAnswer(queryContains, Row(5) :: Nil) - // } - // } - - // 
test("startswith") { - // assume(!isSpark32) - - // val table = "names" - // withTable(table) { - // sql(s"create table $table(id int, name varchar(20)) using parquet") - // sql(s"insert into $table values(1,'James Smith')") - // sql(s"insert into $table values(2,'Michael Rose')") - // sql(s"insert into $table values(3,'Robert Williams')") - // sql(s"insert into $table values(4,'Rames Rose')") - // sql(s"insert into $table values(5,'Rames rose')") - - // // Filter rows that starts with 'R' following by any characters - // val queryStartsWith = sql(s"select id from $table where startswith (name, 'R')") - // checkAnswer(queryStartsWith, Row(3) :: Row(4) :: Row(5) :: Nil) - // } - // } - - // test("endswith") { - // assume(!isSpark32) - - // val table = "names" - // withTable(table) { - // sql(s"create table $table(id int, name varchar(20)) using parquet") - // sql(s"insert into $table values(1,'James Smith')") - // sql(s"insert into $table values(2,'Michael Rose')") - // sql(s"insert into $table values(3,'Robert Williams')") - // sql(s"insert into $table values(4,'Rames Rose')") - // sql(s"insert into $table values(5,'Rames rose')") - - // // Filter rows that ends with 's' following by any characters - // val queryEndsWith = sql(s"select id from $table where endswith (name, 's')") - // checkAnswer(queryEndsWith, Row(3) :: Nil) - // } - // } - - // test("add overflow (ANSI disable)") { - // // Enabling ANSI will cause native engine failure, but as we cannot catch - // // native error now, we cannot test it here. - // withSQLConf(SQLConf.ANSI_ENABLED.key -> "false") { - // withParquetTable(Seq((Int.MaxValue, 1)), "tbl") { - // checkSparkAnswerAndOperator("SELECT _1 + _2 FROM tbl") - // } - // } - // } - - // test("divide by zero (ANSI disable)") { - // // Enabling ANSI will cause native engine failure, but as we cannot catch - // // native error now, we cannot test it here. 
- // withSQLConf(SQLConf.ANSI_ENABLED.key -> "false") { - // withParquetTable(Seq((1, 0, 1.0, 0.0)), "tbl") { - // checkSparkAnswerAndOperator("SELECT _1 / _2, _3 / _4 FROM tbl") - // } - // } - // } - - // test("decimals arithmetic and comparison") { - // // TODO: enable Spark 3.2 & 3.3 tests after supporting decimal reminder operation - // assume(isSpark34Plus) - - // def makeDecimalRDD(num: Int, decimal: DecimalType, useDictionary: Boolean): DataFrame = { - // val div = if (useDictionary) 5 else num // narrow the space to make it dictionary encoded - // spark - // .range(num) - // .map(_ % div) - // // Parquet doesn't allow column names with spaces, have to add an alias here. - // // Minus 500 here so that negative decimals are also tested. - // .select( - // (($"value" - 500) / 100.0) cast decimal as Symbol("dec1"), - // (($"value" - 600) / 100.0) cast decimal as Symbol("dec2")) - // .coalesce(1) - // } - - // Seq(true, false).foreach { dictionary => - // Seq(16, 1024).foreach { batchSize => - // withSQLConf( - // CometConf.COMET_BATCH_SIZE.key -> batchSize.toString, - // SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "false", - // "parquet.enable.dictionary" -> dictionary.toString) { - // var combinations = Seq((5, 2), (1, 0), (18, 10), (18, 17), (19, 0), (38, 37)) - // // If ANSI mode is on, the combination (1, 1) will cause a runtime error. Otherwise, the - // // decimal RDD contains all null values and should be able to read back from Parquet. 
- - // if (!SQLConf.get.ansiEnabled) { - // combinations = combinations ++ Seq((1, 1)) - // } - - // for ((precision, scale) <- combinations) { - // withTempPath { dir => - // val data = makeDecimalRDD(10, DecimalType(precision, scale), dictionary) - // data.write.parquet(dir.getCanonicalPath) - // readParquetFile(dir.getCanonicalPath) { df => - // { - // val decimalLiteral1 = Decimal(1.00) - // val decimalLiteral2 = Decimal(123.456789) - // val cometDf = df.select( - // $"dec1" + $"dec2", - // $"dec1" - $"dec2", - // $"dec1" % $"dec2", - // $"dec1" >= $"dec1", - // $"dec1" === "1.0", - // $"dec1" + decimalLiteral1, - // $"dec1" - decimalLiteral1, - // $"dec1" + decimalLiteral2, - // $"dec1" - decimalLiteral2) - - // checkAnswer( - // cometDf, - // data - // .select( - // $"dec1" + $"dec2", - // $"dec1" - $"dec2", - // $"dec1" % $"dec2", - // $"dec1" >= $"dec1", - // $"dec1" === "1.0", - // $"dec1" + decimalLiteral1, - // $"dec1" - decimalLiteral1, - // $"dec1" + decimalLiteral2, - // $"dec1" - decimalLiteral2) - // .collect() - // .toSeq) - // } - // } - // } - // } - // } - // } - // } - // } - - // test("scalar decimal arithmetic operations") { - // assume(isSpark34Plus) - // withTable("tbl") { - // withSQLConf(CometConf.COMET_ENABLED.key -> "true") { - // sql("CREATE TABLE tbl (a INT) USING PARQUET") - // sql("INSERT INTO tbl VALUES (0)") - - // val combinations = Seq((7, 3), (18, 10), (38, 4)) - // for ((precision, scale) <- combinations) { - // for (op <- Seq("+", "-", "*", "/", "%")) { - // val left = s"CAST(1.00 AS DECIMAL($precision, $scale))" - // val right = s"CAST(123.45 AS DECIMAL($precision, $scale))" - - // withSQLConf( - // "spark.sql.optimizer.excludedRules" -> - // "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") { - - // checkSparkAnswerAndOperator(s"SELECT $left $op $right FROM tbl") - // } - // } - // } - // } - // } - // } - - // test("cast decimals to int") { - // Seq(16, 1024).foreach { batchSize => - // withSQLConf( - // 
CometConf.COMET_BATCH_SIZE.key -> batchSize.toString, - // SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "false") { - // var combinations = Seq((5, 2), (1, 0), (18, 10), (18, 17), (19, 0), (38, 37)) - // // If ANSI mode is on, the combination (1, 1) will cause a runtime error. Otherwise, the - // // decimal RDD contains all null values and should be able to read back from Parquet. - - // if (!SQLConf.get.ansiEnabled) { - // combinations = combinations ++ Seq((1, 1)) - // } - - // for ((precision, scale) <- combinations; useDictionary <- Seq(false)) { - // withTempPath { dir => - // val data = makeDecimalRDD(10, DecimalType(precision, scale), useDictionary) - // data.write.parquet(dir.getCanonicalPath) - // readParquetFile(dir.getCanonicalPath) { df => - // { - // val cometDf = df.select($"dec".cast("int")) - - // // `data` is not read from Parquet, so it doesn't go Comet exec. - // checkAnswer(cometDf, data.select($"dec".cast("int")).collect().toSeq) - // } - // } - // } - // } - // } - // } - // } - - // test("various math scalar functions") { - // Seq("true", "false").foreach { dictionary => - // withSQLConf("parquet.enable.dictionary" -> dictionary) { - // withParquetTable( - // (0 until 5).map(i => (i.toDouble + 0.3, i.toDouble + 0.8)), - // "tbl", - // withDictionary = dictionary.toBoolean) { - // checkSparkAnswerWithTol( - // "SELECT abs(_1), acos(_2), asin(_1), atan(_2), atan2(_1, _2), cos(_1) FROM tbl") - // checkSparkAnswerWithTol( - // "SELECT exp(_1), ln(_2), log10(_1), log2(_1), pow(_1, _2) FROM tbl") - // // TODO: comment in the round tests once supported - // // checkSparkAnswerWithTol("SELECT round(_1), round(_2) FROM tbl") - // checkSparkAnswerWithTol("SELECT signum(_1), sin(_1), sqrt(_1) FROM tbl") - // checkSparkAnswerWithTol("SELECT tan(_1) FROM tbl") - // } - // } - // } - // } - - // test("abs") { - // Seq(true, false).foreach { dictionaryEnabled => - // withTempDir { dir => - // val path = new Path(dir.toURI.toString, "test.parquet") - // 
makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 100) - // withParquetTable(path.toString, "tbl") { - // Seq(2, 3, 4, 5, 6, 7, 15, 16, 17).foreach { col => - // checkSparkAnswerAndOperator(s"SELECT abs(_${col}) FROM tbl") - // } - // } - // } - // } - // } + test("decimals divide by zero") { + // TODO: enable Spark 3.2 & 3.3 tests after supporting decimal divide operation + assume(isSpark34Plus) + + Seq(true, false).foreach { dictionary => + withSQLConf( + SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "false", + "parquet.enable.dictionary" -> dictionary.toString) { + withTempPath { dir => + val data = makeDecimalRDD(10, DecimalType(18, 10), dictionary) + data.write.parquet(dir.getCanonicalPath) + readParquetFile(dir.getCanonicalPath) { df => + { + val decimalLiteral = Decimal(0.00) + val cometDf = df.select($"dec" / decimalLiteral, $"dec" % decimalLiteral) + checkSparkAnswerAndOperator(cometDf) + } + } + } + } + } + } + + test("bitwise shift with different left/right types") { + Seq(false, true).foreach { dictionary => + withSQLConf("parquet.enable.dictionary" -> dictionary.toString) { + val table = "test" + withTable(table) { + sql(s"create table $table(col1 long, col2 int) using parquet") + sql(s"insert into $table values(1111, 2)") + sql(s"insert into $table values(1111, 2)") + sql(s"insert into $table values(3333, 4)") + sql(s"insert into $table values(5555, 6)") + + checkSparkAnswerAndOperator( + s"SELECT shiftright(col1, 2), shiftright(col1, col2) FROM $table") + checkSparkAnswerAndOperator( + s"SELECT shiftleft(col1, 2), shiftleft(col1, col2) FROM $table") + } + } + } + } + + test("basic data type support") { + Seq(true, false).foreach { dictionaryEnabled => + withTempDir { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000) + withParquetTable(path.toString, "tbl") { + // TODO: enable test for unsigned ints + checkSparkAnswerAndOperator( + "select _1, 
_2, _3, _4, _5, _6, _7, _8, _13, _14, _15, _16, _17, " +
          "_18, _19, _20 FROM tbl WHERE _2 > 100")
      }
    }
  }
}

test("null literals") {
  val batchSize = 1000
  Seq(true, false).foreach { dictionaryEnabled =>
    withTempDir { dir =>
      val path = new Path(dir.toURI.toString, "test.parquet")
      makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, batchSize)
      withParquetTable(path.toString, "tbl") {
        // Arithmetic with a NULL literal must yield NULL for every row.
        val sqlString = "SELECT _4 + null, _15 - null, _16 * null FROM tbl"
        val df2 = sql(sqlString)
        val rows = df2.collect()
        assert(rows.length == batchSize)
        assert(rows.forall(_ == Row(null, null, null)))

        checkSparkAnswerAndOperator(sqlString)
      }
    }
  }
}

test("date and timestamp type literals") {
  Seq(true, false).foreach { dictionaryEnabled =>
    withTempDir { dir =>
      val path = new Path(dir.toURI.toString, "test.parquet")
      makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
      withParquetTable(path.toString, "tbl") {
        checkSparkAnswerAndOperator(
          "SELECT _4 FROM tbl WHERE " +
            "_20 > CAST('2020-01-01' AS DATE) AND _18 < CAST('2020-01-01' AS TIMESTAMP)")
      }
    }
  }
}

test("dictionary arithmetic") {
  // TODO: test ANSI mode
  withSQLConf(SQLConf.ANSI_ENABLED.key -> "false", "parquet.enable.dictionary" -> "true") {
    withParquetTable((0 until 10).map(i => (i % 5, i % 3)), "tbl") {
      checkSparkAnswerAndOperator("SELECT _1 + _2, _1 - _2, _1 * _2, _1 / _2, _1 % _2 FROM tbl")
    }
  }
}

test("dictionary arithmetic with scalar") {
  withSQLConf("parquet.enable.dictionary" -> "true") {
    withParquetTable((0 until 10).map(i => (i % 5, i % 3)), "tbl") {
      checkSparkAnswerAndOperator("SELECT _1 + 1, _1 - 1, _1 * 2, _1 / 2, _1 % 2 FROM tbl")
    }
  }
}

test("string type and substring") {
  withParquetTable((0 until 5).map(i => (i.toString, (i + 100).toString)), "tbl") {
    // Cover positive/negative/zero start and length combinations.
    checkSparkAnswerAndOperator("SELECT _1, substring(_2, 2, 2) FROM tbl")
    checkSparkAnswerAndOperator("SELECT _1, substring(_2, 2, -2) FROM tbl")
    checkSparkAnswerAndOperator("SELECT _1, substring(_2, -2, 2) FROM tbl")
    checkSparkAnswerAndOperator("SELECT _1, substring(_2, -2, -2) FROM tbl")
    checkSparkAnswerAndOperator("SELECT _1, substring(_2, -2, 10) FROM tbl")
    checkSparkAnswerAndOperator("SELECT _1, substring(_2, 0, 0) FROM tbl")
    checkSparkAnswerAndOperator("SELECT _1, substring(_2, 1, 0) FROM tbl")
  }
}

test("substring with start < 1") {
  withTempPath { _ =>
    withTable("t") {
      sql("create table t (col string) using parquet")
      sql("insert into t values('123456')")
      checkSparkAnswerAndOperator(sql("select substring(col, 0) from t"))
      checkSparkAnswerAndOperator(sql("select substring(col, -1) from t"))
    }
  }
}

test("string with coalesce") {
  // _2 is NULL for i > 5, exercising coalesce fallback to _1.
  withParquetTable(
    (0 until 10).map(i => (i.toString, if (i > 5) None else Some((i + 100).toString))),
    "tbl") {
    checkSparkAnswerAndOperator(
      "SELECT coalesce(_1), coalesce(_1, 1), coalesce(null, _1), coalesce(null, 1), coalesce(_2, _1), coalesce(null) FROM tbl")
  }
}

test("substring with dictionary") {
  val data = (0 until 1000)
    .map(_ % 5) // reduce value space to trigger dictionary encoding
    .map(i => (i.toString, (i + 100).toString))
  withParquetTable(data, "tbl") {
    checkSparkAnswerAndOperator("SELECT _1, substring(_2, 2, 2) FROM tbl")
  }
}

test("string_space") {
  withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl") {
    checkSparkAnswerAndOperator("SELECT space(_1), space(_2) FROM tbl")
  }
}

test("string_space with dictionary") {
  val data = (0 until 1000).map(i => Tuple1(i % 5))

  withSQLConf("parquet.enable.dictionary" -> "true") {
    withParquetTable(data, "tbl") {
      checkSparkAnswerAndOperator("SELECT space(_1) FROM tbl")
    }
  }
}

test("hour, minute, second") {
  Seq(true, false).foreach { dictionaryEnabled =>
    withTempDir { dir =>
      val path = new Path(dir.toURI.toString, "part-r-0.parquet")
      val expected = makeRawTimeParquetFile(path, dictionaryEnabled = dictionaryEnabled, 10000)
      readParquetFile(path.toString) { df =>
        val query = df.select(expr("hour(_1)"), expr("minute(_1)"), expr("second(_1)"))

        // Expected values are derived independently via java.sql.Timestamp.
        checkAnswer(
          query,
          expected.map {
            case None =>
              Row(null, null, null)
            case Some(i) =>
              val timestamp = new java.sql.Timestamp(i).toLocalDateTime
              val hour = timestamp.getHour
              val minute = timestamp.getMinute
              val second = timestamp.getSecond

              Row(hour, minute, second)
          })
      }
    }
  }
}

test("hour on int96 timestamp column") {
  import testImplicits._

  val N = 100
  val ts = "2020-01-01 01:02:03.123456"
  Seq(true, false).foreach { dictionaryEnabled =>
    Seq(false, true).foreach { conversionEnabled =>
      withSQLConf(
        SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "INT96",
        SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key -> conversionEnabled.toString) {
        withTempPath { path =>
          Seq
            .tabulate(N)(_ => ts)
            .toDF("ts1")
            .select($"ts1".cast("timestamp").as("ts"))
            .repartition(1)
            .write
            .option("parquet.enable.dictionary", dictionaryEnabled)
            .parquet(path.getCanonicalPath)

          checkAnswer(
            spark.read.parquet(path.getCanonicalPath).select(expr("hour(ts)")),
            Seq.tabulate(N)(_ => Row(1)))
        }
      }
    }
  }
}

test("cast timestamp and timestamp_ntz") {
  withSQLConf(
    SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu",
    CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
    Seq(true, false).foreach { dictionaryEnabled =>
      withTempDir { dir =>
        val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet")
        makeRawTimeParquetFile(path, dictionaryEnabled = dictionaryEnabled, 10000)
        withParquetTable(path.toString, "timetbl") {
          checkSparkAnswerAndOperator(
            "SELECT " +
              "cast(_2 as timestamp) tz_millis, " +
              "cast(_3 as timestamp) ntz_millis, " +
              "cast(_4 as timestamp) tz_micros, " +
              "cast(_5 as timestamp) ntz_micros " +
              " from timetbl")
        }
      }
    }
  }
}

test("cast timestamp and timestamp_ntz to string") {
  // TODO: make the test pass for Spark 3.2 & 3.3
  assume(isSpark34Plus)

  withSQLConf(
    SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu",
    CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
    Seq(true, false).foreach { dictionaryEnabled =>
      withTempDir { dir =>
        val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet")
        makeRawTimeParquetFile(path, dictionaryEnabled = dictionaryEnabled, 2001)
        withParquetTable(path.toString, "timetbl") {
          checkSparkAnswerAndOperator(
            "SELECT " +
              "cast(_2 as string) tz_millis, " +
              "cast(_3 as string) ntz_millis, " +
              "cast(_4 as string) tz_micros, " +
              "cast(_5 as string) ntz_micros " +
              " from timetbl")
        }
      }
    }
  }
}

test("cast timestamp and timestamp_ntz to long, date") {
  // TODO: make the test pass for Spark 3.2 & 3.3
  assume(isSpark34Plus)

  withSQLConf(
    SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu",
    CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
    Seq(true, false).foreach { dictionaryEnabled =>
      withTempDir { dir =>
        val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet")
        makeRawTimeParquetFile(path, dictionaryEnabled = dictionaryEnabled, 10000)
        withParquetTable(path.toString, "timetbl") {
          checkSparkAnswerAndOperator(
            "SELECT " +
              "cast(_2 as long) tz_millis, " +
              "cast(_4 as long) tz_micros, " +
              "cast(_2 as date) tz_millis_to_date, " +
              "cast(_3 as date) ntz_millis_to_date, " +
              "cast(_4 as date) tz_micros_to_date, " +
              "cast(_5 as date) ntz_micros_to_date " +
              " from timetbl")
        }
      }
    }
  }
}

test("trunc") {
  Seq(true, false).foreach { dictionaryEnabled =>
    withTempDir { dir =>
      val path = new Path(dir.toURI.toString, "date_trunc.parquet")
      makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
      withParquetTable(path.toString, "tbl") {
        Seq("YEAR", "YYYY", "YY", "QUARTER", "MON", "MONTH", "MM", "WEEK").foreach { format =>
          checkSparkAnswerAndOperator(s"SELECT trunc(_20, '$format') from tbl")
        }
      }
    }
  }
}

test("trunc with format array") {
  // Format comes from a column rather than a literal.
  val numRows = 1000
  Seq(true, false).foreach { dictionaryEnabled =>
    withTempDir { dir =>
      val path = new Path(dir.toURI.toString, "date_trunc_with_format.parquet")
      makeDateTimeWithFormatTable(path, dictionaryEnabled = dictionaryEnabled, numRows)
      withParquetTable(path.toString, "dateformattbl") {
        checkSparkAnswerAndOperator(
          "SELECT " +
            "dateformat, _7, " +
            "trunc(_7, dateformat) " +
            " from dateformattbl ")
      }
    }
  }
}

test("date_trunc") {
  Seq(true, false).foreach { dictionaryEnabled =>
    withTempDir { dir =>
      val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet")
      makeRawTimeParquetFile(path, dictionaryEnabled = dictionaryEnabled, 10000)
      withParquetTable(path.toString, "timetbl") {
        Seq(
          "YEAR", "YYYY", "YY", "MON", "MONTH", "MM", "QUARTER", "WEEK", "DAY", "DD",
          "HOUR", "MINUTE", "SECOND", "MILLISECOND", "MICROSECOND").foreach { format =>
          checkSparkAnswerAndOperator(
            "SELECT " +
              s"date_trunc('$format', _0), " +
              s"date_trunc('$format', _1), " +
              s"date_trunc('$format', _2), " +
              s"date_trunc('$format', _4) " +
              " from timetbl")
        }
      }
    }
  }
}

test("date_trunc with timestamp_ntz") {
  assume(!isSpark32, "timestamp functions for timestamp_ntz have incorrect behavior in 3.2")
  withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
    Seq(true, false).foreach { dictionaryEnabled =>
      withTempDir { dir =>
        val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet")
        makeRawTimeParquetFile(path, dictionaryEnabled = dictionaryEnabled, 10000)
        withParquetTable(path.toString, "timetbl") {
          Seq(
            "YEAR", "YYYY", "YY", "MON", "MONTH", "MM", "QUARTER", "WEEK", "DAY", "DD",
            "HOUR", "MINUTE", "SECOND", "MILLISECOND", "MICROSECOND").foreach { format =>
            checkSparkAnswerAndOperator(
              "SELECT " +
                s"date_trunc('$format', _3), " +
                s"date_trunc('$format', _5) " +
                " from timetbl")
          }
        }
      }
    }
  }
}
test("date_trunc with format array") {
  assume(isSpark33Plus, "TimestampNTZ is supported in Spark 3.3+, See SPARK-36182")
  withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
    val numRows = 1000
    Seq(true, false).foreach { dictionaryEnabled =>
      withTempDir { dir =>
        val path = new Path(dir.toURI.toString, "timestamp_trunc_with_format.parquet")
        makeDateTimeWithFormatTable(path, dictionaryEnabled = dictionaryEnabled, numRows)
        withParquetTable(path.toString, "timeformattbl") {
          checkSparkAnswerAndOperator(
            "SELECT " +
              "format, _0, _1, _2, _3, _4, _5, " +
              "date_trunc(format, _0), " +
              "date_trunc(format, _1), " +
              "date_trunc(format, _2), " +
              "date_trunc(format, _3), " +
              "date_trunc(format, _4), " +
              "date_trunc(format, _5) " +
              " from timeformattbl ")
        }
      }
    }
  }
}

test("date_trunc on int96 timestamp column") {
  import testImplicits._

  val N = 100
  val ts = "2020-01-01 01:02:03.123456"
  Seq(true, false).foreach { dictionaryEnabled =>
    Seq(false, true).foreach { conversionEnabled =>
      withSQLConf(
        SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "INT96",
        SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key -> conversionEnabled.toString) {
        withTempPath { path =>
          Seq
            .tabulate(N)(_ => ts)
            .toDF("ts1")
            .select($"ts1".cast("timestamp").as("ts"))
            .repartition(1)
            .write
            .option("parquet.enable.dictionary", dictionaryEnabled)
            .parquet(path.getCanonicalPath)

          withParquetTable(path.toString, "int96timetbl") {
            Seq(
              "YEAR", "YYYY", "YY", "MON", "MONTH", "MM", "QUARTER", "WEEK", "DAY", "DD",
              "HOUR", "MINUTE", "SECOND", "MILLISECOND", "MICROSECOND").foreach { format =>
              // NOTE: only checkSparkAnswer here (no operator check), unlike the
              // non-int96 date_trunc tests above.
              checkSparkAnswer(
                "SELECT " +
                  s"date_trunc('$format', ts )" +
                  " from int96timetbl")
            }
          }
        }
      }
    }
  }
}

test("charvarchar") {
  // NOTE(review): whitespace padding inside the string literals below (char(7)/char(8)
  // columns pad with trailing blanks) may have been collapsed by formatting — verify the
  // exact space counts against the original source before relying on this transcription.
  Seq(false, true).foreach { dictionary =>
    withSQLConf("parquet.enable.dictionary" -> dictionary.toString) {
      val table = "char_tbl4"
      withTable(table) {
        val view = "str_view"
        withView(view) {
          sql(s"""create temporary view $view as select c, v from values
             | (null, null), (null, null),
             | (null, 'S'), (null, 'S'),
             | ('N', 'N '), ('N', 'N '),
             | ('Ne', 'Sp'), ('Ne', 'Sp'),
             | ('Net ', 'Spa '), ('Net ', 'Spa '),
             | ('NetE', 'Spar'), ('NetE', 'Spar'),
             | ('NetEa ', 'Spark '), ('NetEa ', 'Spark '),
             | ('NetEas ', 'Spark'), ('NetEas ', 'Spark'),
             | ('NetEase', 'Spark-'), ('NetEase', 'Spark-') t(c, v);""".stripMargin)
          sql(
            s"create table $table(c7 char(7), c8 char(8), v varchar(6), s string) using parquet;")
          sql(s"insert into $table select c, c, v, c from $view;")
          val df = sql(s"""select substring(c7, 2), substring(c8, 2),
             | substring(v, 3), substring(s, 2) from $table;""".stripMargin)

          val expected = Row(" ", " ", "", "") ::
            Row(null, null, "", null) :: Row(null, null, null, null) ::
            Row("e ", "e ", "", "e") :: Row("et ", "et ", "a ", "et ") ::
            Row("etE ", "etE ", "ar", "etE") ::
            Row("etEa ", "etEa ", "ark ", "etEa ") ::
            Row("etEas ", "etEas ", "ark", "etEas ") ::
            Row("etEase", "etEase ", "ark-", "etEase") :: Nil
          // Each source row is inserted twice, so the expected result doubles.
          checkAnswer(df, expected ::: expected)
        }
      }
    }
  }
}

test("char varchar over length values") {
  Seq("char", "varchar").foreach { typ =>
    withTempPath { dir =>
      withTable("t") {
        sql("select '123456' as col").write.format("parquet").save(dir.toString)
        sql(s"create table t (col $typ(2)) using parquet location '$dir'")
        sql("insert into t values('1')")
        checkSparkAnswerAndOperator(sql("select substring(col, 1) from t"))
        checkSparkAnswerAndOperator(sql("select substring(col, 0) from t"))
        checkSparkAnswerAndOperator(sql("select substring(col, -1) from t"))
      }
    }
  }
}

test("like (LikeSimplification enabled)") {
  val table = "names"
  withTable(table) {
    sql(s"create table $table(id int, name varchar(20)) using parquet")
    sql(s"insert into $table values(1,'James Smith')")
    sql(s"insert into $table values(2,'Michael Rose')")
    sql(s"insert into $table values(3,'Robert Williams')")
    sql(s"insert into $table values(4,'Rames Rose')")
    sql(s"insert into $table values(5,'Rames rose')")

    // Filter column having values 'Rames _ose', where any character matches for '_'
    val query = sql(s"select id from $table where name like 'Rames _ose'")
    checkAnswer(query, Row(4) :: Row(5) :: Nil)

    // Filter rows that contains 'rose' in 'name' column
    val queryContains = sql(s"select id from $table where name like '%rose%'")
    checkAnswer(queryContains, Row(5) :: Nil)

    // Filter rows that starts with 'R' following by any characters
    val queryStartsWith = sql(s"select id from $table where name like 'R%'")
    checkAnswer(queryStartsWith, Row(3) :: Row(4) :: Row(5) :: Nil)

    // Filter rows that ends with 's' following by any characters
    val queryEndsWith = sql(s"select id from $table where name like '%s'")
    checkAnswer(queryEndsWith, Row(3) :: Nil)
  }
}

test("like with custom escape") {
  val table = "names"
  withTable(table) {
    sql(s"create table $table(id int, name varchar(20)) using parquet")
    sql(s"insert into $table values(1,'James Smith')")
    sql(s"insert into $table values(2,'Michael_Rose')")
    sql(s"insert into $table values(3,'Robert_R_Williams')")

    // Filter column having values that include underscores
    val queryDefaultEscape = sql("select id from names where name like '%\\_%'")
    checkSparkAnswerAndOperator(queryDefaultEscape)

    // Custom escape only goes through checkAnswer (no native-operator check).
    val queryCustomEscape = sql("select id from names where name like '%$_%' escape '$'")
    checkAnswer(queryCustomEscape, Row(2) :: Row(3) :: Nil)

  }
}

test("contains") {
  assume(!isSpark32)

  val table = "names"
  withTable(table) {
    sql(s"create table $table(id int, name varchar(20)) using parquet")
    sql(s"insert into $table values(1,'James Smith')")
    sql(s"insert into $table values(2,'Michael Rose')")
    sql(s"insert into $table values(3,'Robert Williams')")
    sql(s"insert into $table values(4,'Rames Rose')")
    sql(s"insert into $table values(5,'Rames rose')")

    // Filter rows that contains 'rose' in 'name' column
    val queryContains = sql(s"select id from $table where contains (name, 'rose')")
    checkAnswer(queryContains, Row(5) :: Nil)
  }
}

test("startswith") {
  assume(!isSpark32)

  val table = "names"
  withTable(table) {
    sql(s"create table $table(id int, name varchar(20)) using parquet")
    sql(s"insert into $table values(1,'James Smith')")
    sql(s"insert into $table values(2,'Michael Rose')")
    sql(s"insert into $table values(3,'Robert Williams')")
    sql(s"insert into $table values(4,'Rames Rose')")
    sql(s"insert into $table values(5,'Rames rose')")

    // Filter rows that starts with 'R' following by any characters
    val queryStartsWith = sql(s"select id from $table where startswith (name, 'R')")
    checkAnswer(queryStartsWith, Row(3) :: Row(4) :: Row(5) :: Nil)
  }
}

test("endswith") {
  assume(!isSpark32)

  val table = "names"
  withTable(table) {
    sql(s"create table $table(id int, name varchar(20)) using parquet")
    sql(s"insert into $table values(1,'James Smith')")
    sql(s"insert into $table values(2,'Michael Rose')")
    sql(s"insert into $table values(3,'Robert Williams')")
    sql(s"insert into $table values(4,'Rames Rose')")
    sql(s"insert into $table values(5,'Rames rose')")

    // Filter rows that ends with 's' following by any characters
    val queryEndsWith = sql(s"select id from $table where endswith (name, 's')")
    checkAnswer(queryEndsWith, Row(3) :: Nil)
  }
}

test("add overflow (ANSI disable)") {
  // Enabling ANSI will cause native engine failure, but as we cannot catch
  // native error now, we cannot test it here.
  withSQLConf(SQLConf.ANSI_ENABLED.key -> "false") {
    withParquetTable(Seq((Int.MaxValue, 1)), "tbl") {
      checkSparkAnswerAndOperator("SELECT _1 + _2 FROM tbl")
    }
  }
}

test("divide by zero (ANSI disable)") {
  // Enabling ANSI will cause native engine failure, but as we cannot catch
  // native error now, we cannot test it here.
  withSQLConf(SQLConf.ANSI_ENABLED.key -> "false") {
    withParquetTable(Seq((1, 0, 1.0, 0.0)), "tbl") {
      checkSparkAnswerAndOperator("SELECT _1 / _2, _3 / _4 FROM tbl")
    }
  }
}

test("decimals arithmetic and comparison") {
  // TODO: enable Spark 3.2 & 3.3 tests after supporting decimal reminder operation
  assume(isSpark34Plus)

  def makeDecimalRDD(num: Int, decimal: DecimalType, useDictionary: Boolean): DataFrame = {
    val div = if (useDictionary) 5 else num // narrow the space to make it dictionary encoded
    spark
      .range(num)
      .map(_ % div)
      // Parquet doesn't allow column names with spaces, have to add an alias here.
      // Minus 500 here so that negative decimals are also tested.
      .select(
        (($"value" - 500) / 100.0) cast decimal as Symbol("dec1"),
        (($"value" - 600) / 100.0) cast decimal as Symbol("dec2"))
      .coalesce(1)
  }

  Seq(true, false).foreach { dictionary =>
    Seq(16, 1024).foreach { batchSize =>
      withSQLConf(
        CometConf.COMET_BATCH_SIZE.key -> batchSize.toString,
        SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "false",
        "parquet.enable.dictionary" -> dictionary.toString) {
        var combinations = Seq((5, 2), (1, 0), (18, 10), (18, 17), (19, 0), (38, 37))
        // If ANSI mode is on, the combination (1, 1) will cause a runtime error. Otherwise, the
        // decimal RDD contains all null values and should be able to read back from Parquet.

        if (!SQLConf.get.ansiEnabled) {
          combinations = combinations ++ Seq((1, 1))
        }

        for ((precision, scale) <- combinations) {
          withTempPath { dir =>
            val data = makeDecimalRDD(10, DecimalType(precision, scale), dictionary)
            data.write.parquet(dir.getCanonicalPath)
            readParquetFile(dir.getCanonicalPath) { df =>
              {
                val decimalLiteral1 = Decimal(1.00)
                val decimalLiteral2 = Decimal(123.456789)
                val cometDf = df.select(
                  $"dec1" + $"dec2",
                  $"dec1" - $"dec2",
                  $"dec1" % $"dec2",
                  $"dec1" >= $"dec1",
                  $"dec1" === "1.0",
                  $"dec1" + decimalLiteral1,
                  $"dec1" - decimalLiteral1,
                  $"dec1" + decimalLiteral2,
                  $"dec1" - decimalLiteral2)

                // Compare Comet's result against Spark evaluating the same
                // projection on the in-memory DataFrame.
                checkAnswer(
                  cometDf,
                  data
                    .select(
                      $"dec1" + $"dec2",
                      $"dec1" - $"dec2",
                      $"dec1" % $"dec2",
                      $"dec1" >= $"dec1",
                      $"dec1" === "1.0",
                      $"dec1" + decimalLiteral1,
                      $"dec1" - decimalLiteral1,
                      $"dec1" + decimalLiteral2,
                      $"dec1" - decimalLiteral2)
                    .collect()
                    .toSeq)
              }
            }
          }
        }
      }
    }
  }
}

test("scalar decimal arithmetic operations") {
  assume(isSpark34Plus)
  withTable("tbl") {
    withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
      sql("CREATE TABLE tbl (a INT) USING PARQUET")
      sql("INSERT INTO tbl VALUES (0)")

      val combinations = Seq((7, 3), (18, 10), (38, 4))
      for ((precision, scale) <- combinations) {
        for (op <- Seq("+", "-", "*", "/", "%")) {
          val left = s"CAST(1.00 AS DECIMAL($precision, $scale))"
          val right = s"CAST(123.45 AS DECIMAL($precision, $scale))"

          // Disable constant folding so the expression actually reaches the executor.
          withSQLConf(
            "spark.sql.optimizer.excludedRules" ->
              "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") {

            checkSparkAnswerAndOperator(s"SELECT $left $op $right FROM tbl")
          }
        }
      }
    }
  }
}

test("cast decimals to int") {
  Seq(16, 1024).foreach { batchSize =>
    withSQLConf(
      CometConf.COMET_BATCH_SIZE.key -> batchSize.toString,
      SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "false") {
      var combinations = Seq((5, 2), (1, 0), (18, 10), (18, 17), (19, 0), (38, 37))
      // If ANSI mode is on, the combination (1, 1) will cause a runtime error. Otherwise, the
      // decimal RDD contains all null values and should be able to read back from Parquet.

      if (!SQLConf.get.ansiEnabled) {
        combinations = combinations ++ Seq((1, 1))
      }

      for ((precision, scale) <- combinations; useDictionary <- Seq(false)) {
        withTempPath { dir =>
          val data = makeDecimalRDD(10, DecimalType(precision, scale), useDictionary)
          data.write.parquet(dir.getCanonicalPath)
          readParquetFile(dir.getCanonicalPath) { df =>
            {
              val cometDf = df.select($"dec".cast("int"))

              // `data` is not read from Parquet, so it doesn't go Comet exec.
              checkAnswer(cometDf, data.select($"dec".cast("int")).collect().toSeq)
            }
          }
        }
      }
    }
  }
}

test("various math scalar functions") {
  Seq("true", "false").foreach { dictionary =>
    withSQLConf("parquet.enable.dictionary" -> dictionary) {
      withParquetTable(
        (0 until 5).map(i => (i.toDouble + 0.3, i.toDouble + 0.8)),
        "tbl",
        withDictionary = dictionary.toBoolean) {
        // Floating-point results are compared with tolerance, not exact equality.
        checkSparkAnswerWithTol(
          "SELECT abs(_1), acos(_2), asin(_1), atan(_2), atan2(_1, _2), cos(_1) FROM tbl")
        checkSparkAnswerWithTol(
          "SELECT exp(_1), ln(_2), log10(_1), log2(_1), pow(_1, _2) FROM tbl")
        // TODO: comment in the round tests once supported
        // checkSparkAnswerWithTol("SELECT round(_1), round(_2) FROM tbl")
        checkSparkAnswerWithTol("SELECT signum(_1), sin(_1), sqrt(_1) FROM tbl")
        checkSparkAnswerWithTol("SELECT tan(_1) FROM tbl")
      }
    }
  }
}

test("abs") {
  Seq(true, false).foreach { dictionaryEnabled =>
    withTempDir { dir =>
      val path = new Path(dir.toURI.toString, "test.parquet")
      makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 100)
      withParquetTable(path.toString, "tbl") {
        Seq(2, 3, 4, 5, 6, 7, 15, 16, 17).foreach { col =>
          checkSparkAnswerAndOperator(s"SELECT abs(_${col}) FROM tbl")
        }
      }
    }
  }
}

test("remainder") {
  withTempDir { dir =>