From 31861bb10e355667b34c6b91ec7abfdaa0502eec Mon Sep 17 00:00:00 2001 From: xudong963 Date: Sat, 9 Dec 2023 17:03:02 +0800 Subject: [PATCH] redesign --- .../src/plan/datasource/datasource_plan.rs | 2 + src/query/catalog/src/table_context.rs | 7 ++- src/query/pipeline/sources/src/sync_source.rs | 10 ----- .../src/pipelines/builders/builder_join.rs | 6 ++- .../pipelines/builders/builder_recluster.rs | 1 + .../src/pipelines/executor/executor_graph.rs | 3 -- .../transforms/hash_join/hash_join_state.rs | 18 +++----- .../hash_join/transform_hash_join_probe.rs | 11 ----- .../processors/transforms/hash_join/util.rs | 4 +- .../src/schedulers/fragments/fragmenter.rs | 3 ++ src/query/service/src/sessions/query_ctx.rs | 29 ++++++++++++ .../service/src/sessions/query_ctx_shared.rs | 4 ++ .../tests/it/sql/exec/get_table_bind_test.rs | 9 ++++ .../it/storages/fuse/operations/commit.rs | 10 +++++ src/query/sql/src/executor/physical_plan.rs | 44 +++++++++++++++++++ .../physical_exchange_source.rs | 4 ++ .../physical_plans/physical_table_scan.rs | 3 +- src/query/sql/src/executor/table_read_plan.rs | 2 + .../fuse/src/operations/read/fuse_source.rs | 2 + .../read/parquet_data_source_reader.rs | 15 +++---- .../operations/read/runtime_filter_prunner.rs | 7 ++- 21 files changed, 139 insertions(+), 55 deletions(-) diff --git a/src/query/catalog/src/plan/datasource/datasource_plan.rs b/src/query/catalog/src/plan/datasource/datasource_plan.rs index ea5e59fe78fd..d118d7111092 100644 --- a/src/query/catalog/src/plan/datasource/datasource_plan.rs +++ b/src/query/catalog/src/plan/datasource/datasource_plan.rs @@ -47,6 +47,8 @@ pub struct DataSourcePlan { // data mask policy for `output_schema` columns pub data_mask_policy: Option>, + + pub table_index: usize, } impl DataSourcePlan { diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs index c8e2220261b2..9d8aae970d8d 100644 --- a/src/query/catalog/src/table_context.rs +++ b/src/query/catalog/src/table_context.rs @@ -25,7 +25,8 @@ use common_base::base::Progress; use common_base::base::ProgressValues; use common_exception::ErrorCode; use common_exception::Result; -use common_expression::{DataBlock, Expr}; +use common_expression::DataBlock; +use common_expression::Expr; use common_expression::FunctionContext; use common_io::prelude::FormatSettings; use common_meta_app::principal::FileFormatParams; @@ -228,5 +229,7 @@ pub trait TableContext: Send + Sync { /// Get license key from context, return empty if license is not found or error happened. fn get_license_key(&self) -> String; - // fn set_runtime_filter(&self, filters: HashMap>) + fn set_runtime_filter(&self, filters: (usize, Vec>)); + + fn get_runtime_filter_with_id(&self, id: usize) -> Vec>; } diff --git a/src/query/pipeline/sources/src/sync_source.rs b/src/query/pipeline/sources/src/sync_source.rs index 195c6b562583..6805697b17a5 100644 --- a/src/query/pipeline/sources/src/sync_source.rs +++ b/src/query/pipeline/sources/src/sync_source.rs @@ -13,7 +13,6 @@ // limitations under the License. 
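// The redesign replaces the push-style `add_runtime_filters` hook (removed
// below) with a pull model through `TableContext`: the hash join build side
// publishes IN-list filters keyed by the probe-side table index, and each
// scan source fetches them on demand. A minimal sketch of the intended
// contract, assuming a `ctx: Arc<dyn TableContext>` and an illustrative
// `table_idx` (both names are assumptions, not taken from this patch):
//
//     let filters: Vec<Expr<String>> = ctx.get_runtime_filter_with_id(table_idx);
//     if filters.is_empty() {
//         // Nothing published yet for this table; scan without pruning.
//     }
//
// Returning an empty Vec instead of an Option keeps the "no filter" path
// free of special cases at every call site.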
use std::any::Any; -use std::collections::HashMap; use std::sync::Arc; use common_base::base::Progress; @@ -21,7 +20,6 @@ use common_base::base::ProgressValues; use common_catalog::table_context::TableContext; use common_exception::Result; use common_expression::DataBlock; -use common_expression::Expr; use common_pipeline_core::processors::Event; use common_pipeline_core::processors::OutputPort; use common_pipeline_core::processors::Processor; @@ -34,14 +32,6 @@ pub trait SyncSource: Send { const NAME: &'static str; fn generate(&mut self) -> Result>; - - fn can_add_runtime_filter(&self) -> bool { - false - } - - fn add_runtime_filters(&mut self, _filters: &HashMap>) -> Result<()> { - Ok(()) - } } // TODO: This can be refactored using proc macros diff --git a/src/query/service/src/pipelines/builders/builder_join.rs b/src/query/service/src/pipelines/builders/builder_join.rs index 3dc99058a9a3..1cae642bf8d2 100644 --- a/src/query/service/src/pipelines/builders/builder_join.rs +++ b/src/query/service/src/pipelines/builders/builder_join.rs @@ -124,18 +124,20 @@ impl PipelineBuilder { } pub(crate) fn build_join(&mut self, join: &HashJoin) -> Result<()> { - let state = self.build_join_state(join)?; + let id = join.probe.get_table_index(); + let state = self.build_join_state(join, id)?; self.expand_build_side_pipeline(&join.build, join, state.clone())?; self.build_join_probe(join, state) } - fn build_join_state(&mut self, join: &HashJoin) -> Result> { + fn build_join_state(&mut self, join: &HashJoin, id: IndexType) -> Result> { HashJoinState::try_create( self.ctx.clone(), join.build.output_schema()?, &join.build_projections, HashJoinDesc::create(join)?, &join.probe_to_build, + id, ) } diff --git a/src/query/service/src/pipelines/builders/builder_recluster.rs b/src/query/service/src/pipelines/builders/builder_recluster.rs index 47732a6e133e..e833d5b55f25 100644 --- a/src/query/service/src/pipelines/builders/builder_recluster.rs +++ b/src/query/service/src/pipelines/builders/builder_recluster.rs @@ -76,6 +76,7 @@ impl PipelineBuilder { base_block_ids: None, update_stream_columns: table.change_tracking_enabled(), data_mask_policy: None, + table_index: usize::MAX, }; self.ctx.set_partitions(plan.parts.clone())?; diff --git a/src/query/service/src/pipelines/executor/executor_graph.rs b/src/query/service/src/pipelines/executor/executor_graph.rs index cbd1f2384713..beca082b0b1d 100644 --- a/src/query/service/src/pipelines/executor/executor_graph.rs +++ b/src/query/service/src/pipelines/executor/executor_graph.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. 
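// Why the probe side supplies the id: runtime filters built from build-side
// values prune partitions of the probe-side table scan, so the filters must
// be published under the probe table's index. Condensed from the diff above
// (a sketch, not a compilable excerpt):
//
//     let id = join.probe.get_table_index();        // probe-side table index
//     let state = self.build_join_state(join, id)?; // stored in HashJoinState
//
// Where no single scan can be identified (e.g. the recluster pipeline above),
// `usize::MAX` acts as a "no table" sentinel: filters published under it are
// never looked up by any source.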
-use std::collections::HashMap; use std::collections::VecDeque; use std::fmt::Debug; use std::fmt::Formatter; @@ -24,7 +23,6 @@ use common_base::runtime::TrackedFuture; use common_base::runtime::TrySpawn; use common_exception::ErrorCode; use common_exception::Result; -use common_expression::Expr; use common_pipeline_core::processors::profile::Profile; use common_pipeline_core::processors::EventCause; use common_pipeline_core::Pipeline; @@ -37,7 +35,6 @@ use petgraph::dot::Dot; use petgraph::prelude::EdgeIndex; use petgraph::prelude::NodeIndex; use petgraph::prelude::StableGraph; -use petgraph::stable_graph::Neighbors; use petgraph::Direction; use crate::pipelines::executor::ExecutorTask; diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs index 1d45f4d9a341..04decd166803 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::cell::SyncUnsafeCell; -use std::collections::HashMap; use std::collections::HashSet; use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicI8; @@ -28,7 +27,6 @@ use common_catalog::table_context::TableContext; use common_exception::ErrorCode; use common_exception::Result; use common_expression::DataSchemaRef; -use common_expression::Expr; use common_expression::HashMethodFixedKeys; use common_expression::HashMethodSerializer; use common_expression::HashMethodSingleString; @@ -37,6 +35,7 @@ use common_hashtable::HashtableKeyable; use common_hashtable::StringHashJoinHashMap; use common_sql::plans::JoinType; use common_sql::ColumnSet; +use common_sql::IndexType; use ethnum::U256; use parking_lot::RwLock; @@ -121,10 +120,8 @@ pub struct HashJoinState { /// If partition_id is -1, it means all partitions are spilled. pub(crate) partition_id: AtomicI8, - /// Runtime filters - pub(crate) runtime_filters: RwLock>>, /// If the join node generate runtime filters, the scan node will use it to do prune. - pub(crate) scan_node_id: AtomicUsize, + pub(crate) table_index: IndexType, } impl HashJoinState { @@ -134,6 +131,7 @@ impl HashJoinState { build_projections: &ColumnSet, hash_join_desc: HashJoinDesc, probe_to_build: &[(usize, (bool, bool))], + table_index: IndexType, ) -> Result> { if matches!( hash_join_desc.join_type, @@ -162,8 +160,7 @@ impl HashJoinState { continue_build_watcher, _continue_build_dummy_receiver, partition_id: AtomicI8::new(-2), - runtime_filters: Default::default(), - scan_node_id: Default::default(), + table_index, })) } @@ -264,7 +261,6 @@ impl HashJoinState { data_blocks.clear(); return Ok(()); } - let mut runtime_filters = self.runtime_filters.write(); for (build_key, probe_key) in self .hash_join_desc .build_keys @@ -272,14 +268,10 @@ impl HashJoinState { .zip(self.hash_join_desc.probe_keys_rt.iter()) { if let Some(filter) = inlist_filter(&func_ctx, build_key, probe_key, data_blocks)? 
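 // For each (build key, probe key) pair, `inlist_filter` condenses the
 // collected build-side blocks into a `probe_key IN (...)` expression.
 // Any filter produced is published straight to the query context under
 // this join's probe-side table index, replacing the per-join
 // `runtime_filters` map and `scan_node_id` bookkeeping removed above.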
{ - runtime_filters.insert(filter.0, filter.1); + self.ctx.set_runtime_filter((self.table_index, filter)) } } data_blocks.clear(); Ok(()) } - - pub(crate) fn set_scan_node_id(&self, id: usize) { - self.scan_node_id.store(id, Ordering::Release); - } } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs index d95359445dbb..3a0af7fa344b 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::any::Any; -use std::collections::HashMap; use std::collections::VecDeque; use std::sync::atomic::Ordering; use std::sync::Arc; @@ -22,7 +21,6 @@ use common_catalog::table_context::TableContext; use common_exception::ErrorCode; use common_exception::Result; use common_expression::DataBlock; -use common_expression::Expr; use common_expression::FunctionContext; use common_sql::optimizer::ColumnSet; use common_sql::plans::JoinType; @@ -73,8 +71,6 @@ pub struct TransformHashJoinProbe { // If input data can't find proper partitions to spill, // directly probe them with hashtable. need_spill: bool, - - runtime_filters: HashMap>, } impl TransformHashJoinProbe { @@ -114,7 +110,6 @@ impl TransformHashJoinProbe { spill_state: probe_spill_state, processor_id: id, need_spill: true, - runtime_filters: Default::default(), })) } @@ -405,12 +400,6 @@ impl Processor for TransformHashJoinProbe { .hash_join_state .wait_first_round_build_done() .await?; - self.runtime_filters = self - .join_probe_state - .hash_join_state - .runtime_filters - .read() - .clone(); } else { self.join_probe_state .hash_join_state diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/util.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/util.rs index ec562f3c2ab7..6d6bc57e6da6 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/util.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/util.rs @@ -53,7 +53,7 @@ pub(crate) fn inlist_filter( build_key: &Expr, probe_key: &Expr, build_blocks: &[DataBlock], -) -> Result)>> { +) -> Result>>> { // Currently, only support key is a column, will support more later. 
// Such as t1.a + 1 = t2.a, or t1.a + t1.b = t2.a (left side is probe side)
 if let Expr::ColumnRef {
@@ -124,7 +124,7 @@ pub(crate) fn inlist_filter(
 args,
 };
 let expr = type_check::check(&contain_func, &BUILTIN_FUNCTIONS)?;
- return Ok(Some((id.to_string(), expr)));
+ return Ok(Some(vec![expr]));
 }
 Ok(None)
 }
diff --git a/src/query/service/src/schedulers/fragments/fragmenter.rs b/src/query/service/src/schedulers/fragments/fragmenter.rs
index 67d88aecba60..f1f6648a1609 100644
--- a/src/query/service/src/schedulers/fragments/fragmenter.rs
+++ b/src/query/service/src/schedulers/fragments/fragmenter.rs
@@ -286,6 +286,8 @@ impl PhysicalPlanReplacer for Fragmenter {
 .all(|fragment| !matches!(&fragment.exchange, Some(DataExchange::Merge(_)))),
 )?;

+ let table_index = plan.get_table_index();
+
 let mut source_fragment = PlanFragment {
 plan,
 fragment_type,
@@ -311,6 +313,7 @@ impl PhysicalPlanReplacer for Fragmenter {
 query_id: self.query_id.clone(),

 source_fragment_id,
+ table_index,
 }))
 }
 }
diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs
index 8b78581f9db5..4e7e2ca5ff4e 100644
--- a/src/query/service/src/sessions/query_ctx.rs
+++ b/src/query/service/src/sessions/query_ctx.rs
@@ -49,6 +49,7 @@ use common_exception::ErrorCode;
 use common_exception::Result;
 use common_expression::date_helper::TzFactory;
 use common_expression::DataBlock;
+use common_expression::Expr;
 use common_expression::FunctionContext;
 use common_io::prelude::FormatSettings;
 use common_meta_app::principal::FileFormatParams;
@@ -850,6 +851,34 @@ impl TableContext for QueryContext {
 queries_profile
 }
+
+ fn set_runtime_filter(&self, filters: (IndexType, Vec<Expr<String>>)) {
+ let mut runtime_filters = self.shared.runtime_filters.write();
+ match runtime_filters.entry(filters.0) {
+ Entry::Vacant(v) => {
+ info!(
+ "set {:?} runtime filters for {:?}",
+ filters.1.len(),
+ filters.0
+ );
+ v.insert(filters.1);
+ }
+ Entry::Occupied(mut v) => {
+ info!(
+ "add {:?} runtime filters for {:?}",
+ filters.1.len(),
+ filters.0
+ );
+ v.get_mut().extend(filters.1);
+ }
+ }
+ }
+
+ fn get_runtime_filter_with_id(&self, id: IndexType) -> Vec<Expr<String>> {
+ let runtime_filters = self.shared.runtime_filters.read();
+ // If no runtime filters are found for the given id, return an empty vector.
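+ // A publish/read round trip, sketched with illustrative values (`expr1`
+ // and `expr2` stand for any `Expr<String>` produced by a join build side;
+ // none of this is part of the patch):
+ //
+ //     ctx.set_runtime_filter((5, vec![expr1])); // Vacant entry: insert
+ //     ctx.set_runtime_filter((5, vec![expr2])); // Occupied entry: extend
+ //     assert_eq!(ctx.get_runtime_filter_with_id(5).len(), 2);
+ //     assert!(ctx.get_runtime_filter_with_id(42).is_empty());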
+ runtime_filters.get(&id).cloned().unwrap_or_default() + } } impl TrySpawn for QueryContext { diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs index 11b42a58b89d..406917daa964 100644 --- a/src/query/service/src/sessions/query_ctx_shared.rs +++ b/src/query/service/src/sessions/query_ctx_shared.rs @@ -28,12 +28,14 @@ use common_catalog::table_context::MaterializedCtesBlocks; use common_catalog::table_context::StageAttachment; use common_exception::ErrorCode; use common_exception::Result; +use common_expression::Expr; use common_meta_app::principal::OnErrorMode; use common_meta_app::principal::RoleInfo; use common_meta_app::principal::UserDefinedConnection; use common_meta_app::principal::UserInfo; use common_pipeline_core::InputError; use common_settings::Settings; +use common_sql::IndexType; use common_storage::CopyStatus; use common_storage::DataOperator; use common_storage::MergeStatus; @@ -104,6 +106,7 @@ pub struct QueryContextShared { pub(in crate::sessions) user_agent: Arc>, /// Key is (cte index, used_count), value contains cte's materialized blocks pub(in crate::sessions) materialized_cte_tables: MaterializedCtesBlocks, + pub(in crate::sessions) runtime_filters: Arc>>>>, } impl QueryContextShared { @@ -145,6 +148,7 @@ impl QueryContextShared { join_spill_progress: Arc::new(Progress::create()), agg_spill_progress: Arc::new(Progress::create()), group_by_spill_progress: Arc::new(Progress::create()), + runtime_filters: Default::default(), })) } diff --git a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs index cefe5c89595d..3d73c7168d18 100644 --- a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs +++ b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs @@ -36,6 +36,7 @@ use common_catalog::table_context::TableContext; use common_exception::ErrorCode; use common_exception::Result; use common_expression::DataBlock; +use common_expression::Expr; use common_expression::FunctionContext; use common_io::prelude::FormatSettings; use common_meta_app::principal::FileFormatParams; @@ -104,6 +105,7 @@ use common_meta_types::MetaId; use common_pipeline_core::processors::profile::Profile; use common_pipeline_core::InputError; use common_settings::Settings; +use common_sql::IndexType; use common_sql::Planner; use common_storage::CopyStatus; use common_storage::DataOperator; @@ -706,6 +708,13 @@ impl TableContext for CtxDelegation { fn get_merge_status(&self) -> Arc> { todo!() } + + fn set_runtime_filter(&self, _filters: (IndexType, Vec>)) { + todo!() + } + fn get_runtime_filter_with_id(&self, _id: IndexType) -> Vec> { + todo!() + } } #[tokio::test(flavor = "multi_thread")] diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs b/src/query/service/tests/it/storages/fuse/operations/commit.rs index 591968602454..7677b378b23e 100644 --- a/src/query/service/tests/it/storages/fuse/operations/commit.rs +++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs @@ -35,6 +35,7 @@ use common_catalog::table_context::TableContext; use common_exception::ErrorCode; use common_exception::Result; use common_expression::DataBlock; +use common_expression::Expr; use common_expression::FunctionContext; use common_io::prelude::FormatSettings; use common_meta_app::principal::FileFormatParams; @@ -103,6 +104,7 @@ use common_meta_types::MetaId; use common_pipeline_core::processors::profile::Profile; use common_pipeline_core::InputError; 
use common_settings::Settings;
+use common_sql::IndexType;
 use common_storage::CopyStatus;
 use common_storage::DataOperator;
 use common_storage::FileStatus;
@@ -668,6 +670,14 @@ impl TableContext for CtxDelegation {
 fn get_merge_status(&self) -> Arc<RwLock<MergeStatus>> {
 todo!()
 }
+
+ fn set_runtime_filter(&self, _filters: (IndexType, Vec<Expr<String>>)) {
+ todo!()
+ }
+
+ fn get_runtime_filter_with_id(&self, _id: IndexType) -> Vec<Expr<String>> {
+ todo!()
+ }
 }

 #[derive(Clone, Debug)]
diff --git a/src/query/sql/src/executor/physical_plan.rs b/src/query/sql/src/executor/physical_plan.rs
index f84643bf9967..12c3e637427f 100644
--- a/src/query/sql/src/executor/physical_plan.rs
+++ b/src/query/sql/src/executor/physical_plan.rs
@@ -54,6 +54,7 @@ use crate::executor::physical_plans::Udf;
 use crate::executor::physical_plans::UnionAll;
 use crate::executor::physical_plans::UpdateSource;
 use crate::executor::physical_plans::Window;
+use crate::IndexType;

 #[derive(Clone, Debug, serde::Serialize, serde::Deserialize, EnumAsInner)]
 pub enum PhysicalPlan {
@@ -353,4 +354,47 @@ impl PhysicalPlan {
 Self::ExchangeSource(_) | Self::ExchangeSink(_) | Self::Exchange(_)
 )
 }
+
+ pub fn get_table_index(&self) -> IndexType {
+ match self {
+ PhysicalPlan::TableScan(scan) => scan.table_index,
+ PhysicalPlan::Filter(plan) => plan.input.get_table_index(),
+ PhysicalPlan::Project(plan) => plan.input.get_table_index(),
+ PhysicalPlan::EvalScalar(plan) => plan.input.get_table_index(),
+ PhysicalPlan::ProjectSet(plan) => plan.input.get_table_index(),
+ PhysicalPlan::AggregateExpand(plan) => plan.input.get_table_index(),
+ PhysicalPlan::AggregatePartial(plan) => plan.input.get_table_index(),
+ PhysicalPlan::AggregateFinal(plan) => plan.input.get_table_index(),
+ PhysicalPlan::Window(plan) => plan.input.get_table_index(),
+ PhysicalPlan::Sort(plan) => plan.input.get_table_index(),
+ PhysicalPlan::Limit(plan) => plan.input.get_table_index(),
+ PhysicalPlan::RowFetch(plan) => plan.input.get_table_index(),
+ PhysicalPlan::HashJoin(plan) => plan.probe.get_table_index(),
+ PhysicalPlan::Exchange(plan) => plan.input.get_table_index(),
+ PhysicalPlan::ExchangeSink(plan) => plan.input.get_table_index(),
+ PhysicalPlan::ExchangeSource(plan) => plan.table_index,
+ PhysicalPlan::DistributedInsertSelect(plan) => plan.input.get_table_index(),
+ // TODO: make UnionAll and RangeJoin return a valid table index
+ // derived from their join probe keys.
+ PhysicalPlan::MaterializedCte(_)
+ | PhysicalPlan::UnionAll(_)
+ | PhysicalPlan::RangeJoin(_)
+ | PhysicalPlan::ConstantTableScan(_)
+ | PhysicalPlan::CteScan(_)
+ | PhysicalPlan::Udf(_)
+ | PhysicalPlan::DeleteSource(_)
+ | PhysicalPlan::CopyIntoTable(_)
+ | PhysicalPlan::ReplaceAsyncSourcer(_)
+ | PhysicalPlan::ReplaceDeduplicate(_)
+ | PhysicalPlan::ReplaceInto(_)
+ | PhysicalPlan::MergeIntoSource(_)
+ | PhysicalPlan::MergeInto(_)
+ | PhysicalPlan::MergeIntoAppendNotMatched(_)
+ | PhysicalPlan::MergeIntoAddRowNumber(_)
+ | PhysicalPlan::CompactSource(_)
+ | PhysicalPlan::CommitSink(_)
+ | PhysicalPlan::ReclusterSource(_)
+ | PhysicalPlan::ReclusterSink(_)
+ | PhysicalPlan::UpdateSource(_) => usize::MAX,
+ }
+ }
 }
diff --git a/src/query/sql/src/executor/physical_plans/physical_exchange_source.rs b/src/query/sql/src/executor/physical_plans/physical_exchange_source.rs
index b50be6792d11..84e1766f58cf 100644
--- a/src/query/sql/src/executor/physical_plans/physical_exchange_source.rs
+++ b/src/query/sql/src/executor/physical_plans/physical_exchange_source.rs
@@ -15,6 +15,8 @@
 use common_exception::Result;
 use common_expression::DataSchemaRef;

+use crate::IndexType;
+
 #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 pub struct ExchangeSource {
 // A unique id of operator in a `PhysicalPlan` tree, only used for display.
@@ -26,6 +28,8 @@ pub struct ExchangeSource {
 // Fragment ID of source fragment
 pub source_fragment_id: usize,
 pub query_id: String,
+
+ pub table_index: IndexType,
 }

 impl ExchangeSource {
diff --git a/src/query/sql/src/executor/physical_plans/physical_table_scan.rs b/src/query/sql/src/executor/physical_plans/physical_table_scan.rs
index 7346788061c8..b1ac1bd691a5 100644
--- a/src/query/sql/src/executor/physical_plans/physical_table_scan.rs
+++ b/src/query/sql/src/executor/physical_plans/physical_table_scan.rs
@@ -70,7 +70,6 @@ pub struct TableScan {
 pub source: Box<DataSourcePlan>,
 pub internal_column: Option<BTreeMap<FieldIndex, InternalColumn>>,

- // Only used for display
 pub table_index: IndexType,
 pub stat_info: Option<PlanStatsInfo>,
 }
@@ -241,7 +240,7 @@ impl PhysicalPlanBuilder {
 self.dry_run,
 )
 .await?;
-
+ source.table_index = scan.table_index;
 if let Some(agg_index) = &scan.agg_index {
 let source_schema = source.schema();
 let push_down = source.push_downs.as_mut().unwrap();
diff --git a/src/query/sql/src/executor/table_read_plan.rs b/src/query/sql/src/executor/table_read_plan.rs
index 58c112f4dece..b073bd3fdb69 100644
--- a/src/query/sql/src/executor/table_read_plan.rs
+++ b/src/query/sql/src/executor/table_read_plan.rs
@@ -292,6 +292,8 @@ impl ToReadDataSourcePlan for dyn Table {
 base_block_ids,
 update_stream_columns: false,
 data_mask_policy,
+ // Set a dummy index here; the physical plan builder fills in the real one.
+ table_index: usize::MAX,
 })
 }
 }
diff --git a/src/query/storages/fuse/src/operations/read/fuse_source.rs b/src/query/storages/fuse/src/operations/read/fuse_source.rs
index 95f79814cb1e..d6f1a8f2a2b3 100644
--- a/src/query/storages/fuse/src/operations/read/fuse_source.rs
+++ b/src/query/storages/fuse/src/operations/read/fuse_source.rs
@@ -160,6 +160,7 @@ pub fn build_fuse_parquet_source_pipeline(
 output.clone(),
 ReadParquetDataSource::<true>::create(
 i,
+ plan.table_index,
 ctx.clone(),
 table_schema.clone(),
 output,
@@ -184,6 +185,7 @@ pub fn build_fuse_parquet_source_pipeline(
 output.clone(),
 ReadParquetDataSource::<false>::create(
 i,
+ plan.table_index,
 ctx.clone(),
 table_schema.clone(),
 output,
diff --git a/src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs b/src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs
index fa66de4b50ef..d8f67b23ee70 100644
--- a/src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs
+++ b/src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs
@@ -13,18 +13,15 @@
 // limitations under the License.
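// How the index reaches this reader: the physical plan builder copies
// `TableScan.table_index` (now load-bearing rather than display-only) into
// `DataSourcePlan.table_index`, the fragmenter forwards it through
// `ExchangeSource.table_index` so it survives distributed fragmentation,
// and `build_fuse_parquet_source_pipeline` hands it to each source created
// below, which can then ask the query context for exactly the filters aimed
// at its table.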
use std::any::Any; -use std::collections::HashMap; use std::sync::Arc; use common_base::base::tokio; use common_catalog::plan::PartInfoPtr; use common_catalog::plan::StealablePartitions; -use common_catalog::table::Table; use common_catalog::table_context::TableContext; use common_exception::ErrorCode; use common_exception::Result; use common_expression::DataBlock; -use common_expression::Expr; use common_expression::TableSchema; use common_pipeline_core::processors::Event; use common_pipeline_core::processors::OutputPort; @@ -32,6 +29,7 @@ use common_pipeline_core::processors::Processor; use common_pipeline_core::processors::ProcessorPtr; use common_pipeline_sources::SyncSource; use common_pipeline_sources::SyncSourcer; +use common_sql::IndexType; use super::parquet_data_source::DataSource; use crate::fuse_part::FusePartInfo; @@ -46,6 +44,7 @@ use crate::operations::read::runtime_filter_prunner::runtime_filter_pruner; pub struct ReadParquetDataSource { ctx: Arc, id: usize, + table_index: IndexType, finished: bool, batch_size: usize, block_reader: Arc, @@ -57,7 +56,6 @@ pub struct ReadParquetDataSource { index_reader: Arc>, virtual_reader: Arc>, - runtime_filters: HashMap>, table_schema: Arc, } @@ -65,6 +63,7 @@ impl ReadParquetDataSource { #[allow(clippy::too_many_arguments)] pub fn create( id: usize, + table_index: IndexType, ctx: Arc, table_schema: Arc, output: Arc, @@ -79,6 +78,7 @@ impl ReadParquetDataSource { SyncSourcer::create(ctx.clone(), output.clone(), ReadParquetDataSource:: { ctx: ctx.clone(), id, + table_index, output, batch_size, block_reader, @@ -87,7 +87,6 @@ impl ReadParquetDataSource { partitions, index_reader, virtual_reader, - runtime_filters: HashMap::new(), table_schema, }) } else { @@ -96,6 +95,7 @@ impl ReadParquetDataSource { > { ctx: ctx.clone(), id, + table_index, output, batch_size, block_reader, @@ -104,7 +104,6 @@ impl ReadParquetDataSource { partitions, index_reader, virtual_reader, - runtime_filters: HashMap::new(), table_schema, }))) } @@ -121,7 +120,7 @@ impl SyncSource for ReadParquetDataSource { if runtime_filter_pruner( self.table_schema.clone(), &part, - &self.runtime_filters, + &self.ctx.get_runtime_filter_with_id(self.table_index), &self.partitions.ctx.get_function_context()?, )? { return Ok(Some(DataBlock::empty())); @@ -225,7 +224,7 @@ impl Processor for ReadParquetDataSource { if runtime_filter_pruner( self.table_schema.clone(), part, - &self.runtime_filters, + &self.ctx.get_runtime_filter_with_id(self.table_index), &self.partitions.ctx.get_function_context()?, )? { continue; diff --git a/src/query/storages/fuse/src/operations/read/runtime_filter_prunner.rs b/src/query/storages/fuse/src/operations/read/runtime_filter_prunner.rs index 94bb4e0838ca..b2a427dfbf95 100644 --- a/src/query/storages/fuse/src/operations/read/runtime_filter_prunner.rs +++ b/src/query/storages/fuse/src/operations/read/runtime_filter_prunner.rs @@ -31,7 +31,7 @@ use crate::FusePartInfo; pub fn runtime_filter_pruner( table_schema: Arc, part: &PartInfoPtr, - filters: &HashMap>, + filters: &Vec>, func_ctx: &FunctionContext, ) -> Result { if filters.is_empty() { @@ -39,7 +39,7 @@ pub fn runtime_filter_pruner( } let part = FusePartInfo::from_part(part)?; - let pruned = filters.iter().any(|(_, filter)| { + let pruned = filters.iter().any(|filter| { let column_refs = filter.column_refs(); // Currently only support filter with one column(probe key). 
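 // Pruning relies on the invariant that each published filter references
 // exactly one probe-side column, so the part-level min/max statistics of
 // that single column are enough to decide whether any row of the part can
 // satisfy the filter.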
debug_assert!(column_refs.len() == 1);
 let ty = column_refs.values().last().unwrap();
 let name = column_refs.keys().last().unwrap();
 if let Some(stats) = &part.columns_stat {
 let column_ids = table_schema.leaf_columns_of(name);
+ if column_ids.len() != 1 {
+ return false;
+ }
- debug_assert!(column_ids.len() == 1);
 if let Some(stat) = stats.get(&column_ids[0]) {
 debug_assert_eq!(stat.min.as_ref().infer_data_type(), ty.remove_nullable());
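// ---------------------------------------------------------------------------
// The end-to-end shape of the redesign as a self-contained, runnable sketch.
// It models the context-owned registry with plain std types (`String` stands
// in for `Expr<String>`); every name below is illustrative, none of it is
// Databend API.

use std::collections::HashMap;
use std::sync::{Arc, RwLock};

type TableIndex = usize;
type Filter = String; // stand-in for Expr<String>

#[derive(Default)]
struct FilterRegistry {
    // Mirrors `QueryContextShared::runtime_filters`.
    filters: Arc<RwLock<HashMap<TableIndex, Vec<Filter>>>>,
}

impl FilterRegistry {
    /// Join build side: insert or extend the filters for one probe table,
    /// mirroring the Vacant/Occupied branches of `set_runtime_filter`.
    fn set_runtime_filter(&self, (id, fs): (TableIndex, Vec<Filter>)) {
        self.filters.write().unwrap().entry(id).or_default().extend(fs);
    }

    /// Scan side: an empty Vec means "nothing published yet, scan everything".
    fn get_runtime_filter_with_id(&self, id: TableIndex) -> Vec<Filter> {
        self.filters.read().unwrap().get(&id).cloned().unwrap_or_default()
    }
}

fn main() {
    let reg = FilterRegistry::default();
    reg.set_runtime_filter((5, vec!["t.a IN (1, 2, 3)".into()]));
    reg.set_runtime_filter((5, vec!["t.b IN (7)".into()])); // second join on the same table
    assert_eq!(reg.get_runtime_filter_with_id(5).len(), 2);
    assert!(reg.get_runtime_filter_with_id(9).is_empty()); // unknown table: empty, not an error
}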