From fb1ac87872e6caaf111a5d8a5945cae52c232216 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Sun, 19 May 2024 10:14:14 -0700 Subject: [PATCH] get around compile failures for spark 3.2 and 3.3 --- .../org/apache/comet/parquet/BatchReader.java | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/common/src/main/java/org/apache/comet/parquet/BatchReader.java b/common/src/main/java/org/apache/comet/parquet/BatchReader.java index 600fb9ea2..a1aef7a21 100644 --- a/common/src/main/java/org/apache/comet/parquet/BatchReader.java +++ b/common/src/main/java/org/apache/comet/parquet/BatchReader.java @@ -57,6 +57,7 @@ import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.comet.parquet.CometParquetReadSupport; import org.apache.spark.sql.execution.datasources.PartitionedFile; +import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter; import org.apache.spark.sql.execution.metric.SQLMetric; import org.apache.spark.sql.types.*; import org.apache.spark.sql.vectorized.ColumnarBatch; @@ -256,7 +257,13 @@ public void init() throws URISyntaxException, IOException { MessageType fileSchema = requestedSchema; if (sparkSchema == null) { - sparkSchema = new CometParquetToSparkSchemaConverter(conf).convert(requestedSchema); + // TODO: remove this after we drop the support for Spark 3.2 and 3.3 + boolean isSpark34 = classExists("org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$"); + if (isSpark34) { + sparkSchema = new CometParquetToSparkSchemaConverter(conf).convert(requestedSchema); + } else { + sparkSchema = new ParquetToSparkSchemaConverter(conf).convert(requestedSchema); + } } else { requestedSchema = CometParquetReadSupport.clipParquetSchema( @@ -579,6 +586,15 @@ public void submitPrefetchTask(ExecutorService threadPool) { this.prefetchTask = threadPool.submit(new PrefetchTask()); } + private boolean classExists(String className) { + try { + Class cls = Class.forName(className); + return true; 
+ } catch (ClassNotFoundException e) { + return false; + } + } + // A task for prefetching parquet row groups. private class PrefetchTask implements Callable> { private long getBytesRead() {