From bf479c4ebddb2a017b5062a197624f7b58e66acd Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Wed, 1 May 2024 09:02:39 -0700 Subject: [PATCH] feat: Add DeleteColumnReader and PositionColumnReader --- .../comet/parquet/DeleteColumnReader.java | 46 ++++++++++++++++++ .../comet/parquet/PositionColumnReader.java | 47 +++++++++++++++++++ 2 files changed, 93 insertions(+) create mode 100644 common/src/main/java/org/apache/comet/parquet/DeleteColumnReader.java create mode 100644 common/src/main/java/org/apache/comet/parquet/PositionColumnReader.java diff --git a/common/src/main/java/org/apache/comet/parquet/DeleteColumnReader.java b/common/src/main/java/org/apache/comet/parquet/DeleteColumnReader.java new file mode 100644 index 000000000..16d069c53 --- /dev/null +++ b/common/src/main/java/org/apache/comet/parquet/DeleteColumnReader.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.comet.parquet; + +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; + +/** A column reader for the non-nullable boolean "deleted" column; every readBatch call resets the native batch and passes the caller-supplied isDeleted array to Native.setIsDeleted before delegating to the superclass. */ +public class DeleteColumnReader extends MetadataColumnReader { + private boolean[] isDeleted; + + public DeleteColumnReader(boolean[] isDeleted) { + super( + DataTypes.BooleanType, + TypeUtil.convertToParquet( + new StructField("deleted", DataTypes.BooleanType, false, Metadata.empty())), + false); + this.isDeleted = isDeleted; + } + + @Override + public void readBatch(int total) { + Native.resetBatch(nativeHandle); + Native.setIsDeleted(nativeHandle, isDeleted); + + super.readBatch(total); + } +} diff --git a/common/src/main/java/org/apache/comet/parquet/PositionColumnReader.java b/common/src/main/java/org/apache/comet/parquet/PositionColumnReader.java new file mode 100644 index 000000000..03e3b9961 --- /dev/null +++ b/common/src/main/java/org/apache/comet/parquet/PositionColumnReader.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.comet.parquet; + +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.spark.sql.types.DataTypes; + +/** A column reader for the long-typed position column; every readBatch call resets the native batch, passes the current position and the batch size to Native.setPosition, and then advances the position by that batch size. */ +public class PositionColumnReader extends MetadataColumnReader { + /** The current position value of the column that is used to initialize this column reader; readBatch advances it by the size of each batch. */ + private long position; + + public PositionColumnReader(ColumnDescriptor descriptor) { + this(descriptor, 0L); + } + + PositionColumnReader(ColumnDescriptor descriptor, long position) { + super(DataTypes.LongType, descriptor, false); + this.position = position; + } + + @Override + public void readBatch(int total) { + Native.resetBatch(nativeHandle); + Native.setPosition(nativeHandle, position, total); + position += total; + + super.readBatch(total); + } +}