From 83f0a7d26c40a0b0f8d46bbadb3e9851679887fe Mon Sep 17 00:00:00 2001
From: xudong963
Date: Mon, 11 Dec 2023 15:12:38 +0800
Subject: [PATCH] chore: enable runtime filter for native datasource

---
 .../hash_join/hash_join_build_state.rs        |  2 +-
 .../transforms/hash_join/hash_join_state.rs   |  3 +-
 .../fuse/src/operations/read/fuse_source.rs   |  5 +++
 .../read/native_data_source_reader.rs         | 39 +++++++++++++++++++
 .../read/parquet_data_source_reader.rs        | 28 +++++++------
 .../storages/fuse/src/operations/read_data.rs |  1 +
 6 files changed, 64 insertions(+), 14 deletions(-)

diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs
index d823d48870cb..01e9f60dea6c 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs
@@ -63,7 +63,7 @@ use crate::pipelines::processors::transforms::hash_join::SingleStringHashJoinHashTable;
 use crate::pipelines::processors::HashJoinState;
 use crate::sessions::QueryContext;
 
-const INLIST_RUNTIME_FILTER_THRESHOLD: usize = 10_000;
+pub(crate) const INLIST_RUNTIME_FILTER_THRESHOLD: usize = 10_000;
 
 /// Define some shared states for all hash join build threads.
 pub struct HashJoinBuildState {
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs
index d848d9f56ec4..a87672fdf8df 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs
@@ -40,6 +40,7 @@ use ethnum::U256;
 use parking_lot::RwLock;
 
 use crate::pipelines::processors::transforms::hash_join::build_state::BuildState;
+use crate::pipelines::processors::transforms::hash_join::hash_join_build_state::INLIST_RUNTIME_FILTER_THRESHOLD;
 use crate::pipelines::processors::transforms::hash_join::row::RowSpace;
 use crate::pipelines::processors::transforms::hash_join::util::build_schema_wrap_nullable;
 use crate::pipelines::processors::transforms::hash_join::util::inlist_filter;
@@ -257,7 +258,7 @@ impl HashJoinState {
         let data_blocks = &mut build_state.build_chunks;
 
         let num_rows = build_state.generation_state.build_num_rows;
-        if num_rows > 10_000 {
+        if num_rows > INLIST_RUNTIME_FILTER_THRESHOLD {
             data_blocks.clear();
             return Ok(());
         }
diff --git a/src/query/storages/fuse/src/operations/read/fuse_source.rs b/src/query/storages/fuse/src/operations/read/fuse_source.rs
index d6f1a8f2a2b3..dd7d20ae6d15 100644
--- a/src/query/storages/fuse/src/operations/read/fuse_source.rs
+++ b/src/query/storages/fuse/src/operations/read/fuse_source.rs
@@ -43,6 +43,7 @@ use crate::operations::read::ReadParquetDataSource;
 #[allow(clippy::too_many_arguments)]
 pub fn build_fuse_native_source_pipeline(
     ctx: Arc<dyn TableContext>,
+    table_schema: Arc<TableSchema>,
     pipeline: &mut Pipeline,
     block_reader: Arc<BlockReader>,
     mut max_threads: usize,
@@ -77,7 +78,9 @@
                 output.clone(),
                 ReadNativeDataSource::<true>::create(
                     i,
+                    plan.table_index,
                     ctx.clone(),
+                    table_schema.clone(),
                     output,
                     block_reader.clone(),
                     partitions.clone(),
@@ -102,7 +105,9 @@
                 output.clone(),
                 ReadNativeDataSource::<false>::create(
                     i,
+                    plan.table_index,
                     ctx.clone(),
+                    table_schema.clone(),
                     output,
                     block_reader.clone(),
                     partitions.clone(),
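Note: the fuse_source.rs hunks above thread two new inputs into every native source: `plan.table_index`, which keys the lookup of the runtime filters built by the hash join for this table, and `table_schema`, which the pruner needs to resolve filter columns against partition statistics. The `::<true>` / `::<false>` turbofish picks the blocking or async I/O variant of the source once, at pipeline-build time. Below is a minimal, self-contained sketch of that const-generic dispatch pattern; the type and field names are illustrative stand-ins, not databend's actual types.

    // Two monomorphized variants of one source type: BLOCKING_IO is fixed
    // at compile time, and pipeline construction picks a branch at runtime.
    struct ReadSource<const BLOCKING_IO: bool> {
        id: usize,
    }

    impl ReadSource<true> {
        // Sync variant; the real code wraps this one in a SyncSourcer.
        fn create(id: usize) -> Self {
            ReadSource { id }
        }
    }

    impl ReadSource<false> {
        // Async variant; the real code boxes this one as a plain Processor.
        fn create(id: usize) -> Self {
            ReadSource { id }
        }
    }

    fn main() {
        // Stand-in for block_reader.support_blocking_api().
        let support_blocking = true;
        if support_blocking {
            let s = ReadSource::<true>::create(0);
            println!("sync source {}", s.id);
        } else {
            let s = ReadSource::<false>::create(0);
            println!("async source {}", s.id);
        }
    }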
diff --git a/src/query/storages/fuse/src/operations/read/native_data_source_reader.rs b/src/query/storages/fuse/src/operations/read/native_data_source_reader.rs
index a5dc660d71db..25e6d963ecaf 100644
--- a/src/query/storages/fuse/src/operations/read/native_data_source_reader.rs
+++ b/src/query/storages/fuse/src/operations/read/native_data_source_reader.rs
@@ -22,12 +22,15 @@ use common_catalog::table_context::TableContext;
 use common_exception::ErrorCode;
 use common_exception::Result;
 use common_expression::DataBlock;
+use common_expression::FunctionContext;
+use common_expression::TableSchema;
 use common_pipeline_core::processors::Event;
 use common_pipeline_core::processors::OutputPort;
 use common_pipeline_core::processors::Processor;
 use common_pipeline_core::processors::ProcessorPtr;
 use common_pipeline_sources::SyncSource;
 use common_pipeline_sources::SyncSourcer;
+use common_sql::IndexType;
 
 use super::native_data_source::DataSource;
 use crate::io::AggIndexReader;
@@ -35,9 +38,11 @@ use crate::io::BlockReader;
 use crate::io::TableMetaLocationGenerator;
 use crate::io::VirtualColumnReader;
 use crate::operations::read::native_data_source::NativeDataSourceMeta;
+use crate::operations::read::runtime_filter_prunner::runtime_filter_pruner;
 use crate::FusePartInfo;
 
 pub struct ReadNativeDataSource<const BLOCKING_IO: bool> {
+    func_ctx: FunctionContext,
     id: usize,
     finished: bool,
     batch_size: usize,
@@ -49,12 +54,17 @@ pub struct ReadNativeDataSource<const BLOCKING_IO: bool> {
 
     index_reader: Arc<Option<AggIndexReader>>,
     virtual_reader: Arc<Option<VirtualColumnReader>>,
+
+    table_schema: Arc<TableSchema>,
+    table_index: IndexType,
 }
 
 impl ReadNativeDataSource<true> {
     pub fn create(
         id: usize,
+        table_index: IndexType,
         ctx: Arc<dyn TableContext>,
+        table_schema: Arc<TableSchema>,
         output: Arc<OutputPort>,
         block_reader: Arc<BlockReader>,
         partitions: StealablePartitions,
@@ -62,7 +72,9 @@ impl ReadNativeDataSource<true> {
         virtual_reader: Arc<Option<VirtualColumnReader>>,
     ) -> Result<ProcessorPtr> {
         let batch_size = ctx.get_settings().get_storage_fetch_part_num()? as usize;
+        let func_ctx = ctx.get_function_context()?;
         SyncSourcer::create(ctx.clone(), output.clone(), ReadNativeDataSource::<true> {
+            func_ctx,
             id,
             output,
             batch_size,
@@ -72,6 +84,8 @@ impl ReadNativeDataSource<true> {
             partitions,
             index_reader,
             virtual_reader,
+            table_schema,
+            table_index,
         })
     }
 }
@@ -79,7 +93,9 @@ impl ReadNativeDataSource<false> {
     pub fn create(
         id: usize,
+        table_index: IndexType,
         ctx: Arc<dyn TableContext>,
+        table_schema: Arc<TableSchema>,
         output: Arc<OutputPort>,
         block_reader: Arc<BlockReader>,
         partitions: StealablePartitions,
@@ -87,9 +103,11 @@ impl ReadNativeDataSource<false> {
         virtual_reader: Arc<Option<VirtualColumnReader>>,
     ) -> Result<ProcessorPtr> {
         let batch_size = ctx.get_settings().get_storage_fetch_part_num()? as usize;
+        let func_ctx = ctx.get_function_context()?;
         Ok(ProcessorPtr::create(Box::new(ReadNativeDataSource::<
             false,
         > {
+            func_ctx,
             id,
             output,
             batch_size,
@@ -99,6 +117,8 @@ impl ReadNativeDataSource<false> {
             partitions,
             index_reader,
             virtual_reader,
+            table_schema,
+            table_index,
         })))
     }
 }
@@ -110,6 +130,17 @@ impl SyncSource for ReadNativeDataSource<true> {
         match self.partitions.steal_one(self.id) {
             None => Ok(None),
             Some(part) => {
+                if runtime_filter_pruner(
+                    self.table_schema.clone(),
+                    &part,
+                    &self
+                        .partitions
+                        .ctx
+                        .get_runtime_filter_with_id(self.table_index),
+                    &self.func_ctx,
+                )? {
+                    return Ok(Some(DataBlock::empty()));
+                }
                 if let Some(index_reader) = self.index_reader.as_ref() {
                     let fuse_part = FusePartInfo::from_part(&part)?;
                     let loc =
@@ -198,7 +229,15 @@ impl Processor for ReadNativeDataSource<false> {
 
         if !parts.is_empty() {
             let mut chunks = Vec::with_capacity(parts.len());
+            let filters = self
+                .partitions
+                .ctx
+                .get_runtime_filter_with_id(self.table_index);
             for part in &parts {
+                if runtime_filter_pruner(self.table_schema.clone(), part, &filters, &self.func_ctx)?
+                {
+                    continue;
+                }
                 let part = part.clone();
                 let block_reader = self.block_reader.clone();
                 let index_reader = self.index_reader.clone();
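Note: both read paths in native_data_source_reader.rs now consult `runtime_filter_pruner` before touching a partition: the sync `generate` path answers with an empty `DataBlock` when a part is pruned, while the async `process` path simply skips it. The pruner's body is not part of this patch; the following is only a minimal sketch of the kind of min/max statistics check such a function typically performs, with made-up types that are not databend's.

    // Hypothetical per-column statistics for one partition.
    struct ColumnStats {
        min: i64,
        max: i64,
    }

    // Hypothetical inlist runtime filter pushed down from the join build side.
    struct InListFilter {
        values: Vec<i64>,
    }

    // Returns true when the partition can be skipped: no filter value can
    // possibly fall inside the partition's [min, max] range.
    fn prune(stats: &ColumnStats, filter: &InListFilter) -> bool {
        !filter
            .values
            .iter()
            .any(|v| *v >= stats.min && *v <= stats.max)
    }

    fn main() {
        let stats = ColumnStats { min: 100, max: 200 };
        let filter = InListFilter { values: vec![5, 50, 500] };
        // No value lies in [100, 200], so this part never needs to be read.
        println!("pruned: {}", prune(&stats, &filter));
    }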
{ + return Ok(Some(DataBlock::empty())); + } if let Some(index_reader) = self.index_reader.as_ref() { let fuse_part = FusePartInfo::from_part(&part)?; let loc = @@ -198,7 +229,15 @@ impl Processor for ReadNativeDataSource { if !parts.is_empty() { let mut chunks = Vec::with_capacity(parts.len()); + let filters = self + .partitions + .ctx + .get_runtime_filter_with_id(self.table_index); for part in &parts { + if runtime_filter_pruner(self.table_schema.clone(), part, &filters, &self.func_ctx)? + { + continue; + } let part = part.clone(); let block_reader = self.block_reader.clone(); let index_reader = self.index_reader.clone(); diff --git a/src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs b/src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs index d8f67b23ee70..9fbf7c6b7f4f 100644 --- a/src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs +++ b/src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs @@ -22,6 +22,7 @@ use common_catalog::table_context::TableContext; use common_exception::ErrorCode; use common_exception::Result; use common_expression::DataBlock; +use common_expression::FunctionContext; use common_expression::TableSchema; use common_pipeline_core::processors::Event; use common_pipeline_core::processors::OutputPort; @@ -42,7 +43,7 @@ use crate::operations::read::parquet_data_source::DataSourceMeta; use crate::operations::read::runtime_filter_prunner::runtime_filter_pruner; pub struct ReadParquetDataSource { - ctx: Arc, + func_ctx: FunctionContext, id: usize, table_index: IndexType, finished: bool, @@ -73,10 +74,10 @@ impl ReadParquetDataSource { virtual_reader: Arc>, ) -> Result { let batch_size = ctx.get_settings().get_storage_fetch_part_num()? as usize; - + let func_ctx = ctx.get_function_context()?; if BLOCKING_IO { SyncSourcer::create(ctx.clone(), output.clone(), ReadParquetDataSource:: { - ctx: ctx.clone(), + func_ctx, id, table_index, output, @@ -93,7 +94,7 @@ impl ReadParquetDataSource { Ok(ProcessorPtr::create(Box::new(ReadParquetDataSource::< false, > { - ctx: ctx.clone(), + func_ctx, id, table_index, output, @@ -120,8 +121,11 @@ impl SyncSource for ReadParquetDataSource { if runtime_filter_pruner( self.table_schema.clone(), &part, - &self.ctx.get_runtime_filter_with_id(self.table_index), - &self.partitions.ctx.get_function_context()?, + &self + .partitions + .ctx + .get_runtime_filter_with_id(self.table_index), + &self.func_ctx, )? { return Ok(Some(DataBlock::empty())); } @@ -220,13 +224,13 @@ impl Processor for ReadParquetDataSource { if !parts.is_empty() { let mut chunks = Vec::with_capacity(parts.len()); + let filters = self + .partitions + .ctx + .get_runtime_filter_with_id(self.table_index); for part in &parts { - if runtime_filter_pruner( - self.table_schema.clone(), - part, - &self.ctx.get_runtime_filter_with_id(self.table_index), - &self.partitions.ctx.get_function_context()?, - )? { + if runtime_filter_pruner(self.table_schema.clone(), part, &filters, &self.func_ctx)? + { continue; } let part = part.clone(); diff --git a/src/query/storages/fuse/src/operations/read_data.rs b/src/query/storages/fuse/src/operations/read_data.rs index 7f8940234b73..52ae5bb05e4e 100644 --- a/src/query/storages/fuse/src/operations/read_data.rs +++ b/src/query/storages/fuse/src/operations/read_data.rs @@ -276,6 +276,7 @@ impl FuseTable { match storage_format { FuseStorageFormat::Native => build_fuse_native_source_pipeline( ctx, + table_schema, pipeline, block_reader, max_threads,
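Note: besides swapping the stored `ctx: Arc<dyn TableContext>` for a `FunctionContext` captured once in `create`, the `Processor::process` hunks in both readers hoist the `get_runtime_filter_with_id` lookup out of the per-partition loop, so the filter set is fetched once per stolen batch rather than once per part. A stand-alone sketch of that hoisting pattern follows; `Part`, `get_filters`, and `pruned` are illustrative stand-ins, not databend APIs.

    #[derive(Debug)]
    struct Part(i64);

    // Stand-in for ctx.get_runtime_filter_with_id(table_index): potentially
    // a shared-state lookup, so worth doing once per batch.
    fn get_filters() -> Vec<i64> {
        vec![5, 50]
    }

    fn pruned(part: &Part, filters: &[i64]) -> bool {
        // Skip a part that matches none of the filter values.
        !filters.contains(&part.0)
    }

    fn main() {
        let parts = [Part(5), Part(7), Part(50)];
        let filters = get_filters(); // hoisted: fetched once, reused below
        for part in &parts {
            if pruned(part, &filters) {
                continue;
            }
            println!("reading {:?}", part);
        }
    }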