From 479a97a7a40f13609d591a5a08c1265b1cc74fdd Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 27 May 2024 18:55:19 -0700 Subject: [PATCH] fix: CometReader.loadVector should not overwrite dictionary ids (#476) * fix: CometReader.loadVector should not overwrite dictionary ids * For review --- .../apache/arrow/c/CometSchemaImporter.java | 73 +++++++++++++++++++ .../org/apache/comet/parquet/BatchReader.java | 13 ++++ .../apache/comet/parquet/ColumnReader.java | 23 +++--- .../comet/parquet/LazyColumnReader.java | 4 +- .../java/org/apache/comet/parquet/Utils.java | 10 ++- 5 files changed, 107 insertions(+), 16 deletions(-) create mode 100644 common/src/main/java/org/apache/arrow/c/CometSchemaImporter.java diff --git a/common/src/main/java/org/apache/arrow/c/CometSchemaImporter.java b/common/src/main/java/org/apache/arrow/c/CometSchemaImporter.java new file mode 100644 index 000000000..32955f1ac --- /dev/null +++ b/common/src/main/java/org/apache/arrow/c/CometSchemaImporter.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.arrow.c; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.types.pojo.Field; + +/** This is a simple wrapper around SchemaImporter to make it accessible from Java Arrow. */ +public class CometSchemaImporter { + private final BufferAllocator allocator; + private final SchemaImporter importer; + private final CDataDictionaryProvider provider = new CDataDictionaryProvider(); + + public CometSchemaImporter(BufferAllocator allocator) { + this.allocator = allocator; + this.importer = new SchemaImporter(allocator); + } + + public BufferAllocator getAllocator() { + return allocator; + } + + public CDataDictionaryProvider getProvider() { + return provider; + } + + public Field importField(ArrowSchema schema) { + try { + return importer.importField(schema, provider); + } finally { + schema.release(); + schema.close(); + } + } + + /** + * Imports data from ArrowArray/ArrowSchema into a FieldVector. This is basically the same as Java + * Arrow `Data.importVector`. `Data.importVector` initiates `SchemaImporter` internally which is + * used to fill dictionary ids for dictionary encoded vectors. Every call to `importVector` will + * begin with dictionary ids starting from 0. So, separate calls to `importVector` will overwrite + * dictionary ids. To avoid this, we need to use the same `SchemaImporter` instance for all calls + * to `importVector`. + */ + public FieldVector importVector(ArrowArray array, ArrowSchema schema) { + Field field = importField(schema); + FieldVector vector = field.createVector(allocator); + Data.importIntoVector(allocator, array, vector, provider); + + return vector; + } + + public void close() { + provider.close(); + } +} diff --git a/common/src/main/java/org/apache/comet/parquet/BatchReader.java b/common/src/main/java/org/apache/comet/parquet/BatchReader.java index 9940390dc..bf8e6e550 100644 --- a/common/src/main/java/org/apache/comet/parquet/BatchReader.java +++ b/common/src/main/java/org/apache/comet/parquet/BatchReader.java @@ -37,6 +37,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.arrow.c.CometSchemaImporter; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -88,6 +91,7 @@ */ public class BatchReader extends RecordReader implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(FileReader.class); + protected static final BufferAllocator ALLOCATOR = new RootAllocator(); private Configuration conf; private int capacity; @@ -104,6 +108,7 @@ public class BatchReader extends RecordReader implements Cl private MessageType requestedSchema; private CometVector[] vectors; private AbstractColumnReader[] columnReaders; + private CometSchemaImporter importer; private ColumnarBatch currentBatch; private Future> prefetchTask; private LinkedBlockingQueue> prefetchQueue; @@ -515,6 +520,10 @@ public void close() throws IOException { fileReader.close(); fileReader = null; } + if (importer != null) { + importer.close(); + importer = null; + } } @SuppressWarnings("deprecation") @@ -552,6 +561,9 @@ private boolean loadNextRowGroupIfNecessary() throws Throwable { numRowGroupsMetric.add(1); } + if (importer != null) importer.close(); + importer = new CometSchemaImporter(ALLOCATOR); + List columns = requestedSchema.getColumns(); for (int i = 0; i < columns.size(); i++) { if (missingColumns[i]) continue; @@ -564,6 +576,7 @@ private boolean loadNextRowGroupIfNecessary() throws Throwable { Utils.getColumnReader( dataType, columns.get(i), + importer, capacity, useDecimal128, useLazyMaterialization, diff --git a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java index 50991d5b0..3d4cb3aa5 100644 --- a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java +++ b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java @@ -27,10 +27,7 @@ import org.apache.arrow.c.ArrowArray; import org.apache.arrow.c.ArrowSchema; -import org.apache.arrow.c.CDataDictionaryProvider; -import org.apache.arrow.c.Data; -import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.c.CometSchemaImporter; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.dictionary.Dictionary; import org.apache.arrow.vector.types.pojo.DictionaryEncoding; @@ -53,7 +50,6 @@ public class ColumnReader extends AbstractColumnReader { protected static final Logger LOG = LoggerFactory.getLogger(ColumnReader.class); - protected static final BufferAllocator ALLOCATOR = new RootAllocator(); /** * The current Comet vector holding all the values read by this column reader. Owned by this @@ -89,18 +85,19 @@ public class ColumnReader extends AbstractColumnReader { */ boolean hadNull; - /** Dictionary provider for this column. */ - private final CDataDictionaryProvider dictionaryProvider = new CDataDictionaryProvider(); + private final CometSchemaImporter importer; public ColumnReader( DataType type, ColumnDescriptor descriptor, + CometSchemaImporter importer, int batchSize, boolean useDecimal128, boolean useLegacyDateTimestamp) { super(type, descriptor, useDecimal128, useLegacyDateTimestamp); assert batchSize > 0 : "Batch size must be positive, found " + batchSize; this.batchSize = batchSize; + this.importer = importer; initNative(); } @@ -164,7 +161,6 @@ public void close() { currentVector.close(); currentVector = null; } - dictionaryProvider.close(); super.close(); } @@ -209,10 +205,11 @@ public CometDecodedVector loadVector() { try (ArrowArray array = ArrowArray.wrap(addresses[0]); ArrowSchema schema = ArrowSchema.wrap(addresses[1])) { - FieldVector vector = Data.importVector(ALLOCATOR, array, schema, dictionaryProvider); + FieldVector vector = importer.importVector(array, schema); + DictionaryEncoding dictionaryEncoding = vector.getField().getDictionary(); - CometPlainVector cometVector = new CometPlainVector(vector, useDecimal128, isUuid); + CometPlainVector cometVector = new CometPlainVector(vector, useDecimal128); // Update whether the current vector contains any null values. This is used in the following // batch(s) to determine whether we can skip loading the native vector. @@ -234,15 +231,17 @@ public CometDecodedVector loadVector() { // We should already re-initiate `CometDictionary` here because `Data.importVector` API will // release the previous dictionary vector and create a new one. - Dictionary arrowDictionary = dictionaryProvider.lookup(dictionaryEncoding.getId()); + Dictionary arrowDictionary = importer.getProvider().lookup(dictionaryEncoding.getId()); CometPlainVector dictionaryVector = new CometPlainVector(arrowDictionary.getVector(), useDecimal128, isUuid); dictionary = new CometDictionary(dictionaryVector); currentVector = new CometDictionaryVector( - cometVector, dictionary, dictionaryProvider, useDecimal128, false, isUuid); + cometVector, dictionary, importer.getProvider(), useDecimal128, false, isUuid); + currentVector = + new CometDictionaryVector(cometVector, dictionary, importer.getProvider(), useDecimal128); return currentVector; } } diff --git a/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java b/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java index a15d84192..dd08a88ab 100644 --- a/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java +++ b/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java @@ -21,6 +21,7 @@ import java.io.IOException; +import org.apache.arrow.c.CometSchemaImporter; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.page.PageReader; import org.apache.spark.sql.types.DataType; @@ -45,10 +46,11 @@ public class LazyColumnReader extends ColumnReader { public LazyColumnReader( DataType sparkReadType, ColumnDescriptor descriptor, + CometSchemaImporter importer, int batchSize, boolean useDecimal128, boolean useLegacyDateTimestamp) { - super(sparkReadType, descriptor, batchSize, useDecimal128, useLegacyDateTimestamp); + super(sparkReadType, descriptor, importer, batchSize, useDecimal128, useLegacyDateTimestamp); this.batchSize = 0; // the batch size is set later in `readBatch` this.vector = new CometLazyVector(sparkReadType, this, useDecimal128); } diff --git a/common/src/main/java/org/apache/comet/parquet/Utils.java b/common/src/main/java/org/apache/comet/parquet/Utils.java index 95ca06cda..99f3a4edd 100644 --- a/common/src/main/java/org/apache/comet/parquet/Utils.java +++ b/common/src/main/java/org/apache/comet/parquet/Utils.java @@ -19,6 +19,7 @@ package org.apache.comet.parquet; +import org.apache.arrow.c.CometSchemaImporter; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.PrimitiveType; @@ -28,26 +29,29 @@ public class Utils { public static ColumnReader getColumnReader( DataType type, ColumnDescriptor descriptor, + CometSchemaImporter importer, int batchSize, boolean useDecimal128, boolean useLazyMaterialization) { // TODO: support `useLegacyDateTimestamp` for Iceberg return getColumnReader( - type, descriptor, batchSize, useDecimal128, useLazyMaterialization, true); + type, descriptor, importer, batchSize, useDecimal128, useLazyMaterialization, true); } public static ColumnReader getColumnReader( DataType type, ColumnDescriptor descriptor, + CometSchemaImporter importer, int batchSize, boolean useDecimal128, boolean useLazyMaterialization, boolean useLegacyDateTimestamp) { if (useLazyMaterialization && supportLazyMaterialization(type)) { return new LazyColumnReader( - type, descriptor, batchSize, useDecimal128, useLegacyDateTimestamp); + type, descriptor, importer, batchSize, useDecimal128, useLegacyDateTimestamp); } else { - return new ColumnReader(type, descriptor, batchSize, useDecimal128, useLegacyDateTimestamp); + return new ColumnReader( + type, descriptor, importer, batchSize, useDecimal128, useLegacyDateTimestamp); } }