diff --git a/native/core/benches/parquet_decode.rs b/native/core/benches/parquet_decode.rs index 7176bf9c8..ed604b4e9 100644 --- a/native/core/benches/parquet_decode.rs +++ b/native/core/benches/parquet_decode.rs @@ -16,13 +16,14 @@ // under the License. use arrow_buffer::ToByteSlice; -use comet::parquet::read::values::{copy_i32_to_i16, copy_i32_to_u16}; +use comet::parquet::read::values::{copy_i32_to_i16, copy_i32_to_u16, copy_i64_to_i64}; use criterion::{criterion_group, criterion_main, Criterion}; fn criterion_benchmark(c: &mut Criterion) { let num = 1000; - let source = vec![78_i8; num * 4]; + let source = vec![78_i8; num * 8]; let mut group = c.benchmark_group("parquet_decode"); + /* group.bench_function("decode_i32_to_i16", |b| { let mut dest: Vec = vec![b' '; num * 2]; b.iter(|| { @@ -35,6 +36,13 @@ fn criterion_benchmark(c: &mut Criterion) { copy_i32_to_u16(source.to_byte_slice(), dest.as_mut_slice(), num); }); }); + */ + group.bench_function("decode_i64_to_i64", |b| { + let mut dest: Vec = vec![b' '; num * 8]; + b.iter(|| { + copy_i64_to_i64(source.to_byte_slice(), dest.as_mut_slice(), num); + }); + }); } // Create UTF8 batch with strings representing ints, floats, nulls diff --git a/native/core/src/parquet/data_type.rs b/native/core/src/parquet/data_type.rs index cd578e2a2..a1d0ce99a 100644 --- a/native/core/src/parquet/data_type.rs +++ b/native/core/src/parquet/data_type.rs @@ -45,8 +45,8 @@ make_type!(DoubleType); make_type!(FloatToDoubleType); make_type!(ByteArrayType); make_type!(StringType); -make_type!(Int32DecimalType); -make_type!(Int64DecimalType); +make_type!(Int32ToDecimal128Type); +make_type!(Int64ToDecimal128Type); make_type!(FLBADecimalType); make_type!(FLBADecimal32Type); make_type!(FLBADecimal64Type); diff --git a/native/core/src/parquet/read/column.rs b/native/core/src/parquet/read/column.rs index feb342719..8a4b51dd1 100644 --- a/native/core/src/parquet/read/column.rs +++ b/native/core/src/parquet/read/column.rs @@ -58,13 +58,13 @@ pub enum ColumnReader { Int32ToDecimal64ColumnReader(TypedColumnReader), Int32ToDoubleColumnReader(TypedColumnReader), UInt32ColumnReader(TypedColumnReader), - Int32DecimalColumnReader(TypedColumnReader), + Int32ToDecimal128ColumnReader(TypedColumnReader), Int32DateColumnReader(TypedColumnReader), Int32TimestampMicrosColumnReader(TypedColumnReader), Int64ColumnReader(TypedColumnReader), Int64ToDecimal64ColumnReader(TypedColumnReader), UInt64DecimalColumnReader(TypedColumnReader), - Int64DecimalColumnReader(TypedColumnReader), + Int64ToDecimal128ColumnReader(TypedColumnReader), Int64TimestampMillisColumnReader(TypedColumnReader), Int64TimestampMicrosColumnReader(TypedColumnReader), Int64TimestampNanosColumnReader(TypedColumnReader), @@ -145,7 +145,7 @@ impl ColumnReader { ) } else { typed_reader!( - Int32DecimalColumnReader, + Int32ToDecimal128ColumnReader, ArrowDataType::Decimal128( promotion_info.precision as u8, promotion_info.scale as i8 @@ -178,7 +178,7 @@ impl ColumnReader { ) } else { typed_reader!( - Int32DecimalColumnReader, + Int32ToDecimal128ColumnReader, ArrowDataType::Decimal128( promotion_info.precision as u8, promotion_info.scale as i8 @@ -205,7 +205,7 @@ impl ColumnReader { if use_decimal_128 || promotion_info.precision > DECIMAL_MAX_LONG_DIGITS { typed_reader!( - Int32DecimalColumnReader, + Int32ToDecimal128ColumnReader, ArrowDataType::Decimal128( promotion_info.precision as u8, promotion_info.scale as i8 @@ -254,7 +254,7 @@ impl ColumnReader { ) } else { typed_reader!( - Int32DecimalColumnReader, + Int32ToDecimal128ColumnReader, ArrowDataType::Decimal128( promotion_info.precision as u8, promotion_info.scale as i8 @@ -287,7 +287,7 @@ impl ColumnReader { if use_decimal_128 || promotion_info.precision > DECIMAL_MAX_LONG_DIGITS { typed_reader!( - Int64DecimalColumnReader, + Int64ToDecimal128ColumnReader, ArrowDataType::Decimal128( promotion_info.precision as u8, promotion_info.scale as i8 @@ -347,7 +347,7 @@ impl ColumnReader { typed_reader!(Int64ColumnReader, Int64) } else { typed_reader!( - Int64DecimalColumnReader, + Int64ToDecimal128ColumnReader, ArrowDataType::Decimal128( promotion_info.precision as u8, promotion_info.scale as i8 @@ -452,12 +452,12 @@ macro_rules! make_func { Self::Int32ToDoubleColumnReader(ref typed) => typed.$func($($args), *), Self::UInt32ColumnReader(ref typed) => typed.$func($($args),*), Self::Int32DateColumnReader(ref typed) => typed.$func($($args),*), - Self::Int32DecimalColumnReader(ref typed) => typed.$func($($args),*), + Self::Int32ToDecimal128ColumnReader(ref typed) => typed.$func($($args),*), Self::Int32TimestampMicrosColumnReader(ref typed) => typed.$func($($args),*), Self::Int64ColumnReader(ref typed) => typed.$func($($args),*), Self::Int64ToDecimal64ColumnReader(ref typed) => typed.$func($($args), *), Self::UInt64DecimalColumnReader(ref typed) => typed.$func($($args),*), - Self::Int64DecimalColumnReader(ref typed) => typed.$func($($args),*), + Self::Int64ToDecimal128ColumnReader(ref typed) => typed.$func($($args),*), Self::Int64TimestampMillisColumnReader(ref typed) => typed.$func($($args),*), Self::Int64TimestampMicrosColumnReader(ref typed) => typed.$func($($args),*), Self::Int64TimestampNanosColumnReader(ref typed) => typed.$func($($args),*), @@ -490,12 +490,12 @@ macro_rules! make_func_mut { Self::Int32ToDoubleColumnReader(ref mut typed) => typed.$func($($args), *), Self::UInt32ColumnReader(ref mut typed) => typed.$func($($args),*), Self::Int32DateColumnReader(ref mut typed) => typed.$func($($args),*), - Self::Int32DecimalColumnReader(ref mut typed) => typed.$func($($args),*), + Self::Int32ToDecimal128ColumnReader(ref mut typed) => typed.$func($($args),*), Self::Int32TimestampMicrosColumnReader(ref mut typed) => typed.$func($($args),*), Self::Int64ColumnReader(ref mut typed) => typed.$func($($args),*), Self::Int64ToDecimal64ColumnReader(ref mut typed) => typed.$func($($args), *), Self::UInt64DecimalColumnReader(ref mut typed) => typed.$func($($args),*), - Self::Int64DecimalColumnReader(ref mut typed) => typed.$func($($args),*), + Self::Int64ToDecimal128ColumnReader(ref mut typed) => typed.$func($($args),*), Self::Int64TimestampMillisColumnReader(ref mut typed) => typed.$func($($args),*), Self::Int64TimestampMicrosColumnReader(ref mut typed) => typed.$func($($args),*), Self::Int64TimestampNanosColumnReader(ref mut typed) => typed.$func($($args),*), diff --git a/native/core/src/parquet/read/values.rs b/native/core/src/parquet/read/values.rs index b89face0f..260ae3bf5 100644 --- a/native/core/src/parquet/read/values.rs +++ b/native/core/src/parquet/read/values.rs @@ -27,7 +27,7 @@ use crate::write_null; use crate::write_val_or_null; use crate::{ common::bit::{self, BitReader}, - parquet::{data_type::*, read::DECIMAL_BYTE_WIDTH, ParquetMutableVector}, + parquet::{data_type::*, ParquetMutableVector}, unlikely, }; use arrow::datatypes::DataType as ArrowDataType; @@ -211,9 +211,13 @@ macro_rules! make_int_variant_dict_impl { make_int_variant_dict_impl!(Int16ToDoubleType, i16, f64); make_int_variant_dict_impl!(Int32To64Type, i32, i64); make_int_variant_dict_impl!(Int32ToDecimal64Type, i32, i64); +make_int_variant_dict_impl!(Int32ToDecimal128Type, i32, i128); make_int_variant_dict_impl!(Int32ToDoubleType, i32, f64); make_int_variant_dict_impl!(Int32TimestampMicrosType, i32, i64); +make_int_variant_dict_impl!(Int64ToDecimal128Type, i64, i128); +make_int_variant_dict_impl!(UInt64Type, u64, u128); make_int_variant_dict_impl!(FloatToDoubleType, f32, f64); +make_int_variant_dict_impl!(FLBADecimalType, i128, i128); impl PlainDecoding for Int32DateType { fn decode(src: &mut PlainDecoderInner, dst: &mut ParquetMutableVector, num: usize) { @@ -483,7 +487,7 @@ macro_rules! make_int_decimal_variant_impl { impl PlainDecoding for $ty { fn decode(src: &mut PlainDecoderInner, dst: &mut ParquetMutableVector, num: usize) { let dst_slice = dst.value_buffer.as_slice_mut(); - let dst_offset = dst.num_values * 8; + let dst_offset = dst.num_values * std::mem::size_of::<$dst_type>(); $copy_fn(&src.data[src.offset..], &mut dst_slice[dst_offset..], num); let src_precision = src.desc.type_precision() as u32; @@ -532,7 +536,11 @@ macro_rules! make_int_decimal_variant_impl { }; } make_int_decimal_variant_impl!(Int32ToDecimal64Type, copy_i32_to_i64, 4, i64); +make_int_decimal_variant_impl!(Int32ToDecimal128Type, copy_i32_to_i128, 4, i128); make_int_decimal_variant_impl!(Int64ToDecimal64Type, copy_i64_to_i64, 8, i64); +make_int_decimal_variant_impl!(Int64ToDecimal128Type, copy_i64_to_i128, 8, i128); +make_int_decimal_variant_impl!(UInt64Type, copy_u64_to_u128, 8, u128); +make_int_decimal_variant_impl!(FLBADecimalType, copy_i128_to_i128, 8, i128); #[macro_export] macro_rules! write_val_or_null { @@ -612,22 +620,14 @@ macro_rules! generate_cast_to_signed { generate_cast_to_signed!(copy_i32_to_i8, i32, i8); generate_cast_to_signed!(copy_i32_to_i16, i32, i16); generate_cast_to_signed!(copy_i32_to_i64, i32, i64); +generate_cast_to_signed!(copy_i32_to_i128, i32, i128); generate_cast_to_signed!(copy_i32_to_f64, i32, f64); +generate_cast_to_signed!(copy_i64_to_i64, i64, i64); +generate_cast_to_signed!(copy_i64_to_i128, i64, i128); +generate_cast_to_signed!(copy_u64_to_u128, u64, u128); +generate_cast_to_signed!(copy_i128_to_i128, i128, i128); generate_cast_to_signed!(copy_f32_to_f64, f32, f64); -fn copy_i64_to_i64(src: &[u8], dst: &mut [u8], num: usize) { - debug_assert!( - src.len() >= num * std::mem::size_of::(), - "Source slice is too small" - ); - debug_assert!( - dst.len() >= num * std::mem::size_of::(), - "Destination slice is too small" - ); - - bit::memcpy_value(src, std::mem::size_of::() * num, dst); -} - // Shared implementation for variants of Binary type macro_rules! make_plain_binary_impl { ($($ty: ident), *) => { @@ -758,102 +758,6 @@ macro_rules! make_plain_dict_binary_impl { make_plain_dict_binary_impl! { ByteArrayType, StringType } -macro_rules! make_plain_decimal_impl { - ($is_signed: expr, $($ty: ident; $need_convert: expr), *) => { - $( - impl PlainDecoding for $ty { - fn decode(src: &mut PlainDecoderInner, dst: &mut ParquetMutableVector, num: usize) { - let byte_width = src.desc.type_length() as usize; - - let src_data = &src.data[src.offset..]; - let dst_data = &mut dst.value_buffer[dst.num_values * DECIMAL_BYTE_WIDTH..]; - - let mut src_offset = 0; - let mut dst_offset = 0; - - debug_assert!(byte_width <= DECIMAL_BYTE_WIDTH); - - let src_precision = src.desc.type_precision() as u32; - let src_scale = ::std::cmp::max(src.desc.type_scale(), 0) as u32; - let (dst_precision, dst_scale) = match dst.arrow_type { - ArrowDataType::Decimal128(p, s) if s >= 0 => (p as u32, s as u32), - _ => unreachable!(), - }; - let upper = 10_i128.pow(dst_precision); - let mul_div = 10_i128.pow(dst_scale.abs_diff(src_scale)); - - for i in 0..num { - let s = &mut dst_data[dst_offset..]; - - bit::memcpy( - &src_data[src_offset..src_offset + byte_width], - s, - ); - - // Swap the order of bytes to make it little-endian. - if $need_convert { - for i in 0..byte_width / 2 { - s.swap(i, byte_width - i - 1); - } - } - - if $is_signed { - // Check if the most significant bit is 1 (negative in 2's complement). - // If so, also fill pad the remaining bytes with 0xff. - if s[byte_width - 1] & 0x80 == 0x80 { - s[byte_width..DECIMAL_BYTE_WIDTH].fill(0xff); - } - } - - if dst_scale > src_scale { - let v = s.as_mut_ptr() as *mut i128; - unsafe { - // SAFETY the caller must ensure `i`th pointer is in bounds - write_val_or_null!(v, v.read_unaligned() * mul_div, upper, dst, i); - } - } else if dst_scale < src_scale { - let v = s.as_mut_ptr() as *mut i128; - unsafe { - // SAFETY the caller must ensure `i`th pointer is in bounds - write_val_or_null!(v, v.read_unaligned() / mul_div, upper, dst, i); - } - } else if src_precision > dst_precision { - let v = s.as_mut_ptr() as *mut i128; - write_null!(unsafe { v.read_unaligned() }, upper, dst, i); - } - - src_offset += byte_width; - dst_offset += DECIMAL_BYTE_WIDTH; - } - - src.offset += num * byte_width; - } - - #[inline] - fn skip(src: &mut PlainDecoderInner, num: usize) { - let num_bytes_to_skip = num * src.desc.type_length() as usize; - src.offset += num_bytes_to_skip; - } - } - - impl PlainDictDecoding for $ty { - fn decode_dict_one(_: usize, val_idx: usize, src: &ParquetMutableVector, dst: &mut ParquetMutableVector, _: usize) { - let src_offset = val_idx * DECIMAL_BYTE_WIDTH; - let dst_offset = dst.num_values * DECIMAL_BYTE_WIDTH; - - bit::memcpy( - &src.value_buffer[src_offset..src_offset + DECIMAL_BYTE_WIDTH], - &mut dst.value_buffer[dst_offset..dst_offset + DECIMAL_BYTE_WIDTH], - ); - } - } - )* - } -} - -make_plain_decimal_impl!(true, Int32DecimalType; false, Int64DecimalType; false, FLBADecimalType; true); -make_plain_decimal_impl!(false, UInt64Type; false); - macro_rules! make_plain_decimal_int_impl { ($($ty: ident; $num_bytes: expr), *) => { $(