chore: enable runtime filter for native datasource #13976

Merged · 3 commits · Dec 11, 2023
Changes from all commits
@@ -63,7 +63,7 @@ use crate::pipelines::processors::transforms::hash_join::SingleStringHashJoinHas
use crate::pipelines::processors::HashJoinState;
use crate::sessions::QueryContext;

-const INLIST_RUNTIME_FILTER_THRESHOLD: usize = 10_000;
+pub(crate) const INLIST_RUNTIME_FILTER_THRESHOLD: usize = 10_000;

/// Define some shared states for all hash join build threads.
pub struct HashJoinBuildState {
@@ -40,6 +40,7 @@ use ethnum::U256;
use parking_lot::RwLock;

use crate::pipelines::processors::transforms::hash_join::build_state::BuildState;
+use crate::pipelines::processors::transforms::hash_join::hash_join_build_state::INLIST_RUNTIME_FILTER_THRESHOLD;
use crate::pipelines::processors::transforms::hash_join::row::RowSpace;
use crate::pipelines::processors::transforms::hash_join::util::build_schema_wrap_nullable;
use crate::pipelines::processors::transforms::hash_join::util::inlist_filter;
@@ -257,7 +258,7 @@ impl HashJoinState {
let data_blocks = &mut build_state.build_chunks;

let num_rows = build_state.generation_state.build_num_rows;
-if num_rows > 10_000 {
+if num_rows > INLIST_RUNTIME_FILTER_THRESHOLD {
data_blocks.clear();
return Ok(());
}
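Note on the two hash-join hunks above: they work as a pair. The constant becomes pub(crate) so that the cleanup path in HashJoinState can test the same named limit that gates IN-list filter construction, instead of repeating the magic number 10_000. A minimal sketch of the gating idea; the helper name is illustrative, not the crate's actual API:

pub(crate) const INLIST_RUNTIME_FILTER_THRESHOLD: usize = 10_000;

// Illustrative helper (not in the crate): an IN-list runtime filter
// enumerates every distinct build-side key, so it only pays off for a small
// build side. Past the threshold, the cached build chunks are dropped and
// no IN-list filter is generated.
fn should_build_inlist_filter(build_num_rows: usize) -> bool {
    build_num_rows <= INLIST_RUNTIME_FILTER_THRESHOLD
}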
5 changes: 5 additions & 0 deletions src/query/storages/fuse/src/operations/read/fuse_source.rs
@@ -43,6 +43,7 @@ use crate::operations::read::ReadParquetDataSource;
#[allow(clippy::too_many_arguments)]
pub fn build_fuse_native_source_pipeline(
ctx: Arc<dyn TableContext>,
+table_schema: Arc<TableSchema>,
pipeline: &mut Pipeline,
block_reader: Arc<BlockReader>,
mut max_threads: usize,
@@ -77,7 +78,9 @@ pub fn build_fuse_native_source_pipeline(
output.clone(),
ReadNativeDataSource::<true>::create(
i,
+plan.table_index,
ctx.clone(),
+table_schema.clone(),
output,
block_reader.clone(),
partitions.clone(),
@@ -102,7 +105,9 @@
output.clone(),
ReadNativeDataSource::<false>::create(
i,
+plan.table_index,
ctx.clone(),
+table_schema.clone(),
output,
block_reader.clone(),
partitions.clone(),
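For context on the plumbing above: both the blocking and non-blocking native sources now receive the plan's table_index and the table schema, because runtime filters are looked up per scan through get_runtime_filter_with_id(table_index) and their column references must be resolved against the schema before any per-partition pruning can happen. A rough sketch of the extra state, assuming the crate types already imported in this diff:

use std::sync::Arc;

use common_expression::TableSchema;
use common_sql::IndexType;

// Assumed shape, mirroring the two fields the diff adds to ReadNativeDataSource.
struct RuntimeFilterInputs {
    table_schema: Arc<TableSchema>, // resolve filter columns against partition stats
    table_index: IndexType,         // key used by get_runtime_filter_with_id
}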
@@ -22,22 +22,27 @@ use common_catalog::table_context::TableContext;
use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::DataBlock;
+use common_expression::FunctionContext;
+use common_expression::TableSchema;
use common_pipeline_core::processors::Event;
use common_pipeline_core::processors::OutputPort;
use common_pipeline_core::processors::Processor;
use common_pipeline_core::processors::ProcessorPtr;
use common_pipeline_sources::SyncSource;
use common_pipeline_sources::SyncSourcer;
+use common_sql::IndexType;

use super::native_data_source::DataSource;
use crate::io::AggIndexReader;
use crate::io::BlockReader;
use crate::io::TableMetaLocationGenerator;
use crate::io::VirtualColumnReader;
use crate::operations::read::native_data_source::NativeDataSourceMeta;
+use crate::operations::read::runtime_filter_prunner::runtime_filter_pruner;
use crate::FusePartInfo;

pub struct ReadNativeDataSource<const BLOCKING_IO: bool> {
+func_ctx: FunctionContext,
id: usize,
finished: bool,
batch_size: usize,
@@ -49,20 +54,27 @@ pub struct ReadNativeDataSource<const BLOCKING_IO: bool> {

index_reader: Arc<Option<AggIndexReader>>,
virtual_reader: Arc<Option<VirtualColumnReader>>,
+
+table_schema: Arc<TableSchema>,
+table_index: IndexType,
}

impl ReadNativeDataSource<true> {
pub fn create(
id: usize,
+table_index: IndexType,
ctx: Arc<dyn TableContext>,
+table_schema: Arc<TableSchema>,
output: Arc<OutputPort>,
block_reader: Arc<BlockReader>,
partitions: StealablePartitions,
index_reader: Arc<Option<AggIndexReader>>,
virtual_reader: Arc<Option<VirtualColumnReader>>,
) -> Result<ProcessorPtr> {
let batch_size = ctx.get_settings().get_storage_fetch_part_num()? as usize;
+let func_ctx = ctx.get_function_context()?;
SyncSourcer::create(ctx.clone(), output.clone(), ReadNativeDataSource::<true> {
+func_ctx,
id,
output,
batch_size,
@@ -72,24 +84,30 @@ impl ReadNativeDataSource<true> {
partitions,
index_reader,
virtual_reader,
+table_schema,
+table_index,
})
}
}

impl ReadNativeDataSource<false> {
pub fn create(
id: usize,
table_index: IndexType,
ctx: Arc<dyn TableContext>,
table_schema: Arc<TableSchema>,
output: Arc<OutputPort>,
block_reader: Arc<BlockReader>,
partitions: StealablePartitions,
index_reader: Arc<Option<AggIndexReader>>,
virtual_reader: Arc<Option<VirtualColumnReader>>,
) -> Result<ProcessorPtr> {
let batch_size = ctx.get_settings().get_storage_fetch_part_num()? as usize;
+let func_ctx = ctx.get_function_context()?;
Ok(ProcessorPtr::create(Box::new(ReadNativeDataSource::<
false,
> {
+func_ctx,
id,
output,
batch_size,
@@ -99,6 +117,8 @@ impl ReadNativeDataSource<false> {
partitions,
index_reader,
virtual_reader,
+table_schema,
+table_index,
})))
}
}
@@ -110,6 +130,17 @@ impl SyncSource for ReadNativeDataSource<true> {
match self.partitions.steal_one(self.id) {
None => Ok(None),
Some(part) => {
+if runtime_filter_pruner(
+    self.table_schema.clone(),
+    &part,
+    &self
+        .partitions
+        .ctx
+        .get_runtime_filter_with_id(self.table_index),
+    &self.func_ctx,
+)? {
+    return Ok(Some(DataBlock::empty()));
+}
if let Some(index_reader) = self.index_reader.as_ref() {
let fuse_part = FusePartInfo::from_part(&part)?;
let loc =
@@ -198,7 +229,15 @@ impl Processor for ReadNativeDataSource<false> {

if !parts.is_empty() {
let mut chunks = Vec::with_capacity(parts.len());
+let filters = self
+    .partitions
+    .ctx
+    .get_runtime_filter_with_id(self.table_index);
for part in &parts {
+if runtime_filter_pruner(self.table_schema.clone(), part, &filters, &self.func_ctx)?
+{
+    continue;
+}
let part = part.clone();
let block_reader = self.block_reader.clone();
let index_reader = self.index_reader.clone();
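The runtime_filter_pruner calls above are the heart of this PR: before a partition is read, in both the blocking SyncSource path and the async Processor path, the partition is checked against the runtime filters collected from the join build side, and a partition that provably matches no rows is skipped without any I/O. In the blocking path a pruned partition yields an empty DataBlock so the pipeline keeps flowing; in the async path it is simply skipped in the batch loop. The real pruner evaluates filter expressions against partition metadata; the following is a deliberately simplified min/max sketch of the idea, not the crate's implementation:

// Simplified illustration: a partition can be skipped when its [min, max]
// range for the join key cannot intersect the key range implied by the
// build side. The crate's runtime_filter_pruner evaluates real expressions
// against FusePartInfo statistics rather than raw integers.
fn can_skip_partition(part_min: i64, part_max: i64, key_min: i64, key_max: i64) -> bool {
    part_max < key_min || part_min > key_max
}

fn main() {
    // Partition holds keys 100..=200; the build side produced keys 300..=400.
    assert!(can_skip_partition(100, 200, 300, 400));
    // Overlapping ranges must still be read.
    assert!(!can_skip_partition(100, 200, 150, 400));
}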
@@ -22,6 +22,7 @@ use common_catalog::table_context::TableContext;
use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::DataBlock;
+use common_expression::FunctionContext;
use common_expression::TableSchema;
use common_pipeline_core::processors::Event;
use common_pipeline_core::processors::OutputPort;
@@ -42,7 +43,7 @@ use crate::operations::read::parquet_data_source::DataSourceMeta;
use crate::operations::read::runtime_filter_prunner::runtime_filter_pruner;

pub struct ReadParquetDataSource<const BLOCKING_IO: bool> {
-ctx: Arc<dyn TableContext>,
+func_ctx: FunctionContext,
id: usize,
table_index: IndexType,
finished: bool,
@@ -73,10 +74,10 @@ impl<const BLOCKING_IO: bool> ReadParquetDataSource<BLOCKING_IO> {
virtual_reader: Arc<Option<VirtualColumnReader>>,
) -> Result<ProcessorPtr> {
let batch_size = ctx.get_settings().get_storage_fetch_part_num()? as usize;
-
+let func_ctx = ctx.get_function_context()?;
if BLOCKING_IO {
SyncSourcer::create(ctx.clone(), output.clone(), ReadParquetDataSource::<true> {
-ctx: ctx.clone(),
+func_ctx,
id,
table_index,
output,
@@ -93,7 +94,7 @@
Ok(ProcessorPtr::create(Box::new(ReadParquetDataSource::<
false,
> {
-ctx: ctx.clone(),
+func_ctx,
id,
table_index,
output,
@@ -120,8 +121,11 @@ impl SyncSource for ReadParquetDataSource<true> {
if runtime_filter_pruner(
self.table_schema.clone(),
&part,
-&self.ctx.get_runtime_filter_with_id(self.table_index),
-&self.partitions.ctx.get_function_context()?,
+&self
+    .partitions
+    .ctx
+    .get_runtime_filter_with_id(self.table_index),
+&self.func_ctx,
)? {
return Ok(Some(DataBlock::empty()));
}
@@ -220,13 +224,13 @@ impl Processor for ReadParquetDataSource<false> {

if !parts.is_empty() {
let mut chunks = Vec::with_capacity(parts.len());
+let filters = self
+    .partitions
+    .ctx
+    .get_runtime_filter_with_id(self.table_index);
for part in &parts {
-if runtime_filter_pruner(
-    self.table_schema.clone(),
-    part,
-    &self.ctx.get_runtime_filter_with_id(self.table_index),
-    &self.partitions.ctx.get_function_context()?,
-)? {
+if runtime_filter_pruner(self.table_schema.clone(), part, &filters, &self.func_ctx)?
+{
continue;
}
let part = part.clone();
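Two smaller refactors ride along in the parquet source above: the processor now stores a FunctionContext captured once at construction instead of holding the whole Arc<dyn TableContext>, and the async path fetches the filter set once per batch, hoisting get_runtime_filter_with_id out of the per-partition loop. A generic sketch of that hoisting pattern; the function below is illustrative, not the crate's API:

// Fetch the filter set once per batch, then test every partition against it.
// The `survives` closure stands in for runtime_filter_pruner.
fn prune_batch<P: Clone, F>(
    parts: &[P],
    filters: &F,
    survives: impl Fn(&P, &F) -> bool,
) -> Vec<P> {
    parts
        .iter()
        .filter(|part| survives(part, filters))
        .cloned()
        .collect()
}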
1 change: 1 addition & 0 deletions src/query/storages/fuse/src/operations/read_data.rs
@@ -276,6 +276,7 @@ impl FuseTable {
match storage_format {
FuseStorageFormat::Native => build_fuse_native_source_pipeline(
ctx,
+table_schema,
pipeline,
block_reader,
max_threads,