diff --git a/src/query/expression/src/kernels/filter.rs b/src/query/expression/src/kernels/filter.rs index 6b962798a9d9..d0358a493cb5 100644 --- a/src/query/expression/src/kernels/filter.rs +++ b/src/query/expression/src/kernels/filter.rs @@ -26,13 +26,11 @@ use crate::types::map::KvColumnBuilder; use crate::types::nullable::NullableColumn; use crate::types::number::NumberColumn; use crate::types::string::StringColumn; -use crate::types::string::StringColumnBuilder; use crate::types::AnyType; use crate::types::ArrayType; use crate::types::BooleanType; use crate::types::MapType; use crate::types::ValueType; -use crate::types::VariantType; use crate::with_decimal_type; use crate::with_number_type; use crate::BlockEntry; @@ -178,14 +176,8 @@ impl Column { Column::Tuple(fields) } Column::Variant(column) => { - let bytes_per_row = column.data().len() / filter.len().max(1); - let data_capacity = (filter.len() - filter.unset_bits()) * bytes_per_row; - - Self::filter_scalar_types::( - column, - StringColumnBuilder::with_capacity(length, data_capacity), - filter, - ) + let column = Self::filter_string_scalars(column, filter); + Column::Variant(column) } } } @@ -246,154 +238,234 @@ impl Column { // low-level API using unsafe to improve performance fn filter_primitive_types(values: &Buffer, filter: &Bitmap) -> Buffer { debug_assert_eq!(values.len(), filter.len()); - let selected = filter.len() - filter.unset_bits(); - if selected == values.len() { + let num_rows = filter.len() - filter.unset_bits(); + if num_rows == values.len() { return values.clone(); } - let mut values = values.as_slice(); - let mut new = Vec::::with_capacity(selected); - let mut dst = new.as_mut_ptr(); + + let mut builder: Vec = Vec::with_capacity(num_rows); + let mut ptr = builder.as_mut_ptr(); + let mut values_ptr = values.as_slice().as_ptr(); let (mut slice, offset, mut length) = filter.as_slice(); if offset > 0 { - // Consume the offset - let n = 8 - offset; - values - .iter() - .zip(filter.iter()) - .take(n) - .for_each(|(value, is_selected)| { - if is_selected { - unsafe { - dst.write(*value); - dst = dst.add(1); - } + let mut mask = slice[0]; + while mask != 0 { + let n = mask.trailing_zeros() as usize; + if n >= offset { + unsafe { + std::ptr::copy_nonoverlapping(values_ptr.add(n - offset), ptr, 1); + ptr = ptr.add(1); } - }); + } + mask = mask & (mask - 1); + } + length -= 8 - offset; slice = &slice[1..]; - length -= n; - values = &values[n..]; + unsafe { + values_ptr = values_ptr.add(8 - offset); + } } const CHUNK_SIZE: usize = 64; - let mut chunks = values.chunks_exact(CHUNK_SIZE); let mut mask_chunks = BitChunksExact::::new(slice, length); - - chunks - .by_ref() - .zip(mask_chunks.by_ref()) - .for_each(|(chunk, mut mask)| { - if mask == u64::MAX { + let mut continuous_selected = 0; + for mut mask in mask_chunks.by_ref() { + if mask == u64::MAX { + continuous_selected += CHUNK_SIZE; + } else { + if continuous_selected > 0 { unsafe { - std::ptr::copy(chunk.as_ptr(), dst, CHUNK_SIZE); - dst = dst.add(CHUNK_SIZE); - } - } else { - while mask != 0 { - let n = mask.trailing_zeros() as usize; - unsafe { - dst.write(chunk[n]); - dst = dst.add(1); - } - mask = mask & (mask - 1); + std::ptr::copy_nonoverlapping(values_ptr, ptr, continuous_selected); + ptr = ptr.add(continuous_selected); + values_ptr = values_ptr.add(continuous_selected); } + continuous_selected = 0; } - }); - - chunks - .remainder() - .iter() - .zip(mask_chunks.remainder_iter()) - .for_each(|(value, is_selected)| { - if is_selected { + while mask != 0 { + let n = mask.trailing_zeros() as usize; unsafe { - dst.write(*value); - dst = dst.add(1); + std::ptr::copy_nonoverlapping(values_ptr.add(n), ptr, 1); + ptr = ptr.add(1); } + mask = mask & (mask - 1); } - }); + unsafe { + values_ptr = values_ptr.add(CHUNK_SIZE); + } + } + } + if continuous_selected > 0 { + unsafe { + std::ptr::copy_nonoverlapping(values_ptr, ptr, continuous_selected); + ptr = ptr.add(continuous_selected); + values_ptr = values_ptr.add(continuous_selected); + } + } - unsafe { new.set_len(selected) }; - new.into() + for (i, is_selected) in mask_chunks.remainder_iter().enumerate() { + if is_selected { + unsafe { + std::ptr::copy_nonoverlapping(values_ptr.add(i), ptr, 1); + ptr = ptr.add(1); + } + } + } + + unsafe { builder.set_len(num_rows) }; + builder.into() } // low-level API using unsafe to improve performance fn filter_string_scalars(values: &StringColumn, filter: &Bitmap) -> StringColumn { debug_assert_eq!(values.len(), filter.len()); - let selected = filter.len() - filter.unset_bits(); - if selected == values.len() { + let num_rows = filter.len() - filter.unset_bits(); + if num_rows == values.len() { return values.clone(); } - let data = values.data().as_slice(); - let offsets = values.offsets().as_slice(); - - let mut res_offsets = Vec::with_capacity(selected + 1); - res_offsets.push(0); - - let mut res_data = vec![]; - let hint_size = data.len() / (values.len() + 1) * selected; - - static MAX_HINT_SIZE: usize = 1000000000; - if hint_size < MAX_HINT_SIZE && values.len() < MAX_HINT_SIZE { - res_data.reserve(hint_size) - } - - let mut pos = 0; + let mut items: Vec<(u64, usize)> = Vec::with_capacity(num_rows); + let mut offsets: Vec = Vec::with_capacity(num_rows + 1); + offsets.push(0); + let items_ptr = items.as_mut_ptr(); + let mut offsets_ptr = unsafe { offsets.as_mut_ptr().add(1) }; + let mut items_pos = 0; + + let mut data_size = 0; + let values_offset = values.offsets().as_slice(); + let col_data_ptr = values.data().as_slice().as_ptr(); + let mut idx = 0; let (mut slice, offset, mut length) = filter.as_slice(); if offset > 0 { - // Consume the offset - let n = 8 - offset; - values - .iter() - .zip(filter.iter()) - .take(n) - .for_each(|(value, is_selected)| { - if is_selected { - res_data.extend_from_slice(value); - res_offsets.push(res_data.len() as u64); + let mut mask = slice[0]; + while mask != 0 { + let n = mask.trailing_zeros() as usize; + if n >= offset { + let start = unsafe { *values_offset.get_unchecked(n - offset) } as usize; + let len = + unsafe { *values_offset.get_unchecked(n - offset + 1) as usize } - start; + data_size += len as u64; + unsafe { + std::ptr::write( + items_ptr.add(items_pos), + (col_data_ptr.add(start) as u64, len), + ); + std::ptr::write(offsets_ptr, data_size); + items_pos += 1; + offsets_ptr = offsets_ptr.add(1); } - }); + } + mask = mask & (mask - 1); + } + length -= 8 - offset; slice = &slice[1..]; - length -= n; - pos += n; + idx += 8 - offset; } const CHUNK_SIZE: usize = 64; let mut mask_chunks = BitChunksExact::::new(slice, length); - + let mut continuous_selected = 0; for mut mask in mask_chunks.by_ref() { if mask == u64::MAX { - let data = &data[offsets[pos] as usize..offsets[pos + CHUNK_SIZE] as usize]; - res_data.extend_from_slice(data); - - let mut last_len = *res_offsets.last().unwrap(); - for i in 0..CHUNK_SIZE { - last_len += offsets[pos + i + 1] - offsets[pos + i]; - res_offsets.push(last_len); - } + continuous_selected += CHUNK_SIZE; } else { + if continuous_selected > 0 { + let start = unsafe { *values_offset.get_unchecked(idx) } as usize; + let len = + unsafe { *values_offset.get_unchecked(idx + continuous_selected) as usize } + - start; + unsafe { + std::ptr::write( + items_ptr.add(items_pos), + (col_data_ptr.add(start) as u64, len), + ); + items_pos += 1; + } + for i in 0..continuous_selected { + unsafe { + data_size += *values_offset.get_unchecked(idx + i + 1) + - *values_offset.get_unchecked(idx + i); + std::ptr::write(offsets_ptr, data_size); + offsets_ptr = offsets_ptr.add(1); + } + } + idx += continuous_selected; + continuous_selected = 0; + } while mask != 0 { let n = mask.trailing_zeros() as usize; - let data = &data[offsets[pos + n] as usize..offsets[pos + n + 1] as usize]; - res_data.extend_from_slice(data); - res_offsets.push(res_data.len() as u64); - + let start = unsafe { *values_offset.get_unchecked(idx + n) } as usize; + let len = unsafe { *values_offset.get_unchecked(idx + n + 1) as usize } - start; + data_size += len as u64; + unsafe { + std::ptr::write( + items_ptr.add(items_pos), + (col_data_ptr.add(start) as u64, len), + ); + std::ptr::write(offsets_ptr, data_size); + items_pos += 1; + offsets_ptr = offsets_ptr.add(1); + } mask = mask & (mask - 1); } + idx += CHUNK_SIZE; } - pos += CHUNK_SIZE; + } + if continuous_selected > 0 { + let start = unsafe { *values_offset.get_unchecked(idx) } as usize; + let len = + unsafe { *values_offset.get_unchecked(idx + continuous_selected) as usize } - start; + unsafe { + std::ptr::write( + items_ptr.add(items_pos), + (col_data_ptr.add(start) as u64, len), + ); + items_pos += 1; + } + for i in 0..continuous_selected { + unsafe { + data_size += *values_offset.get_unchecked(idx + i + 1) + - *values_offset.get_unchecked(idx + i); + std::ptr::write(offsets_ptr, data_size); + offsets_ptr = offsets_ptr.add(1); + } + } + idx += continuous_selected; } - for is_select in mask_chunks.remainder_iter() { - if is_select { - let data = &data[offsets[pos] as usize..offsets[pos + 1] as usize]; - res_data.extend_from_slice(data); - res_offsets.push(res_data.len() as u64); + for (i, is_selected) in mask_chunks.remainder_iter().enumerate() { + if is_selected { + unsafe { + let start = *values_offset.get_unchecked(idx + i) as usize; + let len = *values_offset.get_unchecked(idx + i + 1) as usize - start; + data_size += len as u64; + std::ptr::write( + items_ptr.add(items_pos), + (col_data_ptr.add(start) as u64, len), + ); + std::ptr::write(offsets_ptr, data_size); + items_pos += 1; + offsets_ptr = offsets_ptr.add(1); + } } - pos += 1; + } + unsafe { + items.set_len(items_pos); + offsets.set_len(num_rows + 1); } - StringColumn::new(res_data.into(), res_offsets.into()) + let mut data: Vec = Vec::with_capacity(data_size as usize); + let data_ptr = data.as_mut_ptr(); + let mut offset = 0; + for (str_ptr, len) in items.iter() { + // # Safety + // `offset` + `len` < `data_size`. + unsafe { + std::ptr::copy_nonoverlapping(*str_ptr as *const u8, data_ptr.add(offset), *len); + } + offset += len; + } + unsafe { data.set_len(offset) }; + StringColumn::new(data.into(), offsets.into()) } }