From 2fc45a2faea3c6dc4c2a5517d6e1e6b22b8ef1a0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 13 Jun 2024 15:38:42 -0600 Subject: [PATCH] fix: Re-implement some Parquet decode methods without `copy_nonoverlapping` (#558) * Re-implement int decode methods using safe code * fix * fix tests * more tests * fix * fix * re-implement using unsafe read/write unaligned and add benchmark * lint * macros * more macros * combine macros * replace another impl with the macro * fix a regression * remove zero_value arg from generate_cast_to_signed and rename impl_plain_decoding_int to the original name of make_int_variant_impl --- core/Cargo.lock | 1 + core/Cargo.toml | 5 + core/benches/parquet_decode.rs | 50 +++++++++ core/src/parquet/read/values.rs | 191 ++++++++++++++++++++++++-------- 4 files changed, 203 insertions(+), 44 deletions(-) create mode 100644 core/benches/parquet_decode.rs diff --git a/core/Cargo.lock b/core/Cargo.lock index 217ab98c9..eca8b97d4 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -878,6 +878,7 @@ dependencies = [ "futures", "half", "hashbrown", + "hex", "itertools 0.11.0", "jni", "lazy_static", diff --git a/core/Cargo.toml b/core/Cargo.toml index fe74b3554..04a45f6ca 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -93,6 +93,7 @@ criterion = "0.5.1" jni = { version = "0.21", features = ["invocation"] } lazy_static = "1.4" assertables = "7" +hex = "0.4.3" [features] default = [] @@ -136,3 +137,7 @@ harness = false [[bench]] name = "shuffle_writer" harness = false + +[[bench]] +name = "parquet_decode" +harness = false diff --git a/core/benches/parquet_decode.rs b/core/benches/parquet_decode.rs new file mode 100644 index 000000000..7176bf9c8 --- /dev/null +++ b/core/benches/parquet_decode.rs @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_buffer::ToByteSlice; +use comet::parquet::read::values::{copy_i32_to_i16, copy_i32_to_u16}; +use criterion::{criterion_group, criterion_main, Criterion}; + +fn criterion_benchmark(c: &mut Criterion) { + let num = 1000; + let source = vec![78_i8; num * 4]; + let mut group = c.benchmark_group("parquet_decode"); + group.bench_function("decode_i32_to_i16", |b| { + let mut dest: Vec = vec![b' '; num * 2]; + b.iter(|| { + copy_i32_to_i16(source.to_byte_slice(), dest.as_mut_slice(), num); + }); + }); + group.bench_function("decode_i32_to_u16", |b| { + let mut dest: Vec = vec![b' '; num * 4]; + b.iter(|| { + copy_i32_to_u16(source.to_byte_slice(), dest.as_mut_slice(), num); + }); + }); +} + +// Use Criterion's default configuration for this benchmark group +fn config() -> Criterion { + Criterion::default() +} + +criterion_group! { + name = benches; + config = config(); + targets = criterion_benchmark +} +criterion_main!(benches); diff --git a/core/src/parquet/read/values.rs b/core/src/parquet/read/values.rs index 76c8a4a1d..7f1195fa9 100644 --- a/core/src/parquet/read/values.rs +++ b/core/src/parquet/read/values.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-use std::{marker::PhantomData, mem, ptr::copy_nonoverlapping}; +use std::{marker::PhantomData, mem}; use arrow::buffer::Buffer; use bytes::Buf; @@ -182,25 +182,6 @@ make_plain_dict_impl! { Int8Type, UInt8Type, Int16Type, UInt16Type, Int32Type, U make_plain_dict_impl! { Int32DateType, Int64Type, FloatType, FLBAType } make_plain_dict_impl! { DoubleType, Int64TimestampMillisType, Int64TimestampMicrosType } -impl PlainDecoding for Int32To64Type { - fn decode(src: &mut PlainDecoderInner, dst: &mut ParquetMutableVector, num: usize) { - let src_ptr = src.data.as_ptr() as *const i32; - let dst_ptr = dst.value_buffer.as_mut_ptr() as *mut i64; - unsafe { - for i in 0..num { - dst_ptr - .add(dst.num_values + i) - .write_unaligned(src_ptr.add(src.offset + i).read_unaligned() as i64); - } - } - src.offset += 4 * num; - } - - fn skip(src: &mut PlainDecoderInner, num: usize) { - src.offset += 4 * num; - } -} - impl PlainDictDecoding for Int32To64Type { fn decode_dict_one( idx: usize, @@ -438,41 +419,92 @@ impl PlainDictDecoding for BoolType { } } -// Shared implementation for int variants such as Int8 and Int16 macro_rules! make_int_variant_impl { - ($ty: ident, $native_ty: ty, $type_size: expr) => { - impl PlainDecoding for $ty { + ($dst_type:ty, $copy_fn:ident, $type_width:expr) => { + impl PlainDecoding for $dst_type { fn decode(src: &mut PlainDecoderInner, dst: &mut ParquetMutableVector, num: usize) { - let src_data = &src.data; let dst_slice = dst.value_buffer.as_slice_mut(); - let mut dst_offset = dst.num_values * $type_size; - for _ in 0..num { - unsafe { - copy_nonoverlapping( - &src_data[src.offset..] 
as *const [u8] as *const u8 - as *const $native_ty, - &mut dst_slice[dst_offset] as *mut u8 as *mut $native_ty, - 1, - ); - } - src.offset += 4; // Parquet stores Int8/Int16 using 4 bytes - dst_offset += $type_size; - } + let dst_offset = dst.num_values * $type_width; + $copy_fn(&src.data[src.offset..], &mut dst_slice[dst_offset..], num); + src.offset += 4 * num; // Parquet stores Int8/Int16 using 4 bytes } fn skip(src: &mut PlainDecoderInner, num: usize) { - let num_bytes = 4 * num; // Parquet stores Int8/Int16 using 4 bytes - src.offset += num_bytes; + src.offset += 4 * num; // Parquet stores Int8/Int16 using 4 bytes } } }; } -make_int_variant_impl!(Int8Type, i8, 1); -make_int_variant_impl!(UInt8Type, u8, 2); -make_int_variant_impl!(Int16Type, i16, 2); -make_int_variant_impl!(UInt16Type, u16, 4); -make_int_variant_impl!(UInt32Type, u32, 8); +make_int_variant_impl!(Int8Type, copy_i32_to_i8, 1); +make_int_variant_impl!(Int16Type, copy_i32_to_i16, 2); +make_int_variant_impl!(Int32To64Type, copy_i32_to_i64, 4); + +// Unsigned types require double the width, and zeroes are written for the second half, +// perhaps because they are implemented as the next-size-up signed type? +make_int_variant_impl!(UInt8Type, copy_i32_to_u8, 2); +make_int_variant_impl!(UInt16Type, copy_i32_to_u16, 4); +make_int_variant_impl!(UInt32Type, copy_i32_to_u32, 8); + +macro_rules! 
generate_cast_to_unsigned { + ($name: ident, $src_type:ty, $dst_type:ty, $zero_value:expr) => { + pub fn $name(src: &[u8], dst: &mut [u8], num: usize) { + debug_assert!( + src.len() >= num * std::mem::size_of::<$src_type>(), + "Source slice is too small" + ); + debug_assert!( + dst.len() >= num * std::mem::size_of::<$dst_type>() * 2, + "Destination slice is too small" + ); + + let src_ptr = src.as_ptr() as *const $src_type; + let dst_ptr = dst.as_mut_ptr() as *mut $dst_type; + unsafe { + for i in 0..num { + dst_ptr + .add(2 * i) + .write_unaligned(src_ptr.add(i).read_unaligned() as $dst_type); + // write zeroes + dst_ptr.add(2 * i + 1).write_unaligned($zero_value); + } + } + } + }; +} + +generate_cast_to_unsigned!(copy_i32_to_u8, i32, u8, 0_u8); +generate_cast_to_unsigned!(copy_i32_to_u16, i32, u16, 0_u16); +generate_cast_to_unsigned!(copy_i32_to_u32, i32, u32, 0_u32); + +macro_rules! generate_cast_to_signed { + ($name: ident, $src_type:ty, $dst_type:ty) => { + pub fn $name(src: &[u8], dst: &mut [u8], num: usize) { + debug_assert!( + src.len() >= num * std::mem::size_of::<$src_type>(), + "Source slice is too small" + ); + debug_assert!( + dst.len() >= num * std::mem::size_of::<$dst_type>(), + "Destination slice is too small" + ); + + let src_ptr = src.as_ptr() as *const $src_type; + let dst_ptr = dst.as_mut_ptr() as *mut $dst_type; + unsafe { + for i in 0..num { + dst_ptr + .add(i) + .write_unaligned(src_ptr.add(i).read_unaligned() as $dst_type); + } + } + } + }; +} + +generate_cast_to_signed!(copy_i32_to_i8, i32, i8); +generate_cast_to_signed!(copy_i32_to_i16, i32, i16); +generate_cast_to_signed!(copy_i32_to_i64, i32, i64); // Shared implementation for variants of Binary type macro_rules! 
make_plain_binary_impl { @@ -974,3 +1006,74 @@ impl Decoder for DictDecoder { Encoding::RLE_DICTIONARY } } + +#[cfg(test)] +mod test { + use super::*; + use parquet::data_type::AsBytes; + + #[test] + fn test_i32_to_i8() { + let source = + hex::decode("8a000000dbffffff1800000034ffffff300000001d000000abffffff37fffffff1000000") + .unwrap(); + let expected = hex::decode("8adb1834301dab37f1").unwrap(); + let num = source.len() / 4; + let mut dest: Vec = vec![b' '; num]; + copy_i32_to_i8(&source.as_bytes(), dest.as_mut_slice(), num); + assert_eq!(expected.as_bytes(), dest.as_bytes()); + } + + #[test] + fn test_i32_to_u8() { + let source = + hex::decode("8a000000dbffffff1800000034ffffff300000001d000000abffffff37fffffff1000000") + .unwrap(); + let expected = hex::decode("8a00db001800340030001d00ab003700f100").unwrap(); + let num = source.len() / 4; + let mut dest: Vec = vec![b' '; num * 2]; + copy_i32_to_u8(&source.as_bytes(), dest.as_mut_slice(), num); + assert_eq!(expected.as_bytes(), dest.as_bytes()); + } + + #[test] + fn test_i32_to_i16() { + let source = + hex::decode("8a0e0000db93ffff1826000034f4ffff300200001d2b0000abe3ffff378dfffff1470000") + .unwrap(); + let expected = hex::decode("8a0edb93182634f430021d2babe3378df147").unwrap(); + let num = source.len() / 4; + let mut dest: Vec = vec![b' '; num * 2]; + copy_i32_to_i16(&source.as_bytes(), dest.as_mut_slice(), num); + assert_eq!(expected.as_bytes(), dest.as_bytes()); + } + + #[test] + fn test_i32_to_u16() { + let source = hex::decode( + "ff7f0000008000000180000002800000038000000480000005800000068000000780000008800000", + ) + .unwrap(); + let expected = hex::decode( + "ff7f0000008000000180000002800000038000000480000005800000068000000780000008800000", + ) + .unwrap(); + let num = source.len() / 4; + let mut dest: Vec = vec![b' '; num * 4]; + copy_i32_to_u16(&source.as_bytes(), dest.as_mut_slice(), num); + assert_eq!(expected.as_bytes(), dest.as_bytes()); + } + + #[test] + fn test_i32_to_u32() { + let source = 
hex::decode( + "ffffff7f000000800100008002000080030000800400008005000080060000800700008008000080", + ) + .unwrap(); + let expected = hex::decode("ffffff7f00000000000000800000000001000080000000000200008000000000030000800000000004000080000000000500008000000000060000800000000007000080000000000800008000000000").unwrap(); + let num = source.len() / 4; + let mut dest: Vec = vec![b' '; num * 8]; + copy_i32_to_u32(&source.as_bytes(), dest.as_mut_slice(), num); + assert_eq!(expected.as_bytes(), dest.as_bytes()); + } +}