Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
xudong963 committed Dec 1, 2023
1 parent 3218cde commit f616257
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 83 deletions.
3 changes: 1 addition & 2 deletions src/query/pipeline/core/src/processors/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::ColumnId;
use common_expression::Expr;
use common_expression::RemoteExpr;
use futures::future::BoxFuture;
use futures::FutureExt;
use minitrace::prelude::*;
Expand Down Expand Up @@ -106,7 +105,7 @@ pub trait Processor: Send {
/// Hook for handing runtime filters, keyed by `ColumnId`, to this processor.
///
/// This default implementation ignores `_filters` and always returns an
/// `ErrorCode::Unimplemented` error whose message is
/// `"<name> can't add runtime filters"`, where `<name>` comes from the
/// processor itself.
fn add_runtime_filters(&mut self, _filters: HashMap<ColumnId, Expr>) -> Result<()> {
Err(ErrorCode::Unimplemented(format!(
"{} can't add runtime filters",
self.name
self.name()
)))
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/query/pipeline/sources/src/sync_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ impl<T: 'static + SyncSource> Processor for SyncSourcer<T> {
match self.inner.generate()? {
None => self.is_finish = true,
Some(data_block) => {
if data_block.is_empty() && data_block.get_meta().is_none() {
// A part was pruned by runtime filter
return Ok(());
}
let progress_values = ProgressValues {
rows: data_block.num_rows(),
bytes: data_block.memory_size(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use parking_lot::RwLock;
use crate::pipelines::processors::transforms::hash_join::build_state::BuildState;
use crate::pipelines::processors::transforms::hash_join::row::RowSpace;
use crate::pipelines::processors::transforms::hash_join::util::build_schema_wrap_nullable;
use crate::pipelines::processors::transforms::hash_join::util::inlist_filter;
use crate::pipelines::processors::HashJoinDesc;
use crate::sessions::QueryContext;

Expand Down Expand Up @@ -266,81 +267,17 @@ impl HashJoinState {
.iter()
.zip(self.hash_join_desc.probe_keys.iter())
{
// Only a plain column reference is supported as the probe key
if let Expr::ColumnRef {
span,
id,
data_type,
display_name,
} = probe_key
{
let column_id: usize = self.hash_join_desc.probe_schema.fields[*id]
.name()
.parse()
.unwrap();
let raw_probe_key = RawExpr::ColumnRef {
span: span.clone(),
id: column_id,
data_type: data_type.clone(),
display_name: display_name.clone(),
};
let mut columns = Vec::with_capacity(data_blocks.len());
for block in data_blocks.iter() {
if block.num_columns() == 0 {
continue;
}
let evaluator = Evaluator::new(block, &func_ctx, &BUILTIN_FUNCTIONS);
let column = evaluator
.run(build_key)?
.convert_to_full_column(build_key.data_type(), block.num_rows());
columns.push(column);
}
// Generate inlist using build column
let build_key_column = Column::concat_columns(columns.into_iter())?;
let mut list = Vec::with_capacity(build_key_column.len());
for value in build_key_column.iter() {
list.push(RawExpr::Constant {
span: None,
scalar: value.to_owned(),
})
}
let array = RawExpr::FunctionCall {
span: None,
name: "array".to_string(),
params: vec![],
args: list,
};
let distinct_list = RawExpr::FunctionCall {
span: None,
name: "array_distinct".to_string(),
params: vec![],
args: vec![array],
};

let args = vec![distinct_list, raw_probe_key];
// Make contain function
let contain_func = RawExpr::FunctionCall {
span: None,
name: "contains".to_string(),
params: vec![],
args,
};
runtime_filters.insert(
*id as ColumnId,
contain_func
.type_check(self.hash_join_desc.probe_schema.as_ref())?
.project_column_ref(|index| {
self.hash_join_desc
.probe_schema
.index_of(&index.to_string())
.unwrap()
}),
);
if let Some(filter) = inlist_filter(
&func_ctx,
&self.hash_join_desc.probe_schema,
build_key,
probe_key,
data_blocks,
)? {
runtime_filters.insert(filter.0, filter.1);
}
}

data_blocks.clear();

Ok(())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use common_exception::Result;
use common_expression::Column;
use common_expression::ColumnId;
use common_expression::DataBlock;
use common_expression::DataField;
use common_expression::DataSchemaRef;
use common_expression::DataSchemaRefExt;
use common_expression::Evaluator;
use common_expression::Expr;
use common_expression::FunctionContext;
use common_expression::RawExpr;
use common_functions::BUILTIN_FUNCTIONS;
use common_sql::TypeCheck;

pub(crate) fn build_schema_wrap_nullable(build_schema: &DataSchemaRef) -> DataSchemaRef {
let mut nullable_field = Vec::with_capacity(build_schema.fields().len());
Expand All @@ -37,3 +47,78 @@ pub(crate) fn probe_schema_wrap_nullable(probe_schema: &DataSchemaRef) -> DataSc
}
DataSchemaRefExt::create(nullable_field)
}

/// Builds an in-list runtime filter for one (build key, probe key) pair.
///
/// `build_key` is evaluated against every block in `build_blocks` that has at
/// least one column. The resulting columns are concatenated and every value is
/// turned into a constant, producing the expression
/// `contains(array_distinct(array(v1, v2, ...)), <probe column>)`, which is
/// then type-checked against `probe_schema`.
///
/// Returns `Ok(None)` when `probe_key` is not a plain column reference.
/// Otherwise returns the probe key's `id` (cast to `ColumnId`) together with
/// the filter expression.
///
/// Errors raised while evaluating `build_key`, concatenating the columns or
/// type-checking the filter are propagated to the caller.
///
/// # Panics
///
/// Panics if the name of the probe field cannot be parsed as a `usize`, or if
/// the schema lookup performed while projecting column references fails.
pub(crate) fn inlist_filter(
    func_ctx: &FunctionContext,
    probe_schema: &DataSchemaRef,
    build_key: &Expr,
    probe_key: &Expr,
    build_blocks: &[DataBlock],
) -> Result<Option<(ColumnId, Expr)>> {
    // Only a bare column reference on the probe side is supported for now.
    // Keys such as `t1.a + 1 = t2.a` or `t1.a + t1.b = t2.a` (left side is the
    // probe side) do not produce a filter yet.
    let (span, id, data_type, display_name) = match probe_key {
        Expr::ColumnRef {
            span,
            id,
            data_type,
            display_name,
        } => (span, id, data_type, display_name),
        _ => return Ok(None),
    };

    // The raw expression refers to the column by the number stored in the
    // probe field's name.
    let field_name = probe_schema.fields[*id].name();
    let column_id: usize = field_name.parse().unwrap();

    // Evaluate the build key on every block that actually carries columns.
    let mut evaluated = Vec::with_capacity(build_blocks.len());
    for block in build_blocks.iter().filter(|b| b.num_columns() != 0) {
        let value = Evaluator::new(block, func_ctx, &BUILTIN_FUNCTIONS).run(build_key)?;
        evaluated.push(value.convert_to_full_column(build_key.data_type(), block.num_rows()));
    }

    // Every build-side value becomes one constant of the in-list.
    let build_key_column = Column::concat_columns(evaluated.into_iter())?;
    let constants: Vec<_> = build_key_column
        .iter()
        .map(|value| RawExpr::Constant {
            span: None,
            scalar: value.to_owned(),
        })
        .collect();

    let probe_column = RawExpr::ColumnRef {
        span: span.clone(),
        id: column_id,
        data_type: data_type.clone(),
        display_name: display_name.clone(),
    };

    // contains(array_distinct(array(<constants>)), <probe column>)
    let contains = RawExpr::FunctionCall {
        span: None,
        name: "contains".to_string(),
        params: vec![],
        args: vec![
            RawExpr::FunctionCall {
                span: None,
                name: "array_distinct".to_string(),
                params: vec![],
                args: vec![RawExpr::FunctionCall {
                    span: None,
                    name: "array".to_string(),
                    params: vec![],
                    args: constants,
                }],
            },
            probe_column,
        ],
    };

    // Type-check against the probe schema, then map column references back to
    // their positions in that schema.
    let filter = contains
        .type_check(probe_schema.as_ref())?
        .project_column_ref(|index| probe_schema.index_of(&index.to_string()).unwrap());

    Ok(Some((*id as ColumnId, filter)))
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use common_pipeline_core::processors::Processor;
use common_pipeline_core::processors::ProcessorPtr;
use common_pipeline_sources::SyncSource;
use common_pipeline_sources::SyncSourcer;
use log::info;

use super::parquet_data_source::DataSource;
use crate::fuse_part::FusePartInfo;
Expand Down Expand Up @@ -114,7 +115,7 @@ impl SyncSource for ReadParquetDataSource<true> {
&self.runtime_filters,
&self.partitions.ctx.get_function_context()?,
)? {
return Ok(None);
return Ok(Some(DataBlock::empty()));
}

if let Some(index_reader) = self.index_reader.as_ref() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use common_expression::Expr;
use common_expression::FunctionContext;
use common_expression::Scalar;
use common_functions::BUILTIN_FUNCTIONS;
use log::info;
use storages_common_index::statistics_to_domain;

use crate::FusePartInfo;
Expand All @@ -36,7 +37,7 @@ pub fn runtime_filter_pruner(
}

let part = FusePartInfo::from_part(part)?;
Ok(filters.iter().any(|(id, filter)| {
let pruned = filters.iter().any(|(id, filter)| {
let column_refs = filter.column_refs();
// Currently, only filters that reference a single column (the probe key) are supported.
debug_assert!(column_refs.len() == 1);
Expand All @@ -54,15 +55,22 @@ pub fn runtime_filter_pruner(
func_ctx,
&BUILTIN_FUNCTIONS,
);
matches!(new_expr, Expr::Constant {
return matches!(new_expr, Expr::Constant {
scalar: Scalar::Boolean(false),
..
})
} else {
false
});
}
} else {
false
}
}))
false
});

if pruned {
info!(
"Pruned partition with {:?} rows by runtime filter",
part.nums_rows
);
return Ok(true);
}

Ok(false)
}

0 comments on commit f616257

Please sign in to comment.