From 0eb2ca62105c04c52c039bb55d7fb495152f2076 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh <viirya@gmail.com>
Date: Sun, 26 May 2024 11:48:44 -0700
Subject: [PATCH 1/2] fix: CometReader.loadVector should not overwrite
 dictionary ids

---
 .../org/apache/comet/parquet/BatchReader.java |  7 +++
 .../apache/comet/parquet/ColumnReader.java    | 25 ++++++-----
 .../comet/parquet/LazyColumnReader.java       |  4 +-
 .../java/org/apache/comet/parquet/Utils.java  | 10 +++--
 .../apache/arrow/c/CometSchemaImporter.scala  | 44 +++++++++++++++++++
 .../apache/spark/sql/comet/util/Utils.scala   | 25 ++++++++++-
 6 files changed, 97 insertions(+), 18 deletions(-)
 create mode 100644 common/src/main/scala/org/apache/arrow/c/CometSchemaImporter.scala

diff --git a/common/src/main/java/org/apache/comet/parquet/BatchReader.java b/common/src/main/java/org/apache/comet/parquet/BatchReader.java
index 9940390dc..e3303e8ac 100644
--- a/common/src/main/java/org/apache/comet/parquet/BatchReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/BatchReader.java
@@ -37,6 +37,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.arrow.c.CometSchemaImporter;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -88,6 +91,7 @@
  */
 public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Closeable {
   private static final Logger LOG = LoggerFactory.getLogger(FileReader.class);
+  protected static final BufferAllocator ALLOCATOR = new RootAllocator();
 
   private Configuration conf;
   private int capacity;
@@ -552,6 +556,8 @@ private boolean loadNextRowGroupIfNecessary() throws Throwable {
       numRowGroupsMetric.add(1);
     }
 
+    CometSchemaImporter importer = new CometSchemaImporter(ALLOCATOR);
+
     List<ColumnDescriptor> columns = requestedSchema.getColumns();
     for (int i = 0; i < columns.size(); i++) {
       if (missingColumns[i]) continue;
@@ -564,6 +570,7 @@ private boolean loadNextRowGroupIfNecessary() throws Throwable {
           Utils.getColumnReader(
               dataType,
               columns.get(i),
+              importer,
               capacity,
               useDecimal128,
               useLazyMaterialization,
diff --git a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
index 50991d5b0..033ca589b 100644
--- a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
@@ -27,10 +27,7 @@
 
 import org.apache.arrow.c.ArrowArray;
 import org.apache.arrow.c.ArrowSchema;
-import org.apache.arrow.c.CDataDictionaryProvider;
-import org.apache.arrow.c.Data;
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.c.CometSchemaImporter;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.dictionary.Dictionary;
 import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
@@ -42,6 +39,7 @@
 import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.spark.sql.comet.util.Utils$;
 import org.apache.spark.sql.types.DataType;
 
 import org.apache.comet.CometConf;
@@ -53,7 +51,6 @@ public class ColumnReader extends AbstractColumnReader {
   protected static final Logger LOG = LoggerFactory.getLogger(ColumnReader.class);
-  protected static final BufferAllocator ALLOCATOR = new RootAllocator();
 
   /**
    * The current Comet vector holding all the values read by this column reader. Owned by this
@@ -89,18 +86,19 @@ public class ColumnReader extends AbstractColumnReader {
    */
   boolean hadNull;
 
-  /** Dictionary provider for this column. */
-  private final CDataDictionaryProvider dictionaryProvider = new CDataDictionaryProvider();
+  private final CometSchemaImporter importer;
 
   public ColumnReader(
       DataType type,
       ColumnDescriptor descriptor,
+      CometSchemaImporter importer,
       int batchSize,
       boolean useDecimal128,
       boolean useLegacyDateTimestamp) {
     super(type, descriptor, useDecimal128, useLegacyDateTimestamp);
     assert batchSize > 0 : "Batch size must be positive, found " + batchSize;
     this.batchSize = batchSize;
+    this.importer = importer;
     initNative();
   }
 
@@ -164,7 +162,6 @@ public void close() {
       currentVector.close();
       currentVector = null;
     }
-    dictionaryProvider.close();
     super.close();
   }
 
@@ -209,10 +206,12 @@ public CometDecodedVector loadVector() {
 
     try (ArrowArray array = ArrowArray.wrap(addresses[0]);
        ArrowSchema schema = ArrowSchema.wrap(addresses[1])) {
-      FieldVector vector = Data.importVector(ALLOCATOR, array, schema, dictionaryProvider);
+      FieldVector vector =
+          Utils$.MODULE$.importVector(importer.getAllocator(), importer, array, schema);
+
       DictionaryEncoding dictionaryEncoding = vector.getField().getDictionary();
 
-      CometPlainVector cometVector = new CometPlainVector(vector, useDecimal128, isUuid);
+      CometPlainVector cometVector = new CometPlainVector(vector, useDecimal128);
 
       // Update whether the current vector contains any null values. This is used in the following
       // batch(s) to determine whether we can skip loading the native vector.
@@ -234,15 +233,17 @@
 
       // We should already re-initiate `CometDictionary` here because `Data.importVector` API will
      // release the previous dictionary vector and create a new one.
-      Dictionary arrowDictionary = dictionaryProvider.lookup(dictionaryEncoding.getId());
+      Dictionary arrowDictionary = importer.getProvider().lookup(dictionaryEncoding.getId());
       CometPlainVector dictionaryVector =
           new CometPlainVector(arrowDictionary.getVector(), useDecimal128, isUuid);
       dictionary = new CometDictionary(dictionaryVector);
 
       currentVector =
           new CometDictionaryVector(
-              cometVector, dictionary, dictionaryProvider, useDecimal128, false, isUuid);
+              cometVector, dictionary, importer.getProvider(), useDecimal128, false, isUuid);
+      currentVector =
+          new CometDictionaryVector(cometVector, dictionary, importer.getProvider(), useDecimal128);
 
       return currentVector;
     }
   }
diff --git a/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java b/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java
index a15d84192..dd08a88ab 100644
--- a/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java
@@ -21,6 +21,7 @@
 
 import java.io.IOException;
 
+import org.apache.arrow.c.CometSchemaImporter;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReader;
 import org.apache.spark.sql.types.DataType;
@@ -45,10 +46,11 @@ public class LazyColumnReader extends ColumnReader {
   public LazyColumnReader(
       DataType sparkReadType,
       ColumnDescriptor descriptor,
+      CometSchemaImporter importer,
       int batchSize,
       boolean useDecimal128,
       boolean useLegacyDateTimestamp) {
-    super(sparkReadType, descriptor, batchSize, useDecimal128, useLegacyDateTimestamp);
+    super(sparkReadType, descriptor, importer, batchSize, useDecimal128, useLegacyDateTimestamp);
     this.batchSize = 0; // the batch size is set later in `readBatch`
     this.vector = new CometLazyVector(sparkReadType, this, useDecimal128);
   }
diff --git a/common/src/main/java/org/apache/comet/parquet/Utils.java b/common/src/main/java/org/apache/comet/parquet/Utils.java
index 95ca06cda..99f3a4edd 100644
--- a/common/src/main/java/org/apache/comet/parquet/Utils.java
+++ b/common/src/main/java/org/apache/comet/parquet/Utils.java
@@ -19,6 +19,7 @@
 
 package org.apache.comet.parquet;
 
+import org.apache.arrow.c.CometSchemaImporter;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.PrimitiveType;
@@ -28,26 +29,29 @@ public class Utils {
   public static ColumnReader getColumnReader(
       DataType type,
       ColumnDescriptor descriptor,
+      CometSchemaImporter importer,
       int batchSize,
       boolean useDecimal128,
       boolean useLazyMaterialization) {
     // TODO: support `useLegacyDateTimestamp` for Iceberg
     return getColumnReader(
-        type, descriptor, batchSize, useDecimal128, useLazyMaterialization, true);
+        type, descriptor, importer, batchSize, useDecimal128, useLazyMaterialization, true);
   }
 
   public static ColumnReader getColumnReader(
       DataType type,
       ColumnDescriptor descriptor,
+      CometSchemaImporter importer,
       int batchSize,
       boolean useDecimal128,
       boolean useLazyMaterialization,
       boolean useLegacyDateTimestamp) {
     if (useLazyMaterialization && supportLazyMaterialization(type)) {
       return new LazyColumnReader(
-          type, descriptor, batchSize, useDecimal128, useLegacyDateTimestamp);
+          type, descriptor, importer, batchSize, useDecimal128, useLegacyDateTimestamp);
     } else {
-      return new ColumnReader(type, descriptor, batchSize, useDecimal128, useLegacyDateTimestamp);
+      return new ColumnReader(
+          type, descriptor, importer, batchSize, useDecimal128, useLegacyDateTimestamp);
     }
   }
diff --git a/common/src/main/scala/org/apache/arrow/c/CometSchemaImporter.scala b/common/src/main/scala/org/apache/arrow/c/CometSchemaImporter.scala
new file mode 100644
index 000000000..3dedcddfd
--- /dev/null
+++ b/common/src/main/scala/org/apache/arrow/c/CometSchemaImporter.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.arrow.c
+
+import org.apache.arrow.memory.BufferAllocator
+import org.apache.arrow.vector.types.pojo.Field
+
+/**
+ * This is a simple wrapper around SchemaImporter to make it accessible from Java Arrow.
+ */
+class CometSchemaImporter(allocator: BufferAllocator) {
+  val importer = new SchemaImporter(allocator)
+  val provider = new CDataDictionaryProvider()
+
+  def getAllocator(): BufferAllocator = allocator
+
+  def getProvider(): CDataDictionaryProvider = provider
+
+  def importField(schema: ArrowSchema): Field = {
+    try {
+      importer.importField(schema, provider)
+    } finally {
+      schema.release();
+      schema.close();
+    }
+  }
+}
diff --git a/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala b/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala
index 2300e109a..5569b4b1a 100644
--- a/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala
+++ b/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala
@@ -25,7 +25,8 @@ import java.nio.channels.Channels
 
 import scala.collection.JavaConverters._
 
-import org.apache.arrow.c.CDataDictionaryProvider
+import org.apache.arrow.c.{ArrowArray, ArrowSchema, CDataDictionaryProvider, CometSchemaImporter, Data}
+import org.apache.arrow.memory.BufferAllocator
 import org.apache.arrow.vector.{BigIntVector, BitVector, DateDayVector, DecimalVector, FieldVector, FixedSizeBinaryVector, Float4Vector, Float8Vector, IntVector, SmallIntVector, TimeStampMicroTZVector, TimeStampMicroVector, TinyIntVector, ValueVector, VarBinaryVector, VarCharVector, VectorSchemaRoot}
 import org.apache.arrow.vector.complex.MapVector
 import org.apache.arrow.vector.dictionary.DictionaryProvider
@@ -38,7 +39,7 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}
 
-import org.apache.comet.vector.CometVector
+import org.apache.comet.vector._
 
 object Utils {
   def getConfPath(confFileName: String): String = {
@@ -264,4 +265,24 @@ object Utils {
       throw new SparkException(s"Unsupported Arrow Vector for $reason: ${valueVector.getClass}")
     }
   }
+
+  /**
+   * Imports data from ArrowArray/ArrowSchema into a FieldVector. This is basically the same as
+   * Java Arrow `Data.importVector`. `Data.importVector` initiates `SchemaImporter` internally
+   * which is used to fill dictionary ids for dictionary encoded vectors. Every call to
+   * `importVector` will begin with dictionary ids starting from 0. So, separate calls to
+   * `importVector` will overwrite dictionary ids. To avoid this, we need to use the same
+   * `SchemaImporter` instance for all calls to `importVector`.
+   */
+  def importVector(
+      allocator: BufferAllocator,
+      importer: CometSchemaImporter,
+      array: ArrowArray,
+      schema: ArrowSchema): FieldVector = {
+    val field = importer.importField(schema)
+    val vector = field.createVector(allocator)
+    Data.importIntoVector(allocator, array, vector, importer.getProvider())
+
+    vector
+  }
 }
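
Note on why patch 1 works: Arrow Java's `Data.importVector` constructs a fresh internal `SchemaImporter` on every call, and each `SchemaImporter` numbers dictionaries starting from 0, which is exactly what the `importVector` Javadoc above describes. Separate imports that feed the same `CDataDictionaryProvider` therefore keep registering dictionaries under the same ids, and each import overwrites the dictionary registered by the previous one. The sketch below contrasts the two code paths; `allocator`, `provider`, and the exported columns `array1`/`schema1` and `array2`/`schema2` are hypothetical stand-ins, not names from the patch:

    // Pre-patch shape: each call builds its own SchemaImporter, so ids restart at 0.
    FieldVector v1 = Data.importVector(allocator, array1, schema1, provider); // registers dictionary id 0
    FieldVector v2 = Data.importVector(allocator, array2, schema2, provider); // id 0 again, replacing v1's dictionary

    // Patch 1 shape: one SchemaImporter (wrapped by CometSchemaImporter) is reused,
    // so dictionary ids keep incrementing across imports instead of restarting.
    CometSchemaImporter importer = new CometSchemaImporter(allocator);
    FieldVector w1 = Utils$.MODULE$.importVector(allocator, importer, array1, schema1); // id 0
    FieldVector w2 = Utils$.MODULE$.importVector(allocator, importer, array2, schema2); // id 1

The second patch below keeps this behavior but, following review, moves the wrapper from Scala to Java so `ColumnReader` can call `importer.importVector(array, schema)` directly instead of going through `Utils$.MODULE$`, and makes `BatchReader` own and close the importer.
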
From 0bc4933d13c52cd2e976d4cf45eb03a9faa1b746 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh <viirya@gmail.com>
Date: Mon, 27 May 2024 10:22:23 -0700
Subject: [PATCH 2/2] For review

---
 .../apache/arrow/c/CometSchemaImporter.java   | 73 +++++++++++++++++++
 .../org/apache/comet/parquet/BatchReader.java |  8 +-
 .../apache/comet/parquet/ColumnReader.java    |  4 +-
 .../apache/arrow/c/CometSchemaImporter.scala  | 44 -----------
 .../apache/spark/sql/comet/util/Utils.scala   | 25 +------
 5 files changed, 83 insertions(+), 71 deletions(-)
 create mode 100644 common/src/main/java/org/apache/arrow/c/CometSchemaImporter.java
 delete mode 100644 common/src/main/scala/org/apache/arrow/c/CometSchemaImporter.scala

diff --git a/common/src/main/java/org/apache/arrow/c/CometSchemaImporter.java b/common/src/main/java/org/apache/arrow/c/CometSchemaImporter.java
new file mode 100644
index 000000000..32955f1ac
--- /dev/null
+++ b/common/src/main/java/org/apache/arrow/c/CometSchemaImporter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.arrow.c;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.types.pojo.Field;
+
+/** This is a simple wrapper around SchemaImporter to make it accessible from Java Arrow. */
+public class CometSchemaImporter {
+  private final BufferAllocator allocator;
+  private final SchemaImporter importer;
+  private final CDataDictionaryProvider provider = new CDataDictionaryProvider();
+
+  public CometSchemaImporter(BufferAllocator allocator) {
+    this.allocator = allocator;
+    this.importer = new SchemaImporter(allocator);
+  }
+
+  public BufferAllocator getAllocator() {
+    return allocator;
+  }
+
+  public CDataDictionaryProvider getProvider() {
+    return provider;
+  }
+
+  public Field importField(ArrowSchema schema) {
+    try {
+      return importer.importField(schema, provider);
+    } finally {
+      schema.release();
+      schema.close();
+    }
+  }
+
+  /**
+   * Imports data from ArrowArray/ArrowSchema into a FieldVector. This is basically the same as Java
+   * Arrow `Data.importVector`. `Data.importVector` initiates `SchemaImporter` internally which is
+   * used to fill dictionary ids for dictionary encoded vectors. Every call to `importVector` will
+   * begin with dictionary ids starting from 0. So, separate calls to `importVector` will overwrite
+   * dictionary ids. To avoid this, we need to use the same `SchemaImporter` instance for all calls
+   * to `importVector`.
+   */
+  public FieldVector importVector(ArrowArray array, ArrowSchema schema) {
+    Field field = importField(schema);
+    FieldVector vector = field.createVector(allocator);
+    Data.importIntoVector(allocator, array, vector, provider);
+
+    return vector;
+  }
+
+  public void close() {
+    provider.close();
+  }
+}
diff --git a/common/src/main/java/org/apache/comet/parquet/BatchReader.java b/common/src/main/java/org/apache/comet/parquet/BatchReader.java
index e3303e8ac..bf8e6e550 100644
--- a/common/src/main/java/org/apache/comet/parquet/BatchReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/BatchReader.java
@@ -108,6 +108,7 @@ public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Cl
   private MessageType requestedSchema;
   private CometVector[] vectors;
   private AbstractColumnReader[] columnReaders;
+  private CometSchemaImporter importer;
   private ColumnarBatch currentBatch;
   private Future<Option<Throwable>> prefetchTask;
   private LinkedBlockingQueue<Pair<PageReadStore, Long>> prefetchQueue;
@@ -519,6 +520,10 @@ public void close() throws IOException {
       fileReader.close();
       fileReader = null;
     }
+    if (importer != null) {
+      importer.close();
+      importer = null;
+    }
   }
 
   @SuppressWarnings("deprecation")
@@ -556,7 +561,8 @@ private boolean loadNextRowGroupIfNecessary() throws Throwable {
       numRowGroupsMetric.add(1);
     }
 
-    CometSchemaImporter importer = new CometSchemaImporter(ALLOCATOR);
+    if (importer != null) importer.close();
+    importer = new CometSchemaImporter(ALLOCATOR);
 
     List<ColumnDescriptor> columns = requestedSchema.getColumns();
     for (int i = 0; i < columns.size(); i++) {
diff --git a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
index 033ca589b..3d4cb3aa5 100644
--- a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
@@ -39,7 +39,6 @@
 import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
-import org.apache.spark.sql.comet.util.Utils$;
 import org.apache.spark.sql.types.DataType;
 
 import org.apache.comet.CometConf;
@@ -206,8 +205,7 @@ public CometDecodedVector loadVector() {
 
     try (ArrowArray array = ArrowArray.wrap(addresses[0]);
        ArrowSchema schema = ArrowSchema.wrap(addresses[1])) {
-      FieldVector vector =
-          Utils$.MODULE$.importVector(importer.getAllocator(), importer, array, schema);
+      FieldVector vector = importer.importVector(array, schema);
 
       DictionaryEncoding dictionaryEncoding = vector.getField().getDictionary();
 
diff --git a/common/src/main/scala/org/apache/arrow/c/CometSchemaImporter.scala b/common/src/main/scala/org/apache/arrow/c/CometSchemaImporter.scala
deleted file mode 100644
index 3dedcddfd..000000000
--- a/common/src/main/scala/org/apache/arrow/c/CometSchemaImporter.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.arrow.c
-
-import org.apache.arrow.memory.BufferAllocator
-import org.apache.arrow.vector.types.pojo.Field
-
-/**
- * This is a simple wrapper around SchemaImporter to make it accessible from Java Arrow.
- */
-class CometSchemaImporter(allocator: BufferAllocator) {
-  val importer = new SchemaImporter(allocator)
-  val provider = new CDataDictionaryProvider()
-
-  def getAllocator(): BufferAllocator = allocator
-
-  def getProvider(): CDataDictionaryProvider = provider
-
-  def importField(schema: ArrowSchema): Field = {
-    try {
-      importer.importField(schema, provider)
-    } finally {
-      schema.release();
-      schema.close();
-    }
-  }
-}
diff --git a/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala b/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala
index 5569b4b1a..2300e109a 100644
--- a/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala
+++ b/common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala
@@ -25,8 +25,7 @@ import java.nio.channels.Channels
 
 import scala.collection.JavaConverters._
 
-import org.apache.arrow.c.{ArrowArray, ArrowSchema, CDataDictionaryProvider, CometSchemaImporter, Data}
-import org.apache.arrow.memory.BufferAllocator
+import org.apache.arrow.c.CDataDictionaryProvider
 import org.apache.arrow.vector.{BigIntVector, BitVector, DateDayVector, DecimalVector, FieldVector, FixedSizeBinaryVector, Float4Vector, Float8Vector, IntVector, SmallIntVector, TimeStampMicroTZVector, TimeStampMicroVector, TinyIntVector, ValueVector, VarBinaryVector, VarCharVector, VectorSchemaRoot}
 import org.apache.arrow.vector.complex.MapVector
 import org.apache.arrow.vector.dictionary.DictionaryProvider
@@ -39,7 +38,7 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}
 
-import org.apache.comet.vector._
+import org.apache.comet.vector.CometVector
 
 object Utils {
   def getConfPath(confFileName: String): String = {
@@ -265,24 +264,4 @@ object Utils {
       throw new SparkException(s"Unsupported Arrow Vector for $reason: ${valueVector.getClass}")
     }
   }
-
-  /**
-   * Imports data from ArrowArray/ArrowSchema into a FieldVector. This is basically the same as
-   * Java Arrow `Data.importVector`. `Data.importVector` initiates `SchemaImporter` internally
-   * which is used to fill dictionary ids for dictionary encoded vectors. Every call to
-   * `importVector` will begin with dictionary ids starting from 0. So, separate calls to
-   * `importVector` will overwrite dictionary ids. To avoid this, we need to use the same
-   * `SchemaImporter` instance for all calls to `importVector`.
-   */
-  def importVector(
-      allocator: BufferAllocator,
-      importer: CometSchemaImporter,
-      array: ArrowArray,
-      schema: ArrowSchema): FieldVector = {
-    val field = importer.importField(schema)
-    val vector = field.createVector(allocator)
-    Data.importIntoVector(allocator, array, vector, importer.getProvider())
-
-    vector
-  }
 }
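
Net effect of the series: `CometSchemaImporter` ends up as a small Java class in `org.apache.arrow.c` (placed in that package so it can reach Arrow's package-private `SchemaImporter`), created by `BatchReader` once per row group and closed either when the next row group is loaded or when the reader itself is closed. A rough lifecycle sketch of the resulting API follows; `dataType`, `descriptor`, `capacity`, and the boolean flags are hypothetical stand-ins for the real per-column state, not names defined by the patch:

    BufferAllocator allocator = new RootAllocator();
    CometSchemaImporter importer = new CometSchemaImporter(allocator);
    try {
      // Every ColumnReader built for this row group shares the importer, so
      // the dictionary ids assigned during import stay unique across columns
      // and across consecutive batches within the row group.
      ColumnReader reader =
          Utils.getColumnReader(
              dataType, descriptor, importer, capacity, useDecimal128, useLazyMaterialization);
      // ... read batches through the reader as BatchReader does ...
    } finally {
      importer.close(); // closes the shared CDataDictionaryProvider
    }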