Skip to content

Commit

Permalink
fix: apply runtime filter after internal-column meta is attached, skip filters whose probe key is absent from the source schema, and lower INLIST_RUNTIME_FILTER_THRESHOLD to 1024
Browse files Browse the repository at this point in the history
  • Loading branch information
xudong963 committed Dec 13, 2023
1 parent 393cbd8 commit dc16126
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ use crate::pipelines::processors::transforms::hash_join::SingleStringHashJoinHas
use crate::pipelines::processors::HashJoinState;
use crate::sessions::QueryContext;

pub(crate) const INLIST_RUNTIME_FILTER_THRESHOLD: usize = 10_000;
pub(crate) const INLIST_RUNTIME_FILTER_THRESHOLD: usize = 1024;

/// Define some shared states for all hash join build threads.
pub struct HashJoinBuildState {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -819,12 +819,7 @@ impl Processor for NativeDeserializeDataTransform {
block
};

// Step 8: runtime filter
if self.ctx.has_runtime_filters(self.table_index) && block.num_rows() < 1024 {
block = self.runtime_filter(block)?;
}

// Step 9: Fill `InternalColumnMeta` as `DataBlock.meta` if query internal columns,
// Step 8: Fill `InternalColumnMeta` as `DataBlock.meta` if query internal columns,
// `TransformAddInternalColumns` will generate internal columns using `InternalColumnMeta` in next pipeline.
let mut block = block.resort(&self.src_schema, &self.output_schema)?;
if self.block_reader.query_internal_columns() {
Expand Down Expand Up @@ -852,6 +847,11 @@ impl Processor for NativeDeserializeDataTransform {
block = block.add_meta(Some(Box::new(meta)))?;
}

// Step 9: runtime filter
if self.ctx.has_runtime_filters(self.table_index) {
block = self.runtime_filter(block)?;
}

// Step 10: Add the block to output data
self.offset_in_part += origin_num_rows;
self.add_block(block)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,26 @@ macro_rules! impl_runtime_filter {
// Check if already cached runtime filters
if self.cached_runtime_filter.is_none() {
let runtime_filters = self.ctx.get_runtime_filter_with_id(self.table_index);
let filter = runtime_filters
let runtime_filters = runtime_filters
.iter()
.map(|filter| {
cast_expr_to_non_null_boolean(
filter
.project_column_ref(|name| self.src_schema.index_of(name).unwrap()),
)
.unwrap()
.filter_map(|filter| {
let column_refs = filter.column_refs();
debug_assert!(column_refs.len() == 1);
let name = column_refs.keys().last().unwrap();
// Some probe keys are not in the schema, they are derived from expressions.
self.src_schema.index_of(name).ok().and_then(|idx| {
Some(
cast_expr_to_non_null_boolean(filter.project_column_ref(|_| idx))
.unwrap(),
)
})
})
.collect::<Vec<Expr>>();
if runtime_filters.is_empty() {
return Ok(data_block);
}
let combined_filter = runtime_filters
.into_iter()
.try_reduce(|lhs, rhs| {
check_function(None, "and_filters", &[], &[lhs, rhs], &BUILTIN_FUNCTIONS)
})
Expand All @@ -100,7 +111,7 @@ macro_rules! impl_runtime_filter {
"Invalid empty predicate list".to_string(),
))
})?;
self.cached_runtime_filter = Some(filter);
self.cached_runtime_filter = Some(combined_filter);
}
// Using runtime filter to filter data_block
let func_ctx = self.ctx.get_function_context()?;
Expand Down Expand Up @@ -262,12 +273,6 @@ impl Processor for DeserializeDataTransform {
Some(self.uncompressed_buffer.clone()),
)?;

if self.ctx.has_runtime_filters(self.table_index)
&& data_block.num_rows() < 1024
{
data_block = self.runtime_filter(data_block)?;
}

// Add optional virtual columns
if let Some(virtual_reader) = self.virtual_reader.as_ref() {
data_block = virtual_reader.deserialize_virtual_columns(
Expand Down Expand Up @@ -310,6 +315,10 @@ impl Processor for DeserializeDataTransform {
data_block = data_block.add_meta(Some(Box::new(meta)))?;
}

if self.ctx.has_runtime_filters(self.table_index) {
data_block = self.runtime_filter(data_block)?;
}

self.output_data = Some(data_block);
}
}
Expand Down

0 comments on commit dc16126

Please sign in to comment.