Skip to content

Commit

Permalink
convert some to unsafe
Browse files (browse the repository at this point in the history)
  • Loading branch information
andygrove committed Jun 11, 2024
1 parent 79d4e8f commit d4f4625
Show file tree
Hide file tree
Showing 2 changed files with 224 additions and 89 deletions.
91 changes: 91 additions & 0 deletions core/benches/parquet_decode.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_array::{builder::StringBuilder, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use comet::execution::datafusion::expressions::cast::{Cast, EvalMode};
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion_physical_expr::{expressions::Column, PhysicalExpr};
use std::sync::Arc;

/// Benchmarks evaluating a legacy-mode cast of one shared UTF-8 batch to each of the
/// `Int8`, `Int16`, `Int32` and `Int64` data types.
fn criterion_benchmark(c: &mut Criterion) {
    let input = create_utf8_batch();
    let column = Arc::new(Column::new("a", 0));
    let tz = String::new();

    // Build every cast expression before timing starts so only `evaluate` is measured.
    let cases = [
        (
            "cast_string_to_i8",
            Cast::new(column.clone(), DataType::Int8, EvalMode::Legacy, tz.clone()),
        ),
        (
            "cast_string_to_i16",
            Cast::new(column.clone(), DataType::Int16, EvalMode::Legacy, tz.clone()),
        ),
        (
            "cast_string_to_i32",
            Cast::new(column.clone(), DataType::Int32, EvalMode::Legacy, tz.clone()),
        ),
        (
            "cast_string_to_i64",
            Cast::new(column, DataType::Int64, EvalMode::Legacy, tz),
        ),
    ];

    let mut group = c.benchmark_group("cast_string_to_int");
    for (name, cast) in &cases {
        group.bench_function(*name, |b| {
            b.iter(|| cast.evaluate(&input).unwrap());
        });
    }
}

/// Builds a single-column, nullable UTF-8 batch of 1000 rows for the benchmark.
///
/// Every tenth row is null. Of the remaining rows, even indices hold a random `f64`
/// rendered as text and odd indices hold a random `i64` rendered as text.
///
/// # Panics
///
/// Panics if the record batch cannot be assembled from the schema and the column.
fn create_utf8_batch() -> RecordBatch {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
    let mut builder = StringBuilder::new();
    for i in 0..1000 {
        if i % 10 == 0 {
            builder.append_null();
        } else if i % 2 == 0 {
            builder.append_value(rand::random::<f64>().to_string());
        } else {
            builder.append_value(rand::random::<i64>().to_string());
        }
    }
    let array = builder.finish();
    // The schema is not used again, so it is moved into the batch instead of cloned.
    RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap()
}

fn config() -> Criterion {
Criterion::default()
}

// Declare a benchmark group named `benches` that runs `criterion_benchmark` with the
// configuration returned by `config()`.
criterion_group! {
name = benches;
config = config();
targets = criterion_benchmark
}
// Hand the `benches` group to `criterion_main!`.
criterion_main!(benches);
222 changes: 133 additions & 89 deletions core/src/parquet/read/values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ impl PlainDecoding for UInt8Type {
/// Decodes `num` plain-encoded values from `src` into `dst`.
///
/// Each source value occupies 4 bytes. Values are written starting at byte offset
/// `dst.num_values * 2`, i.e. two destination bytes per already-stored value, and
/// `src.offset` is advanced past the consumed input.
fn decode(src: &mut PlainDecoderInner, dst: &mut ParquetMutableVector, num: usize) {
    let src_data = &src.data;
    let dst_slice = dst.value_buffer.as_slice_mut();
    // Destination slots are 2 bytes wide; a single offset binding replaces the previous
    // pair, whose first (1-byte stride) value was shadowed and never used.
    let dst_offset = dst.num_values * 2;
    copy_i32_to_u8(&src_data[src.offset..], &mut dst_slice[dst_offset..], num);
    src.offset += 4 * num; // Parquet stores Int8/Int16 using 4 bytes
}
Expand Down Expand Up @@ -484,7 +484,7 @@ impl PlainDecoding for UInt16Type {
/// Decodes `num` plain-encoded values from `src` into `dst`.
///
/// Each source value occupies 4 bytes. Values are written starting at byte offset
/// `dst.num_values * 4`, i.e. four destination bytes per already-stored value, and
/// `src.offset` is advanced past the consumed input.
fn decode(src: &mut PlainDecoderInner, dst: &mut ParquetMutableVector, num: usize) {
    let src_data = &src.data;
    let dst_slice = dst.value_buffer.as_slice_mut();
    // Destination slots are 4 bytes wide; a single offset binding replaces the previous
    // pair, whose first (2-byte stride) value was shadowed and never used.
    let dst_offset = dst.num_values * 4;
    copy_i32_to_u16(&src_data[src.offset..], &mut dst_slice[dst_offset..], num);
    src.offset += 4 * num; // Parquet stores Int8/Int16 using 4 bytes
}
Expand All @@ -498,16 +498,104 @@ impl PlainDecoding for UInt32Type {
/// Decodes `num` plain-encoded values from `src` into `dst`.
///
/// Each source value occupies 4 bytes. Values are written starting at byte offset
/// `dst.num_values * 8`, i.e. eight destination bytes per already-stored value, and
/// `src.offset` is advanced past the consumed input.
fn decode(src: &mut PlainDecoderInner, dst: &mut ParquetMutableVector, num: usize) {
    let src_data = &src.data;
    let dst_slice = dst.value_buffer.as_slice_mut();
    // Destination slots are 8 bytes wide; a single offset binding replaces the previous
    // pair, whose first (4-byte stride) value was shadowed and never used.
    let dst_offset = dst.num_values * 8;
    copy_i32_to_u32(&src_data[src.offset..], &mut dst_slice[dst_offset..], num);
    // Advance exactly once, by 4 bytes per value (the same stride `skip` uses). Advancing
    // twice would skip over the next `num` values in the source.
    src.offset += 4 * num;
}

/// Skips `num` values by moving `src.offset` forward 4 bytes per value, without decoding.
fn skip(src: &mut PlainDecoderInner, num: usize) {
    let skipped_bytes = 4 * num;
    src.offset += skipped_bytes;
}
}

/// Narrows `num` 4-byte little-endian integers from `src` into `num` single bytes in `dst`.
///
/// Each value is truncated with an `as i8` cast, so only its low byte is kept.
///
/// # Panics
///
/// Panics if `src` holds fewer than `num * 4` bytes or `dst` holds fewer than `num` bytes.
fn copy_i32_to_i8(src: &[u8], dst: &mut [u8], num: usize) {
    // These are hard assertions on purpose: the raw-pointer accesses below rely on them,
    // and a `debug_assert!` would disappear in release builds, letting a safe function read
    // and write out of bounds. Dividing instead of multiplying avoids overflow in the check.
    assert!(num <= src.len() / 4, "Source slice is too small");
    assert!(num <= dst.len(), "Destination slice is too small");

    let src_ptr = src.as_ptr() as *const i32;
    let dst_ptr = dst.as_mut_ptr() as *mut i8;
    for i in 0..num {
        // SAFETY: `i < num`, and the assertions above guarantee `src` contains at least
        // `num` 4-byte words and `dst` at least `num` bytes, so both accesses stay inside
        // their slices. The unaligned accessors impose no alignment requirement.
        unsafe {
            // `from_le` keeps the result identical to the byte-wise `from_le_bytes` siblings.
            let value = i32::from_le(src_ptr.add(i).read_unaligned());
            dst_ptr.add(i).write_unaligned(value as i8);
        }
    }
}

/// Converts `num` little-endian 4-byte integers from `src` to `u8` and stores each one in
/// a 2-byte slot of `dst`: the truncated value followed by a zero byte.
fn copy_i32_to_u8(src: &[u8], dst: &mut [u8], num: usize) {
    debug_assert!(src.len() >= num * 4, "Source slice is too small");
    debug_assert!(dst.len() >= num * 2, "Destination slice is too small");

    for idx in 0..num {
        let read_at = idx * 4;
        let mut word = [0u8; 4];
        word.copy_from_slice(&src[read_at..read_at + 4]);

        // Truncating cast: only the low byte of the 32-bit value is kept.
        let narrowed = i32::from_le_bytes(word) as u8;

        let write_at = idx * 2;
        dst[write_at] = narrowed;
        dst[write_at + 1] = 0;
    }
}

/// Narrows `num` 4-byte little-endian integers from `src` into `num` 2-byte little-endian
/// integers in `dst`.
///
/// Each value is truncated with an `as i16` cast, so only its low 16 bits are kept.
///
/// # Panics
///
/// Panics if `src` holds fewer than `num * 4` bytes or `dst` holds fewer than `num * 2`
/// bytes.
pub fn copy_i32_to_i16(src: &[u8], dst: &mut [u8], num: usize) {
    // These are hard assertions on purpose: the raw-pointer accesses below rely on them,
    // and a `debug_assert!` would disappear in release builds, letting a safe function read
    // and write out of bounds. Dividing instead of multiplying avoids overflow in the check.
    assert!(num <= src.len() / 4, "Source slice is too small");
    assert!(num <= dst.len() / 2, "Destination slice is too small");

    let src_ptr = src.as_ptr() as *const i32;
    let dst_ptr = dst.as_mut_ptr() as *mut i16;
    for i in 0..num {
        // SAFETY: `i < num`, and the assertions above guarantee `src` contains at least
        // `num` 4-byte words and `dst` at least `num` 2-byte slots, so both accesses stay
        // inside their slices. The unaligned accessors impose no alignment requirement.
        unsafe {
            // `from_le` / `to_le` keep the byte layout identical to the byte-wise
            // `from_le_bytes` / `to_le_bytes` siblings.
            let value = i32::from_le(src_ptr.add(i).read_unaligned());
            dst_ptr.add(i).write_unaligned((value as i16).to_le());
        }
    }
}

/// Converts `num` little-endian 4-byte integers from `src` to `u16` and stores each one in
/// a 4-byte slot of `dst`: the two little-endian value bytes followed by two zero bytes.
///
/// # Panics
///
/// Panics if `src` or `dst` holds fewer than `num * 4` bytes. Nothing is written to `dst`
/// in that case.
fn copy_i32_to_u16(src: &[u8], dst: &mut [u8], num: usize) {
    debug_assert!(src.len() >= num * 4, "Source slice is too small");
    // Every value occupies 4 destination bytes, so the requirement is `num * 4`.
    debug_assert!(dst.len() >= num * 4, "Destination slice is too small");

    // Slicing up front performs the bounds checks once, before any byte is written, so an
    // undersized buffer can never be left half-filled.
    let words = src[..num * 4].chunks_exact(4);
    let slots = dst[..num * 4].chunks_exact_mut(4);
    for (word, slot) in words.zip(slots) {
        // Truncating cast: only the low 16 bits of the 32-bit value are kept.
        let narrowed = i32::from_le_bytes([word[0], word[1], word[2], word[3]]) as u16;
        // Widening to u32 yields the two value bytes followed by two zero bytes.
        slot.copy_from_slice(&u32::from(narrowed).to_le_bytes());
    }
}

/// Converts `num` little-endian 4-byte integers from `src` to `u32` and stores each one in
/// an 8-byte slot of `dst`: the four little-endian value bytes followed by four zero bytes.
fn copy_i32_to_u32(src: &[u8], dst: &mut [u8], num: usize) {
    debug_assert!(src.len() >= num * 4, "Source slice is too small");
    debug_assert!(dst.len() >= num * 8, "Destination slice is too small");

    for idx in 0..num {
        let read_at = idx * 4;
        let mut word = [0u8; 4];
        word.copy_from_slice(&src[read_at..read_at + 4]);

        // Reinterpret the bits as unsigned, then widen so the upper four bytes are zero.
        let widened = u64::from(i32::from_le_bytes(word) as u32);

        let write_at = idx * 8;
        for (k, byte) in widened.to_le_bytes().iter().enumerate() {
            dst[write_at + k] = *byte;
        }
    }
}

// Shared implementation for variants of Binary type
macro_rules! make_plain_binary_impl {
Expand Down Expand Up @@ -1014,91 +1102,6 @@ impl Decoder for DictDecoder {
}
}

/// Narrows `num` little-endian 4-byte integers from `src` into `num` single bytes in `dst`,
/// keeping only the low byte of each value.
fn copy_i32_to_i8(src: &[u8], dst: &mut [u8], num: usize) {
    debug_assert!(src.len() >= num * 4, "Source slice is too small");
    debug_assert!(dst.len() >= num, "Destination slice is too small");

    for idx in 0..num {
        let read_at = idx * 4;
        let mut word = [0u8; 4];
        word.copy_from_slice(&src[read_at..read_at + 4]);

        // Truncate to i8, then store that byte's bit pattern unchanged.
        let narrowed = i32::from_le_bytes(word) as i8;
        dst[idx] = narrowed as u8;
    }
}

/// Converts `num` little-endian 4-byte integers from `src` to `u8` and stores each one in
/// a 2-byte slot of `dst`: the truncated value followed by a zero byte.
///
/// The 2-byte slot width matches the `num_values * 2` destination offset used by
/// `UInt8Type::decode`; packing one byte per value would misplace every value after the
/// first.
///
/// # Panics
///
/// Panics if `src` holds fewer than `num * 4` bytes or `dst` holds fewer than `num * 2`
/// bytes.
fn copy_i32_to_u8(src: &[u8], dst: &mut [u8], num: usize) {
    debug_assert!(src.len() >= num * 4, "Source slice is too small");
    debug_assert!(dst.len() >= num * 2, "Destination slice is too small");

    // Slicing up front performs the bounds checks once, before any byte is written.
    let words = src[..num * 4].chunks_exact(4);
    let slots = dst[..num * 2].chunks_exact_mut(2);
    for (word, slot) in words.zip(slots) {
        // Truncating cast: only the low byte of the 32-bit value is kept.
        let narrowed = i32::from_le_bytes([word[0], word[1], word[2], word[3]]) as u8;
        slot[0] = narrowed;
        slot[1] = 0;
    }
}

/// Narrows `num` little-endian 4-byte integers from `src` into `num` little-endian 2-byte
/// integers in `dst`, keeping only the low 16 bits of each value.
fn copy_i32_to_i16(src: &[u8], dst: &mut [u8], num: usize) {
    debug_assert!(src.len() >= num * 4, "Source slice is too small");
    debug_assert!(dst.len() >= num * 2, "Destination slice is too small");

    for idx in 0..num {
        let read_at = idx * 4;
        let mut word = [0u8; 4];
        word.copy_from_slice(&src[read_at..read_at + 4]);

        // Truncating cast to i16, re-encoded as little-endian bytes.
        let narrowed = (i32::from_le_bytes(word) as i16).to_le_bytes();

        let write_at = idx * 2;
        for (k, byte) in narrowed.iter().enumerate() {
            dst[write_at + k] = *byte;
        }
    }
}

/// Converts `num` little-endian 4-byte integers from `src` to `u16` and stores each one in
/// a 4-byte slot of `dst`: the two little-endian value bytes followed by two zero bytes.
///
/// The 4-byte slot width matches the `num_values * 4` destination offset used by
/// `UInt16Type::decode`; packing two bytes per value would misplace every value after the
/// first.
///
/// # Panics
///
/// Panics if `src` or `dst` holds fewer than `num * 4` bytes.
fn copy_i32_to_u16(src: &[u8], dst: &mut [u8], num: usize) {
    debug_assert!(src.len() >= num * 4, "Source slice is too small");
    debug_assert!(dst.len() >= num * 4, "Destination slice is too small");

    // Slicing up front performs the bounds checks once, before any byte is written.
    let words = src[..num * 4].chunks_exact(4);
    let slots = dst[..num * 4].chunks_exact_mut(4);
    for (word, slot) in words.zip(slots) {
        // Truncating cast: only the low 16 bits of the 32-bit value are kept.
        let narrowed = i32::from_le_bytes([word[0], word[1], word[2], word[3]]) as u16;
        let value_bytes = narrowed.to_le_bytes();
        slot[0] = value_bytes[0];
        slot[1] = value_bytes[1];
        slot[2] = 0;
        slot[3] = 0;
    }
}

/// Converts `num` little-endian 4-byte integers from `src` to `u32` and stores each one in
/// an 8-byte slot of `dst`: the four little-endian value bytes followed by four zero bytes.
///
/// Each value gets its own non-overlapping slot. Writing four bytes at a two-byte stride
/// would let every value overwrite half of the previous one. The 8-byte slot width matches
/// the `num_values * 8` destination offset used by `UInt32Type::decode`.
///
/// # Panics
///
/// Panics if `src` holds fewer than `num * 4` bytes or `dst` holds fewer than `num * 8`
/// bytes.
fn copy_i32_to_u32(src: &[u8], dst: &mut [u8], num: usize) {
    debug_assert!(src.len() >= num * 4, "Source slice is too small");
    debug_assert!(dst.len() >= num * 8, "Destination slice is too small");

    // Slicing up front performs the bounds checks once, before any byte is written.
    let words = src[..num * 4].chunks_exact(4);
    let slots = dst[..num * 8].chunks_exact_mut(8);
    for (word, slot) in words.zip(slots) {
        // Reinterpret the 32 bits as unsigned; no bits are lost in this cast.
        let value = i32::from_le_bytes([word[0], word[1], word[2], word[3]]) as u32;
        // Widening to u64 yields the four value bytes followed by four zero bytes.
        slot.copy_from_slice(&u64::from(value).to_le_bytes());
    }
}

#[cfg(test)]
mod test {
use super::*;
Expand All @@ -1116,6 +1119,18 @@ mod test {
assert_eq!(expected.as_bytes(), dest.as_bytes());
}

#[test]
fn test_i32_to_u8() {
    // Nine 4-byte little-endian inputs; each expected output is a value byte plus a zero byte.
    let input =
        hex::decode("8a000000dbffffff1800000034ffffff300000001d000000abffffff37fffffff1000000")
            .unwrap();
    let want = hex::decode("8a00db001800340030001d00ab003700f100").unwrap();
    let count = input.len() / 4;
    // Pre-fill with spaces so bytes the copy fails to write show up in the comparison.
    let mut output: Vec<u8> = vec![b' '; count * 2];
    copy_i32_to_u8(&input.as_bytes(), output.as_mut_slice(), count);
    assert_eq!(want.as_bytes(), output.as_bytes());
}

#[test]
fn test_i32_to_i16() {
let source =
Expand All @@ -1127,4 +1142,33 @@ mod test {
copy_i32_to_i16(&source.as_bytes(), dest.as_mut_slice(), num);
assert_eq!(expected.as_bytes(), dest.as_bytes());
}

#[test]
fn test_i32_to_u16() {
    // Every input value fits in 16 bits, so each 4-byte output slot equals its input word.
    let input = hex::decode(
        "ff7f0000008000000180000002800000038000000480000005800000068000000780000008800000",
    )
    .unwrap();
    let want = hex::decode(
        "ff7f0000008000000180000002800000038000000480000005800000068000000780000008800000",
    )
    .unwrap();
    let count = input.len() / 4;
    // Pre-fill with spaces so bytes the copy fails to write show up in the comparison.
    let mut output: Vec<u8> = vec![b' '; count * 4];
    copy_i32_to_u16(&input.as_bytes(), output.as_mut_slice(), count);
    assert_eq!(want.as_bytes(), output.as_bytes());
}

#[test]
fn test_i32_to_u32() {
    // Each 4-byte input word is expected back followed by four zero bytes.
    let input = hex::decode(
        "ffffff7f000000800100008002000080030000800400008005000080060000800700008008000080",
    )
    .unwrap();
    let want = hex::decode("ffffff7f00000000000000800000000001000080000000000200008000000000030000800000000004000080000000000500008000000000060000800000000007000080000000000800008000000000").unwrap();
    let count = input.len() / 4;
    // Pre-fill with spaces so bytes the copy fails to write show up in the comparison.
    let mut output: Vec<u8> = vec![b' '; count * 8];
    copy_i32_to_u32(&input.as_bytes(), output.as_mut_slice(), count);
    assert_eq!(want.as_bytes(), output.as_bytes());
}
}

0 comments on commit d4f4625

Please sign in to comment.