From 35f3c4ee4d3ede6f807811beaf8b2a59ed05376d Mon Sep 17 00:00:00 2001 From: Jk Xu <54522439+Dousir9@users.noreply.github.com> Date: Sun, 8 Oct 2023 13:42:24 +0800 Subject: [PATCH] refactor(query): improve hash join (#12928) * improve hash join * improve concat * improve take_string and add take_boolean * fix * improve concat * improve concat_string_types * improve take * improve filter * update * remove get_function_context * improve settings * allow too_many_arguments * merge * merge * refine primitive comments * refine * refine * refine take_compact * fix take_compact * add safety comment * fix take_compact_string * refine: use extend from iter and get_unchecked_mut * refine concat_primitive_types * reduce pr size * reduce pr size --- src/query/expression/src/kernels/concat.rs | 203 ++++++++++--- src/query/expression/src/kernels/filter.rs | 274 ++++++++++-------- src/query/expression/src/kernels/sort.rs | 2 +- src/query/expression/src/kernels/take.rs | 229 +++++++++++---- .../expression/src/kernels/take_chunks.rs | 173 +++++++++-- .../expression/src/kernels/take_compact.rs | 244 +++++++++++----- src/query/expression/src/types.rs | 9 + src/query/expression/tests/it/common.rs | 2 +- .../src/api/rpc/exchange/exchange_manager.rs | 2 + .../service/src/pipelines/pipeline_builder.rs | 120 ++++---- .../processors/transforms/hash_join/common.rs | 14 +- .../hash_join/hash_join_build_state.rs | 10 +- .../hash_join/hash_join_probe_state.rs | 12 + .../hash_join/probe_join/inner_join.rs | 17 +- .../hash_join/probe_join/left_join.rs | 27 +- .../hash_join/probe_join/left_mark.rs | 22 +- .../hash_join/probe_join/left_semi_join.rs | 38 ++- .../hash_join/probe_join/right_anti_join.rs | 12 +- .../hash_join/probe_join/right_join.rs | 14 +- .../hash_join/probe_join/right_mark.rs | 20 +- .../hash_join/probe_join/right_semi_join.rs | 12 +- .../transforms/hash_join/probe_state.rs | 10 +- .../processors/transforms/hash_join/row.rs | 2 + .../hash_join/transform_hash_join_probe.rs | 9 +- src/query/service/src/schedulers/scheduler.rs | 2 + .../optimizer/cascades/tasks/apply_rule.rs | 6 +- .../planner/optimizer/heuristic/heuristic.rs | 10 +- .../src/planner/optimizer/hyper_dp/dphyp.rs | 6 +- .../sql/src/planner/optimizer/rule/factory.rs | 7 +- .../fuse/src/statistics/cluster_statistics.rs | 2 +- 30 files changed, 1069 insertions(+), 441 deletions(-) diff --git a/src/query/expression/src/kernels/concat.rs b/src/query/expression/src/kernels/concat.rs index dbee0d8b3d5da..02ac02333d4fc 100644 --- a/src/query/expression/src/kernels/concat.rs +++ b/src/query/expression/src/kernels/concat.rs @@ -12,26 +12,30 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + +use common_arrow::arrow::bitmap::Bitmap; +use common_arrow::arrow::buffer::Buffer; use common_exception::ErrorCode; use common_exception::Result; use itertools::Itertools; +use crate::kernels::take::BIT_MASK; +use crate::kernels::utils::copy_advance_aligned; +use crate::kernels::utils::set_vec_len_by_ptr; use crate::types::array::ArrayColumnBuilder; use crate::types::decimal::DecimalColumn; use crate::types::map::KvColumnBuilder; use crate::types::nullable::NullableColumn; use crate::types::number::NumberColumn; -use crate::types::string::StringColumnBuilder; +use crate::types::string::StringColumn; use crate::types::AnyType; use crate::types::ArgType; use crate::types::ArrayType; use crate::types::BitmapType; use crate::types::BooleanType; use crate::types::DateType; -use crate::types::EmptyArrayType; -use crate::types::EmptyMapType; use crate::types::MapType; -use crate::types::NullType; use crate::types::NullableType; use crate::types::NumberType; use crate::types::StringType; @@ -102,42 +106,81 @@ impl Column { let capacity = columns.iter().map(|c| c.len()).sum(); match &columns[0] { - Column::Null { .. } => Self::concat_arg_types::(columns), - Column::EmptyArray { .. } => Self::concat_arg_types::(columns), - Column::EmptyMap { .. } => Self::concat_arg_types::(columns), + Column::Null { .. } => Column::Null { len: capacity }, + Column::EmptyArray { .. } => Column::EmptyArray { len: capacity }, + Column::EmptyMap { .. } => Column::EmptyMap { len: capacity }, Column::Number(col) => with_number_mapped_type!(|NUM_TYPE| match col { NumberColumn::NUM_TYPE(_) => { - Self::concat_arg_types::>(columns) + let columns = columns + .iter() + .map(|col| >::try_downcast_column(col).unwrap()) + .collect_vec(); + let builder = Self::concat_primitive_types(&columns, capacity); + >::upcast_column(>::column_from_vec( + builder, + &[], + )) } }), Column::Decimal(col) => with_decimal_type!(|DECIMAL_TYPE| match col { DecimalColumn::DECIMAL_TYPE(_, size) => { - let mut builder = Vec::with_capacity(capacity); - for c in columns { - match c { - Column::Decimal(DecimalColumn::DECIMAL_TYPE(col, size)) => { - debug_assert_eq!(size, size); - builder.extend_from_slice(col); - } + let columns = columns + .iter() + .map(|col| match col { + Column::Decimal(DecimalColumn::DECIMAL_TYPE(col, _)) => col.clone(), _ => unreachable!(), - } - } + }) + .collect_vec(); + let builder = Self::concat_primitive_types(&columns, capacity); Column::Decimal(DecimalColumn::DECIMAL_TYPE(builder.into(), *size)) } }), - Column::Boolean(_) => Self::concat_arg_types::(columns), + Column::Boolean(_) => { + let columns = columns + .iter() + .map(|col| BooleanType::try_downcast_column(col).unwrap()) + .collect_vec(); + Column::Boolean(Self::concat_boolean_types(&columns, capacity)) + } Column::String(_) => { - let data_capacity = columns.iter().map(|c| c.memory_size() - c.len() * 8).sum(); - let builder = StringColumnBuilder::with_capacity(capacity, data_capacity); - Self::concat_value_types::(builder, columns) + let columns = columns + .iter() + .map(|col| StringType::try_downcast_column(col).unwrap()) + .collect_vec(); + StringType::upcast_column(Self::concat_string_types(&columns, capacity)) } Column::Timestamp(_) => { - let builder = Vec::with_capacity(capacity); - Self::concat_value_types::(builder, columns) + let columns = columns + .iter() + .map(|col| TimestampType::try_downcast_column(col).unwrap()) + .collect_vec(); + let builder = Self::concat_primitive_types(&columns, capacity); + let ts = >::upcast_column(>::column_from_vec( + builder, + &[], + )) + .into_number() + .unwrap() + .into_int64() + .unwrap(); + Column::Timestamp(ts) } Column::Date(_) => { - let builder = Vec::with_capacity(capacity); - Self::concat_value_types::(builder, columns) + let columns = columns + .iter() + .map(|col| DateType::try_downcast_column(col).unwrap()) + .collect_vec(); + + let builder = Self::concat_primitive_types(&columns, capacity); + let d = >::upcast_column(>::column_from_vec( + builder, + &[], + )) + .into_number() + .unwrap() + .into_int32() + .unwrap(); + Column::Date(d) } Column::Array(col) => { let mut offsets = Vec::with_capacity(capacity + 1); @@ -164,9 +207,11 @@ impl Column { Self::concat_value_types::>(builder, columns) } Column::Bitmap(_) => { - let data_capacity = columns.iter().map(|c| c.memory_size() - c.len() * 8).sum(); - let builder = StringColumnBuilder::with_capacity(capacity, data_capacity); - Self::concat_value_types::(builder, columns) + let columns = columns + .iter() + .map(|col| BitmapType::try_downcast_column(col).unwrap()) + .collect_vec(); + BitmapType::upcast_column(Self::concat_string_types(&columns, capacity)) } Column::Nullable(_) => { let mut bitmaps = Vec::with_capacity(columns.len()); @@ -178,7 +223,11 @@ impl Column { } let column = Self::concat(&inners); - let validity = Self::concat_arg_types::(&bitmaps); + let bitmaps = bitmaps + .iter() + .map(|col| BooleanType::try_downcast_column(col).unwrap()) + .collect_vec(); + let validity = Column::Boolean(Self::concat_boolean_types(&bitmaps, capacity)); let validity = BooleanType::try_downcast_column(&validity).unwrap(); Column::Nullable(Box::new(NullableColumn { column, validity })) @@ -196,21 +245,95 @@ impl Column { Column::Tuple(fields) } Column::Variant(_) => { - let data_capacity = columns.iter().map(|c| c.memory_size() - c.len() * 8).sum(); - let builder = StringColumnBuilder::with_capacity(capacity, data_capacity); - Self::concat_value_types::(builder, columns) + let columns = columns + .iter() + .map(|col| VariantType::try_downcast_column(col).unwrap()) + .collect_vec(); + VariantType::upcast_column(Self::concat_string_types(&columns, capacity)) } } } - fn concat_arg_types(columns: &[Column]) -> Column { - let columns: Vec = columns - .iter() - .map(|c| T::try_downcast_column(c).unwrap()) - .collect(); - let iter = columns.iter().flat_map(|c| T::iter_column(c)); - let result = T::column_from_ref_iter(iter, &[]); - T::upcast_column(result) + pub fn concat_primitive_types(cols: &[Buffer], num_rows: usize) -> Vec + where T: Copy { + let mut builder: Vec = Vec::with_capacity(num_rows); + for col in cols { + builder.extend(col.iter()); + } + builder + } + + pub fn concat_string_types<'a>(cols: &'a [StringColumn], num_rows: usize) -> StringColumn { + // [`StringColumn`] consists of [`data`] and [`offset`], we build [`data`] and [`offset`] respectively, + // and then call `StringColumn::new(data.into(), offsets.into())` to create [`StringColumn`]. + let mut offsets: Vec = Vec::with_capacity(num_rows + 1); + let mut offsets_len = 0; + let mut data_size = 0; + + // Build [`offset`] and calculate `data_size` required by [`data`]. + unsafe { + *offsets.get_unchecked_mut(offsets_len) = 0; + offsets_len += 1; + for col in cols.iter() { + let mut start = 0; + for end in col.offsets()[1..].iter() { + data_size += end - start; + start = *end; + *offsets.get_unchecked_mut(offsets_len) = data_size; + offsets_len += 1; + } + } + offsets.set_len(offsets_len); + } + + // Build [`data`]. + let mut data: Vec = Vec::with_capacity(data_size as usize); + let mut data_ptr = data.as_mut_ptr(); + + unsafe { + for col in cols.iter() { + let col_data = col.data().as_slice(); + copy_advance_aligned(col_data.as_ptr(), &mut data_ptr, col_data.len()); + } + set_vec_len_by_ptr(&mut data, data_ptr); + } + + StringColumn::new(data.into(), offsets.into()) + } + + pub fn concat_boolean_types(cols: &[Bitmap], num_rows: usize) -> Bitmap { + let capacity = num_rows.saturating_add(7) / 8; + let mut builder: Vec = Vec::with_capacity(capacity); + let mut builder_len = 0; + let mut unset_bits = 0; + let mut value = 0; + let mut i = 0; + + unsafe { + for col in cols { + for item in col.iter() { + if item { + value |= BIT_MASK[i % 8]; + } else { + unset_bits += 1; + } + i += 1; + if i % 8 == 0 { + *builder.get_unchecked_mut(builder_len) = value; + builder_len += 1; + value = 0; + } + } + } + if i % 8 != 0 { + *builder.get_unchecked_mut(builder_len) = value; + builder_len += 1; + } + builder.set_len(builder_len); + Bitmap::from_inner(Arc::new(builder.into()), 0, num_rows, unset_bits) + .ok() + .unwrap() + } } fn concat_value_types( diff --git a/src/query/expression/src/kernels/filter.rs b/src/query/expression/src/kernels/filter.rs index 6b962798a9d9e..d5f8344aa31f4 100644 --- a/src/query/expression/src/kernels/filter.rs +++ b/src/query/expression/src/kernels/filter.rs @@ -19,6 +19,9 @@ use common_arrow::arrow::bitmap::MutableBitmap; use common_arrow::arrow::buffer::Buffer; use common_exception::Result; +use crate::kernels::utils::copy_advance_aligned; +use crate::kernels::utils::set_vec_len_by_ptr; +use crate::kernels::utils::store_advance_aligned; use crate::types::array::ArrayColumn; use crate::types::array::ArrayColumnBuilder; use crate::types::decimal::DecimalColumn; @@ -26,13 +29,11 @@ use crate::types::map::KvColumnBuilder; use crate::types::nullable::NullableColumn; use crate::types::number::NumberColumn; use crate::types::string::StringColumn; -use crate::types::string::StringColumnBuilder; use crate::types::AnyType; use crate::types::ArrayType; use crate::types::BooleanType; use crate::types::MapType; use crate::types::ValueType; -use crate::types::VariantType; use crate::with_decimal_type; use crate::with_number_type; use crate::BlockEntry; @@ -178,14 +179,8 @@ impl Column { Column::Tuple(fields) } Column::Variant(column) => { - let bytes_per_row = column.data().len() / filter.len().max(1); - let data_capacity = (filter.len() - filter.unset_bits()) * bytes_per_row; - - Self::filter_scalar_types::( - column, - StringColumnBuilder::with_capacity(length, data_capacity), - filter, - ) + let column = Self::filter_string_scalars(column, filter); + Column::Variant(column) } } } @@ -246,154 +241,187 @@ impl Column { // low-level API using unsafe to improve performance fn filter_primitive_types(values: &Buffer, filter: &Bitmap) -> Buffer { debug_assert_eq!(values.len(), filter.len()); - let selected = filter.len() - filter.unset_bits(); - if selected == values.len() { + let num_rows = filter.len() - filter.unset_bits(); + if num_rows == values.len() { return values.clone(); } - let mut values = values.as_slice(); - let mut new = Vec::::with_capacity(selected); - let mut dst = new.as_mut_ptr(); + let mut builder: Vec = Vec::with_capacity(num_rows); + let mut ptr = builder.as_mut_ptr(); + let mut values_ptr = values.as_slice().as_ptr(); let (mut slice, offset, mut length) = filter.as_slice(); - if offset > 0 { - // Consume the offset - let n = 8 - offset; - values - .iter() - .zip(filter.iter()) - .take(n) - .for_each(|(value, is_selected)| { - if is_selected { - unsafe { - dst.write(*value); - dst = dst.add(1); - } - } - }); - slice = &slice[1..]; - length -= n; - values = &values[n..]; - } - const CHUNK_SIZE: usize = 64; - let mut chunks = values.chunks_exact(CHUNK_SIZE); - let mut mask_chunks = BitChunksExact::::new(slice, length); + unsafe { + if offset > 0 { + let mut mask = slice[0]; + while mask != 0 { + let n = mask.trailing_zeros() as usize; + if n >= offset { + copy_advance_aligned(values_ptr.add(n - offset), &mut ptr, 1); + } + mask = mask & (mask - 1); + } + length -= 8 - offset; + slice = &slice[1..]; + values_ptr = values_ptr.add(8 - offset); + } - chunks - .by_ref() - .zip(mask_chunks.by_ref()) - .for_each(|(chunk, mut mask)| { + const CHUNK_SIZE: usize = 64; + let mut mask_chunks = BitChunksExact::::new(slice, length); + let mut continuous_selected = 0; + for mut mask in mask_chunks.by_ref() { if mask == u64::MAX { - unsafe { - std::ptr::copy(chunk.as_ptr(), dst, CHUNK_SIZE); - dst = dst.add(CHUNK_SIZE); - } + continuous_selected += CHUNK_SIZE; } else { + if continuous_selected > 0 { + copy_advance_aligned(values_ptr, &mut ptr, continuous_selected); + values_ptr = values_ptr.add(continuous_selected); + continuous_selected = 0; + } while mask != 0 { let n = mask.trailing_zeros() as usize; - unsafe { - dst.write(chunk[n]); - dst = dst.add(1); - } + copy_advance_aligned(values_ptr.add(n), &mut ptr, 1); mask = mask & (mask - 1); } + values_ptr = values_ptr.add(CHUNK_SIZE); } - }); + } + if continuous_selected > 0 { + copy_advance_aligned(values_ptr, &mut ptr, continuous_selected); + values_ptr = values_ptr.add(continuous_selected); + } - chunks - .remainder() - .iter() - .zip(mask_chunks.remainder_iter()) - .for_each(|(value, is_selected)| { + for (i, is_selected) in mask_chunks.remainder_iter().enumerate() { if is_selected { - unsafe { - dst.write(*value); - dst = dst.add(1); - } + copy_advance_aligned(values_ptr.add(i), &mut ptr, 1); } - }); + } - unsafe { new.set_len(selected) }; - new.into() + set_vec_len_by_ptr(&mut builder, ptr); + } + + builder.into() } // low-level API using unsafe to improve performance fn filter_string_scalars(values: &StringColumn, filter: &Bitmap) -> StringColumn { debug_assert_eq!(values.len(), filter.len()); - let selected = filter.len() - filter.unset_bits(); - if selected == values.len() { + let num_rows = filter.len() - filter.unset_bits(); + if num_rows == values.len() { return values.clone(); } - let data = values.data().as_slice(); - let offsets = values.offsets().as_slice(); - - let mut res_offsets = Vec::with_capacity(selected + 1); - res_offsets.push(0); - let mut res_data = vec![]; - let hint_size = data.len() / (values.len() + 1) * selected; - - static MAX_HINT_SIZE: usize = 1000000000; - if hint_size < MAX_HINT_SIZE && values.len() < MAX_HINT_SIZE { - res_data.reserve(hint_size) - } - - let mut pos = 0; - - let (mut slice, offset, mut length) = filter.as_slice(); - if offset > 0 { - // Consume the offset - let n = 8 - offset; - values - .iter() - .zip(filter.iter()) - .take(n) - .for_each(|(value, is_selected)| { - if is_selected { - res_data.extend_from_slice(value); - res_offsets.push(res_data.len() as u64); + // Each element of `items` is (string pointer(u64), string length). + let mut items: Vec<(u64, usize)> = Vec::with_capacity(num_rows); + // [`StringColumn`] consists of [`data`] and [`offset`], we build [`data`] and [`offset`] respectively, + // and then call `StringColumn::new(data.into(), offsets.into())` to create [`StringColumn`]. + let values_offset = values.offsets().as_slice(); + let values_data_ptr = values.data().as_slice().as_ptr(); + let mut offsets: Vec = Vec::with_capacity(num_rows + 1); + let mut offsets_ptr = offsets.as_mut_ptr(); + let mut items_ptr = items.as_mut_ptr(); + let mut data_size = 0; + + // Build [`offset`] and calculate `data_size` required by [`data`]. + unsafe { + store_advance_aligned::(0, &mut offsets_ptr); + let mut idx = 0; + let (mut slice, offset, mut length) = filter.as_slice(); + if offset > 0 { + let mut mask = slice[0]; + while mask != 0 { + let n = mask.trailing_zeros() as usize; + if n >= offset { + let start = *values_offset.get_unchecked(n - offset) as usize; + let len = *values_offset.get_unchecked(n - offset + 1) as usize - start; + data_size += len as u64; + store_advance_aligned(data_size, &mut offsets_ptr); + store_advance_aligned( + (values_data_ptr.add(start) as u64, len), + &mut items_ptr, + ); } - }); - slice = &slice[1..]; - length -= n; - pos += n; - } - - const CHUNK_SIZE: usize = 64; - let mut mask_chunks = BitChunksExact::::new(slice, length); - - for mut mask in mask_chunks.by_ref() { - if mask == u64::MAX { - let data = &data[offsets[pos] as usize..offsets[pos + CHUNK_SIZE] as usize]; - res_data.extend_from_slice(data); + mask = mask & (mask - 1); + } + length -= 8 - offset; + slice = &slice[1..]; + idx += 8 - offset; + } - let mut last_len = *res_offsets.last().unwrap(); - for i in 0..CHUNK_SIZE { - last_len += offsets[pos + i + 1] - offsets[pos + i]; - res_offsets.push(last_len); + const CHUNK_SIZE: usize = 64; + let mut mask_chunks = BitChunksExact::::new(slice, length); + let mut continuous_selected = 0; + for mut mask in mask_chunks.by_ref() { + if mask == u64::MAX { + continuous_selected += CHUNK_SIZE; + } else { + if continuous_selected > 0 { + let start = *values_offset.get_unchecked(idx) as usize; + let len = *values_offset.get_unchecked(idx + continuous_selected) as usize + - start; + store_advance_aligned( + (values_data_ptr.add(start) as u64, len), + &mut items_ptr, + ); + for i in 0..continuous_selected { + data_size += *values_offset.get_unchecked(idx + i + 1) + - *values_offset.get_unchecked(idx + i); + store_advance_aligned(data_size, &mut offsets_ptr); + } + idx += continuous_selected; + continuous_selected = 0; + } + while mask != 0 { + let n = mask.trailing_zeros() as usize; + let start = *values_offset.get_unchecked(idx + n) as usize; + let len = *values_offset.get_unchecked(idx + n + 1) as usize - start; + data_size += len as u64; + store_advance_aligned( + (values_data_ptr.add(start) as u64, len), + &mut items_ptr, + ); + store_advance_aligned(data_size, &mut offsets_ptr); + mask = mask & (mask - 1); + } + idx += CHUNK_SIZE; } - } else { - while mask != 0 { - let n = mask.trailing_zeros() as usize; - let data = &data[offsets[pos + n] as usize..offsets[pos + n + 1] as usize]; - res_data.extend_from_slice(data); - res_offsets.push(res_data.len() as u64); + } + if continuous_selected > 0 { + let start = *values_offset.get_unchecked(idx) as usize; + let len = *values_offset.get_unchecked(idx + continuous_selected) as usize - start; + store_advance_aligned((values_data_ptr.add(start) as u64, len), &mut items_ptr); + for i in 0..continuous_selected { + data_size += *values_offset.get_unchecked(idx + i + 1) + - *values_offset.get_unchecked(idx + i); + store_advance_aligned(data_size, &mut offsets_ptr); + } + idx += continuous_selected; + } - mask = mask & (mask - 1); + for (i, is_selected) in mask_chunks.remainder_iter().enumerate() { + if is_selected { + let start = *values_offset.get_unchecked(idx + i) as usize; + let len = *values_offset.get_unchecked(idx + i + 1) as usize - start; + data_size += len as u64; + store_advance_aligned((values_data_ptr.add(start) as u64, len), &mut items_ptr); + store_advance_aligned(data_size, &mut offsets_ptr); } } - pos += CHUNK_SIZE; + set_vec_len_by_ptr(&mut items, items_ptr); + set_vec_len_by_ptr(&mut offsets, offsets_ptr); } - for is_select in mask_chunks.remainder_iter() { - if is_select { - let data = &data[offsets[pos] as usize..offsets[pos + 1] as usize]; - res_data.extend_from_slice(data); - res_offsets.push(res_data.len() as u64); + // Build [`data`]. + let mut data: Vec = Vec::with_capacity(data_size as usize); + let mut data_ptr = data.as_mut_ptr(); + + unsafe { + for (str_ptr, len) in items.iter() { + copy_advance_aligned(*str_ptr as *const u8, &mut data_ptr, *len); } - pos += 1; + set_vec_len_by_ptr(&mut data, data_ptr); } - StringColumn::new(res_data.into(), res_offsets.into()) + StringColumn::new(data.into(), offsets.into()) } } diff --git a/src/query/expression/src/kernels/sort.rs b/src/query/expression/src/kernels/sort.rs index f6f82b7f02b99..3bf3181023cb0 100644 --- a/src/query/expression/src/kernels/sort.rs +++ b/src/query/expression/src/kernels/sort.rs @@ -94,7 +94,7 @@ impl DataBlock { let indices: PrimitiveArray = arrow_sort::lexsort_to_indices_impl(&order_arrays, limit, &build_compare)?; - DataBlock::take(block, indices.values()) + DataBlock::take(block, indices.values(), &mut None) } // merge two blocks to one sorted block diff --git a/src/query/expression/src/kernels/take.rs b/src/query/expression/src/kernels/take.rs index 3392aed9fa88c..fa8d1d34f128b 100644 --- a/src/query/expression/src/kernels/take.rs +++ b/src/query/expression/src/kernels/take.rs @@ -12,17 +12,22 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + +use common_arrow::arrow::bitmap::Bitmap; +use common_arrow::arrow::buffer::Buffer; use common_exception::Result; +use crate::kernels::utils::copy_advance_aligned; +use crate::kernels::utils::set_vec_len_by_ptr; use crate::types::array::ArrayColumn; use crate::types::array::ArrayColumnBuilder; use crate::types::bitmap::BitmapType; -use crate::types::decimal::Decimal128Type; -use crate::types::decimal::Decimal256Type; use crate::types::decimal::DecimalColumn; use crate::types::map::KvColumnBuilder; use crate::types::nullable::NullableColumn; use crate::types::number::NumberColumn; +use crate::types::string::StringColumn; use crate::types::AnyType; use crate::types::ArgType; use crate::types::ArrayType; @@ -32,6 +37,7 @@ use crate::types::NumberType; use crate::types::StringType; use crate::types::ValueType; use crate::types::VariantType; +use crate::with_decimal_type; use crate::with_number_mapped_type; use crate::BlockEntry; use crate::Column; @@ -39,9 +45,17 @@ use crate::ColumnBuilder; use crate::DataBlock; use crate::Value; +pub const BIT_MASK: [u8; 8] = [1, 2, 4, 8, 16, 32, 64, 128]; + impl DataBlock { - pub fn take(&self, indices: &[I]) -> Result - where I: common_arrow::arrow::types::Index { + pub fn take( + &self, + indices: &[I], + string_items_buf: &mut Option>, + ) -> Result + where + I: common_arrow::arrow::types::Index, + { if indices.is_empty() { return Ok(self.slice(0..0)); } @@ -55,7 +69,7 @@ impl DataBlock { } Value::Column(c) => BlockEntry::new( entry.data_type.clone(), - Value::Column(Column::take(c, indices)), + Value::Column(Column::take(c, indices, string_items_buf)), ), }) .collect(); @@ -69,54 +83,55 @@ impl DataBlock { } impl Column { - pub fn take(&self, indices: &[I]) -> Self + pub fn take(&self, indices: &[I], string_items_buf: &mut Option>) -> Self where I: common_arrow::arrow::types::Index { match self { Column::Null { .. } | Column::EmptyArray { .. } | Column::EmptyMap { .. } => { self.slice(0..indices.len()) } Column::Number(column) => with_number_mapped_type!(|NUM_TYPE| match column { - NumberColumn::NUM_TYPE(values) => - Self::take_arg_types::, _>(values, indices), - }), - Column::Decimal(column) => match column { - DecimalColumn::Decimal128(values, size) => { - let mut builder = Decimal128Type::create_builder(indices.len(), &[]); - for index in indices { - Decimal128Type::push_item(&mut builder, unsafe { - Decimal128Type::index_column_unchecked(values, index.to_usize()) - }); - } - let column = Decimal128Type::build_column(builder); - Column::Decimal(DecimalColumn::Decimal128(column, *size)) + NumberColumn::NUM_TYPE(values) => { + let builder = Self::take_primitive_types(values, indices); + >::upcast_column(>::column_from_vec( + builder, + &[], + )) } - DecimalColumn::Decimal256(values, size) => { - let mut builder = Decimal256Type::create_builder(indices.len(), &[]); - for index in indices { - Decimal256Type::push_item(&mut builder, unsafe { - Decimal256Type::index_column_unchecked(values, index.to_usize()) - }); - } - let column = Decimal256Type::build_column(builder); - Column::Decimal(DecimalColumn::Decimal256(column, *size)) + }), + Column::Decimal(column) => with_decimal_type!(|DECIMAL_TYPE| match column { + DecimalColumn::DECIMAL_TYPE(values, size) => { + let builder = Self::take_primitive_types(values, indices); + Column::Decimal(DecimalColumn::DECIMAL_TYPE(builder.into(), *size)) } - }, - Column::Boolean(bm) => Self::take_arg_types::(bm, indices), - Column::String(column) => Self::take_arg_types::(column, indices), + }), + Column::Boolean(bm) => Column::Boolean(Self::take_boolean_types(bm, indices)), + Column::String(column) => StringType::upcast_column(Self::take_string_types( + column, + indices, + string_items_buf.as_mut(), + )), Column::Timestamp(column) => { - let ts = Self::take_arg_types::, _>(column, indices) - .into_number() - .unwrap() - .into_int64() - .unwrap(); + let builder = Self::take_primitive_types(column, indices); + let ts = >::upcast_column(>::column_from_vec( + builder, + &[], + )) + .into_number() + .unwrap() + .into_int64() + .unwrap(); Column::Timestamp(ts) } Column::Date(column) => { - let d = Self::take_arg_types::, _>(column, indices) - .into_number() - .unwrap() - .into_int32() - .unwrap(); + let builder = Self::take_primitive_types(column, indices); + let d = >::upcast_column(>::column_from_vec( + builder, + &[], + )) + .into_number() + .unwrap() + .into_int32() + .unwrap(); Column::Date(d) } Column::Array(column) => { @@ -144,33 +159,139 @@ impl Column { let column = ArrayColumn::try_downcast(column).unwrap(); Self::take_value_types::, _>(&column, builder, indices) } - Column::Bitmap(column) => Self::take_arg_types::(column, indices), + Column::Bitmap(column) => BitmapType::upcast_column(Self::take_string_types( + column, + indices, + string_items_buf.as_mut(), + )), Column::Nullable(c) => { - let column = c.column.take(indices); - let validity = Self::take_arg_types::(&c.validity, indices); + let column = c.column.take(indices, string_items_buf); + let validity = Column::Boolean(Self::take_boolean_types(&c.validity, indices)); Column::Nullable(Box::new(NullableColumn { column, validity: BooleanType::try_downcast_column(&validity).unwrap(), })) } Column::Tuple(fields) => { - let fields = fields.iter().map(|c| c.take(indices)).collect(); + let fields = fields + .iter() + .map(|c| c.take(indices, string_items_buf)) + .collect(); Column::Tuple(fields) } - Column::Variant(column) => Self::take_arg_types::(column, indices), + Column::Variant(column) => VariantType::upcast_column(Self::take_string_types( + column, + indices, + string_items_buf.as_mut(), + )), + } + } + + pub fn take_primitive_types(col: &Buffer, indices: &[I]) -> Vec + where + T: Copy, + I: common_arrow::arrow::types::Index, + { + let num_rows = indices.len(); + let mut builder: Vec = Vec::with_capacity(num_rows); + let col = col.as_slice(); + builder.extend( + indices + .iter() + .map(|index| unsafe { *col.get_unchecked(index.to_usize()) }), + ); + builder + } + + pub fn take_string_types<'a, I>( + col: &'a StringColumn, + indices: &[I], + string_items_buf: Option<&mut Vec<(u64, usize)>>, + ) -> StringColumn + where + I: common_arrow::arrow::types::Index, + { + let num_rows = indices.len(); + + // Each element of `items` is (string pointer(u64), string length), if `string_items_buf` + // can be reused, we will not re-allocate memory. + let mut items: Option> = match &string_items_buf { + Some(string_items_buf) if string_items_buf.capacity() >= num_rows => None, + _ => Some(Vec::with_capacity(num_rows)), + }; + let items = match items.is_some() { + true => items.as_mut().unwrap(), + false => string_items_buf.unwrap(), + }; + + // [`StringColumn`] consists of [`data`] and [`offset`], we build [`data`] and [`offset`] respectively, + // and then call `StringColumn::new(data.into(), offsets.into())` to create [`StringColumn`]. + let col_offset = col.offsets().as_slice(); + let col_data_ptr = col.data().as_slice().as_ptr(); + let mut offsets: Vec = Vec::with_capacity(num_rows + 1); + let mut data_size = 0; + + // Build [`offset`] and calculate `data_size` required by [`data`]. + unsafe { + *offsets.get_unchecked_mut(0) = 0; + for (i, index) in indices.iter().enumerate() { + let start = *col_offset.get_unchecked(index.to_usize()) as usize; + let len = *col_offset.get_unchecked(index.to_usize() + 1) as usize - start; + data_size += len as u64; + *items.get_unchecked_mut(i) = (col_data_ptr.add(start) as u64, len); + *offsets.get_unchecked_mut(i + 1) = data_size; + } + items.set_len(num_rows); + offsets.set_len(num_rows + 1); + } + + // Build [`data`]. + let mut data: Vec = Vec::with_capacity(data_size as usize); + let mut data_ptr = data.as_mut_ptr(); + + unsafe { + for (str_ptr, len) in items.iter() { + copy_advance_aligned(*str_ptr as *const u8, &mut data_ptr, *len); + } + set_vec_len_by_ptr(&mut data, data_ptr); } + + StringColumn::new(data.into(), offsets.into()) } - fn take_arg_types(col: &T::Column, indices: &[I]) -> Column + pub fn take_boolean_types(col: &Bitmap, indices: &[I]) -> Bitmap where I: common_arrow::arrow::types::Index { - let mut builder = T::create_builder(indices.len(), &[]); - for index in indices { - T::push_item(&mut builder, unsafe { - T::index_column_unchecked(col, index.to_usize()) - }); + let num_rows = indices.len(); + let capacity = num_rows.saturating_add(7) / 8; + let mut builder: Vec = Vec::with_capacity(capacity); + let mut builder_len = 0; + let mut unset_bits = 0; + let mut value = 0; + let mut i = 0; + + unsafe { + for index in indices.iter() { + if col.get_bit_unchecked(index.to_usize()) { + value |= BIT_MASK[i % 8]; + } else { + unset_bits += 1; + } + i += 1; + if i % 8 == 0 { + *builder.get_unchecked_mut(builder_len) = value; + builder_len += 1; + value = 0; + } + } + if i % 8 != 0 { + *builder.get_unchecked_mut(builder_len) = value; + builder_len += 1; + } + builder.set_len(builder_len); + Bitmap::from_inner(Arc::new(builder.into()), 0, num_rows, unset_bits) + .ok() + .unwrap() } - let column = T::build_column(builder); - T::upcast_column(column) } fn take_value_types( diff --git a/src/query/expression/src/kernels/take_chunks.rs b/src/query/expression/src/kernels/take_chunks.rs index f69332798685e..0c52ff886bd38 100644 --- a/src/query/expression/src/kernels/take_chunks.rs +++ b/src/query/expression/src/kernels/take_chunks.rs @@ -12,10 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + +use common_arrow::arrow::bitmap::Bitmap; +use common_arrow::arrow::buffer::Buffer; use common_arrow::arrow::compute::merge_sort::MergeSlice; use common_hashtable::RowPtr; use itertools::Itertools; +use crate::kernels::take::BIT_MASK; +use crate::kernels::utils::copy_advance_aligned; +use crate::kernels::utils::set_vec_len_by_ptr; use crate::types::array::ArrayColumnBuilder; use crate::types::bitmap::BitmapType; use crate::types::decimal::DecimalColumn; @@ -24,6 +31,7 @@ use crate::types::map::KvColumnBuilder; use crate::types::nullable::NullableColumn; use crate::types::nullable::NullableColumnVec; use crate::types::number::NumberColumn; +use crate::types::string::StringColumn; use crate::types::AnyType; use crate::types::ArgType; use crate::types::ArrayType; @@ -114,6 +122,7 @@ impl DataBlock { build_columns_data_type: &[DataType], indices: &[RowPtr], result_size: usize, + string_items_buf: &mut Option>, ) -> Self { let num_columns = build_columns.len(); let result_columns = (0..num_columns) @@ -124,6 +133,7 @@ impl DataBlock { data_type.clone(), indices, result_size, + string_items_buf, ); BlockEntry::new(data_type.clone(), Value::Column(column)) }) @@ -569,6 +579,7 @@ impl Column { data_type: DataType, indices: &[RowPtr], result_size: usize, + string_items_buf: &mut Option>, ) -> Column { match &columns { ColumnVec::Null { .. } => Column::Null { len: result_size }, @@ -576,40 +587,48 @@ impl Column { ColumnVec::EmptyMap { .. } => Column::EmptyMap { len: result_size }, ColumnVec::Number(column) => with_number_mapped_type!(|NUM_TYPE| match column { NumberColumnVec::NUM_TYPE(columns) => { - let builder = NumberType::::create_builder(result_size, &[]); - Self::take_block_vec_value_types::>( - columns, builder, indices, - ) + let builder = Self::take_block_vec_primitive_types(columns, indices); + >::upcast_column(>::column_from_vec( + builder, + &[], + )) } }), ColumnVec::Decimal(column) => with_decimal_type!(|DECIMAL_TYPE| match column { DecimalColumnVec::DECIMAL_TYPE(columns, size) => { - let mut builder = Vec::with_capacity(result_size); - for row_ptr in indices { - let val = unsafe { - columns[row_ptr.chunk_index as usize] - .get_unchecked(row_ptr.row_index as usize) - }; - builder.push(*val); - } + let builder = Self::take_block_vec_primitive_types(columns, indices); Column::Decimal(DecimalColumn::DECIMAL_TYPE(builder.into(), *size)) } }), ColumnVec::Boolean(columns) => { - let builder = BooleanType::create_builder(result_size, &[]); - Self::take_block_vec_value_types::(columns, builder, indices) - } - ColumnVec::String(columns) => { - let builder = StringType::create_builder(result_size, &[]); - Self::take_block_vec_value_types::(columns, builder, indices) + Column::Boolean(Self::take_block_vec_boolean_types(columns, indices)) } + ColumnVec::String(columns) => StringType::upcast_column( + Self::take_block_vec_string_types(columns, indices, string_items_buf.as_mut()), + ), ColumnVec::Timestamp(columns) => { - let builder = TimestampType::create_builder(result_size, &[]); - Self::take_block_vec_value_types::(columns, builder, indices) + let builder = Self::take_block_vec_primitive_types(columns, indices); + let ts = >::upcast_column(>::column_from_vec( + builder, + &[], + )) + .into_number() + .unwrap() + .into_int64() + .unwrap(); + Column::Timestamp(ts) } ColumnVec::Date(columns) => { - let builder = DateType::create_builder(result_size, &[]); - Self::take_block_vec_value_types::(columns, builder, indices) + let builder = Self::take_block_vec_primitive_types(columns, indices); + let d = >::upcast_column(>::column_from_vec( + builder, + &[], + )) + .into_number() + .unwrap() + .into_int32() + .unwrap(); + Column::Date(d) } ColumnVec::Array(columns) => { let data_type = data_type.as_array().unwrap(); @@ -639,10 +658,9 @@ impl Column { columns, builder, indices, ) } - ColumnVec::Bitmap(columns) => { - let builder = BitmapType::create_builder(result_size, &[]); - Self::take_block_vec_value_types::(columns, builder, indices) - } + ColumnVec::Bitmap(columns) => BitmapType::upcast_column( + Self::take_block_vec_string_types(columns, indices, string_items_buf.as_mut()), + ), ColumnVec::Nullable(columns) => { let inner_data_type = data_type.as_nullable().unwrap(); let inner_column = Self::take_column_vec_indices( @@ -650,6 +668,7 @@ impl Column { *inner_data_type.clone(), indices, result_size, + string_items_buf, ); let inner_bitmap = Self::take_column_vec_indices( @@ -657,6 +676,7 @@ impl Column { DataType::Boolean, indices, result_size, + string_items_buf, ); Column::Nullable(Box::new(NullableColumn { @@ -675,16 +695,111 @@ impl Column { ty.clone(), indices, result_size, + string_items_buf, ) }) .collect(); Column::Tuple(fields) } - ColumnVec::Variant(columns) => { - let builder = VariantType::create_builder(result_size, &[]); - Self::take_block_vec_value_types::(columns, builder, indices) + ColumnVec::Variant(columns) => StringType::upcast_column( + Self::take_block_vec_string_types(columns, indices, string_items_buf.as_mut()), + ), + } + } + + pub fn take_block_vec_primitive_types(col: &[Buffer], indices: &[RowPtr]) -> Vec + where T: Copy { + let num_rows = indices.len(); + let mut builder: Vec = Vec::with_capacity(num_rows); + builder.extend(indices.iter().map(|row_ptr| unsafe { + col.get_unchecked(row_ptr.chunk_index as usize)[row_ptr.row_index as usize] + })); + builder + } + + pub fn take_block_vec_string_types<'a>( + col: &'a [StringColumn], + indices: &[RowPtr], + string_items_buf: Option<&mut Vec<(u64, usize)>>, + ) -> StringColumn { + let num_rows = indices.len(); + + // Each element of `items` is (string pointer(u64), string length), if `string_items_buf` + // can be reused, we will not re-allocate memory. + let mut items: Option> = match &string_items_buf { + Some(string_items_buf) if string_items_buf.capacity() >= num_rows => None, + _ => Some(Vec::with_capacity(num_rows)), + }; + let items = match items.is_some() { + true => items.as_mut().unwrap(), + false => string_items_buf.unwrap(), + }; + + // [`StringColumn`] consists of [`data`] and [`offset`], we build [`data`] and [`offset`] respectively, + // and then call `StringColumn::new(data.into(), offsets.into())` to create [`StringColumn`]. + let mut offsets: Vec = Vec::with_capacity(num_rows + 1); + let mut data_size = 0; + + // Build [`offset`] and calculate `data_size` required by [`data`]. + unsafe { + *offsets.get_unchecked_mut(0) = 0; + for (i, row_ptr) in indices.iter().enumerate() { + let item = + col[row_ptr.chunk_index as usize].index_unchecked(row_ptr.row_index as usize); + data_size += item.len() as u64; + *items.get_unchecked_mut(i) = (item.as_ptr() as u64, item.len()); + *offsets.get_unchecked_mut(i + 1) = data_size; + } + items.set_len(num_rows); + offsets.set_len(num_rows + 1); + } + + // Build [`data`]. + let mut data: Vec = Vec::with_capacity(data_size as usize); + let mut data_ptr = data.as_mut_ptr(); + + unsafe { + for (str_ptr, len) in items.iter() { + copy_advance_aligned(*str_ptr as *const u8, &mut data_ptr, *len); + } + set_vec_len_by_ptr(&mut data, data_ptr); + } + + StringColumn::new(data.into(), offsets.into()) + } + + pub fn take_block_vec_boolean_types(col: &[Bitmap], indices: &[RowPtr]) -> Bitmap { + let num_rows = indices.len(); + let capacity = num_rows.saturating_add(7) / 8; + let mut builder: Vec = Vec::with_capacity(capacity); + let mut builder_len = 0; + let mut unset_bits = 0; + let mut value = 0; + let mut i = 0; + + unsafe { + for row_ptr in indices.iter() { + if col[row_ptr.chunk_index as usize].get_bit_unchecked(row_ptr.row_index as usize) { + value |= BIT_MASK[i % 8]; + } else { + unset_bits += 1; + } + i += 1; + if i % 8 == 0 { + *builder.get_unchecked_mut(builder_len) = value; + builder_len += 1; + value = 0; + } + } + if i % 8 != 0 { + *builder.get_unchecked_mut(builder_len) = value; + builder_len += 1; } + builder.set_len(builder_len); + Bitmap::from_inner(Arc::new(builder.into()), 0, num_rows, unset_bits) + .ok() + .unwrap() } } diff --git a/src/query/expression/src/kernels/take_compact.rs b/src/query/expression/src/kernels/take_compact.rs index f7016fb88c66c..94eb26eac59e1 100644 --- a/src/query/expression/src/kernels/take_compact.rs +++ b/src/query/expression/src/kernels/take_compact.rs @@ -12,17 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_arrow::arrow::buffer::Buffer; use common_exception::Result; +use crate::kernels::utils::copy_advance_aligned; +use crate::kernels::utils::set_vec_len_by_ptr; +use crate::kernels::utils::store_advance_aligned; use crate::types::array::ArrayColumn; use crate::types::array::ArrayColumnBuilder; use crate::types::bitmap::BitmapType; -use crate::types::decimal::Decimal128Type; -use crate::types::decimal::Decimal256Type; use crate::types::decimal::DecimalColumn; use crate::types::map::KvColumnBuilder; use crate::types::nullable::NullableColumn; use crate::types::number::NumberColumn; +use crate::types::string::StringColumn; use crate::types::AnyType; use crate::types::ArgType; use crate::types::ArrayType; @@ -32,6 +35,7 @@ use crate::types::NumberType; use crate::types::StringType; use crate::types::ValueType; use crate::types::VariantType; +use crate::with_decimal_type; use crate::with_number_mapped_type; use crate::BlockEntry; use crate::Column; @@ -40,16 +44,16 @@ use crate::DataBlock; use crate::Value; impl DataBlock { - pub fn take_compacted_indices(&self, indices: &[(u32, u32)], row_num: usize) -> Result { + pub fn take_compacted_indices(&self, indices: &[(u32, u32)], num_rows: usize) -> Result { if indices.is_empty() { return Ok(self.slice(0..0)); } // Each item in the `indices` consists of an `index` and a `cnt`, the sum - // of the `cnt` must be equal to the `row_num`. + // of the `cnt` must be equal to the `num_rows`. debug_assert_eq!( indices.iter().fold(0, |acc, &(_, x)| acc + x as usize), - row_num + num_rows ); let after_columns = self @@ -62,92 +66,82 @@ impl DataBlock { }, Value::Column(c) => BlockEntry { data_type: entry.data_type.clone(), - value: Value::Column(Column::take_compacted_indices(c, indices, row_num)), + value: Value::Column(Column::take_compacted_indices(c, indices, num_rows)), }, }) .collect(); Ok(DataBlock::new_with_meta( after_columns, - row_num, + num_rows, self.get_meta().cloned(), )) } } impl Column { - pub fn take_compacted_indices(&self, indices: &[(u32, u32)], row_num: usize) -> Self { + pub fn take_compacted_indices(&self, indices: &[(u32, u32)], num_rows: usize) -> Self { match self { Column::Null { .. } | Column::EmptyArray { .. } | Column::EmptyMap { .. } => { - self.slice(0..row_num) + self.slice(0..num_rows) } Column::Number(column) => with_number_mapped_type!(|NUM_TYPE| match column { - NumberColumn::NUM_TYPE(values) => - Self::take_compacted_arg_types::>(values, indices, row_num), - }), - Column::Decimal(column) => match column { - DecimalColumn::Decimal128(values, size) => { - let mut builder = Decimal128Type::create_builder(row_num, &[]); - for (index, cnt) in indices { - let item = unsafe { - Decimal128Type::index_column_unchecked(values, *index as usize) - }; - for _ in 0..*cnt { - Decimal128Type::push_item(&mut builder, item); - } - } - let column = Decimal128Type::build_column(builder); - Column::Decimal(DecimalColumn::Decimal128(column, *size)) + NumberColumn::NUM_TYPE(values) => { + let builder = Self::take_compacted_primitive_types(values, indices, num_rows); + >::upcast_column(>::column_from_vec( + builder, + &[], + )) } - DecimalColumn::Decimal256(values, size) => { - let mut builder = Decimal256Type::create_builder(row_num, &[]); - for (index, cnt) in indices { - let item = unsafe { - Decimal256Type::index_column_unchecked(values, *index as usize) - }; - for _ in 0..*cnt { - Decimal256Type::push_item(&mut builder, item); - } - } - let column = Decimal256Type::build_column(builder); - Column::Decimal(DecimalColumn::Decimal256(column, *size)) + }), + Column::Decimal(column) => with_decimal_type!(|DECIMAL_TYPE| match column { + DecimalColumn::DECIMAL_TYPE(values, size) => { + let builder = Self::take_compacted_primitive_types(values, indices, num_rows); + Column::Decimal(DecimalColumn::DECIMAL_TYPE(builder.into(), *size)) } - }, + }), Column::Boolean(bm) => { - Self::take_compacted_arg_types::(bm, indices, row_num) - } - Column::String(column) => { - Self::take_compacted_arg_types::(column, indices, row_num) + Self::take_compacted_arg_types::(bm, indices, num_rows) } + Column::String(column) => StringType::upcast_column(Self::take_compact_string_types( + column, indices, num_rows, + )), Column::Timestamp(column) => { - let ts = - Self::take_compacted_arg_types::>(column, indices, row_num) - .into_number() - .unwrap() - .into_int64() - .unwrap(); + let builder = Self::take_compacted_primitive_types(column, indices, num_rows); + let ts = >::upcast_column(>::column_from_vec( + builder, + &[], + )) + .into_number() + .unwrap() + .into_int64() + .unwrap(); Column::Timestamp(ts) } Column::Date(column) => { - let d = Self::take_compacted_arg_types::>(column, indices, row_num) - .into_number() - .unwrap() - .into_int32() - .unwrap(); + let builder = Self::take_compacted_primitive_types(column, indices, num_rows); + let d = >::upcast_column(>::column_from_vec( + builder, + &[], + )) + .into_number() + .unwrap() + .into_int32() + .unwrap(); Column::Date(d) } Column::Array(column) => { - let mut offsets = Vec::with_capacity(row_num + 1); + let mut offsets = Vec::with_capacity(num_rows + 1); offsets.push(0); - let builder = ColumnBuilder::with_capacity(&column.values.data_type(), row_num); + let builder = ColumnBuilder::with_capacity(&column.values.data_type(), num_rows); let builder = ArrayColumnBuilder { builder, offsets }; Self::take_compacted_value_types::>(column, builder, indices) } Column::Map(column) => { - let mut offsets = Vec::with_capacity(row_num + 1); + let mut offsets = Vec::with_capacity(num_rows + 1); offsets.push(0); let builder = ColumnBuilder::from_column( - ColumnBuilder::with_capacity(&column.values.data_type(), row_num).build(), + ColumnBuilder::with_capacity(&column.values.data_type(), num_rows).build(), ); let (key_builder, val_builder) = match builder { ColumnBuilder::Tuple(fields) => (fields[0].clone(), fields[1].clone()), @@ -163,13 +157,13 @@ impl Column { &column, builder, indices, ) } - Column::Bitmap(column) => { - Self::take_compacted_arg_types::(column, indices, row_num) - } + Column::Bitmap(column) => BitmapType::upcast_column(Self::take_compact_string_types( + column, indices, num_rows, + )), Column::Nullable(c) => { - let column = c.column.take_compacted_indices(indices, row_num); + let column = c.column.take_compacted_indices(indices, num_rows); let validity = - Self::take_compacted_arg_types::(&c.validity, indices, row_num); + Self::take_compacted_arg_types::(&c.validity, indices, num_rows); Column::Nullable(Box::new(NullableColumn { column, validity: BooleanType::try_downcast_column(&validity).unwrap(), @@ -178,22 +172,138 @@ impl Column { Column::Tuple(fields) => { let fields = fields .iter() - .map(|c| c.take_compacted_indices(indices, row_num)) + .map(|c| c.take_compacted_indices(indices, num_rows)) .collect(); Column::Tuple(fields) } - Column::Variant(column) => { - Self::take_compacted_arg_types::(column, indices, row_num) + Column::Variant(column) => VariantType::upcast_column(Self::take_compact_string_types( + column, indices, num_rows, + )), + } + } + + pub fn take_compacted_primitive_types( + col: &Buffer, + indices: &[(u32, u32)], + num_rows: usize, + ) -> Vec + where + T: Copy, + { + let col_ptr = col.as_slice().as_ptr(); + let mut builder: Vec = Vec::with_capacity(num_rows); + let mut ptr = builder.as_mut_ptr(); + let mut remain; + + unsafe { + for (index, cnt) in indices.iter() { + if *cnt == 1 { + copy_advance_aligned(col_ptr.add(*index as usize), &mut ptr, 1); + continue; + } + + // Using the doubling method to copy the max segment memory. + // [___________] => [x__________] => [xx_________] => [xxxx_______] => [xxxxxxxx___] + // Since cnt > 0, then 31 - cnt.leading_zeros() >= 0. + let max_segment = 1 << (31 - cnt.leading_zeros()); + let base_ptr = ptr; + copy_advance_aligned(col_ptr.add(*index as usize), &mut ptr, 1); + let mut cur_segment = 1; + while cur_segment < max_segment { + copy_advance_aligned(base_ptr, &mut ptr, cur_segment); + cur_segment <<= 1; + } + + // Copy the remaining memory directly. + // [xxxxxxxxxx____] => [xxxxxxxxxxxxxx] + // ^^^^ ---> ^^^^ + remain = *cnt as usize - max_segment; + if remain > 0 { + copy_advance_aligned(base_ptr, &mut ptr, remain); + } + } + set_vec_len_by_ptr(&mut builder, ptr); + } + + builder + } + + pub fn take_compact_string_types<'a>( + col: &'a StringColumn, + indices: &[(u32, u32)], + num_rows: usize, + ) -> StringColumn { + // Each element of `items` is (string(&[u8]), repeat times). + let mut items = Vec::with_capacity(indices.len()); + let mut items_ptr = items.as_mut_ptr(); + + // [`StringColumn`] consists of [`data`] and [`offset`], we build [`data`] and [`offset`] respectively, + // and then call `StringColumn::new(data.into(), offsets.into())` to create [`StringColumn`]. + let mut offsets = Vec::with_capacity(num_rows + 1); + let mut offsets_ptr = offsets.as_mut_ptr(); + let mut data_size = 0; + + // Build [`offset`] and calculate `data_size` required by [`data`]. + unsafe { + store_advance_aligned::(0, &mut offsets_ptr); + for (index, cnt) in indices.iter() { + let item = col.index_unchecked(*index as usize); + store_advance_aligned((item, *cnt), &mut items_ptr); + for _ in 0..*cnt { + data_size += item.len() as u64; + store_advance_aligned(data_size, &mut offsets_ptr); + } + } + set_vec_len_by_ptr(&mut offsets, offsets_ptr); + set_vec_len_by_ptr(&mut items, items_ptr); + } + + // Build [`data`]. + let mut data: Vec = Vec::with_capacity(data_size as usize); + let mut data_ptr = data.as_mut_ptr(); + let mut remain; + + unsafe { + for (item, cnt) in items { + let len = item.len(); + if cnt == 1 { + copy_advance_aligned(item.as_ptr(), &mut data_ptr, len); + continue; + } + + // Using the doubling method to copy the max segment memory. + // [___________] => [x__________] => [xx_________] => [xxxx_______] => [xxxxxxxx___] + // Since cnt > 0, then 31 - cnt.leading_zeros() >= 0. + let max_bit_num = 1 << (31 - cnt.leading_zeros()); + let max_segment = max_bit_num * len; + let base_data_ptr = data_ptr; + copy_advance_aligned(item.as_ptr(), &mut data_ptr, len); + let mut cur_segment = len; + while cur_segment < max_segment { + copy_advance_aligned(base_data_ptr, &mut data_ptr, cur_segment); + cur_segment <<= 1; + } + + // Copy the remaining memory directly. + // [xxxxxxxxxx____] => [xxxxxxxxxxxxxx] + // ^^^^ ---> ^^^^ + remain = cnt as usize - max_bit_num; + if remain > 0 { + copy_advance_aligned(base_data_ptr, &mut data_ptr, remain * len); + } } + set_vec_len_by_ptr(&mut data, data_ptr); } + + StringColumn::new(data.into(), offsets.into()) } fn take_compacted_arg_types( col: &T::Column, indices: &[(u32, u32)], - row_num: usize, + num_rows: usize, ) -> Column { - let mut builder = T::create_builder(row_num, &[]); + let mut builder = T::create_builder(num_rows, &[]); for (index, cnt) in indices { for _ in 0..*cnt { T::push_item(&mut builder, unsafe { diff --git a/src/query/expression/src/types.rs b/src/query/expression/src/types.rs index 300aefa4ad98a..670a127b37f65 100755 --- a/src/query/expression/src/types.rs +++ b/src/query/expression/src/types.rs @@ -188,6 +188,15 @@ impl DataType { matches!(self, DataType::Timestamp | DataType::Date) } + #[inline] + pub fn is_string_column(&self) -> bool { + match self { + DataType::String | DataType::Bitmap | DataType::Variant => true, + DataType::Nullable(ty) => ty.is_string_column(), + _ => false, + } + } + pub fn numeric_byte_size(&self) -> Result { match self { DataType::Number(NumberDataType::UInt8) | DataType::Number(NumberDataType::Int8) => { diff --git a/src/query/expression/tests/it/common.rs b/src/query/expression/tests/it/common.rs index df401eaac5fe5..279ab7d628a01 100644 --- a/src/query/expression/tests/it/common.rs +++ b/src/query/expression/tests/it/common.rs @@ -127,7 +127,7 @@ pub fn run_scatter(file: &mut impl Write, block: &DataBlock, indices: &[u32], sc } pub fn run_take(file: &mut impl Write, indices: &[u32], block: &DataBlock) { - let result = DataBlock::take(block, indices); + let result = DataBlock::take(block, indices, &mut None); match result { Ok(result_block) => { diff --git a/src/query/service/src/api/rpc/exchange/exchange_manager.rs b/src/query/service/src/api/rpc/exchange/exchange_manager.rs index 0f66510c4c3bc..62f5ee0bf0a0e 100644 --- a/src/query/service/src/api/rpc/exchange/exchange_manager.rs +++ b/src/query/service/src/api/rpc/exchange/exchange_manager.rs @@ -807,6 +807,8 @@ impl FragmentCoordinator { let pipeline_ctx = QueryContext::create_from(ctx); let pipeline_builder = PipelineBuilder::create( + pipeline_ctx.get_function_context()?, + pipeline_ctx.get_settings(), pipeline_ctx, enable_profiling, SharedProcessorProfiles::default(), diff --git a/src/query/service/src/pipelines/pipeline_builder.rs b/src/query/service/src/pipelines/pipeline_builder.rs index bb95ebfc46f4c..765e292dd70a2 100644 --- a/src/query/service/src/pipelines/pipeline_builder.rs +++ b/src/query/service/src/pipelines/pipeline_builder.rs @@ -68,6 +68,7 @@ use common_pipeline_transforms::processors::transforms::build_full_sort_pipeline use common_pipeline_transforms::processors::transforms::create_dummy_item; use common_pipeline_transforms::processors::transforms::Transformer; use common_profile::SharedProcessorProfiles; +use common_settings::Settings; use common_sql::evaluator::BlockOperator; use common_sql::evaluator::CompoundBlockOperator; use common_sql::executor::AggregateExpand; @@ -183,6 +184,8 @@ pub struct PipelineBuilder { main_pipeline: Pipeline, pub pipelines: Vec, + func_ctx: FunctionContext, + settings: Arc, // Used in runtime filter source pub join_state: Option>, @@ -199,6 +202,8 @@ pub struct PipelineBuilder { impl PipelineBuilder { pub fn create( + func_ctx: FunctionContext, + settings: Arc, ctx: Arc, enable_profiling: bool, prof_span_set: SharedProcessorProfiles, @@ -206,6 +211,8 @@ impl PipelineBuilder { PipelineBuilder { enable_profiling, ctx, + func_ctx, + settings, pipelines: vec![], join_state: None, main_pipeline: Pipeline::create(), @@ -338,14 +345,13 @@ impl PipelineBuilder { }) = select_ctx { PipelineBuilder::render_result_set( - &self.ctx.get_function_context()?, + &self.func_ctx, input.output_schema()?, select_column_bindings, &mut self.main_pipeline, false, )?; if Self::check_schema_cast(select_schema.clone(), target_schema.clone())? { - let func_ctx = self.ctx.get_function_context()?; self.main_pipeline.add_transform( |transform_input_port, transform_output_port| { TransformCastSchema::try_create( @@ -353,7 +359,7 @@ impl PipelineBuilder { transform_output_port, select_schema.clone(), target_schema.clone(), - func_ctx.clone(), + self.func_ctx.clone(), ) }, )?; @@ -481,7 +487,7 @@ impl PipelineBuilder { let merge_into_not_matched_processor = MergeIntoNotMatchedProcessor::create( unmatched.clone(), input.output_schema()?, - self.ctx.get_function_context()?, + self.func_ctx.clone(), )?; pipe_items.push(matched_split_processor.into_pipe_item()); @@ -576,7 +582,7 @@ impl PipelineBuilder { self.main_pipeline.add_pipe(builder.finalize()); } - let max_threads = self.ctx.get_settings().get_max_threads()?; + let max_threads = self.settings.get_max_threads()?; let io_request_semaphore = Arc::new(Semaphore::new(max_threads as usize)); pipe_items.clear(); @@ -644,7 +650,7 @@ impl PipelineBuilder { block_slots, need_insert, } = replace; - let max_threads = self.ctx.get_settings().get_max_threads()?; + let max_threads = self.settings.get_max_threads()?; let segment_partition_num = std::cmp::min(segments.len(), max_threads as usize); let table = self .ctx @@ -679,7 +685,7 @@ impl PipelineBuilder { .add_pipe(Pipe::create(1, segment_partition_num, vec![ broadcast_processor.into_pipe_item(), ])); - let max_threads = self.ctx.get_settings().get_max_threads()?; + let max_threads = self.settings.get_max_threads()?; let io_request_semaphore = Arc::new(Semaphore::new(max_threads as usize)); let merge_into_operation_aggregators = table.merge_into_mutators( @@ -760,7 +766,7 @@ impl PipelineBuilder { // setup the dummy transform pipe_items.push(serialize_segment_transform.into_pipe_item()); - let max_threads = self.ctx.get_settings().get_max_threads()?; + let max_threads = self.settings.get_max_threads()?; let io_request_semaphore = Arc::new(Semaphore::new(max_threads as usize)); // setup the merge into operation aggregators @@ -790,10 +796,9 @@ impl PipelineBuilder { } fn build_async_sourcer(&mut self, async_sourcer: &AsyncSourcerPlan) -> Result<()> { - let settings = self.ctx.get_settings(); self.main_pipeline.add_source( |output| { - let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?; + let name_resolution_ctx = NameResolutionContext::try_from(self.settings.as_ref())?; let inner = ValueSource::new( async_sourcer.value_data.clone(), self.ctx.clone(), @@ -816,7 +821,7 @@ impl PipelineBuilder { CopyIntoTableSource::Query(input) => { self.build_pipeline(&input.plan)?; Self::render_result_set( - &self.ctx.get_function_context()?, + &self.func_ctx, input.plan.output_schema()?, &input.result_columns, &mut self.main_pipeline, @@ -987,7 +992,7 @@ impl PipelineBuilder { state: Arc, ) -> Result<()> { self.build_pipeline(&range_join.left)?; - let max_threads = self.ctx.get_settings().get_max_threads()? as usize; + let max_threads = self.settings.get_max_threads()? as usize; self.main_pipeline.try_resize(max_threads)?; self.main_pipeline.add_transform(|input, output| { let transform = TransformRangeJoinLeft::create(input, output, state.clone()); @@ -1011,6 +1016,8 @@ impl PipelineBuilder { ) -> Result<()> { let right_side_context = QueryContext::create_from(self.ctx.clone()); let mut right_side_builder = PipelineBuilder::create( + self.func_ctx.clone(), + self.settings.clone(), right_side_context, self.enable_profiling, self.proc_profs.clone(), @@ -1062,6 +1069,8 @@ impl PipelineBuilder { ) -> Result<()> { let build_side_context = QueryContext::create_from(self.ctx.clone()); let mut build_side_builder = PipelineBuilder::create( + self.func_ctx.clone(), + self.settings.clone(), build_side_context, self.enable_profiling, self.proc_profs.clone(), @@ -1076,6 +1085,7 @@ impl PipelineBuilder { let restore_barrier = Barrier::new(output_len); let build_state = HashJoinBuildState::try_create( self.ctx.clone(), + self.func_ctx.clone(), &hash_join_plan.build_keys, &hash_join_plan.build_projections, join_state, @@ -1084,7 +1094,7 @@ impl PipelineBuilder { )?; let create_sink_processor = |input| { - let spill_state = if self.ctx.get_settings().get_join_spilling_threshold()? != 0 { + let spill_state = if self.settings.get_join_spilling_threshold()? != 0 { Some(Box::new(BuildSpillState::create( self.ctx.clone(), spill_coordinator.clone(), @@ -1214,15 +1224,13 @@ impl PipelineBuilder { // if projection is sequential, no need to add projection if projection != (0..schema.fields().len()).collect::>() { let ops = vec![BlockOperator::Project { projection }]; - let func_ctx = self.ctx.get_function_context()?; - let num_input_columns = schema.num_fields(); self.main_pipeline.add_transform(|input, output| { Ok(ProcessorPtr::create(CompoundBlockOperator::create( input, output, num_input_columns, - func_ctx.clone(), + self.func_ctx.clone(), ops.clone(), ))) })?; @@ -1232,7 +1240,7 @@ impl PipelineBuilder { } fn build_cte_scan(&mut self, cte_scan: &CteScan) -> Result<()> { - let max_threads = self.ctx.get_settings().get_max_threads()?; + let max_threads = self.settings.get_max_threads()?; self.main_pipeline.add_source( |output| { MaterializedCteSource::create( @@ -1285,7 +1293,7 @@ impl PipelineBuilder { projections: filter.projections.clone(), expr: predicate.clone(), }], - self.ctx.get_function_context()?, + self.func_ctx.clone(), num_input_columns, ); @@ -1309,16 +1317,13 @@ impl PipelineBuilder { fn build_project(&mut self, project: &Project) -> Result<()> { self.build_pipeline(&project.input)?; - let func_ctx = self.ctx.get_function_context()?; - let num_input_columns = project.input.output_schema()?.num_fields(); - self.main_pipeline.add_transform(|input, output| { Ok(ProcessorPtr::create(CompoundBlockOperator::create( input, output, num_input_columns, - func_ctx.clone(), + self.func_ctx.clone(), vec![BlockOperator::Project { projection: project.projections.clone(), }], @@ -1345,13 +1350,14 @@ impl PipelineBuilder { projections: Some(eval_scalar.projections.clone()), }; - let func_ctx = self.ctx.get_function_context()?; - let num_input_columns = input_schema.num_fields(); self.main_pipeline.add_transform(|input, output| { - let transform = - CompoundBlockOperator::new(vec![op.clone()], func_ctx.clone(), num_input_columns); + let transform = CompoundBlockOperator::new( + vec![op.clone()], + self.func_ctx.clone(), + num_input_columns, + ); if self.enable_profiling { Ok(ProcessorPtr::create(TransformProfileWrapper::create( @@ -1383,13 +1389,14 @@ impl PipelineBuilder { .collect(), }; - let func_ctx = self.ctx.get_function_context()?; - let num_input_columns = project_set.input.output_schema()?.num_fields(); self.main_pipeline.add_transform(|input, output| { - let transform = - CompoundBlockOperator::new(vec![op.clone()], func_ctx.clone(), num_input_columns); + let transform = CompoundBlockOperator::new( + vec![op.clone()], + self.func_ctx.clone(), + num_input_columns, + ); if self.enable_profiling { Ok(ProcessorPtr::create(TransformProfileWrapper::create( @@ -1414,13 +1421,14 @@ impl PipelineBuilder { let op = BlockOperator::LambdaMap { funcs }; let input_schema = lambda.input.output_schema()?; - let func_ctx = self.ctx.get_function_context()?; - let num_input_columns = input_schema.num_fields(); self.main_pipeline.add_transform(|input, output| { - let transform = - CompoundBlockOperator::new(vec![op.clone()], func_ctx.clone(), num_input_columns); + let transform = CompoundBlockOperator::new( + vec![op.clone()], + self.func_ctx.clone(), + num_input_columns, + ); if self.enable_profiling { Ok(ProcessorPtr::create(TransformProfileWrapper::create( @@ -1515,9 +1523,7 @@ impl PipelineBuilder { }); } - // let is_standalone = self.ctx.get_cluster().is_empty(); - let settings = self.ctx.get_settings(); - let efficiently_memory = settings.get_efficiently_memory_group_by()?; + let efficiently_memory = self.settings.get_efficiently_memory_group_by()?; let group_cols = ¶ms.group_columns; let schema_before_group_by = params.input_schema.clone(); @@ -1653,8 +1659,7 @@ impl PipelineBuilder { return Ok(()); } - let settings = self.ctx.get_settings(); - let efficiently_memory = settings.get_efficiently_memory_group_by()?; + let efficiently_memory = self.settings.get_efficiently_memory_group_by()?; let group_cols = ¶ms.group_columns; let schema_before_group_by = params.input_schema.clone(); @@ -1890,7 +1895,7 @@ impl PipelineBuilder { input, output, input_schema.num_fields(), - self.ctx.get_function_context()?, + self.func_ctx.clone(), vec![BlockOperator::Project { projection: projection.clone(), }], @@ -1932,8 +1937,8 @@ impl PipelineBuilder { limit: Option, after_exchange: bool, ) -> Result<()> { - let block_size = self.ctx.get_settings().get_max_block_size()? as usize; - let max_threads = self.ctx.get_settings().get_max_threads()? as usize; + let block_size = self.settings.get_max_block_size()? as usize; + let max_threads = self.settings.get_max_threads()? as usize; // TODO(Winter): the query will hang in MultiSortMergeProcessor when max_threads == 1 and output_len != 1 if self.main_pipeline.output_len() == 1 || max_threads == 1 { @@ -1995,12 +2000,12 @@ impl PipelineBuilder { fn build_join_probe(&mut self, join: &HashJoin, state: Arc) -> Result<()> { self.build_pipeline(&join.probe)?; - let max_block_size = self.ctx.get_settings().get_max_block_size()? as usize; - let func_ctx = self.ctx.get_function_context()?; + let max_block_size = self.settings.get_max_block_size()? as usize; let barrier = Barrier::new(self.main_pipeline.output_len()); let restore_barrier = Barrier::new(self.main_pipeline.output_len()); let probe_state = Arc::new(HashJoinProbeState::create( self.ctx.clone(), + self.func_ctx.clone(), state, &join.probe_projections, &join.probe_keys, @@ -2010,9 +2015,13 @@ impl PipelineBuilder { barrier, restore_barrier, )?); + let mut has_string_column = false; + for filed in join.output_schema()?.fields() { + has_string_column |= filed.data_type().is_string_column(); + } self.main_pipeline.add_transform(|input, output| { - let probe_spill_state = if self.ctx.get_settings().get_join_spilling_threshold()? != 0 { + let probe_spill_state = if self.settings.get_join_spilling_threshold()? != 0 { Some(Box::new(ProbeSpillState::create( self.ctx.clone(), probe_state.clone(), @@ -2027,9 +2036,10 @@ impl PipelineBuilder { probe_state.clone(), probe_spill_state, max_block_size, - func_ctx.clone(), + self.func_ctx.clone(), &join.join_type, !join.non_equi_conditions.is_empty(), + has_string_column, )?; if self.enable_profiling { @@ -2083,8 +2093,13 @@ impl PipelineBuilder { union_plan: &UnionAll, ) -> Result> { let union_ctx = QueryContext::create_from(self.ctx.clone()); - let mut pipeline_builder = - PipelineBuilder::create(union_ctx, self.enable_profiling, self.proc_profs.clone()); + let mut pipeline_builder = PipelineBuilder::create( + self.func_ctx.clone(), + self.settings.clone(), + union_ctx, + self.enable_profiling, + self.proc_profs.clone(), + ); pipeline_builder.cte_state = self.cte_state.clone(); let mut build_res = pipeline_builder.finalize(input)?; @@ -2150,7 +2165,7 @@ impl PipelineBuilder { // should render result for select PipelineBuilder::render_result_set( - &self.ctx.get_function_context()?, + &self.func_ctx, insert_select.input.output_schema()?, &insert_select.select_column_bindings, &mut self.main_pipeline, @@ -2158,7 +2173,6 @@ impl PipelineBuilder { )?; if insert_select.cast_needed { - let func_ctx = self.ctx.get_function_context()?; self.main_pipeline .add_transform(|transform_input_port, transform_output_port| { TransformCastSchema::try_create( @@ -2166,7 +2180,7 @@ impl PipelineBuilder { transform_output_port, select_schema.clone(), insert_schema.clone(), - func_ctx.clone(), + self.func_ctx.clone(), ) })?; } @@ -2290,6 +2304,8 @@ impl PipelineBuilder { let state = Arc::new(MaterializedCteState::new(self.ctx.clone())); self.cte_state.insert(cte_idx, state.clone()); let mut left_side_builder = PipelineBuilder::create( + self.func_ctx.clone(), + self.settings.clone(), left_side_ctx, self.enable_profiling, self.proc_profs.clone(), @@ -2299,7 +2315,7 @@ impl PipelineBuilder { assert!(left_side_pipeline.main_pipeline.is_pulling_pipeline()?); PipelineBuilder::render_result_set( - &self.ctx.get_function_context()?, + &self.func_ctx, left_side.output_schema()?, left_output_columns, &mut left_side_pipeline.main_pipeline, diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs index 46e9577f3b0bf..141a320612148 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs @@ -14,7 +14,6 @@ use common_arrow::arrow::bitmap::Bitmap; use common_arrow::arrow::bitmap::MutableBitmap; -use common_catalog::table_context::TableContext; use common_exception::Result; use common_expression::arrow::constant_bitmap; use common_expression::arrow::or_validities; @@ -27,6 +26,7 @@ use common_expression::Column; use common_expression::DataBlock; use common_expression::Evaluator; use common_expression::Expr; +use common_expression::FunctionContext; use common_expression::Scalar; use common_expression::Value; use common_functions::BUILTIN_FUNCTIONS; @@ -95,16 +95,15 @@ impl HashJoinProbeState { Ok(DataBlock::new_from_columns(vec![marker_column])) } - // return an (option bitmap, all_true, all_false) + // return an (option bitmap, all_true, all_false). pub(crate) fn get_other_filters( &self, merged_block: &DataBlock, filter: &Expr, + func_ctx: &FunctionContext, ) -> Result<(Option, bool, bool)> { let filter = cast_expr_to_non_null_boolean(filter.clone())?; - - let func_ctx = self.ctx.get_function_context()?; - let evaluator = Evaluator::new(merged_block, &func_ctx, &BUILTIN_FUNCTIONS); + let evaluator = Evaluator::new(merged_block, func_ctx, &BUILTIN_FUNCTIONS); let predicates = evaluator .run(&filter)? .try_downcast::() @@ -124,10 +123,9 @@ impl HashJoinProbeState { &self, merged_block: &DataBlock, filter: &Expr, + func_ctx: &FunctionContext, ) -> Result { - let func_ctx = self.ctx.get_function_context()?; - let evaluator = Evaluator::new(merged_block, &func_ctx, &BUILTIN_FUNCTIONS); - + let evaluator = Evaluator::new(merged_block, func_ctx, &BUILTIN_FUNCTIONS); let filter_vector: Value = evaluator.run(filter)?; let filter_vector = filter_vector.convert_to_full_column(filter.data_type(), merged_block.num_rows()); diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs index ff4308b65f870..8e22215d56187 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs @@ -31,6 +31,7 @@ use common_expression::ColumnBuilder; use common_expression::ColumnVec; use common_expression::DataBlock; use common_expression::Evaluator; +use common_expression::FunctionContext; use common_expression::HashMethod; use common_expression::HashMethodKind; use common_expression::HashMethodSerializer; @@ -65,6 +66,7 @@ use crate::sessions::QueryContext; /// Define some shared states for all hash join build threads. pub struct HashJoinBuildState { pub(crate) ctx: Arc, + pub(crate) func_ctx: FunctionContext, /// `hash_join_state` is shared by `HashJoinBuild` and `HashJoinProbe` pub(crate) hash_join_state: Arc, // When build side input data is coming, will put it into chunks. @@ -100,8 +102,10 @@ pub struct HashJoinBuildState { } impl HashJoinBuildState { + #[allow(clippy::too_many_arguments)] pub fn try_create( ctx: Arc, + func_ctx: FunctionContext, build_keys: &[RemoteExpr], build_projections: &ColumnSet, hash_join_state: Arc, @@ -115,8 +119,9 @@ impl HashJoinBuildState { let method = DataBlock::choose_hash_method_with_types(&hash_key_types, false)?; Ok(Arc::new(Self { ctx: ctx.clone(), - chunk_size_limit: ctx.get_settings().get_max_block_size()? as usize * 16, + func_ctx, hash_join_state, + chunk_size_limit: ctx.get_settings().get_max_block_size()? as usize * 16, barrier, restore_barrier, row_space_builders: Default::default(), @@ -467,7 +472,6 @@ impl HashJoinBuildState { }}; } - let func_ctx = self.ctx.get_function_context()?; let chunks = unsafe { &mut *self.hash_join_state.chunks.get() }; let mut has_null = false; for chunk_index in task.0..task.1 { @@ -479,7 +483,7 @@ impl HashJoinBuildState { let chunk = &mut chunks[chunk_index]; - let evaluator = Evaluator::new(chunk, &func_ctx, &BUILTIN_FUNCTIONS); + let evaluator = Evaluator::new(chunk, &self.func_ctx, &BUILTIN_FUNCTIONS); let columns: Vec<(Column, DataType)> = self .hash_join_state .hash_join_desc diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs index 06786374c6001..3ed563e25cacf 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs @@ -32,6 +32,7 @@ use common_expression::Column; use common_expression::DataBlock; use common_expression::DataSchemaRef; use common_expression::Evaluator; +use common_expression::FunctionContext; use common_expression::HashMethod; use common_expression::HashMethodKind; use common_expression::RemoteExpr; @@ -58,6 +59,7 @@ use crate::sql::planner::plans::JoinType; /// Define some shared states for all hash join probe threads. pub struct HashJoinProbeState { pub(crate) ctx: Arc, + pub(crate) func_ctx: FunctionContext, /// `hash_join_state` is shared by `HashJoinBuild` and `HashJoinProbe` pub(crate) hash_join_state: Arc, /// Processors count @@ -96,6 +98,7 @@ impl HashJoinProbeState { #[allow(clippy::too_many_arguments)] pub fn create( ctx: Arc, + func_ctx: FunctionContext, hash_join_state: Arc, probe_projections: &ColumnSet, probe_keys: &[RemoteExpr], @@ -118,6 +121,7 @@ impl HashJoinProbeState { let method = DataBlock::choose_hash_method_with_types(&hash_key_types, false)?; Ok(HashJoinProbeState { ctx, + func_ctx, hash_join_state, processor_count, probe_workers: AtomicUsize::new(0), @@ -381,6 +385,7 @@ impl HashJoinProbeState { let max_block_size = state.max_block_size; let true_validity = &state.true_validity; let build_indexes = &mut state.build_indexes; + let string_items_buf = &mut state.string_items_buf; let mut build_indexes_occupied = 0; let mut result_blocks = vec![]; @@ -446,6 +451,7 @@ impl HashJoinProbeState { build_columns, build_columns_data_type, &build_num_rows, + string_items_buf, )?; if self.hash_join_state.hash_join_desc.join_type == JoinType::Full { @@ -491,6 +497,7 @@ impl HashJoinProbeState { ) -> Result> { let max_block_size = state.max_block_size; let build_indexes = &mut state.build_indexes; + let string_items_buf = &mut state.string_items_buf; let mut build_indexes_occupied = 0; let mut result_blocks = vec![]; @@ -530,6 +537,7 @@ impl HashJoinProbeState { build_columns, build_columns_data_type, &build_num_rows, + string_items_buf, )?); build_indexes_occupied = 0; } @@ -543,6 +551,7 @@ impl HashJoinProbeState { ) -> Result> { let max_block_size = state.max_block_size; let build_indexes = &mut state.build_indexes; + let string_items_buf = &mut state.string_items_buf; let mut build_indexes_occupied = 0; let mut result_blocks = vec![]; @@ -582,6 +591,7 @@ impl HashJoinProbeState { build_columns, build_columns_data_type, &build_num_rows, + string_items_buf, )?); build_indexes_occupied = 0; } @@ -591,6 +601,7 @@ impl HashJoinProbeState { pub fn left_mark_scan(&self, task: usize, state: &mut ProbeState) -> Result> { let max_block_size = state.max_block_size; let build_indexes = &mut state.build_indexes; + let string_items_buf = &mut state.string_items_buf; let mut build_indexes_occupied = 0; let mut result_blocks = vec![]; @@ -659,6 +670,7 @@ impl HashJoinProbeState { build_columns, build_columns_data_type, &build_num_rows, + string_items_buf, )?; result_blocks.push(self.merge_eq_block( Some(build_block), diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/inner_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/inner_join.rs index b1a5ab6a58b4a..0e05b85cadf58 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/inner_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/inner_join.rs @@ -17,7 +17,6 @@ use std::sync::atomic::Ordering; use common_arrow::arrow::bitmap::Bitmap; use common_arrow::arrow::bitmap::MutableBitmap; -use common_catalog::table_context::TableContext; use common_exception::ErrorCode; use common_exception::Result; use common_expression::types::BooleanType; @@ -54,6 +53,7 @@ impl HashJoinProbeState { let probe_indexes = &mut probe_state.probe_indexes; let build_indexes = &mut probe_state.build_indexes; let build_indexes_ptr = build_indexes.as_mut_ptr(); + let string_items_buf = &mut probe_state.string_items_buf; let build_columns = unsafe { &*self.hash_join_state.build_columns.get() }; let build_columns_data_type = @@ -92,7 +92,7 @@ impl HashJoinProbeState { } let probe_block = if is_probe_projected { - Some(DataBlock::take(input, probe_indexes)?) + Some(DataBlock::take(input, probe_indexes, string_items_buf)?) } else { None }; @@ -102,6 +102,7 @@ impl HashJoinProbeState { build_columns, build_columns_data_type, build_num_rows, + string_items_buf, )?) } else { None @@ -165,7 +166,11 @@ impl HashJoinProbeState { if matched_num > 0 { let probe_block = if is_probe_projected { - Some(DataBlock::take(input, &probe_indexes[0..matched_num])?) + Some(DataBlock::take( + input, + &probe_indexes[0..matched_num], + string_items_buf, + )?) } else { None }; @@ -175,6 +180,7 @@ impl HashJoinProbeState { build_columns, build_columns_data_type, build_num_rows, + string_items_buf, )?) } else { None @@ -214,9 +220,7 @@ impl HashJoinProbeState { let other_predicate = cast_expr_to_non_null_boolean(other_predicate.clone())?; assert_eq!(other_predicate.data_type(), &DataType::Boolean); - let func_ctx = self.ctx.get_function_context()?; let mut filtered_blocks = Vec::with_capacity(result_blocks.len()); - for result_block in result_blocks { if self.hash_join_state.interrupt.load(Ordering::Relaxed) { return Err(ErrorCode::AbortedQuery( @@ -224,7 +228,8 @@ impl HashJoinProbeState { )); } - let evaluator = Evaluator::new(&result_block, &func_ctx, &BUILTIN_FUNCTIONS); + let evaluator = + Evaluator::new(&result_block, &self.func_ctx, &BUILTIN_FUNCTIONS); let predicate = evaluator .run(&other_predicate)? .try_downcast::() diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_join.rs index b856aa54260ec..0ada98786288d 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_join.rs @@ -51,6 +51,7 @@ impl HashJoinProbeState { let probe_indexes = &mut probe_state.probe_indexes; let local_build_indexes = &mut probe_state.build_indexes; let local_build_indexes_ptr = local_build_indexes.as_mut_ptr(); + let string_items_buf = &mut probe_state.string_items_buf; // Safe to unwrap. let probe_unmatched_indexes = probe_state.probe_unmatched_indexes.as_mut().unwrap(); @@ -113,6 +114,7 @@ impl HashJoinProbeState { probe_unmatched_indexes_occupied, is_probe_projected, is_build_projected, + string_items_buf, )?); probe_unmatched_indexes_occupied = 0; } @@ -126,8 +128,11 @@ impl HashJoinProbeState { } let probe_block = if is_probe_projected { - let mut probe_block = - DataBlock::take(input, &probe_indexes[0..matched_num])?; + let mut probe_block = DataBlock::take( + input, + &probe_indexes[0..matched_num], + string_items_buf, + )?; // For full join, wrap nullable for probe block if self.hash_join_state.hash_join_desc.join_type == JoinType::Full { let nullable_probe_columns = if matched_num == max_block_size { @@ -158,6 +163,7 @@ impl HashJoinProbeState { build_columns, build_columns_data_type, &build_num_rows, + string_items_buf, )?; // For left join, wrap nullable for build block let (nullable_columns, num_rows) = if build_num_rows == 0 { @@ -252,6 +258,7 @@ impl HashJoinProbeState { probe_unmatched_indexes_occupied, is_probe_projected, is_build_projected, + string_items_buf, )?); Ok(result_blocks) } @@ -276,6 +283,7 @@ impl HashJoinProbeState { let probe_indexes = &mut probe_state.probe_indexes; let local_build_indexes = &mut probe_state.build_indexes; let local_build_indexes_ptr = local_build_indexes.as_mut_ptr(); + let string_items_buf = &mut probe_state.string_items_buf; if input_num_rows > probe_state.row_state.as_ref().unwrap().len() { probe_state.row_state = Some(vec![0; input_num_rows]); } @@ -343,8 +351,11 @@ impl HashJoinProbeState { } let probe_block = if is_probe_projected { - let mut probe_block = - DataBlock::take(input, &probe_indexes[0..matched_num])?; + let mut probe_block = DataBlock::take( + input, + &probe_indexes[0..matched_num], + string_items_buf, + )?; // For full join, wrap nullable for probe block if self.hash_join_state.hash_join_desc.join_type == JoinType::Full { let nullable_probe_columns = if matched_num == max_block_size { @@ -375,6 +386,7 @@ impl HashJoinProbeState { build_columns, build_columns_data_type, &build_num_rows, + string_items_buf, )?; // For left join, wrap nullable for build block let (nullable_columns, num_rows) = if build_num_rows == 0 { @@ -425,6 +437,7 @@ impl HashJoinProbeState { .other_predicate .as_ref() .unwrap(), + &self.func_ctx, )?; if all_true { @@ -522,6 +535,7 @@ impl HashJoinProbeState { matched_num, is_probe_projected, is_build_projected, + string_items_buf, )?); matched_num = 0; } @@ -539,6 +553,7 @@ impl HashJoinProbeState { matched_num, is_probe_projected, is_build_projected, + string_items_buf, )?); Ok(result_blocks) } @@ -550,9 +565,11 @@ impl HashJoinProbeState { matched_num: usize, is_probe_projected: bool, is_build_projected: bool, + string_items_buf: &mut Option>, ) -> Result { let probe_block = if is_probe_projected { - let mut probe_block = DataBlock::take(input, &indexes[0..matched_num])?; + let mut probe_block = + DataBlock::take(input, &indexes[0..matched_num], string_items_buf)?; // For full join, wrap nullable for probe block if self.hash_join_state.hash_join_desc.join_type == JoinType::Full { let nullable_probe_columns = probe_block diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_mark.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_mark.rs index 14292d08898b1..5f8a758f21ca6 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_mark.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_mark.rs @@ -15,7 +15,6 @@ use std::iter::TrustedLen; use std::sync::atomic::Ordering; -use common_catalog::table_context::TableContext; use common_exception::ErrorCode; use common_exception::Result; use common_expression::types::BooleanType; @@ -145,7 +144,6 @@ impl HashJoinProbeState { *has_null = true; } - let _func_ctx = self.ctx.get_function_context()?; let other_predicate = self .hash_join_state .hash_join_desc @@ -157,6 +155,7 @@ impl HashJoinProbeState { let probe_indexes = &mut probe_state.probe_indexes; let build_indexes = &mut probe_state.build_indexes; let build_indexes_ptr = build_indexes.as_mut_ptr(); + let string_items_buf = &mut probe_state.string_items_buf; let build_columns = unsafe { &*self.hash_join_state.build_columns.get() }; let build_columns_data_type = @@ -197,7 +196,7 @@ impl HashJoinProbeState { } let probe_block = if is_probe_projected { - Some(DataBlock::take(input, probe_indexes)?) + Some(DataBlock::take(input, probe_indexes, string_items_buf)?) } else { None }; @@ -207,13 +206,18 @@ impl HashJoinProbeState { build_columns, build_columns_data_type, build_num_rows, + string_items_buf, )?) } else { None }; let result_block = self.merge_eq_block(probe_block, build_block, matched_num); - let filter = self.get_nullable_filter_column(&result_block, other_predicate)?; + let filter = self.get_nullable_filter_column( + &result_block, + other_predicate, + &self.func_ctx, + )?; let filter_viewer = NullableType::::try_downcast_column(&filter).unwrap(); let validity = &filter_viewer.validity; @@ -277,7 +281,11 @@ impl HashJoinProbeState { } let probe_block = if is_probe_projected { - Some(DataBlock::take(input, &probe_indexes[0..matched_num])?) + Some(DataBlock::take( + input, + &probe_indexes[0..matched_num], + string_items_buf, + )?) } else { None }; @@ -287,13 +295,15 @@ impl HashJoinProbeState { build_columns, build_columns_data_type, build_num_rows, + string_items_buf, )?) } else { None }; let result_block = self.merge_eq_block(probe_block, build_block, matched_num); - let filter = self.get_nullable_filter_column(&result_block, other_predicate)?; + let filter = + self.get_nullable_filter_column(&result_block, other_predicate, &self.func_ctx)?; let filter_viewer = NullableType::::try_downcast_column(&filter).unwrap(); let validity = &filter_viewer.validity; let data = &filter_viewer.column; diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_semi_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_semi_join.rs index fcda0177426d4..aced04d950e16 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_semi_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_semi_join.rs @@ -44,6 +44,7 @@ impl HashJoinProbeState { let max_block_size = probe_state.max_block_size; let valids = probe_state.valids.as_ref(); let probe_indexes = &mut probe_state.probe_indexes; + let string_items_buf = &mut probe_state.string_items_buf; let mut matched_num = 0; let mut result_blocks = vec![]; @@ -66,7 +67,7 @@ impl HashJoinProbeState { "Aborted query, because the server is shutting down or the query was killed.", )); } - let probe_block = DataBlock::take(input, probe_indexes)?; + let probe_block = DataBlock::take(input, probe_indexes, string_items_buf)?; result_blocks.push(probe_block); matched_num = 0; @@ -78,7 +79,7 @@ impl HashJoinProbeState { if matched_num == 0 { return Ok(result_blocks); } - let probe_block = DataBlock::take(input, &probe_indexes[0..matched_num])?; + let probe_block = DataBlock::take(input, &probe_indexes[0..matched_num], string_items_buf)?; result_blocks.push(probe_block); Ok(result_blocks) } @@ -109,6 +110,7 @@ impl HashJoinProbeState { let probe_indexes = &mut probe_state.probe_indexes; let build_indexes = &mut probe_state.build_indexes; let build_indexes_ptr = build_indexes.as_mut_ptr(); + let string_items_buf = &mut probe_state.string_items_buf; let build_columns = unsafe { &*self.hash_join_state.build_columns.get() }; let build_columns_data_type = @@ -127,10 +129,10 @@ impl HashJoinProbeState { .unwrap(); // For semi join, it defaults to all. let mut row_state = vec![0_u32; input.num_rows()]; - let dummy_probed_rows = vec![RowPtr { + let dummy_probed_row = RowPtr { chunk_index: 0, row_index: 0, - }]; + }; for (i, (key, ptr)) in keys_iter.zip(pointers).enumerate() { let (mut match_count, mut incomplete_ptr) = @@ -148,14 +150,10 @@ impl HashJoinProbeState { continue; } false => { - // dummy_probed_rows + // dummy_probed_row unsafe { - std::ptr::copy_nonoverlapping( - &dummy_probed_rows[0] as *const RowPtr, - build_indexes_ptr.add(matched_num), - 1, - ) - } + std::ptr::write(build_indexes_ptr.add(matched_num), dummy_probed_row) + }; match_count = 1; } true => (), @@ -178,7 +176,7 @@ impl HashJoinProbeState { } let probe_block = if is_probe_projected { - Some(DataBlock::take(input, probe_indexes)?) + Some(DataBlock::take(input, probe_indexes, string_items_buf)?) } else { None }; @@ -188,6 +186,7 @@ impl HashJoinProbeState { build_columns, build_columns_data_type, build_num_rows, + string_items_buf, )?) } else { None @@ -195,7 +194,11 @@ impl HashJoinProbeState { let result_block = self.merge_eq_block(probe_block.clone(), build_block, matched_num); - let mut bm = match self.get_other_filters(&result_block, other_predicate)? { + let mut bm = match self.get_other_filters( + &result_block, + other_predicate, + &self.func_ctx, + )? { (Some(b), _, _) => b.make_mut(), (_, true, _) => MutableBitmap::from_len_set(result_block.num_rows()), (_, _, true) => MutableBitmap::from_len_zeroed(result_block.num_rows()), @@ -257,7 +260,11 @@ impl HashJoinProbeState { } let probe_block = if is_probe_projected { - Some(DataBlock::take(input, &probe_indexes[0..matched_num])?) + Some(DataBlock::take( + input, + &probe_indexes[0..matched_num], + string_items_buf, + )?) } else { None }; @@ -267,13 +274,14 @@ impl HashJoinProbeState { build_columns, build_columns_data_type, build_num_rows, + string_items_buf, )?) } else { None }; let result_block = self.merge_eq_block(probe_block.clone(), build_block, matched_num); - let mut bm = match self.get_other_filters(&result_block, other_predicate)? { + let mut bm = match self.get_other_filters(&result_block, other_predicate, &self.func_ctx)? { (Some(b), _, _) => b.make_mut(), (_, true, _) => MutableBitmap::from_len_set(result_block.num_rows()), (_, _, true) => MutableBitmap::from_len_zeroed(result_block.num_rows()), diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_anti_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_anti_join.rs index 6bcabc0beb6f0..15f47e6089bf8 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_anti_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_anti_join.rs @@ -126,6 +126,7 @@ impl HashJoinProbeState { let local_probe_indexes = &mut probe_state.probe_indexes; let local_build_indexes = &mut probe_state.build_indexes; let local_build_indexes_ptr = local_build_indexes.as_mut_ptr(); + let string_items_buf = &mut probe_state.string_items_buf; let build_columns = unsafe { &*self.hash_join_state.build_columns.get() }; let build_columns_data_type = @@ -167,7 +168,11 @@ impl HashJoinProbeState { } let probe_block = if is_probe_projected { - Some(DataBlock::take(input, local_probe_indexes)?) + Some(DataBlock::take( + input, + local_probe_indexes, + string_items_buf, + )?) } else { None }; @@ -177,6 +182,7 @@ impl HashJoinProbeState { build_columns, build_columns_data_type, build_num_rows, + string_items_buf, )?) } else { None @@ -191,6 +197,7 @@ impl HashJoinProbeState { .other_predicate .as_ref() .unwrap(), + &self.func_ctx, )?; if all_true { @@ -248,6 +255,7 @@ impl HashJoinProbeState { Some(DataBlock::take( input, &local_probe_indexes[0..matched_num], + string_items_buf, )?) } else { None @@ -258,6 +266,7 @@ impl HashJoinProbeState { build_columns, build_columns_data_type, build_num_rows, + string_items_buf, )?) } else { None @@ -272,6 +281,7 @@ impl HashJoinProbeState { .other_predicate .as_ref() .unwrap(), + &self.func_ctx, )?; if all_true { diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_join.rs index fd246e583e57c..b0f0afbea9f60 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_join.rs @@ -49,6 +49,7 @@ impl HashJoinProbeState { let local_probe_indexes = &mut probe_state.probe_indexes; let local_build_indexes = &mut probe_state.build_indexes; let local_build_indexes_ptr = local_build_indexes.as_mut_ptr(); + let string_items_buf = &mut probe_state.string_items_buf; let mut matched_num = 0; let mut result_blocks = vec![]; @@ -106,7 +107,8 @@ impl HashJoinProbeState { } let probe_block = if is_probe_projected { - let probe_block = DataBlock::take(input, local_probe_indexes)?; + let probe_block = + DataBlock::take(input, local_probe_indexes, string_items_buf)?; // The join type is right join, we need to wrap nullable for probe side. let nullable_columns = probe_block @@ -124,6 +126,7 @@ impl HashJoinProbeState { build_columns, build_columns_data_type, build_num_rows, + string_items_buf, )?) } else { None @@ -161,6 +164,7 @@ impl HashJoinProbeState { .other_predicate .as_ref() .unwrap(), + &self.func_ctx, )?; if all_true { @@ -242,7 +246,11 @@ impl HashJoinProbeState { } let probe_block = if is_probe_projected { - let probe_block = DataBlock::take(input, &local_probe_indexes[0..matched_num])?; + let probe_block = DataBlock::take( + input, + &local_probe_indexes[0..matched_num], + string_items_buf, + )?; // The join type is right join, we need to wrap nullable for probe side. let mut validity = MutableBitmap::new(); @@ -263,6 +271,7 @@ impl HashJoinProbeState { build_columns, build_columns_data_type, build_num_rows, + string_items_buf, )?) } else { None @@ -297,6 +306,7 @@ impl HashJoinProbeState { .other_predicate .as_ref() .unwrap(), + &self.func_ctx, )?; if all_true { diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_mark.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_mark.rs index 8439d649b18e8..19aefdc099a05 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_mark.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_mark.rs @@ -119,6 +119,7 @@ impl HashJoinProbeState { let probe_indexes = &mut probe_state.probe_indexes; let build_indexes = &mut probe_state.build_indexes; let build_indexes_ptr = build_indexes.as_mut_ptr(); + let string_items_buf = &mut probe_state.string_items_buf; let build_columns = unsafe { &*self.hash_join_state.build_columns.get() }; let build_columns_data_type = @@ -156,7 +157,7 @@ impl HashJoinProbeState { } let probe_block = if is_probe_projected { - Some(DataBlock::take(input, probe_indexes)?) + Some(DataBlock::take(input, probe_indexes, string_items_buf)?) } else { None }; @@ -166,13 +167,18 @@ impl HashJoinProbeState { build_columns, build_columns_data_type, build_num_rows, + string_items_buf, )?) } else { None }; let result_block = self.merge_eq_block(probe_block, build_block, matched_num); - let filter = self.get_nullable_filter_column(&result_block, other_predicate)?; + let filter = self.get_nullable_filter_column( + &result_block, + other_predicate, + &self.func_ctx, + )?; let filter_viewer = NullableType::::try_downcast_column(&filter).unwrap(); let validity = &filter_viewer.validity; @@ -218,7 +224,11 @@ impl HashJoinProbeState { if matched_num > 0 { let probe_block = if is_probe_projected { - Some(DataBlock::take(input, &probe_indexes[0..matched_num])?) + Some(DataBlock::take( + input, + &probe_indexes[0..matched_num], + string_items_buf, + )?) } else { None }; @@ -228,13 +238,15 @@ impl HashJoinProbeState { build_columns, build_columns_data_type, build_num_rows, + string_items_buf, )?) } else { None }; let result_block = self.merge_eq_block(probe_block, build_block, matched_num); - let filter = self.get_nullable_filter_column(&result_block, other_predicate)?; + let filter = + self.get_nullable_filter_column(&result_block, other_predicate, &self.func_ctx)?; let filter_viewer = NullableType::::try_downcast_column(&filter).unwrap(); let validity = &filter_viewer.validity; let data = &filter_viewer.column; diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_semi_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_semi_join.rs index 1da419414037a..f068a3c68a5a5 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_semi_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_semi_join.rs @@ -126,6 +126,7 @@ impl HashJoinProbeState { let local_probe_indexes = &mut probe_state.probe_indexes; let local_build_indexes = &mut probe_state.build_indexes; let local_build_indexes_ptr = local_build_indexes.as_mut_ptr(); + let string_items_buf = &mut probe_state.string_items_buf; let build_columns = unsafe { &*self.hash_join_state.build_columns.get() }; let build_columns_data_type = @@ -167,7 +168,11 @@ impl HashJoinProbeState { } let probe_block = if is_probe_projected { - Some(DataBlock::take(input, local_probe_indexes)?) + Some(DataBlock::take( + input, + local_probe_indexes, + string_items_buf, + )?) } else { None }; @@ -177,6 +182,7 @@ impl HashJoinProbeState { build_columns, build_columns_data_type, build_num_rows, + string_items_buf, )?) } else { None @@ -191,6 +197,7 @@ impl HashJoinProbeState { .other_predicate .as_ref() .unwrap(), + &self.func_ctx, )?; if all_true { @@ -248,6 +255,7 @@ impl HashJoinProbeState { Some(DataBlock::take( input, &local_probe_indexes[0..matched_num], + string_items_buf, )?) } else { None @@ -258,6 +266,7 @@ impl HashJoinProbeState { build_columns, build_columns_data_type, build_num_rows, + string_items_buf, )?) } else { None @@ -272,6 +281,7 @@ impl HashJoinProbeState { .other_predicate .as_ref() .unwrap(), + &self.func_ctx, )?; if all_true { diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_state.rs index 8d4edf47be27b..9d0d539071724 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_state.rs @@ -37,6 +37,7 @@ pub struct ProbeState { pub(crate) row_state_indexes: Option>, pub(crate) probe_unmatched_indexes: Option>, pub(crate) markers: Option>, + pub(crate) string_items_buf: Option>, } impl ProbeState { @@ -48,6 +49,7 @@ impl ProbeState { max_block_size: usize, join_type: &JoinType, with_conjunct: bool, + has_string_column: bool, func_ctx: FunctionContext, ) -> Self { let mut true_validity = MutableBitmap::new(); @@ -76,6 +78,11 @@ impl ProbeState { } else { None }; + let string_items_buf = if has_string_column { + Some(vec![(0, 0); max_block_size]) + } else { + None + }; ProbeState { max_block_size, probe_indexes: vec![0; max_block_size], @@ -91,8 +98,9 @@ impl ProbeState { func_ctx, row_state, row_state_indexes, - markers, probe_unmatched_indexes, + markers, + string_items_buf, } } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/row.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/row.rs index 60109a29fe502..b7ad0269f2577 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/row.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/row.rs @@ -65,6 +65,7 @@ impl RowSpace { build_columns: &[ColumnVec], build_columns_data_type: &[DataType], num_rows: &usize, + string_items_buf: &mut Option>, ) -> Result { if *num_rows != 0 { let data_block = DataBlock::take_column_vec( @@ -72,6 +73,7 @@ impl RowSpace { build_columns_data_type, row_ptrs, row_ptrs.len(), + string_items_buf, ); Ok(data_block) } else { diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs index fe8b4f0df3965..dc019ccd4c0f5 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs @@ -84,6 +84,7 @@ impl TransformHashJoinProbe { func_ctx: FunctionContext, join_type: &JoinType, with_conjunct: bool, + has_string_column: bool, ) -> Result> { let id = join_probe_state.probe_attach()?; Ok(Box::new(TransformHashJoinProbe { @@ -94,7 +95,13 @@ impl TransformHashJoinProbe { output_data_blocks: VecDeque::new(), step: HashJoinProbeStep::WaitBuild, join_probe_state, - probe_state: ProbeState::create(max_block_size, join_type, with_conjunct, func_ctx), + probe_state: ProbeState::create( + max_block_size, + join_type, + with_conjunct, + has_string_column, + func_ctx, + ), max_block_size, outer_scan_finished: false, spill_done: false, diff --git a/src/query/service/src/schedulers/scheduler.rs b/src/query/service/src/schedulers/scheduler.rs index c505a6cbad157..0b0a1785e2a15 100644 --- a/src/query/service/src/schedulers/scheduler.rs +++ b/src/query/service/src/schedulers/scheduler.rs @@ -79,6 +79,8 @@ pub async fn build_local_pipeline( enable_profiling: bool, ) -> Result { let pipeline = PipelineBuilder::create( + ctx.get_function_context()?, + ctx.get_settings(), ctx.clone(), enable_profiling, SharedProcessorProfiles::default(), diff --git a/src/query/sql/src/planner/optimizer/cascades/tasks/apply_rule.rs b/src/query/sql/src/planner/optimizer/cascades/tasks/apply_rule.rs index 69725942b11d1..833f4a92c0001 100644 --- a/src/query/sql/src/planner/optimizer/cascades/tasks/apply_rule.rs +++ b/src/query/sql/src/planner/optimizer/cascades/tasks/apply_rule.rs @@ -72,11 +72,7 @@ impl ApplyRuleTask { let group = optimizer.memo.group(self.target_group_index)?; let m_expr = group.m_expr(self.m_expr_index)?; let mut state = TransformResult::new(); - let rule = RuleFactory::create_rule( - self.rule_id, - optimizer.metadata.clone(), - self.ctx.get_function_context()?, - )?; + let rule = RuleFactory::create_rule(self.rule_id, optimizer.metadata.clone())?; m_expr.apply_rule(&optimizer.memo, &rule, &mut state)?; optimizer.insert_from_transform_state(self.target_group_index, state)?; diff --git a/src/query/sql/src/planner/optimizer/heuristic/heuristic.rs b/src/query/sql/src/planner/optimizer/heuristic/heuristic.rs index 2f6ff745e8107..44c460608df1c 100644 --- a/src/query/sql/src/planner/optimizer/heuristic/heuristic.rs +++ b/src/query/sql/src/planner/optimizer/heuristic/heuristic.rs @@ -60,13 +60,16 @@ pub static RESIDUAL_RULES: Lazy> = /// A heuristic query optimizer. It will apply specific transformation rules in order and /// implement the logical plans with default implementation rules. pub struct HeuristicOptimizer { - func_ctx: FunctionContext, + _func_ctx: FunctionContext, metadata: MetadataRef, } impl HeuristicOptimizer { pub fn new(func_ctx: FunctionContext, metadata: MetadataRef) -> Self { - HeuristicOptimizer { func_ctx, metadata } + HeuristicOptimizer { + _func_ctx: func_ctx, + metadata, + } } pub fn pre_optimize(&self, s_expr: SExpr) -> Result { @@ -99,8 +102,7 @@ impl HeuristicOptimizer { let mut s_expr = s_expr.clone(); for rule_id in rules { - let rule = - RuleFactory::create_rule(*rule_id, self.metadata.clone(), self.func_ctx.clone())?; + let rule = RuleFactory::create_rule(*rule_id, self.metadata.clone())?; let mut state = TransformResult::new(); if rule .patterns() diff --git a/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs b/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs index 1f3f8b6b26a30..c2ea6f6c87519 100644 --- a/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs +++ b/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs @@ -625,11 +625,7 @@ impl DPhpy { fn apply_rule(&self, s_expr: &SExpr) -> Result { let mut s_expr = s_expr.clone(); - let rule = RuleFactory::create_rule( - RuleID::PushDownFilterJoin, - self.metadata.clone(), - self.ctx.get_function_context()?, - )?; + let rule = RuleFactory::create_rule(RuleID::PushDownFilterJoin, self.metadata.clone())?; let mut state = TransformResult::new(); if rule .patterns() diff --git a/src/query/sql/src/planner/optimizer/rule/factory.rs b/src/query/sql/src/planner/optimizer/rule/factory.rs index a3723ef7cae79..9d57ee0fb2fdd 100644 --- a/src/query/sql/src/planner/optimizer/rule/factory.rs +++ b/src/query/sql/src/planner/optimizer/rule/factory.rs @@ -13,7 +13,6 @@ // limitations under the License. use common_exception::Result; -use common_expression::FunctionContext; use super::rewrite::RuleCommuteJoin; use super::rewrite::RuleEliminateEvalScalar; @@ -51,11 +50,7 @@ use crate::MetadataRef; pub struct RuleFactory; impl RuleFactory { - pub fn create_rule( - id: RuleID, - metadata: MetadataRef, - _func_ctx: FunctionContext, - ) -> Result { + pub fn create_rule(id: RuleID, metadata: MetadataRef) -> Result { match id { RuleID::EliminateEvalScalar => Ok(Box::new(RuleEliminateEvalScalar::new())), RuleID::PushDownFilterUnion => Ok(Box::new(RulePushDownFilterUnion::new())), diff --git a/src/query/storages/fuse/src/statistics/cluster_statistics.rs b/src/query/storages/fuse/src/statistics/cluster_statistics.rs index 0f11a8848653a..9b1411b24802a 100644 --- a/src/query/storages/fuse/src/statistics/cluster_statistics.rs +++ b/src/query/storages/fuse/src/statistics/cluster_statistics.rs @@ -109,7 +109,7 @@ impl ClusterStatsGenerator { if !self.cluster_key_index.is_empty() { let indices = vec![0u32, block.num_rows() as u32 - 1]; - block = block.take(&indices)?; + block = block.take(&indices, &mut None)?; } block = self