Skip to content

Commit

Permalink
improve filter
Browse files Browse the repository at this point in the history
  • Loading branch information
Dousir9 committed Sep 22, 2023
1 parent 83c6586 commit 491df68
Showing 1 changed file with 184 additions and 112 deletions.
296 changes: 184 additions & 112 deletions src/query/expression/src/kernels/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,11 @@ use crate::types::map::KvColumnBuilder;
use crate::types::nullable::NullableColumn;
use crate::types::number::NumberColumn;
use crate::types::string::StringColumn;
use crate::types::string::StringColumnBuilder;
use crate::types::AnyType;
use crate::types::ArrayType;
use crate::types::BooleanType;
use crate::types::MapType;
use crate::types::ValueType;
use crate::types::VariantType;
use crate::with_decimal_type;
use crate::with_number_type;
use crate::BlockEntry;
Expand Down Expand Up @@ -178,14 +176,8 @@ impl Column {
Column::Tuple(fields)
}
Column::Variant(column) => {
let bytes_per_row = column.data().len() / filter.len().max(1);
let data_capacity = (filter.len() - filter.unset_bits()) * bytes_per_row;

Self::filter_scalar_types::<VariantType>(
column,
StringColumnBuilder::with_capacity(length, data_capacity),
filter,
)
let column = Self::filter_string_scalars(column, filter);
Column::Variant(column)
}
}
}
Expand Down Expand Up @@ -246,154 +238,234 @@ impl Column {
// low-level API using unsafe to improve performance
//
// Returns a buffer containing only the elements of `values` whose bit in
// `filter` is set, in their original order. When every bit is set the input
// buffer is cloned and returned unchanged.
//
// NOTE(review): this listing comes from a commit diff rendered without +/-
// markers (the page header reports 184 additions and 112 deletions), so
// statements from the pre-change and post-change versions are interleaved
// below and the text does not compile as shown. The `selected`/`new`/`dst`/
// `chunks` statements appear to be the removed version and the
// `num_rows`/`builder`/`ptr`/`values_ptr` statements the added one — confirm
// against the repository.
fn filter_primitive_types<T: Copy>(values: &Buffer<T>, filter: &Bitmap) -> Buffer<T> {
debug_assert_eq!(values.len(), filter.len());
let selected = filter.len() - filter.unset_bits();
if selected == values.len() {
// Number of set bits in the filter, i.e. the number of output elements.
let num_rows = filter.len() - filter.unset_bits();
if num_rows == values.len() {
return values.clone();
}
let mut values = values.as_slice();
let mut new = Vec::<T>::with_capacity(selected);
let mut dst = new.as_mut_ptr();

// Output storage is reserved up front and filled through raw pointers;
// its length is only set at the very end via `set_len(num_rows)`.
let mut builder: Vec<T> = Vec::with_capacity(num_rows);
let mut ptr = builder.as_mut_ptr();
let mut values_ptr = values.as_slice().as_ptr();

// Presumably (backing bytes, bit offset inside the first byte, bit length);
// the `8 - offset` arithmetic below relies on that reading — TODO confirm
// against the Bitmap API.
let (mut slice, offset, mut length) = filter.as_slice();
if offset > 0 {
// Consume the offset
let n = 8 - offset;
values
.iter()
.zip(filter.iter())
.take(n)
.for_each(|(value, is_selected)| {
if is_selected {
unsafe {
dst.write(*value);
dst = dst.add(1);
}
// Walk the set bits of the first (partial) byte. `trailing_zeros` gives the
// index of the lowest set bit; bits below `offset` are skipped, and bit `n`
// maps to element `n - offset` of `values`.
// NOTE(review): only `n >= offset` is checked, not `n < offset + length`.
// If the filter holds fewer than `8 - offset` bits, set bits past its end
// would be copied and `length -= 8 - offset` would underflow — confirm the
// callers never pass such a bitmap or add a bound.
let mut mask = slice[0];
while mask != 0 {
let n = mask.trailing_zeros() as usize;
if n >= offset {
unsafe {
std::ptr::copy_nonoverlapping(values_ptr.add(n - offset), ptr, 1);
ptr = ptr.add(1);
}
});
}
// Clear the lowest set bit so the loop advances to the next selected bit.
mask = mask & (mask - 1);
}
length -= 8 - offset;
slice = &slice[1..];
length -= n;
values = &values[n..];
unsafe {
values_ptr = values_ptr.add(8 - offset);
}
}

// The remaining bits are processed 64 at a time as `u64` masks.
const CHUNK_SIZE: usize = 64;
let mut chunks = values.chunks_exact(CHUNK_SIZE);
let mut mask_chunks = BitChunksExact::<u64>::new(slice, length);

chunks
.by_ref()
.zip(mask_chunks.by_ref())
.for_each(|(chunk, mut mask)| {
if mask == u64::MAX {
// `continuous_selected` counts elements from consecutive fully-set masks
// that have not been copied yet, so a whole run is copied with a single
// `copy_nonoverlapping` once a non-full mask (or the end) is reached.
let mut continuous_selected = 0;
for mut mask in mask_chunks.by_ref() {
if mask == u64::MAX {
continuous_selected += CHUNK_SIZE;
} else {
if continuous_selected > 0 {
unsafe {
std::ptr::copy(chunk.as_ptr(), dst, CHUNK_SIZE);
dst = dst.add(CHUNK_SIZE);
}
} else {
while mask != 0 {
let n = mask.trailing_zeros() as usize;
unsafe {
dst.write(chunk[n]);
dst = dst.add(1);
}
mask = mask & (mask - 1);
std::ptr::copy_nonoverlapping(values_ptr, ptr, continuous_selected);
ptr = ptr.add(continuous_selected);
values_ptr = values_ptr.add(continuous_selected);
}
continuous_selected = 0;
}
});

chunks
.remainder()
.iter()
.zip(mask_chunks.remainder_iter())
.for_each(|(value, is_selected)| {
if is_selected {
while mask != 0 {
let n = mask.trailing_zeros() as usize;
unsafe {
dst.write(*value);
dst = dst.add(1);
std::ptr::copy_nonoverlapping(values_ptr.add(n), ptr, 1);
ptr = ptr.add(1);
}
mask = mask & (mask - 1);
}
});
unsafe {
values_ptr = values_ptr.add(CHUNK_SIZE);
}
}
}
// Flush a run of fully-selected chunks that reached the end of the masks.
if continuous_selected > 0 {
unsafe {
std::ptr::copy_nonoverlapping(values_ptr, ptr, continuous_selected);
ptr = ptr.add(continuous_selected);
values_ptr = values_ptr.add(continuous_selected);
}
}

unsafe { new.set_len(selected) };
new.into()
// Bits left over after the last full 64-bit mask are handled one at a time.
for (i, is_selected) in mask_chunks.remainder_iter().enumerate() {
if is_selected {
unsafe {
std::ptr::copy_nonoverlapping(values_ptr.add(i), ptr, 1);
ptr = ptr.add(1);
}
}
}

// One element was written per set bit, so `num_rows` slots are expected to
// be initialised at this point.
unsafe { builder.set_len(num_rows) };
builder.into()
}

// low-level API using unsafe to improve performance
//
// Returns a `StringColumn` containing only the rows of `values` whose bit in
// `filter` is set, in their original order. When every bit is set the input
// column is cloned and returned unchanged.
//
// NOTE(review): this listing comes from a commit diff rendered without +/-
// markers (the page header reports 184 additions and 112 deletions), so
// statements from the pre-change and post-change versions are interleaved
// below and the text does not compile as shown. The `selected`/`res_data`/
// `res_offsets`/`pos` statements appear to be the removed version and the
// `num_rows`/`items`/`offsets_ptr`/`idx` statements the added one — confirm
// against the repository.
fn filter_string_scalars(values: &StringColumn, filter: &Bitmap) -> StringColumn {
debug_assert_eq!(values.len(), filter.len());
let selected = filter.len() - filter.unset_bits();
if selected == values.len() {
// Number of set bits in the filter, i.e. the number of output rows.
let num_rows = filter.len() - filter.unset_bits();
if num_rows == values.len() {
return values.clone();
}
let data = values.data().as_slice();
let offsets = values.offsets().as_slice();

let mut res_offsets = Vec::with_capacity(selected + 1);
res_offsets.push(0);

let mut res_data = vec![];
let hint_size = data.len() / (values.len() + 1) * selected;

static MAX_HINT_SIZE: usize = 1000000000;
if hint_size < MAX_HINT_SIZE && values.len() < MAX_HINT_SIZE {
res_data.reserve(hint_size)
}

let mut pos = 0;

// First pass: record, for every selected row or run of rows, a
// (source address as u64, byte length) pair in `items`, and write the
// cumulative output offsets. The bytes themselves are copied in a second
// pass once the total `data_size` is known.
let mut items: Vec<(u64, usize)> = Vec::with_capacity(num_rows);
let mut offsets: Vec<u64> = Vec::with_capacity(num_rows + 1);
offsets.push(0);
let items_ptr = items.as_mut_ptr();
// Slot 0 already holds the leading 0, so offset writes start at slot 1.
let mut offsets_ptr = unsafe { offsets.as_mut_ptr().add(1) };
let mut items_pos = 0;

// Running total of selected bytes; also the value of each output offset.
let mut data_size = 0;
let values_offset = values.offsets().as_slice();
let col_data_ptr = values.data().as_slice().as_ptr();
// Index of the first source row covered by the mask currently processed.
let mut idx = 0;
// Presumably (backing bytes, bit offset inside the first byte, bit length);
// the `8 - offset` arithmetic below relies on that reading — TODO confirm
// against the Bitmap API.
let (mut slice, offset, mut length) = filter.as_slice();
if offset > 0 {
// Consume the offset
let n = 8 - offset;
values
.iter()
.zip(filter.iter())
.take(n)
.for_each(|(value, is_selected)| {
if is_selected {
res_data.extend_from_slice(value);
res_offsets.push(res_data.len() as u64);
// Walk the set bits of the first (partial) byte; bit `n` maps to row
// `n - offset`, and bits below `offset` are skipped.
// NOTE(review): only `n >= offset` is checked, not `n < offset + length`.
// If the filter holds fewer than `8 - offset` bits, set bits past its end
// would index `values_offset` unchecked and `length -= 8 - offset` would
// underflow — confirm the callers never pass such a bitmap or add a bound.
let mut mask = slice[0];
while mask != 0 {
let n = mask.trailing_zeros() as usize;
if n >= offset {
let start = unsafe { *values_offset.get_unchecked(n - offset) } as usize;
let len =
unsafe { *values_offset.get_unchecked(n - offset + 1) as usize } - start;
data_size += len as u64;
unsafe {
std::ptr::write(
items_ptr.add(items_pos),
(col_data_ptr.add(start) as u64, len),
);
std::ptr::write(offsets_ptr, data_size);
items_pos += 1;
offsets_ptr = offsets_ptr.add(1);
}
});
}
// Clear the lowest set bit so the loop advances to the next selected bit.
mask = mask & (mask - 1);
}
length -= 8 - offset;
slice = &slice[1..];
length -= n;
pos += n;
idx += 8 - offset;
}

// The remaining bits are processed 64 at a time as `u64` masks.
const CHUNK_SIZE: usize = 64;
let mut mask_chunks = BitChunksExact::<u64>::new(slice, length);

// Rows from consecutive fully-set masks are accumulated here and emitted
// as a single `items` entry when the run ends.
let mut continuous_selected = 0;
for mut mask in mask_chunks.by_ref() {
if mask == u64::MAX {
let data = &data[offsets[pos] as usize..offsets[pos + CHUNK_SIZE] as usize];
res_data.extend_from_slice(data);

let mut last_len = *res_offsets.last().unwrap();
for i in 0..CHUNK_SIZE {
last_len += offsets[pos + i + 1] - offsets[pos + i];
res_offsets.push(last_len);
}
continuous_selected += CHUNK_SIZE;
} else {
// A non-full mask ends any pending run: emit one item spanning the bytes
// of rows `idx..idx + continuous_selected`, then one offset per row.
if continuous_selected > 0 {
let start = unsafe { *values_offset.get_unchecked(idx) } as usize;
let len =
unsafe { *values_offset.get_unchecked(idx + continuous_selected) as usize }
- start;
unsafe {
std::ptr::write(
items_ptr.add(items_pos),
(col_data_ptr.add(start) as u64, len),
);
items_pos += 1;
}
for i in 0..continuous_selected {
unsafe {
data_size += *values_offset.get_unchecked(idx + i + 1)
- *values_offset.get_unchecked(idx + i);
std::ptr::write(offsets_ptr, data_size);
offsets_ptr = offsets_ptr.add(1);
}
}
idx += continuous_selected;
continuous_selected = 0;
}
// Emit one item and one offset per set bit of the partially-set mask.
while mask != 0 {
let n = mask.trailing_zeros() as usize;
let data = &data[offsets[pos + n] as usize..offsets[pos + n + 1] as usize];
res_data.extend_from_slice(data);
res_offsets.push(res_data.len() as u64);

let start = unsafe { *values_offset.get_unchecked(idx + n) } as usize;
let len = unsafe { *values_offset.get_unchecked(idx + n + 1) as usize } - start;
data_size += len as u64;
unsafe {
std::ptr::write(
items_ptr.add(items_pos),
(col_data_ptr.add(start) as u64, len),
);
std::ptr::write(offsets_ptr, data_size);
items_pos += 1;
offsets_ptr = offsets_ptr.add(1);
}
mask = mask & (mask - 1);
}
idx += CHUNK_SIZE;
}
pos += CHUNK_SIZE;
}
// Flush a run of fully-selected chunks that reached the end of the masks.
if continuous_selected > 0 {
let start = unsafe { *values_offset.get_unchecked(idx) } as usize;
let len =
unsafe { *values_offset.get_unchecked(idx + continuous_selected) as usize } - start;
unsafe {
std::ptr::write(
items_ptr.add(items_pos),
(col_data_ptr.add(start) as u64, len),
);
items_pos += 1;
}
for i in 0..continuous_selected {
unsafe {
data_size += *values_offset.get_unchecked(idx + i + 1)
- *values_offset.get_unchecked(idx + i);
std::ptr::write(offsets_ptr, data_size);
offsets_ptr = offsets_ptr.add(1);
}
}
idx += continuous_selected;
}

for is_select in mask_chunks.remainder_iter() {
if is_select {
let data = &data[offsets[pos] as usize..offsets[pos + 1] as usize];
res_data.extend_from_slice(data);
res_offsets.push(res_data.len() as u64);
// Bits left over after the last full 64-bit mask are handled one at a time.
for (i, is_selected) in mask_chunks.remainder_iter().enumerate() {
if is_selected {
unsafe {
let start = *values_offset.get_unchecked(idx + i) as usize;
let len = *values_offset.get_unchecked(idx + i + 1) as usize - start;
data_size += len as u64;
std::ptr::write(
items_ptr.add(items_pos),
(col_data_ptr.add(start) as u64, len),
);
std::ptr::write(offsets_ptr, data_size);
items_pos += 1;
offsets_ptr = offsets_ptr.add(1);
}
}
pos += 1;
}
// `items` may hold fewer than `num_rows` entries because a run of full
// chunks is stored as one item; `offsets` always gets one entry per row
// plus the leading 0.
unsafe {
items.set_len(items_pos);
offsets.set_len(num_rows + 1);
}

StringColumn::new(res_data.into(), res_offsets.into())
// Second pass: allocate the output bytes once and copy every recorded span.
let mut data: Vec<u8> = Vec::with_capacity(data_size as usize);
let data_ptr = data.as_mut_ptr();
let mut offset = 0;
for (str_ptr, len) in items.iter() {
// # Safety
// `offset` + `len` <= `data_size`: the item lengths sum to `data_size`,
// which is the capacity reserved for `data` above.
unsafe {
std::ptr::copy_nonoverlapping(*str_ptr as *const u8, data_ptr.add(offset), *len);
}
offset += len;
}
unsafe { data.set_len(offset) };
StringColumn::new(data.into(), offsets.into())
}
}

0 comments on commit 491df68

Please sign in to comment.