Skip to content

Commit

Permalink
refine plan id generation
Browse files Browse the repository at this point in the history
  • Loading branch information
leiysky committed Feb 7, 2024
1 parent 7ab90cc commit 9b342c9
Show file tree
Hide file tree
Showing 17 changed files with 165 additions and 38 deletions.
146 changes: 137 additions & 9 deletions src/query/sql/src/executor/physical_plan_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ pub struct PhysicalPlanBuilder {
pub(crate) metadata: MetadataRef,
pub(crate) ctx: Arc<dyn TableContext>,
pub(crate) func_ctx: FunctionContext,
pub(crate) next_plan_id: u32,
pub(crate) dry_run: bool,
// Record cte_idx and the cte's output columns
pub(crate) cte_output_columns: HashMap<IndexType, Vec<ColumnBinding>>,
Expand All @@ -45,19 +44,12 @@ impl PhysicalPlanBuilder {
Self {
metadata,
ctx,
next_plan_id: 0,
func_ctx,
dry_run,
cte_output_columns: Default::default(),
}
}

pub(crate) fn next_plan_id(&mut self) -> u32 {
let id = self.next_plan_id;
self.next_plan_id += 1;
id
}

pub(crate) fn build_plan_stat_info(&self, s_expr: &SExpr) -> Result<PlanStatsInfo> {
let rel_expr = RelExpr::with_s_expr(s_expr);
let stat_info = rel_expr.derive_cardinality()?;
Expand All @@ -67,9 +59,20 @@ impl PhysicalPlanBuilder {
})
}

pub async fn build(&mut self, s_expr: &SExpr, required: ColumnSet) -> Result<PhysicalPlan> {
let mut plan = self.build_physical_plan(s_expr, required).await?;
adjust_plan_id(&mut plan, &mut 0);

Ok(plan)
}

#[async_recursion::async_recursion]
#[async_backtrace::framed]
pub async fn build(&mut self, s_expr: &SExpr, required: ColumnSet) -> Result<PhysicalPlan> {
pub async fn build_physical_plan(
&mut self,
s_expr: &SExpr,
required: ColumnSet,
) -> Result<PhysicalPlan> {
// Build stat info.
let stat_info = self.build_plan_stat_info(s_expr)?;
match s_expr.plan() {
Expand Down Expand Up @@ -114,3 +117,128 @@ impl PhysicalPlanBuilder {
}
}
}

/// Adjust the plan_id of the physical plan.
/// This function will assign a unique plan_id to each physical plan node in a top-down manner.
/// Which means the plan_id of a node is always greater than the plan_id of its parent node.
fn adjust_plan_id(plan: &mut PhysicalPlan, next_id: &mut u32) {
match plan {
PhysicalPlan::TableScan(plan) => {
plan.plan_id = *next_id;
*next_id += 1;
}
PhysicalPlan::Filter(plan) => {
plan.plan_id = *next_id;
*next_id += 1;
adjust_plan_id(&mut plan.input, next_id);
}
PhysicalPlan::Project(plan) => {
plan.plan_id = *next_id;
*next_id += 1;
adjust_plan_id(&mut plan.input, next_id);
}
PhysicalPlan::EvalScalar(plan) => {
plan.plan_id = *next_id;
*next_id += 1;
adjust_plan_id(&mut plan.input, next_id);
}
PhysicalPlan::ProjectSet(plan) => {
plan.plan_id = *next_id;
*next_id += 1;
adjust_plan_id(&mut plan.input, next_id);
}
PhysicalPlan::AggregateExpand(plan) => {
plan.plan_id = *next_id;
*next_id += 1;
adjust_plan_id(&mut plan.input, next_id);
}
PhysicalPlan::AggregatePartial(plan) => {
plan.plan_id = *next_id;
*next_id += 1;
adjust_plan_id(&mut plan.input, next_id);
}
PhysicalPlan::AggregateFinal(plan) => {
plan.plan_id = *next_id;
*next_id += 1;
adjust_plan_id(&mut plan.input, next_id);
}
PhysicalPlan::Window(plan) => {
plan.plan_id = *next_id;
*next_id += 1;
adjust_plan_id(&mut plan.input, next_id);
}
PhysicalPlan::Sort(plan) => {
plan.plan_id = *next_id;
*next_id += 1;
adjust_plan_id(&mut plan.input, next_id);
}
PhysicalPlan::Limit(plan) => {
plan.plan_id = *next_id;
*next_id += 1;
adjust_plan_id(&mut plan.input, next_id);
}
PhysicalPlan::RowFetch(plan) => {
plan.plan_id = *next_id;
*next_id += 1;
adjust_plan_id(&mut plan.input, next_id);
}
PhysicalPlan::HashJoin(plan) => {
plan.plan_id = *next_id;
*next_id += 1;
adjust_plan_id(&mut plan.probe, next_id);
adjust_plan_id(&mut plan.build, next_id);
}
PhysicalPlan::RangeJoin(plan) => {
plan.plan_id = *next_id;
*next_id += 1;
adjust_plan_id(&mut plan.left, next_id);
adjust_plan_id(&mut plan.right, next_id);
}
PhysicalPlan::Exchange(plan) => {
plan.plan_id = *next_id;
*next_id += 1;
adjust_plan_id(&mut plan.input, next_id);
}
PhysicalPlan::UnionAll(plan) => {
plan.plan_id = *next_id;
*next_id += 1;
adjust_plan_id(&mut plan.left, next_id);
adjust_plan_id(&mut plan.right, next_id);
}
PhysicalPlan::CteScan(plan) => {
plan.plan_id = *next_id;
*next_id += 1;
}
PhysicalPlan::MaterializedCte(plan) => {
plan.plan_id = *next_id;
*next_id += 1;
}
PhysicalPlan::ConstantTableScan(plan) => {
plan.plan_id = *next_id;
*next_id += 1;
}
PhysicalPlan::Udf(plan) => {
plan.plan_id = *next_id;
*next_id += 1;
adjust_plan_id(&mut plan.input, next_id);
}
PhysicalPlan::DistributedInsertSelect(plan) => {
plan.plan_id = *next_id;
*next_id += 1;
adjust_plan_id(&mut plan.input, next_id);
}
PhysicalPlan::ExchangeSource(plan) => {
plan.plan_id = *next_id;
*next_id += 1;
}
PhysicalPlan::ExchangeSink(plan) => {
plan.plan_id = *next_id;
*next_id += 1;
}
PhysicalPlan::CopyIntoTable(plan) => {
plan.plan_id = *next_id;
*next_id += 1;
}
_ => {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,14 +181,14 @@ impl PhysicalPlanBuilder {
{
let aggregate_partial = if let Some(grouping_sets) = agg.grouping_sets {
let expand = AggregateExpand {
plan_id: self.next_plan_id(),
plan_id: 0,
input,
group_bys: group_items.clone(),
grouping_sets,
stat_info: Some(stat_info.clone()),
};
AggregatePartial {
plan_id: self.next_plan_id(),
plan_id: 0,
input: Box::new(PhysicalPlan::AggregateExpand(expand)),
agg_funcs,
group_by_display,
Expand All @@ -197,7 +197,7 @@ impl PhysicalPlanBuilder {
}
} else {
AggregatePartial {
plan_id: self.next_plan_id(),
plan_id: 0,
input,
agg_funcs,
group_by_display,
Expand All @@ -221,7 +221,7 @@ impl PhysicalPlanBuilder {
.data_type();

PhysicalPlan::Exchange(Exchange {
plan_id: self.next_plan_id(),
plan_id: 0,
kind,
allow_adjust_parallelism: true,
ignore_exchange: false,
Expand All @@ -237,14 +237,14 @@ impl PhysicalPlanBuilder {
_ => {
if let Some(grouping_sets) = agg.grouping_sets {
let expand = AggregateExpand {
plan_id: self.next_plan_id(),
plan_id: 0,
input: Box::new(input),
group_bys: group_items.clone(),
grouping_sets,
stat_info: Some(stat_info.clone()),
};
PhysicalPlan::AggregatePartial(AggregatePartial {
plan_id: self.next_plan_id(),
plan_id: 0,
agg_funcs,
group_by_display,
group_by: group_items,
Expand All @@ -253,7 +253,7 @@ impl PhysicalPlanBuilder {
})
} else {
PhysicalPlan::AggregatePartial(AggregatePartial {
plan_id: self.next_plan_id(),
plan_id: 0,
agg_funcs,
group_by_display,
group_by: group_items,
Expand Down Expand Up @@ -334,7 +334,7 @@ impl PhysicalPlanBuilder {
let before_group_by_schema = partial.input.output_schema()?;
let limit = agg.limit;
PhysicalPlan::AggregateFinal(AggregateFinal {
plan_id: self.next_plan_id(),
plan_id: 0,
group_by_display: partial.group_by_display.clone(),
input: Box::new(input),
group_by: group_items,
Expand All @@ -354,7 +354,7 @@ impl PhysicalPlanBuilder {
let limit = agg.limit;

PhysicalPlan::AggregateFinal(AggregateFinal {
plan_id: self.next_plan_id(),
plan_id: 0,
group_by_display: partial.group_by_display.clone(),
input: Box::new(input),
group_by: group_items,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl PhysicalPlanBuilder {
};
// 2. Build physical plan.
Ok(PhysicalPlan::ConstantTableScan(ConstantTableScan {
plan_id: self.next_plan_id(),
plan_id: 0,
values,
num_rows: scan.num_rows,
output_schema: DataSchemaRefExt::create(fields),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl PhysicalPlanBuilder {

// 2. Build physical plan.
Ok(PhysicalPlan::CteScan(CteScan {
plan_id: self.next_plan_id(),
plan_id: 0,
cte_idx: cte_scan.cte_idx,
output_schema: DataSchemaRefExt::create(pruned_fields),
offsets: pruned_offsets,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl PhysicalPlanBuilder {
}
}
Ok(PhysicalPlan::EvalScalar(EvalScalar {
plan_id: self.next_plan_id(),
plan_id: 0,
projections,
input: Box::new(input),
exprs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl PhysicalPlanBuilder {
}
};
Ok(PhysicalPlan::Exchange(Exchange {
plan_id: self.next_plan_id(),
plan_id: 0,
input,
kind,
keys,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl PhysicalPlanBuilder {
}

Ok(PhysicalPlan::Filter(Filter {
plan_id: self.next_plan_id(),
plan_id: 0,
projections,
input,
predicates: filter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ impl PhysicalPlanBuilder {
let output_schema = DataSchemaRefExt::create(output_fields);

Ok(PhysicalPlan::HashJoin(HashJoin {
plan_id: self.next_plan_id(),
plan_id: 0,
projections,
build_projections,
probe_projections,
Expand Down
9 changes: 4 additions & 5 deletions src/query/sql/src/executor/physical_plans/physical_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,10 @@ impl PhysicalPlanBuilder {

// 2. Build physical plan.
let input_plan = self.build(s_expr.child(0)?, required).await?;
let next_plan_id = self.next_plan_id();
let metadata = self.metadata.read().clone();
if limit.before_exchange || metadata.lazy_columns().is_empty() {
return Ok(PhysicalPlan::Limit(Limit {
plan_id: next_plan_id,
plan_id: 0,
input: Box::new(input_plan),
limit: limit.limit,
offset: limit.offset,
Expand Down Expand Up @@ -103,7 +102,7 @@ impl PhysicalPlanBuilder {
if limit.before_exchange || lazy_columns.is_empty() {
// If there is no lazy column, we don't need to build a `RowFetch` plan.
return Ok(PhysicalPlan::Limit(Limit {
plan_id: next_plan_id,
plan_id: 0,
input: Box::new(input_plan),
limit: limit.limit,
offset: limit.offset,
Expand Down Expand Up @@ -140,9 +139,9 @@ impl PhysicalPlanBuilder {
);

Ok(PhysicalPlan::RowFetch(RowFetch {
plan_id: self.next_plan_id(),
plan_id: 0,
input: Box::new(PhysicalPlan::Limit(Limit {
plan_id: next_plan_id,
plan_id: 0,
input: Box::new(input_plan),
limit: limit.limit,
offset: limit.offset,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl PhysicalPlanBuilder {

// 2. Build physical plan.
Ok(PhysicalPlan::MaterializedCte(MaterializedCte {
plan_id: self.next_plan_id(),
plan_id: 0,
left: Box::new(self.build(s_expr.child(0)?, left_required).await?),
right: Box::new(self.build(s_expr.child(1)?, required).await?),
cte_idx: cte.cte_idx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl PhysicalPlanBuilder {
}

Ok(PhysicalPlan::ProjectSet(ProjectSet {
plan_id: self.next_plan_id(),
plan_id: 0,
input: Box::new(input),
srf_exprs,
projections,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl PhysicalPlanBuilder {
);

Ok(PhysicalPlan::RangeJoin(RangeJoin {
plan_id: self.next_plan_id(),
plan_id: 0,
left: Box::new(left_side),
right: Box::new(right_side),
conditions: range_conditions
Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/executor/physical_plans/physical_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl PhysicalPlanBuilder {

// 2. Build physical plan.
Ok(PhysicalPlan::Sort(Sort {
plan_id: self.next_plan_id(),
plan_id: 0,
input: Box::new(self.build(s_expr.child(0)?, required).await?),
order_by: sort
.items
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ impl PhysicalPlanBuilder {
Some(project_internal_columns)
};
Ok(PhysicalPlan::TableScan(TableScan {
plan_id: self.next_plan_id(),
plan_id: 0,
name_mapping,
source: Box::new(source),
table_index: scan.table_index,
Expand Down Expand Up @@ -272,7 +272,7 @@ impl PhysicalPlanBuilder {
)
.await?;
Ok(PhysicalPlan::TableScan(TableScan {
plan_id: self.next_plan_id(),
plan_id: 0,
name_mapping: BTreeMap::from([("dummy".to_string(), DUMMY_COLUMN_INDEX)]),
source: Box::new(source),
table_index: DUMMY_TABLE_INDEX,
Expand Down
Loading

0 comments on commit 9b342c9

Please sign in to comment.