From 5f24f97e072e1628e04428a22d6d33a0e0f5027b Mon Sep 17 00:00:00 2001 From: binwei Date: Fri, 9 Aug 2024 23:39:40 +0000 Subject: [PATCH 1/6] Fix for pinterest spark Spark3.2.0 support --- .../MergeTwoPhasesHashAggregate.scala | 14 +-- package/pom.xml | 1 + pom.xml | 10 +- .../sql/shims/spark32/Spark32Shims.scala | 8 +- .../datasources/FileFormatDataWriter.scala | 23 +++-- .../InsertIntoHadoopFsRelationCommand.scala | 75 ++++++++++++++- .../parquet/ParquetFileFormat.scala | 93 +++++++------------ .../v2/AbstractBatchScanExec.scala | 19 ++-- .../datasources/v2/BatchScanExecShim.scala | 19 ++-- .../datasources/v2/utils/CatalogUtil.scala | 3 - 10 files changed, 149 insertions(+), 116 deletions(-) diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MergeTwoPhasesHashAggregate.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MergeTwoPhasesHashAggregate.scala index e19cd09a01a3..a264ac4b19c2 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MergeTwoPhasesHashAggregate.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MergeTwoPhasesHashAggregate.scala @@ -65,15 +65,13 @@ case class MergeTwoPhasesHashBaseAggregate(session: SparkSession) extends Rule[S } else { plan.transformDown { case hashAgg @ HashAggregateExec( - _, - isStreaming, _, _, aggregateExpressions, aggregateAttributes, _, resultExpressions, - child: HashAggregateExec) if !isStreaming && isPartialAgg(child, hashAgg) => + child: HashAggregateExec) if isPartialAgg(child, hashAgg) => // convert to complete mode aggregate expressions val completeAggregateExpressions = aggregateExpressions.map(_.copy(mode = Complete)) hashAgg.copy( @@ -83,17 +81,15 @@ case class MergeTwoPhasesHashBaseAggregate(session: SparkSession) extends Rule[S initialInputBufferOffset = 0, child = child.child ) + case objectHashAgg @ ObjectHashAggregateExec( - _, - isStreaming, _, _, aggregateExpressions, aggregateAttributes, _, resultExpressions, - child: ObjectHashAggregateExec) - if !isStreaming && isPartialAgg(child, objectHashAgg) => + child: ObjectHashAggregateExec) if isPartialAgg(child, objectHashAgg) => // convert to complete mode aggregate expressions val completeAggregateExpressions = aggregateExpressions.map(_.copy(mode = Complete)) objectHashAgg.copy( @@ -104,8 +100,6 @@ case class MergeTwoPhasesHashBaseAggregate(session: SparkSession) extends Rule[S child = child.child ) case sortAgg @ SortAggregateExec( - _, - isStreaming, _, _, aggregateExpressions, @@ -113,7 +107,7 @@ case class MergeTwoPhasesHashBaseAggregate(session: SparkSession) extends Rule[S _, resultExpressions, child: SortAggregateExec) - if replaceSortAggWithHashAgg && !isStreaming && isPartialAgg(child, sortAgg) => + if replaceSortAggWithHashAgg && isPartialAgg(child, sortAgg) => // convert to complete mode aggregate expressions val completeAggregateExpressions = aggregateExpressions.map(_.copy(mode = Complete)) sortAgg.copy( diff --git a/package/pom.xml b/package/pom.xml index ae1432770f8a..15a5834111af 100644 --- a/package/pom.xml +++ b/package/pom.xml @@ -332,6 +332,7 @@ org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$ com.google.protobuf.* + org.apache.spark.sql.execution.stat.StatFunctions compile diff --git a/pom.xml b/pom.xml index fd44d06b759c..1366f8188b0c 100644 --- a/pom.xml +++ b/pom.xml @@ -57,10 +57,10 @@ 2.12 2.12.15 3 - 3.4 - 3.4.2 - spark34 - spark-sql-columnar-shims-spark34 + 3.2 + 3.2.0 + spark32 + spark-sql-columnar-shims-spark32 1.5.0 delta-core 
2.4.0 @@ -257,7 +257,7 @@ 3.2 spark32 spark-sql-columnar-shims-spark32 - 3.2.2 + 3.2.0 1.3.1 delta-core 2.0.1 diff --git a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala index 995d5b087d3c..6c646ac24c32 100644 --- a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala +++ b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala @@ -30,10 +30,9 @@ import org.apache.spark.sql.catalyst.csv.CSVOptions import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BinaryExpression, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName} import org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.catalyst.plans.physical.{Distribution, HashClusteredDistribution} +import org.apache.spark.sql.catalyst.plans.physical.{Distribution, StatefulOpClusteredDistribution} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap -import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec import org.apache.spark.sql.connector.catalog.Table import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil, SparkPlan} @@ -61,7 +60,8 @@ class Spark32Shims extends SparkShims { override def getDistribution( leftKeys: Seq[Expression], rightKeys: Seq[Expression]): Seq[Distribution] = { - HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil + val dist = StatefulOpClusteredDistribution(leftKeys, 0) + dist :: StatefulOpClusteredDistribution(rightKeys, 0) :: Nil } override def scalarExpressionMappings: Seq[Sig] = Seq(Sig[Empty2Null](ExpressionNames.EMPTY2NULL)) @@ -273,7 +273,7 @@ class Spark32Shims extends SparkShims { conf.parquetFilterPushDownStringStartWith, conf.parquetFilterPushDownInFilterThreshold, caseSensitive.getOrElse(conf.caseSensitiveAnalysis), - RebaseSpec(LegacyBehaviorPolicy.CORRECTED) + LegacyBehaviorPolicy.CORRECTED ) } diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala index e5aaff691168..a7c24bdc0690 100644 --- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala +++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala @@ -38,6 +38,7 @@ import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StringType import org.apache.spark.util.{SerializableConfiguration, Utils} +import org.apache.spark.util.Utils.isDirectWriteScheme import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.TaskAttemptContext @@ -161,9 +162,12 @@ class SingleDirectoryDataWriter( releaseResources() val ext = description.outputWriterFactory.getFileExtension(taskAttemptContext) - val currentPath = - committer.newTaskTempFile(taskAttemptContext, None, f"-c$fileCounter%03d" + ext) - + val fileCounterExt = if (description.maxRecordsPerFile > 0) { + f"-c$fileCounter%03d" + } else { + "" + } + val currentPath = committer.newTaskTempFile(taskAttemptContext, None, fileCounterExt + ext) 
currentWriter = description.outputWriterFactory.newInstance( path = currentPath, dataSchema = description.dataColumns.toStructType, @@ -293,14 +297,21 @@ abstract class BaseDynamicPartitionDataWriter( val bucketIdStr = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("") // This must be in a form that matches our bucketing format. See BucketingUtils. - val ext = f"$bucketIdStr.c$fileCounter%03d" + + val ext = if (!bucketIdStr.isEmpty || description.maxRecordsPerFile > 0) { + f"$bucketIdStr.c$fileCounter%03d" + + description.outputWriterFactory.getFileExtension(taskAttemptContext) + } else { description.outputWriterFactory.getFileExtension(taskAttemptContext) - + } val customPath = partDir.flatMap { dir => description.customPartitionLocations.get(PartitioningUtils.parsePathFragment(dir)) } val currentPath = if (customPath.isDefined) { - committer.newTaskTempFileAbsPath(taskAttemptContext, customPath.get, ext) + if (isDirectWriteScheme(taskAttemptContext.getConfiguration, customPath.get)) { + committer.newTaskTempFile(taskAttemptContext, None, ext) + } else { + committer.newTaskTempFileAbsPath(taskAttemptContext, customPath.get, ext) + } } else { committer.newTaskTempFile(taskAttemptContext, partDir, ext) } diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index 9221ecbd1294..e4cda6fad487 100644 --- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -21,14 +21,15 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTablePartition} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._ -import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Literal} +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project} import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode import org.apache.spark.sql.util.SchemaUtils +import org.apache.spark.util.Utils.isDirectWriteScheme import org.apache.hadoop.fs.{FileSystem, Path} @@ -105,11 +106,73 @@ case class InsertIntoHadoopFsRelationCommand( } val jobId = java.util.UUID.randomUUID().toString + + val isDirectWrite = isDirectWriteScheme(hadoopConf, outputPath.toString) + val isStaticPartitionInsert = + staticPartitions.size == partitionColumns.length && partitionColumns.nonEmpty + + val formattedOutputPath = if (isStaticPartitionInsert) { + // This will not work correctly for cases where spark.sql.storeAssignmentPolicy=LEGACY + // and we insert into a partition where the types don't match i.e + // create table t(a int, b string) using parquet partitioned by (a); + // insert into t partition(a='ansi') values('ansi'); + // The correct behavior is to insert data under a=__HIVE_DEFAULT_PARTITION__ + // since ansi cannot be an integer 
but this will insert data under a=ansi + // so we extract the value of the partition by analyzing the query. + val formattedStaticPartitions = staticPartitions.map { + case (k, v) => + val index = outputColumnNames.indexOf(k) + val isNull = query match { + case Project(projectList, _) => + projectList(index).asInstanceOf[Alias].child.asInstanceOf[Literal].value == null + case LocalRelation(_, Nil, _) => false + case LocalRelation(_, data, _) => + data.head.isNullAt(index) + case _ => false + } + if (isNull) { + k -> null + } else { + k -> v + } + } + val defaultLocation = outputPath + + "/" + PartitioningUtils.getPathFragment(formattedStaticPartitions, partitionColumns) + new Path(customPartitionLocations.getOrElse(staticPartitions, defaultLocation)) + } else { + outputPath + } + + hadoopConf.setBoolean("pinterest.spark.sql.staticPartitionInsert", isStaticPartitionInsert) + // DataSourceStrategy.scala relies on the s3 committer being set here when checking + // if the writePath == readPath + if (isDirectWrite) { + if ((isStaticPartitionInsert || partitionColumns.isEmpty) && bucketSpec.isEmpty) { + hadoopConf.set( + "spark.sql.sources.outputCommitterClass", + "com.netflix.bdp.s3.S3DirectoryOutputCommitter") + hadoopConf.set( + "spark.sql.parquet.output.committer.class", + "com.netflix.bdp.s3.S3DirectoryOutputCommitter") + } else { + hadoopConf.set( + "spark.sql.sources.outputCommitterClass", + "com.netflix.bdp.s3.S3PartitionedOutputCommitter") + hadoopConf.set( + "spark.sql.parquet.output.committer.class", + "com.netflix.bdp.s3.S3PartitionedOutputCommitter") + } + hadoopConf.set( + "s3.multipart.committer.conflict-mode", + if (mode == SaveMode.Overwrite) "replace" else "append") + } + val committer = FileCommitProtocol.instantiate( sparkSession.sessionState.conf.fileCommitProtocolClass, jobId = jobId, - outputPath = outputPath.toString, - dynamicPartitionOverwrite = dynamicPartitionOverwrite) + outputPath = formattedOutputPath.toString, + dynamicPartitionOverwrite = !isDirectWrite && dynamicPartitionOverwrite + ) val doInsertion = if (mode == SaveMode.Append) { true @@ -125,7 +188,9 @@ case class InsertIntoHadoopFsRelationCommand( // For dynamic partition overwrite, do not delete partition directories ahead. 
true } else { - deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations, committer) + if (!isDirectWrite) { + deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations, committer) + } true } case (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) => diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index fb03fb5f4a21..5e5e264a890b 100644 --- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -261,7 +261,8 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging sparkSession.sessionState.conf.isParquetBinaryAsString) hadoopConf.setBoolean( SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, - sparkSession.sessionState.conf.isParquetINT96AsTimestamp) + sparkSession.sessionState.conf.isParquetINT96AsTimestamp + ) val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) @@ -273,8 +274,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging val sqlConf = sparkSession.sessionState.conf val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled val enableVectorizedReader: Boolean = - sqlConf.parquetVectorizedReaderEnabled && - resultSchema.forall(_.dataType.isInstanceOf[AtomicType]) + ParquetUtils.isBatchReadSupportedForSchema(sqlConf, resultSchema) val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion val capacity = sqlConf.parquetVectorizedReaderBatchSize @@ -301,7 +301,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging lazy val footerFileMetaData = ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData - val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec( + val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode( footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead) // Try to push down filters when filter push-down is enabled. @@ -315,7 +315,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive, - datetimeRebaseSpec) + datetimeRebaseMode) filters // Collects all converted Parquet filter predicates. Notice that not all predicates can be // converted (`ParquetFilters.createFilter` returns an `Option`). 
That's why a `flatMap` @@ -341,7 +341,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging None } - val int96RebaseSpec = DataSourceUtils.int96RebaseSpec( + val int96RebaseMode = DataSourceUtils.int96RebaseMode( footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead) @@ -358,46 +358,30 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging if (enableVectorizedReader) { val vectorizedReader = new VectorizedParquetRecordReader( convertTz.orNull, - datetimeRebaseSpec.mode.toString, - datetimeRebaseSpec.timeZone, - int96RebaseSpec.mode.toString, - int96RebaseSpec.timeZone, + datetimeRebaseMode.toString, + int96RebaseMode.toString, enableOffHeapColumnVector && taskContext.isDefined, - capacity - ) - // SPARK-37089: We cannot register a task completion listener to close this iterator here - // because downstream exec nodes have already registered their listeners. Since listeners - // are executed in reverse order of registration, a listener registered here would close the - // iterator while downstream exec nodes are still running. When off-heap column vectors are - // enabled, this can cause a use-after-free bug leading to a segfault. - // - // Instead, we use FileScanRDD's task completion listener to close this iterator. + capacity) val iter = new RecordReaderIterator(vectorizedReader) - try { - vectorizedReader.initialize(split, hadoopAttemptContext) - logDebug(s"Appending $partitionSchema ${file.partitionValues}") - vectorizedReader.initBatch(partitionSchema, file.partitionValues) - if (returningBatch) { - vectorizedReader.enableReturningBatches() - } - - // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy. - iter.asInstanceOf[Iterator[InternalRow]] - } catch { - case e: Throwable => - // SPARK-23457: In case there is an exception in initialization, close the iterator to - // avoid leaking resources. - iter.close() - throw e + // SPARK-23457 Register a task completion listener before `initialization`. + taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close())) + vectorizedReader.initialize(split, hadoopAttemptContext) + logDebug(s"Appending $partitionSchema ${file.partitionValues}") + vectorizedReader.initBatch(partitionSchema, file.partitionValues) + if (returningBatch) { + vectorizedReader.enableReturningBatches() } + + // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy. 
+ iter.asInstanceOf[Iterator[InternalRow]] } else { logDebug(s"Falling back to parquet-mr") // ParquetRecordReader returns InternalRow val readSupport = new ParquetReadSupport( convertTz, enableVectorizedReader = false, - datetimeRebaseSpec, - int96RebaseSpec) + datetimeRebaseMode, + int96RebaseMode) val reader = if (pushed.isDefined && enableRecordFilter) { val parquetFilter = FilterCompat.get(pushed.get, null) new ParquetRecordReader[InternalRow](readSupport, parquetFilter) @@ -405,25 +389,19 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging new ParquetRecordReader[InternalRow](readSupport) } val iter = new RecordReaderIterator[InternalRow](reader) - try { - reader.initialize(split, hadoopAttemptContext) - - val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes - val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema) - - if (partitionSchema.length == 0) { - // There is no partition columns - iter.map(unsafeProjection) - } else { - val joinedRow = new JoinedRow() - iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues))) - } - } catch { - case e: Throwable => - // SPARK-23457: In case there is an exception in initialization, close the iterator to - // avoid leaking resources. - iter.close() - throw e + // SPARK-23457 Register a task completion listener before `initialization`. + taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close())) + reader.initialize(split, hadoopAttemptContext) + + val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes + val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema) + + if (partitionSchema.length == 0) { + // There is no partition columns + iter.map(unsafeProjection) + } else { + val joinedRow = new JoinedRow() + iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues))) } } } @@ -458,7 +436,8 @@ object ParquetFileFormat extends Logging { val converter = new ParquetToSparkSchemaConverter( sparkSession.sessionState.conf.isParquetBinaryAsString, - sparkSession.sessionState.conf.isParquetINT96AsTimestamp) + sparkSession.sessionState.conf.isParquetINT96AsTimestamp, + false) val seen = mutable.HashSet[String]() val finalSchemas: Seq[StructType] = footers.flatMap { diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala index 6b495105d9c6..8eae7995723f 100644 --- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala +++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala @@ -46,9 +46,12 @@ abstract class AbstractBatchScanExec( override def hashCode(): Int = Objects.hashCode(batch, runtimeFilters) - @transient override lazy val partitions: Seq[InputPartition] = batch.planInputPartitions() + @transient override lazy val inputPartitions: Seq[InputPartition] = batch.planInputPartitions() - @transient private lazy val filteredPartitions: Seq[InputPartition] = { + @transient override lazy val partitions: Seq[Seq[InputPartition]] = + batch.planInputPartitions().map(Seq(_)) + + @transient private lazy val filteredPartitions: Seq[Seq[InputPartition]] = { val dataSourceFilters = runtimeFilters.flatMap { case DynamicPruningExpression(e) => DataSourceStrategy.translateRuntimeFilter(e) case _ => None @@ -64,17 +67,7 @@ abstract class 
AbstractBatchScanExec( // call toBatch again to get filtered partitions val newPartitions = scan.toBatch.planInputPartitions() - originalPartitioning match { - case p: DataSourcePartitioning if p.numPartitions != newPartitions.size => - throw new SparkException( - "Data source must have preserved the original partitioning during runtime filtering; " + - s"reported num partitions: ${p.numPartitions}, " + - s"num partitions after runtime filtering: ${newPartitions.size}") - case _ => - // no validation is needed as the data source did not report any specific partitioning - } - - newPartitions + newPartitions.map(Seq(_)) } else { partitions } diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala index e445dd33a585..6290e86315f9 100644 --- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala +++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala @@ -69,20 +69,9 @@ abstract class BatchScanExecShim( // call toBatch again to get filtered partitions val newPartitions = scan.toBatch.planInputPartitions() - - originalPartitioning match { - case p: DataSourcePartitioning if p.numPartitions != newPartitions.size => - throw new SparkException( - "Data source must have preserved the original partitioning during runtime filtering; " + - s"reported num partitions: ${p.numPartitions}, " + - s"num partitions after runtime filtering: ${newPartitions.size}") - case _ => - // no validation is needed as the data source did not report any specific partitioning - } - newPartitions.map(Seq(_)) } else { - partitions.map(Seq(_)) + partitions } } @@ -102,5 +91,9 @@ abstract class BatchScanExecShim( } abstract class ArrowBatchScanExecShim(original: BatchScanExec) extends DataSourceV2ScanExecBase { - @transient override lazy val partitions: Seq[InputPartition] = original.partitions + @transient override lazy val partitions: Seq[Seq[InputPartition]] = + original.partitions + @transient override lazy val inputPartitions: Seq[InputPartition] = original.inputPartitions + + override def keyGroupedPartitioning: Option[Seq[Expression]] = original.keyGroupedPartitioning } diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/utils/CatalogUtil.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/utils/CatalogUtil.scala index a5b951c23ef7..ef613dac91ec 100644 --- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/utils/CatalogUtil.scala +++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/utils/CatalogUtil.scala @@ -32,9 +32,6 @@ object CatalogUtil { case IdentityTransform(FieldReference(Seq(col))) => identityCols += col - case BucketTransform(numBuckets, FieldReference(Seq(col))) => - bucketSpec = Some(BucketSpec(numBuckets, col :: Nil, Nil)) - case transform => throw new UnsupportedOperationException(s"Partitioning by expressions") } From 29e1613dd94c973b20e4c9d537ff30f075ca6222 Mon Sep 17 00:00:00 2001 From: binwei Date: Fri, 9 Aug 2024 23:40:31 +0000 Subject: [PATCH 2/6] update build script --- dev/buildbundle-veloxbe.sh | 12 ++---------- dev/builddeps-veloxbe.sh | 11 ++++++++--- 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/dev/buildbundle-veloxbe.sh b/dev/buildbundle-veloxbe.sh index eaa82730bb25..0a0fe48eac8d 100755 --- 
a/dev/buildbundle-veloxbe.sh +++ b/dev/buildbundle-veloxbe.sh @@ -5,17 +5,9 @@ source "$BASEDIR/builddeps-veloxbe.sh" function build_for_spark { spark_version=$1 - mvn clean package -Pbackends-velox -Pceleborn -Puniffle -Pspark-$spark_version -DskipTests + mvn clean package -Pbackends-velox -Pceleborn -Puniffle -Pspark-$spark_version -DskipTests -Dcheckstyle.skip=true -DskipScalastyle=true } cd $GLUTEN_DIR -# SPARK_VERSION is defined in builddeps-veloxbe.sh -if [ "$SPARK_VERSION" = "ALL" ]; then - for spark_version in 3.2 3.3 3.4 3.5 - do - build_for_spark $spark_version - done -else - build_for_spark $SPARK_VERSION -fi +build_for_spark 3.2 diff --git a/dev/builddeps-veloxbe.sh b/dev/builddeps-veloxbe.sh index 82c9cfc8d6c0..2a7ade96932c 100755 --- a/dev/builddeps-veloxbe.sh +++ b/dev/builddeps-veloxbe.sh @@ -6,6 +6,10 @@ #################################################################################################### set -exu +export CFLAGS=" -g " +export CXXFLAGS=" -g " + + CURRENT_DIR=$(cd "$(dirname "$BASH_SOURCE")"; pwd) GLUTEN_DIR="$CURRENT_DIR/.." BUILD_TYPE=Release @@ -20,11 +24,12 @@ ENABLE_QAT=OFF ENABLE_IAA=OFF ENABLE_HBM=OFF ENABLE_GCS=OFF -ENABLE_S3=OFF +ENABLE_S3=ON ENABLE_HDFS=OFF ENABLE_ABFS=OFF -ENABLE_EP_CACHE=OFF -ENABLE_VCPKG=OFF +ENABLE_EP_CACHE=ON +ARROW_ENABLE_CUSTOM_CODEC=OFF +ENABLE_VCPKG=ON RUN_SETUP_SCRIPT=ON VELOX_REPO="" VELOX_BRANCH="" From ef3912880175d5c4ed9ec12a9ad2a573883d8dfc Mon Sep 17 00:00:00 2001 From: binwei Date: Fri, 9 Aug 2024 23:42:20 +0000 Subject: [PATCH 3/6] fix for pinterest build --- dev/builddeps-veloxbe.sh | 5 ++--- ep/build-velox/src/get_velox.sh | 4 ++-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/dev/builddeps-veloxbe.sh b/dev/builddeps-veloxbe.sh index 2a7ade96932c..1b9b16ffd2bb 100755 --- a/dev/builddeps-veloxbe.sh +++ b/dev/builddeps-veloxbe.sh @@ -15,9 +15,9 @@ GLUTEN_DIR="$CURRENT_DIR/.." 
BUILD_TYPE=Release BUILD_TESTS=OFF BUILD_EXAMPLES=OFF -BUILD_BENCHMARKS=OFF +BUILD_BENCHMARKS=ON BUILD_JEMALLOC=OFF -BUILD_PROTOBUF=OFF +BUILD_PROTOBUF=ON BUILD_VELOX_TESTS=OFF BUILD_VELOX_BENCHMARKS=OFF ENABLE_QAT=OFF @@ -28,7 +28,6 @@ ENABLE_S3=ON ENABLE_HDFS=OFF ENABLE_ABFS=OFF ENABLE_EP_CACHE=ON -ARROW_ENABLE_CUSTOM_CODEC=OFF ENABLE_VCPKG=ON RUN_SETUP_SCRIPT=ON VELOX_REPO="" diff --git a/ep/build-velox/src/get_velox.sh b/ep/build-velox/src/get_velox.sh index 7befe2aeddb0..8502bbf54b0f 100755 --- a/ep/build-velox/src/get_velox.sh +++ b/ep/build-velox/src/get_velox.sh @@ -16,8 +16,8 @@ set -exu -VELOX_REPO=https://github.com/oap-project/velox.git -VELOX_BRANCH=2024_08_09 +VELOX_REPO=https://github.com/yma11/velox.git +VELOX_BRANCH=golden-ptst VELOX_HOME="" OS=`uname -s` From 21e58cd6f89e5eb3996b687ec39c5410d0a219a9 Mon Sep 17 00:00:00 2001 From: binwei Date: Fri, 9 Aug 2024 23:43:25 +0000 Subject: [PATCH 4/6] fix arrow build --- .gitignore | 6 ++++++ dev/build_arrow.sh | 4 ++++ 2 files changed, 10 insertions(+) diff --git a/.gitignore b/.gitignore index 224f61bae6ec..f7483cf1c90d 100644 --- a/.gitignore +++ b/.gitignore @@ -95,3 +95,9 @@ dist/ /cpp-ch/local-engine/Parser/*_udf !/cpp-ch/local-engine/Parser/example_udf + + +# build arrow +dev/arrow_ep/ +ep/_ep/ + diff --git a/dev/build_arrow.sh b/dev/build_arrow.sh index e7496350f988..58cc1d44e244 100755 --- a/dev/build_arrow.sh +++ b/dev/build_arrow.sh @@ -23,6 +23,10 @@ VELOX_ARROW_BUILD_VERSION=15.0.0 ARROW_PREFIX=$CURRENT_DIR/../ep/_ep/arrow_ep BUILD_TYPE=Release + +export CFLAGS=" -g " +export CXXFLAGS=" -g " + function prepare_arrow_build() { mkdir -p ${ARROW_PREFIX}/../ && pushd ${ARROW_PREFIX}/../ && sudo rm -rf arrow_ep/ wget_and_untar https://archive.apache.org/dist/arrow/arrow-${VELOX_ARROW_BUILD_VERSION}/apache-arrow-${VELOX_ARROW_BUILD_VERSION}.tar.gz arrow_ep From 22676e13bcac849584cae4a8149ed90014a50b8b Mon Sep 17 00:00:00 2001 From: binwei Date: Fri, 9 Aug 2024 23:51:43 +0000 Subject: [PATCH 5/6] enable all complex data type --- .../backendsapi/velox/VeloxBackend.scala | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala index 33efdbc5ec93..84c3aa301366 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala @@ -114,28 +114,28 @@ object VeloxBackendSettings extends BackendSettingsApi { case ParquetReadFormat => val typeValidator: PartialFunction[StructField, String] = { // Parquet scan of nested array with struct/array as element type is unsupported in Velox. - case StructField(_, arrayType: ArrayType, _, _) - if arrayType.elementType.isInstanceOf[StructType] => - "StructType as element in ArrayType" - case StructField(_, arrayType: ArrayType, _, _) - if arrayType.elementType.isInstanceOf[ArrayType] => - "ArrayType as element in ArrayType" + // case StructField(_, arrayType: ArrayType, _, _) + // if arrayType.elementType.isInstanceOf[StructType] => + // "StructType as element in ArrayType" + // case StructField(_, arrayType: ArrayType, _, _) + // if arrayType.elementType.isInstanceOf[ArrayType] => + // "ArrayType as element in ArrayType" // Parquet scan of nested map with struct as key type, // or array type as value type is not supported in Velox. 
- case StructField(_, mapType: MapType, _, _) if mapType.keyType.isInstanceOf[StructType] => - "StructType as Key in MapType" - case StructField(_, mapType: MapType, _, _) - if mapType.valueType.isInstanceOf[ArrayType] => - "ArrayType as Value in MapType" + // case StructField(_, mapType: MapType, _, _) if mapType.keyType.isInstanceOf[StructType] => + // "StructType as Key in MapType" + // case StructField(_, mapType: MapType, _, _) + // if mapType.valueType.isInstanceOf[ArrayType] => + // "ArrayType as Value in MapType" case StructField(_, TimestampType, _, _) if GlutenConfig.getConf.forceParquetTimestampTypeScanFallbackEnabled => "TimestampType" } - if (!GlutenConfig.getConf.forceComplexTypeScanFallbackEnabled) { - validateTypes(typeValidator) - } else { - validateTypes(parquetTypeValidatorWithComplexTypeFallback) - } + // if (!GlutenConfig.getConf.forceComplexTypeScanFallbackEnabled) { + validateTypes(typeValidator) + // } else { + // validateTypes(parquetTypeValidatorWithComplexTypeFallback) + // } case DwrfReadFormat => ValidationResult.succeeded case OrcReadFormat => if (!GlutenConfig.getConf.veloxOrcScanEnabled) { From c6d8d36857f6a4aba48e9b2ec0e52cfa384b27bb Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Wed, 14 Aug 2024 11:47:26 +0000 Subject: [PATCH 6/6] support user udf --- .../velox/VeloxSparkPlanExecApi.scala | 8 + .../datasource/ArrowConvertorRule.scala | 11 +- .../execution/ProjectColumnarExec.scala | 117 +++++ .../execution/RowToVeloxColumnarExec.scala | 20 + .../SparkPartialProjectColumnarExec.scala | 410 ++++++++++++++++++ .../execution/VeloxColumnarToRowExec.scala | 15 + .../apache/spark/sql/hive/HiveUdfUtil.scala | 30 ++ .../expression/UDFPartialProjectSuite.scala | 129 ++++++ cpp/core/jni/JniWrapper.cc | 13 + cpp/core/memory/ColumnarBatch.h | 4 + cpp/velox/memory/VeloxColumnarBatch.h | 5 + .../serializer/VeloxRowToColumnarConverter.cc | 270 ++++++++++++ .../serializer/VeloxRowToColumnarConverter.h | 4 +- cpp/velox/tests/VeloxRowToColumnarTest.cc | 50 ++- .../gluten/backendsapi/SparkPlanExecApi.scala | 9 +- .../expression/ExpressionConverter.scala | 18 +- .../columnar/MiscColumnarRules.scala | 4 +- .../columnar/OffloadSingleNode.scala | 33 ++ .../columnar/validator/Validators.scala | 2 +- .../ColumnarBatchJniWrapper.java | 2 + .../gluten/columnarbatch/ColumnarBatches.java | 35 ++ .../vectorized/ArrowWritableColumnVector.java | 5 +- .../spark/sql/utils/SparkSchemaUtil.scala | 10 + gluten-ut/pom.xml | 5 + .../apache/gluten/execution/CustomerUDF.java | 39 ++ .../utils/velox/VeloxTestSettings.scala | 3 +- .../hive/execution/GlutenHiveUDFSuite.scala | 139 ++++++ .../apache/gluten/execution/CustomerUDF.java | 39 ++ .../utils/velox/VeloxTestSettings.scala | 3 +- .../hive/execution/GlutenHiveUDFSuite.scala | 139 ++++++ pom.xml | 7 + .../org/apache/gluten/GlutenConfig.scala | 16 + .../gluten/expression/ExpressionNames.scala | 1 + .../sql/shims/spark35/Spark35Shims.scala | 4 +- 34 files changed, 1573 insertions(+), 26 deletions(-) create mode 100644 backends-velox/src/main/scala/org/apache/gluten/execution/ProjectColumnarExec.scala create mode 100644 backends-velox/src/main/scala/org/apache/gluten/execution/SparkPartialProjectColumnarExec.scala create mode 100644 backends-velox/src/main/scala/org/apache/spark/sql/hive/HiveUdfUtil.scala create mode 100644 backends-velox/src/test/scala/org/apache/gluten/expression/UDFPartialProjectSuite.scala create mode 100644 gluten-ut/spark32/src/test/java/org/apache/gluten/execution/CustomerUDF.java create mode 100644 
gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveUDFSuite.scala create mode 100644 gluten-ut/spark35/src/test/java/org/apache/gluten/execution/CustomerUDF.java create mode 100644 gluten-ut/spark35/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveUDFSuite.scala diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala index c1c5332d75fd..87b9cc2cbcb7 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala @@ -315,6 +315,14 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi { FilterExecTransformer(condition, child) } + override def genSparkPartialProjectColumnarExec(original: ProjectExec): GlutenPlan = { + SparkPartialProjectColumnarExec.create(original) + } + + override def genProjectColumnarExec(original: ProjectExec): GlutenPlan = { + ProjectColumnarExec(original.projectList, original.child) + } + /** Generate HashAggregateExecTransformer. */ override def genHashAggregateExecTransformer( requiredChildDistributionExpressions: Option[Seq[Expression]], diff --git a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowConvertorRule.scala b/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowConvertorRule.scala index 2778710155bf..34583d0449e2 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowConvertorRule.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowConvertorRule.scala @@ -87,7 +87,7 @@ case class ArrowConvertorRule(session: SparkSession) extends Rule[LogicalPlan] { options, columnPruning = session.sessionState.conf.csvColumnPruning, session.sessionState.conf.sessionLocalTimeZone) - checkSchema(dataSchema) && + SparkSchemaUtil.checkSchema(dataSchema) && checkCsvOptions(csvOptions, session.sessionState.conf.sessionLocalTimeZone) && dataSchema.nonEmpty } @@ -106,13 +106,4 @@ case class ArrowConvertorRule(session: SparkSession) extends Rule[LogicalPlan] { SparkShimLoader.getSparkShims.dateTimestampFormatInReadIsDefaultValue(csvOptions, timeZone) } - private def checkSchema(schema: StructType): Boolean = { - try { - SparkSchemaUtil.toArrowSchema(schema) - true - } catch { - case _: Exception => - false - } - } } diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/ProjectColumnarExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/ProjectColumnarExec.scala new file mode 100644 index 000000000000..8e57a107bb03 --- /dev/null +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/ProjectColumnarExec.scala @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution + +import org.apache.gluten.GlutenConfig +import org.apache.gluten.columnarbatch.ColumnarBatches +import org.apache.gluten.extension.{GlutenPlan, ValidationResult} +import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators +import org.apache.gluten.sql.shims.SparkShimLoader +import org.apache.gluten.utils.iterator.Iterators +import org.apache.gluten.vectorized.ArrowWritableColumnVector + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, MutableProjection, NamedExpression, SortOrder} +import org.apache.spark.sql.execution.{OrderPreservingNodeShim, PartitioningPreservingNodeShim, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.execution.vectorized.{MutableColumnarRow, WritableColumnVector} +import org.apache.spark.sql.utils.SparkSchemaUtil +import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} + +case class ProjectColumnarExec(projectList: Seq[NamedExpression], child: SparkPlan) + extends UnaryExecNode + with PartitioningPreservingNodeShim + with OrderPreservingNodeShim + with GlutenPlan { + + override protected def orderingExpressions: Seq[SortOrder] = child.outputOrdering + + override protected def outputExpressions: Seq[NamedExpression] = projectList + + override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException() + + override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = + copy(child = newChild) + + override def output: Seq[Attribute] = projectList.map(_.toAttribute) + + override def supportsColumnar: Boolean = true + + override def doExecuteColumnar(): RDD[ColumnarBatch] = { + child.executeColumnar().mapPartitions { + batches => + val res: Iterator[Iterator[ColumnarBatch]] = new Iterator[Iterator[ColumnarBatch]] { + override def hasNext: Boolean = batches.hasNext + + override def next(): Iterator[ColumnarBatch] = { + val batch = batches.next() + if (batch.numRows == 0) { + Iterator.empty + } else { + val proj = MutableProjection.create(projectList, child.output) + val numRows = batch.numRows() + val arrowBatch = + ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance(), batch) + + val schema = + SparkShimLoader.getSparkShims.structFromAttributes(child.output.map(_.toAttribute)) + val vectors: Array[WritableColumnVector] = ArrowWritableColumnVector + .allocateColumns(numRows, schema) + .map { + vector => + vector.setValueCount(numRows) + vector.asInstanceOf[WritableColumnVector] + } + val targetRow = new MutableColumnarRow(vectors) + for (i <- 0 until numRows) { + targetRow.rowId = i + proj.target(targetRow).apply(arrowBatch.getRow(i)) + } + val targetBatch = + new ColumnarBatch(vectors.map(_.asInstanceOf[ColumnVector]), numRows) + val veloxBatch = ColumnarBatches + .ensureOffloaded(ArrowBufferAllocators.contextInstance(), targetBatch) + Iterators + .wrap(Iterator.single(veloxBatch)) + .recycleIterator({ + arrowBatch.close() + targetBatch.close() + }) + .create() + + } + } + } + Iterators + .wrap(res.flatten) + .protectInvocationFlow() // Spark may call `hasNext()` again after a false output which + // is not allowed by Gluten iterators. E.g. 
GroupedIterator#fetchNextGroupIterator + .recyclePayload(_.close()) + .create() + } + } + + override protected def doValidateInternal(): ValidationResult = { + if (!GlutenConfig.getConf.enableProjectColumnarExec) { + return ValidationResult.failed("Config disable this feature") + } + if (!(SparkSchemaUtil.checkSchema(schema) && SparkSchemaUtil.checkSchema(child.schema))) { + return ValidationResult.failed("Input type or output type cannot convert to arrow") + } + ValidationResult.succeeded + } +} diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala index 9ceb6b5b6e52..6bc53f423f4d 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala @@ -48,6 +48,9 @@ case class RowToVeloxColumnarExec(child: SparkPlan) extends RowToColumnarExecBas val numInputRows = longMetric("numInputRows") val numOutputBatches = longMetric("numOutputBatches") val convertTime = longMetric("convertTime") + + // Instead of creating a new config we are reusing columnBatchSize. In the future if we do + // combine with some of the Arrow conversion tools we will need to unify some of the configs. val numRows = GlutenConfig.getConf.maxBatchSize // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire // plan (this) in the closure. @@ -92,6 +95,23 @@ case class RowToVeloxColumnarExec(child: SparkPlan) extends RowToColumnarExecBas } object RowToVeloxColumnarExec { + + def toColumnarBatchIterator( + in: Iterator[InternalRow], + schema: StructType, + columnBatchSize: Int): Iterator[ColumnarBatch] = { + val numInputRows = new SQLMetric("numInputRows") + val numOutputBatches = new SQLMetric("numOutputBatches") + val convertTime = new SQLMetric("convertTime") + RowToVeloxColumnarExec.toColumnarBatchIterator( + in, + schema, + numInputRows, + numOutputBatches, + convertTime, + columnBatchSize) + } + def toColumnarBatchIterator( it: Iterator[InternalRow], schema: StructType, diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/SparkPartialProjectColumnarExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/SparkPartialProjectColumnarExec.scala new file mode 100644 index 000000000000..3214ac13b791 --- /dev/null +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/SparkPartialProjectColumnarExec.scala @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gluten.execution + +import org.apache.gluten.GlutenConfig +import org.apache.gluten.columnarbatch.ColumnarBatches +import org.apache.gluten.extension.{GlutenPlan, ValidationResult} +import org.apache.gluten.extension.columnar.validator.Validator.Passed +import org.apache.gluten.extension.columnar.validator.Validators.FallbackComplexExpressions +import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators +import org.apache.gluten.sql.shims.SparkShimLoader +import org.apache.gluten.utils.iterator.Iterators +import org.apache.gluten.vectorized.ArrowWritableColumnVector + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, CaseWhen, Coalesce, Expression, If, LambdaFunction, MutableProjection, NamedExpression, NaNvl, ScalaUDF, UnsafeProjection} +import org.apache.spark.sql.execution.{ExplainUtils, ProjectExec, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} +import org.apache.spark.sql.execution.vectorized.{MutableColumnarRow, WritableColumnVector} +import org.apache.spark.sql.hive.HiveUdfUtil +import org.apache.spark.sql.types.{BinaryType, BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, NullType, ShortType, StringType, TimestampType, YearMonthIntervalType} +import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} + +import scala.collection.mutable.ListBuffer + +/** + * Change the Project to ProjectExecTransformer + SparkPartialProjectColumnarExec e.g. sum(myudf(a) + * + b + hash(c)), child is (a, b,c ) SparkPartialProjectColumnarExec (a, b, c, myudf(a)), + * ProjectExecTransformer(myudf(a) + b + hash(c)) + * + * @param original + * extract the ScalaUDF from original project list as Alias in UnsafeProjection and + * AttributeReference in SparkPartialProjectColumnarExec output + * @param child + * child plan + */ +case class SparkPartialProjectColumnarExec(original: ProjectExec, child: SparkPlan)( + replacedAliasUdf: ListBuffer[Alias]) + extends UnaryExecNode + with GlutenPlan { + + private val debug = GlutenConfig.getConf.debug + + private val projectAttributes: ListBuffer[Attribute] = ListBuffer() + private val projectIndexInChild: ListBuffer[Int] = ListBuffer() + private var UDFAttrNotExists = false + private var hasComplexDataType = replacedAliasUdf.exists(a => !validateDataType(a.dataType)) + if (!hasComplexDataType) { + getProjectIndexInChildOutput(replacedAliasUdf) + } + + @transient override lazy val metrics = Map( + "time" -> SQLMetrics.createTimingMetric(sparkContext, "time of project"), + "column_to_row_time" -> SQLMetrics.createTimingMetric( + sparkContext, + "time of velox to Arrow ColumnarBatch"), + "row_to_column_time" -> SQLMetrics.createTimingMetric( + sparkContext, + "time of Arrow ColumnarBatch to velox") + ) + + override def output: Seq[Attribute] = child.output ++ replacedAliasUdf.map(_.toAttribute) + + final override def doExecute(): RDD[InternalRow] = { + throw new UnsupportedOperationException( + s"${this.getClass.getSimpleName} doesn't support doExecute") + } + + final override protected def otherCopyArgs: Seq[AnyRef] = { + replacedAliasUdf :: Nil + } + + final override lazy val supportsColumnar: Boolean = true + + private def validateExpression(expr: Expression): Boolean = { + expr.deterministic && !expr.isInstanceOf[LambdaFunction] && expr.children + .forall(validateExpression) + } + + private def 
validateDataType(dataType: DataType): Boolean = { + dataType match { + case _: BooleanType => true + case _: ByteType => true + case _: ShortType => true + case _: IntegerType => true + case _: LongType => true + case _: FloatType => true + case _: DoubleType => true + case _: StringType => true + case _: TimestampType => true + case _: DateType => true + case _: BinaryType => true + case _: DecimalType => true + case YearMonthIntervalType.DEFAULT => true + case _: NullType => true + case _ => false + } + } + + private def getProjectIndexInChildOutput(exprs: Seq[Expression]): Unit = { + exprs.foreach { + case a: AttributeReference => + val index = child.output.indexWhere(s => s.exprId.equals(a.exprId)) + // Some child operator as HashAggregateTransformer will not have udf child column + if (index < 0) { + UDFAttrNotExists = true + log.debug(s"Expression $a should exist in child output ${child.output}") + return + } else if (!validateDataType(a.dataType)) { + hasComplexDataType = true + log.debug(s"Expression $a contains unsupported data type ${a.dataType}") + } else if (!projectIndexInChild.contains(index)) { + projectAttributes.append(a.toAttribute) + projectIndexInChild.append(index) + } + case p => getProjectIndexInChildOutput(p.children) + } + } + + override protected def doValidateInternal(): ValidationResult = { + if (!GlutenConfig.getConf.enableColumnarPartialProject) { + return ValidationResult.failed("Config disable this feature") + } + if (UDFAttrNotExists) { + ValidationResult.failed("Attribute in the UDF does not exists in its child") + } else if (hasComplexDataType) { + ValidationResult.failed("Attribute in the UDF contains unsupported type") + } else if (projectAttributes.size == child.output.size) { + ValidationResult.failed("UDF need all the columns in child output") + } else if (original.output.isEmpty) { + ValidationResult.failed("Project fallback because output is empty") + } else if (replacedAliasUdf.isEmpty) { + ValidationResult.failed("No UDF") + } else if (replacedAliasUdf.size > original.output.size) { + // e.g. 
udf1(col) + udf2(col), it will introduce 2 cols for r2c + ValidationResult.failed("Number of RowToColumn columns is more than ProjectExec") + } else if (!original.projectList.forall(validateExpression(_))) { + ValidationResult.failed("Contains expression not supported") + } else if (isComplexExpression()) { + ValidationResult.failed("Fallback by complex expression") + } else { + ValidationResult.succeeded + } + } + + private def isComplexExpression(): Boolean = { + new FallbackComplexExpressions(GlutenConfig.getConf.fallbackExpressionsThreshold) + .validate(original) match { + case Passed => false + case _ => true + } + } + + override protected def doExecuteColumnar(): RDD[ColumnarBatch] = { + val totalTime = longMetric("time") + val c2r = longMetric("column_to_row_time") + val r2c = longMetric("row_to_column_time") + val isMutable = canUseMutableProjection() + child.executeColumnar().mapPartitions { + batches => + val res: Iterator[Iterator[ColumnarBatch]] = new Iterator[Iterator[ColumnarBatch]] { + override def hasNext: Boolean = batches.hasNext + + override def next(): Iterator[ColumnarBatch] = { + val batch = batches.next() + if (batch.numRows == 0) { + Iterator.empty + } else { + val start = System.currentTimeMillis() + val childData = ColumnarBatches.select(batch, projectIndexInChild.toArray) + val projectedBatch = if (isMutable) { + getProjectedBatchArrow(childData, c2r, r2c) + } else getProjectedBatch(childData, c2r, r2c) + val batchIterator = projectedBatch.map { + b => +// print("batch 1" + ColumnarBatches.toString(batch, 0, 20) + "\n") +// print("batch 2" + ColumnarBatches.toString(b, 0, 20) + "\n") + val compositeBatch = if (b.numCols() != 0) { + val handle = ColumnarBatches.compose(batch, b) + b.close() + ColumnarBatches.create(handle) + } else { + b.close() + ColumnarBatches.retain(batch) + batch + } + if (debug && compositeBatch.numCols() != output.length) { + throw new IllegalStateException( + s"Composite batch column number is ${compositeBatch.numCols()}, " + + s"output size is ${output.length}, " + + s"original batch column number is ${batch.numCols()}") + } + compositeBatch + } + childData.close() + totalTime += System.currentTimeMillis() - start + batchIterator + } + } + } + Iterators + .wrap(res.flatten) + .protectInvocationFlow() // Spark may call `hasNext()` again after a false output which + // is not allowed by Gluten iterators. E.g. 
GroupedIterator#fetchNextGroupIterator + .recyclePayload(_.close()) + .create() + + } + } + + // scalastyle:off line.size.limit + // String type cannot use MutableProjection + // Otherwise will throw java.lang.UnsupportedOperationException: Datatype not supported StringType + // at org.apache.spark.sql.execution.vectorized.MutableColumnarRow.update(MutableColumnarRow.java:224) + // at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown Source) + // scalastyle:on line.size.limit + private def canUseMutableProjection(): Boolean = { + replacedAliasUdf.forall( + r => + r.dataType match { + case StringType | BinaryType => false + case _ => true + }) + } + + /** + * add c2r and r2c for unsupported expression child data c2r get Iterator[InternalRow], then call + * Spark project, then r2c + */ + private def getProjectedBatch( + childData: ColumnarBatch, + c2r: SQLMetric, + r2c: SQLMetric): Iterator[ColumnarBatch] = { + // select part of child output and child data + val proj = UnsafeProjection.create(replacedAliasUdf, projectAttributes) + val numOutputRows = new SQLMetric("numOutputRows") + val numInputBatches = new SQLMetric("numInputBatches") + val rows = VeloxColumnarToRowExec + .toRowIterator( + Iterator.single[ColumnarBatch](childData), + projectAttributes, + numOutputRows, + numInputBatches, + c2r) + .map(proj) + + val schema = + SparkShimLoader.getSparkShims.structFromAttributes(replacedAliasUdf.map(_.toAttribute)) + RowToVeloxColumnarExec.toColumnarBatchIterator( + rows, + schema, + numOutputRows, + numInputBatches, + r2c, + childData.numRows()) + // TODO: should check the size <= 1, but now it has bug, will change iterator to empty + } + + private def getProjectedBatchArrow( + childData: ColumnarBatch, + c2r: SQLMetric, + r2c: SQLMetric): Iterator[ColumnarBatch] = { + // select part of child output and child data + val proj = MutableProjection.create(replacedAliasUdf, projectAttributes) + val numRows = childData.numRows() + val start = System.currentTimeMillis() + val arrowBatch = + ColumnarBatches.ensureLoadedWithoutRefcount( + ArrowBufferAllocators.contextInstance(), + childData) + c2r += System.currentTimeMillis() - start + + val schema = + SparkShimLoader.getSparkShims.structFromAttributes(replacedAliasUdf.map(_.toAttribute)) + val vectors: Array[WritableColumnVector] = ArrowWritableColumnVector + .allocateColumns(numRows, schema) + .map { + vector => + vector.setValueCount(numRows) + vector + } + val targetRow = new MutableColumnarRow(vectors) + for (i <- 0 until numRows) { + targetRow.rowId = i + proj.target(targetRow).apply(arrowBatch.getRow(i)) + } + val targetBatch = new ColumnarBatch(vectors.map(_.asInstanceOf[ColumnVector]), numRows) + val start2 = System.currentTimeMillis() + val veloxBatch = + ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance(), targetBatch) + r2c += System.currentTimeMillis() - start2 + Iterators + .wrap(Iterator.single(veloxBatch)) + .recycleIterator({ + arrowBatch.close() + targetBatch.close() + }) + .create() + // TODO: should check the size <= 1, but now it has bug, will change iterator to empty + } + + override def verboseStringWithOperatorId(): String = { + s""" + |$formattedNodeName + |${ExplainUtils.generateFieldString("Output", output)} + |${ExplainUtils.generateFieldString("Input", child.output)} + |${ExplainUtils.generateFieldString("ScalaUDF", replacedAliasUdf)} + |${ExplainUtils.generateFieldString("ProjectOutput", projectAttributes)} + 
|${ExplainUtils.generateFieldString("ProjectInputIndex", projectIndexInChild)} + |""".stripMargin + } + + override def simpleString(maxFields: Int): String = + super.simpleString(maxFields) + " PartialProject " + replacedAliasUdf + + override protected def withNewChildInternal( + newChild: SparkPlan): SparkPartialProjectColumnarExec = { + copy(child = newChild)(replacedAliasUdf) + } +} + +object SparkPartialProjectColumnarExec { + + val projectPrefix = "_SparkPartialProject" + + private def containsUDF(expr: Expression): Boolean = { + if (expr == null) return false + expr match { + case _: ScalaUDF => true + case h if HiveUdfUtil.isHiveUdf(h) => true + case p => p.children.exists(c => containsUDF(c)) + } + } + + private def replaceByAlias(expr: Expression, replacedAliasUdf: ListBuffer[Alias]): Expression = { + val replaceIndex = replacedAliasUdf.indexWhere(r => r.child.equals(expr)) + if (replaceIndex == -1) { + val replace = Alias(expr, s"$projectPrefix${replacedAliasUdf.size}")() + replacedAliasUdf.append(replace) + replace.toAttribute + } else { + replacedAliasUdf(replaceIndex).toAttribute + } + } + + private def isConditionalExpression(expr: Expression): Boolean = expr match { + case _: If => true + case _: CaseWhen => true + case _: NaNvl => true + case _: Coalesce => true + case _ => false + } + + private def replaceExpressionUDF( + expr: Expression, + replacedAliasUdf: ListBuffer[Alias]): Expression = { + if (expr == null) return null + expr match { + case u: ScalaUDF => + replaceByAlias(u, replacedAliasUdf) + case h if HiveUdfUtil.isHiveUdf(h) => + replaceByAlias(h, replacedAliasUdf) + case au @ Alias(_: ScalaUDF, _) => + val replaceIndex = replacedAliasUdf.indexWhere(r => r.exprId == au.exprId) + if (replaceIndex == -1) { + replacedAliasUdf.append(au) + au.toAttribute + } else { + replacedAliasUdf(replaceIndex).toAttribute + } + // Alias(HiveSimpleUDF) not exists, only be Alias(ToPrettyString(HiveSimpleUDF)), + // so don't process this condition + case x if isConditionalExpression(x) => + // For example: + // myudf is udf((x: Int) => x + 1) + // if (isnull(cast(l_extendedprice#9 as bigint))) null + // else myudf(knownnotnull(cast(l_extendedprice#9 as bigint))) + // if we extract else branch, and use the data child l_extendedprice, + // the result is incorrect for null value + if (containsUDF(expr)) { + replaceByAlias(expr, replacedAliasUdf) + } else expr + case p => p.withNewChildren(p.children.map(c => replaceExpressionUDF(c, replacedAliasUdf))) + } + } + + def create(original: ProjectExec): ProjectExecTransformer = { + val replacedAliasUdf: ListBuffer[Alias] = ListBuffer() + val newProjectList = original.projectList.map { + p => replaceExpressionUDF(p, replacedAliasUdf).asInstanceOf[NamedExpression] + } + val partialProject = SparkPartialProjectColumnarExec(original, original.child)(replacedAliasUdf) + ProjectExecTransformer(newProjectList, partialProject) + } +} diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala index d3fb9c3ffc70..34796655f9c7 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala @@ -99,6 +99,21 @@ case class VeloxColumnarToRowExec(child: SparkPlan) extends ColumnarToRowExecBas } object VeloxColumnarToRowExec { + + def toRowIterator( + batches: Iterator[ColumnarBatch], + output: 
Seq[Attribute]): Iterator[InternalRow] = { + val numOutputRows = new SQLMetric("numOutputRows") + val numInputBatches = new SQLMetric("numInputBatches") + val convertTime = new SQLMetric("convertTime") + toRowIterator( + batches, + output, + numOutputRows, + numInputBatches, + convertTime + ) + } def toRowIterator( batches: Iterator[ColumnarBatch], output: Seq[Attribute], diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/hive/HiveUdfUtil.scala b/backends-velox/src/main/scala/org/apache/spark/sql/hive/HiveUdfUtil.scala new file mode 100644 index 000000000000..76864558960c --- /dev/null +++ b/backends-velox/src/main/scala/org/apache/spark/sql/hive/HiveUdfUtil.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hive + +import org.apache.spark.sql.catalyst.expressions.Expression + +object HiveUdfUtil { + def isHiveUdf(expr: Expression): Boolean = expr match { + case _: HiveSimpleUDF => true + case _: HiveGenericUDF => true + case _: HiveUDAFFunction => true + case _: HiveGenericUDTF => true + case _ => false + } + +} diff --git a/backends-velox/src/test/scala/org/apache/gluten/expression/UDFPartialProjectSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/expression/UDFPartialProjectSuite.scala new file mode 100644 index 000000000000..318df93fd702 --- /dev/null +++ b/backends-velox/src/test/scala/org/apache/gluten/expression/UDFPartialProjectSuite.scala @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gluten.expression + +import org.apache.gluten.execution.{SparkPartialProjectColumnarExec, WholeStageTransformerSuite} + +import org.apache.spark.SparkConf +import org.apache.spark.sql.catalyst.optimizer.{ConstantFolding, NullPropagation} +import org.apache.spark.sql.functions.udf + +import java.io.File + +class UDFPartialProjectSuite extends WholeStageTransformerSuite { + disableFallbackCheck + override protected val resourcePath: String = "/tpch-data-parquet-velox" + override protected val fileFormat: String = "parquet" + + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + .set("spark.sql.files.maxPartitionBytes", "1g") + .set("spark.sql.shuffle.partitions", "1") + .set("spark.memory.offHeap.size", "2g") + .set("spark.unsafe.exceptionOnMemoryLeak", "true") + .set("spark.sql.autoBroadcastJoinThreshold", "-1") + .set("spark.sql.sources.useV1SourceList", "avro") + .set( + "spark.sql.optimizer.excludedRules", + ConstantFolding.ruleName + "," + + NullPropagation.ruleName) + .set("spark.gluten.sql.debug", "false") + } + + override def beforeAll(): Unit = { + super.beforeAll() + val table = "lineitem" + val tableDir = getClass.getResource(resourcePath).getFile + val tablePath = new File(tableDir, table).getAbsolutePath + val tableDF = spark.read.format(fileFormat).load(tablePath) + tableDF.createOrReplaceTempView(table) + + val plusOne = udf((x: Long) => x + 1) + spark.udf.register("plus_one", plusOne) + val noArgument = udf(() => 15) + spark.udf.register("no_argument", noArgument) + + } + + ignore("test plus_one") { + runQueryAndCompare("SELECT sum(plus_one(cast(l_orderkey as long))) from lineitem") { + checkGlutenOperatorMatch[SparkPartialProjectColumnarExec] + } + } + + ignore("test plus_one with column used twice") { + runQueryAndCompare( + "SELECT sum(plus_one(cast(l_orderkey as long)) + hash(l_orderkey)) from lineitem") { + checkGlutenOperatorMatch[SparkPartialProjectColumnarExec] + } + } + + ignore("test plus_one without cast") { + runQueryAndCompare("SELECT sum(plus_one(l_orderkey) + hash(l_orderkey)) from lineitem") { + checkGlutenOperatorMatch[SparkPartialProjectColumnarExec] + } + } + + test("test plus_one with many columns") { + runQueryAndCompare( + "SELECT sum(plus_one(cast(l_orderkey as long)) + hash(l_partkey))" + + "from lineitem " + + "where l_orderkey < 3") { + checkGlutenOperatorMatch[SparkPartialProjectColumnarExec] + } + } + + test("test plus_one with many columns in project") { + runQueryAndCompare("SELECT plus_one(cast(l_orderkey as long)), hash(l_partkey) from lineitem") { + checkGlutenOperatorMatch[SparkPartialProjectColumnarExec] + } + } + + test("test function no argument") { + runQueryAndCompare("""SELECT no_argument(), l_orderkey + | from lineitem limit 100""".stripMargin) { + checkGlutenOperatorMatch[SparkPartialProjectColumnarExec] + } + } + + test("test nondeterministic function input_file_name") { + val df = spark.sql("""SELECT input_file_name(), l_orderkey + | from lineitem limit 100""".stripMargin) + df.collect() + assert( + df.queryExecution.executedPlan + .find(p => p.isInstanceOf[SparkPartialProjectColumnarExec]) + .isEmpty) + } + + test("udf in agg simple") { + runQueryAndCompare("""select sum(hash(plus_one(l_extendedprice)) + hash(l_orderkey) ) as revenue + | from lineitem""".stripMargin) { + checkGlutenOperatorMatch[SparkPartialProjectColumnarExec] + } + } + + test("udf in agg") { + runQueryAndCompare("""select 
sum(hash(plus_one(l_extendedprice)) * l_discount + | + hash(l_orderkey) + hash(l_comment)) as revenue + | from lineitem""".stripMargin) { + checkGlutenOperatorMatch[SparkPartialProjectColumnarExec] + } + } + +} diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 5c2752f18ae7..addf81ef9f39 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -732,6 +732,19 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWra JNI_METHOD_END(kInvalidObjectHandle) } +JNIEXPORT jstring JNICALL Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWrapper_toString( // NOLINT + JNIEnv* env, + jobject wrapper, + jlong handle, + jint start, + jint length) { + JNI_METHOD_START + GLUTEN_CHECK(length >= 0, "ColumnarBatch toString length should be greater or equal than 0"); + auto batch = ObjectStore::retrieve(handle); + return env->NewStringUTF(batch->toString(start, length).c_str()); + JNI_METHOD_END(nullptr) +} + JNIEXPORT jlong JNICALL Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWrapper_getForEmptySchema( // NOLINT JNIEnv* env, jobject wrapper, diff --git a/cpp/core/memory/ColumnarBatch.h b/cpp/core/memory/ColumnarBatch.h index fd8189aa6a20..b4ab5f220f08 100644 --- a/cpp/core/memory/ColumnarBatch.h +++ b/cpp/core/memory/ColumnarBatch.h @@ -52,6 +52,10 @@ class ColumnarBatch { // Serializes one single row to byte array that can be accessed as Spark-compatible unsafe row. virtual std::vector toUnsafeRow(int32_t rowId) const; + virtual std::string toString(int32_t start, int32_t length) const { + throw GlutenException("Not implement"); + } + friend std::ostream& operator<<(std::ostream& os, const ColumnarBatch& columnarBatch); private: diff --git a/cpp/velox/memory/VeloxColumnarBatch.h b/cpp/velox/memory/VeloxColumnarBatch.h index 6c79f2772d2d..bac2506cce3c 100644 --- a/cpp/velox/memory/VeloxColumnarBatch.h +++ b/cpp/velox/memory/VeloxColumnarBatch.h @@ -42,6 +42,11 @@ class VeloxColumnarBatch final : public ColumnarBatch { std::shared_ptr exportArrowSchema() override; std::shared_ptr exportArrowArray() override; std::vector toUnsafeRow(int32_t rowId) const override; + + std::string toString(int32_t start, int32_t length) const override { + return getRowVector()->toString(start, start + length); + } + std::shared_ptr select(facebook::velox::memory::MemoryPool* pool, std::vector columnIndices); facebook::velox::RowVectorPtr getRowVector() const; facebook::velox::RowVectorPtr getFlattenedRowVector(); diff --git a/cpp/velox/operators/serializer/VeloxRowToColumnarConverter.cc b/cpp/velox/operators/serializer/VeloxRowToColumnarConverter.cc index 75fa3c3d3262..a247b4021a42 100644 --- a/cpp/velox/operators/serializer/VeloxRowToColumnarConverter.cc +++ b/cpp/velox/operators/serializer/VeloxRowToColumnarConverter.cc @@ -21,6 +21,248 @@ #include "velox/vector/arrow/Bridge.h" using namespace facebook::velox; +namespace { + +inline int64_t calculateBitSetWidthInBytes(int32_t numFields) { + return ((numFields + 63) / 64) * 8; +} + +inline int64_t getFieldOffset(int64_t nullBitsetWidthInBytes, int32_t index) { + return nullBitsetWidthInBytes + 8L * index; +} + +inline bool isNull(uint8_t* buffer_address, int32_t index) { + int64_t mask = 1L << (index & 0x3f); // mod 64 and shift + int64_t wordOffset = (index >> 6) * 8; + int64_t value = *((int64_t*)(buffer_address + wordOffset)); + return (value & mask) != 0; +} + +int32_t getTotalStringSize( + int32_t columnIdx, + int32_t numRows, + int64_t fieldOffset, + std::vector& offsets, + uint8_t* 
memoryAddress) { + size_t size = 0; + for (auto pos = 0; pos < numRows; pos++) { + if (isNull(memoryAddress + offsets[pos], columnIdx)) { + continue; + } + + int64_t offsetAndSize = *(int64_t*)(memoryAddress + offsets[pos] + fieldOffset); + int32_t length = static_cast(offsetAndSize); + if (!StringView::isInline(length)) { + size += length; + } + } + return size; +} + +template +VectorPtr createFlatVector( + const TypePtr& type, + int32_t columnIdx, + int32_t numRows, + int64_t fieldOffset, + std::vector& offsets, + uint8_t* memoryAddress, + memory::MemoryPool* pool) { + using T = typename TypeTraits::NativeType; + auto typeWidth = sizeof(T); + auto column = BaseVector::create>(type, numRows, pool); + auto rawValues = column->template mutableRawValues(); +#if defined(__x86_64__) + auto shift = _tzcnt_u32(typeWidth); +#else + auto shift = __builtin_ctz((uint32_t)typeWidth); +#endif + for (auto pos = 0; pos < numRows; pos++) { + if (!isNull(memoryAddress + offsets[pos], columnIdx)) { + const uint8_t* srcptr = (memoryAddress + offsets[pos] + fieldOffset); + uint8_t* destptr = rawValues + (pos << shift); + memcpy(destptr, srcptr, typeWidth); + } else { + column->setNull(pos, true); + } + } + return column; +} + +template <> +VectorPtr createFlatVector( + const TypePtr& type, + int32_t columnIdx, + int32_t numRows, + int64_t fieldOffset, + std::vector& offsets, + uint8_t* memoryAddress, + memory::MemoryPool* pool) { + auto column = BaseVector::create>(type, numRows, pool); + auto rawValues = column->mutableRawValues(); + auto typeWidth = sizeof(int128_t); +#if defined(__x86_64__) + auto shift = _tzcnt_u32(typeWidth); +#else + auto shift = __builtin_ctz((uint32_t)typeWidth); +#endif + for (auto pos = 0; pos < numRows; pos++) { + if (!isNull(memoryAddress + offsets[pos], columnIdx)) { + uint8_t* destptr = rawValues + (pos << shift); + int64_t offsetAndSize = *(int64_t*)(memoryAddress + offsets[pos] + fieldOffset); + int32_t length = static_cast(offsetAndSize); + int32_t wordoffset = static_cast(offsetAndSize >> 32); + uint8_t bytesValue[length]; + memcpy(bytesValue, memoryAddress + offsets[pos] + wordoffset, length); + uint8_t bytesValue2[16]{}; + for (int k = length - 1; k >= 0; k--) { + bytesValue2[length - 1 - k] = bytesValue[k]; + } + if (int8_t(bytesValue[0]) < 0) { + memset(bytesValue2 + length, 255, 16 - length); + } + memcpy(destptr, bytesValue2, typeWidth); + } else { + column->setNull(pos, true); + } + } + return column; +} + +template <> +VectorPtr createFlatVector( + const TypePtr& type, + int32_t columnIdx, + int32_t numRows, + int64_t fieldOffset, + std::vector& offsets, + uint8_t* memoryAddress, + memory::MemoryPool* pool) { + auto column = BaseVector::create>(type, numRows, pool); + auto rawValues = column->mutableRawValues(); + for (auto pos = 0; pos < numRows; pos++) { + if (!isNull(memoryAddress + offsets[pos], columnIdx)) { + bool value = *(bool*)(memoryAddress + offsets[pos] + fieldOffset); + bits::setBit(rawValues, pos, value); + } else { + column->setNull(pos, true); + } + } + return column; +} + +template <> +VectorPtr createFlatVector( + const TypePtr& type, + int32_t columnIdx, + int32_t numRows, + int64_t fieldOffset, + std::vector& offsets, + uint8_t* memoryAddress, + memory::MemoryPool* pool) { + auto column = BaseVector::create>(type, numRows, pool); + for (auto pos = 0; pos < numRows; pos++) { + if (!isNull(memoryAddress + offsets[pos], columnIdx)) { + int64_t value = *(int64_t*)(memoryAddress + offsets[pos] + fieldOffset); + column->set(pos, 
Timestamp::fromMicros(value)); + } else { + column->setNull(pos, true); + } + } + return column; +} + +VectorPtr createFlatVectorStringView( + const TypePtr& type, + int32_t columnIdx, + int32_t numRows, + int64_t fieldOffset, + std::vector& offsets, + uint8_t* memoryAddress, + memory::MemoryPool* pool) { + auto column = BaseVector::create>(type, numRows, pool); + auto size = getTotalStringSize(columnIdx, numRows, fieldOffset, offsets, memoryAddress); + char* rawBuffer = column->getRawStringBufferWithSpace(size, true); + for (auto pos = 0; pos < numRows; pos++) { + if (!isNull(memoryAddress + offsets[pos], columnIdx)) { + int64_t offsetAndSize = *(int64_t*)(memoryAddress + offsets[pos] + fieldOffset); + int32_t length = static_cast(offsetAndSize); + int32_t wordoffset = static_cast(offsetAndSize >> 32); + auto valueSrcPtr = memoryAddress + offsets[pos] + wordoffset; + if (StringView::isInline(length)) { + column->set(pos, StringView(reinterpret_cast(valueSrcPtr), length)); + } else { + memcpy(rawBuffer, valueSrcPtr, length); + column->setNoCopy(pos, StringView(rawBuffer, length)); + rawBuffer += length; + } + } else { + column->setNull(pos, true); + } + } + return column; +} + +template <> +VectorPtr createFlatVector( + const TypePtr& type, + int32_t columnIdx, + int32_t numRows, + int64_t fieldOffset, + std::vector& offsets, + uint8_t* memoryAddress, + memory::MemoryPool* pool) { + return createFlatVectorStringView(type, columnIdx, numRows, fieldOffset, offsets, memoryAddress, pool); +} + +template <> +VectorPtr createFlatVector( + const TypePtr& type, + int32_t columnIdx, + int32_t numRows, + int64_t fieldOffset, + std::vector& offsets, + uint8_t* memoryAddress, + memory::MemoryPool* pool) { + return createFlatVectorStringView(type, columnIdx, numRows, fieldOffset, offsets, memoryAddress, pool); +} + +template <> +VectorPtr createFlatVector( + const TypePtr& /*type*/, + int32_t /*columnIdx*/, + int32_t numRows, + int64_t /*fieldOffset*/, + std::vector& /*offsets*/, + uint8_t* /*memoryAddress*/, + memory::MemoryPool* pool) { + auto nulls = allocateNulls(numRows, pool, bits::kNull); + return std::make_shared>( + pool, + UNKNOWN(), + nulls, + numRows, + nullptr, // values + std::vector{}); // stringBuffers +} + +bool supporteType(const RowTypePtr rowType) { + for (auto i = 0; i < rowType->size(); i++) { + auto kind = rowType->childAt(i)->kind(); + switch (kind) { + case TypeKind::ARRAY: + case TypeKind::MAP: + case TypeKind::ROW: + return false; + default: + break; + } + } + return true; +} + +} // namespace + namespace gluten { VeloxRowToColumnarConverter::VeloxRowToColumnarConverter( struct ArrowSchema* cSchema, @@ -32,6 +274,10 @@ VeloxRowToColumnarConverter::VeloxRowToColumnarConverter( std::shared_ptr VeloxRowToColumnarConverter::convert(int64_t numRows, int64_t* rowLength, uint8_t* memoryAddress) { + if (supporteType(asRowType(rowType_))) { + return convertPrimitive(numRows, rowLength, memoryAddress); + } + std::vector> data; int64_t offset = 0; for (auto i = 0; i < numRows; i++) { @@ -41,4 +287,28 @@ VeloxRowToColumnarConverter::convert(int64_t numRows, int64_t* rowLength, uint8_ auto vp = row::UnsafeRowDeserializer::deserialize(data, rowType_, pool_.get()); return std::make_shared(std::dynamic_pointer_cast(vp)); } + +std::shared_ptr +VeloxRowToColumnarConverter::convertPrimitive(int64_t numRows, int64_t* rowLength, uint8_t* memoryAddress) { + auto numFields = rowType_->size(); + int64_t nullBitsetWidthInBytes = calculateBitSetWidthInBytes(numFields); + std::vector offsets; + 
offsets.resize(numRows); + for (auto i = 1; i < numRows; i++) { + offsets[i] = offsets[i - 1] + rowLength[i - 1]; + } + + std::vector columns; + columns.resize(numFields); + + for (auto i = 0; i < numFields; i++) { + auto fieldOffset = getFieldOffset(nullBitsetWidthInBytes, i); + auto& type = rowType_->childAt(i); + columns[i] = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL( + createFlatVector, type->kind(), type, i, numRows, fieldOffset, offsets, memoryAddress, pool_.get()); + } + + auto rowVector = std::make_shared(pool_.get(), rowType_, BufferPtr(nullptr), numRows, std::move(columns)); + return std::make_shared(rowVector); +} } // namespace gluten diff --git a/cpp/velox/operators/serializer/VeloxRowToColumnarConverter.h b/cpp/velox/operators/serializer/VeloxRowToColumnarConverter.h index 30006c4f0757..1fbc4f6021a7 100644 --- a/cpp/velox/operators/serializer/VeloxRowToColumnarConverter.h +++ b/cpp/velox/operators/serializer/VeloxRowToColumnarConverter.h @@ -33,7 +33,9 @@ class VeloxRowToColumnarConverter final : public RowToColumnarConverter { std::shared_ptr convert(int64_t numRows, int64_t* rowLength, uint8_t* memoryAddress); - protected: + private: + std::shared_ptr convertPrimitive(int64_t numRows, int64_t* rowLength, uint8_t* memoryAddress); + facebook::velox::TypePtr rowType_; std::shared_ptr pool_; }; diff --git a/cpp/velox/tests/VeloxRowToColumnarTest.cc b/cpp/velox/tests/VeloxRowToColumnarTest.cc index c784dbd59c34..0d11dd4acbc9 100644 --- a/cpp/velox/tests/VeloxRowToColumnarTest.cc +++ b/cpp/velox/tests/VeloxRowToColumnarTest.cc @@ -87,10 +87,58 @@ TEST_F(VeloxRowToColumnarTest, allTypes) { makeNullableFlatVector( {std::nullopt, true, false, std::nullopt, true, true, false, true, std::nullopt, std::nullopt}), makeFlatVector( - {"alice0", "bob1", "alice2", "bob3", "Alice4", "Bob5", "AlicE6", "boB7", "ALICE8", "BOB9"}), + {"alice0", + "bob1", + "alice2", + "bob3", + "Alice4", + "Bob5123456789098766notinline", + "AlicE6", + "boB7", + "ALICE8", + "BOB9"}), makeNullableFlatVector( {"alice", "bob", std::nullopt, std::nullopt, "Alice", "Bob", std::nullopt, "alicE", std::nullopt, "boB"}), }); testRowVectorEqual(vector); } + +TEST_F(VeloxRowToColumnarTest, bigint) { + auto vector = makeRowVector({ + makeNullableFlatVector({1, 2, 3, std::nullopt, 4, std::nullopt, 5, 6, std::nullopt, 7}), + }); + testRowVectorEqual(vector); +} + +TEST_F(VeloxRowToColumnarTest, decimal) { + auto vector = makeRowVector({ + makeNullableFlatVector( + {123456, HugeInt::build(1045, 1789), 3678, std::nullopt, 4, std::nullopt, 5, 687987, std::nullopt, 7}, + DECIMAL(38, 2)), + makeNullableFlatVector( + {178987, 2, 3, std::nullopt, 4, std::nullopt, 5, 6, std::nullopt, 7}, DECIMAL(12, 3)), + }); + testRowVectorEqual(vector); +} + +TEST_F(VeloxRowToColumnarTest, timestamp) { + auto vector = makeRowVector({ + makeNullableFlatVector( + {Timestamp(-946684800, 0), + Timestamp(-7266, 0), + Timestamp(0, 0), + Timestamp(946684800, 0), + Timestamp(9466848000, 0), + Timestamp(94668480000, 0), + Timestamp(946729316, 0), + Timestamp(946729316, 0), + Timestamp(946729316, 0), + Timestamp(7266, 0), + Timestamp(-50049331200, 0), + Timestamp(253405036800, 0), + Timestamp(-62480037600, 0), + std::nullopt}), + }); + testRowVectorEqual(vector); +} } // namespace gluten diff --git a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala index c37c55a5f0ef..d3bc098ce483 100644 --- 
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala @@ -19,10 +19,10 @@ package org.apache.gluten.backendsapi import org.apache.gluten.exception.GlutenNotSupportException import org.apache.gluten.execution._ import org.apache.gluten.expression._ +import org.apache.gluten.extension.GlutenPlan import org.apache.gluten.extension.columnar.transition.{Convention, ConventionFunc} import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode, WindowFunctionNode} - import org.apache.spark.ShuffleDependency import org.apache.spark.rdd.RDD import org.apache.spark.serializer.Serializer @@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.{ColumnarWriteFilesExec, FileSourceScanExec, GenerateExec, LeafExecNode, SparkPlan} +import org.apache.spark.sql.execution.{ColumnarWriteFilesExec, FileSourceScanExec, GenerateExec, LeafExecNode, ProjectExec, SparkPlan} import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan} import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec @@ -53,7 +53,6 @@ import org.apache.spark.sql.vectorized.ColumnarBatch import java.lang.{Long => JLong} import java.util.{ArrayList => JArrayList, List => JList, Map => JMap} - import scala.collection.JavaConverters._ trait SparkPlanExecApi { @@ -88,6 +87,10 @@ trait SparkPlanExecApi { child: SparkPlan): ProjectExecTransformer = ProjectExecTransformer.createUnsafe(projectList, child) + def genSparkPartialProjectColumnarExec(original: ProjectExec): GlutenPlan = null + + def genProjectColumnarExec(original: ProjectExec): GlutenPlan = null + /** Generate HashAggregateExecTransformer. 
*/ def genHashAggregateExecTransformer( requiredChildDistributionExpressions: Option[Seq[Expression]], diff --git a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala index 3ca66b51897b..ada1ad33fdbe 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala @@ -714,19 +714,31 @@ object ExpressionConverter extends SQLConfHelper with Logging { } private def getAndCheckSubstraitName(expr: Expression, expressionsMap: Map[Class[_], String]) = { + val res = getSubstraitName(expr, expressionsMap) + if (res._1 != null) { + res._1 + } else { + throw new GlutenNotSupportException(res._2) + } + } + + def getSubstraitName( + expr: Expression, + expressionsMap: Map[Class[_], String]): (String, String) = { TestStats.addExpressionClassName(expr.getClass.getName) // Check whether Gluten supports this expression val substraitExprNameOpt = expressionsMap.get(expr.getClass) if (substraitExprNameOpt.isEmpty) { - throw new GlutenNotSupportException( + return ( + null, s"Not supported to map spark function name" + s" to substrait function name: $expr, class name: ${expr.getClass.getSimpleName}.") } val substraitExprName = substraitExprNameOpt.get // Check whether each backend supports this expression if (!BackendsApiManager.getValidatorApiInstance.doExprValidate(substraitExprName, expr)) { - throw new GlutenNotSupportException(s"Not supported: $expr.") + return (null, s"Not supported: $expr.") } - substraitExprName + (substraitExprName, "OK") } } diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala index b7a30f7e177a..cafb72fbd329 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala @@ -39,7 +39,9 @@ object MiscColumnarRules { OffloadOthers(), OffloadAggregate(), OffloadExchange(), - OffloadJoin() + OffloadJoin(), + OffloadProjectPartial(), + OffloadProjectColumnar() ) ) } diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala index a8cc791286b2..b08b2f07c54d 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala @@ -120,6 +120,39 @@ case class OffloadExchange() extends OffloadSingleNode with LogLevelUtil { } } +case class OffloadProjectPartial() extends OffloadSingleNode with LogLevelUtil { + override def offload(plan: SparkPlan): SparkPlan = plan match { + case p: ProjectExec if FallbackTags.nonEmpty(p) => + val original = p + val partialProject = BackendsApiManager.getSparkPlanExecApiInstance + .genSparkPartialProjectColumnarExec(original) + if (partialProject != null) { + val projectTransformer = partialProject.asInstanceOf[ProjectExecTransformer] + if (projectTransformer.doValidate().ok()) { + val project = projectTransformer.child.asInstanceOf[GlutenPlan] + if (project.doValidate().ok()) { + partialProject + } else p + } else p + } else p + case other => other + } +} + +case class OffloadProjectColumnar() extends 
OffloadSingleNode with LogLevelUtil { + override def offload(plan: SparkPlan): SparkPlan = plan match { + case p: ProjectExec if FallbackTags.nonEmpty(p) => + val projectColumnar = BackendsApiManager.getSparkPlanExecApiInstance + .genProjectColumnarExec(p) + if (projectColumnar != null) { + if (projectColumnar.doValidate().ok()) { + projectColumnar + } else p + } else p + case other => other + } +} + // Join transformation. case class OffloadJoin() extends OffloadSingleNode with LogLevelUtil { diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala index a85cb163ceaa..f0b1d69a6876 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala @@ -116,7 +116,7 @@ object Validators { } } - private class FallbackComplexExpressions(threshold: Int) extends Validator { + class FallbackComplexExpressions(threshold: Int) extends Validator { override def validate(plan: SparkPlan): Validator.OutCome = { if (plan.expressions.exists(e => ExpressionUtils.getExpressionTreeDepth(e) > threshold)) { return fail( diff --git a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatchJniWrapper.java b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatchJniWrapper.java index 37376951c543..df1f5e8a7218 100644 --- a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatchJniWrapper.java +++ b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatchJniWrapper.java @@ -48,6 +48,8 @@ public static ColumnarBatchJniWrapper create(Runtime runtime) { public native long select(long batch, int[] columnIndices); + public native String toString(long handle, int start, int length); + public native void close(long batch); @Override diff --git a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java index 543e6d4cf97b..dcccc5d57037 100644 --- a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java +++ b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ColumnarBatches.java @@ -162,6 +162,36 @@ public static ColumnarBatch ensureLoaded(BufferAllocator allocator, ColumnarBatc return load(allocator, batch); } + public static ColumnarBatch ensureLoadedWithoutRefcount( + BufferAllocator allocator, ColumnarBatch input) { + if (isHeavyBatch(input)) { + return input; + } + + if (!ColumnarBatches.isLightBatch(input)) { + throw new IllegalArgumentException( + "Input is not light columnar batch. 
" + + "Please consider to use vanilla spark's row based input by setting one of the below" + + " configs: \n" + + "spark.sql.parquet.enableVectorizedReader=false\n" + + "spark.sql.inMemoryColumnarStorage.enableVectorizedReader=false\n" + + "spark.sql.orc.enableVectorizedReader=false\n"); + } + IndicatorVector iv = (IndicatorVector) input.column(0); + try (ArrowSchema cSchema = ArrowSchema.allocateNew(allocator); + ArrowArray cArray = ArrowArray.allocateNew(allocator); + ArrowSchema arrowSchema = ArrowSchema.allocateNew(allocator); + CDataDictionaryProvider provider = new CDataDictionaryProvider()) { + ColumnarBatchJniWrapper.create(Runtimes.contextInstance("ColumnarBatches#load")) + .exportToArrow(iv.handle(), cSchema.memoryAddress(), cArray.memoryAddress()); + + Data.exportSchema( + allocator, ArrowUtil.toArrowSchema(cSchema, allocator, provider), provider, arrowSchema); + + return ArrowAbiUtil.importToSparkColumnarBatch(allocator, arrowSchema, cArray); + } + } + private static ColumnarBatch load(BufferAllocator allocator, ColumnarBatch input) { if (!ColumnarBatches.isLightBatch(input)) { throw new IllegalArgumentException( @@ -335,6 +365,11 @@ public static long compose(ColumnarBatch... batches) { .compose(handles); } + public static String toString(ColumnarBatch batch, int start, int length) { + return ColumnarBatchJniWrapper.create(Runtimes.contextInstance("ColumnarBatches#toString")) + .toString(getNativeHandle(batch), start, length); + } + private static ColumnarBatch create(IndicatorVector iv) { int numColumns = Math.toIntExact(iv.getNumColumns()); int numRows = Math.toIntExact(iv.getNumRows()); diff --git a/gluten-data/src/main/java/org/apache/gluten/vectorized/ArrowWritableColumnVector.java b/gluten-data/src/main/java/org/apache/gluten/vectorized/ArrowWritableColumnVector.java index dfd570debc0a..a354e2a1bfa2 100644 --- a/gluten-data/src/main/java/org/apache/gluten/vectorized/ArrowWritableColumnVector.java +++ b/gluten-data/src/main/java/org/apache/gluten/vectorized/ArrowWritableColumnVector.java @@ -1255,9 +1255,8 @@ void setNull(int rowId) { throw new UnsupportedOperationException(); } - void setNotNull(int rowId) { - throw new UnsupportedOperationException(); - } + // No need to setNotNull then setValue, only setValue is enough + void setNotNull(int rowId) {} void setNulls(int rowId, int count) { throw new UnsupportedOperationException(); diff --git a/gluten-data/src/main/scala/org/apache/spark/sql/utils/SparkSchemaUtil.scala b/gluten-data/src/main/scala/org/apache/spark/sql/utils/SparkSchemaUtil.scala index b49077bd2740..8e66981ac72f 100644 --- a/gluten-data/src/main/scala/org/apache/spark/sql/utils/SparkSchemaUtil.scala +++ b/gluten-data/src/main/scala/org/apache/spark/sql/utils/SparkSchemaUtil.scala @@ -37,6 +37,16 @@ object SparkSchemaUtil { SparkArrowUtil.toArrowSchema(schema, timeZoneId) } + def checkSchema(schema: StructType): Boolean = { + try { + SparkSchemaUtil.toArrowSchema(schema) + true + } catch { + case _: Exception => + false + } + } + def isTimeZoneIDEquivalentToUTC(zoneId: String): Boolean = { getTimeZoneIDOffset(zoneId) == 0 } diff --git a/gluten-ut/pom.xml b/gluten-ut/pom.xml index 90644b832bf8..a48d21877a69 100644 --- a/gluten-ut/pom.xml +++ b/gluten-ut/pom.xml @@ -79,6 +79,11 @@ spark-core_${scala.binary.version} test-jar + + org.apache.spark + spark-hive_${scala.binary.version} + test-jar + org.apache.spark spark-sql_${scala.binary.version} diff --git a/gluten-ut/spark32/src/test/java/org/apache/gluten/execution/CustomerUDF.java 
b/gluten-ut/spark32/src/test/java/org/apache/gluten/execution/CustomerUDF.java new file mode 100644 index 000000000000..257bd07021f2 --- /dev/null +++ b/gluten-ut/spark32/src/test/java/org/apache/gluten/execution/CustomerUDF.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution; + +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.UDF; + +/** + * UDF that generates a the link id (MD5 hash) of a URL. Used to join with link join. + * + *
Usage example: + * + *
CREATE TEMPORARY FUNCTION linkid AS 'com.pinterest.hadoop.hive.LinkIdUDF'; + */ +@Description( + name = "linkid", + value = "_FUNC_(String) - Returns linkid as String, it's the MD5 hash of url.") +public class CustomerUDF extends UDF { + public String evaluate(String url) { + if (url == null || url == "") { + return ""; + } + return "extendedudf" + url; + } +} diff --git a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index f8a075ec0c41..05a3f9260876 100644 --- a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.datasources.v2.GlutenFileTableSuite import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite import org.apache.spark.sql.execution.joins.{GlutenBroadcastJoinSuite, GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite} import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenCustomerExpressionTransformerSuite, GlutenCustomerExtensionSuite, GlutenSessionExtensionSuite} -import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite +import org.apache.spark.sql.hive.execution.{GlutenHiveSQLQuerySuite, GlutenHiveUDFSuite} import org.apache.spark.sql.sources.{GlutenBucketedReadWithoutHiveSupportSuite, GlutenBucketedWriteWithoutHiveSupportSuite, GlutenCreateTableAsSelectSuite, GlutenDDLSourceLoadSuite, GlutenDisableUnnecessaryBucketedScanWithoutHiveSupportSuite, GlutenDisableUnnecessaryBucketedScanWithoutHiveSupportSuiteAE, GlutenExternalCommandRunnerSuite, GlutenFilteredScanSuite, GlutenFiltersSuite, GlutenInsertSuite, GlutenPartitionedWriteSuite, GlutenPathOptionSuite, GlutenPrunedScanSuite, GlutenResolvedDataSourceSuite, GlutenSaveLoadSuite, GlutenTableScanSuite} // Some settings' line length exceeds 100 @@ -1124,6 +1124,7 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenHiveSQLQuerySuite] enableSuite[GlutenCollapseProjectExecTransformerSuite] enableSuite[GlutenSparkSessionExtensionSuite] + enableSuite[GlutenHiveUDFSuite] override def getSQLQueryTestSettings: SQLQueryTestSettings = VeloxSQLQueryTestSettings } diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveUDFSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveUDFSuite.scala new file mode 100644 index 000000000000..cc9f6f1d893a --- /dev/null +++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveUDFSuite.scala @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hive.execution + +import org.apache.gluten.execution.CustomerUDF + +import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.internal.config +import org.apache.spark.internal.config.UI.UI_ENABLED +import org.apache.spark.sql.{GlutenTestsBaseTrait, QueryTest, SparkSession} +import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode +import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation +import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils} +import org.apache.spark.sql.hive.client.HiveClient +import org.apache.spark.sql.hive.test.TestHiveContext +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH +import org.apache.spark.sql.test.SQLTestUtils + +import org.scalatest.BeforeAndAfterAll + +import java.io.File + +trait GlutenTestHiveSingleton extends SparkFunSuite with BeforeAndAfterAll { + override protected val enableAutoThreadAudit = false + +} + +object GlutenTestHive + extends TestHiveContext( + new SparkContext( + System.getProperty("spark.sql.test.master", "local[1]"), + "TestSQLContext", + new SparkConf() + .set("spark.sql.test", "") + .set(SQLConf.CODEGEN_FALLBACK.key, "false") + .set(SQLConf.CODEGEN_FACTORY_MODE.key, CodegenObjectFactoryMode.CODEGEN_ONLY.toString) + .set( + HiveUtils.HIVE_METASTORE_BARRIER_PREFIXES.key, + "org.apache.spark.sql.hive.execution.PairSerDe") + .set(WAREHOUSE_PATH.key, TestHiveContext.makeWarehouseDir().toURI.getPath) + // SPARK-8910 + .set(UI_ENABLED, false) + .set(config.UNSAFE_EXCEPTION_ON_MEMORY_LEAK, true) + // Hive changed the default of hive.metastore.disallow.incompatible.col.type.changes + // from false to true. For details, see the JIRA HIVE-12320 and HIVE-17764. + .set("spark.hadoop.hive.metastore.disallow.incompatible.col.type.changes", "false") + .set("spark.driver.memory", "1G") + .set("spark.sql.adaptive.enabled", "true") + .set("spark.sql.shuffle.partitions", "1") + .set("spark.sql.files.maxPartitionBytes", "134217728") + .set("spark.memory.offHeap.enabled", "true") + .set("spark.memory.offHeap.size", "1024MB") + .set("spark.plugins", "org.apache.gluten.GlutenPlugin") + .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + // Disable ConvertToLocalRelation for better test coverage. Test cases built on + // LocalRelation will exercise the optimization rules better by disabling it as + // this rule may potentially block testing of other optimization rules such as + // ConstantPropagation etc. 
+ .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName) + ), + false + ) {} + +class GlutenHiveUDFSuite + extends QueryTest + with GlutenTestHiveSingleton + with SQLTestUtils + with GlutenTestsBaseTrait { + override protected val spark: SparkSession = GlutenTestHive.sparkSession + protected val hiveContext: TestHiveContext = GlutenTestHive + protected val hiveClient: HiveClient = + spark.sharedState.externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client + + override protected def beforeAll(): Unit = { + super.beforeAll() + val table = "lineitem" + val tableDir = + getClass.getResource("").getPath + "/../../../../../../../../../../../" + + "/backends-velox/src/test/resources/tpch-data-parquet-velox/" + val tablePath = new File(tableDir, table).getAbsolutePath + val tableDF = spark.read.format("parquet").load(tablePath) + tableDF.createOrReplaceTempView(table) + } + + override protected def afterAll(): Unit = { + try { + hiveContext.reset() + } finally { + super.afterAll() + } + } + + override protected def shouldRun(testName: String): Boolean = { + false + } + + test("customer udf") { + sql(s"CREATE TEMPORARY FUNCTION testUDF AS '${classOf[CustomerUDF].getName}'") + val df = spark.sql("""select testUDF(l_comment) + | from lineitem""".stripMargin) + df.show() + print(df.queryExecution.executedPlan) + sql("DROP TEMPORARY FUNCTION IF EXISTS testUDF") + hiveContext.reset() + } + + test("customer udf wrapped in function") { + sql(s"CREATE TEMPORARY FUNCTION testUDF AS '${classOf[CustomerUDF].getName}'") + val df = spark.sql("""select hash(testUDF(l_comment)) + | from lineitem""".stripMargin) + df.show() + print(df.queryExecution.executedPlan) + sql("DROP TEMPORARY FUNCTION IF EXISTS testUDF") + hiveContext.reset() + } + + test("example") { + spark.sql("CREATE TEMPORARY FUNCTION testUDF AS 'org.apache.hadoop.hive.ql.udf.UDFSubstr';") + spark.sql("select testUDF('l_commen', 1, 5)").show() + sql("DROP TEMPORARY FUNCTION IF EXISTS testUDF") + hiveContext.reset() + } + +} diff --git a/gluten-ut/spark35/src/test/java/org/apache/gluten/execution/CustomerUDF.java b/gluten-ut/spark35/src/test/java/org/apache/gluten/execution/CustomerUDF.java new file mode 100644 index 000000000000..257bd07021f2 --- /dev/null +++ b/gluten-ut/spark35/src/test/java/org/apache/gluten/execution/CustomerUDF.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution; + +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.exec.UDF; + +/** + * UDF that generates a the link id (MD5 hash) of a URL. Used to join with link join. + * + *
Usage example: + * + *
CREATE TEMPORARY FUNCTION linkid AS 'com.pinterest.hadoop.hive.LinkIdUDF'; + */ +@Description( + name = "linkid", + value = "_FUNC_(String) - Returns linkid as String, it's the MD5 hash of url.") +public class CustomerUDF extends UDF { + public String evaluate(String url) { + if (url == null || url == "") { + return ""; + } + return "extendedudf" + url; + } +} diff --git a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index e8d8730e9366..8dc6baed39b0 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -38,7 +38,7 @@ import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite import org.apache.spark.sql.execution.joins.{GlutenBroadcastJoinSuite, GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite} import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer} import org.apache.spark.sql.gluten.GlutenFallbackSuite -import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite +import org.apache.spark.sql.hive.execution.{GlutenHiveSQLQuerySuite, GlutenHiveUDFSuite} import org.apache.spark.sql.sources._ // Some settings' line length exceeds 100 @@ -1231,6 +1231,7 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenParquetFileMetadataStructRowIndexSuite] enableSuite[GlutenTableLocationSuite] enableSuite[GlutenRemoveRedundantWindowGroupLimitsSuite] + enableSuite[GlutenHiveUDFSuite] override def getSQLQueryTestSettings: SQLQueryTestSettings = VeloxSQLQueryTestSettings } diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveUDFSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveUDFSuite.scala new file mode 100644 index 000000000000..cc9f6f1d893a --- /dev/null +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveUDFSuite.scala @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.hive.execution + +import org.apache.gluten.execution.CustomerUDF + +import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.internal.config +import org.apache.spark.internal.config.UI.UI_ENABLED +import org.apache.spark.sql.{GlutenTestsBaseTrait, QueryTest, SparkSession} +import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode +import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation +import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils} +import org.apache.spark.sql.hive.client.HiveClient +import org.apache.spark.sql.hive.test.TestHiveContext +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH +import org.apache.spark.sql.test.SQLTestUtils + +import org.scalatest.BeforeAndAfterAll + +import java.io.File + +trait GlutenTestHiveSingleton extends SparkFunSuite with BeforeAndAfterAll { + override protected val enableAutoThreadAudit = false + +} + +object GlutenTestHive + extends TestHiveContext( + new SparkContext( + System.getProperty("spark.sql.test.master", "local[1]"), + "TestSQLContext", + new SparkConf() + .set("spark.sql.test", "") + .set(SQLConf.CODEGEN_FALLBACK.key, "false") + .set(SQLConf.CODEGEN_FACTORY_MODE.key, CodegenObjectFactoryMode.CODEGEN_ONLY.toString) + .set( + HiveUtils.HIVE_METASTORE_BARRIER_PREFIXES.key, + "org.apache.spark.sql.hive.execution.PairSerDe") + .set(WAREHOUSE_PATH.key, TestHiveContext.makeWarehouseDir().toURI.getPath) + // SPARK-8910 + .set(UI_ENABLED, false) + .set(config.UNSAFE_EXCEPTION_ON_MEMORY_LEAK, true) + // Hive changed the default of hive.metastore.disallow.incompatible.col.type.changes + // from false to true. For details, see the JIRA HIVE-12320 and HIVE-17764. + .set("spark.hadoop.hive.metastore.disallow.incompatible.col.type.changes", "false") + .set("spark.driver.memory", "1G") + .set("spark.sql.adaptive.enabled", "true") + .set("spark.sql.shuffle.partitions", "1") + .set("spark.sql.files.maxPartitionBytes", "134217728") + .set("spark.memory.offHeap.enabled", "true") + .set("spark.memory.offHeap.size", "1024MB") + .set("spark.plugins", "org.apache.gluten.GlutenPlugin") + .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + // Disable ConvertToLocalRelation for better test coverage. Test cases built on + // LocalRelation will exercise the optimization rules better by disabling it as + // this rule may potentially block testing of other optimization rules such as + // ConstantPropagation etc. 
+ .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName) + ), + false + ) {} + +class GlutenHiveUDFSuite + extends QueryTest + with GlutenTestHiveSingleton + with SQLTestUtils + with GlutenTestsBaseTrait { + override protected val spark: SparkSession = GlutenTestHive.sparkSession + protected val hiveContext: TestHiveContext = GlutenTestHive + protected val hiveClient: HiveClient = + spark.sharedState.externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client + + override protected def beforeAll(): Unit = { + super.beforeAll() + val table = "lineitem" + val tableDir = + getClass.getResource("").getPath + "/../../../../../../../../../../../" + + "/backends-velox/src/test/resources/tpch-data-parquet-velox/" + val tablePath = new File(tableDir, table).getAbsolutePath + val tableDF = spark.read.format("parquet").load(tablePath) + tableDF.createOrReplaceTempView(table) + } + + override protected def afterAll(): Unit = { + try { + hiveContext.reset() + } finally { + super.afterAll() + } + } + + override protected def shouldRun(testName: String): Boolean = { + false + } + + test("customer udf") { + sql(s"CREATE TEMPORARY FUNCTION testUDF AS '${classOf[CustomerUDF].getName}'") + val df = spark.sql("""select testUDF(l_comment) + | from lineitem""".stripMargin) + df.show() + print(df.queryExecution.executedPlan) + sql("DROP TEMPORARY FUNCTION IF EXISTS testUDF") + hiveContext.reset() + } + + test("customer udf wrapped in function") { + sql(s"CREATE TEMPORARY FUNCTION testUDF AS '${classOf[CustomerUDF].getName}'") + val df = spark.sql("""select hash(testUDF(l_comment)) + | from lineitem""".stripMargin) + df.show() + print(df.queryExecution.executedPlan) + sql("DROP TEMPORARY FUNCTION IF EXISTS testUDF") + hiveContext.reset() + } + + test("example") { + spark.sql("CREATE TEMPORARY FUNCTION testUDF AS 'org.apache.hadoop.hive.ql.udf.UDFSubstr';") + spark.sql("select testUDF('l_commen', 1, 5)").show() + sql("DROP TEMPORARY FUNCTION IF EXISTS testUDF") + hiveContext.reset() + } + +} diff --git a/pom.xml b/pom.xml index 1366f8188b0c..a523b165429c 100644 --- a/pom.xml +++ b/pom.xml @@ -646,6 +646,13 @@ test-jar test + + org.apache.spark + spark-hive_${scala.binary.version} + ${spark.version} + test-jar + test + org.apache.hadoop hadoop-client diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala index 9ddce7191002..8cf85d8ca5ec 100644 --- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala +++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala @@ -441,6 +441,10 @@ class GlutenConfig(conf: SQLConf) extends Logging { def enableColumnarProjectCollapse: Boolean = conf.getConf(ENABLE_COLUMNAR_PROJECT_COLLAPSE) + def enableColumnarPartialProject: Boolean = conf.getConf(ENABLE_COLUMNAR_PARTIAL_PROJECT) + + def enableProjectColumnarExec: Boolean = conf.getConf(ENABLE_PROJECT_COLUMNAR_EXEC) + def awsSdkLogLevel: String = conf.getConf(AWS_SDK_LOG_LEVEL) def awsS3RetryMode: String = conf.getConf(AWS_S3_RETRY_MODE) @@ -1823,6 +1827,18 @@ object GlutenConfig { .booleanConf .createWithDefault(true) + val ENABLE_COLUMNAR_PARTIAL_PROJECT = + buildConf("spark.gluten.sql.columnar.partial.project") + .doc("Execute partial project which is not supported in backend in Spark") + .booleanConf + .createWithDefault(true) + + val ENABLE_PROJECT_COLUMNAR_EXEC = + buildConf("spark.gluten.sql.columnar.project.exec") + .doc("Execute project whose input and output is columnar batch, control 
ProjectColumnarExec") + .booleanConf + .createWithDefault(false) + val ENABLE_COMMON_SUBEXPRESSION_ELIMINATE = buildConf("spark.gluten.sql.commonSubexpressionEliminate") .internal() diff --git a/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala b/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala index 96a615615179..280ee2f5ce81 100644 --- a/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala +++ b/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala @@ -133,6 +133,7 @@ object ExpressionNames { final val BASE64 = "base64" final val MASK = "mask" final val FORMAT_STRING = "format_string" + final val TO_PRETTY_STRING = "toprettystring" // URL functions final val PARSE_URL = "parse_url" diff --git a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala index 93785d7a2659..f1ed91b64cd9 100644 --- a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala +++ b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala @@ -17,6 +17,7 @@ package org.apache.gluten.sql.shims.spark35 import org.apache.gluten.expression.{ExpressionNames, Sig} +import org.apache.gluten.expression.ExpressionNames.TO_PRETTY_STRING import org.apache.gluten.sql.shims.{ShimDescriptor, SparkShims} import org.apache.spark._ @@ -81,7 +82,8 @@ class Spark35Shims extends SparkShims { Sig[Mask](ExpressionNames.MASK), Sig[TimestampAdd](ExpressionNames.TIMESTAMP_ADD), Sig[RoundFloor](ExpressionNames.FLOOR), - Sig[RoundCeil](ExpressionNames.CEIL) + Sig[RoundCeil](ExpressionNames.CEIL), + Sig[ToPrettyString](TO_PRETTY_STRING) ) }