Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(query): new filter execution framework #13846

Merged
merged 88 commits into from
Dec 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
88 commits
Select commit Hold shift + click to select a range
d52f450
new filter exection framework
Dousir9 Nov 29, 2023
858975f
Merge branch 'main' of github.com:datafuselabs/databend into improve_…
Dousir9 Nov 29, 2023
ce26926
refine
Dousir9 Nov 29, 2023
ebc756c
refine
Dousir9 Nov 29, 2023
1e9c598
remove useless
Dousir9 Nov 29, 2023
930d04b
refine test
Dousir9 Nov 29, 2023
b5aa8bb
vectorized
Dousir9 Nov 29, 2023
9184963
fix
Dousir9 Nov 29, 2023
720c67c
make lint
Dousir9 Nov 29, 2023
c13e9b9
fix
Dousir9 Nov 29, 2023
5351e6b
improve process_and and process_or
Dousir9 Nov 30, 2023
2276457
introduce different strategy to generate datablock
Dousir9 Nov 30, 2023
1717d8c
Merge branch 'main' of github.com:datafuselabs/databend into improve_…
Dousir9 Nov 30, 2023
d61ef36
support column ref and constant
Dousir9 Dec 1, 2023
0cedf21
remove selection_op_ref
Dousir9 Dec 1, 2023
f126530
fix BooleanConstant
Dousir9 Dec 1, 2023
96fd3fd
remove println
Dousir9 Dec 1, 2023
7e06f24
fix cast
Dousir9 Dec 4, 2023
8383896
Merge branch 'main' of github.com:datafuselabs/databend into improve_…
Dousir9 Dec 5, 2023
1f0e353
refactor
Dousir9 Dec 5, 2023
a85719c
add util
Dousir9 Dec 5, 2023
203d771
fix Generic
Dousir9 Dec 5, 2023
3cac72b
fix function return type
Dousir9 Dec 6, 2023
5ea0f83
Merge branch 'main' of github.com:datafuselabs/databend into improve_…
Dousir9 Dec 6, 2023
ff5c2e9
fix process_or and process_and
Dousir9 Dec 6, 2023
67ec5f4
reuse selection_range
Dousir9 Dec 6, 2023
b28f20e
Merge branch 'main' of github.com:datafuselabs/databend into improve_…
Dousir9 Dec 6, 2023
bba67e1
fix when process_or and process_and if count == 0
Dousir9 Dec 6, 2023
866fabb
support case when
Dousir9 Dec 7, 2023
a9117f3
Merge branch 'main' of github.com:datafuselabs/databend into improve_…
Dousir9 Dec 7, 2023
ade2d8a
merge
Dousir9 Dec 7, 2023
aa0d636
Merge branch 'main' of github.com:datafuselabs/databend into improve_…
Dousir9 Dec 8, 2023
1329543
fix test
Dousir9 Dec 8, 2023
9da89ab
fix generics
Dousir9 Dec 8, 2023
77f7113
support LambdaFunction
Dousir9 Dec 8, 2023
de0acb4
support tuple and fix test
Dousir9 Dec 11, 2023
84e7c24
Merge branch 'main' of github.com:datafuselabs/databend into improve_…
Dousir9 Dec 11, 2023
e5dccf0
fix test
Dousir9 Dec 11, 2023
8e035ee
Merge branch 'main' of github.com:datafuselabs/databend into improve_…
Dousir9 Dec 11, 2023
62ef9a7
fix test
Dousir9 Dec 11, 2023
9f19af4
move
Dousir9 Dec 11, 2023
f163abb
support native
Dousir9 Dec 11, 2023
b8966ba
fix block size
Dousir9 Dec 12, 2023
863eb8d
Merge branch 'main' of github.com:datafuselabs/databend into improve_…
Dousir9 Dec 12, 2023
a329809
add test and fix
Dousir9 Dec 13, 2023
05196b3
fix select_scalars
Dousir9 Dec 14, 2023
43ecb0a
improve test
Dousir9 Dec 14, 2023
1555c95
refine test
Dousir9 Dec 14, 2023
43f03c6
refactor
Dousir9 Dec 14, 2023
3b4dadd
rename selection_op
Dousir9 Dec 15, 2023
58ab486
add more comments
Dousir9 Dec 15, 2023
b7d46d4
rename selection_op
Dousir9 Dec 15, 2023
faff93d
remove unreachable
Dousir9 Dec 15, 2023
0128181
Merge branch 'main' of github.com:datafuselabs/databend into improve_…
Dousir9 Dec 15, 2023
5719d6d
format
Dousir9 Dec 15, 2023
c8df799
fix test
Dousir9 Dec 15, 2023
2482a73
Merge branch 'main' of github.com:datafuselabs/databend into improve_…
Dousir9 Dec 16, 2023
417cc14
use databend_xxx
Dousir9 Dec 16, 2023
7bd1386
rename idx
Dousir9 Dec 19, 2023
2e15453
add comments for SelectExpr
Dousir9 Dec 19, 2023
5b49248
Merge branch 'main' of github.com:datafuselabs/databend into improve_…
Dousir9 Dec 19, 2023
ebaa7e7
remove useless code
Dousir9 Dec 19, 2023
a457594
rename helper.rs to selection_op.rs
Dousir9 Dec 19, 2023
60cc3f3
make lint
Dousir9 Dec 19, 2023
87dfb6e
rename selection_op to select_op
Dousir9 Dec 19, 2023
f612bfb
Merge branch 'main' of github.com:datafuselabs/databend into improve_…
Dousir9 Dec 20, 2023
89f9a66
remove data_type() and use infer_data_type()
Dousir9 Dec 20, 2023
99982b4
use select_value_type to refine code
Dousir9 Dec 21, 2023
563eca5
remove useless code
Dousir9 Dec 21, 2023
14434b3
rename count to filtered_count
Dousir9 Dec 22, 2023
61819b5
improve codes
sundy-li Dec 24, 2023
d5693d6
improve codes
sundy-li Dec 24, 2023
90b5ec6
refine code
Dousir9 Dec 26, 2023
c1bde37
fix select_scalars
Dousir9 Dec 26, 2023
00a4f1a
Merge branch 'main' of github.com:datafuselabs/databend into improve_…
Dousir9 Dec 26, 2023
37cec7c
add more test
Dousir9 Dec 28, 2023
ad24307
add default compare operation for types
Dousir9 Dec 28, 2023
a8d4e39
remove useless code
Dousir9 Dec 28, 2023
ca4fb31
improve select
Dousir9 Dec 28, 2023
c710e2f
add # Safety comments
Dousir9 Dec 28, 2023
e5223e4
Merge branch 'main' of github.com:datafuselabs/databend into improve_…
Dousir9 Dec 28, 2023
e16a808
merge
Dousir9 Dec 28, 2023
3270ac4
merge main
Dousir9 Dec 28, 2023
f5304cc
fix typo
Dousir9 Dec 28, 2023
0ccdf02
refine compare
Dousir9 Dec 29, 2023
220d9e0
Merge branch 'main' of github.com:datafuselabs/databend into improve_…
Dousir9 Dec 29, 2023
66e9b6b
Merge branch 'main' of github.com:datafuselabs/databend into improve_…
Dousir9 Dec 29, 2023
69811e6
adapt runtime filter
Dousir9 Dec 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions src/query/expression/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ impl DataBlock {
}

pub fn split_by_rows(&self, max_rows_per_block: usize) -> (Vec<Self>, Option<Self>) {
let mut res = vec![];
let mut res = Vec::with_capacity(self.num_rows / max_rows_per_block);
let mut offset = 0;
let mut remain_rows = self.num_rows;
while remain_rows >= max_rows_per_block {
Expand All @@ -297,7 +297,24 @@ impl DataBlock {
(res, remain)
}

pub fn split_by_rows_no_tail(&self, min_rows_per_block: usize) -> Vec<Self> {
pub fn split_by_rows_no_tail(&self, max_rows_per_block: usize) -> Vec<Self> {
let mut res =
Vec::with_capacity((self.num_rows + max_rows_per_block - 1) / max_rows_per_block);
let mut offset = 0;
let mut remain_rows = self.num_rows;
while remain_rows >= max_rows_per_block {
let cut = self.slice(offset..(offset + max_rows_per_block));
res.push(cut);
offset += max_rows_per_block;
remain_rows -= max_rows_per_block;
}
if remain_rows > 0 {
res.push(self.slice(offset..(offset + remain_rows)));
}
res
}

pub fn split_by_rows_if_needed_no_tail(&self, min_rows_per_block: usize) -> Vec<Self> {
let max_rows_per_block = min_rows_per_block * 2;
let mut res = vec![];
let mut offset = 0;
Expand Down
207 changes: 202 additions & 5 deletions src/query/expression/src/evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ impl<'a> Evaluator<'a> {
}
}

pub fn data_block(&self) -> &DataBlock {
self.data_block
}

pub fn func_ctx(&self) -> &FunctionContext {
self.func_ctx
}

#[cfg(debug_assertions)]
fn check_expr(&self, expr: &Expr) {
let column_refs = expr.column_refs();
Expand All @@ -95,7 +103,7 @@ impl<'a> Evaluator<'a> {

/// Run an expression partially, only the rows that are valid in the validity bitmap
/// will be evaluated, the rest will be default values and should not throw any error.
fn partial_run(&self, expr: &Expr, validity: Option<Bitmap>) -> Result<Value<AnyType>> {
pub fn partial_run(&self, expr: &Expr, validity: Option<Bitmap>) -> Result<Value<AnyType>> {
debug_assert!(
validity.is_none() || validity.as_ref().unwrap().len() == self.data_block.num_rows()
);
Expand Down Expand Up @@ -216,7 +224,7 @@ impl<'a> Evaluator<'a> {
result
}

fn run_cast(
pub fn run_cast(
&self,
span: Span,
src_type: &DataType,
Expand Down Expand Up @@ -510,7 +518,7 @@ impl<'a> Evaluator<'a> {
}
}

fn run_try_cast(
pub fn run_try_cast(
&self,
span: Span,
src_type: &DataType,
Expand Down Expand Up @@ -785,7 +793,7 @@ impl<'a> Evaluator<'a> {
// depending on the truthiness of the condition. `if` should register it's signature
// as other functions do in `FunctionRegistry`, but it's does not necessarily implement
// the eval function because it will be evaluated here.
fn eval_if(
pub fn eval_if(
&self,
args: &[Expr],
generics: &[DataType],
Expand Down Expand Up @@ -952,7 +960,7 @@ impl<'a> Evaluator<'a> {
unreachable!("expr is not a set returning function: {expr}")
}

fn run_lambda(
pub fn run_lambda(
&self,
func_name: &str,
args: Vec<Value<AnyType>>,
Expand Down Expand Up @@ -1045,6 +1053,195 @@ impl<'a> Evaluator<'a> {
}
}
}

pub fn get_children(
&self,
args: &[Expr],
validity: Option<Bitmap>,
) -> Result<Vec<(Value<AnyType>, DataType)>> {
let children = args
.iter()
.map(|expr| self.get_select_child(expr, validity.clone()))
.collect::<Result<Vec<_>>>()?;
assert!(
children
.iter()
.filter_map(|val| match &val.0 {
Value::Column(col) => Some(col.len()),
Value::Scalar(_) => None,
})
.all_equal()
);
Ok(children)
}

pub fn remove_generics_data_type(
&self,
generics: &[DataType],
data_type: &DataType,
) -> DataType {
match data_type {
DataType::Generic(index) => generics[*index].clone(),
DataType::Nullable(box DataType::Generic(index)) => {
DataType::Nullable(Box::new(generics[*index].clone()))
}
_ => data_type.clone(),
}
}

pub fn get_select_child(
&self,
expr: &Expr,
validity: Option<Bitmap>,
) -> Result<(Value<AnyType>, DataType)> {
debug_assert!(
validity.is_none() || validity.as_ref().unwrap().len() == self.data_block.num_rows()
);

#[cfg(debug_assertions)]
self.check_expr(expr);

let result = match expr {
Expr::Constant { scalar, .. } => Ok((
Value::Scalar(scalar.clone()),
scalar.as_ref().infer_data_type(),
)),
Expr::ColumnRef { id, .. } => {
let entry = self.data_block.get_by_offset(*id);
Ok((entry.value.clone(), entry.data_type.clone()))
}
Expr::Cast {
span,
is_try,
expr,
dest_type,
} => {
let value = self.get_select_child(expr, validity.clone())?.0;
if *is_try {
Ok((
self.run_try_cast(*span, expr.data_type(), dest_type, value)?,
dest_type.clone(),
))
} else {
Ok((
self.run_cast(*span, expr.data_type(), dest_type, value, validity)?,
dest_type.clone(),
))
}
}
Expr::FunctionCall {
function,
args,
generics,
..
} if function.signature.name == "if" => {
let return_type =
self.remove_generics_data_type(generics, &function.signature.return_type);
Ok((self.eval_if(args, generics, validity)?, return_type))
}

Expr::FunctionCall {
function,
args,
generics,
..
} if function.signature.name == "and_filters" => {
let return_type =
self.remove_generics_data_type(generics, &function.signature.return_type);
Ok((self.eval_and_filters(args, validity)?, return_type))
}

Expr::FunctionCall {
function,
args,
generics,
..
} => {
let args = args
.iter()
.map(|expr| self.get_select_child(expr, validity.clone()))
.collect::<Result<Vec<_>>>()?;
assert!(
args.iter()
.filter_map(|val| match &val.0 {
Value::Column(col) => Some(col.len()),
Value::Scalar(_) => None,
})
.all_equal()
);

let cols_ref = args
.iter()
.map(|(val, _)| Value::as_ref(val))
.collect::<Vec<_>>();
let mut ctx = EvalContext {
generics,
num_rows: self.data_block.num_rows(),
validity,
errors: None,
func_ctx: self.func_ctx,
};
let (_, eval) = function.eval.as_scalar().unwrap();
let result = (eval)(cols_ref.as_slice(), &mut ctx);
// ctx.render_error(*span, id.params(), &args, &function.signature.name)?;
let return_type =
self.remove_generics_data_type(generics, &function.signature.return_type);
Ok((result, return_type))
}
Expr::LambdaFunctionCall {
name,
args,
lambda_expr,
return_type,
..
} => {
let args = args
.iter()
.map(|expr| self.partial_run(expr, validity.clone()))
.collect::<Result<Vec<_>>>()?;
assert!(
args.iter()
.filter_map(|val| match val {
Value::Column(col) => Some(col.len()),
Value::Scalar(_) => None,
})
.all_equal()
);

Ok((
self.run_lambda(name, args, lambda_expr)?,
return_type.clone(),
))
}
};

#[cfg(debug_assertions)]
if result.is_err() {
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;

static RECURSING: AtomicBool = AtomicBool::new(false);
if RECURSING
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
assert_eq!(
ConstantFolder::fold_with_domain(
expr,
&self.data_block.domains().into_iter().enumerate().collect(),
self.func_ctx,
self.fn_registry
)
.1,
None,
"domain calculation should not return any domain for expressions that are possible to fail with err {}",
result.unwrap_err()
);
RECURSING.store(false, Ordering::SeqCst);
}
}
result
}
}

pub struct ConstantFolder<'a, Index: ColumnIndex> {
Expand Down
Loading
Loading