Skip to content

Commit

Permalink
feat(query): support take_ranges (#13878)
Browse files Browse the repository at this point in the history
* support take_ranges

* add comments

* rename

* make lint

* add debug_assert_eq for take_ranges

* &Vec<(u32, u32)> -> &[Range<u32>]

---------

Co-authored-by: sundyli <[email protected]>
  • Loading branch information
Dousir9 and sundy-li authored Dec 1, 2023
1 parent 639638f commit adac7a0
Show file tree
Hide file tree
Showing 4 changed files with 323 additions and 2 deletions.
2 changes: 1 addition & 1 deletion src/query/expression/src/kernels/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ impl Column {
/// # Safety
/// * `src` + `src_idx`(in bits) must be [valid] for reads of `len` bits.
/// * `ptr` must be [valid] for writes of `len` bits.
unsafe fn copy_continuous_bits(
pub unsafe fn copy_continuous_bits(
ptr: &mut *mut u8,
src: &[u8],
mut dst_idx: usize,
Expand Down
1 change: 1 addition & 0 deletions src/query/expression/src/kernels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ mod sort;
mod take;
mod take_chunks;
mod take_compact;
mod take_ranges;
mod topk;
mod utils;

Expand Down
293 changes: 293 additions & 0 deletions src/query/expression/src/kernels/take_ranges.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,293 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use core::ops::Range;
use std::sync::Arc;

use common_arrow::arrow::bitmap::Bitmap;
use common_arrow::arrow::buffer::Buffer;
use common_exception::Result;

use crate::kernels::take::BIT_MASK;
use crate::kernels::utils::copy_advance_aligned;
use crate::kernels::utils::set_vec_len_by_ptr;
use crate::kernels::utils::store_advance_aligned;
use crate::types::array::ArrayColumn;
use crate::types::array::ArrayColumnBuilder;
use crate::types::decimal::DecimalColumn;
use crate::types::map::KvColumnBuilder;
use crate::types::nullable::NullableColumn;
use crate::types::number::NumberColumn;
use crate::types::string::StringColumn;
use crate::types::AnyType;
use crate::types::ArrayType;
use crate::types::MapType;
use crate::types::ValueType;
use crate::with_decimal_type;
use crate::with_number_type;
use crate::BlockEntry;
use crate::Column;
use crate::ColumnBuilder;
use crate::DataBlock;
use crate::Value;

impl DataBlock {
// Generate a new `DataBlock` by the specified indices ranges.
pub fn take_ranges(self, ranges: &[Range<u32>], num_rows: usize) -> Result<DataBlock> {
debug_assert_eq!(
ranges
.iter()
.map(|range| range.end - range.start)
.sum::<u32>() as usize,
num_rows
);

let columns = self
.columns()
.iter()
.map(|entry| match &entry.value {
Value::Column(c) => {
let value = Value::Column(Column::take_ranges(c, ranges, num_rows));
BlockEntry::new(entry.data_type.clone(), value)
}
_ => entry.clone(),
})
.collect();
Ok(DataBlock::new(columns, num_rows))
}
}

impl Column {
// Generate a new `Column` by the specified indices ranges.
fn take_ranges(&self, ranges: &[Range<u32>], num_rows: usize) -> Column {
match self {
Column::Null { .. } => Column::Null { len: num_rows },
Column::EmptyArray { .. } => Column::EmptyArray { len: num_rows },
Column::EmptyMap { .. } => Column::EmptyMap { len: num_rows },
Column::Number(column) => with_number_type!(|NUM_TYPE| match column {
NumberColumn::NUM_TYPE(values) => {
Column::Number(NumberColumn::NUM_TYPE(Self::take_ranges_primitive_types(
values, ranges, num_rows,
)))
}
}),
Column::Decimal(column) => with_decimal_type!(|DECIMAL_TYPE| match column {
DecimalColumn::DECIMAL_TYPE(values, size) => {
Column::Decimal(DecimalColumn::DECIMAL_TYPE(
Self::take_ranges_primitive_types(values, ranges, num_rows),
*size,
))
}
}),
Column::Boolean(bm) => {
let column = Self::take_ranges_boolean_types(bm, ranges, num_rows);
Column::Boolean(column)
}
Column::String(column) => {
let column = Self::take_ranges_string_types(column, ranges, num_rows);
Column::String(column)
}
Column::Timestamp(column) => {
let ts = Self::take_ranges_primitive_types(column, ranges, num_rows);
Column::Timestamp(ts)
}
Column::Date(column) => {
let d = Self::take_ranges_primitive_types(column, ranges, num_rows);
Column::Date(d)
}
Column::Array(column) => {
let mut offsets = Vec::with_capacity(num_rows + 1);
offsets.push(0);
let builder = ColumnBuilder::with_capacity(&column.values.data_type(), num_rows);
let builder = ArrayColumnBuilder { builder, offsets };
Self::take_ranges_scalar_types::<ArrayType<AnyType>>(
column, builder, ranges, num_rows,
)
}
Column::Map(column) => {
let mut offsets = Vec::with_capacity(num_rows + 1);
offsets.push(0);
let builder = ColumnBuilder::from_column(
ColumnBuilder::with_capacity(&column.values.data_type(), num_rows).build(),
);
let (key_builder, val_builder) = match builder {
ColumnBuilder::Tuple(fields) => (fields[0].clone(), fields[1].clone()),
_ => unreachable!(),
};
let builder = KvColumnBuilder {
keys: key_builder,
values: val_builder,
};
let builder = ArrayColumnBuilder { builder, offsets };
let column = ArrayColumn::try_downcast(column).unwrap();
Self::take_ranges_scalar_types::<MapType<AnyType, AnyType>>(
&column, builder, ranges, num_rows,
)
}
Column::Bitmap(column) => {
let column = Self::take_ranges_string_types(column, ranges, num_rows);
Column::Bitmap(column)
}

Column::Nullable(c) => {
let column = Self::take_ranges(&c.column, ranges, num_rows);
let validity = Self::take_ranges_boolean_types(&c.validity, ranges, num_rows);
Column::Nullable(Box::new(NullableColumn { column, validity }))
}
Column::Tuple(fields) => {
let fields = fields
.iter()
.map(|c| c.take_ranges(ranges, num_rows))
.collect();
Column::Tuple(fields)
}
Column::Variant(column) => {
let column = Self::take_ranges_string_types(column, ranges, num_rows);
Column::Variant(column)
}
}
}

fn take_ranges_scalar_types<T: ValueType>(
col: &T::Column,
mut builder: T::ColumnBuilder,
ranges: &[Range<u32>],
_num_rows: usize,
) -> Column {
for range in ranges {
for index in range.start as usize..range.end as usize {
T::push_item(&mut builder, unsafe {
T::index_column_unchecked(col, index)
});
}
}
T::upcast_column(T::build_column(builder))
}

fn take_ranges_primitive_types<T: Copy>(
values: &Buffer<T>,
ranges: &[Range<u32>],
num_rows: usize,
) -> Buffer<T> {
let mut builder: Vec<T> = Vec::with_capacity(num_rows);
for range in ranges {
builder.extend(&values[range.start as usize..range.end as usize]);
}
builder.into()
}

fn take_ranges_string_types(
values: &StringColumn,
ranges: &[Range<u32>],
num_rows: usize,
) -> StringColumn {
let mut offsets: Vec<u64> = Vec::with_capacity(num_rows + 1);
let mut offsets_len = 0;
let mut data_size = 0;

let value_data = values.data().as_slice();
let values_offset = values.offsets().as_slice();
// Build [`offset`] and calculate `data_size` required by [`data`].
unsafe {
*offsets.get_unchecked_mut(offsets_len) = 0;
offsets_len += 1;
for range in ranges {
let mut offset_start = values_offset[range.start as usize];
for offset_end in
values_offset[range.start as usize + 1..range.end as usize + 1].iter()
{
data_size += offset_end - offset_start;
offset_start = *offset_end;
*offsets.get_unchecked_mut(offsets_len) = data_size;
offsets_len += 1;
}
}
offsets.set_len(offsets_len);
}

// Build [`data`].
let mut data: Vec<u8> = Vec::with_capacity(data_size as usize);
let mut data_ptr = data.as_mut_ptr();

unsafe {
for range in ranges {
let col_data = &value_data[values_offset[range.start as usize] as usize
..values_offset[range.end as usize] as usize];
copy_advance_aligned(col_data.as_ptr(), &mut data_ptr, col_data.len());
}
set_vec_len_by_ptr(&mut data, data_ptr);
}

StringColumn::new(data.into(), offsets.into())
}

fn take_ranges_boolean_types(
bitmap: &Bitmap,
ranges: &[Range<u32>],
num_rows: usize,
) -> Bitmap {
let capacity = num_rows.saturating_add(7) / 8;
let mut builder: Vec<u8> = Vec::with_capacity(capacity);
let mut builder_ptr = builder.as_mut_ptr();
let mut builder_idx = 0;
let mut unset_bits = 0;
let mut buf = 0;

let (bitmap_slice, bitmap_offset, _) = bitmap.as_slice();
unsafe {
for range in ranges {
let mut start = range.start as usize;
let end = range.end as usize;
if builder_idx % 8 != 0 {
while start < end {
if bitmap.get_bit_unchecked(start) {
buf |= BIT_MASK[builder_idx % 8];
} else {
unset_bits += 1;
}
builder_idx += 1;
start += 1;
if builder_idx % 8 == 0 {
store_advance_aligned(buf, &mut builder_ptr);
buf = 0;
break;
}
}
}
let remaining = end - start;
if remaining > 0 {
let (cur_buf, cur_unset_bits) = Self::copy_continuous_bits(
&mut builder_ptr,
bitmap_slice,
builder_idx,
start + bitmap_offset,
remaining,
);
builder_idx += remaining;
unset_bits += cur_unset_bits;
buf = cur_buf;
}
}

if builder_idx % 8 != 0 {
store_advance_aligned(buf, &mut builder_ptr);
}

set_vec_len_by_ptr(&mut builder, builder_ptr);
Bitmap::from_inner(Arc::new(builder.into()), 0, num_rows, unset_bits)
.ok()
.unwrap()
}
}
}
29 changes: 28 additions & 1 deletion src/query/expression/tests/it/kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use core::ops::Range;

use common_expression::block_debug::assert_block_value_eq;
use common_expression::types::number::*;
use common_expression::types::DataType;
Expand Down Expand Up @@ -202,7 +204,23 @@ pub fn test_pass() {
);
}

/// This test covers take.rs, take_chunks.rs, take_compact.rs, filter.rs, concat.rs.
// Build a range selection from a selection array.
pub fn build_range_selection(selection: &[u32], count: usize) -> Vec<Range<u32>> {
let mut range_selection = Vec::with_capacity(count);
let mut start = selection[0];
let mut idx = 1;
while idx < count {
if selection[idx] != selection[idx - 1] + 1 {
range_selection.push(start..selection[idx - 1] + 1);
start = selection[idx];
}
idx += 1;
}
range_selection.push(start..selection[count - 1] + 1);
range_selection
}

/// This test covers take.rs, take_chunks.rs, take_compact.rs, take_ranges.rs, filter.rs, concat.rs.
#[test]
pub fn test_take_and_filter_and_concat() -> common_exception::Result<()> {
use common_expression::types::DataType;
Expand Down Expand Up @@ -286,25 +304,34 @@ pub fn test_take_and_filter_and_concat() -> common_exception::Result<()> {
&mut None,
);
let block_4 = DataBlock::concat(&filtered_blocks)?;
let block_5 = concated_blocks.take_ranges(
&build_range_selection(&take_indices, take_indices.len()),
take_indices.len(),
)?;

assert_eq!(block_1.num_columns(), block_2.num_columns());
assert_eq!(block_1.num_rows(), block_2.num_rows());
assert_eq!(block_1.num_columns(), block_3.num_columns());
assert_eq!(block_1.num_rows(), block_3.num_rows());
assert_eq!(block_1.num_columns(), block_4.num_columns());
assert_eq!(block_1.num_rows(), block_4.num_rows());
assert_eq!(block_1.num_columns(), block_5.num_columns());
assert_eq!(block_1.num_rows(), block_5.num_rows());

let columns_1 = block_1.columns();
let columns_2 = block_2.columns();
let columns_3 = block_3.columns();
let columns_4 = block_4.columns();
let columns_5 = block_5.columns();
for idx in 0..columns_1.len() {
assert_eq!(columns_1[idx].data_type, columns_2[idx].data_type);
assert_eq!(columns_1[idx].value, columns_2[idx].value);
assert_eq!(columns_1[idx].data_type, columns_3[idx].data_type);
assert_eq!(columns_1[idx].value, columns_3[idx].value);
assert_eq!(columns_1[idx].data_type, columns_4[idx].data_type);
assert_eq!(columns_1[idx].value, columns_4[idx].value);
assert_eq!(columns_1[idx].data_type, columns_5[idx].data_type);
assert_eq!(columns_1[idx].value, columns_5[idx].value);
}

Ok(())
Expand Down

0 comments on commit adac7a0

Please sign in to comment.