From 94849f0b8bf63be08a5ced18dbc86b80343cccfe Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 19 May 2024 10:46:06 -0700 Subject: [PATCH] More --- .../org/apache/comet/serde/QueryPlanSerde.scala | 8 +++++++- .../org/apache/spark/sql/comet/operators.scala | 2 +- .../org/apache/comet/exec/CometJoinSuite.scala | 13 ++++++++----- 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index cb95a2b4e..f544b899a 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -25,7 +25,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average, BitAndAgg, BitOrAgg, BitXorAgg, Count, CovPopulation, CovSample, Final, First, Last, Max, Min, Partial, StddevPop, StddevSamp, Sum, VariancePop, VarianceSamp} import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke -import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, NormalizeNaNAndZero} +import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, NormalizeNaNAndZero} import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, SinglePartition} import org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils @@ -2417,6 +2417,12 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim return None } + if (join.buildSide == BuildRight && join.joinType == LeftAnti) { + // DataFusion HashJoin LeftAnti has bugs on null keys. 
+ withInfo(join, "BuildRight with LeftAnti is not supported") + return None + } + val condition = join.condition.map { cond => val condProto = exprToProto(cond, join.left.output ++ join.right.output) if (condProto.isEmpty) { diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index 9cb7a849c..394e4b156 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -836,7 +836,7 @@ case class CometBroadcastHashJoinExec( this.copy(left = newLeft, right = newRight) override def stringArgs: Iterator[Any] = - Iterator(leftKeys, rightKeys, joinType, condition, left, right) + Iterator(leftKeys, rightKeys, joinType, condition, buildSide, left, right) override def equals(obj: Any): Boolean = { obj match { diff --git a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala index f29d43ab5..8bae2ecaa 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala @@ -121,6 +121,7 @@ class CometJoinSuite extends CometTestBase { test("HashJoin without join filter") { withSQLConf( + "spark.sql.join.forceApplyShuffledHashJoin" -> "true", SQLConf.PREFER_SORTMERGEJOIN.key -> "false", SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { @@ -170,12 +171,14 @@ class CometJoinSuite extends CometTestBase { sql("SELECT /*+ SHUFFLE_HASH(tbl_b) */ * FROM tbl_a FULL JOIN tbl_b ON tbl_a._2 = tbl_b._1") checkSparkAnswerAndOperator(df7) - // - // val left = sql("SELECT * FROM tbl_a") - // val right = sql("SELECT * FROM tbl_b") - // // Left semi and anti joins are only supported with build right in Spark. 
- // left.join(right, left("_2") === right("_1"), "leftsemi") + val left = sql("SELECT * FROM tbl_a") + val right = sql("SELECT * FROM tbl_b") + + val df8 = left.join(right, left("_2") === right("_1"), "leftsemi") + checkSparkAnswerAndOperator(df8) + + // DataFusion HashJoin handles nulls incorrectly for LeftAnti joins, so the left anti case is disabled for now. // left.join(right, left("_2") === right("_1"), "leftanti") } }