Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(query): improve hash join #12928

Merged
merged 37 commits into from
Oct 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
c5b01f3
improve hash join
Dousir9 Sep 19, 2023
d91102c
improve concat
Dousir9 Sep 19, 2023
81dd24b
Merge branch 'main' of github.com:datafuselabs/databend into improve_…
Dousir9 Sep 19, 2023
43f6395
improve take_string and add take_boolean
Dousir9 Sep 20, 2023
0a28726
fix
Dousir9 Sep 20, 2023
1cf2e82
Merge branch 'main' of github.com:datafuselabs/databend into improve_…
Dousir9 Sep 20, 2023
332b435
improve concat
Dousir9 Sep 21, 2023
f1c83ad
improve concat_string_types
Dousir9 Sep 21, 2023
83c6586
improve take
Dousir9 Sep 21, 2023
491df68
improve filter
Dousir9 Sep 22, 2023
e385a09
Merge branch 'main' of github.com:datafuselabs/databend into improve_…
Dousir9 Sep 22, 2023
83c8cd9
update
Dousir9 Sep 22, 2023
b035116
remove get_function_context
Dousir9 Sep 22, 2023
82dd7ac
Merge branch 'main' of github.com:datafuselabs/databend into improve_…
Dousir9 Sep 22, 2023
0e8e35a
improve settings
Dousir9 Sep 22, 2023
a4ccedd
Merge branch 'main' of github.com:datafuselabs/databend into improve_…
Dousir9 Sep 23, 2023
9790039
allow too_many_arguments
Dousir9 Sep 23, 2023
0101b02
Merge branch 'main' of github.com:datafuselabs/databend into improve_…
Dousir9 Sep 25, 2023
f992e23
merge
Dousir9 Sep 25, 2023
e4c98f5
Merge branch 'main' of github.com:datafuselabs/databend into improve_…
Dousir9 Sep 26, 2023
4cbbc8a
merge
Dousir9 Sep 26, 2023
9ca0292
refine primitive comments
Dousir9 Sep 26, 2023
de681b2
Merge branch 'main' of github.com:datafuselabs/databend into improve_…
Dousir9 Sep 27, 2023
a0bdf31
refine
Dousir9 Sep 27, 2023
c568805
refine
Dousir9 Sep 27, 2023
1bcfbf8
refine take_compact
Dousir9 Sep 27, 2023
c07f88f
Merge branch 'main' of github.com:datafuselabs/databend into improve_…
Dousir9 Sep 27, 2023
62176e6
fix take_compact
Dousir9 Sep 27, 2023
6a15217
add safety comment
Dousir9 Sep 27, 2023
bd0d64e
Merge branch 'main' of github.com:datafuselabs/databend into improve_…
Dousir9 Sep 27, 2023
50a17b7
fix take_compact_string
Dousir9 Sep 27, 2023
5884ad9
Merge branch 'main' of github.com:datafuselabs/databend into improve_…
Dousir9 Oct 1, 2023
ed5bdc7
refine: use extend from iter and get_unchecked_mut
Dousir9 Oct 2, 2023
ec6f001
Merge branch 'main' of github.com:datafuselabs/databend into improve_…
Dousir9 Oct 8, 2023
82fb00a
refine concat_primitive_types
Dousir9 Oct 8, 2023
93033cc
reduce pr size
Dousir9 Oct 8, 2023
26b8221
reduce pr size
Dousir9 Oct 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 163 additions & 40 deletions src/query/expression/src/kernels/concat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,30 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_arrow::arrow::bitmap::Bitmap;
use common_arrow::arrow::buffer::Buffer;
use common_exception::ErrorCode;
use common_exception::Result;
use itertools::Itertools;

use crate::kernels::take::BIT_MASK;
use crate::kernels::utils::copy_advance_aligned;
use crate::kernels::utils::set_vec_len_by_ptr;
use crate::types::array::ArrayColumnBuilder;
use crate::types::decimal::DecimalColumn;
use crate::types::map::KvColumnBuilder;
use crate::types::nullable::NullableColumn;
use crate::types::number::NumberColumn;
use crate::types::string::StringColumnBuilder;
use crate::types::string::StringColumn;
use crate::types::AnyType;
use crate::types::ArgType;
use crate::types::ArrayType;
use crate::types::BitmapType;
use crate::types::BooleanType;
use crate::types::DateType;
use crate::types::EmptyArrayType;
use crate::types::EmptyMapType;
use crate::types::MapType;
use crate::types::NullType;
use crate::types::NullableType;
use crate::types::NumberType;
use crate::types::StringType;
Expand Down Expand Up @@ -102,42 +106,81 @@ impl Column {
let capacity = columns.iter().map(|c| c.len()).sum();

match &columns[0] {
Column::Null { .. } => Self::concat_arg_types::<NullType>(columns),
Column::EmptyArray { .. } => Self::concat_arg_types::<EmptyArrayType>(columns),
Column::EmptyMap { .. } => Self::concat_arg_types::<EmptyMapType>(columns),
Column::Null { .. } => Column::Null { len: capacity },
Column::EmptyArray { .. } => Column::EmptyArray { len: capacity },
Column::EmptyMap { .. } => Column::EmptyMap { len: capacity },
Column::Number(col) => with_number_mapped_type!(|NUM_TYPE| match col {
NumberColumn::NUM_TYPE(_) => {
Self::concat_arg_types::<NumberType<NUM_TYPE>>(columns)
let columns = columns
.iter()
.map(|col| <NumberType<NUM_TYPE>>::try_downcast_column(col).unwrap())
.collect_vec();
let builder = Self::concat_primitive_types(&columns, capacity);
<NumberType<NUM_TYPE>>::upcast_column(<NumberType<NUM_TYPE>>::column_from_vec(
builder,
&[],
))
}
}),
Column::Decimal(col) => with_decimal_type!(|DECIMAL_TYPE| match col {
DecimalColumn::DECIMAL_TYPE(_, size) => {
let mut builder = Vec::with_capacity(capacity);
for c in columns {
match c {
Column::Decimal(DecimalColumn::DECIMAL_TYPE(col, size)) => {
debug_assert_eq!(size, size);
builder.extend_from_slice(col);
}
let columns = columns
.iter()
.map(|col| match col {
Column::Decimal(DecimalColumn::DECIMAL_TYPE(col, _)) => col.clone(),
_ => unreachable!(),
}
}
})
.collect_vec();
let builder = Self::concat_primitive_types(&columns, capacity);
Column::Decimal(DecimalColumn::DECIMAL_TYPE(builder.into(), *size))
}
}),
Column::Boolean(_) => Self::concat_arg_types::<BooleanType>(columns),
Column::Boolean(_) => {
let columns = columns
.iter()
.map(|col| BooleanType::try_downcast_column(col).unwrap())
.collect_vec();
Column::Boolean(Self::concat_boolean_types(&columns, capacity))
}
Column::String(_) => {
let data_capacity = columns.iter().map(|c| c.memory_size() - c.len() * 8).sum();
let builder = StringColumnBuilder::with_capacity(capacity, data_capacity);
Self::concat_value_types::<StringType>(builder, columns)
let columns = columns
.iter()
.map(|col| StringType::try_downcast_column(col).unwrap())
.collect_vec();
StringType::upcast_column(Self::concat_string_types(&columns, capacity))
}
Column::Timestamp(_) => {
let builder = Vec::with_capacity(capacity);
Self::concat_value_types::<TimestampType>(builder, columns)
let columns = columns
.iter()
.map(|col| TimestampType::try_downcast_column(col).unwrap())
.collect_vec();
let builder = Self::concat_primitive_types(&columns, capacity);
let ts = <NumberType<i64>>::upcast_column(<NumberType<i64>>::column_from_vec(
builder,
&[],
))
.into_number()
.unwrap()
.into_int64()
.unwrap();
Column::Timestamp(ts)
}
Column::Date(_) => {
let builder = Vec::with_capacity(capacity);
Self::concat_value_types::<DateType>(builder, columns)
let columns = columns
.iter()
.map(|col| DateType::try_downcast_column(col).unwrap())
.collect_vec();

let builder = Self::concat_primitive_types(&columns, capacity);
let d = <NumberType<i32>>::upcast_column(<NumberType<i32>>::column_from_vec(
builder,
&[],
))
.into_number()
.unwrap()
.into_int32()
.unwrap();
Column::Date(d)
}
Column::Array(col) => {
let mut offsets = Vec::with_capacity(capacity + 1);
Expand All @@ -164,9 +207,11 @@ impl Column {
Self::concat_value_types::<MapType<AnyType, AnyType>>(builder, columns)
}
Column::Bitmap(_) => {
let data_capacity = columns.iter().map(|c| c.memory_size() - c.len() * 8).sum();
let builder = StringColumnBuilder::with_capacity(capacity, data_capacity);
Self::concat_value_types::<BitmapType>(builder, columns)
let columns = columns
.iter()
.map(|col| BitmapType::try_downcast_column(col).unwrap())
.collect_vec();
BitmapType::upcast_column(Self::concat_string_types(&columns, capacity))
}
Column::Nullable(_) => {
let mut bitmaps = Vec::with_capacity(columns.len());
Expand All @@ -178,7 +223,11 @@ impl Column {
}

let column = Self::concat(&inners);
let validity = Self::concat_arg_types::<BooleanType>(&bitmaps);
let bitmaps = bitmaps
.iter()
.map(|col| BooleanType::try_downcast_column(col).unwrap())
.collect_vec();
let validity = Column::Boolean(Self::concat_boolean_types(&bitmaps, capacity));
let validity = BooleanType::try_downcast_column(&validity).unwrap();

Column::Nullable(Box::new(NullableColumn { column, validity }))
Expand All @@ -196,21 +245,95 @@ impl Column {
Column::Tuple(fields)
}
Column::Variant(_) => {
let data_capacity = columns.iter().map(|c| c.memory_size() - c.len() * 8).sum();
let builder = StringColumnBuilder::with_capacity(capacity, data_capacity);
Self::concat_value_types::<VariantType>(builder, columns)
let columns = columns
.iter()
.map(|col| VariantType::try_downcast_column(col).unwrap())
.collect_vec();
VariantType::upcast_column(Self::concat_string_types(&columns, capacity))
}
}
}

fn concat_arg_types<T: ArgType>(columns: &[Column]) -> Column {
let columns: Vec<T::Column> = columns
.iter()
.map(|c| T::try_downcast_column(c).unwrap())
.collect();
let iter = columns.iter().flat_map(|c| T::iter_column(c));
let result = T::column_from_ref_iter(iter, &[]);
T::upcast_column(result)
pub fn concat_primitive_types<T>(cols: &[Buffer<T>], num_rows: usize) -> Vec<T>
where T: Copy {
let mut builder: Vec<T> = Vec::with_capacity(num_rows);
for col in cols {
builder.extend(col.iter());
}
builder
}

pub fn concat_string_types<'a>(cols: &'a [StringColumn], num_rows: usize) -> StringColumn {
// [`StringColumn`] consists of [`data`] and [`offset`], we build [`data`] and [`offset`] respectively,
// and then call `StringColumn::new(data.into(), offsets.into())` to create [`StringColumn`].
let mut offsets: Vec<u64> = Vec::with_capacity(num_rows + 1);
let mut offsets_len = 0;
let mut data_size = 0;

// Build [`offset`] and calculate `data_size` required by [`data`].
unsafe {
*offsets.get_unchecked_mut(offsets_len) = 0;
offsets_len += 1;
for col in cols.iter() {
let mut start = 0;
for end in col.offsets()[1..].iter() {
data_size += end - start;
start = *end;
*offsets.get_unchecked_mut(offsets_len) = data_size;
offsets_len += 1;
}
}
offsets.set_len(offsets_len);
}

// Build [`data`].
let mut data: Vec<u8> = Vec::with_capacity(data_size as usize);
let mut data_ptr = data.as_mut_ptr();

unsafe {
for col in cols.iter() {
let col_data = col.data().as_slice();
copy_advance_aligned(col_data.as_ptr(), &mut data_ptr, col_data.len());
}
set_vec_len_by_ptr(&mut data, data_ptr);
}

StringColumn::new(data.into(), offsets.into())
}

pub fn concat_boolean_types(cols: &[Bitmap], num_rows: usize) -> Bitmap {
let capacity = num_rows.saturating_add(7) / 8;
let mut builder: Vec<u8> = Vec::with_capacity(capacity);
let mut builder_len = 0;
let mut unset_bits = 0;
let mut value = 0;
let mut i = 0;

unsafe {
for col in cols {
for item in col.iter() {
if item {
value |= BIT_MASK[i % 8];
} else {
unset_bits += 1;
}
i += 1;
if i % 8 == 0 {
*builder.get_unchecked_mut(builder_len) = value;
builder_len += 1;
value = 0;
}
}
}
if i % 8 != 0 {
*builder.get_unchecked_mut(builder_len) = value;
builder_len += 1;
}
builder.set_len(builder_len);
Bitmap::from_inner(Arc::new(builder.into()), 0, num_rows, unset_bits)
.ok()
.unwrap()
}
}

fn concat_value_types<T: ValueType>(
Expand Down
Loading
Loading