diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
index 8f8dc17b8..b3604c9e0 100644
--- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
+++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -20,9 +20,7 @@
 package org.apache.comet
 
 import org.apache.spark._
-import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.comet.CometMetricNode
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.vectorized._
 
 import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_FRACTION}
@@ -46,8 +44,7 @@ class CometExecIterator(
     val id: Long,
     inputs: Seq[Iterator[ColumnarBatch]],
     protobufQueryPlan: Array[Byte],
-    nativeMetrics: CometMetricNode,
-    ansiEnabled: Boolean)
+    nativeMetrics: CometMetricNode)
     extends Iterator[ColumnarBatch] {
 
   private val nativeLib = new Native()
@@ -102,7 +99,6 @@ class CometExecIterator(
     result.put("memory_fraction", String.valueOf(COMET_EXEC_MEMORY_FRACTION.get()))
     result.put("batch_size", String.valueOf(COMET_BATCH_SIZE.get()))
     result.put("debug_native", String.valueOf(COMET_DEBUG_ENABLED.get()))
-    result.put("ansi_mode", String.valueOf(ansiEnabled))
 
     // Strip mandatory prefix spark. which is not required for DataFusion session params
     conf.getAll.foreach {
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala
index 3addb3ba1..dd4855126 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala
@@ -21,12 +21,10 @@ package org.apache.spark.sql.comet
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.comet.execution.shuffle.{CometShuffledBatchRDD, CometShuffleExchangeExec}
 import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan, UnaryExecNode, UnsafeRowSerializer}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 import com.google.common.base.Objects
@@ -75,11 +73,7 @@ case class CometCollectLimitExec(
       childRDD
     } else {
       val localLimitedRDD = if (limit >= 0) {
-        CometExecUtils.getNativeLimitRDD(
-          childRDD,
-          output,
-          limit,
-          SparkSession.active.conf.get(SQLConf.ANSI_ENABLED))
+        CometExecUtils.getNativeLimitRDD(childRDD, output, limit)
       } else {
         childRDD
       }
@@ -94,11 +88,7 @@ case class CometCollectLimitExec(
         new CometShuffledBatchRDD(dep, readMetrics)
       }
 
-      CometExecUtils.getNativeLimitRDD(
-        singlePartitionRDD,
-        output,
-        limit,
-        SparkSession.active.conf.get(SQLConf.ANSI_ENABLED))
+      CometExecUtils.getNativeLimitRDD(singlePartitionRDD, output, limit)
     }
   }
 
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometExecUtils.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometExecUtils.scala
index 805216d66..5931920a2 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometExecUtils.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometExecUtils.scala
@@ -50,11 +50,10 @@ object CometExecUtils {
   def getNativeLimitRDD(
       childPlan: RDD[ColumnarBatch],
       outputAttribute: Seq[Attribute],
-      limit: Int,
-      ansiMode: Boolean): RDD[ColumnarBatch] = {
+      limit: Int): RDD[ColumnarBatch] = {
     childPlan.mapPartitionsInternal { iter =>
       val limitOp = CometExecUtils.getLimitNativePlan(outputAttribute, limit).get
-      CometExec.getCometIterator(Seq(iter), limitOp, ansiMode)
+      CometExec.getCometIterator(Seq(iter), limitOp)
     }
   }
 
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
index 7ecdbfd5a..26ec401ed 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
@@ -21,14 +21,12 @@ package org.apache.spark.sql.comet
 
 import org.apache.spark.rdd.{ParallelCollectionRDD, RDD}
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression, SortOrder}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.comet.execution.shuffle.{CometShuffledBatchRDD, CometShuffleExchangeExec}
 import org.apache.spark.sql.execution.{SparkPlan, TakeOrderedAndProjectExec, UnaryExecNode, UnsafeRowSerializer}
 import org.apache.spark.sql.execution.metric.{SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
 import org.apache.spark.sql.execution.metric.SQLMetric
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 import org.apache.comet.serde.QueryPlanSerde.exprToProto
@@ -75,19 +73,18 @@ case class CometTakeOrderedAndProjectExec(
     if (childRDD.getNumPartitions == 0) {
       new ParallelCollectionRDD(sparkContext, Seq.empty[ColumnarBatch], 1, Map.empty)
     } else {
-      val ansiEnabled = SparkSession.active.conf.get(SQLConf.ANSI_ENABLED)
 
       val singlePartitionRDD = if (childRDD.getNumPartitions == 1) {
         childRDD
       } else {
         val localTopK = if (orderingSatisfies) {
-          CometExecUtils.getNativeLimitRDD(childRDD, output, limit, ansiEnabled)
+          CometExecUtils.getNativeLimitRDD(childRDD, output, limit)
         } else {
           childRDD.mapPartitionsInternal { iter =>
             val topK = CometExecUtils
               .getTopKNativePlan(output, sortOrder, child, limit)
               .get
-            CometExec.getCometIterator(Seq(iter), topK, ansiEnabled)
+            CometExec.getCometIterator(Seq(iter), topK)
           }
         }
 
@@ -107,7 +104,7 @@ case class CometTakeOrderedAndProjectExec(
           val topKAndProjection = CometExecUtils
             .getProjectionNativePlan(projectList, output, sortOrder, child, limit)
             .get
-          CometExec.getCometIterator(Seq(iter), topKAndProjection, ansiEnabled)
+          CometExec.getCometIterator(Seq(iter), topKAndProjection)
         }
       }
     }
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
index 742015a5d..232b6bf17 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
@@ -33,7 +33,6 @@ import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleWriteMetricsReporter, ShuffleWriteProcessor}
 import org.apache.spark.shuffle.sort.SortShuffleManager
-import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
@@ -463,11 +462,7 @@ class CometShuffleWriteProcessor(
     val nativeMetrics = CometMetricNode(nativeSQLMetrics)
 
     val rawIter = cometRDD.iterator(partition, context)
-    val cometIter = CometExec.getCometIterator(
-      Seq(rawIter),
-      nativePlan,
-      nativeMetrics,
-      SparkSession.active.conf.get(SQLConf.ANSI_ENABLED))
+    val cometIter = CometExec.getCometIterator(Seq(rawIter), nativePlan, nativeMetrics)
 
     while (cometIter.hasNext) {
       cometIter.next()
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index e45b191c2..8545eee90 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -91,21 +91,19 @@ object CometExec {
 
   def getCometIterator(
       inputs: Seq[Iterator[ColumnarBatch]],
-      nativePlan: Operator,
-      ansiMode: Boolean): CometExecIterator = {
-    getCometIterator(inputs, nativePlan, CometMetricNode(Map.empty), ansiMode)
+      nativePlan: Operator): CometExecIterator = {
+    getCometIterator(inputs, nativePlan, CometMetricNode(Map.empty))
   }
 
   def getCometIterator(
       inputs: Seq[Iterator[ColumnarBatch]],
       nativePlan: Operator,
-      nativeMetrics: CometMetricNode,
-      ansiMode: Boolean): CometExecIterator = {
+      nativeMetrics: CometMetricNode): CometExecIterator = {
     val outputStream = new ByteArrayOutputStream()
     nativePlan.writeTo(outputStream)
     outputStream.close()
     val bytes = outputStream.toByteArray
-    new CometExecIterator(newIterId, inputs, bytes, nativeMetrics, ansiMode)
+    new CometExecIterator(newIterId, inputs, bytes, nativeMetrics)
   }
 
   /**
@@ -201,7 +199,6 @@ abstract class CometNativeExec extends CometExec {
     // Switch to use Decimal128 regardless of precision, since Arrow native execution
     // doesn't support Decimal32 and Decimal64 yet.
     SQLConf.get.setConfString(CometConf.COMET_USE_DECIMAL_128.key, "true")
-    val ansiEnabled: Boolean = SQLConf.get.getConf[Boolean](SQLConf.ANSI_ENABLED)
     val serializedPlanCopy = serializedPlan
 
     // TODO: support native metrics for all operators.
@@ -209,12 +206,7 @@ abstract class CometNativeExec extends CometExec {
 
     def createCometExecIter(inputs: Seq[Iterator[ColumnarBatch]]): CometExecIterator = {
       val it =
-        new CometExecIterator(
-          CometExec.newIterId,
-          inputs,
-          serializedPlanCopy,
-          nativeMetrics,
-          ansiEnabled)
+        new CometExecIterator(CometExec.newIterId, inputs, serializedPlanCopy, nativeMetrics)
 
       setSubqueries(it.id, originalPlan)
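
A sketch (not part of the patch) of the call shape after this change: the ANSI flag is no longer threaded through CometExecIterator, CometExec.getCometIterator, or CometExecUtils.getNativeLimitRDD. The wrapper below mirrors the limit path from CometExecUtils using only the signatures visible in this diff; the name limitNatively is made up for the example, and it assumes getLimitNativePlan and getCometIterator are accessible from the calling code.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.comet.{CometExec, CometExecUtils}
import org.apache.spark.sql.vectorized.ColumnarBatch

// Illustrative only: run the native LIMIT operator over each partition,
// mirroring CometExecUtils.getNativeLimitRDD after this change. Note that
// no ansiMode argument is passed any more.
def limitNatively(
    input: RDD[ColumnarBatch],
    output: Seq[Attribute],
    limit: Int): RDD[ColumnarBatch] =
  input.mapPartitions { iter =>
    // Build the native limit plan, then execute it via the two-argument
    // getCometIterator overload; CometExecIterator is itself an
    // Iterator[ColumnarBatch], so it can be returned directly.
    val limitOp = CometExecUtils.getLimitNativePlan(output, limit).get
    CometExec.getCometIterator(Seq(iter), limitOp)
  }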