Skip to content

Commit

Permalink
feat: partition by left most cluster key when building merge into fil…
Browse files Browse the repository at this point in the history
…ter (databendlabs#13547)

* chore: support collect statistics of multi join expr in merge into

* partition by first join condition

* fix filter

* update

* fix clippy

* fix

* fix bind

* fix and add log

* fix stackoverflow
  • Loading branch information
SkyFan2002 authored and andylokandy committed Nov 27, 2023
1 parent 08cfd55 commit 5629ec0
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 21 deletions.
158 changes: 143 additions & 15 deletions src/query/service/src/interpreters/interpreter_merge_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ use std::sync::Arc;
use std::time::Instant;
use std::u64::MAX;

use common_ast::parser::parse_comma_separated_exprs;
use common_ast::parser::tokenize_sql;
use common_ast::Dialect;
use common_catalog::lock::Lock;
use common_catalog::table::TableExt;
use common_exception::ErrorCode;
Expand All @@ -30,6 +33,7 @@ use common_expression::SendableDataBlockStream;
use common_expression::ROW_NUMBER_COL_NAME;
use common_functions::BUILTIN_FUNCTIONS;
use common_meta_app::schema::TableInfo;
use common_sql::bind_one_table;
use common_sql::executor::CommitSink;
use common_sql::executor::Exchange;
use common_sql::executor::FragmentKind::Merge;
Expand All @@ -54,16 +58,20 @@ use common_sql::plans::RelOperator;
use common_sql::plans::ScalarItem;
use common_sql::plans::UpdatePlan;
use common_sql::BindContext;
use common_sql::ColumnBinding;
use common_sql::ColumnBindingBuilder;
use common_sql::IndexType;
use common_sql::MetadataRef;
use common_sql::NameResolutionContext;
use common_sql::ScalarExpr;
use common_sql::TypeCheck;
use common_sql::TypeChecker;
use common_sql::Visibility;
use common_storages_factory::Table;
use common_storages_fuse::FuseTable;
use common_storages_fuse::TableContext;
use itertools::Itertools;
use log::info;
use storages_common_locks::LockManager;
use storages_common_table_meta::meta::TableSnapshot;
use tokio_stream::StreamExt;
Expand Down Expand Up @@ -111,6 +119,20 @@ impl MergeStyleJoin<'_> {
target_sexpr,
}
}

pub fn collect_column_map(&self) -> HashMap<String, ColumnBinding> {
let mut column_map = HashMap::new();
for (t, s) in self
.target_conditions
.iter()
.zip(self.source_conditions.iter())
{
if let (ScalarExpr::BoundColumnRef(t_col), ScalarExpr::BoundColumnRef(s_col)) = (t, s) {
column_map.insert(t_col.column.column_name.clone(), s_col.column.clone());
}
}
column_map
}
}

impl MergeIntoInterpreter {
Expand Down Expand Up @@ -196,7 +218,7 @@ impl MergeIntoInterpreter {
};

let optimized_input = self
.build_static_filter(&input, meta_data, self.ctx.clone())
.build_static_filter(&input, meta_data, self.ctx.clone(), check_table)
.await?;
let mut builder = PhysicalPlanBuilder::new(meta_data.clone(), self.ctx.clone(), false);

Expand Down Expand Up @@ -454,6 +476,7 @@ impl MergeIntoInterpreter {
join: &SExpr,
metadata: &MetadataRef,
ctx: Arc<QueryContext>,
table: Arc<dyn Table>,
) -> Result<Box<SExpr>> {
// 1. collect statistics from the source side
// plan of source table is extended to:
Expand All @@ -469,9 +492,71 @@ impl MergeIntoInterpreter {
let mut eval_scalar_items = Vec::with_capacity(m_join.source_conditions.len());
let mut min_max_binding = Vec::with_capacity(m_join.source_conditions.len() * 2);
let mut min_max_scalar_items = Vec::with_capacity(m_join.source_conditions.len() * 2);
let mut group_items = vec![];
if m_join.source_conditions.is_empty() {
return Ok(Box::new(join.clone()));
}
let column_map = m_join.collect_column_map();
let fuse_table =
table
.as_any()
.downcast_ref::<FuseTable>()
.ok_or(ErrorCode::Unimplemented(format!(
"table {}, engine type {}, does not support MERGE INTO",
table.name(),
table.get_table_info().engine(),
)))?;
let mut group_exprs = vec![];
if let Some(cluster_key_str) = fuse_table.cluster_key_str() {
let sql_dialect = Dialect::MySQL;
let tokens = tokenize_sql(cluster_key_str)?;
let ast_exprs = parse_comma_separated_exprs(&tokens, sql_dialect)?;
let (mut bind_context, metadata) = bind_one_table(table)?;
if !ast_exprs.is_empty() {
let ast_expr = &ast_exprs[0];
let name_resolution_ctx =
NameResolutionContext::try_from(ctx.get_settings().as_ref())?;
let mut type_checker = TypeChecker::new(
&mut bind_context,
ctx.clone(),
&name_resolution_ctx,
metadata.clone(),
&[],
false,
false,
);
let (scalar_expr, _) = *type_checker.resolve(ast_expr).await?;
let projected = scalar_expr.try_project_column_binding(|binding| {
column_map.get(&binding.column_name).cloned()
});
if let Some(p) = projected {
group_exprs.push(p);
}
}
}
for group_expr in group_exprs {
let index = metadata
.write()
.add_derived_column("".to_string(), group_expr.data_type()?);
let evaled = ScalarExpr::BoundColumnRef(BoundColumnRef {
span: None,
column: ColumnBindingBuilder::new(
"".to_string(),
index,
Box::new(group_expr.data_type()?),
Visibility::Visible,
)
.build(),
});
eval_scalar_items.push(ScalarItem {
scalar: group_expr.clone(),
index,
});
group_items.push(ScalarItem {
scalar: evaled.clone(),
index,
});
}
for source_side_expr in m_join.source_conditions {
// eval source side join expr
let index = metadata
Expand Down Expand Up @@ -543,8 +628,6 @@ impl MergeIntoInterpreter {
min_max_scalar_items.push(max);
}

let group_item = eval_scalar_items[0].clone();

let eval_source_side_join_expr_op = EvalScalar {
items: eval_scalar_items,
};
Expand All @@ -568,7 +651,7 @@ impl MergeIntoInterpreter {

let agg_partial_op = Aggregate {
mode: AggregateMode::Partial,
group_items: vec![group_item.clone()],
group_items: group_items.clone(),
aggregate_functions: min_max_scalar_items.clone(),
from_distinct: false,
limit: None,
Expand All @@ -580,7 +663,7 @@ impl MergeIntoInterpreter {
);
let agg_final_op = Aggregate {
mode: AggregateMode::Final,
group_items: vec![group_item],
group_items,
aggregate_functions: min_max_scalar_items,
from_distinct: false,
limit: None,
Expand Down Expand Up @@ -643,16 +726,7 @@ impl MergeIntoInterpreter {
filter_parts.push(and);
}
}
let mut or = filter_parts[0].clone();
for filter_part in filter_parts.iter().skip(1) {
or = ScalarExpr::FunctionCall(FunctionCall {
span: None,
func_name: "or".to_string(),
params: vec![],
arguments: vec![or, filter_part.clone()],
});
}
filters.push(or);
filters.extend(Self::combine_filter_parts(&filter_parts).into_iter());
}
let mut target_plan = m_join.target_sexpr.clone();
Self::push_down_filters(&mut target_plan, &filters)?;
Expand All @@ -661,10 +735,64 @@ impl MergeIntoInterpreter {
Ok(Box::new(new_sexpr))
}

fn combine_filter_parts(filter_parts: &[ScalarExpr]) -> Option<ScalarExpr> {
match filter_parts.len() {
0 => None,
1 => Some(filter_parts[0].clone()),
_ => {
let mid = filter_parts.len() / 2;
let left = Self::combine_filter_parts(&filter_parts[0..mid]);
let right = Self::combine_filter_parts(&filter_parts[mid..]);
if let Some(left) = left {
if let Some(right) = right {
Some(ScalarExpr::FunctionCall(FunctionCall {
span: None,
func_name: "or".to_string(),
params: vec![],
arguments: vec![left, right],
}))
} else {
Some(left)
}
} else {
right
}
}
}
}

fn display_scalar_expr(s: &ScalarExpr) -> String {
match s {
ScalarExpr::BoundColumnRef(x) => x.column.column_name.clone(),
ScalarExpr::ConstantExpr(x) => x.value.to_string(),
ScalarExpr::WindowFunction(x) => format!("{:?}", x),
ScalarExpr::AggregateFunction(x) => format!("{:?}", x),
ScalarExpr::LambdaFunction(x) => format!("{:?}", x),
ScalarExpr::FunctionCall(x) => match x.func_name.as_str() {
"and" | "or" | "gte" | "lte" => {
format!(
"({} {} {})",
Self::display_scalar_expr(&x.arguments[0]),
x.func_name,
Self::display_scalar_expr(&x.arguments[1])
)
}
_ => format!("{:?}", x),
},
ScalarExpr::CastExpr(x) => format!("{:?}", x),
ScalarExpr::SubqueryExpr(x) => format!("{:?}", x),
ScalarExpr::UDFServerCall(x) => format!("{:?}", x),
}
}

fn push_down_filters(s_expr: &mut SExpr, filters: &[ScalarExpr]) -> Result<()> {
match s_expr.plan() {
RelOperator::Scan(s) => {
let mut new_scan = s.clone();
info!("push down {} filters:", filters.len());
for filter in filters {
info!("{}", Self::display_scalar_expr(filter));
}
if let Some(preds) = new_scan.push_down_predicates {
new_scan.push_down_predicates =
Some(preds.iter().chain(filters).cloned().collect());
Expand Down
17 changes: 11 additions & 6 deletions src/query/sql/src/planner/expression_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,11 @@ use crate::BaseTableColumn;
use crate::ColumnEntry;
use crate::IdentifierNormalizer;
use crate::Metadata;
use crate::MetadataRef;
use crate::ScalarExpr;
use crate::Visibility;

pub fn parse_exprs(
ctx: Arc<dyn TableContext>,
table_meta: Arc<dyn Table>,
sql: &str,
) -> Result<Vec<Expr>> {
let settings = Settings::create("".to_string());
pub fn bind_one_table(table_meta: Arc<dyn Table>) -> Result<(BindContext, MetadataRef)> {
let mut bind_context = BindContext::new();
let metadata = Arc::new(RwLock::new(Metadata::default()));
let table_index = metadata.write().add_table(
Expand Down Expand Up @@ -111,7 +107,16 @@ pub fn parse_exprs(

bind_context.add_column_binding(column_binding);
}
Ok((bind_context, metadata))
}

pub fn parse_exprs(
ctx: Arc<dyn TableContext>,
table_meta: Arc<dyn Table>,
sql: &str,
) -> Result<Vec<Expr>> {
let (mut bind_context, metadata) = bind_one_table(table_meta)?;
let settings = Settings::create("".to_string());
let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?;
let mut type_checker = TypeChecker::new(
&mut bind_context,
Expand Down
29 changes: 29 additions & 0 deletions src/query/sql/src/planner/plans/scalar_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,35 @@ impl ScalarExpr {
ScalarExpr::CastExpr(expr) => expr.argument.evaluable(),
}
}

pub fn try_project_column_binding(
&self,
f: impl Fn(&ColumnBinding) -> Option<ColumnBinding> + Copy,
) -> Option<Self> {
match self {
ScalarExpr::BoundColumnRef(expr) => f(&expr.column).map(|x| {
ScalarExpr::BoundColumnRef(BoundColumnRef {
span: None,
column: x,
})
}),
ScalarExpr::FunctionCall(expr) => {
// Any of the arguments return None, then return None
let arguments = expr
.arguments
.iter()
.map(|x| x.try_project_column_binding(f))
.collect::<Option<Vec<_>>>()?;
Some(ScalarExpr::FunctionCall(FunctionCall {
span: None,
func_name: expr.func_name.clone(),
params: expr.params.clone(),
arguments,
}))
}
_ => None,
}
}
}

impl From<BoundColumnRef> for ScalarExpr {
Expand Down

0 comments on commit 5629ec0

Please sign in to comment.