From 6f6d4e11ffc2c48c368bb927a3119cf04de27786 Mon Sep 17 00:00:00 2001
From: Binwei Yang
Date: Tue, 7 Nov 2023 07:39:51 +0000
Subject: [PATCH] fix for customer sparkq

---
 package/pom.xml                                       |  1 +
 .../sql/shims/spark32/Spark32Shims.scala              |  5 +++--
 .../datasources/parquet/ParquetFileFormat.scala       |  6 ++++--
 .../execution/datasources/v2/BatchScanExecShim.scala  | 12 +-----------
 .../execution/datasources/v2/utils/CatalogUtil.scala  |  4 ----
 5 files changed, 9 insertions(+), 19 deletions(-)

diff --git a/package/pom.xml b/package/pom.xml
index 262391a67673..45676b43a281 100644
--- a/package/pom.xml
+++ b/package/pom.xml
@@ -252,6 +252,7 @@
             org.apache.spark.sql.execution.datasources.WriterBucketSpec$
             org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
             org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$
+            org.apache.spark.sql.execution.stat.StatFunctions
diff --git a/shims/spark32/src/main/scala/io/glutenproject/sql/shims/spark32/Spark32Shims.scala b/shims/spark32/src/main/scala/io/glutenproject/sql/shims/spark32/Spark32Shims.scala
index 580cc93bdf75..64e45595b03e 100644
--- a/shims/spark32/src/main/scala/io/glutenproject/sql/shims/spark32/Spark32Shims.scala
+++ b/shims/spark32/src/main/scala/io/glutenproject/sql/shims/spark32/Spark32Shims.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.plans.physical.{Distribution, HashClusteredDistribution}
+import org.apache.spark.sql.catalyst.plans.physical.{Distribution, StatefulOpClusteredDistribution}
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil}
@@ -41,7 +41,8 @@ class Spark32Shims extends SparkShims {
   override def getDistribution(
       leftKeys: Seq[Expression],
       rightKeys: Seq[Expression]): Seq[Distribution] = {
-    HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
+    val dist = StatefulOpClusteredDistribution(leftKeys, 0)
+    dist :: StatefulOpClusteredDistribution(rightKeys, 0) :: Nil
   }
 
   override def expressionMappings: Seq[Sig] = Seq(Sig[Empty2Null](ExpressionNames.EMPTY2NULL))
diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 521539975750..d631b90c1dc0 100644
--- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -445,7 +445,8 @@ object ParquetFileFormat extends Logging {
 
     val converter = new ParquetToSparkSchemaConverter(
       sparkSession.sessionState.conf.isParquetBinaryAsString,
-      sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
+      sparkSession.sessionState.conf.isParquetINT96AsTimestamp,
+      false)
 
     val seen = mutable.HashSet[String]()
     val finalSchemas: Seq[StructType] = footers.flatMap {
@@ -556,7 +557,8 @@ object ParquetFileFormat extends Logging {
     // Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
     val converter = new ParquetToSparkSchemaConverter(
       assumeBinaryIsString = assumeBinaryIsString,
-      assumeInt96IsTimestamp = assumeInt96IsTimestamp)
+      assumeInt96IsTimestamp = assumeInt96IsTimestamp,
+      false)
 
     readParquetFootersInParallel(conf, files, ignoreCorruptFiles)
       .map(ParquetFileFormat.readSchemaFromFooter(_, converter))
diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
index b867c71cefda..37a3bc590457 100644
--- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
+++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
@@ -67,19 +67,9 @@ class BatchScanExecShim(
       // call toBatch again to get filtered partitions
       val newPartitions = scan.toBatch.planInputPartitions()
 
-      originalPartitioning match {
-        case p: DataSourcePartitioning if p.numPartitions != newPartitions.size =>
-          throw new SparkException(
-            "Data source must have preserved the original partitioning during runtime filtering; " +
-              s"reported num partitions: ${p.numPartitions}, " +
-              s"num partitions after runtime filtering: ${newPartitions.size}")
-        case _ =>
-        // no validation is needed as the data source did not report any specific partitioning
-      }
-
       newPartitions.map(Seq(_))
     } else {
-      partitions.map(Seq(_))
+      partitions
     }
   }
 
diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/utils/CatalogUtil.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/utils/CatalogUtil.scala
index a5b951c23ef7..0aaa226a2475 100644
--- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/utils/CatalogUtil.scala
+++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/utils/CatalogUtil.scala
@@ -31,10 +31,6 @@ object CatalogUtil {
     partitions.map {
       case IdentityTransform(FieldReference(Seq(col))) =>
         identityCols += col
-
-      case BucketTransform(numBuckets, FieldReference(Seq(col))) =>
-        bucketSpec = Some(BucketSpec(numBuckets, col :: Nil, Nil))
-
       case transform =>
         throw new UnsupportedOperationException(s"Partitioning by expressions")
     }
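
For reference, a minimal sketch of how the patched Spark32Shims.getDistribution behaves. It assumes the customer Spark build exposes StatefulOpClusteredDistribution with a (keys, requiredNumPartitions) constructor, as the patched import implies; the attribute names and types below are illustrative and not part of the patch.

  import org.apache.spark.sql.catalyst.expressions.AttributeReference
  import org.apache.spark.sql.catalyst.plans.physical.{Distribution, StatefulOpClusteredDistribution}
  import org.apache.spark.sql.types.LongType

  import io.glutenproject.sql.shims.spark32.Spark32Shims

  // Hypothetical join keys; any resolved expressions would do.
  val leftKey = AttributeReference("l_key", LongType)()
  val rightKey = AttributeReference("r_key", LongType)()

  // Ask the shim for the required child distributions, as a hash-join operator would.
  val shims = new Spark32Shims()
  val dists: Seq[Distribution] = shims.getDistribution(Seq(leftKey), Seq(rightKey))

  // After this patch both entries are StatefulOpClusteredDistribution(keys, 0)
  // instead of HashClusteredDistribution(keys), which the customer Spark no longer provides.
  assert(dists.forall(_.isInstanceOf[StatefulOpClusteredDistribution]))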