Skip to content

Commit

Permalink
fix: Re-implement some Parquet decode methods without `copy_nonoverla…
Browse files Browse the repository at this point in the history
…pping` (#558)

* Re-implement int decode methods using safe code

* fix

* fix tests

* more tests

* fix

* fix

* re-implement using unsafe read/write unaligned and add benchmark

* lint

* macros

* more macros

* combine macros

* replace another impl with the macro

* fix a regression

* remove zero_value arg from generate_cast_to_signed and rename impl_plain_decoding_int to the original name of make_int_variant_impl
  • Loading branch information
andygrove authored Jun 13, 2024
1 parent f95d386 commit 2fc45a2
Show file tree
Hide file tree
Showing 4 changed files with 203 additions and 44 deletions.
1 change: 1 addition & 0 deletions core/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ criterion = "0.5.1"
jni = { version = "0.21", features = ["invocation"] }
lazy_static = "1.4"
assertables = "7"
hex = "0.4.3"

[features]
default = []
Expand Down Expand Up @@ -136,3 +137,7 @@ harness = false
[[bench]]
name = "shuffle_writer"
harness = false

[[bench]]
name = "parquet_decode"
harness = false
50 changes: 50 additions & 0 deletions core/benches/parquet_decode.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_buffer::ToByteSlice;
use comet::parquet::read::values::{copy_i32_to_i16, copy_i32_to_u16};
use criterion::{criterion_group, criterion_main, Criterion};

fn criterion_benchmark(c: &mut Criterion) {
let num = 1000;
let source = vec![78_i8; num * 4];
let mut group = c.benchmark_group("parquet_decode");
group.bench_function("decode_i32_to_i16", |b| {
let mut dest: Vec<u8> = vec![b' '; num * 2];
b.iter(|| {
copy_i32_to_i16(source.to_byte_slice(), dest.as_mut_slice(), num);
});
});
group.bench_function("decode_i32_to_u16", |b| {
let mut dest: Vec<u8> = vec![b' '; num * 4];
b.iter(|| {
copy_i32_to_u16(source.to_byte_slice(), dest.as_mut_slice(), num);
});
});
}

// Create UTF8 batch with strings representing ints, floats, nulls
fn config() -> Criterion {
Criterion::default()
}

criterion_group! {
name = benches;
config = config();
targets = criterion_benchmark
}
criterion_main!(benches);
191 changes: 147 additions & 44 deletions core/src/parquet/read/values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::{marker::PhantomData, mem, ptr::copy_nonoverlapping};
use std::{marker::PhantomData, mem};

use arrow::buffer::Buffer;
use bytes::Buf;
Expand Down Expand Up @@ -182,25 +182,6 @@ make_plain_dict_impl! { Int8Type, UInt8Type, Int16Type, UInt16Type, Int32Type, U
make_plain_dict_impl! { Int32DateType, Int64Type, FloatType, FLBAType }
make_plain_dict_impl! { DoubleType, Int64TimestampMillisType, Int64TimestampMicrosType }

impl PlainDecoding for Int32To64Type {
fn decode(src: &mut PlainDecoderInner, dst: &mut ParquetMutableVector, num: usize) {
let src_ptr = src.data.as_ptr() as *const i32;
let dst_ptr = dst.value_buffer.as_mut_ptr() as *mut i64;
unsafe {
for i in 0..num {
dst_ptr
.add(dst.num_values + i)
.write_unaligned(src_ptr.add(src.offset + i).read_unaligned() as i64);
}
}
src.offset += 4 * num;
}

fn skip(src: &mut PlainDecoderInner, num: usize) {
src.offset += 4 * num;
}
}

impl PlainDictDecoding for Int32To64Type {
fn decode_dict_one(
idx: usize,
Expand Down Expand Up @@ -438,41 +419,92 @@ impl PlainDictDecoding for BoolType {
}
}

// Shared implementation for int variants such as Int8 and Int16
macro_rules! make_int_variant_impl {
($ty: ident, $native_ty: ty, $type_size: expr) => {
impl PlainDecoding for $ty {
($dst_type:ty, $copy_fn:ident, $type_width:expr) => {
impl PlainDecoding for $dst_type {
fn decode(src: &mut PlainDecoderInner, dst: &mut ParquetMutableVector, num: usize) {
let src_data = &src.data;
let dst_slice = dst.value_buffer.as_slice_mut();
let mut dst_offset = dst.num_values * $type_size;
for _ in 0..num {
unsafe {
copy_nonoverlapping(
&src_data[src.offset..] as *const [u8] as *const u8
as *const $native_ty,
&mut dst_slice[dst_offset] as *mut u8 as *mut $native_ty,
1,
);
}
src.offset += 4; // Parquet stores Int8/Int16 using 4 bytes
dst_offset += $type_size;
}
let dst_offset = dst.num_values * $type_width;
$copy_fn(&src.data[src.offset..], &mut dst_slice[dst_offset..], num);
src.offset += 4 * num; // Parquet stores Int8/Int16 using 4 bytes
}

fn skip(src: &mut PlainDecoderInner, num: usize) {
let num_bytes = 4 * num; // Parquet stores Int8/Int16 using 4 bytes
src.offset += num_bytes;
src.offset += 4 * num; // Parquet stores Int8/Int16 using 4 bytes
}
}
};
}

make_int_variant_impl!(Int8Type, i8, 1);
make_int_variant_impl!(UInt8Type, u8, 2);
make_int_variant_impl!(Int16Type, i16, 2);
make_int_variant_impl!(UInt16Type, u16, 4);
make_int_variant_impl!(UInt32Type, u32, 8);
make_int_variant_impl!(Int8Type, copy_i32_to_i8, 1);
make_int_variant_impl!(Int16Type, copy_i32_to_i16, 2);
make_int_variant_impl!(Int32To64Type, copy_i32_to_i64, 4);

// unsigned type require double the width and zeroes are written for the second half
// perhaps because they are implemented as the next size up signed type?
make_int_variant_impl!(UInt8Type, copy_i32_to_u8, 2);
make_int_variant_impl!(UInt16Type, copy_i32_to_u16, 4);
make_int_variant_impl!(UInt32Type, copy_i32_to_u32, 8);

macro_rules! generate_cast_to_unsigned {
($name: ident, $src_type:ty, $dst_type:ty, $zero_value:expr) => {
pub fn $name(src: &[u8], dst: &mut [u8], num: usize) {
debug_assert!(
src.len() >= num * std::mem::size_of::<$src_type>(),
"Source slice is too small"
);
debug_assert!(
dst.len() >= num * std::mem::size_of::<$dst_type>() * 2,
"Destination slice is too small"
);

let src_ptr = src.as_ptr() as *const $src_type;
let dst_ptr = dst.as_mut_ptr() as *mut $dst_type;
unsafe {
for i in 0..num {
dst_ptr
.add(2 * i)
.write_unaligned(src_ptr.add(i).read_unaligned() as $dst_type);
// write zeroes
dst_ptr.add(2 * i + 1).write_unaligned($zero_value);
}
}
}
};
}

generate_cast_to_unsigned!(copy_i32_to_u8, i32, u8, 0_u8);
generate_cast_to_unsigned!(copy_i32_to_u16, i32, u16, 0_u16);
generate_cast_to_unsigned!(copy_i32_to_u32, i32, u32, 0_u32);

macro_rules! generate_cast_to_signed {
($name: ident, $src_type:ty, $dst_type:ty) => {
pub fn $name(src: &[u8], dst: &mut [u8], num: usize) {
debug_assert!(
src.len() >= num * std::mem::size_of::<$src_type>(),
"Source slice is too small"
);
debug_assert!(
dst.len() >= num * std::mem::size_of::<$dst_type>(),
"Destination slice is too small"
);

let src_ptr = src.as_ptr() as *const $src_type;
let dst_ptr = dst.as_mut_ptr() as *mut $dst_type;
unsafe {
for i in 0..num {
dst_ptr
.add(i)
.write_unaligned(src_ptr.add(i).read_unaligned() as $dst_type);
}
}
}
};
}

generate_cast_to_signed!(copy_i32_to_i8, i32, i8);
generate_cast_to_signed!(copy_i32_to_i16, i32, i16);
generate_cast_to_signed!(copy_i32_to_i64, i32, i64);

// Shared implementation for variants of Binary type
macro_rules! make_plain_binary_impl {
Expand Down Expand Up @@ -974,3 +1006,74 @@ impl Decoder for DictDecoder {
Encoding::RLE_DICTIONARY
}
}

#[cfg(test)]
mod test {
use super::*;
use parquet::data_type::AsBytes;

#[test]
fn test_i32_to_i8() {
let source =
hex::decode("8a000000dbffffff1800000034ffffff300000001d000000abffffff37fffffff1000000")
.unwrap();
let expected = hex::decode("8adb1834301dab37f1").unwrap();
let num = source.len() / 4;
let mut dest: Vec<u8> = vec![b' '; num];
copy_i32_to_i8(&source.as_bytes(), dest.as_mut_slice(), num);
assert_eq!(expected.as_bytes(), dest.as_bytes());
}

#[test]
fn test_i32_to_u8() {
let source =
hex::decode("8a000000dbffffff1800000034ffffff300000001d000000abffffff37fffffff1000000")
.unwrap();
let expected = hex::decode("8a00db001800340030001d00ab003700f100").unwrap();
let num = source.len() / 4;
let mut dest: Vec<u8> = vec![b' '; num * 2];
copy_i32_to_u8(&source.as_bytes(), dest.as_mut_slice(), num);
assert_eq!(expected.as_bytes(), dest.as_bytes());
}

#[test]
fn test_i32_to_i16() {
let source =
hex::decode("8a0e0000db93ffff1826000034f4ffff300200001d2b0000abe3ffff378dfffff1470000")
.unwrap();
let expected = hex::decode("8a0edb93182634f430021d2babe3378df147").unwrap();
let num = source.len() / 4;
let mut dest: Vec<u8> = vec![b' '; num * 2];
copy_i32_to_i16(&source.as_bytes(), dest.as_mut_slice(), num);
assert_eq!(expected.as_bytes(), dest.as_bytes());
}

#[test]
fn test_i32_to_u16() {
let source = hex::decode(
"ff7f0000008000000180000002800000038000000480000005800000068000000780000008800000",
)
.unwrap();
let expected = hex::decode(
"ff7f0000008000000180000002800000038000000480000005800000068000000780000008800000",
)
.unwrap();
let num = source.len() / 4;
let mut dest: Vec<u8> = vec![b' '; num * 4];
copy_i32_to_u16(&source.as_bytes(), dest.as_mut_slice(), num);
assert_eq!(expected.as_bytes(), dest.as_bytes());
}

#[test]
fn test_i32_to_u32() {
let source = hex::decode(
"ffffff7f000000800100008002000080030000800400008005000080060000800700008008000080",
)
.unwrap();
let expected = hex::decode("ffffff7f00000000000000800000000001000080000000000200008000000000030000800000000004000080000000000500008000000000060000800000000007000080000000000800008000000000").unwrap();
let num = source.len() / 4;
let mut dest: Vec<u8> = vec![b' '; num * 8];
copy_i32_to_u32(&source.as_bytes(), dest.as_mut_slice(), num);
assert_eq!(expected.as_bytes(), dest.as_bytes());
}
}

0 comments on commit 2fc45a2

Please sign in to comment.