
fix for customer sparkq
Binwei Yang committed Nov 7, 2023
1 parent 776f1ab commit 6f6d4e1
Showing 5 changed files with 9 additions and 19 deletions.
1 change: 1 addition & 0 deletions package/pom.xml
```diff
@@ -252,6 +252,7 @@
   <ignoreClass>org.apache.spark.sql.execution.datasources.WriterBucketSpec$</ignoreClass>
   <ignoreClass>org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand</ignoreClass>
   <ignoreClass>org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$</ignoreClass>
+  <ignoreClass>org.apache.spark.sql.execution.stat.StatFunctions</ignoreClass>
 
 </ignoreClasses>
 <scopes>
```
```diff
@@ -23,7 +23,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.plans.physical.{Distribution, HashClusteredDistribution}
+import org.apache.spark.sql.catalyst.plans.physical.{Distribution, StatefulOpClusteredDistribution}
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil}
@@ -41,7 +41,8 @@ class Spark32Shims extends SparkShims {
   override def getDistribution(
       leftKeys: Seq[Expression],
       rightKeys: Seq[Expression]): Seq[Distribution] = {
-    HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
+    val dist = StatefulOpClusteredDistribution(leftKeys, 0)
+    dist :: StatefulOpClusteredDistribution(rightKeys, 0) :: Nil
   }
 
   override def expressionMappings: Seq[Sig] = Seq(Sig[Empty2Null](ExpressionNames.EMPTY2NULL))
```
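`getDistribution` now builds `StatefulOpClusteredDistribution` instead of `HashClusteredDistribution`, which the customer's Spark build (per the commit title) apparently no longer provides. Both classes require a child to be hash-partitioned on exactly the given keys so that matching join keys co-locate; `StatefulOpClusteredDistribution` additionally carries a required partition count, the `0` passed here. A minimal sketch of how a join-like operator consumes the result, assuming upstream Spark's planning API; the consumer below is illustrative, not Gluten's actual operator:

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.physical.Distribution

// Minimal stand-in for the SparkShims method this diff overrides.
trait SparkShims {
  def getDistribution(
      leftKeys: Seq[Expression],
      rightKeys: Seq[Expression]): Seq[Distribution]
}

// Illustrative consumer: a join-like operator hands both children's keys to
// the shim, and Spark's EnsureRequirements rule inserts whatever shuffles
// are needed to satisfy the two returned distributions.
abstract class ExampleJoin(shims: SparkShims) {
  def leftKeys: Seq[Expression]
  def rightKeys: Seq[Expression]

  def requiredChildDistribution: Seq[Distribution] =
    shims.getDistribution(leftKeys, rightKeys)
}
```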
```diff
@@ -445,7 +445,8 @@ object ParquetFileFormat extends Logging {
 
     val converter = new ParquetToSparkSchemaConverter(
       sparkSession.sessionState.conf.isParquetBinaryAsString,
-      sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
+      sparkSession.sessionState.conf.isParquetINT96AsTimestamp,
+      false)
 
     val seen = mutable.HashSet[String]()
     val finalSchemas: Seq[StructType] = footers.flatMap {
@@ -556,7 +557,8 @@
     // Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
     val converter = new ParquetToSparkSchemaConverter(
       assumeBinaryIsString = assumeBinaryIsString,
-      assumeInt96IsTimestamp = assumeInt96IsTimestamp)
+      assumeInt96IsTimestamp = assumeInt96IsTimestamp,
+      false)
 
     readParquetFootersInParallel(conf, files, ignoreCorruptFiles)
       .map(ParquetFileFormat.readSchemaFromFooter(_, converter))
```
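Both call sites now pass a third boolean, `false`, to `ParquetToSparkSchemaConverter`. The parameter's name is not visible in this diff (in newer upstream Spark the third constructor argument is the case-sensitivity flag, but the customer's build may differ), so the positional `false` is kept verbatim. For context, a minimal sketch of how such a converter turns a Parquet footer schema into a Spark `StructType`, assuming the three-argument constructor from the diff:

```scala
import org.apache.parquet.schema.MessageTypeParser
import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter

object SchemaConversionExample {
  val converter = new ParquetToSparkSchemaConverter(
    true,  // assumeBinaryIsString: treat Parquet BINARY as StringType
    true,  // assumeInt96IsTimestamp: treat INT96 as TimestampType
    false) // the flag this commit adds, passed positionally as in the diff

  // Parse a Parquet message type and convert it to a Spark schema.
  val parquetSchema = MessageTypeParser.parseMessageType(
    """message spark_schema {
      |  required int64 id;
      |  optional binary name (UTF8);
      |}""".stripMargin)

  val sparkSchema = converter.convert(parquetSchema)
  // sparkSchema is a StructType, e.g. struct<id: bigint, name: string>
}
```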
```diff
@@ -67,19 +67,9 @@ class BatchScanExecShim(
     // call toBatch again to get filtered partitions
     val newPartitions = scan.toBatch.planInputPartitions()
 
-    originalPartitioning match {
-      case p: DataSourcePartitioning if p.numPartitions != newPartitions.size =>
-        throw new SparkException(
-          "Data source must have preserved the original partitioning during runtime filtering; " +
-            s"reported num partitions: ${p.numPartitions}, " +
-            s"num partitions after runtime filtering: ${newPartitions.size}")
-      case _ =>
-      // no validation is needed as the data source did not report any specific partitioning
-    }
-
     newPartitions.map(Seq(_))
   } else {
-    partitions.map(Seq(_))
+    partitions
   }
 }
```
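Two things change here: the guard that aborted the query when runtime filtering altered the partition count reported through `DataSourcePartitioning` is dropped, and the else branch returns `partitions` without wrapping each element in `Seq(_)`, which suggests `partitions` is already grouped as `Seq[Seq[InputPartition]]` in the customer's Spark build. A minimal sketch of the resulting control flow under that assumption; the signature below is illustrative, not the shim's actual one:

```scala
import org.apache.spark.sql.connector.read.InputPartition

object FilteredPartitionsSketch {
  // The parameters stand in for the shim's fields; types are inferred from
  // the diff, not confirmed against the customer's Spark build.
  def filteredPartitions(
      runtimeFiltersApplied: Boolean,
      planFilteredPartitions: () => Seq[InputPartition],
      partitions: Seq[Seq[InputPartition]]): Seq[Seq[InputPartition]] = {
    if (runtimeFiltersApplied) {
      // Re-plan the scan and make every filtered partition its own
      // one-element group. The upstream check that the source preserved
      // its reported partitioning is intentionally dropped by this commit.
      planFilteredPartitions().map(Seq(_))
    } else {
      partitions
    }
  }
}
```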
```diff
@@ -31,10 +31,6 @@ object CatalogUtil {
     partitions.map {
       case IdentityTransform(FieldReference(Seq(col))) =>
         identityCols += col
-
-      case BucketTransform(numBuckets, FieldReference(Seq(col))) =>
-        bucketSpec = Some(BucketSpec(numBuckets, col :: Nil, Nil))
-
       case transform =>
         throw new UnsupportedOperationException(s"Partitioning by expressions")
     }
```
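With the `BucketTransform` case gone, bucketed partition transforms no longer produce a `BucketSpec`; they now fall through to the catch-all and raise `UnsupportedOperationException`. A self-contained sketch of the resulting match, assuming a method signature along these lines (Spark's transform case classes are internal API, so real code like this typically lives under an `org.apache.spark.sql` package):

```scala
import scala.collection.mutable

import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform}

object PartitionTransformSketch {
  // Post-change behavior: only identity transforms are collected; anything
  // else, including bucket transforms, is now rejected.
  def identityPartitionColumns(partitions: Seq[Transform]): Seq[String] = {
    val identityCols = mutable.ArrayBuffer.empty[String]
    partitions.foreach {
      case IdentityTransform(FieldReference(Seq(col))) =>
        identityCols += col
      case other =>
        throw new UnsupportedOperationException(s"Partitioning by expressions: $other")
    }
    identityCols.toSeq
  }
}
```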
