Skip to content

Commit

Permalink
refactor apply_cse by using cached expr
Browse files Browse the repository at this point in the history
  • Loading branch information
ariesdevil committed Oct 8, 2023
1 parent 534002d commit a2fe357
Show file tree
Hide file tree
Showing 13 changed files with 79 additions and 367 deletions.
56 changes: 56 additions & 0 deletions src/query/expression/src/evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cell::RefCell;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::ops::Not;
use std::rc::Rc;

use common_arrow::arrow::bitmap;
use common_arrow::arrow::bitmap::Bitmap;
Expand Down Expand Up @@ -60,6 +63,8 @@ pub struct Evaluator<'a> {
input_columns: &'a DataBlock,
func_ctx: &'a FunctionContext,
fn_registry: &'a FunctionRegistry,
#[allow(clippy::type_complexity)]
cached_values: Option<Rc<RefCell<HashMap<Expr, Value<AnyType>>>>>,
}

impl<'a> Evaluator<'a> {
Expand All @@ -72,6 +77,16 @@ impl<'a> Evaluator<'a> {
input_columns,
func_ctx,
fn_registry,
cached_values: None,
}
}

/// Attach a fresh, empty expression-result cache to this evaluator,
/// consuming `self` and returning the cache-enabled evaluator.
///
/// The cache is thread-local by construction (`Rc<RefCell<..>>`); per the
/// call sites, a cache-enabled evaluator is used on a single thread only.
pub fn with_cache(self) -> Self {
    let cache = Rc::new(RefCell::new(HashMap::new()));
    Self {
        cached_values: Some(cache),
        ..self
    }
}

Expand All @@ -89,6 +104,25 @@ impl<'a> Evaluator<'a> {
}
}

/// Evaluate each expression in `exprs` against the input block, producing
/// one `BlockEntry` (data type + value) per expression, in order.
///
/// When a result cache is attached (see `with_cache`), a previously
/// computed value for an identical expression is cloned from the cache
/// instead of being re-evaluated.
pub fn run_exprs(&self, exprs: &[Expr]) -> Result<Vec<BlockEntry>> {
    exprs
        .iter()
        .map(|expr| {
            // Consult the cache first; fall back to evaluation on a miss.
            let cached = self
                .cached_values
                .as_ref()
                .and_then(|cache| cache.borrow().get(expr).cloned());
            let value = match cached {
                Some(value) => value,
                None => self.run(expr)?,
            };
            Ok(BlockEntry::new(expr.data_type().clone(), value))
        })
        .collect()
}

/// Evaluate `expr` over the full input block.
///
/// Convenience wrapper: delegates to `partial_run(expr, None)`.
// NOTE(review): the second argument appears to be an optional row mask for
// partial evaluation — confirm against `partial_run`'s signature.
pub fn run(&self, expr: &Expr) -> Result<Value<AnyType>> {
    self.partial_run(expr, None)
}
Expand All @@ -103,6 +137,13 @@ impl<'a> Evaluator<'a> {
#[cfg(debug_assertions)]
self.check_expr(expr);

// try get result from cache
if let Some(cached_values) = &self.cached_values {
if let Some(cached_result) = cached_values.borrow().get(expr) {
return Ok(cached_result.clone());
}
}

let result = match expr {
Expr::Constant { scalar, .. } => Ok(Value::Scalar(scalar.clone())),
Expr::ColumnRef { id, .. } => Ok(self.input_columns.get_by_offset(*id).value.clone()),
Expand Down Expand Up @@ -204,6 +245,21 @@ impl<'a> Evaluator<'a> {
RECURSING.store(false, Ordering::SeqCst);
}
}

// Do not cache `ColumnRef` and `Constant`
if !matches!(expr, Expr::ColumnRef {..} | Expr::Constant{..}) && let Some(cached_values) = &self.cached_values {
if let Ok(r) = &result {
// Clone the data we need before taking the mutable borrow.
let expr_cloned = expr.clone();
let result_cloned = r.clone();

// Mutably borrow the cache and insert the value if the entry is vacant.
if let Entry::Vacant(v) = cached_values.borrow_mut().entry(expr_cloned) {
v.insert(result_cloned);
}
}
}

result
}

Expand Down
1 change: 0 additions & 1 deletion src/query/expression/src/expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ pub enum Expr<Index: ColumnIndex = usize> {
#[educe(Hash(ignore), PartialEq(ignore), Eq(ignore))]
span: Span,
id: FunctionID,
#[educe(Hash(ignore), PartialEq(ignore), Eq(ignore))]
function: Arc<Function>,
generics: Vec<DataType>,
args: Vec<Expr<Index>>,
Expand Down
6 changes: 5 additions & 1 deletion src/query/expression/src/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use common_arrow::arrow::bitmap::MutableBitmap;
use common_exception::ErrorCode;
use common_exception::Result;
use common_exception::Span;
use educe::Educe;
use enum_as_inner::EnumAsInner;
use itertools::Itertools;
use serde::Deserialize;
Expand Down Expand Up @@ -52,12 +53,15 @@ pub type AutoCastRules<'a> = &'a [(DataType, DataType)];
pub trait FunctionFactory =
Fn(&[usize], &[DataType]) -> Option<Arc<Function>> + Send + Sync + 'static;

#[derive(Educe)]
#[educe(PartialEq, Eq, Hash)]
pub struct Function {
pub signature: FunctionSignature,
#[educe(PartialEq(ignore), Eq(ignore), Hash(ignore))]
pub eval: FunctionEval,
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FunctionSignature {
pub name: String,
pub args_type: Vec<DataType>,
Expand Down
1 change: 1 addition & 0 deletions src/query/expression/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#![allow(incomplete_features)]
#![feature(int_roundings)]
#![feature(trait_upcasting)]
#![feature(let_chains)]

#[allow(dead_code)]
mod block;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,13 +301,11 @@ impl Interpreter for RefreshIndexInterpreter {
let index = input_schema.index_of(field.name())?;
projections.push(index);
}
let num_input_columns = input_schema.num_fields();
let func_ctx = self.ctx.get_function_context()?;
build_res.main_pipeline.add_transform(|input, output| {
Ok(ProcessorPtr::create(CompoundBlockOperator::create(
input,
output,
num_input_columns,
func_ctx.clone(),
vec![BlockOperator::Project {
projection: projections.clone(),
Expand Down
27 changes: 3 additions & 24 deletions src/query/service/src/pipelines/pipeline_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1137,12 +1137,10 @@ impl PipelineBuilder {
let index = column_binding.index;
projections.push(input_schema.index_of(index.to_string().as_str())?);
}
let num_input_columns = input_schema.num_fields();
pipeline.add_transform(|input, output| {
Ok(ProcessorPtr::create(CompoundBlockOperator::create(
input,
output,
num_input_columns,
func_ctx.clone(),
vec![BlockOperator::Project {
projection: projections.clone(),
Expand Down Expand Up @@ -1216,12 +1214,10 @@ impl PipelineBuilder {
let ops = vec![BlockOperator::Project { projection }];
let func_ctx = self.ctx.get_function_context()?;

let num_input_columns = schema.num_fields();
self.main_pipeline.add_transform(|input, output| {
Ok(ProcessorPtr::create(CompoundBlockOperator::create(
input,
output,
num_input_columns,
func_ctx.clone(),
ops.clone(),
)))
Expand Down Expand Up @@ -1278,15 +1274,13 @@ impl PipelineBuilder {
))
})?;

let num_input_columns = filter.input.output_schema()?.num_fields();
self.main_pipeline.add_transform(|input, output| {
let transform = CompoundBlockOperator::new(
vec![BlockOperator::Filter {
projections: filter.projections.clone(),
expr: predicate.clone(),
}],
self.ctx.get_function_context()?,
num_input_columns,
);

if self.enable_profiling {
Expand All @@ -1311,13 +1305,10 @@ impl PipelineBuilder {
self.build_pipeline(&project.input)?;
let func_ctx = self.ctx.get_function_context()?;

let num_input_columns = project.input.output_schema()?.num_fields();

self.main_pipeline.add_transform(|input, output| {
Ok(ProcessorPtr::create(CompoundBlockOperator::create(
input,
output,
num_input_columns,
func_ctx.clone(),
vec![BlockOperator::Project {
projection: project.projections.clone(),
Expand All @@ -1329,7 +1320,6 @@ impl PipelineBuilder {
fn build_eval_scalar(&mut self, eval_scalar: &EvalScalar) -> Result<()> {
self.build_pipeline(&eval_scalar.input)?;

let input_schema = eval_scalar.input.output_schema()?;
let exprs = eval_scalar
.exprs
.iter()
Expand All @@ -1347,11 +1337,8 @@ impl PipelineBuilder {

let func_ctx = self.ctx.get_function_context()?;

let num_input_columns = input_schema.num_fields();

self.main_pipeline.add_transform(|input, output| {
let transform =
CompoundBlockOperator::new(vec![op.clone()], func_ctx.clone(), num_input_columns);
let transform = CompoundBlockOperator::new(vec![op.clone()], func_ctx.clone());

if self.enable_profiling {
Ok(ProcessorPtr::create(TransformProfileWrapper::create(
Expand Down Expand Up @@ -1385,11 +1372,8 @@ impl PipelineBuilder {

let func_ctx = self.ctx.get_function_context()?;

let num_input_columns = project_set.input.output_schema()?.num_fields();

self.main_pipeline.add_transform(|input, output| {
let transform =
CompoundBlockOperator::new(vec![op.clone()], func_ctx.clone(), num_input_columns);
let transform = CompoundBlockOperator::new(vec![op.clone()], func_ctx.clone());

if self.enable_profiling {
Ok(ProcessorPtr::create(TransformProfileWrapper::create(
Expand All @@ -1413,14 +1397,10 @@ impl PipelineBuilder {
let funcs = lambda.lambda_funcs.clone();
let op = BlockOperator::LambdaMap { funcs };

let input_schema = lambda.input.output_schema()?;
let func_ctx = self.ctx.get_function_context()?;

let num_input_columns = input_schema.num_fields();

self.main_pipeline.add_transform(|input, output| {
let transform =
CompoundBlockOperator::new(vec![op.clone()], func_ctx.clone(), num_input_columns);
let transform = CompoundBlockOperator::new(vec![op.clone()], func_ctx.clone());

if self.enable_profiling {
Ok(ProcessorPtr::create(TransformProfileWrapper::create(
Expand Down Expand Up @@ -1889,7 +1869,6 @@ impl PipelineBuilder {
Ok(ProcessorPtr::create(CompoundBlockOperator::create(
input,
output,
input_schema.num_fields(),
self.ctx.get_function_context()?,
vec![BlockOperator::Project {
projection: projection.clone(),
Expand Down
33 changes: 12 additions & 21 deletions src/query/sql/src/evaluator/block_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,14 @@ impl BlockOperator {
None => Ok(input),
}
} else {
for expr in exprs {
let evaluator = Evaluator::new(&input, func_ctx, &BUILTIN_FUNCTIONS);
let result = evaluator.run(expr)?;
let col = BlockEntry::new(expr.data_type().clone(), result);
input.add_column(col);
}
// The evaluator runs on a single thread here, so `Rc` and `RefCell` are sufficient.
let evaluator =
Evaluator::new(&input, func_ctx, &BUILTIN_FUNCTIONS).with_cache();
let result = evaluator.run_exprs(exprs)?;
let block = DataBlock::new(result, input.num_rows());
match projections {
Some(projections) => Ok(input.project(projections)),
None => Ok(input),
Some(projections) => Ok(block.project(projections)),
None => Ok(block),
}
}
}
Expand Down Expand Up @@ -449,30 +448,22 @@ pub struct CompoundBlockOperator {
}

impl CompoundBlockOperator {
pub fn new(
operators: Vec<BlockOperator>,
ctx: FunctionContext,
input_num_columns: usize,
) -> Self {
let operators = Self::compact_map(operators, input_num_columns);
pub fn new(operators: Vec<BlockOperator>, ctx: FunctionContext) -> Self {
let operators = Self::compact_map(operators);
Self { operators, ctx }
}

pub fn create(
input_port: Arc<InputPort>,
output_port: Arc<OutputPort>,
input_num_columns: usize,
ctx: FunctionContext,
operators: Vec<BlockOperator>,
) -> Box<dyn Processor> {
let operators = Self::compact_map(operators, input_num_columns);
let operators = Self::compact_map(operators);
Transformer::<Self>::create(input_port, output_port, Self { operators, ctx })
}

pub fn compact_map(
operators: Vec<BlockOperator>,
input_num_columns: usize,
) -> Vec<BlockOperator> {
pub fn compact_map(operators: Vec<BlockOperator>) -> Vec<BlockOperator> {
let mut results = Vec::with_capacity(operators.len());

for op in operators {
Expand All @@ -496,7 +487,7 @@ impl CompoundBlockOperator {
}
}

crate::evaluator::cse::apply_cse(results, input_num_columns)
results
}
}

Expand Down
Loading

0 comments on commit a2fe357

Please sign in to comment.