viirya committed May 19, 2024
1 parent 6dc9745 commit 94849f0
Showing 3 changed files with 16 additions and 7 deletions.
@@ -25,7 +25,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average, BitAndAgg, BitOrAgg, BitXorAgg, Count, CovPopulation, CovSample, Final, First, Last, Max, Min, Partial, StddevPop, StddevSamp, Sum, VariancePop, VarianceSamp}
import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
-import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, NormalizeNaNAndZero}
+import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, NormalizeNaNAndZero}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, SinglePartition}
import org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils
@@ -2417,6 +2417,12 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
return None
}

+if (join.buildSide == BuildRight && join.joinType == LeftAnti) {
+  // DataFusion HashJoin LeftAnti has bugs on null keys.
+  withInfo(join, "BuildRight with LeftAnti is not supported")
+  return None
+}

val condition = join.condition.map { cond =>
val condProto = exprToProto(cond, join.left.output ++ join.right.output)
if (condProto.isEmpty) {
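The guard above is the substance of this file's change: when Comet plans a shuffled hash join, a BuildRight build side combined with a LeftAnti join type is no longer serialized for DataFusion, and the join falls back to Spark. A minimal, self-contained sketch of that decision rule, using stand-in types rather than Comet's or Spark's actual classes:

// A minimal sketch (not Comet's real API) of the guard added above: a shuffled
// hash join that builds on the right side of a LEFT ANTI join is kept on the
// Spark side instead of being handed to DataFusion.
object HashJoinFallbackSketch {
  sealed trait BuildSide
  case object BuildLeft extends BuildSide
  case object BuildRight extends BuildSide

  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftAnti extends JoinType

  // Returns true when the join may be offloaded to the native hash join.
  def supportedByDataFusion(buildSide: BuildSide, joinType: JoinType): Boolean =
    !(buildSide == BuildRight && joinType == LeftAnti)

  def main(args: Array[String]): Unit = {
    println(supportedByDataFusion(BuildRight, LeftAnti)) // false: fall back to Spark
    println(supportedByDataFusion(BuildLeft, LeftAnti))  // true
    println(supportedByDataFusion(BuildRight, Inner))    // true
  }
}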
@@ -836,7 +836,7 @@ case class CometBroadcastHashJoinExec(
this.copy(left = newLeft, right = newRight)

override def stringArgs: Iterator[Any] =
-  Iterator(leftKeys, rightKeys, joinType, condition, left, right)
+  Iterator(leftKeys, rightKeys, joinType, condition, buildSide, left, right)

override def equals(obj: Any): Boolean = {
obj match {
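Adding buildSide to stringArgs means the build side now appears in the operator's string form (Spark derives an operator's explain string from these arguments), so two CometBroadcastHashJoinExec nodes that differ only in build side no longer print identically. A rough stand-alone illustration, with a hypothetical FakeHashJoin standing in for the real operator:

// FakeHashJoin is illustrative only: whatever stringArgs yields is all the
// operator's string form can distinguish, so omitting buildSide would make a
// BuildLeft plan and a BuildRight plan render the same way.
object StringArgsSketch {
  sealed trait BuildSide
  case object BuildLeft extends BuildSide
  case object BuildRight extends BuildSide

  final case class FakeHashJoin(joinType: String, buildSide: BuildSide) {
    def stringArgs: Iterator[Any] = Iterator(joinType, buildSide)
    def simpleString: String = s"FakeHashJoin ${stringArgs.mkString(", ")}"
  }

  def main(args: Array[String]): Unit = {
    println(FakeHashJoin("LeftSemi", BuildLeft).simpleString)  // FakeHashJoin LeftSemi, BuildLeft
    println(FakeHashJoin("LeftSemi", BuildRight).simpleString) // FakeHashJoin LeftSemi, BuildRight
  }
}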
spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala: 13 changes (8 additions & 5 deletions)
@@ -121,6 +121,7 @@ class CometJoinSuite extends CometTestBase {

test("HashJoin without join filter") {
withSQLConf(
"spark.sql.join.forceApplyShuffledHashJoin" -> "true",
SQLConf.PREFER_SORTMERGEJOIN.key -> "false",
SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
@@ -170,12 +171,14 @@
sql("SELECT /*+ SHUFFLE_HASH(tbl_b) */ * FROM tbl_a FULL JOIN tbl_b ON tbl_a._2 = tbl_b._1")
checkSparkAnswerAndOperator(df7)

//
-// val left = sql("SELECT * FROM tbl_a")
-// val right = sql("SELECT * FROM tbl_b")
-//
-// Left semi and anti joins are only supported with build right in Spark.
-// left.join(right, left("_2") === right("_1"), "leftsemi")
+val left = sql("SELECT * FROM tbl_a")
+val right = sql("SELECT * FROM tbl_b")
+
+val df8 = left.join(right, left("_2") === right("_1"), "leftsemi")
+checkSparkAnswerAndOperator(df8)
+
+// DataFusion HashJoin LeftAnti has bugs in handling nulls and is disabled for now.
// left.join(right, left("_2") === right("_1"), "leftanti")
}
}
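The leftanti case stays disabled because of the null-handling bug noted in the test comment. The semantics at stake: a left-side row whose join key is NULL can never satisfy an equality condition, so a LEFT ANTI join must keep that row. A Spark-free sketch of that rule (the leftAnti helper below is illustrative only, not Comet or DataFusion code):

// LEFT ANTI with null keys: a NULL key never equals anything, so the row has
// no match on the right side and must be retained by the anti join.
object LeftAntiNullKeySketch {
  def leftAnti[K, V](left: Seq[(Option[K], V)], rightKeys: Set[K]): Seq[(Option[K], V)] =
    left.filter {
      case (None, _)    => true                   // NULL key: no match possible, keep the row
      case (Some(k), _) => !rightKeys.contains(k) // keep only rows with no matching right key
    }

  def main(args: Array[String]): Unit = {
    val left  = Seq((Some(1), "a"), (None, "b"), (Some(2), "c"))
    val right = Set(1)
    println(leftAnti(left, right)) // List((None,b), (Some(2),c))
  }
}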