Skip to content

Commit

Permalink
chore(query): improve build keys state (#13004)
Browse files Browse the repository at this point in the history
* improve serialize_column_binary

* improve serialize_column_binary

* improve serialize string

* fix data_size

* add #Safety comments

* unsafe serialize_column_binary

* add some comments

* group by other_columns

* refine other_columns

* add Safety

* make lint

* remove unsafe

* add comments

* refine unsafe
  • Loading branch information
Dousir9 authored Sep 27, 2023
1 parent 446c6c5 commit 84b0451
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 68 deletions.
157 changes: 94 additions & 63 deletions src/query/expression/src/kernels/group_by_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,22 @@ use common_exception::ErrorCode;
use common_exception::Result;
use common_hashtable::DictionaryKeys;
use common_hashtable::FastHash;
use common_io::prelude::BinaryWrite;
use ethnum::i256;
use ethnum::u256;
use ethnum::U256;
use micromarshal::Marshal;

use crate::kernels::utils::copy_advance_aligned;
use crate::kernels::utils::set_vec_len_by_ptr;
use crate::kernels::utils::store_advance;
use crate::kernels::utils::store_advance_aligned;
use crate::types::boolean::BooleanType;
use crate::types::decimal::Decimal;
use crate::types::decimal::DecimalColumn;
use crate::types::nullable::NullableColumn;
use crate::types::number::Number;
use crate::types::number::NumberColumn;
use crate::types::string::StringColumn;
use crate::types::string::StringColumnBuilder;
use crate::types::string::StringIterator;
use crate::types::DataType;
use crate::types::DecimalDataType;
Expand Down Expand Up @@ -217,20 +219,20 @@ impl HashMethod for HashMethodSerializer {
fn build_keys_state(
&self,
group_columns: &[(Column, DataType)],
rows: usize,
num_rows: usize,
) -> Result<KeysState> {
let approx_size = group_columns.len() * rows * 8;
let mut builder = StringColumnBuilder::with_capacity(rows, approx_size);

for row in 0..rows {
for (col, _) in group_columns {
serialize_column_binary(col, row, &mut builder.data);
}
builder.commit_row();
// The serialize_size is equal to the number of bytes required by serialization.
let mut serialize_size = 0;
let mut serialize_columns = Vec::with_capacity(group_columns.len());
for (column, _) in group_columns {
serialize_size += column.serialize_size();
serialize_columns.push(column.clone());
}

let col = builder.build();
Ok(KeysState::Column(Column::String(col)))
Ok(KeysState::Column(Column::String(serialize_column(
&serialize_columns,
num_rows,
serialize_size,
))))
}

fn build_keys_iter<'a>(&self, key_state: &'a KeysState) -> Result<Self::HashKeyIter<'a>> {
Expand All @@ -257,42 +259,38 @@ impl HashMethod for HashMethodDictionarySerializer {
fn build_keys_state(
&self,
group_columns: &[(Column, DataType)],
rows: usize,
num_rows: usize,
) -> Result<KeysState> {
// fixed type serialize one column to dictionary
let mut dictionary_columns = Vec::with_capacity(group_columns.len());

let mut serialize_columns = Vec::new();
for (group_column, _) in group_columns {
if let Column::String(v) = group_column {
debug_assert_eq!(v.len(), rows);
dictionary_columns.push(v.clone());
} else if let Column::Variant(v) = group_column {
debug_assert_eq!(v.len(), rows);
dictionary_columns.push(v.clone());
match group_column {
Column::String(v) | Column::Variant(v) | Column::Bitmap(v) => {
debug_assert_eq!(v.len(), num_rows);
dictionary_columns.push(v.clone());
}
_ => serialize_columns.push(group_column.clone()),
}
}

if dictionary_columns.len() != group_columns.len() {
let approx_size = group_columns.len() * rows * 8;
let mut builder = StringColumnBuilder::with_capacity(rows, approx_size);

for row in 0..rows {
for (group_column, _) in group_columns {
if !matches!(group_column, Column::String(_) | Column::Variant(_)) {
serialize_column_binary(group_column, row, &mut builder.data);
}
}

builder.commit_row();
if !serialize_columns.is_empty() {
// The serialize_size is equal to the number of bytes required by serialization.
let mut serialize_size = 0;
for column in serialize_columns.iter() {
serialize_size += column.serialize_size();
}

dictionary_columns.push(builder.build());
dictionary_columns.push(serialize_column(
&serialize_columns,
num_rows,
serialize_size,
));
}

let mut keys = Vec::with_capacity(rows * dictionary_columns.len());
let mut points = Vec::with_capacity(rows * dictionary_columns.len());
let mut keys = Vec::with_capacity(num_rows * dictionary_columns.len());
let mut points = Vec::with_capacity(num_rows * dictionary_columns.len());

for row in 0..rows {
for row in 0..num_rows {
let start = points.len();

for dictionary_column in &dictionary_columns {
Expand Down Expand Up @@ -623,51 +621,84 @@ fn build(
Ok(())
}

/// Serializes every row of `columns` into one contiguous binary key per row,
/// returning the keys as a `StringColumn`.
///
/// `serialize_size` is the number of bytes required by serialization (the sum
/// of `Column::serialize_size` over `columns`); it preallocates the data
/// buffer that `serialize_column_binary` then fills through raw pointer
/// writes, so it must be at least the total number of bytes written.
pub fn serialize_column(
    columns: &[Column],
    num_rows: usize,
    serialize_size: usize,
) -> StringColumn {
    // [`StringColumn`] consists of [`data`] and [`offset`], we build [`data`] and [`offset`] respectively,
    // and then call `StringColumn::new(data.into(), offsets.into())` to create [`StringColumn`].
    let mut data: Vec<u8> = Vec::with_capacity(serialize_size);
    let mut offsets: Vec<u64> = Vec::with_capacity(num_rows + 1);
    let mut data_ptr = data.as_mut_ptr();
    let mut offsets_ptr = offsets.as_mut_ptr();
    let mut offset = 0;

    // SAFETY: `data` has capacity `serialize_size` (covering every byte written
    // below) and `offsets` has capacity `num_rows + 1` (exactly one entry per
    // loop iteration plus the leading 0), so all pointer writes stay inside the
    // allocations, and `set_vec_len_by_ptr` sets lengths <= capacity over
    // fully initialized elements.
    unsafe {
        // A string column's offsets always begin with 0.
        store_advance_aligned::<u64>(0, &mut offsets_ptr);
        for i in 0..num_rows {
            let old_ptr = data_ptr;
            // Append row `i` of every column back-to-back into `data`.
            for col in columns.iter() {
                serialize_column_binary(col, i, &mut data_ptr);
            }
            // Accumulate the end offset of the row just written.
            offset += data_ptr as u64 - old_ptr as u64;
            store_advance_aligned::<u64>(offset, &mut offsets_ptr);
        }
        set_vec_len_by_ptr(&mut data, data_ptr);
        set_vec_len_by_ptr(&mut offsets, offsets_ptr);
    }

    StringColumn::new(data.into(), offsets.into())
}

/// This function must be consistent with the `push_binary` function of `src/query/expression/src/values.rs`.
pub fn serialize_column_binary(column: &Column, row: usize, vec: &mut Vec<u8>) {
/// # Safety
///
/// * The size of the memory pointed by `row_space` is equal to the number of bytes required by serialization.
pub unsafe fn serialize_column_binary(column: &Column, row: usize, row_space: &mut *mut u8) {
match column {
Column::Null { .. } | Column::EmptyArray { .. } | Column::EmptyMap { .. } => {}
Column::Number(v) => with_number_mapped_type!(|NUM_TYPE| match v {
NumberColumn::NUM_TYPE(v) => vec.extend_from_slice(v[row].to_le_bytes().as_ref()),
NumberColumn::NUM_TYPE(v) => {
store_advance::<NUM_TYPE>(&v[row], row_space);
}
}),
Column::Boolean(v) => vec.push(v.get_bit(row) as u8),
Column::String(v) => {
BinaryWrite::write_binary(vec, unsafe { v.index_unchecked(row) }).unwrap()
}
Column::Decimal(_) => {
with_decimal_mapped_type!(|DECIMAL_TYPE| match column {
Column::Decimal(DecimalColumn::DECIMAL_TYPE(v, _)) =>
vec.extend_from_slice(v[row].to_le_bytes().as_ref()),
_ => unreachable!(),
Column::Decimal(v) => {
with_decimal_mapped_type!(|DECIMAL_TYPE| match v {
DecimalColumn::DECIMAL_TYPE(v, _) => {
store_advance::<DECIMAL_TYPE>(&v[row], row_space);
}
})
}
Column::Timestamp(v) => vec.extend_from_slice(v[row].to_le_bytes().as_ref()),
Column::Date(v) => vec.extend_from_slice(v[row].to_le_bytes().as_ref()),
Column::Boolean(v) => store_advance::<bool>(&v.get_bit(row), row_space),
Column::String(v) | Column::Bitmap(v) | Column::Variant(v) => {
let value = unsafe { v.index_unchecked(row) };
let len = value.len();
store_advance::<u64>(&(len as u64), row_space);
copy_advance_aligned::<u8>(value.as_ptr(), row_space, len);
}
Column::Timestamp(v) => store_advance::<i64>(&v[row], row_space),
Column::Date(v) => store_advance::<i32>(&v[row], row_space),
Column::Array(array) | Column::Map(array) => {
let data = array.index(row).unwrap();
BinaryWrite::write_uvarint(vec, data.len() as u64).unwrap();
store_advance::<u64>(&(data.len() as u64), row_space);
for i in 0..data.len() {
serialize_column_binary(&data, i, vec);
serialize_column_binary(&data, i, row_space);
}
}
Column::Bitmap(v) => {
BinaryWrite::write_binary(vec, unsafe { v.index_unchecked(row) }).unwrap()
}
Column::Nullable(c) => {
let valid = c.validity.get_bit(row);
vec.push(valid as u8);
store_advance::<bool>(&valid, row_space);
if valid {
serialize_column_binary(&c.column, row, vec);
serialize_column_binary(&c.column, row, row_space);
}
}
Column::Tuple(fields) => {
for inner_col in fields.iter() {
serialize_column_binary(inner_col, row, vec);
serialize_column_binary(inner_col, row, row_space);
}
}
Column::Variant(v) => {
BinaryWrite::write_binary(vec, unsafe { v.index_unchecked(row) }).unwrap()
}
}
}

Expand Down
1 change: 1 addition & 0 deletions src/query/expression/src/kernels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod take;
mod take_chunks;
mod take_compact;
mod topk;
mod utils;

pub use group_by::*;
pub use group_by_hash::*;
Expand Down
66 changes: 66 additions & 0 deletions src/query/expression/src/kernels/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/// Writes the raw bytes of `val` to `*ptr` (no alignment required) and bumps
/// `*ptr` past the bytes just written.
///
/// # Safety
///
/// * `*ptr` must be valid for writes of `size_of::<T>()` bytes.
/// * The `size_of::<T>()` bytes starting at `val` must *not* overlap the
///   `size_of::<T>()` bytes starting at `*ptr`.
#[inline(always)]
pub unsafe fn store_advance<T>(val: &T, ptr: &mut *mut u8) {
    let size = std::mem::size_of::<T>();
    unsafe {
        std::ptr::copy_nonoverlapping((val as *const T).cast::<u8>(), *ptr, size);
        *ptr = ptr.add(size);
    }
}

/// Moves `val` into the slot at `*ptr` and advances `*ptr` by one element.
///
/// # Safety
///
/// * `*ptr` must be valid for a write of `T`.
/// * `*ptr` must be properly aligned for `T`.
#[inline(always)]
pub unsafe fn store_advance_aligned<T>(val: T, ptr: &mut *mut T) {
    unsafe {
        (*ptr).write(val);
        *ptr = ptr.add(1);
    }
}

/// Copies `count` elements of `T` from `src` to `*ptr` and advances `*ptr`
/// by `count` elements.
///
/// # Safety
///
// NOTE: the original comment said `src` must be valid for *writes* and
// referred to nonexistent `dst`/`val` parameters; corrected below.
/// * `src` must be valid for reads of `count * size_of::<T>()` bytes.
/// * `*ptr` must be valid for writes of `count * size_of::<T>()` bytes.
/// * Both `src` and `*ptr` must be properly aligned.
/// * The region of memory beginning at `src` with a size of `count * size_of::<T>()`
///   bytes must *not* overlap with the region of memory beginning at `*ptr` with the
///   same size.
#[inline(always)]
pub unsafe fn copy_advance_aligned<T>(src: *const T, ptr: &mut *mut T, count: usize) {
    unsafe {
        std::ptr::copy_nonoverlapping(src, *ptr, count);
        *ptr = ptr.add(count);
    }
}

/// Sets `vec`'s length to the number of elements between the start of its
/// buffer and `ptr` (an end cursor previously advanced through the buffer).
///
/// # Safety
///
/// * `(ptr as usize - vec.as_ptr() as usize) / std::mem::size_of::<T>()` must
///   not exceed the capacity of `vec`, and all elements below that length
///   must be initialized.
#[inline(always)]
pub unsafe fn set_vec_len_by_ptr<T>(vec: &mut Vec<T>, ptr: *const T) {
    let byte_span = ptr as usize - vec.as_ptr() as usize;
    let new_len = byte_span / std::mem::size_of::<T>();
    unsafe {
        vec.set_len(new_len);
    }
}
1 change: 1 addition & 0 deletions src/query/expression/src/types/string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ impl StringColumn {
/// Returns the byte slice for the string at row `index`, without validating
/// `index` against the column length.
///
/// # Safety
///
/// Calling this method with an out-of-bounds index is *[undefined behavior]*
// NOTE(review): the slicing itself looks like checked indexing on `offsets`
// and `data`; presumably the contract relies on `offsets` holding `len + 1`
// monotonically non-decreasing entries — confirm against the builder.
#[inline]
pub unsafe fn index_unchecked(&self, index: usize) -> &[u8] {
    &self.data[(self.offsets[index] as usize)..(self.offsets[index + 1] as usize)]
}
Expand Down
33 changes: 28 additions & 5 deletions src/query/expression/src/values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1819,6 +1819,29 @@ impl Column {
}
}

/// Returns the number of bytes needed to binary-serialize every row of this
/// column (the format written by `serialize_column_binary` in
/// `kernels/group_by_hash.rs`).
///
/// NOTE(review): the result sizes a buffer that is later filled through
/// unchecked pointer writes, so every arm must count at least as many bytes
/// as the serializer emits. For variable-length and nullable columns this
/// appears to be a slight overcount (e.g. `memory_size` presumably includes
/// the extra trailing offset) — confirm when changing either side.
pub fn serialize_size(&self) -> usize {
    match self {
        // Unit-like columns serialize to nothing.
        Column::Null { .. } | Column::EmptyArray { .. } | Column::EmptyMap { .. } => 0,
        // Fixed-width values: element count times byte width.
        Column::Number(NumberColumn::UInt8(col)) => col.len(),
        Column::Number(NumberColumn::UInt16(col)) => col.len() * 2,
        Column::Number(NumberColumn::UInt32(col)) => col.len() * 4,
        Column::Number(NumberColumn::UInt64(col)) => col.len() * 8,
        Column::Number(NumberColumn::Float32(col)) => col.len() * 4,
        Column::Number(NumberColumn::Float64(col)) => col.len() * 8,
        Column::Number(NumberColumn::Int8(col)) => col.len(),
        Column::Number(NumberColumn::Int16(col)) => col.len() * 2,
        Column::Number(NumberColumn::Int32(col)) | Column::Date(col) => col.len() * 4,
        Column::Number(NumberColumn::Int64(col)) | Column::Timestamp(col) => col.len() * 8,
        Column::Decimal(DecimalColumn::Decimal128(col, _)) => col.len() * 16,
        Column::Decimal(DecimalColumn::Decimal256(col, _)) => col.len() * 32,
        // One byte per boolean.
        Column::Boolean(c) => c.len(),
        // Variable-length values are written as an 8-byte length prefix plus
        // the value bytes; `memory_size` covers both data and offsets.
        Column::String(col) | Column::Bitmap(col) | Column::Variant(col) => col.memory_size(),
        // An 8-byte element count per row plus all nested values.
        Column::Array(col) | Column::Map(col) => col.values.serialize_size() + col.len() * 8,
        // One validity byte per row plus the inner column's payload.
        Column::Nullable(c) => c.column.serialize_size() + c.len(),
        // Tuples concatenate their field serializations.
        Column::Tuple(fields) => fields.iter().map(|f| f.serialize_size()).sum(),
    }
}

/// Returns (is_all_null, Option bitmap)
pub fn validity(&self) -> (bool, Option<&Bitmap>) {
match self {
Expand Down Expand Up @@ -2230,10 +2253,10 @@ impl ColumnBuilder {
ColumnBuilder::String(builder)
| ColumnBuilder::Variant(builder)
| ColumnBuilder::Bitmap(builder) => {
let offset: u64 = reader.read_uvarint()?;
builder.data.resize(offset as usize + builder.data.len(), 0);
let offset = reader.read_scalar::<u64>()? as usize;
builder.data.resize(offset + builder.data.len(), 0);
let last = *builder.offsets.last().unwrap() as usize;
reader.read_exact(&mut builder.data[last..last + offset as usize])?;
reader.read_exact(&mut builder.data[last..last + offset])?;
builder.commit_row();
}
ColumnBuilder::Timestamp(builder) => {
Expand All @@ -2246,7 +2269,7 @@ impl ColumnBuilder {
builder.push(value);
}
ColumnBuilder::Array(builder) => {
let len = reader.read_uvarint()?;
let len = reader.read_scalar::<u64>()?;
for _ in 0..len {
builder.builder.push_binary(reader)?;
}
Expand All @@ -2255,7 +2278,7 @@ impl ColumnBuilder {
ColumnBuilder::Map(builder) => {
const KEY: usize = 0;
const VALUE: usize = 1;
let len = reader.read_uvarint()?;
let len = reader.read_scalar::<u64>()?;
let map_builder = builder.builder.as_tuple_mut().unwrap();
for _ in 0..len {
map_builder[KEY].push_binary(reader)?;
Expand Down

1 comment on commit 84b0451

@vercel
Copy link

@vercel vercel bot commented on 84b0451 Sep 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.