diff --git a/src/frontend/src/handler/declare_cursor.rs b/src/frontend/src/handler/declare_cursor.rs index 3ea8fb60b3ef0..f4ae44f8c2b3f 100644 --- a/src/frontend/src/handler/declare_cursor.rs +++ b/src/frontend/src/handler/declare_cursor.rs @@ -157,7 +157,7 @@ pub async fn create_chunk_stream_for_cursor( plan_fragmenter, query_mode, schema, - dependent_relations, + scan_tables, .. } = plan_fragmenter_result; @@ -170,22 +170,10 @@ pub async fn create_chunk_stream_for_cursor( match query_mode { QueryMode::Auto => unreachable!(), QueryMode::Local => CursorDataChunkStream::LocalDataChunk(Some( - local_execute( - session.clone(), - query, - can_timeout_cancel, - dependent_relations, - ) - .await?, + local_execute(session.clone(), query, can_timeout_cancel, scan_tables).await?, )), QueryMode::Distributed => CursorDataChunkStream::DistributedDataChunk(Some( - distribute_execute( - session.clone(), - query, - can_timeout_cancel, - dependent_relations, - ) - .await?, + distribute_execute(session.clone(), query, can_timeout_cancel, scan_tables).await?, )), }, schema.fields.clone(), diff --git a/src/frontend/src/handler/query.rs b/src/frontend/src/handler/query.rs index 510924d59d6bb..4ff0964d9145b 100644 --- a/src/frontend/src/handler/query.rs +++ b/src/frontend/src/handler/query.rs @@ -39,7 +39,7 @@ use crate::handler::HandlerArgs; use crate::optimizer::plan_node::Explain; use crate::optimizer::{ ExecutionModeDecider, OptimizerContext, OptimizerContextRef, RelationCollectorVisitor, - SysTableVisitor, + ScanTableVisitor, SysTableVisitor, }; use crate::planner::Planner; use crate::scheduler::plan_fragmenter::Query; @@ -206,6 +206,7 @@ pub struct BatchQueryPlanResult { // subset of the final one. i.e. the final one may contain more implicit dependencies on // indices. pub(crate) dependent_relations: HashSet, + pub(crate) scan_tables: HashSet, } fn gen_batch_query_plan( @@ -230,6 +231,8 @@ fn gen_batch_query_plan( let dependent_relations = RelationCollectorVisitor::collect_with(dependent_relations, batch_plan.clone()); + let scan_tables = ScanTableVisitor::collect(batch_plan.clone()); + let must_local = must_run_in_local_mode(batch_plan.clone()); let query_mode = match (must_dist, must_local) { @@ -259,7 +262,8 @@ fn gen_batch_query_plan( query_mode, schema, stmt_type, - dependent_relations: dependent_relations.into_iter().collect(), + dependent_relations, + scan_tables, }) } @@ -311,7 +315,7 @@ pub struct BatchPlanFragmenterResult { pub(crate) query_mode: QueryMode, pub(crate) schema: Schema, pub(crate) stmt_type: StatementType, - pub(crate) dependent_relations: HashSet, + pub(crate) scan_tables: HashSet, } pub fn gen_batch_plan_fragmenter( @@ -323,7 +327,8 @@ pub fn gen_batch_plan_fragmenter( query_mode, schema, stmt_type, - dependent_relations, + scan_tables, + .. } = plan_result; tracing::trace!( @@ -347,7 +352,7 @@ pub fn gen_batch_plan_fragmenter( query_mode, schema, stmt_type, - dependent_relations, + scan_tables, }) } @@ -361,8 +366,7 @@ pub async fn create_stream( query_mode, schema, stmt_type, - dependent_relations, - .. + scan_tables, } = plan_fragmenter_result; let mut can_timeout_cancel = true; @@ -393,13 +397,7 @@ pub async fn create_stream( let row_stream = match query_mode { QueryMode::Auto => unreachable!(), QueryMode::Local => PgResponseStream::LocalQuery(DataChunkToRowSetAdapter::new( - local_execute( - session.clone(), - query, - can_timeout_cancel, - dependent_relations, - ) - .await?, + local_execute(session.clone(), query, can_timeout_cancel, scan_tables).await?, column_types, formats, session.clone(), @@ -407,13 +405,7 @@ pub async fn create_stream( // Local mode do not support cancel tasks. QueryMode::Distributed => { PgResponseStream::DistributedQuery(DataChunkToRowSetAdapter::new( - distribute_execute( - session.clone(), - query, - can_timeout_cancel, - dependent_relations, - ) - .await?, + distribute_execute(session.clone(), query, can_timeout_cancel, scan_tables).await?, column_types, formats, session.clone(), diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index de5c3deaf0d6b..1e1e7cb51a653 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -29,7 +29,7 @@ pub use plan_rewriter::PlanRewriter; mod plan_visitor; pub use plan_visitor::{ - ExecutionModeDecider, PlanVisitor, RelationCollectorVisitor, SysTableVisitor, + ExecutionModeDecider, PlanVisitor, RelationCollectorVisitor, ScanTableVisitor, SysTableVisitor, }; use risingwave_sqlparser::ast::OnConflict; diff --git a/src/frontend/src/optimizer/plan_visitor/mod.rs b/src/frontend/src/optimizer/plan_visitor/mod.rs index 63a0484cfdfd5..4bd8c4cd57e43 100644 --- a/src/frontend/src/optimizer/plan_visitor/mod.rs +++ b/src/frontend/src/optimizer/plan_visitor/mod.rs @@ -40,7 +40,9 @@ pub use cardinality_visitor::*; mod jsonb_stream_key_checker; pub use jsonb_stream_key_checker::*; mod distributed_dml_visitor; +mod scan_table_visitor; pub use distributed_dml_visitor::*; +pub use scan_table_visitor::*; use crate::for_all_plan_nodes; use crate::optimizer::plan_node::*; diff --git a/src/frontend/src/optimizer/plan_visitor/scan_table_visitor.rs b/src/frontend/src/optimizer/plan_visitor/scan_table_visitor.rs new file mode 100644 index 0000000000000..0cb385108cfbd --- /dev/null +++ b/src/frontend/src/optimizer/plan_visitor/scan_table_visitor.rs @@ -0,0 +1,57 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashSet; + +use risingwave_common::catalog::TableId; + +use super::{DefaultBehavior, DefaultValue}; +use crate::optimizer::plan_node::{BatchLogSeqScan, BatchLookupJoin}; +use crate::optimizer::plan_visitor::PlanVisitor; +use crate::PlanRef; + +#[derive(Debug, Clone, Default)] +pub struct ScanTableVisitor { + tables: HashSet, +} + +impl ScanTableVisitor { + pub fn collect(plan: PlanRef) -> HashSet { + let mut visitor = Self::default(); + visitor.visit(plan); + visitor.tables + } +} + +impl PlanVisitor for ScanTableVisitor { + type Result = (); + + type DefaultBehavior = impl DefaultBehavior; + + fn default_behavior() -> Self::DefaultBehavior { + DefaultValue + } + + fn visit_batch_seq_scan(&mut self, plan: &crate::optimizer::plan_node::BatchSeqScan) { + self.tables.insert(plan.core().table_desc.table_id); + } + + fn visit_batch_log_seq_scan(&mut self, plan: &BatchLogSeqScan) -> Self::Result { + self.tables.insert(plan.core().table_desc.table_id); + } + + fn visit_batch_lookup_join(&mut self, plan: &BatchLookupJoin) -> Self::Result { + self.tables.insert(plan.right_table_desc().table_id); + } +} diff --git a/src/frontend/src/session/cursor_manager.rs b/src/frontend/src/session/cursor_manager.rs index 9f14aae5a1931..3d5dbf4ee9a4e 100644 --- a/src/frontend/src/session/cursor_manager.rs +++ b/src/frontend/src/session/cursor_manager.rs @@ -14,7 +14,7 @@ use core::mem; use core::time::Duration; -use std::collections::{HashMap, VecDeque}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::rc::Rc; use std::sync::Arc; use std::time::Instant; @@ -650,7 +650,8 @@ impl SubscriptionCursor { query_mode, schema, stmt_type: StatementType::SELECT, - dependent_relations: table_catalog.dependent_relations.iter().cloned().collect(), + dependent_relations: HashSet::from_iter([table_catalog.id]), + scan_tables: HashSet::from_iter([table_catalog.id]), }) }