From 0d9754bfcc6c5ec48148329652a5e6d6ad2bc12b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 3 May 2024 13:24:04 -0600 Subject: [PATCH] feat: Only allow incompatible cast expressions to run in comet if a config is enabled (#362) --- .../scala/org/apache/comet/CometConf.scala | 49 +---- .../user-guide/compatibility-template.md | 50 +++++ docs/source/user-guide/compatibility.md | 159 ++++++++++++-- docs/source/user-guide/configs.md | 2 +- spark/pom.xml | 13 +- .../scala/org/apache/comet/GenerateDocs.scala | 94 +++++++++ .../apache/comet/expressions/CometCast.scala | 196 ++++++++++++++++++ .../apache/comet/serde/QueryPlanSerde.scala | 54 ++--- .../org/apache/comet/CometCastSuite.scala | 111 ++++++---- .../apache/comet/CometExpressionSuite.scala | 108 +++++----- .../comet/exec/CometAggregateSuite.scala | 3 +- .../apache/comet/exec/CometExecSuite.scala | 4 +- .../sql/comet/CometPlanStabilitySuite.scala | 1 + 13 files changed, 663 insertions(+), 181 deletions(-) create mode 100644 docs/source/user-guide/compatibility-template.md create mode 100644 spark/src/main/scala/org/apache/comet/GenerateDocs.scala create mode 100644 spark/src/main/scala/org/apache/comet/expressions/CometCast.scala diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index ca4bf47098..26114090fd 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -19,11 +19,9 @@ package org.apache.comet -import java.io.{BufferedOutputStream, FileOutputStream} import java.util.concurrent.TimeUnit import scala.collection.mutable.ListBuffer -import scala.io.Source import org.apache.spark.network.util.ByteUnit import org.apache.spark.network.util.JavaUtils @@ -376,12 +374,14 @@ object CometConf { .booleanConf .createWithDefault(false) - val COMET_CAST_STRING_TO_TIMESTAMP: ConfigEntry[Boolean] = conf( - "spark.comet.cast.stringToTimestamp") - .doc( - "Comet is not currently fully compatible with Spark when casting from String to Timestamp.") - .booleanConf - .createWithDefault(false) + val COMET_CAST_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] = + conf("spark.comet.cast.allowIncompatible") + .doc( + "Comet is not currently fully compatible with Spark for all cast operations. " + + "Set this config to true to allow them anyway. See compatibility guide " + + "for more information.") + .booleanConf + .createWithDefault(false) } @@ -625,36 +625,3 @@ private[comet] case class ConfigBuilder(key: String) { private object ConfigEntry { val UNDEFINED = "" } - -/** - * Utility for generating markdown documentation from the configs. - * - * This is invoked when running `mvn clean package -DskipTests`. 
- */
-object CometConfGenerateDocs {
-  def main(args: Array[String]): Unit = {
-    if (args.length != 2) {
-      // scalastyle:off println
-      println("Missing arguments for template file and output file")
-      // scalastyle:on println
-      sys.exit(-1)
-    }
-    val templateFilename = args.head
-    val outputFilename = args(1)
-    val w = new BufferedOutputStream(new FileOutputStream(outputFilename))
-    for (line <- Source.fromFile(templateFilename).getLines()) {
-      if (line.trim == "") {
-        val publicConfigs = CometConf.allConfs.filter(_.isPublic)
-        val confs = publicConfigs.sortBy(_.key)
-        w.write("| Config | Description | Default Value |\n".getBytes)
-        w.write("|--------|-------------|---------------|\n".getBytes)
-        for (conf <- confs) {
-          w.write(s"| ${conf.key} | ${conf.doc.trim} | ${conf.defaultValueString} |\n".getBytes)
-        }
-      } else {
-        w.write(s"${line.trim}\n".getBytes)
-      }
-    }
-    w.close()
-  }
-}
diff --git a/docs/source/user-guide/compatibility-template.md b/docs/source/user-guide/compatibility-template.md
new file mode 100644
index 0000000000..deaca2d247
--- /dev/null
+++ b/docs/source/user-guide/compatibility-template.md
@@ -0,0 +1,50 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Compatibility Guide
+
+Comet aims to provide results consistent with the version of Apache Spark that is being used.
+
+This guide offers information about areas of functionality where there are known differences.
+
+## ANSI mode
+
+Comet currently ignores ANSI mode in most cases, and therefore can produce different results than Spark. By default,
+Comet will fall back to Spark if ANSI mode is enabled. To enable Comet to accelerate queries when ANSI mode is enabled,
+specify `spark.comet.ansi.enabled=true` in the Spark configuration. Comet's ANSI support is experimental and should not
+be used in production.
+
+There is an [epic](https://github.com/apache/datafusion-comet/issues/313) where we are tracking the work to fully implement ANSI support.
+
+## Cast
+
+Cast operations in Comet fall into three levels of support:
+
+- **Compatible**: The results match Apache Spark
+- **Incompatible**: The results may match Apache Spark for some inputs, but there are known issues where some inputs
+  will produce incorrect results or exceptions. The query stage will fall back to Spark by default. Setting
+  `spark.comet.cast.allowIncompatible=true` will allow all incompatible casts to run natively in Comet, but this is not
+  recommended for production use.
+- **Unsupported**: Comet does not provide a native version of this cast expression and the query stage will fall back to
+  Spark.
+
+The following table shows the cast operations currently supported by Comet. Any cast that does not appear in this
+table (for example, casts involving complex types or timestamp_ntz) is not supported by Comet.
+
+
diff --git a/docs/source/user-guide/compatibility.md b/docs/source/user-guide/compatibility.md
index b4b4c92eb6..9a2478d376 100644
--- a/docs/source/user-guide/compatibility.md
+++ b/docs/source/user-guide/compatibility.md
@@ -1,20 +1,20 @@
 
 # Compatibility Guide
 
@@ -34,13 +34,126 @@ There is an [epic](https://github.com/apache/datafusion-comet/issues/313) where
 ## Cast
 
-Comet currently delegates to Apache DataFusion for most cast operations, and this means that the behavior is not
-guaranteed to be consistent with Spark.
+Cast operations in Comet fall into three levels of support:
 
-There is an [epic](https://github.com/apache/datafusion-comet/issues/286) where we are tracking the work to implement Spark-compatible cast expressions.
+- **Compatible**: The results match Apache Spark
+- **Incompatible**: The results may match Apache Spark for some inputs, but there are known issues where some inputs
+  will produce incorrect results or exceptions. The query stage will fall back to Spark by default. Setting
+  `spark.comet.cast.allowIncompatible=true` will allow all incompatible casts to run natively in Comet, but this is not
+  recommended for production use.
+- **Unsupported**: Comet does not provide a native version of this cast expression and the query stage will fall back to
+  Spark.
 
-### Cast from String to Timestamp
+The following table shows the cast operations currently supported by Comet. Any cast that does not appear in this
+table (for example, casts involving complex types or timestamp_ntz) is not supported by Comet.
 
-Casting from String to Timestamp is disabled by default due to incompatibilities with Spark, including timezone
-issues, and can be enabled by setting `spark.comet.castStringToTimestamp=true`. See the
-[tracking issue](https://github.com/apache/datafusion-comet/issues/328) for more information.
+| From Type | To Type | Compatible? | Notes |
+| --------- | --------- | ------------ | ----------------------------------- |
+| boolean | byte | Compatible | |
+| boolean | short | Compatible | |
+| boolean | integer | Compatible | |
+| boolean | long | Compatible | |
+| boolean | float | Compatible | |
+| boolean | double | Compatible | |
+| boolean | decimal | Unsupported | |
+| boolean | string | Compatible | |
+| boolean | timestamp | Unsupported | |
+| byte | boolean | Compatible | |
+| byte | short | Compatible | |
+| byte | integer | Compatible | |
+| byte | long | Compatible | |
+| byte | float | Compatible | |
+| byte | double | Compatible | |
+| byte | decimal | Compatible | |
+| byte | string | Compatible | |
+| byte | binary | Unsupported | |
+| byte | timestamp | Unsupported | |
+| short | boolean | Compatible | |
+| short | byte | Compatible | |
+| short | integer | Compatible | |
+| short | long | Compatible | |
+| short | float | Compatible | |
+| short | double | Compatible | |
+| short | decimal | Compatible | |
+| short | string | Compatible | |
+| short | binary | Unsupported | |
+| short | timestamp | Unsupported | |
+| integer | boolean | Compatible | |
+| integer | byte | Compatible | |
+| integer | short | Compatible | |
+| integer | long | Compatible | |
+| integer | float | Compatible | |
+| integer | double | Compatible | |
+| integer | decimal | Compatible | |
+| integer | string | Compatible | |
+| integer | binary | Unsupported | |
+| integer | timestamp | Unsupported | |
+| long | boolean | Compatible | |
+| long | byte | Compatible | |
+| long | short | Compatible | |
+| long | integer | Compatible | |
+| long | float | Compatible | |
+| long | double | Compatible | |
+| long | decimal | Compatible | |
+| long | string | Compatible | |
+| long | binary | Unsupported | |
+| long | timestamp | Unsupported | |
+| float | boolean | Compatible | |
+| float | byte | Unsupported | |
+| float | short | Unsupported | |
+| float | integer | Unsupported | |
+| float | long | Unsupported | |
+| float | double | Compatible | |
+| float | decimal | Unsupported | |
+| float | string | Incompatible | |
+| float | timestamp | Unsupported | |
+| double | boolean | Compatible | |
+| double | byte | Unsupported | |
+| double | short | Unsupported | |
+| double | integer | Unsupported | |
+| double | long | Unsupported | |
+| double | float | Compatible | |
+| double | decimal | 
Incompatible | | +| double | string | Incompatible | | +| double | timestamp | Unsupported | | +| decimal | boolean | Unsupported | | +| decimal | byte | Unsupported | | +| decimal | short | Unsupported | | +| decimal | integer | Unsupported | | +| decimal | long | Unsupported | | +| decimal | float | Compatible | | +| decimal | double | Compatible | | +| decimal | string | Unsupported | | +| decimal | timestamp | Unsupported | | +| string | boolean | Compatible | | +| string | byte | Compatible | | +| string | short | Compatible | | +| string | integer | Compatible | | +| string | long | Compatible | | +| string | float | Unsupported | | +| string | double | Unsupported | | +| string | decimal | Unsupported | | +| string | binary | Compatible | | +| string | date | Unsupported | | +| string | timestamp | Incompatible | Not all valid formats are supported | +| binary | string | Incompatible | | +| date | boolean | Unsupported | | +| date | byte | Unsupported | | +| date | short | Unsupported | | +| date | integer | Unsupported | | +| date | long | Unsupported | | +| date | float | Unsupported | | +| date | double | Unsupported | | +| date | decimal | Unsupported | | +| date | string | Compatible | | +| date | timestamp | Unsupported | | +| timestamp | boolean | Unsupported | | +| timestamp | byte | Unsupported | | +| timestamp | short | Unsupported | | +| timestamp | integer | Unsupported | | +| timestamp | long | Compatible | | +| timestamp | float | Unsupported | | +| timestamp | double | Unsupported | | +| timestamp | decimal | Unsupported | | +| timestamp | string | Compatible | | +| timestamp | date | Compatible | | diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 3a16cd47db..02ecbd6938 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -25,7 +25,7 @@ Comet provides the following configuration settings. |--------|-------------|---------------| | spark.comet.ansi.enabled | Comet does not respect ANSI mode in most cases and by default will not accelerate queries when ansi mode is enabled. Enable this setting to test Comet's experimental support for ANSI mode. This should not be used in production. | false | | spark.comet.batchSize | The columnar batch size, i.e., the maximum number of rows that a batch can contain. | 8192 | -| spark.comet.cast.stringToTimestamp | Comet is not currently fully compatible with Spark when casting from String to Timestamp. | false | +| spark.comet.cast.allowIncompatible | Comet is not currently fully compatible with Spark for all cast operations. Set this config to true to allow them anyway. See compatibility guide for more information. | false | | spark.comet.columnar.shuffle.async.enabled | Whether to enable asynchronous shuffle for Arrow-based shuffle. By default, this config is false. | false | | spark.comet.columnar.shuffle.async.max.thread.num | Maximum number of threads on an executor used for Comet async columnar shuffle. By default, this config is 100. This is the upper bound of total number of shuffle threads per executor. In other words, if the number of cores * the number of shuffle threads per task `spark.comet.columnar.shuffle.async.thread.num` is larger than this config. Comet will use this config as the number of shuffle threads per executor instead. | 100 | | spark.comet.columnar.shuffle.async.thread.num | Number of threads used for Comet async columnar shuffle per shuffle task. By default, this config is 3. 
Note that more threads means more memory requirement to buffer shuffle data before flushing to disk. Also, more threads may not always improve performance, and should be set based on the number of cores available. | 3 |
diff --git a/spark/pom.xml b/spark/pom.xml
index 7677d0a0d1..7c4524bd62 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -58,6 +58,11 @@ under the License.
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-library</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-reflect</artifactId>
+      <scope>provided</scope>
+    </dependency>
     <dependency>
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
@@ -270,17 +275,13 @@ under the License.
           <version>3.2.0</version>
           <executions>
             <execution>
-              <id>generate-config-docs</id>
+              <id>generate-user-guide-reference-docs</id>
               <phase>package</phase>
               <goals>
                 <goal>java</goal>
               </goals>
               <configuration>
-                <mainClass>org.apache.comet.CometConfGenerateDocs</mainClass>
-                <arguments>
-                  <argument>docs/source/user-guide/configs-template.md</argument>
-                  <argument>docs/source/user-guide/configs.md</argument>
-                </arguments>
+                <mainClass>org.apache.comet.GenerateDocs</mainClass>
                 <classpathScope>compile</classpathScope>
               </configuration>
             </execution>
diff --git a/spark/src/main/scala/org/apache/comet/GenerateDocs.scala b/spark/src/main/scala/org/apache/comet/GenerateDocs.scala
new file mode 100644
index 0000000000..8c414c7fed
--- /dev/null
+++ b/spark/src/main/scala/org/apache/comet/GenerateDocs.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet
+
+import java.io.{BufferedOutputStream, FileOutputStream}
+
+import scala.io.Source
+
+import org.apache.spark.sql.catalyst.expressions.Cast
+
+import org.apache.comet.expressions.{CometCast, Compatible, Incompatible, Unsupported}
+
+/**
+ * Utility for generating markdown documentation from the configs and the supported cast operations.
+ *
+ * This is invoked when running `mvn clean package -DskipTests`.
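+ *
+ * As an illustration (abridged; the full row text comes from the config's doc string),
+ * each public config becomes one row in the generated `configs.md` table:
+ * {{{
+ * | spark.comet.cast.allowIncompatible | Comet is not currently fully compatible ... | false |
+ * }}}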
+ */ +object GenerateDocs { + + def main(args: Array[String]): Unit = { + generateConfigReference() + generateCompatibilityGuide() + } + + private def generateConfigReference(): Unit = { + val templateFilename = "docs/source/user-guide/configs-template.md" + val outputFilename = "docs/source/user-guide/configs.md" + val w = new BufferedOutputStream(new FileOutputStream(outputFilename)) + for (line <- Source.fromFile(templateFilename).getLines()) { + if (line.trim == "") { + val publicConfigs = CometConf.allConfs.filter(_.isPublic) + val confs = publicConfigs.sortBy(_.key) + w.write("| Config | Description | Default Value |\n".getBytes) + w.write("|--------|-------------|---------------|\n".getBytes) + for (conf <- confs) { + w.write(s"| ${conf.key} | ${conf.doc.trim} | ${conf.defaultValueString} |\n".getBytes) + } + } else { + w.write(s"${line.trim}\n".getBytes) + } + } + w.close() + } + + private def generateCompatibilityGuide(): Unit = { + val templateFilename = "docs/source/user-guide/compatibility-template.md" + val outputFilename = "docs/source/user-guide/compatibility.md" + val w = new BufferedOutputStream(new FileOutputStream(outputFilename)) + for (line <- Source.fromFile(templateFilename).getLines()) { + if (line.trim == "") { + w.write("| From Type | To Type | Compatible? | Notes |\n".getBytes) + w.write("|-|-|-|-|\n".getBytes) + for (fromType <- CometCast.supportedTypes) { + for (toType <- CometCast.supportedTypes) { + if (Cast.canCast(fromType, toType) && fromType != toType) { + val fromTypeName = fromType.typeName.replace("(10,2)", "") + val toTypeName = toType.typeName.replace("(10,2)", "") + CometCast.isSupported(fromType, toType, None, "LEGACY") match { + case Compatible => + w.write(s"| $fromTypeName | $toTypeName | Compatible | |\n".getBytes) + case Incompatible(Some(reason)) => + w.write(s"| $fromTypeName | $toTypeName | Incompatible | $reason |\n".getBytes) + case Incompatible(None) => + w.write(s"| $fromTypeName | $toTypeName | Incompatible | |\n".getBytes) + case Unsupported => + w.write(s"| $fromTypeName | $toTypeName | Unsupported | |\n".getBytes) + } + } + } + } + } else { + w.write(s"${line.trim}\n".getBytes) + } + } + w.close() + } +} diff --git a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala new file mode 100644 index 0000000000..5641c94a87 --- /dev/null +++ b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.comet.expressions + +import org.apache.spark.sql.types.{DataType, DataTypes, DecimalType} + +sealed trait SupportLevel + +/** We support this feature with full compatibility with Spark */ +object Compatible extends SupportLevel + +/** We support this feature but results can be different from Spark */ +case class Incompatible(reason: Option[String] = None) extends SupportLevel + +/** We do not support this feature */ +object Unsupported extends SupportLevel + +object CometCast { + + def supportedTypes: Seq[DataType] = + Seq( + DataTypes.BooleanType, + DataTypes.ByteType, + DataTypes.ShortType, + DataTypes.IntegerType, + DataTypes.LongType, + DataTypes.FloatType, + DataTypes.DoubleType, + DataTypes.createDecimalType(10, 2), + DataTypes.StringType, + DataTypes.BinaryType, + DataTypes.DateType, + DataTypes.TimestampType) + // TODO add DataTypes.TimestampNTZType for Spark 3.4 and later + // https://github.com/apache/datafusion-comet/issues/378 + + def isSupported( + fromType: DataType, + toType: DataType, + timeZoneId: Option[String], + evalMode: String): SupportLevel = { + + if (fromType == toType) { + return Compatible + } + + (fromType, toType) match { + case (dt: DataType, _) if dt.typeName == "timestamp_ntz" => + // https://github.com/apache/datafusion-comet/issues/378 + toType match { + case DataTypes.TimestampType | DataTypes.DateType | DataTypes.StringType => + Incompatible() + case _ => + Unsupported + } + case (_: DecimalType, _: DecimalType) => + // https://github.com/apache/datafusion-comet/issues/375 + Incompatible() + case (DataTypes.StringType, _) => + canCastFromString(toType, timeZoneId, evalMode) + case (_, DataTypes.StringType) => + canCastToString(fromType) + case (DataTypes.TimestampType, _) => + canCastFromTimestamp(toType) + case (_: DecimalType, _) => + canCastFromDecimal(toType) + case (DataTypes.BooleanType, _) => + canCastFromBoolean(toType) + case ( + DataTypes.ByteType | DataTypes.ShortType | DataTypes.IntegerType | DataTypes.LongType, + _) => + canCastFromInt(toType) + case (DataTypes.FloatType, _) => + canCastFromFloat(toType) + case (DataTypes.DoubleType, _) => + canCastFromDouble(toType) + case _ => Unsupported + } + } + + private def canCastFromString( + toType: DataType, + timeZoneId: Option[String], + evalMode: String): SupportLevel = { + toType match { + case DataTypes.BooleanType => + Compatible + case DataTypes.ByteType | DataTypes.ShortType | DataTypes.IntegerType | + DataTypes.LongType => + Compatible + case DataTypes.BinaryType => + Compatible + case DataTypes.FloatType | DataTypes.DoubleType => + // https://github.com/apache/datafusion-comet/issues/326 + Unsupported + case _: DecimalType => + // https://github.com/apache/datafusion-comet/issues/325 + Unsupported + case DataTypes.DateType => + // https://github.com/apache/datafusion-comet/issues/327 + Unsupported + case DataTypes.TimestampType if timeZoneId.exists(tz => tz != "UTC") => + Incompatible(Some(s"Cast will use UTC instead of $timeZoneId")) + case DataTypes.TimestampType if evalMode == "ANSI" => + Incompatible(Some("ANSI mode not supported")) + case DataTypes.TimestampType => + // https://github.com/apache/datafusion-comet/issues/328 + Incompatible(Some("Not all valid formats are supported")) + case _ => + Unsupported + } + } + + private def canCastToString(fromType: DataType): SupportLevel = { + fromType match { + case DataTypes.BooleanType => Compatible + case DataTypes.ByteType | DataTypes.ShortType | DataTypes.IntegerType | + DataTypes.LongType => + Compatible + 
case DataTypes.DateType => Compatible + case DataTypes.TimestampType => Compatible + case DataTypes.FloatType | DataTypes.DoubleType => + // https://github.com/apache/datafusion-comet/issues/326 + Incompatible() + case DataTypes.BinaryType => + // https://github.com/apache/datafusion-comet/issues/377 + Incompatible() + case _ => Unsupported + } + } + + private def canCastFromTimestamp(toType: DataType): SupportLevel = { + toType match { + case DataTypes.BooleanType | DataTypes.ByteType | DataTypes.ShortType | + DataTypes.IntegerType => + // https://github.com/apache/datafusion-comet/issues/352 + // this seems like an edge case that isn't important for us to support + Unsupported + case DataTypes.LongType => + // https://github.com/apache/datafusion-comet/issues/352 + Compatible + case DataTypes.StringType => Compatible + case DataTypes.DateType => Compatible + case _ => Unsupported + } + } + + private def canCastFromBoolean(toType: DataType): SupportLevel = toType match { + case DataTypes.ByteType | DataTypes.ShortType | DataTypes.IntegerType | DataTypes.LongType | + DataTypes.FloatType | DataTypes.DoubleType => + Compatible + case _ => Unsupported + } + + private def canCastFromInt(toType: DataType): SupportLevel = toType match { + case DataTypes.BooleanType | DataTypes.ByteType | DataTypes.ShortType | + DataTypes.IntegerType | DataTypes.LongType | DataTypes.FloatType | DataTypes.DoubleType | + _: DecimalType => + Compatible + case _ => Unsupported + } + + private def canCastFromFloat(toType: DataType): SupportLevel = toType match { + case DataTypes.BooleanType | DataTypes.DoubleType => Compatible + case _ => Unsupported + } + + private def canCastFromDouble(toType: DataType): SupportLevel = toType match { + case DataTypes.BooleanType | DataTypes.FloatType => Compatible + case _: DecimalType => Incompatible() + case _ => Unsupported + } + + private def canCastFromDecimal(toType: DataType): SupportLevel = toType match { + case DataTypes.FloatType | DataTypes.DoubleType => Compatible + case _ => Unsupported + } + +} diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index c07b2b3c50..e77adc9bb9 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -43,6 +43,7 @@ import org.apache.spark.unsafe.types.UTF8String import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.{isCometOperatorEnabled, isCometScan, isSpark32, isSpark34Plus, withInfo} +import org.apache.comet.expressions.{CometCast, Compatible, Incompatible, Unsupported} import org.apache.comet.serde.ExprOuterClass.{AggExpr, DataType => ProtoDataType, Expr, ScalarFunc} import org.apache.comet.serde.ExprOuterClass.DataType.{DataTypeInfo, DecimalInfo, ListInfo, MapInfo, StructInfo} import org.apache.comet.serde.OperatorOuterClass.{AggregateMode => CometAggregateMode, JoinType, Operator} @@ -585,30 +586,35 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde { // Spark 3.4+ has EvalMode enum with values LEGACY, ANSI, and TRY evalMode.toString } - - val supportedTimezone = (child.dataType, dt) match { - case (DataTypes.StringType, DataTypes.TimestampType) - if !timeZoneId.contains("UTC") => - withInfo(expr, s"Unsupported timezone ${timeZoneId} for timestamp cast") - false - case _ => true - } - - val supportedCast = (child.dataType, dt) match { - case (DataTypes.StringType, DataTypes.TimestampType) - if 
!CometConf.COMET_CAST_STRING_TO_TIMESTAMP.get() => - // https://github.com/apache/datafusion-comet/issues/328 - withInfo(expr, s"${CometConf.COMET_CAST_STRING_TO_TIMESTAMP.key} is disabled") - false - case _ => true - } - - if (supportedCast && supportedTimezone) { - castToProto(timeZoneId, dt, childExpr, evalModeStr) - } else { - // no need to call withInfo here since it was called when determining - // the value for `supportedCast` - None + val castSupport = + CometCast.isSupported(child.dataType, dt, timeZoneId, evalModeStr) + + def getIncompatMessage(reason: Option[String]) = + "Comet does not guarantee correct results for cast " + + s"from ${child.dataType} to $dt " + + s"with timezone $timeZoneId and evalMode $evalModeStr" + + reason.map(str => s" ($str)").getOrElse("") + + castSupport match { + case Compatible => + castToProto(timeZoneId, dt, childExpr, evalModeStr) + case Incompatible(reason) => + if (CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.get()) { + logWarning(getIncompatMessage(reason)) + castToProto(timeZoneId, dt, childExpr, evalModeStr) + } else { + withInfo( + expr, + s"${getIncompatMessage(reason)}. To enable all incompatible casts, set " + + s"${CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key}=true") + None + } + case Unsupported => + withInfo( + expr, + s"Unsupported cast from ${child.dataType} to $dt " + + s"with timezone $timeZoneId and evalMode $evalModeStr") + None } } else { withInfo(expr, child) diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala index 3be7dcb64f..54b1367912 100644 --- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala @@ -30,6 +30,8 @@ import org.apache.spark.sql.functions.col import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataType, DataTypes} +import org.apache.comet.expressions.CometCast + class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { import testImplicits._ @@ -72,22 +74,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } - // make sure we have tests for all combinations of our supported types - val supportedTypes = - Seq( - DataTypes.BooleanType, - DataTypes.ByteType, - DataTypes.ShortType, - DataTypes.IntegerType, - DataTypes.LongType, - DataTypes.FloatType, - DataTypes.DoubleType, - DataTypes.createDecimalType(10, 2), - DataTypes.StringType, - DataTypes.DateType, - DataTypes.TimestampType) - // TODO add DataTypes.TimestampNTZType for Spark 3.4 and later - assertTestsExist(supportedTypes, supportedTypes) + assertTestsExist(CometCast.supportedTypes, CometCast.supportedTypes) } // CAST from BooleanType @@ -164,6 +151,10 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { castTest(generateBytes(), DataTypes.StringType) } + ignore("cast ByteType to BinaryType") { + castTest(generateBytes(), DataTypes.BinaryType) + } + ignore("cast ByteType to TimestampType") { // input: -1, expected: 1969-12-31 15:59:59.0, actual: 1969-12-31 15:59:59.999999 castTest(generateBytes(), DataTypes.TimestampType) @@ -204,6 +195,10 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { castTest(generateShorts(), DataTypes.StringType) } + ignore("cast ShortType to BinaryType") { + castTest(generateShorts(), DataTypes.BinaryType) + } + ignore("cast ShortType to TimestampType") { // input: -1003, expected: 1969-12-31 15:43:17.0, actual: 1969-12-31 15:59:59.998997 castTest(generateShorts(), 
DataTypes.TimestampType) @@ -246,6 +241,10 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { castTest(generateInts(), DataTypes.StringType) } + ignore("cast IntegerType to BinaryType") { + castTest(generateInts(), DataTypes.BinaryType) + } + ignore("cast IntegerType to TimestampType") { // input: -1000479329, expected: 1938-04-19 01:04:31.0, actual: 1969-12-31 15:43:19.520671 castTest(generateInts(), DataTypes.TimestampType) @@ -289,6 +288,10 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { castTest(generateLongs(), DataTypes.StringType) } + ignore("cast LongType to BinaryType") { + castTest(generateLongs(), DataTypes.BinaryType) + } + ignore("cast LongType to TimestampType") { // java.lang.ArithmeticException: long overflow castTest(generateLongs(), DataTypes.TimestampType) @@ -537,23 +540,47 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { castTest(values, DataTypes.createDecimalType(10, 2)) } + test("cast StringType to BinaryType") { + castTest(generateStrings(numericPattern, 8).toDF("a"), DataTypes.BinaryType) + } + ignore("cast StringType to DateType") { // https://github.com/apache/datafusion-comet/issues/327 castTest(generateStrings(datePattern, 8).toDF("a"), DataTypes.DateType) } test("cast StringType to TimestampType disabled by default") { - val values = Seq("2020-01-01T12:34:56.123456", "T2").toDF("a") - castFallbackTest( - values.toDF("a"), - DataTypes.TimestampType, - "spark.comet.cast.stringToTimestamp is disabled") + withSQLConf((SQLConf.SESSION_LOCAL_TIMEZONE.key, "UTC")) { + val values = Seq("2020-01-01T12:34:56.123456", "T2").toDF("a") + castFallbackTest( + values.toDF("a"), + DataTypes.TimestampType, + "Not all valid formats are supported") + } + } + + test("cast StringType to TimestampType disabled for non-UTC timezone") { + withSQLConf((SQLConf.SESSION_LOCAL_TIMEZONE.key, "America/Denver")) { + val values = Seq("2020-01-01T12:34:56.123456", "T2").toDF("a") + castFallbackTest( + values.toDF("a"), + DataTypes.TimestampType, + "Cast will use UTC instead of Some(America/Denver)") + } + } + + ignore("cast StringType to TimestampType (fuzz test)") { + // https://github.com/apache/datafusion-comet/issues/328 + withSQLConf((CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key, "true")) { + val values = Seq("2020-01-01T12:34:56.123456", "T2") ++ generateStrings(timestampPattern, 8) + castTest(values.toDF("a"), DataTypes.TimestampType) + } } test("cast StringType to TimestampType") { withSQLConf( SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC", - CometConf.COMET_CAST_STRING_TO_TIMESTAMP.key -> "true") { + CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { val values = Seq( "2020", "2020-01", @@ -570,15 +597,17 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { // test for invalid inputs withSQLConf( SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC", - CometConf.COMET_CAST_STRING_TO_TIMESTAMP.key -> "true") { + CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { val values = Seq("-9?", "1-", "0.5") castTimestampTest(values.toDF("a"), DataTypes.TimestampType) } } - test("cast StringType to TimestampType with invalid timezone") { - val values = Seq("2020-01-01T12:34:56.123456", "T2") - castFallbackTestTimezone(values.toDF("a"), DataTypes.TimestampType, "Unsupported timezone") + // CAST from BinaryType + + ignore("cast BinaryType to StringType") { + // TODO implement this + // https://github.com/apache/datafusion-comet/issues/377 } // CAST from DateType @@ -657,9 +686,8 @@ class 
CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { castTest(generateTimestamps(), DataTypes.IntegerType) } - ignore("cast TimestampType to LongType") { - // https://github.com/apache/datafusion-comet/issues/352 - // input: 2023-12-31 17:00:00.0, expected: 1.70407078E9, actual: 1.70407082E15] + test("cast TimestampType to LongType") { + assume(CometSparkSessionExtensions.isSpark33Plus) castTest(generateTimestamps(), DataTypes.LongType) } @@ -818,7 +846,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { withSQLConf( (SQLConf.ANSI_ENABLED.key, "false"), - (CometConf.COMET_CAST_STRING_TO_TIMESTAMP.key, "true"), + (CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key, "true"), (SQLConf.SESSION_LOCAL_TIMEZONE.key, "America/Los_Angeles")) { val df = data.withColumn("converted", col("a").cast(toType)) df.collect() @@ -847,6 +875,11 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { } private def castTest(input: DataFrame, toType: DataType): Unit = { + + // we do not support the TryCast expression in Spark 3.2 and 3.3 + // https://github.com/apache/datafusion-comet/issues/374 + val testTryCast = CometSparkSessionExtensions.isSpark34Plus + withTempPath { dir => val data = roundtripParquet(input, dir).coalesce(1) data.createOrReplaceTempView("t") @@ -854,12 +887,14 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { withSQLConf((SQLConf.ANSI_ENABLED.key, "false")) { // cast() should return null for invalid inputs when ansi mode is disabled val df = spark.sql(s"select a, cast(a as ${toType.sql}) from t order by a") - checkSparkAnswer(df) + checkSparkAnswerAndOperator(df) // try_cast() should always return null for invalid inputs - val df2 = - spark.sql(s"select a, try_cast(a as ${toType.sql}) from t order by a") - checkSparkAnswer(df2) + if (testTryCast) { + val df2 = + spark.sql(s"select a, try_cast(a as ${toType.sql}) from t order by a") + checkSparkAnswerAndOperator(df2) + } } // with ANSI enabled, we should produce the same exception as Spark @@ -899,9 +934,11 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { } // try_cast() should always return null for invalid inputs - val df2 = - spark.sql(s"select a, try_cast(a as ${toType.sql}) from t order by a") - checkSparkAnswer(df2) + if (testTryCast) { + val df2 = + spark.sql(s"select a, try_cast(a as ${toType.sql}) from t order by a") + checkSparkAnswerAndOperator(df2) + } } } } diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 3683c8d442..c8c7ffd5ce 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -259,7 +259,9 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("cast timestamp and timestamp_ntz") { - withSQLConf(SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu") { + withSQLConf( + SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu", + CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet") @@ -282,7 +284,9 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { // TODO: make the test pass for Spark 3.2 & 3.3 assume(isSpark34Plus) - withSQLConf(SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu") { + withSQLConf( + SESSION_LOCAL_TIMEZONE.key -> 
"Asia/Kathmandu", + CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet") @@ -305,7 +309,9 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { // TODO: make the test pass for Spark 3.2 & 3.3 assume(isSpark34Plus) - withSQLConf(SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu") { + withSQLConf( + SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu", + CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet") @@ -394,32 +400,34 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("date_trunc with timestamp_ntz") { assume(!isSpark32, "timestamp functions for timestamp_ntz have incorrect behavior in 3.2") - Seq(true, false).foreach { dictionaryEnabled => - withTempDir { dir => - val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet") - makeRawTimeParquetFile(path, dictionaryEnabled = dictionaryEnabled, 10000) - withParquetTable(path.toString, "timetbl") { - Seq( - "YEAR", - "YYYY", - "YY", - "MON", - "MONTH", - "MM", - "QUARTER", - "WEEK", - "DAY", - "DD", - "HOUR", - "MINUTE", - "SECOND", - "MILLISECOND", - "MICROSECOND").foreach { format => - checkSparkAnswerAndOperator( - "SELECT " + - s"date_trunc('$format', _3), " + - s"date_trunc('$format', _5) " + - " from timetbl") + withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + Seq(true, false).foreach { dictionaryEnabled => + withTempDir { dir => + val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet") + makeRawTimeParquetFile(path, dictionaryEnabled = dictionaryEnabled, 10000) + withParquetTable(path.toString, "timetbl") { + Seq( + "YEAR", + "YYYY", + "YY", + "MON", + "MONTH", + "MM", + "QUARTER", + "WEEK", + "DAY", + "DD", + "HOUR", + "MINUTE", + "SECOND", + "MILLISECOND", + "MICROSECOND").foreach { format => + checkSparkAnswerAndOperator( + "SELECT " + + s"date_trunc('$format', _3), " + + s"date_trunc('$format', _5) " + + " from timetbl") + } } } } @@ -428,22 +436,24 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("date_trunc with format array") { assume(isSpark33Plus, "TimestampNTZ is supported in Spark 3.3+, See SPARK-36182") - val numRows = 1000 - Seq(true, false).foreach { dictionaryEnabled => - withTempDir { dir => - val path = new Path(dir.toURI.toString, "timestamp_trunc_with_format.parquet") - makeDateTimeWithFormatTable(path, dictionaryEnabled = dictionaryEnabled, numRows) - withParquetTable(path.toString, "timeformattbl") { - checkSparkAnswerAndOperator( - "SELECT " + - "format, _0, _1, _2, _3, _4, _5, " + - "date_trunc(format, _0), " + - "date_trunc(format, _1), " + - "date_trunc(format, _2), " + - "date_trunc(format, _3), " + - "date_trunc(format, _4), " + - "date_trunc(format, _5) " + - " from timeformattbl ") + withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + val numRows = 1000 + Seq(true, false).foreach { dictionaryEnabled => + withTempDir { dir => + val path = new Path(dir.toURI.toString, "timestamp_trunc_with_format.parquet") + makeDateTimeWithFormatTable(path, dictionaryEnabled = dictionaryEnabled, numRows) + withParquetTable(path.toString, "timeformattbl") { + checkSparkAnswerAndOperator( + "SELECT " + + "format, _0, _1, _2, _3, _4, _5, " + + "date_trunc(format, _0), " + + "date_trunc(format, 
_1), " + + "date_trunc(format, _2), " + + "date_trunc(format, _3), " + + "date_trunc(format, _4), " + + "date_trunc(format, _5) " + + " from timeformattbl ") + } } } } @@ -818,7 +828,9 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("ceil and floor") { Seq("true", "false").foreach { dictionary => - withSQLConf("parquet.enable.dictionary" -> dictionary) { + withSQLConf( + "parquet.enable.dictionary" -> dictionary, + CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { withParquetTable( (-5 until 5).map(i => (i.toDouble + 0.3, i.toDouble + 0.8)), "tbl", @@ -1406,7 +1418,9 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("hash functions") { Seq(true, false).foreach { dictionary => - withSQLConf("parquet.enable.dictionary" -> dictionary.toString) { + withSQLConf( + "parquet.enable.dictionary" -> dictionary.toString, + CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { val table = "test" withTable(table) { sql(s"create table $table(col string, a int, b float) using parquet") diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala index bd4042ec11..fc6876fd19 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala @@ -863,7 +863,8 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { Seq(true, false).foreach { nativeShuffleEnabled => withSQLConf( CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> nativeShuffleEnabled.toString, - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") { + CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false", + CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { withTempDir { dir => val path = new Path(dir.toURI.toString, "test") makeParquetFile(path, 1000, 20, dictionaryEnabled) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index af553088c7..2bc82a9580 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -949,7 +949,9 @@ class CometExecSuite extends CometTestBase { } test("SPARK-33474: Support typed literals as partition spec values") { - withSQLConf(SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu") { + withSQLConf( + SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu", + CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { withTable("t1") { val binaryStr = "Spark SQL" val binaryHexStr = Hex.hex(UTF8String.fromString(binaryStr).getBytes).toString diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala index 48969ea41d..90ea794739 100644 --- a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala @@ -261,6 +261,7 @@ trait CometPlanStabilitySuite extends DisableAdaptiveExecutionSuite with TPCDSBa CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", CometConf.COMET_EXEC_ALL_EXPR_ENABLED.key -> "true", + CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true", // needed for v1.4/q9, v1.4/q44, v2.7.0/q6, v2.7.0/q64 "spark.sql.readSideCharPadding" -> "false", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key 
-> "10MB") { val qe = sql(queryString).queryExecution
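
The config introduced by this patch can be exercised end-to-end. Below is a minimal,
hypothetical sketch (not part of the patch; it assumes a Spark session with Comet
installed and enabled, plus a UTC session timezone, and the object, application, and
column names are illustrative):

```scala
import org.apache.spark.sql.SparkSession

object AllowIncompatibleCastDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("allow-incompatible-cast-demo")
      // Casting string to timestamp is classified as Incompatible ("Not all valid
      // formats are supported"), so by default the query stage falls back to Spark.
      // Enabling this config lets such casts run natively in Comet; per the
      // compatibility guide, this is not recommended for production use.
      .config("spark.comet.cast.allowIncompatible", "true")
      .config("spark.sql.session.timeZone", "UTC")
      .getOrCreate()

    import spark.implicits._

    val df = Seq("2020-01-01T12:34:56.123456", "2020-01-01").toDF("a")
    // With the config enabled, the plan should contain a native Comet cast; with it
    // disabled, the fallback reason is attached to the plan via withInfo.
    df.selectExpr("cast(a as timestamp)").explain(true)

    spark.stop()
  }
}
```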