fix(planner): fix push down filter through eval scalar #13232

Merged · 2 commits · Oct 12, 2023
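For context, the behavior change is easiest to see in the updated explain expectations below: predicates pushed through an `EvalScalar` are now rewritten in terms of the underlying columns. A minimal sketch using one of the queries from the touched sqllogictest suites (plan output abbreviated to the affected lines):

```sql
-- Query taken from one of the updated explain test suites:
EXPLAIN SELECT * FROM (SELECT * FROM numbers(1)) AS t1 WHERE number = 1;

-- Before this change, the Filter and the TableScan push-down printed the
-- predicate against the derived alias:
--   filters: [t1.number (#0) = 1]
-- After this change, the predicate is rewritten through the EvalScalar and
-- references the base column instead:
--   filters: [numbers.number (#0) = 1]
```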
@@ -19,7 +19,6 @@ use common_exception::Result;
use crate::optimizer::rule::Rule;
use crate::optimizer::rule::RuleID;
use crate::optimizer::rule::TransformResult;
use crate::optimizer::ColumnSet;
use crate::optimizer::RelExpr;
use crate::optimizer::SExpr;
use crate::plans::AggregateFunction;
@@ -43,7 +42,7 @@ use crate::MetadataRef;
pub struct RulePushDownFilterEvalScalar {
id: RuleID,
patterns: Vec<SExpr>,
metadata: MetadataRef,
_metadata: MetadataRef,
}

impl RulePushDownFilterEvalScalar {
@@ -77,7 +76,7 @@ impl RulePushDownFilterEvalScalar {
))),
)),
)],
metadata,
_metadata: metadata,
}
}

@@ -249,61 +248,47 @@ impl Rule for RulePushDownFilterEvalScalar {
}

fn apply(&self, s_expr: &SExpr, state: &mut TransformResult) -> Result<()> {
let mut filter: Filter = s_expr.plan().clone().try_into()?;

let mut used_columns = ColumnSet::new();
for pred in filter.predicates.iter() {
used_columns = used_columns.union(&pred.used_columns()).cloned().collect();
}

let input = s_expr.child(0)?;
let filter: Filter = s_expr.plan().clone().try_into()?;
let eval_scalar: EvalScalar = s_expr.child(0)?.plan().clone().try_into()?;

let rel_expr = RelExpr::with_s_expr(input);
let eval_scalar_child_prop = rel_expr.derive_relational_prop_child(0)?;

let scalar_rel_expr = RelExpr::with_s_expr(s_expr);
let eval_scalar_prop = scalar_rel_expr.derive_relational_prop_child(0)?;

let metadata = self.metadata.read();
let table_entries = metadata.tables();
let is_source_of_view = table_entries.iter().any(|t| t.is_source_of_view());
let mut remaining_predicates = vec![];
let mut pushed_down_predicates = vec![];

// Replacing `DerivedColumn` in `Filter` with the column expression defined in the view.
// This allows us to eliminate the `EvalScalar` and push the filter down to the `Scan`.
if (used_columns.is_subset(&eval_scalar_prop.output_columns)
&& !used_columns.is_subset(&eval_scalar_child_prop.output_columns))
|| is_source_of_view
{
let new_predicates = &filter
.predicates
.iter()
.map(|predicate| Self::replace_predicate(predicate, &eval_scalar.items))
.collect::<Result<Vec<ScalarExpr>>>()?;
for pred in filter.predicates.iter() {
if pred
.used_columns()
.is_subset(&eval_scalar_prop.output_columns)
{
// Replace `BoundColumnRef` with the column expression introduced in `EvalScalar`.
let rewritten_predicate = Self::replace_predicate(pred, &eval_scalar.items)?;
pushed_down_predicates.push(rewritten_predicate);
} else {
remaining_predicates.push(pred.clone());
}
}

filter.predicates = new_predicates.to_vec();
let mut result = s_expr.child(0)?.child(0)?.clone();

used_columns.clear();
for pred in filter.predicates.iter() {
used_columns = used_columns.union(&pred.used_columns()).cloned().collect();
}
if !pushed_down_predicates.is_empty() {
let pushed_down_filter = Filter {
predicates: pushed_down_predicates,
};
result = SExpr::create_unary(Arc::new(pushed_down_filter.into()), Arc::new(result));
}

// Check if `Filter` can be satisfied by children of `EvalScalar`
if used_columns.is_subset(&eval_scalar_child_prop.output_columns) {
// TODO(leiysky): partial push down conjunctions
// For example, `select a from (select a, a+1 as b from t) where a = 1 and b = 2`
// can be optimized as `select a from (select a, a+1 as b from t where a = 1) where b = 2`
let new_expr = SExpr::create_unary(
Arc::new(eval_scalar.into()),
Arc::new(SExpr::create_unary(
Arc::new(filter.into()),
Arc::new(input.child(0)?.clone()),
)),
);
state.add_result(new_expr);
result = SExpr::create_unary(Arc::new(eval_scalar.into()), Arc::new(result));

if !remaining_predicates.is_empty() {
let remaining_filter = Filter {
predicates: remaining_predicates,
};
result = SExpr::create_unary(Arc::new(remaining_filter.into()), Arc::new(result));
result.set_applied_rule(&self.id);
}

state.add_result(result);
Ok(())
}
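The TODO removed in this hunk described the partial push-down case with the query below (reproduced from the removed comment; table `t` is whatever that comment assumed, not a table defined in this PR). Under the new logic, predicates whose columns are covered by the `EvalScalar` output are rewritten via `replace_predicate` and pushed into a Filter below the `EvalScalar`, while any remaining predicates stay in a Filter above it.

```sql
-- Example from the removed TODO comment:
SELECT a FROM (SELECT a, a + 1 AS b FROM t) WHERE a = 1 AND b = 2;
-- The comment suggested optimizing it toward:
SELECT a FROM (SELECT a, a + 1 AS b FROM t WHERE a = 1) WHERE b = 2;
```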

@@ -97,7 +97,7 @@ RowFetch
├── estimated rows: 0.00
└── Filter
├── output columns: [t_11831.uid (#0), t_11831.time (#3), t_11831._row_id (#4)]
├── filters: [is_true(t1.uid (#0) = 11), is_true(t_11831.time (#3) >= 1686672000000), is_true(t_11831.time (#3) <= 1686758399000)]
├── filters: [is_true(t_11831.uid (#0) = 11), is_true(t_11831.time (#3) >= 1686672000000), is_true(t_11831.time (#3) <= 1686758399000)]
├── estimated rows: 0.00
└── TableScan
├── table: default.default.t_11831
@@ -106,7 +106,7 @@ RowFetch
├── read bytes: 0
├── partitions total: 0
├── partitions scanned: 0
├── push downs: [filters: [and_filters(and_filters(and_filters(and_filters(t_11831.time (#3) >= 1686672000000, t_11831.time (#3) <= 1686758399000), t1.uid (#0) = 11), t_11831.time (#3) >= 1686672000000), t_11831.time (#3) <= 1686758399000)], limit: NONE]
├── push downs: [filters: [and_filters(and_filters(and_filters(and_filters(t_11831.time (#3) >= 1686672000000, t_11831.time (#3) <= 1686758399000), t_11831.uid (#0) = 11), t_11831.time (#3) >= 1686672000000), t_11831.time (#3) <= 1686758399000)], limit: NONE]
└── estimated rows: 0.00

statement ok
@@ -58,7 +58,7 @@ Limit
├── estimated rows: 0.20
└── Filter
├── output columns: [numbers.number (#0)]
├── filters: [numbers.b (#0) > 1]
├── filters: [numbers.number (#0) > 1]
├── estimated rows: 0.20
└── TableScan
├── table: default.system.numbers
@@ -67,7 +67,7 @@ Limit
├── read bytes: 8
├── partitions total: 1
├── partitions scanned: 1
├── push downs: [filters: [numbers.b (#0) > 1], limit: NONE]
├── push downs: [filters: [numbers.number (#0) > 1], limit: NONE]
└── estimated rows: 1.00

query T
@@ -16,7 +16,7 @@ explain select * from (select * from numbers(1)) as t1 where number = 1
----
Filter
├── output columns: [numbers.number (#0)]
├── filters: [t1.number (#0) = 1]
├── filters: [numbers.number (#0) = 1]
├── estimated rows: 0.20
└── TableScan
├── table: default.system.numbers
@@ -25,7 +25,7 @@ Filter
├── read bytes: 8
├── partitions total: 1
├── partitions scanned: 1
├── push downs: [filters: [t1.number (#0) = 1], limit: NONE]
├── push downs: [filters: [numbers.number (#0) = 1], limit: NONE]
└── estimated rows: 1.00

query T
@@ -58,7 +58,7 @@ EvalScalar
├── estimated rows: 0.20
└── Filter
├── output columns: [numbers.number (#0)]
├── filters: [t1.a (#0) = 1]
├── filters: [numbers.number (#0) = 1]
├── estimated rows: 0.20
└── TableScan
├── table: default.system.numbers
@@ -67,7 +67,7 @@ EvalScalar
├── read bytes: 8
├── partitions total: 1
├── partitions scanned: 1
├── push downs: [filters: [t1.a (#0) = 1], limit: NONE]
├── push downs: [filters: [numbers.number (#0) = 1], limit: NONE]
└── estimated rows: 1.00

query T
8 changes: 4 additions & 4 deletions tests/sqllogictests/suites/mode/standalone/explain/sort.test
@@ -10,7 +10,7 @@ Sort
├── estimated rows: 0.00
└── Filter
├── output columns: [t1.a (#0)]
├── filters: [is_true(t2.a (#0) > 1)]
├── filters: [is_true(t1.a (#0) > 1)]
├── estimated rows: 0.00
└── TableScan
├── table: default.default.t1
@@ -19,7 +19,7 @@ Sort
├── read bytes: 0
├── partitions total: 0
├── partitions scanned: 0
├── push downs: [filters: [is_true(t2.a (#0) > 1)], limit: NONE]
├── push downs: [filters: [is_true(t1.a (#0) > 1)], limit: NONE]
└── estimated rows: 0.00

query T
@@ -52,7 +52,7 @@ Sort
├── estimated rows: 0.00
└── Filter
├── output columns: [t1.a (#0)]
├── filters: [is_true(t2.a (#0) > 1)]
├── filters: [is_true(t1.a (#0) > 1)]
├── estimated rows: 0.00
└── TableScan
├── table: default.default.t1
@@ -61,7 +61,7 @@ Sort
├── read bytes: 0
├── partitions total: 0
├── partitions scanned: 0
├── push downs: [filters: [is_true(t2.a (#0) > 1)], limit: NONE]
├── push downs: [filters: [is_true(t1.a (#0) > 1)], limit: NONE]
└── estimated rows: 0.00

statement ok
@@ -58,7 +58,7 @@ Limit
├── estimated rows: 0.20
└── Filter
├── output columns: [numbers.number (#0)]
├── filters: [numbers.b (#0) > 1]
├── filters: [numbers.number (#0) > 1]
├── estimated rows: 0.20
└── TableScan
├── table: default.system.numbers
@@ -67,7 +67,7 @@ Limit
├── read bytes: 8
├── partitions total: 1
├── partitions scanned: 1
├── push downs: [filters: [numbers.b (#0) > 1], limit: NONE]
├── push downs: [filters: [numbers.number (#0) > 1], limit: NONE]
└── estimated rows: 1.00

query T
@@ -16,7 +16,7 @@ explain select * from (select * from numbers(1)) as t1 where number = 1
----
Filter
├── output columns: [numbers.number (#0)]
├── filters: [t1.number (#0) = 1]
├── filters: [numbers.number (#0) = 1]
├── estimated rows: 0.20
└── TableScan
├── table: default.system.numbers
@@ -25,7 +25,7 @@ Filter
├── read bytes: 8
├── partitions total: 1
├── partitions scanned: 1
├── push downs: [filters: [t1.number (#0) = 1], limit: NONE]
├── push downs: [filters: [numbers.number (#0) = 1], limit: NONE]
└── estimated rows: 1.00

query T
@@ -58,7 +58,7 @@ EvalScalar
├── estimated rows: 0.20
└── Filter
├── output columns: [numbers.number (#0)]
├── filters: [t1.a (#0) = 1]
├── filters: [numbers.number (#0) = 1]
├── estimated rows: 0.20
└── TableScan
├── table: default.system.numbers
@@ -67,7 +67,7 @@ EvalScalar
├── read bytes: 8
├── partitions total: 1
├── partitions scanned: 1
├── push downs: [filters: [t1.a (#0) = 1], limit: NONE]
├── push downs: [filters: [numbers.number (#0) = 1], limit: NONE]
└── estimated rows: 1.00

query T
@@ -15,7 +15,7 @@ Sort
├── read bytes: 0
├── partitions total: 0
├── partitions scanned: 0
├── push downs: [filters: [is_true(t2.a (#0) > 1)], limit: NONE]
├── push downs: [filters: [is_true(t1.a (#0) > 1)], limit: NONE]
└── estimated rows: 0.00

query T
@@ -49,7 +49,7 @@ Sort
├── read bytes: 0
├── partitions total: 0
├── partitions scanned: 0
├── push downs: [filters: [is_true(t2.a (#0) > 1)], limit: NONE]
├── push downs: [filters: [is_true(t1.a (#0) > 1)], limit: NONE]
└── estimated rows: 0.00

statement ok