Skip to content

Commit

Permalink
feat: invert filter in fuse
Browse files Browse the repository at this point in the history
  • Loading branch information
sundy-li committed Sep 27, 2023
1 parent 00261dc commit 0e35d75
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 8 deletions.
1 change: 1 addition & 0 deletions src/query/storages/common/pruner/src/block_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub struct BlockMetaIndex {
pub block_location: String,
pub segment_location: String,
pub snapshot_location: Option<String>,
pub omit_filter: bool,
}

#[typetag::serde(name = "block_meta_index")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ impl NativeDeserializeDataTransform {
Ok(false)
}

/// No more data need to read, finish process.
/// No more data needs to be read; finish processing the current chunk.
fn finish_process(&mut self) -> Result<()> {
let _ = self.chunks.pop_front();
let _ = self.parts.pop_front().unwrap();
Expand Down Expand Up @@ -570,9 +570,21 @@ impl Processor for NativeDeserializeDataTransform {
return self.finish_process_with_empty_block();
}

let mut prewhere_filter = self.prewhere_filter.as_ref().clone();

// Init array_iters and array_skip_pages to read pages in subsequent processes.
if !self.inited {
let fuse_part = FusePartInfo::from_part(&self.parts[0])?;

if fuse_part
.block_meta_index
.as_ref()
.map(|meta| meta.omit_filter)
.unwrap_or(false)
{
prewhere_filter = None;
}

if let Some(range) = fuse_part.range() {
self.offset_in_part = fuse_part.page_size() * range.start;
}
Expand Down Expand Up @@ -690,7 +702,7 @@ impl Processor for NativeDeserializeDataTransform {
}
}

let filter = match self.prewhere_filter.as_ref() {
let filter = match prewhere_filter.as_ref() {
Some(filter) => {
// Arrays are empty means all prewhere columns are default values,
// the filter have checked in the first process, don't need check again.
Expand Down
30 changes: 24 additions & 6 deletions src/query/storages/fuse/src/pruning/block_pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ impl BlockPruner {
let pruning_semaphore = &self.pruning_ctx.pruning_semaphore;
let limit_pruner = self.pruning_ctx.limit_pruner.clone();
let range_pruner = self.pruning_ctx.range_pruner.clone();
let invert_range_pruner = self.pruning_ctx.invert_range_pruner.clone();
let page_pruner = self.pruning_ctx.page_pruner.clone();

let segment_block_metas = segment_info.block_metas()?;
Expand All @@ -95,8 +96,9 @@ impl BlockPruner {
return None;
}

type BlockPruningFutureReturn =
Pin<Box<dyn Future<Output = (usize, bool, Option<Range<usize>>, String)> + Send>>;
type BlockPruningFutureReturn = Pin<
Box<dyn Future<Output = (usize, bool, bool, Option<Range<usize>>, String)> + Send>,
>;
type BlockPruningFuture =
Box<dyn FnOnce(OwnedSemaphorePermit) -> BlockPruningFutureReturn + Send + 'static>;

Expand All @@ -123,6 +125,7 @@ impl BlockPruner {

// not pruned by block zone map index,
let bloom_pruner = bloom_pruner.clone();
let invert_range_pruner = invert_range_pruner.clone();
let limit_pruner = limit_pruner.clone();
let page_pruner = page_pruner.clone();
let index_location = block_meta.bloom_filter_index_location.clone();
Expand Down Expand Up @@ -158,9 +161,19 @@ impl BlockPruner {

let (keep, range) =
page_pruner.should_keep(&block_meta.cluster_stats);
(block_idx, keep, range, block_meta.location.0.clone())

let omit = !invert_range_pruner.should_keep(
&block_meta.col_stats,
Some(&block_meta.col_metas),
);

(block_idx, keep, omit, range, block_meta.location.0.clone())
} else {
(block_idx, keep, None, block_meta.location.0.clone())
let omit = !invert_range_pruner.should_keep(
&block_meta.col_stats,
Some(&block_meta.col_metas),
);
(block_idx, keep, omit, None, block_meta.location.0.clone())
}
})
});
Expand All @@ -169,7 +182,7 @@ impl BlockPruner {
let v: BlockPruningFuture = Box::new(move |permit: OwnedSemaphorePermit| {
Box::pin(async move {
let _permit = permit;
(block_idx, false, None, block_meta.location.0.clone())
(block_idx, false, false, None, block_meta.location.0.clone())
})
});
v
Expand All @@ -190,7 +203,7 @@ impl BlockPruner {
let mut result = Vec::with_capacity(joint.len());
let block_num = segment_info.summary.block_count as usize;
for item in joint {
let (block_idx, keep, range, block_location) = item;
let (block_idx, keep, omit_filter, range, block_location) = item;
if keep {
let block = segment_block_metas[block_idx].clone();

Expand All @@ -206,6 +219,7 @@ impl BlockPruner {
block_location: block_location.clone(),
segment_location: segment_location.location.0.clone(),
snapshot_location: segment_location.snapshot_loc.clone(),
omit_filter,
},
block,
))
Expand All @@ -228,6 +242,7 @@ impl BlockPruner {
let pruning_stats = self.pruning_ctx.pruning_stats.clone();
let limit_pruner = self.pruning_ctx.limit_pruner.clone();
let range_pruner = self.pruning_ctx.range_pruner.clone();
let invert_range_pruner = self.pruning_ctx.invert_range_pruner.clone();
let page_pruner = self.pruning_ctx.page_pruner.clone();

let start = Instant::now();
Expand Down Expand Up @@ -274,6 +289,8 @@ impl BlockPruner {

let (keep, range) = page_pruner.should_keep(&block_meta.cluster_stats);
if keep {
let omit_filter = !invert_range_pruner
.should_keep(&block_meta.col_stats, Some(&block_meta.col_metas));
result.push((
BlockMetaIndex {
segment_idx: segment_location.segment_idx,
Expand All @@ -284,6 +301,7 @@ impl BlockPruner {
block_location: block_meta.as_ref().location.0.clone(),
segment_location: segment_location.location.0.clone(),
snapshot_location: segment_location.snapshot_loc.clone(),
omit_filter,
},
block_meta.clone(),
))
Expand Down
16 changes: 16 additions & 0 deletions src/query/storages/fuse/src/pruning/fuse_pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub struct PruningContext {

pub limit_pruner: Arc<dyn Limiter + Send + Sync>,
pub range_pruner: Arc<dyn RangePruner + Send + Sync>,
pub invert_range_pruner: Arc<dyn RangePruner + Send + Sync>,
pub bloom_pruner: Option<Arc<dyn BloomPruner + Send + Sync>>,
pub page_pruner: Arc<dyn PagePruner + Send + Sync>,
pub internal_column_pruner: Option<Arc<InternalColumnPruner>>,
Expand Down Expand Up @@ -88,6 +89,13 @@ impl PruningContext {
.map(|f| f.filter.as_expr(&BUILTIN_FUNCTIONS))
});

let inverted_filter_expr = push_down.as_ref().and_then(|extra| {
extra
.filters
.as_ref()
.map(|f| f.inverted_filter.as_expr(&BUILTIN_FUNCTIONS))
});

// Limit pruner.
// if there are ordering/filter clause, ignore limit, even it has been pushed down
let limit = push_down
Expand Down Expand Up @@ -118,6 +126,13 @@ impl PruningContext {
func_ctx.clone(),
&table_schema,
filter_expr.as_ref(),
default_stats.clone(),
)?;

let invert_range_pruner = RangePrunerCreator::try_create_with_default_stats(
func_ctx.clone(),
&table_schema,
inverted_filter_expr.as_ref(),
default_stats,
)?;

Expand Down Expand Up @@ -163,6 +178,7 @@ impl PruningContext {
pruning_semaphore,
limit_pruner,
range_pruner,
invert_range_pruner,
bloom_pruner,
page_pruner,
internal_column_pruner,
Expand Down

0 comments on commit 0e35d75

Please sign in to comment.