chore(planner): fix push down filter scan (#14415)
* fix push down filter scan

* test: update explain agg index sqllogictest
Dousir9 authored Jan 22, 2024
1 parent 79e75b6 commit 17dc68f
Showing 10 changed files with 216 additions and 78 deletions.
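
Of the 10 changed files, the two shown below are the planner rule itself (RulePushDownFilterScan) and the matching update to the aggregating-index explain output in the sqllogictests. The core of the change: replace_view_column becomes replace_predicate_column, which always rewrites predicate columns to their base-table column bindings and only carries over the view-specific virtual computed expression when replace_view is set, i.e. when the scanned table is the source of a view. A minimal, self-contained sketch of that rewriting pattern, using made-up Expr and BaseColumn types rather than Databend's real ScalarExpr/ColumnEntry, looks like this:

// A minimal sketch, assuming simplified Expr/BaseColumn types (these are NOT
// Databend's real ScalarExpr/ColumnEntry): walk a predicate and replace every
// column reference that resolves to a base-table column with a binding that
// names the original table and column, so the filter can be pushed to storage.
#[derive(Clone, Debug, PartialEq)]
enum Expr {
    Column { table: Option<String>, name: String },
    Literal(i64),
    Func { name: String, args: Vec<Expr> },
}

struct BaseColumn {
    bound_name: String, // name the predicate currently refers to (possibly an alias)
    table: String,      // original table the column comes from
    column: String,     // original column name in that table
}

fn replace_predicate_column(expr: &Expr, base_columns: &[BaseColumn]) -> Expr {
    match expr {
        Expr::Column { name, .. } => match base_columns.iter().find(|c| c.bound_name == *name) {
            // Rebind to the original table/column so storage understands it.
            Some(base) => Expr::Column {
                table: Some(base.table.clone()),
                name: base.column.clone(),
            },
            None => expr.clone(),
        },
        Expr::Literal(_) => expr.clone(),
        // Recurse into every argument of a function call (>, AND, +, ...).
        Expr::Func { name, args } => Expr::Func {
            name: name.clone(),
            args: args
                .iter()
                .map(|arg| replace_predicate_column(arg, base_columns))
                .collect(),
        },
    }
}

fn main() {
    let base_columns = vec![BaseColumn {
        bound_name: "b".to_string(),
        table: "t1".to_string(),
        column: "b".to_string(),
    }];
    // The predicate "b > 3" sitting in a Filter above the Scan.
    let predicate = Expr::Func {
        name: "gt".to_string(),
        args: vec![
            Expr::Column { table: None, name: "b".to_string() },
            Expr::Literal(3),
        ],
    };
    // After rewriting, the column is bound as t1.b and can be pushed down.
    println!("{:?}", replace_predicate_column(&predicate, &base_columns));
}

After the rewrite the column is bound to the original table, which lines up with the sqllogictest change at the end, where the pushed-down filter now prints t1.b instead of t.b.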
@@ -72,12 +72,15 @@ impl RulePushDownFilterScan {
         }
     }

-    // Using the columns of the source table to replace the columns in the view,
-    // this allows us to perform push-down filtering operations at the storage layer.
-    fn replace_view_column(
+    // Replace columns in a predicate.
+    // If replace_view is true, we will use the columns of the source table to replace the columns in
+    // the view, this allows us to perform push-down filtering operations at the storage layer.
+    // If replace_view is false, we will replace column alias name with original column name.
+    fn replace_predicate_column(
         predicate: &ScalarExpr,
         table_entries: &[TableEntry],
-        column_entries: &[ColumnEntry],
+        column_entries: &[&ColumnEntry],
+        replace_view: bool,
     ) -> Result<ScalarExpr> {
         match predicate {
             ScalarExpr::BoundColumnRef(column) => {
@@ -97,20 +100,24 @@ impl RulePushDownFilterScan {
                         .iter()
                         .find(|table_entry| table_entry.index() == base_column.table_index)
                     {
-                        let column_binding = ColumnBindingBuilder::new(
+                        let mut column_binding_builder = ColumnBindingBuilder::new(
                             base_column.column_name.clone(),
                             base_column.column_index,
                             column.column.data_type.clone(),
                             column.column.visibility.clone(),
                         )
                         .table_name(Some(table_entry.name().to_string()))
                         .database_name(Some(table_entry.database().to_string()))
-                        .table_index(Some(table_entry.index()))
-                        .virtual_computed_expr(column.column.virtual_computed_expr.clone())
-                        .build();
+                        .table_index(Some(table_entry.index()));
+
+                        if replace_view {
+                            column_binding_builder = column_binding_builder
+                                .virtual_computed_expr(column.column.virtual_computed_expr.clone());
+                        }
+
                         let bound_column_ref = BoundColumnRef {
                             span: column.span,
-                            column: column_binding,
+                            column: column_binding_builder.build(),
                         };
                         return Ok(ScalarExpr::BoundColumnRef(bound_column_ref));
                     }
@@ -124,7 +131,12 @@ impl RulePushDownFilterScan {
                             .args
                             .iter()
                             .map(|arg| {
-                                Self::replace_view_column(arg, table_entries, column_entries)
+                                Self::replace_predicate_column(
+                                    arg,
+                                    table_entries,
+                                    column_entries,
+                                    replace_view,
+                                )
                             })
                             .collect::<Result<Vec<ScalarExpr>>>()?;

@@ -138,15 +150,23 @@ impl RulePushDownFilterScan {
                         })
                     }
                     WindowFuncType::LagLead(ll) => {
-                        let new_arg =
-                            Self::replace_view_column(&ll.arg, table_entries, column_entries)?;
-                        let new_default =
-                            match ll.default.clone().map(|d| {
-                                Self::replace_view_column(&d, table_entries, column_entries)
-                            }) {
-                                None => None,
-                                Some(d) => Some(Box::new(d?)),
-                            };
+                        let new_arg = Self::replace_predicate_column(
+                            &ll.arg,
+                            table_entries,
+                            column_entries,
+                            replace_view,
+                        )?;
+                        let new_default = match ll.default.clone().map(|d| {
+                            Self::replace_predicate_column(
+                                &d,
+                                table_entries,
+                                column_entries,
+                                replace_view,
+                            )
+                        }) {
+                            None => None,
+                            Some(d) => Some(Box::new(d?)),
+                        };
                         WindowFuncType::LagLead(LagLeadFunction {
                             is_lag: ll.is_lag,
                             arg: Box::new(new_arg),
@@ -156,8 +176,12 @@ impl RulePushDownFilterScan {
                         })
                     }
                     WindowFuncType::NthValue(func) => {
-                        let new_arg =
-                            Self::replace_view_column(&func.arg, table_entries, column_entries)?;
+                        let new_arg = Self::replace_predicate_column(
+                            &func.arg,
+                            table_entries,
+                            column_entries,
+                            replace_view,
+                        )?;
                         WindowFuncType::NthValue(NthValueFunction {
                             n: func.n,
                             arg: Box::new(new_arg),
@@ -170,15 +194,26 @@ impl RulePushDownFilterScan {
                 let partition_by = window
                     .partition_by
                     .iter()
-                    .map(|arg| Self::replace_view_column(arg, table_entries, column_entries))
+                    .map(|arg| {
+                        Self::replace_predicate_column(
+                            arg,
+                            table_entries,
+                            column_entries,
+                            replace_view,
+                        )
+                    })
                     .collect::<Result<Vec<ScalarExpr>>>()?;

                 let order_by = window
                     .order_by
                     .iter()
                     .map(|item| {
-                        let replaced_scalar =
-                            Self::replace_view_column(&item.expr, table_entries, column_entries)?;
+                        let replaced_scalar = Self::replace_predicate_column(
+                            &item.expr,
+                            table_entries,
+                            column_entries,
+                            replace_view,
+                        )?;
                         Ok(WindowOrderBy {
                             expr: replaced_scalar,
                             asc: item.asc,
@@ -200,7 +235,14 @@ impl RulePushDownFilterScan {
                 let args = agg_func
                     .args
                     .iter()
-                    .map(|arg| Self::replace_view_column(arg, table_entries, column_entries))
+                    .map(|arg| {
+                        Self::replace_predicate_column(
+                            arg,
+                            table_entries,
+                            column_entries,
+                            replace_view,
+                        )
+                    })
                     .collect::<Result<Vec<ScalarExpr>>>()?;

                 Ok(ScalarExpr::AggregateFunction(AggregateFunction {
@@ -216,7 +258,14 @@ impl RulePushDownFilterScan {
                 let args = lambda_func
                     .args
                     .iter()
-                    .map(|arg| Self::replace_view_column(arg, table_entries, column_entries))
+                    .map(|arg| {
+                        Self::replace_predicate_column(
+                            arg,
+                            table_entries,
+                            column_entries,
+                            replace_view,
+                        )
+                    })
                     .collect::<Result<Vec<ScalarExpr>>>()?;

                 Ok(ScalarExpr::LambdaFunction(LambdaFunc {
@@ -232,7 +281,14 @@ impl RulePushDownFilterScan {
                 let arguments = func
                     .arguments
                     .iter()
-                    .map(|arg| Self::replace_view_column(arg, table_entries, column_entries))
+                    .map(|arg| {
+                        Self::replace_predicate_column(
+                            arg,
+                            table_entries,
+                            column_entries,
+                            replace_view,
+                        )
+                    })
                     .collect::<Result<Vec<ScalarExpr>>>()?;

                 Ok(ScalarExpr::FunctionCall(FunctionCall {
@@ -243,7 +299,12 @@ impl RulePushDownFilterScan {
                 }))
             }
             ScalarExpr::CastExpr(cast) => {
-                let arg = Self::replace_view_column(&cast.argument, table_entries, column_entries)?;
+                let arg = Self::replace_predicate_column(
+                    &cast.argument,
+                    table_entries,
+                    column_entries,
+                    replace_view,
+                )?;
                 Ok(ScalarExpr::CastExpr(CastExpr {
                     span: cast.span,
                     is_try: cast.is_try,
@@ -255,7 +316,14 @@ impl RulePushDownFilterScan {
                 let arguments = udf
                     .arguments
                     .iter()
-                    .map(|arg| Self::replace_view_column(arg, table_entries, column_entries))
+                    .map(|arg| {
+                        Self::replace_predicate_column(
+                            arg,
+                            table_entries,
+                            column_entries,
+                            replace_view,
+                        )
+                    })
                     .collect::<Result<Vec<ScalarExpr>>>()?;

                 Ok(ScalarExpr::UDFServerCall(UDFServerCall {
@@ -273,17 +341,25 @@ impl RulePushDownFilterScan {
         }
     }

-    fn find_push_down_predicates(&self, predicates: &[ScalarExpr]) -> Result<Vec<ScalarExpr>> {
+    fn find_push_down_predicates(
+        &self,
+        predicates: &[ScalarExpr],
+        scan: &Scan,
+    ) -> Result<Vec<ScalarExpr>> {
         let metadata = self.metadata.read();
-        let column_entries = metadata.columns();
+        let column_entries = scan
+            .columns
+            .iter()
+            .map(|index| metadata.column(*index))
+            .collect::<Vec<_>>();
         let table_entries = metadata.tables();
         let is_source_of_view = table_entries.iter().any(|t| t.is_source_of_view());

         let mut filtered_predicates = vec![];
         for predicate in predicates {
             let used_columns = predicate.used_columns();
             let mut contain_derived_column = false;
-            for column_entry in column_entries {
+            for column_entry in column_entries.iter() {
                 if let ColumnEntry::DerivedColumn(column) = column_entry {
                     // Don't push down predicate that contains derived column
                     // Because storage can't know such columns.
@@ -294,13 +370,13 @@ impl RulePushDownFilterScan {
                 }
             }
             if !contain_derived_column {
-                if is_source_of_view {
-                    let new_predicate =
-                        Self::replace_view_column(predicate, table_entries, column_entries)?;
-                    filtered_predicates.push(new_predicate);
-                } else {
-                    filtered_predicates.push(predicate.clone());
-                }
+                let predicate = Self::replace_predicate_column(
+                    predicate,
+                    table_entries,
+                    &column_entries,
+                    is_source_of_view,
+                )?;
+                filtered_predicates.push(predicate);
             }
         }

@@ -315,18 +391,18 @@ impl Rule for RulePushDownFilterScan {

     fn apply(&self, s_expr: &SExpr, state: &mut TransformResult) -> Result<()> {
         let filter: Filter = s_expr.plan().clone().try_into()?;
-        let mut get: Scan = s_expr.child(0)?.plan().clone().try_into()?;
+        let mut scan: Scan = s_expr.child(0)?.plan().clone().try_into()?;

-        let add_filters = self.find_push_down_predicates(&filter.predicates)?;
+        let add_filters = self.find_push_down_predicates(&filter.predicates, &scan)?;

-        match get.push_down_predicates.as_mut() {
+        match scan.push_down_predicates.as_mut() {
             Some(vs) => vs.extend(add_filters),
-            None => get.push_down_predicates = Some(add_filters),
+            None => scan.push_down_predicates = Some(add_filters),
         }

         let mut result = SExpr::create_unary(
             Arc::new(filter.into()),
-            Arc::new(SExpr::create_leaf(Arc::new(get.into()))),
+            Arc::new(SExpr::create_leaf(Arc::new(scan.into()))),
         );
         result.set_applied_rule(&self.id);
         state.add_result(result);
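
The other half of the fix lives in find_push_down_predicates: when checking for derived columns, the rule now resolves only the columns the Scan actually reads (scan.columns) instead of every column registered in the query metadata. The sketch below is a simplified model of that check with illustrative ColumnEntry/Metadata stand-ins, not Databend's actual planner types, and it omits the predicate rewriting shown earlier:

use std::collections::HashSet;

// Illustrative planner metadata: some columns come straight from a table,
// others are derived by operators (aggregates, expressions) during planning.
enum ColumnEntry {
    BaseTableColumn { name: String },
    DerivedColumn { name: String },
}

struct Metadata {
    columns: Vec<ColumnEntry>,
}

impl Metadata {
    fn column(&self, index: usize) -> &ColumnEntry {
        &self.columns[index]
    }
}

// A predicate may be pushed below the scan only if, among the columns this
// scan exposes, it does not reference a derived column: storage has no way
// to evaluate a column that only exists inside the query plan.
fn can_push_down(
    predicate_columns: &HashSet<usize>, // column indexes used by the predicate
    scan_columns: &[usize],             // column indexes read by this scan
    metadata: &Metadata,
) -> bool {
    scan_columns.iter().all(|index| match metadata.column(*index) {
        ColumnEntry::DerivedColumn { .. } => !predicate_columns.contains(index),
        ColumnEntry::BaseTableColumn { .. } => true,
    })
}

fn main() {
    let metadata = Metadata {
        columns: vec![
            ColumnEntry::BaseTableColumn { name: "a".to_string() },
            ColumnEntry::BaseTableColumn { name: "b".to_string() },
            // Derived by another operator; it is NOT part of this scan.
            ColumnEntry::DerivedColumn { name: "sum_a".to_string() },
        ],
    };
    // The predicate "b > 3" only uses column 1, and the scan reads 0 and 1,
    // so the derived column at index 2 does not prevent the pushdown.
    let predicate_columns: HashSet<usize> = [1usize].into_iter().collect();
    let scan_columns = [0usize, 1];
    assert!(can_push_down(&predicate_columns, &scan_columns, &metadata));
}
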
@@ -192,7 +192,7 @@ Filter
 │ ├── read bytes: 0
 │ ├── partitions total: 0
 │ ├── partitions scanned: 0
-│ ├── push downs: [filters: [t.b (#4) > 3], limit: NONE]
+│ ├── push downs: [filters: [t1.b (#4) > 3], limit: NONE]
 │ ├── aggregating index: [SELECT b, SUM(a) FROM test_index_db.t1 WHERE (b > 3) GROUP BY b]
 │ ├── rewritten query: [selection: [index_col_0 (#0), index_col_1 (#1)]]
 │ └── estimated rows: 0.00