Skip to content

Commit

Permalink
get around complie failures for spark 3.2 and 3.3
Browse files Browse the repository at this point in the history
  • Loading branch information
Huaxin Gao committed May 19, 2024
1 parent b758f92 commit fb1ac87
Showing 1 changed file with 17 additions and 1 deletion.
18 changes: 17 additions & 1 deletion common/src/main/java/org/apache/comet/parquet/BatchReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.comet.parquet.CometParquetReadSupport;
import org.apache.spark.sql.execution.datasources.PartitionedFile;
import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter;
import org.apache.spark.sql.execution.metric.SQLMetric;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.vectorized.ColumnarBatch;
Expand Down Expand Up @@ -256,7 +257,13 @@ public void init() throws URISyntaxException, IOException {
MessageType fileSchema = requestedSchema;

if (sparkSchema == null) {
sparkSchema = new CometParquetToSparkSchemaConverter(conf).convert(requestedSchema);
// TODO: remove this after we drop the support for Spark 3.2 and 3.3
boolean isSpark34 = classExists("org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$");
if (isSpark34) {
sparkSchema = new CometParquetToSparkSchemaConverter(conf).convert(requestedSchema);
} else {
sparkSchema = new ParquetToSparkSchemaConverter(conf).convert(requestedSchema);
}
} else {
requestedSchema =
CometParquetReadSupport.clipParquetSchema(
Expand Down Expand Up @@ -579,6 +586,15 @@ public void submitPrefetchTask(ExecutorService threadPool) {
this.prefetchTask = threadPool.submit(new PrefetchTask());
}

private boolean classExists(String className) {
try {
Class<?> cls = Class.forName(className);
return true;
} catch (ClassNotFoundException e) {
return false;
}
}

// A task for prefetching parquet row groups.
private class PrefetchTask implements Callable<Option<Throwable>> {
private long getBytesRead() {
Expand Down

0 comments on commit fb1ac87

Please sign in to comment.