From f616257b60758281e9c9eeed1e71495565de884d Mon Sep 17 00:00:00 2001 From: xudong963 Date: Fri, 1 Dec 2023 16:28:53 +0800 Subject: [PATCH] fix --- .../pipeline/core/src/processors/processor.rs | 3 +- src/query/pipeline/sources/src/sync_source.rs | 4 + .../transforms/hash_join/hash_join_state.rs | 81 ++---------------- .../processors/transforms/hash_join/util.rs | 85 +++++++++++++++++++ .../read/parquet_data_source_reader.rs | 3 +- .../operations/read/runtime_filter_prunner.rs | 24 ++++-- 6 files changed, 117 insertions(+), 83 deletions(-) diff --git a/src/query/pipeline/core/src/processors/processor.rs b/src/query/pipeline/core/src/processors/processor.rs index 757a239be91a9..41f42ce1d00be 100644 --- a/src/query/pipeline/core/src/processors/processor.rs +++ b/src/query/pipeline/core/src/processors/processor.rs @@ -22,7 +22,6 @@ use common_exception::ErrorCode; use common_exception::Result; use common_expression::ColumnId; use common_expression::Expr; -use common_expression::RemoteExpr; use futures::future::BoxFuture; use futures::FutureExt; use minitrace::prelude::*; @@ -106,7 +105,7 @@ pub trait Processor: Send { fn add_runtime_filters(&mut self, _filters: HashMap) -> Result<()> { Err(ErrorCode::Unimplemented(format!( "{} can't add runtime filters", - self.name + self.name() ))) } } diff --git a/src/query/pipeline/sources/src/sync_source.rs b/src/query/pipeline/sources/src/sync_source.rs index 630193eac3b03..8c4db1597dacf 100644 --- a/src/query/pipeline/sources/src/sync_source.rs +++ b/src/query/pipeline/sources/src/sync_source.rs @@ -117,6 +117,10 @@ impl Processor for SyncSourcer { match self.inner.generate()? 
{ None => self.is_finish = true, Some(data_block) => { + if data_block.is_empty() && data_block.get_meta().is_none() { + // A part was pruned by runtime filter + return Ok(()); + } let progress_values = ProgressValues { rows: data_block.num_rows(), bytes: data_block.memory_size(), diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs index 08ca9c9ea75d9..7de4fd12aacdd 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs @@ -51,6 +51,7 @@ use parking_lot::RwLock; use crate::pipelines::processors::transforms::hash_join::build_state::BuildState; use crate::pipelines::processors::transforms::hash_join::row::RowSpace; use crate::pipelines::processors::transforms::hash_join::util::build_schema_wrap_nullable; +use crate::pipelines::processors::transforms::hash_join::util::inlist_filter; use crate::pipelines::processors::HashJoinDesc; use crate::sessions::QueryContext; @@ -266,81 +267,17 @@ impl HashJoinState { .iter() .zip(self.hash_join_desc.probe_keys.iter()) { - // Only support key is a column - if let Expr::ColumnRef { - span, - id, - data_type, - display_name, - } = probe_key - { - let column_id: usize = self.hash_join_desc.probe_schema.fields[*id] - .name() - .parse() - .unwrap(); - let raw_probe_key = RawExpr::ColumnRef { - span: span.clone(), - id: column_id, - data_type: data_type.clone(), - display_name: display_name.clone(), - }; - let mut columns = Vec::with_capacity(data_blocks.len()); - for block in data_blocks.iter() { - if block.num_columns() == 0 { - continue; - } - let evaluator = Evaluator::new(block, &func_ctx, &BUILTIN_FUNCTIONS); - let column = evaluator - .run(build_key)? 
- .convert_to_full_column(build_key.data_type(), block.num_rows()); - columns.push(column); - } - // Generate inlist using build column - let build_key_column = Column::concat_columns(columns.into_iter())?; - let mut list = Vec::with_capacity(build_key_column.len()); - for value in build_key_column.iter() { - list.push(RawExpr::Constant { - span: None, - scalar: value.to_owned(), - }) - } - let array = RawExpr::FunctionCall { - span: None, - name: "array".to_string(), - params: vec![], - args: list, - }; - let distinct_list = RawExpr::FunctionCall { - span: None, - name: "array_distinct".to_string(), - params: vec![], - args: vec![array], - }; - - let args = vec![distinct_list, raw_probe_key]; - // Make contain function - let contain_func = RawExpr::FunctionCall { - span: None, - name: "contains".to_string(), - params: vec![], - args, - }; - runtime_filters.insert( - *id as ColumnId, - contain_func - .type_check(self.hash_join_desc.probe_schema.as_ref())? - .project_column_ref(|index| { - self.hash_join_desc - .probe_schema - .index_of(&index.to_string()) - .unwrap() - }), - ); + if let Some(filter) = inlist_filter( + &func_ctx, + &self.hash_join_desc.probe_schema, + build_key, + probe_key, + data_blocks, + )? { + runtime_filters.insert(filter.0, filter.1); } } - data_blocks.clear(); - Ok(()) } } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/util.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/util.rs index a6aabf31c0b01..f793496441368 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/util.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/util.rs @@ -12,9 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use common_exception::Result; +use common_expression::Column; +use common_expression::ColumnId; +use common_expression::DataBlock; use common_expression::DataField; use common_expression::DataSchemaRef; use common_expression::DataSchemaRefExt; +use common_expression::Evaluator; +use common_expression::Expr; +use common_expression::FunctionContext; +use common_expression::RawExpr; +use common_functions::BUILTIN_FUNCTIONS; +use common_sql::TypeCheck; pub(crate) fn build_schema_wrap_nullable(build_schema: &DataSchemaRef) -> DataSchemaRef { let mut nullable_field = Vec::with_capacity(build_schema.fields().len()); @@ -37,3 +47,78 @@ pub(crate) fn probe_schema_wrap_nullable(probe_schema: &DataSchemaRef) -> DataSc } DataSchemaRefExt::create(nullable_field) } + +// Construct an inlist (`contains`) runtime filter for the probe side from the build-side key column. +pub(crate) fn inlist_filter( + func_ctx: &FunctionContext, + probe_schema: &DataSchemaRef, + build_key: &Expr, + probe_key: &Expr, + build_blocks: &[DataBlock], +) -> Result<Option<(ColumnId, Expr)>> { + // Currently this is only supported when the probe key is a plain column; more cases will be supported later, + // e.g. t1.a + 1 = t2.a, or t1.a + t1.b = t2.a (the left side is the probe side). + if let Expr::ColumnRef { + span, + id, + data_type, + display_name, + } = probe_key + { + let column_id: usize = probe_schema.fields[*id].name().parse().unwrap(); + let raw_probe_key = RawExpr::ColumnRef { + span: span.clone(), + id: column_id, + data_type: data_type.clone(), + display_name: display_name.clone(), + }; + let mut columns = Vec::with_capacity(build_blocks.len()); + for block in build_blocks.iter() { + if block.num_columns() == 0 { + continue; + } + let evaluator = Evaluator::new(block, &func_ctx, &BUILTIN_FUNCTIONS); + let column = evaluator + .run(build_key)? 
+ .convert_to_full_column(build_key.data_type(), block.num_rows()); + columns.push(column); + } + // Generate the inlist from the concatenated build-side key column. + let build_key_column = Column::concat_columns(columns.into_iter())?; + let mut list = Vec::with_capacity(build_key_column.len()); + for value in build_key_column.iter() { + list.push(RawExpr::Constant { + span: None, + scalar: value.to_owned(), + }) + } + let array = RawExpr::FunctionCall { + span: None, + name: "array".to_string(), + params: vec![], + args: list, + }; + let distinct_list = RawExpr::FunctionCall { + span: None, + name: "array_distinct".to_string(), + params: vec![], + args: vec![array], + }; + + let args = vec![distinct_list, raw_probe_key]; + // Build the `contains(distinct_list, probe_key)` filter expression. + let contain_func = RawExpr::FunctionCall { + span: None, + name: "contains".to_string(), + params: vec![], + args, + }; + return Ok(Some(( + *id as ColumnId, + contain_func + .type_check(probe_schema.as_ref())? + .project_column_ref(|index| probe_schema.index_of(&index.to_string()).unwrap()), + ))); + } + Ok(None) +} diff --git a/src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs b/src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs index a8138386c895c..21f982bf3d279 100644 --- a/src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs +++ b/src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs @@ -31,6 +31,7 @@ use common_pipeline_core::processors::Processor; use common_pipeline_core::processors::ProcessorPtr; use common_pipeline_sources::SyncSource; use common_pipeline_sources::SyncSourcer; +use log::info; use super::parquet_data_source::DataSource; use crate::fuse_part::FusePartInfo; @@ -114,7 +115,7 @@ impl SyncSource for ReadParquetDataSource { &self.runtime_filters, &self.partitions.ctx.get_function_context()?, )? 
{ - return Ok(None); + return Ok(Some(DataBlock::empty())); } if let Some(index_reader) = self.index_reader.as_ref() { diff --git a/src/query/storages/fuse/src/operations/read/runtime_filter_prunner.rs b/src/query/storages/fuse/src/operations/read/runtime_filter_prunner.rs index 8a271bfe7cb36..222a3d9a69a4b 100644 --- a/src/query/storages/fuse/src/operations/read/runtime_filter_prunner.rs +++ b/src/query/storages/fuse/src/operations/read/runtime_filter_prunner.rs @@ -22,6 +22,7 @@ use common_expression::Expr; use common_expression::FunctionContext; use common_expression::Scalar; use common_functions::BUILTIN_FUNCTIONS; +use log::info; use storages_common_index::statistics_to_domain; use crate::FusePartInfo; @@ -36,7 +37,7 @@ pub fn runtime_filter_pruner( } let part = FusePartInfo::from_part(part)?; - Ok(filters.iter().any(|(id, filter)| { + let pruned = filters.iter().any(|(id, filter)| { let column_refs = filter.column_refs(); // Currently only support filter with one column(probe key). debug_assert!(column_refs.len() == 1); @@ -54,15 +55,22 @@ pub fn runtime_filter_pruner( func_ctx, &BUILTIN_FUNCTIONS, ); - matches!(new_expr, Expr::Constant { + return matches!(new_expr, Expr::Constant { scalar: Scalar::Boolean(false), .. - }) - } else { - false + }); } - } else { - false } - })) + false + }); + + if pruned { + info!( + "Pruned partition with {:?} rows by runtime filter", + part.nums_rows + ); + return Ok(true); + } + + Ok(false) }