Skip to content

Commit

Permalink
refactor(query): improve hash join (databendlabs#12928)
Browse files Browse the repository at this point in the history
* improve hash join

* improve concat

* improve take_string and add take_boolean

* fix

* improve concat

* improve concat_string_types

* improve take

* improve filter

* update

* remove get_function_context

* improve settings

* allow too_many_arguments

* merge

* merge

* refine primitive comments

* refine

* refine

* refine take_compact

* fix take_compact

* add safety comment

* fix take_compact_string

* refine: use extend from iter and get_unchecked_mut

* refine concat_primitive_types

* reduce pr size

* reduce pr size
  • Loading branch information
Dousir9 authored and andylokandy committed Nov 27, 2023
1 parent 2839bfa commit 35f3c4e
Show file tree
Hide file tree
Showing 30 changed files with 1,069 additions and 441 deletions.
203 changes: 163 additions & 40 deletions src/query/expression/src/kernels/concat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,30 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_arrow::arrow::bitmap::Bitmap;
use common_arrow::arrow::buffer::Buffer;
use common_exception::ErrorCode;
use common_exception::Result;
use itertools::Itertools;

use crate::kernels::take::BIT_MASK;
use crate::kernels::utils::copy_advance_aligned;
use crate::kernels::utils::set_vec_len_by_ptr;
use crate::types::array::ArrayColumnBuilder;
use crate::types::decimal::DecimalColumn;
use crate::types::map::KvColumnBuilder;
use crate::types::nullable::NullableColumn;
use crate::types::number::NumberColumn;
use crate::types::string::StringColumnBuilder;
use crate::types::string::StringColumn;
use crate::types::AnyType;
use crate::types::ArgType;
use crate::types::ArrayType;
use crate::types::BitmapType;
use crate::types::BooleanType;
use crate::types::DateType;
use crate::types::EmptyArrayType;
use crate::types::EmptyMapType;
use crate::types::MapType;
use crate::types::NullType;
use crate::types::NullableType;
use crate::types::NumberType;
use crate::types::StringType;
Expand Down Expand Up @@ -102,42 +106,81 @@ impl Column {
let capacity = columns.iter().map(|c| c.len()).sum();

match &columns[0] {
Column::Null { .. } => Self::concat_arg_types::<NullType>(columns),
Column::EmptyArray { .. } => Self::concat_arg_types::<EmptyArrayType>(columns),
Column::EmptyMap { .. } => Self::concat_arg_types::<EmptyMapType>(columns),
Column::Null { .. } => Column::Null { len: capacity },
Column::EmptyArray { .. } => Column::EmptyArray { len: capacity },
Column::EmptyMap { .. } => Column::EmptyMap { len: capacity },
Column::Number(col) => with_number_mapped_type!(|NUM_TYPE| match col {
NumberColumn::NUM_TYPE(_) => {
Self::concat_arg_types::<NumberType<NUM_TYPE>>(columns)
let columns = columns
.iter()
.map(|col| <NumberType<NUM_TYPE>>::try_downcast_column(col).unwrap())
.collect_vec();
let builder = Self::concat_primitive_types(&columns, capacity);
<NumberType<NUM_TYPE>>::upcast_column(<NumberType<NUM_TYPE>>::column_from_vec(
builder,
&[],
))
}
}),
Column::Decimal(col) => with_decimal_type!(|DECIMAL_TYPE| match col {
DecimalColumn::DECIMAL_TYPE(_, size) => {
let mut builder = Vec::with_capacity(capacity);
for c in columns {
match c {
Column::Decimal(DecimalColumn::DECIMAL_TYPE(col, size)) => {
debug_assert_eq!(size, size);
builder.extend_from_slice(col);
}
let columns = columns
.iter()
.map(|col| match col {
Column::Decimal(DecimalColumn::DECIMAL_TYPE(col, _)) => col.clone(),
_ => unreachable!(),
}
}
})
.collect_vec();
let builder = Self::concat_primitive_types(&columns, capacity);
Column::Decimal(DecimalColumn::DECIMAL_TYPE(builder.into(), *size))
}
}),
Column::Boolean(_) => Self::concat_arg_types::<BooleanType>(columns),
Column::Boolean(_) => {
let columns = columns
.iter()
.map(|col| BooleanType::try_downcast_column(col).unwrap())
.collect_vec();
Column::Boolean(Self::concat_boolean_types(&columns, capacity))
}
Column::String(_) => {
let data_capacity = columns.iter().map(|c| c.memory_size() - c.len() * 8).sum();
let builder = StringColumnBuilder::with_capacity(capacity, data_capacity);
Self::concat_value_types::<StringType>(builder, columns)
let columns = columns
.iter()
.map(|col| StringType::try_downcast_column(col).unwrap())
.collect_vec();
StringType::upcast_column(Self::concat_string_types(&columns, capacity))
}
Column::Timestamp(_) => {
let builder = Vec::with_capacity(capacity);
Self::concat_value_types::<TimestampType>(builder, columns)
let columns = columns
.iter()
.map(|col| TimestampType::try_downcast_column(col).unwrap())
.collect_vec();
let builder = Self::concat_primitive_types(&columns, capacity);
let ts = <NumberType<i64>>::upcast_column(<NumberType<i64>>::column_from_vec(
builder,
&[],
))
.into_number()
.unwrap()
.into_int64()
.unwrap();
Column::Timestamp(ts)
}
Column::Date(_) => {
let builder = Vec::with_capacity(capacity);
Self::concat_value_types::<DateType>(builder, columns)
let columns = columns
.iter()
.map(|col| DateType::try_downcast_column(col).unwrap())
.collect_vec();

let builder = Self::concat_primitive_types(&columns, capacity);
let d = <NumberType<i32>>::upcast_column(<NumberType<i32>>::column_from_vec(
builder,
&[],
))
.into_number()
.unwrap()
.into_int32()
.unwrap();
Column::Date(d)
}
Column::Array(col) => {
let mut offsets = Vec::with_capacity(capacity + 1);
Expand All @@ -164,9 +207,11 @@ impl Column {
Self::concat_value_types::<MapType<AnyType, AnyType>>(builder, columns)
}
Column::Bitmap(_) => {
let data_capacity = columns.iter().map(|c| c.memory_size() - c.len() * 8).sum();
let builder = StringColumnBuilder::with_capacity(capacity, data_capacity);
Self::concat_value_types::<BitmapType>(builder, columns)
let columns = columns
.iter()
.map(|col| BitmapType::try_downcast_column(col).unwrap())
.collect_vec();
BitmapType::upcast_column(Self::concat_string_types(&columns, capacity))
}
Column::Nullable(_) => {
let mut bitmaps = Vec::with_capacity(columns.len());
Expand All @@ -178,7 +223,11 @@ impl Column {
}

let column = Self::concat(&inners);
let validity = Self::concat_arg_types::<BooleanType>(&bitmaps);
let bitmaps = bitmaps
.iter()
.map(|col| BooleanType::try_downcast_column(col).unwrap())
.collect_vec();
let validity = Column::Boolean(Self::concat_boolean_types(&bitmaps, capacity));
let validity = BooleanType::try_downcast_column(&validity).unwrap();

Column::Nullable(Box::new(NullableColumn { column, validity }))
Expand All @@ -196,21 +245,95 @@ impl Column {
Column::Tuple(fields)
}
Column::Variant(_) => {
let data_capacity = columns.iter().map(|c| c.memory_size() - c.len() * 8).sum();
let builder = StringColumnBuilder::with_capacity(capacity, data_capacity);
Self::concat_value_types::<VariantType>(builder, columns)
let columns = columns
.iter()
.map(|col| VariantType::try_downcast_column(col).unwrap())
.collect_vec();
VariantType::upcast_column(Self::concat_string_types(&columns, capacity))
}
}
}

fn concat_arg_types<T: ArgType>(columns: &[Column]) -> Column {
let columns: Vec<T::Column> = columns
.iter()
.map(|c| T::try_downcast_column(c).unwrap())
.collect();
let iter = columns.iter().flat_map(|c| T::iter_column(c));
let result = T::column_from_ref_iter(iter, &[]);
T::upcast_column(result)
/// Concatenates the contents of every buffer in `cols`, in order, into one
/// freshly allocated `Vec<T>`.
///
/// `num_rows` is used only as the initial capacity of the result; callers in
/// this file pass the total number of elements across all buffers so that the
/// vector is allocated once.
pub fn concat_primitive_types<T>(cols: &[Buffer<T>], num_rows: usize) -> Vec<T>
where T: Copy {
    let mut merged: Vec<T> = Vec::with_capacity(num_rows);
    // Append buffer by buffer so each `extend` sees an exact-size iterator.
    cols.iter().for_each(|buffer| merged.extend(buffer.iter()));
    merged
}

/// Concatenates several [`StringColumn`]s into a single [`StringColumn`].
///
/// A [`StringColumn`] is built from a `data` byte buffer and an `offsets`
/// buffer; this function builds both for the result and hands them to
/// `StringColumn::new(data.into(), offsets.into())`.
///
/// * `cols` - the columns to concatenate, in order.
/// * `num_rows` - expected total number of rows; used only to pre-size the
///   offsets vector (`num_rows + 1` entries).
///
/// Only the bytes between each column's first and last offset are copied, and
/// row lengths are measured relative to the column's first offset. This keeps
/// the result correct for a column whose offsets do not start at 0 or whose
/// data buffer extends past its last offset (presumably the case for sliced
/// columns sharing a parent buffer — confirm against `StringColumn::slice`).
///
/// # Panics
///
/// Panics if a column has an empty offsets buffer, or if its first/last offset
/// does not describe a valid range of its data buffer.
pub fn concat_string_types<'a>(cols: &'a [StringColumn], num_rows: usize) -> StringColumn {
    let mut offsets: Vec<u64> = Vec::with_capacity(num_rows + 1);
    let mut data_size = 0;

    // Build `offsets` and accumulate the `data_size` required by `data`.
    // Plain `push` is used: writing through `get_unchecked_mut` on a vector
    // whose length is still 0 is an out-of-bounds slice access.
    offsets.push(0);
    for col in cols.iter() {
        // Measure row lengths from the column's own first offset, not from 0.
        let mut start = col.offsets()[0];
        for end in col.offsets()[1..].iter() {
            data_size += end - start;
            start = *end;
            offsets.push(data_size);
        }
    }

    // Build `data`.
    let mut data: Vec<u8> = Vec::with_capacity(data_size as usize);
    let mut data_ptr = data.as_mut_ptr();

    for col in cols.iter() {
        let col_offsets = col.offsets();
        let first = col_offsets[0] as usize;
        let last = col_offsets[col_offsets.len() - 1] as usize;
        // Copy only the bytes the column's offsets actually reference.
        let col_data = &col.data().as_slice()[first..last];
        // SAFETY: `data` was allocated with capacity `data_size`, which is the
        // sum of `last - first` over all columns, i.e. exactly the number of
        // bytes copied by this loop. This assumes `copy_advance_aligned`
        // writes `len` bytes at `data_ptr` and advances it by `len` — TODO
        // confirm against `kernels::utils`.
        unsafe {
            copy_advance_aligned(col_data.as_ptr(), &mut data_ptr, col_data.len());
        }
    }
    // SAFETY: `data_ptr` started at `data.as_mut_ptr()` and was only advanced
    // over bytes initialised by the copies above, staying within capacity.
    // Assumes `set_vec_len_by_ptr` sets the length from the pointer distance —
    // TODO confirm.
    unsafe {
        set_vec_len_by_ptr(&mut data, data_ptr);
    }

    StringColumn::new(data.into(), offsets.into())
}

/// Concatenates several [`Bitmap`]s into a single [`Bitmap`] of `num_rows` bits.
///
/// Bits are packed eight per byte using `BIT_MASK[i % 8]` for the bit at
/// position `i`, and the number of unset bits is counted along the way so it
/// can be passed to `Bitmap::from_inner`.
///
/// * `cols` - the bitmaps to concatenate, in order.
/// * `num_rows` - total number of bits across `cols`; it sizes the byte buffer
///   and is used as the length of the resulting bitmap.
///
/// # Panics
///
/// Panics if `Bitmap::from_inner` rejects the assembled buffer. In debug
/// builds it also panics if `num_rows` differs from the number of bits
/// actually yielded by `cols`.
pub fn concat_boolean_types(cols: &[Bitmap], num_rows: usize) -> Bitmap {
    let capacity = num_rows.saturating_add(7) / 8;
    let mut builder: Vec<u8> = Vec::with_capacity(capacity);
    let mut unset_bits = 0;
    let mut value: u8 = 0;
    let mut i = 0;

    // Bytes are appended with `push`: writing through `get_unchecked_mut` on a
    // vector whose length is still 0 is an out-of-bounds slice access. With
    // the capacity reserved above, `push` does not reallocate when `num_rows`
    // is accurate.
    for col in cols {
        for item in col.iter() {
            if item {
                value |= BIT_MASK[i % 8];
            } else {
                unset_bits += 1;
            }
            i += 1;
            // A full byte has been assembled; flush it and start the next one.
            if i % 8 == 0 {
                builder.push(value);
                value = 0;
            }
        }
    }
    // Flush the trailing, partially filled byte (if any).
    if i % 8 != 0 {
        builder.push(value);
    }
    debug_assert_eq!(i, num_rows, "num_rows must equal the total bit count");

    // SAFETY: `unset_bits` was counted over exactly the bits written above.
    // NOTE(review): presumably `from_inner` requires `unset_bits` to match the
    // zero-bit count of the `[0, num_rows)` range — confirm against the
    // `Bitmap::from_inner` documentation.
    unsafe {
        Bitmap::from_inner(Arc::new(builder.into()), 0, num_rows, unset_bits)
            .ok()
            .unwrap()
    }
}

fn concat_value_types<T: ValueType>(
Expand Down
Loading

0 comments on commit 35f3c4e

Please sign in to comment.