
Commit

revert ansi mode changes
andygrove committed Apr 19, 2024
1 parent 5023635 commit 7169bad
Showing 6 changed files with 14 additions and 45 deletions.
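In short, this revert removes the ANSI-mode flag that had been threaded through the native execution entry points. A rough summary of the affected signatures after the commit, pieced together from the hunks below (a sketch for orientation, not verbatim repository code):

class CometExecIterator(
    val id: Long,
    inputs: Seq[Iterator[ColumnarBatch]],
    protobufQueryPlan: Array[Byte],
    nativeMetrics: CometMetricNode)        // ansiEnabled: Boolean dropped
  extends Iterator[ColumnarBatch]

object CometExecUtils {
  def getNativeLimitRDD(
      childPlan: RDD[ColumnarBatch],
      outputAttribute: Seq[Attribute],
      limit: Int): RDD[ColumnarBatch]      // ansiMode: Boolean dropped
}

object CometExec {
  // ansiMode: Boolean dropped from both overloads
  def getCometIterator(
      inputs: Seq[Iterator[ColumnarBatch]],
      nativePlan: Operator): CometExecIterator
  def getCometIterator(
      inputs: Seq[Iterator[ColumnarBatch]],
      nativePlan: Operator,
      nativeMetrics: CometMetricNode): CometExecIterator
}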
@@ -20,9 +20,7 @@
package org.apache.comet

import org.apache.spark._
- import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.comet.CometMetricNode
- import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.vectorized._

import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_FRACTION}
@@ -46,8 +44,7 @@ class CometExecIterator(
val id: Long,
inputs: Seq[Iterator[ColumnarBatch]],
protobufQueryPlan: Array[Byte],
- nativeMetrics: CometMetricNode,
- ansiEnabled: Boolean)
+ nativeMetrics: CometMetricNode)
extends Iterator[ColumnarBatch] {

private val nativeLib = new Native()
@@ -102,7 +99,6 @@ class CometExecIterator(
result.put("memory_fraction", String.valueOf(COMET_EXEC_MEMORY_FRACTION.get()))
result.put("batch_size", String.valueOf(COMET_BATCH_SIZE.get()))
result.put("debug_native", String.valueOf(COMET_DEBUG_ENABLED.get()))
result.put("ansi_mode", String.valueOf(ansiEnabled))

// Strip mandatory prefix spark. which is not required for DataFusion session params
conf.getAll.foreach {
@@ -21,12 +21,10 @@ package org.apache.spark.sql.comet

import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
- import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.comet.execution.shuffle.{CometShuffledBatchRDD, CometShuffleExchangeExec}
import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan, UnaryExecNode, UnsafeRowSerializer}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
- import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.vectorized.ColumnarBatch

import com.google.common.base.Objects
@@ -75,11 +73,7 @@ case class CometCollectLimitExec(
childRDD
} else {
val localLimitedRDD = if (limit >= 0) {
- CometExecUtils.getNativeLimitRDD(
- childRDD,
- output,
- limit,
- SparkSession.active.conf.get(SQLConf.ANSI_ENABLED))
+ CometExecUtils.getNativeLimitRDD(childRDD, output, limit)
} else {
childRDD
}
@@ -94,11 +88,7 @@

new CometShuffledBatchRDD(dep, readMetrics)
}
- CometExecUtils.getNativeLimitRDD(
- singlePartitionRDD,
- output,
- limit,
- SparkSession.active.conf.get(SQLConf.ANSI_ENABLED))
+ CometExecUtils.getNativeLimitRDD(singlePartitionRDD, output, limit)
}
}

@@ -50,11 +50,10 @@ object CometExecUtils {
def getNativeLimitRDD(
childPlan: RDD[ColumnarBatch],
outputAttribute: Seq[Attribute],
- limit: Int,
- ansiMode: Boolean): RDD[ColumnarBatch] = {
+ limit: Int): RDD[ColumnarBatch] = {
childPlan.mapPartitionsInternal { iter =>
val limitOp = CometExecUtils.getLimitNativePlan(outputAttribute, limit).get
- CometExec.getCometIterator(Seq(iter), limitOp, ansiMode)
+ CometExec.getCometIterator(Seq(iter), limitOp)
}
}

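For reference, a minimal caller-side sketch of the trimmed helper, mirroring the CometCollectLimitExec call sites above (childRDD, output and limit stand in for whatever the operator has in scope; a sketch, not repository code):

// After the revert, callers no longer look up SQLConf.ANSI_ENABLED before
// handing the LIMIT to native execution.
val localLimitedRDD: RDD[ColumnarBatch] =
  if (limit >= 0) CometExecUtils.getNativeLimitRDD(childRDD, output, limit)
  else childRDD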
@@ -21,14 +21,12 @@ package org.apache.spark.sql.comet

import org.apache.spark.rdd.{ParallelCollectionRDD, RDD}
import org.apache.spark.serializer.Serializer
- import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression, SortOrder}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.comet.execution.shuffle.{CometShuffledBatchRDD, CometShuffleExchangeExec}
import org.apache.spark.sql.execution.{SparkPlan, TakeOrderedAndProjectExec, UnaryExecNode, UnsafeRowSerializer}
import org.apache.spark.sql.execution.metric.{SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
import org.apache.spark.sql.execution.metric.SQLMetric
- import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.vectorized.ColumnarBatch

import org.apache.comet.serde.QueryPlanSerde.exprToProto
@@ -75,19 +73,18 @@ case class CometTakeOrderedAndProjectExec(
if (childRDD.getNumPartitions == 0) {
new ParallelCollectionRDD(sparkContext, Seq.empty[ColumnarBatch], 1, Map.empty)
} else {
- val ansiEnabled = SparkSession.active.conf.get(SQLConf.ANSI_ENABLED)
val singlePartitionRDD = if (childRDD.getNumPartitions == 1) {
childRDD
} else {
val localTopK = if (orderingSatisfies) {
- CometExecUtils.getNativeLimitRDD(childRDD, output, limit, ansiEnabled)
+ CometExecUtils.getNativeLimitRDD(childRDD, output, limit)
} else {
childRDD.mapPartitionsInternal { iter =>
val topK =
CometExecUtils
.getTopKNativePlan(output, sortOrder, child, limit)
.get
- CometExec.getCometIterator(Seq(iter), topK, ansiEnabled)
+ CometExec.getCometIterator(Seq(iter), topK)
}
}

@@ -107,7 +104,7 @@
val topKAndProjection = CometExecUtils
.getProjectionNativePlan(projectList, output, sortOrder, child, limit)
.get
- CometExec.getCometIterator(Seq(iter), topKAndProjection, ansiEnabled)
+ CometExec.getCometIterator(Seq(iter), topKAndProjection)
}
}
}
@@ -33,7 +33,6 @@ import org.apache.spark.scheduler.MapStatus
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleWriteMetricsReporter, ShuffleWriteProcessor}
import org.apache.spark.shuffle.sort.SortShuffleManager
- import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
@@ -463,11 +462,7 @@ class CometShuffleWriteProcessor(
val nativeMetrics = CometMetricNode(nativeSQLMetrics)

val rawIter = cometRDD.iterator(partition, context)
- val cometIter = CometExec.getCometIterator(
- Seq(rawIter),
- nativePlan,
- nativeMetrics,
- SparkSession.active.conf.get(SQLConf.ANSI_ENABLED))
+ val cometIter = CometExec.getCometIterator(Seq(rawIter), nativePlan, nativeMetrics)

while (cometIter.hasNext) {
cometIter.next()
18 changes: 5 additions & 13 deletions spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -91,21 +91,19 @@ object CometExec {

def getCometIterator(
inputs: Seq[Iterator[ColumnarBatch]],
- nativePlan: Operator,
- ansiMode: Boolean): CometExecIterator = {
- getCometIterator(inputs, nativePlan, CometMetricNode(Map.empty), ansiMode)
+ nativePlan: Operator): CometExecIterator = {
+ getCometIterator(inputs, nativePlan, CometMetricNode(Map.empty))
}

def getCometIterator(
inputs: Seq[Iterator[ColumnarBatch]],
nativePlan: Operator,
- nativeMetrics: CometMetricNode,
- ansiMode: Boolean): CometExecIterator = {
+ nativeMetrics: CometMetricNode): CometExecIterator = {
val outputStream = new ByteArrayOutputStream()
nativePlan.writeTo(outputStream)
outputStream.close()
val bytes = outputStream.toByteArray
- new CometExecIterator(newIterId, inputs, bytes, nativeMetrics, ansiEnabled)
+ new CometExecIterator(newIterId, inputs, bytes, nativeMetrics)
}

/**
@@ -201,20 +199,14 @@ abstract class CometNativeExec extends CometExec {
// Switch to use Decimal128 regardless of precision, since Arrow native execution
// doesn't support Decimal32 and Decimal64 yet.
SQLConf.get.setConfString(CometConf.COMET_USE_DECIMAL_128.key, "true")
- val ansiEnabled: Boolean = SQLConf.get.getConf[Boolean](SQLConf.ANSI_ENABLED)

val serializedPlanCopy = serializedPlan
// TODO: support native metrics for all operators.
val nativeMetrics = CometMetricNode.fromCometPlan(this)

def createCometExecIter(inputs: Seq[Iterator[ColumnarBatch]]): CometExecIterator = {
val it =
- new CometExecIterator(
- CometExec.newIterId,
- inputs,
- serializedPlanCopy,
- nativeMetrics,
- ansiEnabled)
+ new CometExecIterator(CometExec.newIterId, inputs, serializedPlanCopy, nativeMetrics)

setSubqueries(it.id, originalPlan)

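For completeness, a small usage sketch of the simplified entry points (batches, plan and nativeMetrics are placeholders; a sketch under the signatures shown above, not repository code):

// Two-argument overload: metrics default to an empty CometMetricNode.
val iterWithoutMetrics = CometExec.getCometIterator(Seq(batches), plan)

// Three-argument overload, as used by the shuffle write path earlier in
// this commit; no ANSI flag is passed in either case.
val iterWithMetrics = CometExec.getCometIterator(Seq(batches), plan, nativeMetrics)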
