Skip to content

Commit

Permalink
refine plan id generation
Browse files Browse the repository at this point in the history
  • Loading branch information
leiysky committed Feb 7, 2024
1 parent 7ab90cc commit 52d4ad2
Show file tree
Hide file tree
Showing 12 changed files with 48 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ impl PhysicalPlanBuilder {
};

// 2. Build physical plan.
let plan_id = self.next_plan_id();
let input = self.build(s_expr.child(0)?, required).await?;
let input_schema = input.output_schema()?;
let group_items = agg.group_items.iter().map(|v| v.index).collect::<Vec<_>>();
Expand Down Expand Up @@ -181,14 +182,14 @@ impl PhysicalPlanBuilder {
{
let aggregate_partial = if let Some(grouping_sets) = agg.grouping_sets {
let expand = AggregateExpand {
plan_id: self.next_plan_id(),
plan_id,
input,
group_bys: group_items.clone(),
grouping_sets,
stat_info: Some(stat_info.clone()),
};
AggregatePartial {
plan_id: self.next_plan_id(),
plan_id,
input: Box::new(PhysicalPlan::AggregateExpand(expand)),
agg_funcs,
group_by_display,
Expand All @@ -197,7 +198,7 @@ impl PhysicalPlanBuilder {
}
} else {
AggregatePartial {
plan_id: self.next_plan_id(),
plan_id,
input,
agg_funcs,
group_by_display,
Expand All @@ -221,7 +222,7 @@ impl PhysicalPlanBuilder {
.data_type();

PhysicalPlan::Exchange(Exchange {
plan_id: self.next_plan_id(),
plan_id,
kind,
allow_adjust_parallelism: true,
ignore_exchange: false,
Expand All @@ -237,14 +238,14 @@ impl PhysicalPlanBuilder {
_ => {
if let Some(grouping_sets) = agg.grouping_sets {
let expand = AggregateExpand {
plan_id: self.next_plan_id(),
plan_id,
input: Box::new(input),
group_bys: group_items.clone(),
grouping_sets,
stat_info: Some(stat_info.clone()),
};
PhysicalPlan::AggregatePartial(AggregatePartial {
plan_id: self.next_plan_id(),
plan_id,
agg_funcs,
group_by_display,
group_by: group_items,
Expand All @@ -253,7 +254,7 @@ impl PhysicalPlanBuilder {
})
} else {
PhysicalPlan::AggregatePartial(AggregatePartial {
plan_id: self.next_plan_id(),
plan_id,
agg_funcs,
group_by_display,
group_by: group_items,
Expand Down Expand Up @@ -334,7 +335,7 @@ impl PhysicalPlanBuilder {
let before_group_by_schema = partial.input.output_schema()?;
let limit = agg.limit;
PhysicalPlan::AggregateFinal(AggregateFinal {
plan_id: self.next_plan_id(),
plan_id,
group_by_display: partial.group_by_display.clone(),
input: Box::new(input),
group_by: group_items,
Expand All @@ -354,7 +355,7 @@ impl PhysicalPlanBuilder {
let limit = agg.limit;

PhysicalPlan::AggregateFinal(AggregateFinal {
plan_id: self.next_plan_id(),
plan_id,
group_by_display: partial.group_by_display.clone(),
input: Box::new(input),
group_by: group_items,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ impl PhysicalPlanBuilder {
})
}
// 2. Build physical plan.
let plan_id = self.next_plan_id();
if used.is_empty() {
self.build(s_expr.child(0)?, required).await
} else {
Expand All @@ -109,7 +110,7 @@ impl PhysicalPlanBuilder {
};

let eval_scalar = crate::plans::EvalScalar { items: used };
self.create_eval_scalar(&eval_scalar, column_projections, input, stat_info)
self.create_eval_scalar(&eval_scalar, column_projections, input, stat_info, plan_id)
}
}

Expand All @@ -119,6 +120,7 @@ impl PhysicalPlanBuilder {
column_projections: Vec<IndexType>,
input: PhysicalPlan,
stat_info: PlanStatsInfo,
plan_id: u32,
) -> Result<PhysicalPlan> {
let input_schema = input.output_schema()?;

Expand Down Expand Up @@ -158,7 +160,7 @@ impl PhysicalPlanBuilder {
}
}
Ok(PhysicalPlan::EvalScalar(EvalScalar {
plan_id: self.next_plan_id(),
plan_id,
projections,
input: Box::new(input),
exprs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ impl PhysicalPlanBuilder {
}

// 2. Build physical plan.
let plan_id = self.next_plan_id();
let input = Box::new(self.build(s_expr.child(0)?, required).await?);
let input_schema = input.output_schema()?;
let mut keys = vec![];
Expand All @@ -83,7 +84,7 @@ impl PhysicalPlanBuilder {
}
};
Ok(PhysicalPlan::Exchange(Exchange {
plan_id: self.next_plan_id(),
plan_id,
input,
kind,
keys,
Expand Down
4 changes: 3 additions & 1 deletion src/query/sql/src/executor/physical_plans/physical_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ impl PhysicalPlanBuilder {
required: ColumnSet,
stat_info: PlanStatsInfo,
) -> Result<PhysicalPlan> {
let plan_id = self.next_plan_id();

// 1. Prune unused Columns.
let column_projections = required.clone().into_iter().collect::<Vec<_>>();
let used = filter.predicates.iter().fold(required, |acc, v| {
Expand All @@ -78,7 +80,7 @@ impl PhysicalPlanBuilder {
}

Ok(PhysicalPlan::Filter(Filter {
plan_id: self.next_plan_id(),
plan_id,
projections,
input,
predicates: filter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ impl PhysicalPlanBuilder {
column_projections: Vec<IndexType>,
stat_info: PlanStatsInfo,
) -> Result<PhysicalPlan> {
let plan_id = self.next_plan_id();

let mut probe_side = Box::new(self.build(s_expr.child(0)?, required.0).await?);
let mut build_side = Box::new(self.build(s_expr.child(1)?, required.1).await?);

Expand Down Expand Up @@ -468,7 +470,7 @@ impl PhysicalPlanBuilder {
let output_schema = DataSchemaRefExt::create(output_fields);

Ok(PhysicalPlan::HashJoin(HashJoin {
plan_id: self.next_plan_id(),
plan_id,
projections,
build_projections,
probe_projections,
Expand Down
13 changes: 7 additions & 6 deletions src/query/sql/src/executor/physical_plans/physical_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,13 @@ impl PhysicalPlanBuilder {
required.extend(metadata.row_id_indexes());

// 2. Build physical plan.
let limit_plan_id = self.next_plan_id();
let row_fetch_plan_id = self.next_plan_id();
let input_plan = self.build(s_expr.child(0)?, required).await?;
let next_plan_id = self.next_plan_id();
let metadata = self.metadata.read().clone();
if limit.before_exchange || metadata.lazy_columns().is_empty() {
return Ok(PhysicalPlan::Limit(Limit {
plan_id: next_plan_id,
plan_id: limit_plan_id,
input: Box::new(input_plan),
limit: limit.limit,
offset: limit.offset,
Expand Down Expand Up @@ -100,10 +101,10 @@ impl PhysicalPlanBuilder {
.cloned()
.collect::<Vec<_>>();

if limit.before_exchange || lazy_columns.is_empty() {
if lazy_columns.is_empty() {
// If there is no lazy column, we don't need to build a `RowFetch` plan.
return Ok(PhysicalPlan::Limit(Limit {
plan_id: next_plan_id,
plan_id: limit_plan_id,
input: Box::new(input_plan),
limit: limit.limit,
offset: limit.offset,
Expand Down Expand Up @@ -140,9 +141,9 @@ impl PhysicalPlanBuilder {
);

Ok(PhysicalPlan::RowFetch(RowFetch {
plan_id: self.next_plan_id(),
plan_id: row_fetch_plan_id,
input: Box::new(PhysicalPlan::Limit(Limit {
plan_id: next_plan_id,
plan_id: limit_plan_id,
input: Box::new(input_plan),
limit: limit.limit,
offset: limit.offset,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ impl PhysicalPlanBuilder {
}

// 2. Build physical plan.
let plan_id = self.next_plan_id();
let input = self.build(s_expr.child(0)?, required).await?;
let input_schema = input.output_schema()?;
let srf_exprs = project_set
Expand All @@ -97,7 +98,7 @@ impl PhysicalPlanBuilder {
}

Ok(PhysicalPlan::ProjectSet(ProjectSet {
plan_id: self.next_plan_id(),
plan_id,
input: Box::new(input),
srf_exprs,
projections,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ impl PhysicalPlanBuilder {
mut range_conditions: Vec<ScalarExpr>,
mut other_conditions: Vec<ScalarExpr>,
) -> Result<PhysicalPlan> {
let plan_id = self.next_plan_id();

let left_prop = RelExpr::with_s_expr(s_expr.child(1)?).derive_relational_prop()?;
let right_prop = RelExpr::with_s_expr(s_expr.child(0)?).derive_relational_prop()?;

Expand Down Expand Up @@ -116,7 +118,7 @@ impl PhysicalPlanBuilder {
);

Ok(PhysicalPlan::RangeJoin(RangeJoin {
plan_id: self.next_plan_id(),
plan_id,
left: Box::new(left_side),
right: Box::new(right_side),
conditions: range_conditions
Expand Down
4 changes: 3 additions & 1 deletion src/query/sql/src/executor/physical_plans/physical_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ impl PhysicalPlanBuilder {
mut required: ColumnSet,
stat_info: PlanStatsInfo,
) -> Result<PhysicalPlan> {
let plan_id = self.next_plan_id();

// 1. Prune unused Columns.
sort.items.iter().for_each(|s| {
required.insert(s.index);
Expand All @@ -121,7 +123,7 @@ impl PhysicalPlanBuilder {

// 2. Build physical plan.
Ok(PhysicalPlan::Sort(Sort {
plan_id: self.next_plan_id(),
plan_id,
input: Box::new(self.build(s_expr.child(0)?, required).await?),
order_by: sort
.items
Expand Down
3 changes: 2 additions & 1 deletion src/query/sql/src/executor/physical_plans/physical_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ impl PhysicalPlanBuilder {
}

// 2. Build physical plan.
let plan_id = self.next_plan_id();
if used.is_empty() {
return self.build(s_expr.child(0)?, required).await;
}
Expand Down Expand Up @@ -148,7 +149,7 @@ impl PhysicalPlanBuilder {
.collect::<Result<Vec<_>>>()?;

Ok(PhysicalPlan::Udf(Udf {
plan_id: self.next_plan_id(),
plan_id,
input: Box::new(input),
udf_funcs,
stat_info: Some(stat_info),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ impl PhysicalPlanBuilder {
});

// 2. Build physical plan.
let plan_id = self.next_plan_id();
let left_plan = self.build(s_expr.child(0)?, left_required).await?;
let right_plan = self.build(s_expr.child(1)?, right_required).await?;
let left_schema = left_plan.output_schema()?;
Expand Down Expand Up @@ -134,13 +135,15 @@ impl PhysicalPlanBuilder {
let new_plan = if scalar_items.is_empty() {
plan
} else {
let plan_id = plan_builder.next_plan_id();
plan_builder.create_eval_scalar(
&crate::plans::EvalScalar {
items: scalar_items,
},
indexes.to_vec(),
plan,
stat_info,
plan_id,
)?
};

Expand Down Expand Up @@ -180,7 +183,7 @@ impl PhysicalPlanBuilder {
.collect::<Vec<_>>();

Ok(PhysicalPlan::UnionAll(UnionAll {
plan_id: self.next_plan_id(),
plan_id,
left: Box::new(left_plan),
right: Box::new(right_plan),
pairs,
Expand Down
6 changes: 5 additions & 1 deletion src/query/sql/src/executor/physical_plans/physical_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ impl PhysicalPlanBuilder {
mut required: ColumnSet,
stat_info: PlanStatsInfo,
) -> Result<PhysicalPlan> {
let plan_id = self.next_plan_id();

// 1. DO NOT Prune unused Columns cause window may not in required, eg:
// select s1.a from ( select t1.a as a, dense_rank() over(order by t1.a desc) as rk
// from (select 'a1' as a) t1 ) s1
Expand Down Expand Up @@ -192,13 +194,15 @@ impl PhysicalPlanBuilder {
scalar_items.push(order.order_by_item.clone())
}
let input = if !scalar_items.is_empty() {
let plan_id = self.next_plan_id();
self.create_eval_scalar(
&crate::planner::plans::EvalScalar {
items: scalar_items,
},
column_projections,
input,
stat_info.clone(),
plan_id,
)?
} else {
input
Expand Down Expand Up @@ -355,7 +359,7 @@ impl PhysicalPlanBuilder {
};

Ok(PhysicalPlan::Window(Window {
plan_id: self.next_plan_id(),
plan_id,
index: w.index,
input: Box::new(input),
func,
Expand Down

0 comments on commit 52d4ad2

Please sign in to comment.