Skip to content

Commit

Permalink
impl ScanTableVisitor
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Sep 23, 2024
1 parent 4e897e9 commit 4723dbe
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 39 deletions.
18 changes: 3 additions & 15 deletions src/frontend/src/handler/declare_cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ pub async fn create_chunk_stream_for_cursor(
plan_fragmenter,
query_mode,
schema,
dependent_relations,
scan_tables,
..
} = plan_fragmenter_result;

Expand All @@ -170,22 +170,10 @@ pub async fn create_chunk_stream_for_cursor(
match query_mode {
QueryMode::Auto => unreachable!(),
QueryMode::Local => CursorDataChunkStream::LocalDataChunk(Some(
local_execute(
session.clone(),
query,
can_timeout_cancel,
dependent_relations,
)
.await?,
local_execute(session.clone(), query, can_timeout_cancel, scan_tables).await?,
)),
QueryMode::Distributed => CursorDataChunkStream::DistributedDataChunk(Some(
distribute_execute(
session.clone(),
query,
can_timeout_cancel,
dependent_relations,
)
.await?,
distribute_execute(session.clone(), query, can_timeout_cancel, scan_tables).await?,
)),
},
schema.fields.clone(),
Expand Down
34 changes: 13 additions & 21 deletions src/frontend/src/handler/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::handler::HandlerArgs;
use crate::optimizer::plan_node::Explain;
use crate::optimizer::{
ExecutionModeDecider, OptimizerContext, OptimizerContextRef, RelationCollectorVisitor,
SysTableVisitor,
ScanTableVisitor, SysTableVisitor,
};
use crate::planner::Planner;
use crate::scheduler::plan_fragmenter::Query;
Expand Down Expand Up @@ -206,6 +206,7 @@ pub struct BatchQueryPlanResult {
// subset of the final one. i.e. the final one may contain more implicit dependencies on
// indices.
pub(crate) dependent_relations: HashSet<TableId>,
pub(crate) scan_tables: HashSet<TableId>,
}

fn gen_batch_query_plan(
Expand All @@ -230,6 +231,8 @@ fn gen_batch_query_plan(
let dependent_relations =
RelationCollectorVisitor::collect_with(dependent_relations, batch_plan.clone());

let scan_tables = ScanTableVisitor::collect(batch_plan.clone());

let must_local = must_run_in_local_mode(batch_plan.clone());

let query_mode = match (must_dist, must_local) {
Expand Down Expand Up @@ -259,7 +262,8 @@ fn gen_batch_query_plan(
query_mode,
schema,
stmt_type,
dependent_relations: dependent_relations.into_iter().collect(),
dependent_relations,
scan_tables,
})
}

Expand Down Expand Up @@ -311,7 +315,7 @@ pub struct BatchPlanFragmenterResult {
pub(crate) query_mode: QueryMode,
pub(crate) schema: Schema,
pub(crate) stmt_type: StatementType,
pub(crate) dependent_relations: HashSet<TableId>,
pub(crate) scan_tables: HashSet<TableId>,
}

pub fn gen_batch_plan_fragmenter(
Expand All @@ -323,7 +327,8 @@ pub fn gen_batch_plan_fragmenter(
query_mode,
schema,
stmt_type,
dependent_relations,
scan_tables,
..
} = plan_result;

tracing::trace!(
Expand All @@ -347,7 +352,7 @@ pub fn gen_batch_plan_fragmenter(
query_mode,
schema,
stmt_type,
dependent_relations,
scan_tables,
})
}

Expand All @@ -361,8 +366,7 @@ pub async fn create_stream(
query_mode,
schema,
stmt_type,
dependent_relations,
..
scan_tables,
} = plan_fragmenter_result;

let mut can_timeout_cancel = true;
Expand Down Expand Up @@ -393,27 +397,15 @@ pub async fn create_stream(
let row_stream = match query_mode {
QueryMode::Auto => unreachable!(),
QueryMode::Local => PgResponseStream::LocalQuery(DataChunkToRowSetAdapter::new(
local_execute(
session.clone(),
query,
can_timeout_cancel,
dependent_relations,
)
.await?,
local_execute(session.clone(), query, can_timeout_cancel, scan_tables).await?,
column_types,
formats,
session.clone(),
)),
// Local mode does not support cancelling tasks.
QueryMode::Distributed => {
PgResponseStream::DistributedQuery(DataChunkToRowSetAdapter::new(
distribute_execute(
session.clone(),
query,
can_timeout_cancel,
dependent_relations,
)
.await?,
distribute_execute(session.clone(), query, can_timeout_cancel, scan_tables).await?,
column_types,
formats,
session.clone(),
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub use plan_rewriter::PlanRewriter;
mod plan_visitor;

pub use plan_visitor::{
ExecutionModeDecider, PlanVisitor, RelationCollectorVisitor, SysTableVisitor,
ExecutionModeDecider, PlanVisitor, RelationCollectorVisitor, ScanTableVisitor, SysTableVisitor,
};
use risingwave_sqlparser::ast::OnConflict;

Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/optimizer/plan_visitor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ pub use cardinality_visitor::*;
mod jsonb_stream_key_checker;
pub use jsonb_stream_key_checker::*;
mod distributed_dml_visitor;
mod scan_table_visitor;
pub use distributed_dml_visitor::*;
pub use scan_table_visitor::*;

use crate::for_all_plan_nodes;
use crate::optimizer::plan_node::*;
Expand Down
57 changes: 57 additions & 0 deletions src/frontend/src/optimizer/plan_visitor/scan_table_visitor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;

use risingwave_common::catalog::TableId;

use super::{DefaultBehavior, DefaultValue};
use crate::optimizer::plan_node::{BatchLogSeqScan, BatchLookupJoin};
use crate::optimizer::plan_visitor::PlanVisitor;
use crate::PlanRef;

/// Plan visitor that gathers the ids of the tables referenced by the scan-like nodes of a plan.
///
/// Use [`ScanTableVisitor::collect`] to run it over a plan and obtain the resulting set.
#[derive(Debug, Clone, Default)]
pub struct ScanTableVisitor {
// Table ids inserted by the `visit_batch_*` methods while a plan is being visited.
tables: HashSet<TableId>,
}

impl ScanTableVisitor {
pub fn collect(plan: PlanRef) -> HashSet<TableId> {
let mut visitor = Self::default();
visitor.visit(plan);
visitor.tables
}
}

/// Visitor implementation that records table ids into `self.tables`.
///
/// No per-node value is produced (`Result = ()`); the only output is the accumulated set, which
/// [`ScanTableVisitor::collect`] returns once the visit is finished.
impl PlanVisitor for ScanTableVisitor {
    type Result = ();

    type DefaultBehavior = impl DefaultBehavior<Self::Result>;

    fn default_behavior() -> Self::DefaultBehavior {
        DefaultValue
    }

    /// Records the id of the table referenced by a `BatchSeqScan` node.
    fn visit_batch_seq_scan(&mut self, plan: &crate::optimizer::plan_node::BatchSeqScan) {
        self.tables.insert(plan.core().table_desc.table_id);
    }

    /// Records the id of the table referenced by a `BatchLogSeqScan` node.
    fn visit_batch_log_seq_scan(&mut self, plan: &BatchLogSeqScan) -> Self::Result {
        self.tables.insert(plan.core().table_desc.table_id);
    }

    /// Records the id of the right-side table of a `BatchLookupJoin` node, then continues the
    /// visit into the node's inputs.
    fn visit_batch_lookup_join(&mut self, plan: &BatchLookupJoin) -> Self::Result {
        self.tables.insert(plan.right_table_desc().table_id);

        // This override replaces whatever the trait does for this node by default, so the inputs
        // have to be walked explicitly here. Without this, tables scanned underneath the join's
        // input would never be recorded.
        // NOTE(review): presumably the trait's default per-node method is what walks inputs for
        // non-overridden nodes — confirm against the `PlanVisitor` definition.
        for input in crate::optimizer::plan_node::PlanTreeNode::inputs(plan) {
            self.visit(input);
        }
    }
}
5 changes: 3 additions & 2 deletions src/frontend/src/session/cursor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use core::mem;
use core::time::Duration;
use std::collections::{HashMap, VecDeque};
use std::collections::{HashMap, HashSet, VecDeque};
use std::rc::Rc;
use std::sync::Arc;
use std::time::Instant;
Expand Down Expand Up @@ -650,7 +650,8 @@ impl SubscriptionCursor {
query_mode,
schema,
stmt_type: StatementType::SELECT,
dependent_relations: table_catalog.dependent_relations.iter().cloned().collect(),
dependent_relations: HashSet::from_iter([table_catalog.id]),
scan_tables: HashSet::from_iter([table_catalog.id]),
})
}

Expand Down

0 comments on commit 4723dbe

Please sign in to comment.