From f05339dcfea88e6edcd86cbbc629342c9184c62c Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Mon, 10 Jun 2024 08:06:57 -0600
Subject: [PATCH 1/2] disable xxhash64 by default

---
 .../scala/org/apache/comet/CometConf.scala    |  6 ++++
 docs/source/user-guide/configs.md             |  1 +
 .../apache/comet/serde/QueryPlanSerde.scala   | 32 ++++++++++++-------
 3 files changed, 27 insertions(+), 12 deletions(-)

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 42fb5fb4c..1b40c7cd0 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -401,6 +401,12 @@ object CometConf extends ShimCometConf {
       .booleanConf
       .createWithDefault(false)
 
+  val COMET_XXHASH64_ENABLED: ConfigEntry[Boolean] =
+    conf("spark.comet.xxhash64.enabled")
+      .doc("The xxhash64 implementation is not optimized yet and may cause performance issues.")
+      .booleanConf
+      .createWithDefault(false)
+
 }
 
 object ConfigHelpers {
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 104f29ce8..f232dc8b8 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -48,3 +48,4 @@ Comet provides the following configuration settings.
 | spark.comet.scan.preFetch.enabled | Whether to enable pre-fetching feature of CometScan. By default is disabled. | false |
 | spark.comet.scan.preFetch.threadNum | The number of threads running pre-fetching for CometScan. Effective if spark.comet.scan.preFetch.enabled is enabled. By default it is 2. Note that more pre-fetching threads means more memory requirement to store pre-fetched row groups. | 2 |
 | spark.comet.shuffle.preferDictionary.ratio | The ratio of total values to distinct values in a string column to decide whether to prefer dictionary encoding when shuffling the column. If the ratio is higher than this config, dictionary encoding will be used on shuffling string column. This config is effective if it is higher than 1.0. By default, this config is 10.0. Note that this config is only used when 'spark.comet.columnar.shuffle.enabled' is true. | 10.0 |
+| spark.comet.xxhash64.enabled | The xxhash64 implementation is not optimized yet and may cause performance issues. | false |
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 7d9bef48c..5a0ad38d7 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -2101,19 +2101,27 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
         scalarExprToProtoWithReturnType("murmur3_hash", IntegerType, exprs :+ seedExpr: _*)
 
       case XxHash64(children, seed) =>
-        val firstUnSupportedInput = children.find(c => !supportedDataType(c.dataType))
-        if (firstUnSupportedInput.isDefined) {
-          withInfo(expr, s"Unsupported datatype ${firstUnSupportedInput.get.dataType}")
-          return None
+        if (CometConf.COMET_XXHASH64_ENABLED.get()) {
+          val firstUnSupportedInput = children.find(c => !supportedDataType(c.dataType))
+          if (firstUnSupportedInput.isDefined) {
+            withInfo(expr, s"Unsupported datatype ${firstUnSupportedInput.get.dataType}")
+            return None
+          }
+          val exprs = children.map(exprToProtoInternal(_, inputs))
+          val seedBuilder = ExprOuterClass.Literal
+            .newBuilder()
+            .setDatatype(serializeDataType(LongType).get)
+            .setLongVal(seed)
+          val seedExpr = Some(ExprOuterClass.Expr.newBuilder().setLiteral(seedBuilder).build())
+          // the seed is put at the end of the arguments
+          scalarExprToProtoWithReturnType("xxhash64", LongType, exprs :+ seedExpr: _*)
+        } else {
+          withInfo(
+            expr,
+            "xxhash64 is disabled by default. " +
+              s"Set ${CometConf.COMET_XXHASH64_ENABLED.key}=true to enable it.")
+          None
         }
-        val exprs = children.map(exprToProtoInternal(_, inputs))
-        val seedBuilder = ExprOuterClass.Literal
-          .newBuilder()
-          .setDatatype(serializeDataType(LongType).get)
-          .setLongVal(seed)
-        val seedExpr = Some(ExprOuterClass.Expr.newBuilder().setLiteral(seedBuilder).build())
-        // the seed is put at the end of the arguments
-        scalarExprToProtoWithReturnType("xxhash64", LongType, exprs :+ seedExpr: _*)
 
       case Sha2(left, numBits) =>
         if (!numBits.foldable) {

From d2c8409f80a1c6bd06714ef763449270b05a9ed3 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Mon, 10 Jun 2024 11:25:32 -0600
Subject: [PATCH 2/2] fix regressions

---
 .../src/test/scala/org/apache/comet/CometExpressionSuite.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 7516a0785..72cf819c1 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -1489,6 +1489,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     Seq(true, false).foreach { dictionary =>
       withSQLConf(
         "parquet.enable.dictionary" -> dictionary.toString,
+        CometConf.COMET_XXHASH64_ENABLED.key -> "true",
         CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
         val table = "test"
         withTable(table) {
@@ -1521,6 +1522,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     Seq(true, false).foreach { dictionary =>
       withSQLConf(
         "parquet.enable.dictionary" -> dictionary.toString,
+        CometConf.COMET_XXHASH64_ENABLED.key -> "true",
         CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
         val table = "test"
         withTable(table) {
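
Usage note (not part of the patches above): a minimal sketch of how a user could opt back in to Comet's native xxhash64 once this change lands, using the spark.comet.xxhash64.enabled key introduced in CometConf.scala. The application name and the spark.comet.enabled setting are illustrative assumptions, not taken from this diff.

```scala
import org.apache.spark.sql.SparkSession

object XxHash64OptInExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("xxhash64-opt-in")                     // illustrative name
      .config("spark.comet.enabled", "true")          // assumes Comet is on the classpath
      .config("spark.comet.xxhash64.enabled", "true") // the key this patch adds (default: false)
      .getOrCreate()

    // With the flag set to true, QueryPlanSerde serializes XxHash64 to the native
    // "xxhash64" scalar function; with the default (false) it tags the expression via
    // withInfo and Spark's own implementation handles it instead.
    spark.range(10).selectExpr("xxhash64(id) AS h").show()

    spark.stop()
  }
}
```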