diff --git a/src/query/expression/src/evaluator.rs b/src/query/expression/src/evaluator.rs index 265922bf829a1..f245e5436616f 100644 --- a/src/query/expression/src/evaluator.rs +++ b/src/query/expression/src/evaluator.rs @@ -12,8 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cell::RefCell; +use std::collections::hash_map::Entry; use std::collections::HashMap; use std::ops::Not; +use std::rc::Rc; use common_arrow::arrow::bitmap; use common_arrow::arrow::bitmap::Bitmap; @@ -60,6 +63,8 @@ pub struct Evaluator<'a> { input_columns: &'a DataBlock, func_ctx: &'a FunctionContext, fn_registry: &'a FunctionRegistry, + #[allow(clippy::type_complexity)] + cached_values: Option>>>>, } impl<'a> Evaluator<'a> { @@ -72,6 +77,16 @@ impl<'a> Evaluator<'a> { input_columns, func_ctx, fn_registry, + cached_values: None, + } + } + + pub fn with_cache(self) -> Self { + Self { + input_columns: self.input_columns, + func_ctx: self.func_ctx, + fn_registry: self.fn_registry, + cached_values: Some(Rc::new(RefCell::new(HashMap::new()))), } } @@ -89,6 +104,25 @@ impl<'a> Evaluator<'a> { } } + pub fn run_exprs(&self, exprs: &[Expr]) -> Result> { + let mut columns: Vec = Vec::with_capacity(exprs.len()); + for expr in exprs { + // try get result from cache + if let Some(cached_values) = &self.cached_values { + if let Some(cached_result) = cached_values.borrow().get(expr) { + let col = BlockEntry::new(expr.data_type().clone(), cached_result.clone()); + columns.push(col); + continue; + } + } + + let result = self.run(expr)?; + let col = BlockEntry::new(expr.data_type().clone(), result); + columns.push(col); + } + Ok(columns) + } + pub fn run(&self, expr: &Expr) -> Result> { self.partial_run(expr, None) } @@ -103,6 +137,13 @@ impl<'a> Evaluator<'a> { #[cfg(debug_assertions)] self.check_expr(expr); + // try get result from cache + if let Some(cached_values) = &self.cached_values { + if let Some(cached_result) = cached_values.borrow().get(expr) { + return Ok(cached_result.clone()); + } + } + let result = match expr { Expr::Constant { scalar, .. } => Ok(Value::Scalar(scalar.clone())), Expr::ColumnRef { id, .. } => Ok(self.input_columns.get_by_offset(*id).value.clone()), @@ -204,6 +245,21 @@ impl<'a> Evaluator<'a> { RECURSING.store(false, Ordering::SeqCst); } } + + // Do not cache `ColumnRef` and `Constant` + if !matches!(expr, Expr::ColumnRef {..} | Expr::Constant{..}) && let Some(cached_values) = &self.cached_values { + if let Ok(r) = &result { + // Prepare data that lock used. + let expr_cloned = expr.clone(); + let result_cloned = r.clone(); + + // Acquire the write lock and insert the value. + if let Entry::Vacant(v) = cached_values.borrow_mut().entry(expr_cloned) { + v.insert(result_cloned); + } + } + } + result } diff --git a/src/query/expression/src/expression.rs b/src/query/expression/src/expression.rs index 1762063491abb..c68b81d93efb5 100644 --- a/src/query/expression/src/expression.rs +++ b/src/query/expression/src/expression.rs @@ -104,7 +104,6 @@ pub enum Expr { #[educe(Hash(ignore), PartialEq(ignore), Eq(ignore))] span: Span, id: FunctionID, - #[educe(Hash(ignore), PartialEq(ignore), Eq(ignore))] function: Arc, generics: Vec, args: Vec>, diff --git a/src/query/expression/src/function.rs b/src/query/expression/src/function.rs index e5b8ec3daca0c..ad186bc0a62ef 100755 --- a/src/query/expression/src/function.rs +++ b/src/query/expression/src/function.rs @@ -24,6 +24,7 @@ use common_arrow::arrow::bitmap::MutableBitmap; use common_exception::ErrorCode; use common_exception::Result; use common_exception::Span; +use educe::Educe; use enum_as_inner::EnumAsInner; use itertools::Itertools; use serde::Deserialize; @@ -52,12 +53,15 @@ pub type AutoCastRules<'a> = &'a [(DataType, DataType)]; pub trait FunctionFactory = Fn(&[usize], &[DataType]) -> Option> + Send + Sync + 'static; +#[derive(Educe)] +#[educe(PartialEq, Eq, Hash)] pub struct Function { pub signature: FunctionSignature, + #[educe(PartialEq(ignore), Eq(ignore), Hash(ignore))] pub eval: FunctionEval, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct FunctionSignature { pub name: String, pub args_type: Vec, diff --git a/src/query/expression/src/lib.rs b/src/query/expression/src/lib.rs index 45a90d2c1c9d7..da9f613028145 100755 --- a/src/query/expression/src/lib.rs +++ b/src/query/expression/src/lib.rs @@ -34,6 +34,7 @@ #![allow(incomplete_features)] #![feature(int_roundings)] #![feature(trait_upcasting)] +#![feature(let_chains)] #[allow(dead_code)] mod block; diff --git a/src/query/service/src/interpreters/interpreter_index_refresh.rs b/src/query/service/src/interpreters/interpreter_index_refresh.rs index 2c151e36c3bbd..568cb4082cf1a 100644 --- a/src/query/service/src/interpreters/interpreter_index_refresh.rs +++ b/src/query/service/src/interpreters/interpreter_index_refresh.rs @@ -301,13 +301,11 @@ impl Interpreter for RefreshIndexInterpreter { let index = input_schema.index_of(field.name())?; projections.push(index); } - let num_input_columns = input_schema.num_fields(); let func_ctx = self.ctx.get_function_context()?; build_res.main_pipeline.add_transform(|input, output| { Ok(ProcessorPtr::create(CompoundBlockOperator::create( input, output, - num_input_columns, func_ctx.clone(), vec![BlockOperator::Project { projection: projections.clone(), diff --git a/src/query/service/src/pipelines/pipeline_builder.rs b/src/query/service/src/pipelines/pipeline_builder.rs index bb95ebfc46f4c..55937c232853f 100644 --- a/src/query/service/src/pipelines/pipeline_builder.rs +++ b/src/query/service/src/pipelines/pipeline_builder.rs @@ -1137,12 +1137,10 @@ impl PipelineBuilder { let index = column_binding.index; projections.push(input_schema.index_of(index.to_string().as_str())?); } - let num_input_columns = input_schema.num_fields(); pipeline.add_transform(|input, output| { Ok(ProcessorPtr::create(CompoundBlockOperator::create( input, output, - num_input_columns, func_ctx.clone(), vec![BlockOperator::Project { projection: projections.clone(), @@ -1216,12 +1214,10 @@ impl PipelineBuilder { let ops = vec![BlockOperator::Project { projection }]; let func_ctx = self.ctx.get_function_context()?; - let num_input_columns = schema.num_fields(); self.main_pipeline.add_transform(|input, output| { Ok(ProcessorPtr::create(CompoundBlockOperator::create( input, output, - num_input_columns, func_ctx.clone(), ops.clone(), ))) @@ -1278,7 +1274,6 @@ impl PipelineBuilder { )) })?; - let num_input_columns = filter.input.output_schema()?.num_fields(); self.main_pipeline.add_transform(|input, output| { let transform = CompoundBlockOperator::new( vec![BlockOperator::Filter { @@ -1286,7 +1281,6 @@ impl PipelineBuilder { expr: predicate.clone(), }], self.ctx.get_function_context()?, - num_input_columns, ); if self.enable_profiling { @@ -1311,13 +1305,10 @@ impl PipelineBuilder { self.build_pipeline(&project.input)?; let func_ctx = self.ctx.get_function_context()?; - let num_input_columns = project.input.output_schema()?.num_fields(); - self.main_pipeline.add_transform(|input, output| { Ok(ProcessorPtr::create(CompoundBlockOperator::create( input, output, - num_input_columns, func_ctx.clone(), vec![BlockOperator::Project { projection: project.projections.clone(), @@ -1329,7 +1320,6 @@ impl PipelineBuilder { fn build_eval_scalar(&mut self, eval_scalar: &EvalScalar) -> Result<()> { self.build_pipeline(&eval_scalar.input)?; - let input_schema = eval_scalar.input.output_schema()?; let exprs = eval_scalar .exprs .iter() @@ -1347,11 +1337,8 @@ impl PipelineBuilder { let func_ctx = self.ctx.get_function_context()?; - let num_input_columns = input_schema.num_fields(); - self.main_pipeline.add_transform(|input, output| { - let transform = - CompoundBlockOperator::new(vec![op.clone()], func_ctx.clone(), num_input_columns); + let transform = CompoundBlockOperator::new(vec![op.clone()], func_ctx.clone()); if self.enable_profiling { Ok(ProcessorPtr::create(TransformProfileWrapper::create( @@ -1385,11 +1372,8 @@ impl PipelineBuilder { let func_ctx = self.ctx.get_function_context()?; - let num_input_columns = project_set.input.output_schema()?.num_fields(); - self.main_pipeline.add_transform(|input, output| { - let transform = - CompoundBlockOperator::new(vec![op.clone()], func_ctx.clone(), num_input_columns); + let transform = CompoundBlockOperator::new(vec![op.clone()], func_ctx.clone()); if self.enable_profiling { Ok(ProcessorPtr::create(TransformProfileWrapper::create( @@ -1413,14 +1397,10 @@ impl PipelineBuilder { let funcs = lambda.lambda_funcs.clone(); let op = BlockOperator::LambdaMap { funcs }; - let input_schema = lambda.input.output_schema()?; let func_ctx = self.ctx.get_function_context()?; - let num_input_columns = input_schema.num_fields(); - self.main_pipeline.add_transform(|input, output| { - let transform = - CompoundBlockOperator::new(vec![op.clone()], func_ctx.clone(), num_input_columns); + let transform = CompoundBlockOperator::new(vec![op.clone()], func_ctx.clone()); if self.enable_profiling { Ok(ProcessorPtr::create(TransformProfileWrapper::create( @@ -1889,7 +1869,6 @@ impl PipelineBuilder { Ok(ProcessorPtr::create(CompoundBlockOperator::create( input, output, - input_schema.num_fields(), self.ctx.get_function_context()?, vec![BlockOperator::Project { projection: projection.clone(), diff --git a/src/query/sql/src/evaluator/block_operator.rs b/src/query/sql/src/evaluator/block_operator.rs index 289979b700889..aa910dd84b872 100644 --- a/src/query/sql/src/evaluator/block_operator.rs +++ b/src/query/sql/src/evaluator/block_operator.rs @@ -91,15 +91,14 @@ impl BlockOperator { None => Ok(input), } } else { - for expr in exprs { - let evaluator = Evaluator::new(&input, func_ctx, &BUILTIN_FUNCTIONS); - let result = evaluator.run(expr)?; - let col = BlockEntry::new(expr.data_type().clone(), result); - input.add_column(col); - } + // Local variable run in single thread, so `Rc` and `RefCell` are enough. + let evaluator = + Evaluator::new(&input, func_ctx, &BUILTIN_FUNCTIONS).with_cache(); + let result = evaluator.run_exprs(exprs)?; + let block = DataBlock::new(result, input.num_rows()); match projections { - Some(projections) => Ok(input.project(projections)), - None => Ok(input), + Some(projections) => Ok(block.project(projections)), + None => Ok(block), } } } @@ -449,30 +448,22 @@ pub struct CompoundBlockOperator { } impl CompoundBlockOperator { - pub fn new( - operators: Vec, - ctx: FunctionContext, - input_num_columns: usize, - ) -> Self { - let operators = Self::compact_map(operators, input_num_columns); + pub fn new(operators: Vec, ctx: FunctionContext) -> Self { + let operators = Self::compact_map(operators); Self { operators, ctx } } pub fn create( input_port: Arc, output_port: Arc, - input_num_columns: usize, ctx: FunctionContext, operators: Vec, ) -> Box { - let operators = Self::compact_map(operators, input_num_columns); + let operators = Self::compact_map(operators); Transformer::::create(input_port, output_port, Self { operators, ctx }) } - pub fn compact_map( - operators: Vec, - input_num_columns: usize, - ) -> Vec { + pub fn compact_map(operators: Vec) -> Vec { let mut results = Vec::with_capacity(operators.len()); for op in operators { @@ -496,7 +487,7 @@ impl CompoundBlockOperator { } } - crate::evaluator::cse::apply_cse(results, input_num_columns) + results } } diff --git a/src/query/sql/src/evaluator/cse.rs b/src/query/sql/src/evaluator/cse.rs deleted file mode 100644 index fcf1589cb1d27..0000000000000 --- a/src/query/sql/src/evaluator/cse.rs +++ /dev/null @@ -1,184 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashMap; - -use common_expression::Expr; -use log::info; - -use super::BlockOperator; -use crate::optimizer::ColumnSet; - -/// Eliminate common expression in `Map` operator -pub fn apply_cse( - operators: Vec, - mut input_num_columns: usize, -) -> Vec { - let mut results = Vec::with_capacity(operators.len()); - - for op in operators { - match op { - BlockOperator::Map { exprs, projections } => { - // find common expression - let mut cse_counter = HashMap::new(); - for expr in exprs.iter() { - count_expressions(expr, &mut cse_counter); - } - - let mut cse_candidates: Vec = cse_counter - .iter() - .filter(|(_, count)| **count > 1) - .map(|(expr, _)| expr.clone()) - .collect(); - - // Make sure the smaller expr goes firstly - cse_candidates.sort_by_key(|a| a.sql_display().len()); - - let mut temp_var_counter = input_num_columns; - if !cse_candidates.is_empty() { - let mut new_exprs = Vec::new(); - let mut cse_replacements = HashMap::new(); - - let candidates_nums = cse_candidates.len(); - for cse_candidate in &cse_candidates { - let temp_var = format!("__temp_cse_{}", temp_var_counter); - let temp_expr = Expr::ColumnRef { - span: None, - id: temp_var_counter, - data_type: cse_candidate.data_type().clone(), - display_name: temp_var.clone(), - }; - - let mut expr_cloned = cse_candidate.clone(); - perform_cse_replacement(&mut expr_cloned, &cse_replacements); - - info!( - "cse_candidate: {}, temp_expr: {}", - expr_cloned.sql_display(), - temp_expr.sql_display() - ); - - new_exprs.push(expr_cloned); - cse_replacements.insert(cse_candidate.sql_display(), temp_expr); - temp_var_counter += 1; - } - - let mut new_projections: Option = - projections.as_ref().map(|projections| { - projections - .iter() - .filter(|idx| **idx < input_num_columns) - .copied() - .collect::() - }); - - let has_projections = new_projections.is_some(); - for mut expr in exprs { - perform_cse_replacement(&mut expr, &cse_replacements); - new_exprs.push(expr); - - if has_projections { - // Safe to unwrap(). - if projections - .as_ref() - .unwrap() - .contains(&(temp_var_counter - candidates_nums)) - { - new_projections.as_mut().unwrap().insert(temp_var_counter); - } - } - temp_var_counter += 1; - } - - results.push(BlockOperator::Map { - exprs: new_exprs, - projections: new_projections, - }); - } else { - results.push(BlockOperator::Map { exprs, projections }); - } - } - BlockOperator::Project { projection } => { - input_num_columns = projection.len(); - results.push(BlockOperator::Project { projection }); - } - _ => results.push(op), - } - } - - results -} - -/// `count_expressions` recursively counts the occurrences of expressions in an expression tree -/// and stores the count in a HashMap. -fn count_expressions(expr: &Expr, counter: &mut HashMap) { - match expr { - Expr::FunctionCall { args, .. } => { - let entry = counter.entry(expr.clone()).or_insert(0); - *entry += 1; - - for arg in args { - count_expressions(arg, counter); - } - } - Expr::Cast { - expr: inner_expr, .. - } => { - let entry = counter.entry(expr.clone()).or_insert(0); - *entry += 1; - - count_expressions(inner_expr, counter); - } - // ignore constant and column ref - Expr::Constant { .. } | Expr::ColumnRef { .. } => {} - Expr::UDFServerCall { args, .. } => { - let entry = counter.entry(expr.clone()).or_insert(0); - *entry += 1; - - for arg in args { - count_expressions(arg, counter); - } - } - } -} - -// `perform_cse_replacement` performs common subexpression elimination (CSE) on an expression tree -// by replacing subexpressions that appear multiple times with a single shared expression. -fn perform_cse_replacement(expr: &mut Expr, cse_replacements: &HashMap) { - // If expr itself is a key in cse_replacements, return the replaced expression. - if let Some(replacement) = cse_replacements.get(&expr.sql_display()) { - *expr = replacement.clone(); - return; - } - - match expr { - Expr::Cast { - expr: inner_expr, .. - } => { - perform_cse_replacement(inner_expr.as_mut(), cse_replacements); - } - Expr::FunctionCall { args, .. } => { - for arg in args.iter_mut() { - perform_cse_replacement(arg, cse_replacements); - } - } - // ignore constant and column ref - Expr::Constant { .. } | Expr::ColumnRef { .. } => {} - Expr::UDFServerCall { args, .. } => { - for arg in args.iter_mut() { - perform_cse_replacement(arg, cse_replacements); - } - } - } -} diff --git a/src/query/sql/src/evaluator/mod.rs b/src/query/sql/src/evaluator/mod.rs index 81ed63910f626..3d1eea176933e 100644 --- a/src/query/sql/src/evaluator/mod.rs +++ b/src/query/sql/src/evaluator/mod.rs @@ -15,8 +15,6 @@ // TODO(leiysky): move this crate to common-pipeline-core mod block_operator; -mod cse; pub use block_operator::BlockOperator; pub use block_operator::CompoundBlockOperator; -pub use cse::apply_cse; diff --git a/src/query/sql/tests/block_operator.rs b/src/query/sql/tests/block_operator.rs deleted file mode 100644 index 36bb083d3fc83..0000000000000 --- a/src/query/sql/tests/block_operator.rs +++ /dev/null @@ -1,121 +0,0 @@ -// Copyright 2022 Datafuse Labs. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use common_expression::type_check::check; -use common_expression::types::DataType; -use common_expression::types::NumberDataType; -use common_expression::types::NumberScalar; -use common_expression::DataField; -use common_expression::DataSchemaRefExt; -use common_expression::Expr; -use common_expression::RawExpr; -use common_expression::Scalar; -use common_functions::BUILTIN_FUNCTIONS; -use common_sql::evaluator::apply_cse; -use common_sql::evaluator::BlockOperator; -use common_sql::optimizer::ColumnSet; -use itertools::Itertools; - -#[test] -fn test_cse() { - let schema = DataSchemaRefExt::create(vec![DataField::new( - "a", - DataType::Number(NumberDataType::Int32), - )]); - - // a + 1, (a + 1) *2 - let exprs = vec![ - RawExpr::FunctionCall { - span: None, - name: "plus".to_string(), - params: vec![], - args: vec![ - RawExpr::ColumnRef { - span: None, - id: 0usize, - data_type: schema.field(0).data_type().clone(), - display_name: schema.field(0).name().clone(), - }, - RawExpr::Constant { - span: None, - scalar: Scalar::Number(NumberScalar::UInt64(1)), - }, - ], - }, - RawExpr::FunctionCall { - span: None, - name: "multiply".to_string(), - params: vec![], - args: vec![ - RawExpr::FunctionCall { - span: None, - name: "plus".to_string(), - params: vec![], - args: vec![ - RawExpr::ColumnRef { - span: None, - id: 0usize, - data_type: schema.field(0).data_type().clone(), - display_name: schema.field(0).name().clone(), - }, - RawExpr::Constant { - span: None, - scalar: Scalar::Number(NumberScalar::UInt64(1)), - }, - ], - }, - RawExpr::Constant { - span: None, - scalar: Scalar::Number(NumberScalar::UInt64(2)), - }, - ], - }, - ]; - - let exprs: Vec = exprs - .iter() - .map(|expr| check(expr, &BUILTIN_FUNCTIONS).unwrap()) - .collect(); - - let mut projections = ColumnSet::new(); - projections.insert(1); - projections.insert(2); - let operators = vec![BlockOperator::Map { - exprs, - projections: Some(projections), - }]; - - let mut operators = apply_cse(operators, 1); - - assert_eq!(operators.len(), 1); - - match operators.pop().unwrap() { - BlockOperator::Map { exprs, projections } => { - assert_eq!(exprs.len(), 3); - assert_eq!(exprs[0].sql_display(), "a + 1"); - assert_eq!(exprs[1].sql_display(), "__temp_cse_1"); - assert_eq!(exprs[2].sql_display(), "__temp_cse_1 * 2"); - assert_eq!( - projections - .unwrap() - .into_iter() - .sorted() - .collect::>(), - vec![2, 3] - ); - } - - _ => unreachable!(), - } -} diff --git a/src/query/storages/fuse/src/operations/append.rs b/src/query/storages/fuse/src/operations/append.rs index 78974d806c8ff..51aa7aaf7e50a 100644 --- a/src/query/storages/fuse/src/operations/append.rs +++ b/src/query/storages/fuse/src/operations/append.rs @@ -95,13 +95,11 @@ impl FuseTable { let operators = cluster_stats_gen.operators.clone(); if !operators.is_empty() { - let num_input_columns = self.table_info.schema().fields().len(); let func_ctx2 = cluster_stats_gen.func_ctx.clone(); pipeline.add_transform(move |input, output| { Ok(ProcessorPtr::create(CompoundBlockOperator::create( input, output, - num_input_columns, func_ctx2.clone(), operators.clone(), ))) diff --git a/src/query/storages/fuse/src/operations/read_data.rs b/src/query/storages/fuse/src/operations/read_data.rs index 12eab2c274f03..aef4ac15d7e32 100644 --- a/src/query/storages/fuse/src/operations/read_data.rs +++ b/src/query/storages/fuse/src/operations/read_data.rs @@ -119,13 +119,8 @@ impl FuseTable { let func_ctx = query_ctx.get_function_context()?; pipeline.add_transform(|input, output| { - let transform = CompoundBlockOperator::create( - input, - output, - num_input_columns, - func_ctx.clone(), - ops.clone(), - ); + let transform = + CompoundBlockOperator::create(input, output, func_ctx.clone(), ops.clone()); Ok(ProcessorPtr::create(transform)) })?; } diff --git a/src/query/storages/fuse/src/operations/recluster.rs b/src/query/storages/fuse/src/operations/recluster.rs index f3136b3a634b4..f2bad1017cf3b 100644 --- a/src/query/storages/fuse/src/operations/recluster.rs +++ b/src/query/storages/fuse/src/operations/recluster.rs @@ -223,13 +223,11 @@ impl FuseTable { self.get_cluster_stats_gen(ctx.clone(), mutator.level + 1, block_thresholds)?; let operators = cluster_stats_gen.operators.clone(); if !operators.is_empty() { - let num_input_columns = self.table_info.schema().fields().len(); let func_ctx2 = cluster_stats_gen.func_ctx.clone(); pipeline.add_transform(move |input, output| { Ok(ProcessorPtr::create(CompoundBlockOperator::create( input, output, - num_input_columns, func_ctx2.clone(), operators.clone(), )))