Skip to content

Commit

Permalink
refactor: integrate fuse table block pruning into pipeline (#16841)
Browse files Browse the repository at this point in the history
* chore: add setting to provide fallback

* refactor: make block pruning into pipeline

* refactor: make block pruning into pipeline

* fix: sender on finish unexpected close

* fix: take data block meta instead of clone

* chore: remove dbg

* chore: license header

* fix: add back broken prune cache

* fix: populating cache only when pipeline successfully finished

* fix: broken statistics

* fix: broken cache

* ensure can fallback

* test: add unit test for prune pipeline (segment pruner and block pruner)

* enable flag

* apply review suggestion

* fmt

* fix

* fix

* revert

---------

Co-authored-by: Winter Zhang <[email protected]>
  • Loading branch information
dqhl76 and zhang2014 authored Dec 10, 2024
1 parent 5e2b4b8 commit cf84449
Show file tree
Hide file tree
Showing 27 changed files with 1,740 additions and 70 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions src/query/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,17 @@ pub trait Table: Sync + Send {
)))
}

fn build_prune_pipeline(
&self,
table_ctx: Arc<dyn TableContext>,
plan: &DataSourcePlan,
source_pipeline: &mut Pipeline,
) -> Result<Option<Pipeline>> {
let (_, _, _) = (table_ctx, plan, source_pipeline);

Ok(None)
}

/// Assembly the pipeline of appending data to storage
fn append_data(&self, ctx: Arc<dyn TableContext>, pipeline: &mut Pipeline) -> Result<()> {
let (_, _) = (ctx, pipeline);
Expand Down
9 changes: 9 additions & 0 deletions src/query/service/src/pipelines/builders/builder_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ impl PipelineBuilder {
self.ctx.set_partitions(scan.source.parts.clone())?;
self.ctx
.set_wait_runtime_filter(scan.scan_id, self.contain_sink_processor);
if self.ctx.get_settings().get_enable_prune_pipeline()? {
if let Some(prune_pipeline) = table.build_prune_pipeline(
self.ctx.clone(),
&scan.source,
&mut self.main_pipeline,
)? {
self.pipelines.push(prune_pipeline);
}
}
table.read_data(
self.ctx.clone(),
&scan.source,
Expand Down
1 change: 1 addition & 0 deletions src/query/service/tests/it/storages/fuse/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod io;
mod meta;
mod operations;
mod pruning;
mod pruning_pipeline;
mod statistics;
mod table;
mod table_functions;
Expand Down
Loading

0 comments on commit cf84449

Please sign in to comment.