diff --git a/src/query/expression/src/kernels/group_by_hash.rs b/src/query/expression/src/kernels/group_by_hash.rs index ee8e15bdee911..344e9366f378c 100644 --- a/src/query/expression/src/kernels/group_by_hash.rs +++ b/src/query/expression/src/kernels/group_by_hash.rs @@ -24,12 +24,15 @@ use common_exception::ErrorCode; use common_exception::Result; use common_hashtable::DictionaryKeys; use common_hashtable::FastHash; -use common_io::prelude::BinaryWrite; use ethnum::i256; use ethnum::u256; use ethnum::U256; use micromarshal::Marshal; +use crate::kernels::utils::copy_advance_aligned; +use crate::kernels::utils::set_vec_len_by_ptr; +use crate::kernels::utils::store_advance; +use crate::kernels::utils::store_advance_aligned; use crate::types::boolean::BooleanType; use crate::types::decimal::Decimal; use crate::types::decimal::DecimalColumn; @@ -37,7 +40,6 @@ use crate::types::nullable::NullableColumn; use crate::types::number::Number; use crate::types::number::NumberColumn; use crate::types::string::StringColumn; -use crate::types::string::StringColumnBuilder; use crate::types::string::StringIterator; use crate::types::DataType; use crate::types::DecimalDataType; @@ -217,20 +219,20 @@ impl HashMethod for HashMethodSerializer { fn build_keys_state( &self, group_columns: &[(Column, DataType)], - rows: usize, + num_rows: usize, ) -> Result { - let approx_size = group_columns.len() * rows * 8; - let mut builder = StringColumnBuilder::with_capacity(rows, approx_size); - - for row in 0..rows { - for (col, _) in group_columns { - serialize_column_binary(col, row, &mut builder.data); - } - builder.commit_row(); + // The serialize_size is equal to the number of bytes required by serialization. + let mut serialize_size = 0; + let mut serialize_columns = Vec::with_capacity(group_columns.len()); + for (column, _) in group_columns { + serialize_size += column.serialize_size(); + serialize_columns.push(column.clone()); } - - let col = builder.build(); - Ok(KeysState::Column(Column::String(col))) + Ok(KeysState::Column(Column::String(serialize_column( + &serialize_columns, + num_rows, + serialize_size, + )))) } fn build_keys_iter<'a>(&self, key_state: &'a KeysState) -> Result> { @@ -257,42 +259,38 @@ impl HashMethod for HashMethodDictionarySerializer { fn build_keys_state( &self, group_columns: &[(Column, DataType)], - rows: usize, + num_rows: usize, ) -> Result { // fixed type serialize one column to dictionary let mut dictionary_columns = Vec::with_capacity(group_columns.len()); - + let mut serialize_columns = Vec::new(); for (group_column, _) in group_columns { - if let Column::String(v) = group_column { - debug_assert_eq!(v.len(), rows); - dictionary_columns.push(v.clone()); - } else if let Column::Variant(v) = group_column { - debug_assert_eq!(v.len(), rows); - dictionary_columns.push(v.clone()); + match group_column { + Column::String(v) | Column::Variant(v) | Column::Bitmap(v) => { + debug_assert_eq!(v.len(), num_rows); + dictionary_columns.push(v.clone()); + } + _ => serialize_columns.push(group_column.clone()), } } - if dictionary_columns.len() != group_columns.len() { - let approx_size = group_columns.len() * rows * 8; - let mut builder = StringColumnBuilder::with_capacity(rows, approx_size); - - for row in 0..rows { - for (group_column, _) in group_columns { - if !matches!(group_column, Column::String(_) | Column::Variant(_)) { - serialize_column_binary(group_column, row, &mut builder.data); - } - } - - builder.commit_row(); + if !serialize_columns.is_empty() { + // The serialize_size is equal to the number of bytes required by serialization. + let mut serialize_size = 0; + for column in serialize_columns.iter() { + serialize_size += column.serialize_size(); } - - dictionary_columns.push(builder.build()); + dictionary_columns.push(serialize_column( + &serialize_columns, + num_rows, + serialize_size, + )); } - let mut keys = Vec::with_capacity(rows * dictionary_columns.len()); - let mut points = Vec::with_capacity(rows * dictionary_columns.len()); + let mut keys = Vec::with_capacity(num_rows * dictionary_columns.len()); + let mut points = Vec::with_capacity(num_rows * dictionary_columns.len()); - for row in 0..rows { + for row in 0..num_rows { let start = points.len(); for dictionary_column in &dictionary_columns { @@ -623,51 +621,84 @@ fn build( Ok(()) } +/// The serialize_size is equal to the number of bytes required by serialization. +pub fn serialize_column( + columns: &[Column], + num_rows: usize, + serialize_size: usize, +) -> StringColumn { + // [`StringColumn`] consists of [`data`] and [`offset`], we build [`data`] and [`offset`] respectively, + // and then call `StringColumn::new(data.into(), offsets.into())` to create [`StringColumn`]. + let mut data: Vec = Vec::with_capacity(serialize_size); + let mut offsets: Vec = Vec::with_capacity(num_rows + 1); + let mut data_ptr = data.as_mut_ptr(); + let mut offsets_ptr = offsets.as_mut_ptr(); + let mut offset = 0; + + unsafe { + store_advance_aligned::(0, &mut offsets_ptr); + for i in 0..num_rows { + let old_ptr = data_ptr; + for col in columns.iter() { + serialize_column_binary(col, i, &mut data_ptr); + } + offset += data_ptr as u64 - old_ptr as u64; + store_advance_aligned::(offset, &mut offsets_ptr); + } + set_vec_len_by_ptr(&mut data, data_ptr); + set_vec_len_by_ptr(&mut offsets, offsets_ptr); + } + + StringColumn::new(data.into(), offsets.into()) +} + /// This function must be consistent with the `push_binary` function of `src/query/expression/src/values.rs`. -pub fn serialize_column_binary(column: &Column, row: usize, vec: &mut Vec) { +/// # Safety +/// +/// * The size of the memory pointed by `row_space` is equal to the number of bytes required by serialization. +pub unsafe fn serialize_column_binary(column: &Column, row: usize, row_space: &mut *mut u8) { match column { Column::Null { .. } | Column::EmptyArray { .. } | Column::EmptyMap { .. } => {} Column::Number(v) => with_number_mapped_type!(|NUM_TYPE| match v { - NumberColumn::NUM_TYPE(v) => vec.extend_from_slice(v[row].to_le_bytes().as_ref()), + NumberColumn::NUM_TYPE(v) => { + store_advance::(&v[row], row_space); + } }), - Column::Boolean(v) => vec.push(v.get_bit(row) as u8), - Column::String(v) => { - BinaryWrite::write_binary(vec, unsafe { v.index_unchecked(row) }).unwrap() - } - Column::Decimal(_) => { - with_decimal_mapped_type!(|DECIMAL_TYPE| match column { - Column::Decimal(DecimalColumn::DECIMAL_TYPE(v, _)) => - vec.extend_from_slice(v[row].to_le_bytes().as_ref()), - _ => unreachable!(), + Column::Decimal(v) => { + with_decimal_mapped_type!(|DECIMAL_TYPE| match v { + DecimalColumn::DECIMAL_TYPE(v, _) => { + store_advance::(&v[row], row_space); + } }) } - Column::Timestamp(v) => vec.extend_from_slice(v[row].to_le_bytes().as_ref()), - Column::Date(v) => vec.extend_from_slice(v[row].to_le_bytes().as_ref()), + Column::Boolean(v) => store_advance::(&v.get_bit(row), row_space), + Column::String(v) | Column::Bitmap(v) | Column::Variant(v) => { + let value = unsafe { v.index_unchecked(row) }; + let len = value.len(); + store_advance::(&(len as u64), row_space); + copy_advance_aligned::(value.as_ptr(), row_space, len); + } + Column::Timestamp(v) => store_advance::(&v[row], row_space), + Column::Date(v) => store_advance::(&v[row], row_space), Column::Array(array) | Column::Map(array) => { let data = array.index(row).unwrap(); - BinaryWrite::write_uvarint(vec, data.len() as u64).unwrap(); + store_advance::(&(data.len() as u64), row_space); for i in 0..data.len() { - serialize_column_binary(&data, i, vec); + serialize_column_binary(&data, i, row_space); } } - Column::Bitmap(v) => { - BinaryWrite::write_binary(vec, unsafe { v.index_unchecked(row) }).unwrap() - } Column::Nullable(c) => { let valid = c.validity.get_bit(row); - vec.push(valid as u8); + store_advance::(&valid, row_space); if valid { - serialize_column_binary(&c.column, row, vec); + serialize_column_binary(&c.column, row, row_space); } } Column::Tuple(fields) => { for inner_col in fields.iter() { - serialize_column_binary(inner_col, row, vec); + serialize_column_binary(inner_col, row, row_space); } } - Column::Variant(v) => { - BinaryWrite::write_binary(vec, unsafe { v.index_unchecked(row) }).unwrap() - } } } diff --git a/src/query/expression/src/kernels/mod.rs b/src/query/expression/src/kernels/mod.rs index 7e4842295dcb1..d0c530c237707 100644 --- a/src/query/expression/src/kernels/mod.rs +++ b/src/query/expression/src/kernels/mod.rs @@ -22,6 +22,7 @@ mod take; mod take_chunks; mod take_compact; mod topk; +mod utils; pub use group_by::*; pub use group_by_hash::*; diff --git a/src/query/expression/src/kernels/utils.rs b/src/query/expression/src/kernels/utils.rs new file mode 100644 index 0000000000000..26b00c1f73c85 --- /dev/null +++ b/src/query/expression/src/kernels/utils.rs @@ -0,0 +1,66 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// # Safety +/// +/// * `ptr` must be [valid] for writes of `size_of::()` bytes. +/// * The region of memory beginning at `val` with a size of `size_of::()` +/// bytes must *not* overlap with the region of memory beginning at `ptr` +/// with the same size. +#[inline(always)] +pub unsafe fn store_advance(val: &T, ptr: &mut *mut u8) { + unsafe { + std::ptr::copy_nonoverlapping(val as *const T as *const u8, *ptr, std::mem::size_of::()); + *ptr = ptr.add(std::mem::size_of::()) + } +} + +/// # Safety +/// +/// * `ptr` must be [valid] for writes. +/// * `ptr` must be properly aligned. +#[inline(always)] +pub unsafe fn store_advance_aligned(val: T, ptr: &mut *mut T) { + unsafe { + std::ptr::write(*ptr, val); + *ptr = ptr.add(1) + } +} + +/// # Safety +/// +/// * `src` must be [valid] for writes of `count * size_of::()` bytes. +/// * `ptr` must be [valid] for writes of `count * size_of::()` bytes. +/// * Both `src` and `dst` must be properly aligned. +/// * The region of memory beginning at `val` with a size of `count * size_of::()` +/// bytes must *not* overlap with the region of memory beginning at `ptr` with the +/// same size. +#[inline(always)] +pub unsafe fn copy_advance_aligned(src: *const T, ptr: &mut *mut T, count: usize) { + unsafe { + std::ptr::copy_nonoverlapping(src, *ptr, count); + *ptr = ptr.add(count); + } +} + +/// # Safety +/// +/// * `(ptr as usize - vec.as_ptr() as usize) / std::mem::size_of::()` must be +/// less than or equal to the capacity of Vec. +#[inline(always)] +pub unsafe fn set_vec_len_by_ptr(vec: &mut Vec, ptr: *const T) { + unsafe { + vec.set_len((ptr as usize - vec.as_ptr() as usize) / std::mem::size_of::()); + } +} diff --git a/src/query/expression/src/types/string.rs b/src/query/expression/src/types/string.rs index 6f571119d8ad9..c386377407ab7 100644 --- a/src/query/expression/src/types/string.rs +++ b/src/query/expression/src/types/string.rs @@ -208,6 +208,7 @@ impl StringColumn { /// # Safety /// /// Calling this method with an out-of-bounds index is *[undefined behavior]* + #[inline] pub unsafe fn index_unchecked(&self, index: usize) -> &[u8] { &self.data[(self.offsets[index] as usize)..(self.offsets[index + 1] as usize)] } diff --git a/src/query/expression/src/values.rs b/src/query/expression/src/values.rs index e3773433a131b..ebc88039101b9 100755 --- a/src/query/expression/src/values.rs +++ b/src/query/expression/src/values.rs @@ -1819,6 +1819,29 @@ impl Column { } } + pub fn serialize_size(&self) -> usize { + match self { + Column::Null { .. } | Column::EmptyArray { .. } | Column::EmptyMap { .. } => 0, + Column::Number(NumberColumn::UInt8(col)) => col.len(), + Column::Number(NumberColumn::UInt16(col)) => col.len() * 2, + Column::Number(NumberColumn::UInt32(col)) => col.len() * 4, + Column::Number(NumberColumn::UInt64(col)) => col.len() * 8, + Column::Number(NumberColumn::Float32(col)) => col.len() * 4, + Column::Number(NumberColumn::Float64(col)) => col.len() * 8, + Column::Number(NumberColumn::Int8(col)) => col.len(), + Column::Number(NumberColumn::Int16(col)) => col.len() * 2, + Column::Number(NumberColumn::Int32(col)) | Column::Date(col) => col.len() * 4, + Column::Number(NumberColumn::Int64(col)) | Column::Timestamp(col) => col.len() * 8, + Column::Decimal(DecimalColumn::Decimal128(col, _)) => col.len() * 16, + Column::Decimal(DecimalColumn::Decimal256(col, _)) => col.len() * 32, + Column::Boolean(c) => c.len(), + Column::String(col) | Column::Bitmap(col) | Column::Variant(col) => col.memory_size(), + Column::Array(col) | Column::Map(col) => col.values.serialize_size() + col.len() * 8, + Column::Nullable(c) => c.column.serialize_size() + c.len(), + Column::Tuple(fields) => fields.iter().map(|f| f.serialize_size()).sum(), + } + } + /// Returns (is_all_null, Option bitmap) pub fn validity(&self) -> (bool, Option<&Bitmap>) { match self { @@ -2230,10 +2253,10 @@ impl ColumnBuilder { ColumnBuilder::String(builder) | ColumnBuilder::Variant(builder) | ColumnBuilder::Bitmap(builder) => { - let offset: u64 = reader.read_uvarint()?; - builder.data.resize(offset as usize + builder.data.len(), 0); + let offset = reader.read_scalar::()? as usize; + builder.data.resize(offset + builder.data.len(), 0); let last = *builder.offsets.last().unwrap() as usize; - reader.read_exact(&mut builder.data[last..last + offset as usize])?; + reader.read_exact(&mut builder.data[last..last + offset])?; builder.commit_row(); } ColumnBuilder::Timestamp(builder) => { @@ -2246,7 +2269,7 @@ impl ColumnBuilder { builder.push(value); } ColumnBuilder::Array(builder) => { - let len = reader.read_uvarint()?; + let len = reader.read_scalar::()?; for _ in 0..len { builder.builder.push_binary(reader)?; } @@ -2255,7 +2278,7 @@ impl ColumnBuilder { ColumnBuilder::Map(builder) => { const KEY: usize = 0; const VALUE: usize = 1; - let len = reader.read_uvarint()?; + let len = reader.read_scalar::()?; let map_builder = builder.builder.as_tuple_mut().unwrap(); for _ in 0..len { map_builder[KEY].push_binary(reader)?;