diff --git a/Cargo.lock b/Cargo.lock index 481499022f54..b135d630e2d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4200,6 +4200,7 @@ dependencies = [ "jsonb", "log", "opendal", + "parking_lot 0.12.3", "parquet", "rand", "serde", diff --git a/src/query/catalog/src/table.rs b/src/query/catalog/src/table.rs index 58b7f281f1c3..1ae950ae984e 100644 --- a/src/query/catalog/src/table.rs +++ b/src/query/catalog/src/table.rs @@ -222,6 +222,17 @@ pub trait Table: Sync + Send { ))) } + fn build_prune_pipeline( + &self, + table_ctx: Arc<dyn TableContext>, + plan: &DataSourcePlan, + source_pipeline: &mut Pipeline, + ) -> Result<Option<Pipeline>> { + let (_, _, _) = (table_ctx, plan, source_pipeline); + + Ok(None) + } + /// Assembly the pipeline of appending data to storage fn append_data(&self, ctx: Arc<dyn TableContext>, pipeline: &mut Pipeline) -> Result<()> { let (_, _) = (ctx, pipeline); diff --git a/src/query/service/src/pipelines/builders/builder_scan.rs b/src/query/service/src/pipelines/builders/builder_scan.rs index c80f3331bb32..eb277a417b93 100644 --- a/src/query/service/src/pipelines/builders/builder_scan.rs +++ b/src/query/service/src/pipelines/builders/builder_scan.rs @@ -41,6 +41,15 @@ impl PipelineBuilder { self.ctx.set_partitions(scan.source.parts.clone())?; self.ctx .set_wait_runtime_filter(scan.scan_id, self.contain_sink_processor); + if self.ctx.get_settings().get_enable_prune_pipeline()? { + if let Some(prune_pipeline) = table.build_prune_pipeline( + self.ctx.clone(), + &scan.source, + &mut self.main_pipeline, + )? { + self.pipelines.push(prune_pipeline); + } + } table.read_data( self.ctx.clone(), &scan.source, diff --git a/src/query/service/tests/it/storages/fuse/mod.rs b/src/query/service/tests/it/storages/fuse/mod.rs index 1bb74ec6c78a..d896b1b91e6b 100644 --- a/src/query/service/tests/it/storages/fuse/mod.rs +++ b/src/query/service/tests/it/storages/fuse/mod.rs @@ -19,6 +19,7 @@ mod io; mod meta; mod operations; mod pruning; +mod pruning_pipeline; mod statistics; mod table; mod table_functions; diff --git a/src/query/service/tests/it/storages/fuse/pruning_pipeline.rs b/src/query/service/tests/it/storages/fuse/pruning_pipeline.rs new file mode 100644 index 000000000000..af2654dc576d --- /dev/null +++ b/src/query/service/tests/it/storages/fuse/pruning_pipeline.rs @@ -0,0 +1,357 @@ +// Copyright 2021 Datafuse Labs. + // + // Licensed under the Apache License, Version 2.0 (the "License"); + // you may not use this file except in compliance with the License. + // You may obtain a copy of the License at + // + // http://www.apache.org/licenses/LICENSE-2.0 + // + // Unless required by applicable law or agreed to in writing, software + // distributed under the License is distributed on an "AS IS" BASIS, + // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + // See the License for the specific language governing permissions and + // limitations under the License. 
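The new `build_prune_pipeline` hook on the `Table` trait above defaults to `Ok(None)`, so engines that do not opt in keep the existing single-pipeline scan; the builder in builder_scan.rs only pushes an extra pipeline when the hook returns one. A minimal sketch of an opting-in implementation (`MyTable` is a hypothetical engine; `FuseTable`'s real override appears in fuse_table.rs further down):

    impl Table for MyTable {
        fn build_prune_pipeline(
            &self,
            table_ctx: Arc<dyn TableContext>,
            plan: &DataSourcePlan,
            source_pipeline: &mut Pipeline,
        ) -> Result<Option<Pipeline>> {
            // Returning Some(pipeline) hands the executor a second pipeline that
            // runs alongside the scan and feeds it pruned partitions.
            self.do_build_prune_pipeline(table_ctx, plan, source_pipeline)
        }
    }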
+ + use std::sync::Arc; + + use databend_common_ast::ast::Engine; + use databend_common_base::base::tokio; + use databend_common_base::runtime::TrySpawn; + use databend_common_catalog::plan::PartInfoPtr; + use databend_common_catalog::plan::PartStatistics; + use databend_common_catalog::plan::PushDownInfo; + use databend_common_exception::ErrorCode; + use databend_common_exception::Result; + use databend_common_expression::types::number::Int64Type; + use databend_common_expression::types::number::UInt64Type; + use databend_common_expression::types::ArgType; + use databend_common_expression::types::NumberDataType; + use databend_common_expression::DataBlock; + use databend_common_expression::FromData; + use databend_common_expression::RemoteExpr; + use databend_common_expression::TableDataType; + use databend_common_expression::TableField; + use databend_common_expression::TableSchemaRef; + use databend_common_expression::TableSchemaRefExt; + use databend_common_meta_app::schema::CreateOption; + use databend_common_pipeline_core::Pipeline; + use databend_common_sql::parse_to_filters; + use databend_common_sql::plans::CreateTablePlan; + use databend_common_sql::BloomIndexColumns; + use databend_common_storages_fuse::pruning::create_segment_location_vector; + use databend_common_storages_fuse::pruning::FusePruner; + use databend_common_storages_fuse::FuseBlockPartInfo; + use databend_common_storages_fuse::FuseStorageFormat; + use databend_common_storages_fuse::FuseTable; + use databend_query::interpreters::CreateTableInterpreter; + use databend_query::interpreters::Interpreter; + use databend_query::pipelines::executor::ExecutorSettings; + use databend_query::pipelines::executor::QueryPipelineExecutor; + use databend_query::sessions::QueryContext; + use databend_query::sessions::TableContext; + use databend_query::storages::fuse::io::MetaReaders; + use databend_query::storages::fuse::FUSE_OPT_KEY_BLOCK_PER_SEGMENT; + use databend_query::storages::fuse::FUSE_OPT_KEY_ROW_PER_BLOCK; + use databend_query::test_kits::*; + use databend_storages_common_cache::LoadParams; + use databend_storages_common_table_meta::meta::TableSnapshot; + use databend_storages_common_table_meta::meta::Versioned; + use databend_storages_common_table_meta::table::OPT_KEY_DATABASE_ID; + use databend_storages_common_table_meta::table::OPT_KEY_SNAPSHOT_LOCATION; + use opendal::Operator; + + async fn apply_block_pruning( + table_snapshot: Arc<TableSnapshot>, + schema: TableSchemaRef, + push_down: &Option<PushDownInfo>, + ctx: Arc<QueryContext>, + op: Operator, + bloom_index_cols: BloomIndexColumns, + fuse_table: &FuseTable, + cache_key: Option<String>, + ) -> Result<Vec<PartInfoPtr>> { + let ctx: Arc<dyn TableContext> = ctx; + let segment_locs = table_snapshot.segments.clone(); + let segment_locs = create_segment_location_vector(segment_locs, None); + let fuse_pruner = Arc::new(FusePruner::create( + &ctx, + op, + schema, + push_down, + bloom_index_cols, + None, + FuseStorageFormat::Parquet, + )?); + + let mut prune_pipeline = Pipeline::create(); + let (segment_tx, segment_rx) = async_channel::bounded(8); + let (res_tx, res_rx) = async_channel::unbounded(); + fuse_table.prune_segments_with_pipeline( + fuse_pruner.clone(), + &mut prune_pipeline, + ctx.clone(), + segment_rx, + res_tx, + cache_key, + )?; + prune_pipeline.set_max_threads(1); + prune_pipeline.set_on_init(move || { + ctx.get_runtime()?.try_spawn( + async move { + let segment_pruned_result = + fuse_pruner.clone().segment_pruning(segment_locs).await?; + for segment in segment_pruned_result { + let _ = segment_tx.send(Ok(segment)).await; + } + Ok::<_, ErrorCode>(()) + }, + None, + )?; + Ok(()) + }); 
+ + let settings = ExecutorSettings { + query_id: Arc::new("".to_string()), + max_execute_time_in_seconds: Default::default(), + enable_queries_executor: false, + max_threads: 8, + executor_node_id: "".to_string(), + }; + let executor = QueryPipelineExecutor::create(prune_pipeline, settings)?; + + executor.execute()?; + let mut got = Vec::new(); + + while let Ok(Ok(segment)) = res_rx.recv().await { + got.push(segment); + } + + Ok(got) +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_block_pruner() -> Result<()> { + let fixture = TestFixture::setup().await?; + let ctx = fixture.new_query_ctx().await?; + + fixture.create_default_database().await?; + + let test_tbl_name = "test_index_helper"; + let test_schema = TableSchemaRefExt::create(vec![ + TableField::new("a", TableDataType::Number(NumberDataType::UInt64)), + TableField::new("b", TableDataType::Number(NumberDataType::UInt64)), + ]); + + let num_blocks = 10; + let row_per_block = 10; + let num_blocks_opt = row_per_block.to_string(); + + // create test table + let create_table_plan = CreateTablePlan { + catalog: "default".to_owned(), + create_option: CreateOption::Create, + tenant: fixture.default_tenant(), + database: fixture.default_db_name(), + table: test_tbl_name.to_string(), + schema: test_schema.clone(), + engine: Engine::Fuse, + engine_options: Default::default(), + storage_params: None, + options: [ + (FUSE_OPT_KEY_ROW_PER_BLOCK.to_owned(), num_blocks_opt), + (FUSE_OPT_KEY_BLOCK_PER_SEGMENT.to_owned(), "1".to_owned()), + (OPT_KEY_DATABASE_ID.to_owned(), "1".to_owned()), + ] + .into(), + field_comments: vec![], + as_select: None, + cluster_key: None, + inverted_indexes: None, + }; + + let interpreter = CreateTableInterpreter::try_create(ctx.clone(), create_table_plan)?; + let _ = interpreter.execute(ctx.clone()).await?; + + // get table + let catalog = ctx.get_catalog("default").await?; + let table = catalog + .get_table( + &fixture.default_tenant(), + fixture.default_db_name().as_str(), + test_tbl_name, + ) + .await?; + + let gen_col = |value, rows| { + UInt64Type::from_data(std::iter::repeat(value).take(rows).collect::<Vec<u64>>()) + }; + + // prepare test blocks + // - there will be `num_blocks` blocks, each comprising `row_per_block` rows, + // in our case, there will be 10 blocks, and 10 rows for each block + let blocks = (0..num_blocks) + .map(|idx| { + DataBlock::new_from_columns(vec![ + // value of column a always equals 1 + gen_col(1, row_per_block), + // for column b + // - for all block `B` in blocks, whose index is `i` + // - for all row in `B`, value of column b equals `i` + gen_col(idx as u64, row_per_block), + ]) + }) + .collect::<Vec<_>>(); + + fixture + .append_commit_blocks(table.clone(), blocks, false, true) + .await?; + + // get the latest tbl + let table = catalog + .get_table( + &fixture.default_tenant(), + fixture.default_db_name().as_str(), + test_tbl_name, + ) + .await?; + + let fuse_table = FuseTable::try_from_table(table.as_ref())?; + + let snapshot_loc = table + .get_table_info() + .options() + .get(OPT_KEY_SNAPSHOT_LOCATION) + .unwrap(); + + let reader = MetaReaders::table_snapshot_reader(fuse_table.get_operator()); + + let load_params = LoadParams { + location: snapshot_loc.clone(), + len_hint: None, + ver: TableSnapshot::VERSION, + put_cache: false, + }; + + let snapshot = reader.read(&load_params).await?; + + // everything is pruned: column a always equals 1, so `a > 3` matches no block + let e1 = PushDownInfo { + filters: Some(parse_to_filters(ctx.clone(), table.clone(), "a > 3")?), + ..Default::default() + }; + + // some blocks pruned + let mut e2 = PushDownInfo::default(); + let max_val_of_b = 6u64; + + e2.filters = Some(parse_to_filters( + ctx.clone(), + table.clone(), + "a > 0 and b > 6", + )?); + let b2 = num_blocks - max_val_of_b as usize - 1; + + // Sort asc Limit: TopN-pruner. + let e3 = PushDownInfo { + order_by: vec![( + RemoteExpr::ColumnRef { + span: None, + id: "b".to_string(), + data_type: Int64Type::data_type(), + display_name: "b".to_string(), + }, + true, + false, + )], + limit: Some(3), + ..Default::default() + }; + + // Sort desc Limit: TopN-pruner. + let e4 = PushDownInfo { + order_by: vec![( + RemoteExpr::ColumnRef { + span: None, + id: "b".to_string(), + data_type: Int64Type::data_type(), + display_name: "b".to_string(), + }, + false, + false, + )], + limit: Some(4), + ..Default::default() + }; + + // Limit push-down, Limit-pruner. + let e5 = PushDownInfo { + order_by: vec![], + limit: Some(11), + ..Default::default() + }; + + let extras = vec![ + (None, num_blocks, num_blocks * row_per_block), + (Some(e1), 0, 0), + (Some(e2), b2, b2 * row_per_block), + (Some(e3), 3, 3 * row_per_block), + (Some(e4), 4, 4 * row_per_block), + (Some(e5), 2, 2 * row_per_block), + ]; + + let stats_res = vec![ + (10, 10, 10, 10), + (10, 0, 0, 0), + (10, 3, 3, 3), + (10, 10, 10, 10), + (10, 10, 10, 10), + (10, 10, 10, 2), + ]; + + for (id, (extra, expected_blocks, expected_rows)) in extras.into_iter().enumerate() { + let cache_key = Some(format!("test_block_pruner_{}", id)); + let parts = apply_block_pruning( + snapshot.clone(), + table.get_table_info().schema(), + &extra, + ctx.clone(), + fuse_table.get_operator(), + fuse_table.bloom_index_cols(), + fuse_table, + cache_key.clone(), + ) + .await?; + let rows = parts + .iter() + .map(|b| { + b.as_any() + .downcast_ref::<FuseBlockPartInfo>() + .unwrap() + .nums_rows + }) + .sum::<usize>(); + + assert_eq!(expected_rows, rows); + assert_eq!(expected_blocks, parts.len()); + + let (stats, partitions) = FuseTable::check_prune_cache(&cache_key).unwrap(); + check_stats(stats, &stats_res, id)?; + assert_eq!(expected_blocks, partitions.partitions.len()); + } + + Ok(()) +} + +fn check_stats( + stats: PartStatistics, + stats_res: &[(usize, usize, usize, usize)], + id: usize, +) -> Result<()> { + let (segments_before, segment_after, block_before, block_after) = stats_res[id]; + let prune_stats = stats.pruning_stats; + assert_eq!(prune_stats.segments_range_pruning_before, segments_before); + assert_eq!(prune_stats.segments_range_pruning_after, segment_after); + assert_eq!(prune_stats.blocks_range_pruning_before, block_before); + assert_eq!(prune_stats.blocks_range_pruning_after, block_after); + Ok(()) +} diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index b18b5cb4d2d6..542b463be0e2 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -952,6 +952,12 @@ impl DefaultSettings { mode: SettingMode::Both, range: Some(SettingRange::Numeric(0..=1)), }), + ("enable_prune_pipeline", DefaultSettingValue { + value: UserSettingValue::UInt64(1), + desc: "Enable the pruning pipeline", + mode: SettingMode::Both, + range: Some(SettingRange::Numeric(0..=1)), + }), ("persist_materialized_cte", DefaultSettingValue { value: UserSettingValue::UInt64(0), // 0 for in-memory, 1 for disk desc: "Decides if materialized CTEs should be persisted to disk.", diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index b76b50d9dc33..e0699ec866df 100644 ---
a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -818,6 +818,10 @@ impl Settings { self.try_set_u64("short_sql_max_length", val) } + pub fn get_enable_prune_pipeline(&self) -> Result<bool> { + Ok(self.try_get_u64("enable_prune_pipeline")? == 1) + } + pub fn get_enable_distributed_pruning(&self) -> Result<bool> { Ok(self.try_get_u64("enable_distributed_pruning")? == 1) } diff --git a/src/query/storages/common/pruner/src/topn_pruner.rs b/src/query/storages/common/pruner/src/topn_pruner.rs index 3ed19225be9e..93cf9cdb075b 100644 --- a/src/query/storages/common/pruner/src/topn_pruner.rs +++ b/src/query/storages/common/pruner/src/topn_pruner.rs @@ -25,6 +25,7 @@ use crate::BlockMetaIndex; /// TopN pruner. /// Pruning for order by x limit N. +#[derive(Clone)] pub struct TopNPrunner { schema: TableSchemaRef, sort: Vec<(RemoteExpr<String>, bool, bool)>, diff --git a/src/query/storages/fuse/Cargo.toml b/src/query/storages/fuse/Cargo.toml index 921e0b488154..b97dc949207c 100644 --- a/src/query/storages/fuse/Cargo.toml +++ b/src/query/storages/fuse/Cargo.toml @@ -60,6 +60,7 @@ itertools = { workspace = true } jsonb = { workspace = true } log = { workspace = true } opendal = { workspace = true } +parking_lot = { workspace = true } parquet = { workspace = true } rand = { workspace = true } serde = { workspace = true } diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs index d49fc46565a8..a6f7054ee5db 100644 --- a/src/query/storages/fuse/src/fuse_table.rs +++ b/src/query/storages/fuse/src/fuse_table.rs @@ -22,11 +22,13 @@ use std::str::FromStr; use std::sync::Arc; use std::time::Instant; +use async_channel::Receiver; use chrono::Duration; use chrono::TimeDelta; use databend_common_base::runtime::GlobalIORuntime; use databend_common_catalog::catalog::StorageDescription; use databend_common_catalog::plan::DataSourcePlan; +use databend_common_catalog::plan::PartInfoPtr; use databend_common_catalog::plan::PartStatistics; use databend_common_catalog::plan::Partitions; use databend_common_catalog::plan::PushDownInfo; @@ -90,6 +92,7 @@ use databend_storages_common_table_meta::table::OPT_KEY_TABLE_COMPRESSION; use log::info; use log::warn; use opendal::Operator; +use parking_lot::Mutex; use uuid::Uuid; use crate::fuse_column::FuseTableColumnStatisticsProvider; @@ -134,8 +137,12 @@ pub struct FuseTable { // If this is set, reading from fuse_table should only return the increment blocks pub(crate) changes_desc: Option<ChangesDesc>, + + pub(crate) pruned_result_receiver: Arc<Mutex<PartInfoReceiver>>, } +type PartInfoReceiver = Option<Receiver<Result<PartInfoPtr>>>; + impl FuseTable { pub fn try_create(table_info: TableInfo) -> Result<Box<dyn Table>> { Ok(Self::do_create(table_info)?) 
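The setting above defaults to on (`UInt64(1)`) and is read once per scan in builder_scan.rs; condensed, the gate looks like this (a sketch of the wiring shown earlier in this patch, not verbatim builder code):

    // enable_prune_pipeline = 0 falls back to the pre-existing inline pruning path.
    if ctx.get_settings().get_enable_prune_pipeline()? {
        if let Some(prune_pipeline) =
            table.build_prune_pipeline(ctx.clone(), &scan.source, &mut main_pipeline)?
        {
            // The prune pipeline is executed in addition to the main scan pipeline.
            pipelines.push(prune_pipeline);
        }
    }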
@@ -244,6 +251,7 @@ impl FuseTable { table_compression: table_compression.as_str().try_into()?, table_type, changes_desc: None, + pruned_result_receiver: Arc::new(Mutex::new(None)), })) } @@ -775,6 +783,15 @@ impl Table for FuseTable { self.do_read_data(ctx, plan, pipeline, put_cache) } + fn build_prune_pipeline( + &self, + table_ctx: Arc<dyn TableContext>, + plan: &DataSourcePlan, + source_pipeline: &mut Pipeline, + ) -> Result<Option<Pipeline>> { + self.do_build_prune_pipeline(table_ctx, plan, source_pipeline) + } + fn append_data(&self, ctx: Arc<dyn TableContext>, pipeline: &mut Pipeline) -> Result<()> { self.do_append_data(ctx, pipeline) } diff --git a/src/query/storages/fuse/src/lib.rs b/src/query/storages/fuse/src/lib.rs index 83ffec230bd4..f351bb6dbbd8 100644 --- a/src/query/storages/fuse/src/lib.rs +++ b/src/query/storages/fuse/src/lib.rs @@ -33,6 +33,7 @@ mod fuse_type; pub mod io; pub mod operations; pub mod pruning; +mod pruning_pipeline; pub mod statistics; pub mod table_functions; diff --git a/src/query/storages/fuse/src/operations/read_data.rs b/src/query/storages/fuse/src/operations/read_data.rs index b95061fe3287..dbf17d94d7c6 100644 --- a/src/query/storages/fuse/src/operations/read_data.rs +++ b/src/query/storages/fuse/src/operations/read_data.rs @@ -36,10 +36,10 @@ use crate::io::BlockReader; use crate::io::VirtualColumnReader; use crate::operations::read::build_fuse_parquet_source_pipeline; use crate::operations::read::fuse_source::build_fuse_native_source_pipeline; -use crate::pruning::SegmentLocation; use crate::FuseLazyPartInfo; use crate::FuseStorageFormat; use crate::FuseTable; +use crate::SegmentLocation; impl FuseTable { pub fn create_block_reader( @@ -81,7 +81,7 @@ impl FuseTable { ) } - fn adjust_io_request(&self, ctx: &Arc<dyn TableContext>) -> Result<usize> { + pub(crate) fn adjust_io_request(&self, ctx: &Arc<dyn TableContext>) -> Result<usize> { let max_threads = ctx.get_settings().get_max_threads()? as usize; let max_io_requests = ctx.get_settings().get_max_storage_io_requests()? as usize; @@ -196,33 +214,14 @@ impl FuseTable { .transpose()?, ); - let (tx, rx) = if !lazy_init_segments.is_empty() { - let (tx, rx) = async_channel::bounded(max_io_requests); - (Some(tx), Some(rx)) - } else { - (None, None) - }; - - self.build_fuse_source_pipeline( - ctx.clone(), - pipeline, - self.storage_format, - block_reader, - plan, - topk, - max_io_requests, - index_reader, - virtual_reader, - rx, - )?; - - // replace the column which has data mask if needed - self.apply_data_mask_policy_if_needed(ctx.clone(), plan, pipeline)?; - - if let Some(sender) = tx { + let enable_prune_pipeline = ctx.get_settings().get_enable_prune_pipeline()?; + let rx = if !enable_prune_pipeline && !lazy_init_segments.is_empty() { + // If the prune pipeline is disabled and there are lazy init segments, we need to fall back let table = self.clone(); + let table_schema = self.schema_with_stream(); + let push_downs = plan.push_downs.clone(); + let ctx = ctx.clone(); + let (tx, rx) = async_channel::bounded(max_io_requests); pipeline.set_on_init(move || { ctx.get_runtime()?.try_spawn( async move { @@ -239,11 +220,11 @@ impl FuseTable { Ok((_, partitions)) => { for part in partitions.partitions { // ignore the error, the sql may be killed or early stop - let _ = sender.send(Ok(part)).await; + let _ = tx.send(Ok(part)).await; } } Err(err) => { - let _ = sender.send(Err(err)).await; + let _ = tx.send(Err(err)).await; } } Ok::<_, ErrorCode>(()) @@ -253,7 +234,26 @@ impl FuseTable { Ok(()) }); - } + Some(rx) + } else { + self.pruned_result_receiver.lock().take() + }; + + self.build_fuse_source_pipeline( + ctx.clone(), + pipeline, + self.storage_format, + block_reader, + plan, + topk, + max_io_requests, + index_reader, + virtual_reader, + rx, + )?; + + // replace the column which has data mask if needed + self.apply_data_mask_policy_if_needed(ctx.clone(), plan, pipeline)?; Ok(()) } diff --git a/src/query/storages/fuse/src/operations/read_partitions.rs b/src/query/storages/fuse/src/operations/read_partitions.rs index 482bb42ee1ac..f3ad971fd62a 100644 --- a/src/query/storages/fuse/src/operations/read_partitions.rs +++ b/src/query/storages/fuse/src/operations/read_partitions.rs @@ -16,6 +16,10 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Instant; +use async_channel::Receiver; +use async_channel::Sender; +use databend_common_base::runtime::TrySpawn; +use databend_common_catalog::plan::DataSourcePlan; use databend_common_catalog::plan::PartInfoPtr; use databend_common_catalog::plan::PartStatistics; use databend_common_catalog::plan::Partitions; @@ -27,27 +31,43 @@ use databend_common_catalog::plan::TopK; use databend_common_catalog::plan::VirtualColumnInfo; use databend_common_catalog::table::Table; use databend_common_catalog::table_context::TableContext; +use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::Scalar; use databend_common_expression::TableSchemaRef; +use databend_common_pipeline_core::ExecutionInfo; +use databend_common_pipeline_core::Pipeline; use databend_common_sql::field_default_value; use databend_common_storage::ColumnNodes; use databend_storages_common_cache::CacheAccessor; use databend_storages_common_cache::CachedObject; use databend_storages_common_index::BloomIndex; use databend_storages_common_pruner::BlockMetaIndex; +use databend_storages_common_pruner::TopNPrunner; use databend_storages_common_table_meta::meta::BlockMeta; use databend_storages_common_table_meta::meta::ColumnStatistics; +use 
databend_storages_common_table_meta::meta::CompactSegmentInfo; use databend_storages_common_table_meta::table::ChangeType; use log::info; +use opendal::Operator; use sha2::Digest; use sha2::Sha256; use crate::fuse_part::FuseBlockPartInfo; use crate::io::BloomIndexBuilder; use crate::pruning::create_segment_location_vector; +use crate::pruning::table_sample; +use crate::pruning::BlockPruner; use crate::pruning::FusePruner; use crate::pruning::SegmentLocation; +use crate::pruning_pipeline::AsyncBlockPruneTransform; +use crate::pruning_pipeline::ExtractSegmentTransform; +use crate::pruning_pipeline::PrunedSegmentReceiverSource; +use crate::pruning_pipeline::SampleBlockMetasTransform; +use crate::pruning_pipeline::SendPartInfoSink; +use crate::pruning_pipeline::SendPartState; +use crate::pruning_pipeline::SyncBlockPruneTransform; +use crate::pruning_pipeline::TopNPruneTransform; use crate::FuseLazyPartInfo; use crate::FuseTable; @@ -129,6 +149,99 @@ impl FuseTable { } } + pub(crate) fn do_build_prune_pipeline( + &self, + ctx: Arc<dyn TableContext>, + plan: &DataSourcePlan, + source_pipeline: &mut Pipeline, + ) -> Result<Option<Pipeline>> { + let segments_location = plan.statistics.snapshot.clone(); + let table_schema = self.schema_with_stream(); + let dal = self.operator.clone(); + let mut lazy_init_segments = Vec::with_capacity(plan.parts.len()); + + for part in &plan.parts.partitions { + if let Some(lazy_part_info) = part.as_any().downcast_ref::<FuseLazyPartInfo>() { + lazy_init_segments.push(SegmentLocation { + segment_idx: lazy_part_info.segment_index, + location: lazy_part_info.segment_location.clone(), + snapshot_loc: segments_location.clone(), + }); + } + } + // If there is no lazy part, we don't need to prune + if lazy_init_segments.is_empty() { + return Ok(None); + } + let push_downs = plan.push_downs.clone(); + let max_io_requests = self.adjust_io_request(&ctx)?; + let (part_info_tx, part_info_rx) = async_channel::bounded(max_io_requests); + self.pruned_result_receiver.lock().replace(part_info_rx); + + // If we can get the pruning result from the cache, we don't need to prune again, but we still need + // to send the parts to the next pipeline + let derterministic_cache_key = + push_downs + .as_ref() + .filter(|p| p.is_deterministic) + .map(|push_downs| { + format!( + "{:x}", + Sha256::digest(format!("{:?}_{:?}", segments_location, push_downs)) + ) + }); + + if let Some((_stat, part)) = Self::check_prune_cache(&derterministic_cache_key) { + let sender = part_info_tx.clone(); + info!("prune pipeline: get prune result from cache"); + source_pipeline.set_on_init(move || { + ctx.get_runtime()?.try_spawn( + async move { + for part in part.partitions { + // ignore the error, the sql may be killed or early stop + let _ = sender.send(Ok(part)).await; + } + }, + None, + )?; + + Ok(()) + }); + return Ok(None); + } + + let mut prune_pipeline = Pipeline::create(); + let pruner = + Arc::new(self.build_fuse_pruner(ctx.clone(), push_downs, table_schema, dal)?); + + let (segment_tx, segment_rx) = async_channel::bounded(max_io_requests); + + self.prune_segments_with_pipeline( + pruner.clone(), + &mut prune_pipeline, + ctx.clone(), + segment_rx, + part_info_tx, + derterministic_cache_key.clone(), + )?; + prune_pipeline.set_on_init(move || { + ctx.get_runtime()?.try_spawn( + async move { + let segment_pruned_result = + pruner.clone().segment_pruning(lazy_init_segments).await?; + for segment in segment_pruned_result { + let _ = segment_tx.send(Ok(segment)).await; + } + Ok::<_, ErrorCode>(()) + }, + None, + )?; + Ok(()) + }); + + Ok(Some(prune_pipeline)) + } + 
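Taken together, `do_build_prune_pipeline` wires the processors defined under pruning_pipeline/ into a chain that talks to the read pipeline only through channels; roughly (a dataflow sketch of the code above, not code from this patch):

    // set_on_init task: pruner.segment_pruning(lazy segments) ──> segment_tx
    //
    // PrunedSegmentReceiverSource (segment_rx)
    //   ──> ExtractSegmentTransform          (segment -> Vec<Arc<BlockMeta>>)
    //   ──> SampleBlockMetasTransform        (only when a sample probability is pushed down)
    //   ──> Sync/AsyncBlockPruneTransform    (async when bloom or inverted index pruning is needed)
    //   ──> TopNPruneTransform               (only for ORDER BY + LIMIT without filters)
    //   ──> SendPartInfoSink                 ──> part_info_tx
    //
    // part_info_rx is stashed in FuseTable::pruned_result_receiver, from which
    // do_read_data takes it when building the fuse source pipeline.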
#[fastrace::trace] #[async_backtrace::framed] pub async fn prune_snapshot_blocks( @@ -160,19 +273,149 @@ ) }); - if let Some(cache_key) = derterministic_cache_key.as_ref() { + if let Some(cached_result) = Self::check_prune_cache(&derterministic_cache_key) { + info!("prune snapshot block: get prune result from cache"); + return Ok(cached_result); + } + + let mut pruner = + self.build_fuse_pruner(ctx.clone(), push_downs.clone(), table_schema.clone(), dal)?; + + let block_metas = pruner.read_pruning(segments_location).await?; + let pruning_stats = pruner.pruning_stats(); + + info!( + "prune snapshot block end, final block numbers:{}, cost:{:?}", + block_metas.len(), + start.elapsed() + ); + + let block_metas = block_metas + .into_iter() + .map(|(block_meta_index, block_meta)| (Some(block_meta_index), block_meta)) + .collect::<Vec<_>>(); + + let schema = self.schema_with_stream(); + let result = self.read_partitions_with_metas( + ctx.clone(), + schema, + push_downs, + &block_metas, + summary, + pruning_stats, + )?; + + if let Some(cache_key) = derterministic_cache_key { if let Some(cache) = CacheItem::cache() { - if let Some(data) = cache.get(cache_key) { - info!( - "prune snapshot block from cache, final block numbers:{}, cost:{:?}", - data.1.len(), - start.elapsed() - ); - return Ok((data.0.clone(), data.1.clone())); - } + cache.insert(cache_key, result.clone()); } } + Ok(result) + } + + pub fn prune_segments_with_pipeline( + &self, + pruner: Arc<FusePruner>, + prune_pipeline: &mut Pipeline, + ctx: Arc<dyn TableContext>, + segment_rx: Receiver<Result<(SegmentLocation, Arc<CompactSegmentInfo>)>>, + part_info_tx: Sender<Result<PartInfoPtr>>, + derterministic_cache_key: Option<String>, + ) -> Result<()> { + let max_threads = ctx.get_settings().get_max_threads()? as usize; + prune_pipeline.add_source( + |output| PrunedSegmentReceiverSource::create(ctx.clone(), segment_rx.clone(), output), + max_threads, + )?; + prune_pipeline + .add_transform(|input, output| ExtractSegmentTransform::create(input, output, true))?; + let sample_probability = table_sample(&pruner.push_down)?; + if let Some(probability) = sample_probability { + prune_pipeline.add_transform(|input, output| { + SampleBlockMetasTransform::create(input, output, probability) + })?; + } + let block_pruner = Arc::new(BlockPruner::create(pruner.pruning_ctx.clone())?); + if pruner.pruning_ctx.bloom_pruner.is_some() + || pruner.pruning_ctx.inverted_index_pruner.is_some() + { + // async pruning with bloom index or inverted index. + prune_pipeline.add_transform(|input, output| { + AsyncBlockPruneTransform::create(input, output, block_pruner.clone()) + })?; + } else { + // sync pruning when neither bloom index nor inverted index is involved. + prune_pipeline.add_transform(|input, output| { + SyncBlockPruneTransform::create(input, output, block_pruner.clone()) + })?; + } + + let push_down = pruner.push_down.clone(); + + if push_down + .as_ref() + .filter(|p| !p.order_by.is_empty() && p.limit.is_some() && p.filters.is_none()) + .is_some() + { + // if there is an order-by + limit clause and no filter, use the TopN pruner + let schema = pruner.table_schema.clone(); + let push_down = push_down.as_ref().unwrap(); + let limit = push_down.limit.unwrap(); + let sort = push_down.order_by.clone(); + let topn_pruner = TopNPrunner::create(schema, sort, limit); + prune_pipeline.resize(1, false)?; + prune_pipeline.add_transform(move |input, output| { + TopNPruneTransform::create(input, output, topn_pruner.clone()) + })?; + } + let top_k = push_down + .as_ref() + .filter(|_| self.is_native()) // Only native format supports topk push down. + .and_then(|p| p.top_k(self.schema().as_ref())) + .map(|topk| field_default_value(ctx.clone(), &topk.field).map(|d| (topk, d))) + .transpose()?; + + let limit = push_down + .as_ref() + .filter(|p| p.order_by.is_empty() && p.filters.is_none()) + .and_then(|p| p.limit); + + let send_part_state = Arc::new(SendPartState::create( + derterministic_cache_key, + limit, + pruner.clone(), + self.data_metrics.clone(), + )); + prune_pipeline.add_sink(|input| { + SendPartInfoSink::create( + input, + part_info_tx.clone(), + push_down.clone(), + top_k.clone(), + pruner.table_schema.clone(), + send_part_state.clone(), + ) + })?; + + prune_pipeline.set_on_finished(move |info: &ExecutionInfo| { + if let Ok(()) = info.res { + // only populate the cache when the pipeline finishes successfully + send_part_state.populating_cache(); + } + Ok(()) + }); + + Ok(()) + } + + pub fn build_fuse_pruner( + &self, + ctx: Arc<dyn TableContext>, + push_downs: Option<PushDownInfo>, + table_schema: TableSchemaRef, + dal: Operator, + ) -> Result<FusePruner> { let bloom_index_builder = if ctx .get_settings() .get_enable_auto_fix_missing_bloom_index()? @@ -194,7 +437,7 @@ None }; - let mut pruner = if !self.is_native() || self.cluster_key_meta.is_none() { + let pruner = if !self.is_native() || self.cluster_key_meta.is_none() { FusePruner::create( &ctx, dal, @@ -219,36 +462,22 @@ self.get_storage_format(), )? }; - let block_metas = pruner.read_pruning(segments_location).await?; - let pruning_stats = pruner.pruning_stats(); - - info!( - "prune snapshot block end, final block numbers:{}, cost:{:?}", - block_metas.len(), - start.elapsed() - ); - - let block_metas = block_metas - .into_iter() - .map(|(block_meta_index, block_meta)| (Some(block_meta_index), block_meta)) - .collect::<Vec<_>>(); + Ok(pruner) + } - let schema = self.schema_with_stream(); - let result = self.read_partitions_with_metas( - ctx.clone(), - schema, - push_downs, - &block_metas, - summary, - pruning_stats, - )?; + pub fn check_prune_cache( + derterministic_cache_key: &Option<String>, + ) -> Option<(PartStatistics, Partitions)> { + type CacheItem = (PartStatistics, Partitions); - if let Some(cache_key) = derterministic_cache_key { if let Some(cache) = CacheItem::cache() { - cache.insert(cache_key, result.clone()); + if let Some(data) = cache.get(cache_key) { + return Some((data.0.clone(), data.1.clone())); + } } } - Ok(result) + None } pub fn read_partitions_with_metas( @@ -506,7 +735,7 @@ (statistics, partitions) } - fn all_columns_part( + pub fn all_columns_part( schema: Option<&TableSchemaRef>, block_meta_index: &Option<BlockMetaIndex>, top_k: &Option<(TopK, Scalar)>, @@ -556,7 +785,7 @@ ) } - pub(crate) fn projection_part( + pub fn projection_part( meta: &BlockMeta, block_meta_index: &Option<BlockMetaIndex>, column_nodes: &ColumnNodes, diff --git a/src/query/storages/fuse/src/pruning/block_pruner.rs b/src/query/storages/fuse/src/pruning/block_pruner.rs index 40e84d06c837..4a2dab38d315 100644 --- a/src/query/storages/fuse/src/pruning/block_pruner.rs +++ b/src/query/storages/fuse/src/pruning/block_pruner.rs @@ -66,7 +66,7 @@ impl BlockPruner { } /// Apply internal column pruning. - fn internal_column_pruning( + pub fn internal_column_pruning( &self, block_metas: &[Arc<BlockMeta>], ) -> Vec<(usize, Arc<BlockMeta>)> { @@ -89,7 +89,7 @@ // async pruning with bloom index, inverted index or virtual columns. #[async_backtrace::framed] - async fn block_pruning( + pub async fn block_pruning( &self, segment_location: SegmentLocation, block_metas: Arc<Vec<Arc<BlockMeta>>>, @@ -301,7 +301,7 @@ Ok(result) } - fn block_pruning_sync( + pub fn block_pruning_sync( &self, segment_location: SegmentLocation, block_metas: Arc<Vec<Arc<BlockMeta>>>, diff --git a/src/query/storages/fuse/src/pruning/fuse_pruner.rs b/src/query/storages/fuse/src/pruning/fuse_pruner.rs index 17228a737bd1..3e736d606e74 100644 --- a/src/query/storages/fuse/src/pruning/fuse_pruner.rs +++ b/src/query/storages/fuse/src/pruning/fuse_pruner.rs @@ -443,6 +443,53 @@ } } + // Temporary; will be removed once the pruning refactor is finished. + pub async fn segment_pruning( + &self, + mut segment_locs: Vec<SegmentLocation>, + ) -> Result<Vec<(SegmentLocation, Arc<CompactSegmentInfo>)>> { + // Segment pruner. + let segment_pruner = + SegmentPruner::create(self.pruning_ctx.clone(), self.table_schema.clone())?; + + let mut remain = segment_locs.len() % self.max_concurrency; + let batch_size = segment_locs.len() / self.max_concurrency; + let mut works = Vec::with_capacity(self.max_concurrency); + while !segment_locs.is_empty() { + let gap_size = std::cmp::min(1, remain); + let batch_size = batch_size + gap_size; + remain -= gap_size; + + let mut batch = segment_locs.drain(0..batch_size).collect::<Vec<_>>(); + works.push(self.pruning_ctx.pruning_runtime.spawn({ + let segment_pruner = segment_pruner.clone(); + let pruning_ctx = self.pruning_ctx.clone(); + async move { + // Build pruning tasks. + if let Some(internal_column_pruner) = &pruning_ctx.internal_column_pruner { + batch = batch + .into_iter() + .filter(|segment| { + internal_column_pruner + .should_keep(SEGMENT_NAME_COL_NAME, &segment.location.0) + }) + .collect::<Vec<_>>(); + } + let pruned_segments = segment_pruner.pruning(batch).await?; + Result::<_>::Ok(pruned_segments) + } + })); + } + + let workers = futures::future::try_join_all(works).await?; + let mut pruned_segments = vec![]; + for worker in workers { + let res = worker?; + pruned_segments.extend(res); + } + Ok(pruned_segments) + } + fn extract_block_metas( segment_path: &str, segment: &CompactSegmentInfo, @@ -575,7 +622,7 @@ } } -fn table_sample(push_down_info: &Option<PushDownInfo>) -> Result<Option<f64>> { +pub fn table_sample(push_down_info: &Option<PushDownInfo>) -> Result<Option<f64>> { let mut sample_probability = None; if let Some(sample) = push_down_info .as_ref() diff --git a/src/query/storages/fuse/src/pruning/mod.rs b/src/query/storages/fuse/src/pruning/mod.rs index b496ed6d482e..62ebee69c5d2 100644 --- a/src/query/storages/fuse/src/pruning/mod.rs +++ b/src/query/storages/fuse/src/pruning/mod.rs @@ -24,6 +24,7 @@ mod virtual_column_pruner; pub use block_pruner::BlockPruner; pub use bloom_pruner::BloomPruner; pub use bloom_pruner::BloomPrunerCreator; +pub use fuse_pruner::table_sample; pub use fuse_pruner::FusePruner; pub use fuse_pruner::PruningContext; pub use inverted_index_pruner::create_inverted_index_query; diff --git a/src/query/storages/fuse/src/pruning_pipeline/async_block_prune_transform.rs b/src/query/storages/fuse/src/pruning_pipeline/async_block_prune_transform.rs new file mode 100644 index 000000000000..8a32a48bc552 --- /dev/null +++ b/src/query/storages/fuse/src/pruning_pipeline/async_block_prune_transform.rs @@ -0,0 +1,79 @@ +// Copyright 2021 Datafuse Labs + // + // Licensed under the Apache License, Version 2.0 (the "License"); + // you may not use this file except in compliance with the License. + // You may obtain a copy of the License at + // + // http://www.apache.org/licenses/LICENSE-2.0 + // + // Unless required by applicable law or agreed to in writing, software + // distributed under the License is distributed on an "AS IS" BASIS, + // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + // See the License for the specific language governing permissions and + // limitations under the License. + + use std::sync::Arc; + + use databend_common_exception::ErrorCode; + use databend_common_exception::Result; + use databend_common_expression::BlockMetaInfoDowncast; + use databend_common_expression::DataBlock; + use databend_common_pipeline_core::processors::InputPort; + use databend_common_pipeline_core::processors::OutputPort; + use databend_common_pipeline_core::processors::ProcessorPtr; + use databend_common_pipeline_transforms::AsyncAccumulatingTransform; + use databend_common_pipeline_transforms::AsyncAccumulatingTransformer; + use databend_storages_common_pruner::BlockMetaIndex; + use databend_storages_common_table_meta::meta::BlockMeta; + + use crate::pruning::BlockPruner; + use crate::pruning_pipeline::block_metas_meta::BlockMetasMeta; + use crate::pruning_pipeline::block_prune_result_meta::BlockPruneResult; + + pub struct AsyncBlockPruneTransform { + pub block_pruner: Arc<BlockPruner>, + } + + impl AsyncBlockPruneTransform { + pub fn create( + input: Arc<InputPort>, + output: Arc<OutputPort>, + block_pruner: Arc<BlockPruner>, + ) -> Result<ProcessorPtr> { + Ok(ProcessorPtr::create(AsyncAccumulatingTransformer::create( + input, + output, + AsyncBlockPruneTransform { block_pruner }, + ))) + } + } + + #[async_trait::async_trait] + impl AsyncAccumulatingTransform for AsyncBlockPruneTransform { + const NAME: &'static str = "AsyncBlockPruneTransform"; + + async fn transform(&mut self, mut data: DataBlock) -> Result<Option<DataBlock>> { + if let Some(ptr) = data.take_meta() { + if let Some(meta) = BlockMetasMeta::downcast_from(ptr) { + let block_meta_indexes = + self.block_pruner.internal_column_pruning(&meta.block_metas); + + let result: Vec<(BlockMetaIndex, Arc<BlockMeta>)> = self + .block_pruner + .block_pruning(meta.segment_location, meta.block_metas, block_meta_indexes) + .await?; + + if result.is_empty() { + return Ok(None); + } + + return Ok(Some(DataBlock::empty_with_meta(BlockPruneResult::create( + result, + )))); + } + } + Err(ErrorCode::Internal( + "Cannot downcast meta to BlockMetasMeta", + )) + } +} diff --git a/src/query/storages/fuse/src/pruning_pipeline/block_metas_meta.rs b/src/query/storages/fuse/src/pruning_pipeline/block_metas_meta.rs new file mode 100644 index 000000000000..f294abd736e2 --- /dev/null +++ b/src/query/storages/fuse/src/pruning_pipeline/block_metas_meta.rs @@ -0,0 +1,52 @@ +// Copyright 2021 Datafuse Labs + // + // Licensed under the Apache License, Version 2.0 (the "License"); + // you may not use this file except in compliance with the License. + // You may obtain a copy of the License at + // + // http://www.apache.org/licenses/LICENSE-2.0 + // + // Unless required by applicable law or agreed to in writing, software + // distributed under the License is distributed on an "AS IS" BASIS, + // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + // See the License for the specific language governing permissions and + // limitations under the License. 
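A note on the `*Meta` types that follow: the prune pipeline never moves row data, so stages exchange empty `DataBlock`s whose attached `BlockMetaInfo` carries the payload. Schematically, the wrap/unwrap pattern every transform in this patch uses (compare `AsyncBlockPruneTransform::transform` above) is:

    // Producer side: wrap the payload into a meta-only block.
    let block = DataBlock::empty_with_meta(BlockMetasMeta::create(block_metas, segment_location));

    // Consumer side: take the meta back off the block and downcast it.
    if let Some(ptr) = data.take_meta() {
        if let Some(meta) = BlockMetasMeta::downcast_from(ptr) {
            // meta.block_metas and meta.segment_location are available here.
        }
    }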
+ + use std::fmt::Debug; + use std::fmt::Formatter; + use std::sync::Arc; + + use databend_common_expression::local_block_meta_serde; + use databend_common_expression::BlockMetaInfo; + use databend_common_expression::BlockMetaInfoPtr; + use databend_storages_common_table_meta::meta::BlockMeta; + + use crate::SegmentLocation; + + pub struct BlockMetasMeta { + pub block_metas: Arc<Vec<Arc<BlockMeta>>>, + pub segment_location: SegmentLocation, + } + + impl BlockMetasMeta { + pub fn create( + block_metas: Arc<Vec<Arc<BlockMeta>>>, + segment_location: SegmentLocation, + ) -> BlockMetaInfoPtr { + Box::new(BlockMetasMeta { + block_metas, + segment_location, + }) + } + } + + impl Debug for BlockMetasMeta { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BlockMetasMeta").finish() + } + } + + local_block_meta_serde!(BlockMetasMeta); + + #[typetag::serde(name = "block_metas_meta")] + impl BlockMetaInfo for BlockMetasMeta {} diff --git a/src/query/storages/fuse/src/pruning_pipeline/block_prune_result_meta.rs b/src/query/storages/fuse/src/pruning_pipeline/block_prune_result_meta.rs new file mode 100644 index 000000000000..4320a80b05c8 --- /dev/null +++ b/src/query/storages/fuse/src/pruning_pipeline/block_prune_result_meta.rs @@ -0,0 +1,44 @@ +// Copyright 2021 Datafuse Labs + // + // Licensed under the Apache License, Version 2.0 (the "License"); + // you may not use this file except in compliance with the License. + // You may obtain a copy of the License at + // + // http://www.apache.org/licenses/LICENSE-2.0 + // + // Unless required by applicable law or agreed to in writing, software + // distributed under the License is distributed on an "AS IS" BASIS, + // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + // See the License for the specific language governing permissions and + // limitations under the License. + + use std::fmt::Debug; + use std::fmt::Formatter; + use std::sync::Arc; + + use databend_common_expression::local_block_meta_serde; + use databend_common_expression::BlockMetaInfo; + use databend_common_expression::BlockMetaInfoPtr; + use databend_storages_common_pruner::BlockMetaIndex; + use databend_storages_common_table_meta::meta::BlockMeta; + + pub struct BlockPruneResult { + pub block_metas: Vec<(BlockMetaIndex, Arc<BlockMeta>)>, + } + + impl BlockPruneResult { + pub fn create(block_metas: Vec<(BlockMetaIndex, Arc<BlockMeta>)>) -> BlockMetaInfoPtr { + Box::new(BlockPruneResult { block_metas }) + } + } + + impl Debug for BlockPruneResult { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BlockPruneResult").finish() + } + } + + local_block_meta_serde!(BlockPruneResult); + + #[typetag::serde(name = "block_prune_result")] + impl BlockMetaInfo for BlockPruneResult {} diff --git a/src/query/storages/fuse/src/pruning_pipeline/extract_segment_transform.rs b/src/query/storages/fuse/src/pruning_pipeline/extract_segment_transform.rs new file mode 100644 index 000000000000..979c1ff76ce7 --- /dev/null +++ b/src/query/storages/fuse/src/pruning_pipeline/extract_segment_transform.rs @@ -0,0 +1,91 @@ +// Copyright 2021 Datafuse Labs + // + // Licensed under the Apache License, Version 2.0 (the "License"); + // you may not use this file except in compliance with the License. + // You may obtain a copy of the License at + // + // http://www.apache.org/licenses/LICENSE-2.0 + // + // Unless required by applicable law or agreed to in writing, software + // distributed under the License is distributed on an "AS IS" BASIS, + // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + // See the License for the specific language governing permissions and + // limitations under the License. + + use std::sync::Arc; + + use databend_common_expression::DataBlock; + use databend_common_pipeline_core::processors::InputPort; + use databend_common_pipeline_core::processors::OutputPort; + use databend_common_pipeline_core::processors::ProcessorPtr; + use databend_common_pipeline_transforms::processors::BlockMetaAccumulatingTransform; + use databend_common_pipeline_transforms::processors::BlockMetaAccumulatingTransformer; + use databend_storages_common_cache::CacheAccessor; + use databend_storages_common_cache::CacheManager; + use databend_storages_common_table_meta::meta::BlockMeta; + use databend_storages_common_table_meta::meta::CompactSegmentInfo; + + use crate::pruning_pipeline::block_metas_meta::BlockMetasMeta; + use crate::pruning_pipeline::pruned_segment_meta::PrunedSegmentMeta; + + pub struct ExtractSegmentTransform { + populate_cache: bool, + } + + impl ExtractSegmentTransform { + pub fn create( + input: Arc<InputPort>, + output: Arc<OutputPort>, + populate_cache: bool, + ) -> databend_common_exception::Result<ProcessorPtr> { + Ok(ProcessorPtr::create( + BlockMetaAccumulatingTransformer::create(input, output, ExtractSegmentTransform { + populate_cache, + }), + )) + } + } + + impl BlockMetaAccumulatingTransform<PrunedSegmentMeta> for ExtractSegmentTransform { + const NAME: &'static str = "ExtractSegmentTransform"; + + fn transform( + &mut self, + data: PrunedSegmentMeta, + ) -> databend_common_exception::Result<Option<DataBlock>> { + let (segment_location, info) = data.segments; + + let block_metas = + Self::extract_block_metas(&segment_location.location.0, &info, self.populate_cache)?; + + if block_metas.is_empty() { + return Ok(None); + }; + + Ok(Some(DataBlock::empty_with_meta(BlockMetasMeta::create( + block_metas, + segment_location, + )))) + } + } + + impl ExtractSegmentTransform { + fn extract_block_metas( + segment_path: &str, + segment: &CompactSegmentInfo, + populate_cache: bool, + ) -> databend_common_exception::Result<Arc<Vec<Arc<BlockMeta>>>> { + if let Some(cache) = CacheManager::instance().get_block_meta_cache() { + if let Some(metas) = cache.get(segment_path) { + Ok(metas) + } else { + match populate_cache { + true => Ok(cache.insert(segment_path.to_string(), segment.block_metas()?)), + false => Ok(Arc::new(segment.block_metas()?)), + } + } + } else { + Ok(Arc::new(segment.block_metas()?)) + } + } +} diff --git a/src/query/storages/fuse/src/pruning_pipeline/mod.rs b/src/query/storages/fuse/src/pruning_pipeline/mod.rs new file mode 100644 index 000000000000..858a4d17f400 --- /dev/null +++ b/src/query/storages/fuse/src/pruning_pipeline/mod.rs @@ -0,0 +1,33 @@ +// Copyright 2021 Datafuse Labs + // + // Licensed under the Apache License, Version 2.0 (the "License"); + // you may not use this file except in compliance with the License. + // You may obtain a copy of the License at + // + // http://www.apache.org/licenses/LICENSE-2.0 + // + // Unless required by applicable law or agreed to in writing, software + // distributed under the License is distributed on an "AS IS" BASIS, + // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + // See the License for the specific language governing permissions and + // limitations under the License. + + mod async_block_prune_transform; + mod block_metas_meta; + mod block_prune_result_meta; + mod extract_segment_transform; + mod pruned_segment_meta; + mod pruned_segment_receiver_source; + mod sample_block_metas_transform; + mod send_part_info_sink; + mod sync_block_prune_transform; + mod topn_prune_transform; + + pub use async_block_prune_transform::AsyncBlockPruneTransform; + pub use extract_segment_transform::ExtractSegmentTransform; + pub use pruned_segment_receiver_source::PrunedSegmentReceiverSource; + pub use sample_block_metas_transform::SampleBlockMetasTransform; + pub use send_part_info_sink::SendPartInfoSink; + pub use send_part_info_sink::SendPartState; + pub use sync_block_prune_transform::SyncBlockPruneTransform; + pub use topn_prune_transform::TopNPruneTransform; diff --git a/src/query/storages/fuse/src/pruning_pipeline/pruned_segment_meta.rs b/src/query/storages/fuse/src/pruning_pipeline/pruned_segment_meta.rs new file mode 100644 index 000000000000..6f336dd180a6 --- /dev/null +++ b/src/query/storages/fuse/src/pruning_pipeline/pruned_segment_meta.rs @@ -0,0 +1,45 @@ +// Copyright 2021 Datafuse Labs + // + // Licensed under the Apache License, Version 2.0 (the "License"); + // you may not use this file except in compliance with the License. + // You may obtain a copy of the License at + // + // http://www.apache.org/licenses/LICENSE-2.0 + // + // Unless required by applicable law or agreed to in writing, software + // distributed under the License is distributed on an "AS IS" BASIS, + // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + // See the License for the specific language governing permissions and + // limitations under the License. + + use std::fmt::Debug; + use std::fmt::Formatter; + use std::sync::Arc; + + use databend_common_expression::local_block_meta_serde; + use databend_common_expression::BlockMetaInfo; + use databend_common_expression::BlockMetaInfoPtr; + use databend_storages_common_table_meta::meta::CompactSegmentInfo; + + use crate::SegmentLocation; + + pub struct PrunedSegmentMeta { + pub segments: (SegmentLocation, Arc<CompactSegmentInfo>), + } + + impl PrunedSegmentMeta { + pub fn create(segments: (SegmentLocation, Arc<CompactSegmentInfo>)) -> BlockMetaInfoPtr { + Box::new(PrunedSegmentMeta { segments }) + } + } + + impl Debug for PrunedSegmentMeta { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PrunedSegmentMeta").finish() + } + } + + local_block_meta_serde!(PrunedSegmentMeta); + + #[typetag::serde(name = "pruned_segment_meta")] + impl BlockMetaInfo for PrunedSegmentMeta {} diff --git a/src/query/storages/fuse/src/pruning_pipeline/pruned_segment_receiver_source.rs b/src/query/storages/fuse/src/pruning_pipeline/pruned_segment_receiver_source.rs new file mode 100644 index 000000000000..115274703b9b --- /dev/null +++ b/src/query/storages/fuse/src/pruning_pipeline/pruned_segment_receiver_source.rs @@ -0,0 +1,67 @@ +// Copyright 2021 Datafuse Labs + // + // Licensed under the Apache License, Version 2.0 (the "License"); + // you may not use this file except in compliance with the License. + // You may obtain a copy of the License at + // + // http://www.apache.org/licenses/LICENSE-2.0 + // + // Unless required by applicable law or agreed to in writing, software + // distributed under the License is distributed on an "AS IS" BASIS, + // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + // See the License for the specific language governing permissions and + // limitations under the License. 
+ + use std::sync::Arc; + + use async_channel::Receiver; + use databend_common_catalog::table_context::TableContext; + use databend_common_exception::Result; + use databend_common_expression::DataBlock; + use databend_common_pipeline_core::processors::OutputPort; + use databend_common_pipeline_core::processors::ProcessorPtr; + use databend_common_pipeline_sources::AsyncSource; + use databend_common_pipeline_sources::AsyncSourcer; + use databend_storages_common_table_meta::meta::CompactSegmentInfo; + + use crate::pruning_pipeline::pruned_segment_meta::PrunedSegmentMeta; + use crate::SegmentLocation; + + pub struct PrunedSegmentReceiverSource { + pub meta_receiver: Receiver<Result<(SegmentLocation, Arc<CompactSegmentInfo>)>>, + } + + impl PrunedSegmentReceiverSource { + pub fn create( + ctx: Arc<dyn TableContext>, + receiver: Receiver<Result<(SegmentLocation, Arc<CompactSegmentInfo>)>>, + output_port: Arc<OutputPort>, + ) -> Result<ProcessorPtr> { + AsyncSourcer::create(ctx, output_port, Self { + meta_receiver: receiver, + }) + } + } + + #[async_trait::async_trait] + impl AsyncSource for PrunedSegmentReceiverSource { + const NAME: &'static str = "PrunedSegmentReceiverSource"; + const SKIP_EMPTY_DATA_BLOCK: bool = false; + + #[async_backtrace::framed] + async fn generate(&mut self) -> Result<Option<DataBlock>> { + match self.meta_receiver.recv().await { + Ok(Ok(segments)) => Ok(Some(DataBlock::empty_with_meta(PrunedSegmentMeta::create( + segments, + )))), + Ok(Err(e)) => Err( + // The error occurred in the pruning process + e, + ), + Err(_) => { + // The channel is closed, so we return None to stop generating + Ok(None) + } + } + } +} diff --git a/src/query/storages/fuse/src/pruning_pipeline/sample_block_metas_transform.rs b/src/query/storages/fuse/src/pruning_pipeline/sample_block_metas_transform.rs new file mode 100644 index 000000000000..c5ad2410834d --- /dev/null +++ b/src/query/storages/fuse/src/pruning_pipeline/sample_block_metas_transform.rs @@ -0,0 +1,101 @@ +// Copyright 2021 Datafuse Labs + // + // Licensed under the Apache License, Version 2.0 (the "License"); + // you may not use this file except in compliance with the License. + // You may obtain a copy of the License at + // + // http://www.apache.org/licenses/LICENSE-2.0 + // + // Unless required by applicable law or agreed to in writing, software + // distributed under the License is distributed on an "AS IS" BASIS, + // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + // See the License for the specific language governing permissions and + // limitations under the License. + + use std::cmp::max; + use std::sync::Arc; + + use databend_common_exception::Result; + use databend_common_expression::DataBlock; + use databend_common_pipeline_core::processors::InputPort; + use databend_common_pipeline_core::processors::OutputPort; + use databend_common_pipeline_core::processors::ProcessorPtr; + use databend_common_pipeline_transforms::processors::BlockMetaAccumulatingTransform; + use databend_common_pipeline_transforms::processors::BlockMetaAccumulatingTransformer; + use databend_storages_common_table_meta::meta::BlockMeta; + use rand::distributions::Bernoulli; + use rand::distributions::Distribution; + use rand::seq::SliceRandom; + use rand::thread_rng; + + use crate::pruning_pipeline::block_metas_meta::BlockMetasMeta; + const SMALL_DATASET_SAMPLE_THRESHOLD: usize = 100; + + pub struct SampleBlockMetasTransform { + probability: f64, + } + + impl SampleBlockMetasTransform { + pub fn create( + input: Arc<InputPort>, + output: Arc<OutputPort>, + probability: f64, + ) -> Result<ProcessorPtr> { + Ok(ProcessorPtr::create( + BlockMetaAccumulatingTransformer::create(input, output, SampleBlockMetasTransform { + probability, + }), + )) + } + } + + impl BlockMetaAccumulatingTransform<BlockMetasMeta> for SampleBlockMetasTransform { + const NAME: &'static str = "SampleBlockMetasTransform"; + + fn transform(&mut self, data: BlockMetasMeta) -> Result<Option<DataBlock>> { + let sample_block_metas = self.sample_block_metas(&data.block_metas); + Ok(Some(DataBlock::empty_with_meta(BlockMetasMeta::create( + sample_block_metas, + data.segment_location, + )))) + } + } + + impl SampleBlockMetasTransform { + fn sample_block_metas( + &self, + block_metas: &Arc<Vec<Arc<BlockMeta>>>, + ) -> Arc<Vec<Arc<BlockMeta>>> { + if block_metas.len() <= SMALL_DATASET_SAMPLE_THRESHOLD { + // Fixed-size sampling for small datasets: the sample size is deterministic + // Ensure at least one block is sampled for small datasets + let sample_size = max( + 1, + (block_metas.len() as f64 * self.probability).round() as usize, + ); + let mut rng = thread_rng(); + Arc::new( + block_metas + .choose_multiple(&mut rng, sample_size) + .cloned() + .collect(), + ) + } else { + // Bernoulli sampling for larger datasets + let mut sample_block_metas = Vec::with_capacity(block_metas.len()); + let mut rng = thread_rng(); + let bernoulli = Bernoulli::new(self.probability).unwrap(); + for block in block_metas.iter() { + if bernoulli.sample(&mut rng) { + sample_block_metas.push(block.clone()); + } + } + // Ensure at least one block is sampled for large datasets too + if sample_block_metas.is_empty() && !block_metas.is_empty() { + // Safe to unwrap, because we've checked that block_metas is not empty + sample_block_metas.push(block_metas.choose(&mut rng).unwrap().clone()); + } + Arc::new(sample_block_metas) + } + } +} diff --git a/src/query/storages/fuse/src/pruning_pipeline/send_part_info_sink.rs b/src/query/storages/fuse/src/pruning_pipeline/send_part_info_sink.rs new file mode 100644 index 000000000000..be58d04fd502 --- /dev/null +++ b/src/query/storages/fuse/src/pruning_pipeline/send_part_info_sink.rs @@ -0,0 +1,330 @@ +// Copyright 2021 Datafuse Labs + // + // Licensed under the Apache License, Version 2.0 (the "License"); + // you may not use this file except in compliance with the License. + // You may obtain a copy of the License at + // + // http://www.apache.org/licenses/LICENSE-2.0 + // + // Unless required by applicable law or agreed to in writing, software + // distributed under the License is distributed on an "AS IS" BASIS, + // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +use std::sync::Arc; + +use async_channel::Sender; +use databend_common_catalog::plan::PartInfoPtr; +use databend_common_catalog::plan::PartStatistics; +use databend_common_catalog::plan::Partitions; +use databend_common_catalog::plan::PartitionsShuffleKind; +use databend_common_catalog::plan::Projection; +use databend_common_catalog::plan::PushDownInfo; +use databend_common_catalog::plan::TopK; +use databend_common_catalog::plan::VirtualColumnInfo; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::BlockMetaInfoDowncast; +use databend_common_expression::DataBlock; +use databend_common_expression::Scalar; +use databend_common_expression::TableSchemaRef; +use databend_common_pipeline_core::processors::InputPort; +use databend_common_pipeline_core::processors::ProcessorPtr; +use databend_common_pipeline_sinks::AsyncSink; +use databend_common_pipeline_sinks::AsyncSinker; +use databend_common_storage::ColumnNodes; +use databend_common_storage::StorageMetrics; +use databend_storages_common_cache::CacheAccessor; +use databend_storages_common_cache::CachedObject; +use databend_storages_common_pruner::BlockMetaIndex; +use databend_storages_common_table_meta::meta::BlockMeta; +use parking_lot::Mutex; + +use crate::pruning::FusePruner; +use crate::pruning_pipeline::block_prune_result_meta::BlockPruneResult; +use crate::FuseTable; + +pub struct SendPartCache { + partitions: Partitions, + statistics: PartStatistics, + derterministic_cache_key: Option, + fuse_pruner: Arc, +} + +pub struct SendPartState { + cache: Mutex, + limit: AtomicUsize, + data_metrics: Arc, +} + +impl SendPartState { + pub fn create( + derterministic_cache_key: Option, + limit: Option, + fuse_pruner: Arc, + data_metrics: Arc, + ) -> Self { + SendPartState { + cache: Mutex::new(SendPartCache { + partitions: Partitions::default(), + statistics: PartStatistics::default_exact(), + derterministic_cache_key, + fuse_pruner, + }), + limit: AtomicUsize::new(limit.unwrap_or(usize::MAX)), + data_metrics, + } + } + + pub fn populating_cache(&self) { + type CacheItem = (PartStatistics, Partitions); + let mut send_part_cache = self.cache.lock(); + let pruning_stats = send_part_cache.fuse_pruner.pruning_stats(); + send_part_cache.statistics.pruning_stats = pruning_stats; + if let Some(cache_key) = &send_part_cache.derterministic_cache_key { + if let Some(cache) = CacheItem::cache() { + cache.insert( + cache_key.clone(), + ( + send_part_cache.statistics.clone(), + send_part_cache.partitions.clone(), + ), + ); + } + } + } + + pub fn detach_sinker(&self, partitions: &Partitions, statistics: &PartStatistics) { + let mut send_part_cache = self.cache.lock(); + // concat partitions and statistics + send_part_cache + .partitions + .partitions + .extend(partitions.partitions.clone()); + send_part_cache.statistics.merge(statistics); + if !statistics.is_exact { + send_part_cache.statistics.is_exact = false; + } + // the kind is determined by the push_downs, should be same for all partitions + send_part_cache.partitions.kind = partitions.kind.clone(); + self.data_metrics + .inc_partitions_total(send_part_cache.statistics.partitions_total as u64); + self.data_metrics + .inc_partitions_scanned(send_part_cache.statistics.partitions_scanned as u64); + } +} + +pub struct SendPartInfoSink { + sender: Option>>, + 
+    push_downs: Option<PushDownInfo>,
+    top_k: Option<(TopK, Scalar)>,
+    schema: TableSchemaRef,
+    partitions: Partitions,
+    statistics: PartStatistics,
+    send_part_state: Arc<SendPartState>,
+}
+
+impl SendPartInfoSink {
+    pub fn create(
+        input: Arc<InputPort>,
+        sender: Sender<Result<PartInfoPtr>>,
+        push_downs: Option<PushDownInfo>,
+        top_k: Option<(TopK, Scalar)>,
+        schema: TableSchemaRef,
+        send_part_state: Arc<SendPartState>,
+    ) -> Result<ProcessorPtr> {
+        let partitions = Partitions::default();
+        let statistics = PartStatistics::default();
+        Ok(ProcessorPtr::create(AsyncSinker::create(
+            input,
+            SendPartInfoSink {
+                sender: Some(sender),
+                push_downs,
+                top_k,
+                schema,
+                partitions,
+                statistics,
+                send_part_state,
+            },
+        )))
+    }
+}
+
+#[async_trait::async_trait]
+impl AsyncSink for SendPartInfoSink {
+    const NAME: &'static str = "SendPartInfoSink";
+
+    async fn on_finish(&mut self) -> Result<()> {
+        self.send_part_state
+            .detach_sinker(&self.partitions, &self.statistics);
+        drop(self.sender.take());
+        Ok(())
+    }
+
+    async fn consume(&mut self, mut data_block: DataBlock) -> Result<bool> {
+        if let Some(meta) = data_block.take_meta() {
+            if let Some(data) = BlockPruneResult::downcast_from(meta) {
+                let arrow_schema = self.schema.as_ref().into();
+                let column_nodes = ColumnNodes::new_from_schema(&arrow_schema, Some(&self.schema));
+                let block_metas = &data.block_metas;
+                self.statistics.partitions_scanned += block_metas.len();
+                let info_ptr = match self.push_downs.clone() {
+                    None => self.all_columns_partitions(block_metas),
+                    Some(extras) => match &extras.projection {
+                        None => self.all_columns_partitions(block_metas),
+                        Some(projection) => self.projection_partitions(
+                            block_metas,
+                            projection,
+                            &column_nodes,
+                            &extras.output_columns,
+                            &extras.virtual_column,
+                        ),
+                    },
+                };
+
+                self.partitions.partitions.extend(info_ptr.clone());
+
+                for info in info_ptr {
+                    if let Some(sender) = &self.sender {
+                        let _ = sender.send(Ok(info)).await;
+                    }
+                }
+
+                return Ok(false);
+            }
+        }
+        Err(ErrorCode::Internal(
+            "SendPartInfoSink expects a data block carrying a BlockPruneResult meta".to_string(),
+        ))
+    }
+}
+
+impl SendPartInfoSink {
+    fn all_columns_partitions(
+        &mut self,
+        block_metas: &[(BlockMetaIndex, Arc<BlockMeta>)],
+    ) -> Vec<PartInfoPtr> {
+        self.partitions.kind = PartitionsShuffleKind::Mod;
+        if self.send_part_state.limit.load(Ordering::SeqCst) == 0 {
+            return vec![];
+        }
+        let mut parts = Vec::with_capacity(block_metas.len());
+
+        for (block_meta_index, block_meta) in block_metas.iter() {
+            let rows = block_meta.row_count as usize;
+            // Deduct this block's rows from the remaining limit, saturating at
+            // zero; `previous_limit` is the value before the deduction.
+            let previous_limit = self.send_part_state.limit.fetch_sub(
+                rows.min(self.send_part_state.limit.load(Ordering::SeqCst)),
+                Ordering::SeqCst,
+            );
+            parts.push(FuseTable::all_columns_part(
+                Some(&self.schema),
+                &Some(block_meta_index.to_owned()),
+                &self.top_k,
+                block_meta,
+            ));
+            self.statistics.read_rows += rows;
+            self.statistics.read_bytes += block_meta.block_size as usize;
+
+            if rows >= previous_limit {
+                if rows != previous_limit {
+                    self.statistics.is_exact = false;
+                }
+                break;
+            }
+        }
+        parts
+    }
+
+    fn projection_partitions(
+        &mut self,
+        block_metas: &[(BlockMetaIndex, Arc<BlockMeta>)],
+        projection: &Projection,
+        column_nodes: &ColumnNodes,
+        output_columns: &Option<Projection>,
+        virtual_column: &Option<VirtualColumnInfo>,
+    ) -> Vec<PartInfoPtr> {
+        if self.send_part_state.limit.load(Ordering::SeqCst) == 0 {
+            return vec![];
+        }
+
+        let mut parts = Vec::with_capacity(block_metas.len());
+
+        // `output_columns` excludes the source columns of virtual columns;
+        // those source columns can be skipped when every virtual column is
+        // generated, so prefer it for byte accounting when it is available.
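+        // Illustrative example (hypothetical query, not part of this change):
+        // for `SELECT v['k'] FROM t` where the virtual column v['k'] has
+        // already been generated, its bytes are counted from the virtual
+        // column metas below, while the source column `v` appears in
+        // `ignored_source_column_ids` and adds nothing to `read_bytes`.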
+        let columns = if let Some(output_columns) = output_columns {
+            output_columns.project_column_nodes(column_nodes).unwrap()
+        } else {
+            projection.project_column_nodes(column_nodes).unwrap()
+        };
+
+        for (block_meta_index, block_meta) in block_metas.iter() {
+            let rows = block_meta.row_count as usize;
+            let previous_limit = self.send_part_state.limit.fetch_sub(
+                rows.min(self.send_part_state.limit.load(Ordering::SeqCst)),
+                Ordering::SeqCst,
+            );
+            parts.push(FuseTable::projection_part(
+                block_meta,
+                &Some(block_meta_index.to_owned()),
+                column_nodes,
+                self.top_k.clone(),
+                projection,
+            ));
+            self.statistics.read_rows += rows;
+            for column in &columns {
+                for column_id in &column.leaf_column_ids {
+                    // deleted fields have no column meta and are skipped here
+                    if let Some(col_metas) = block_meta.col_metas.get(column_id) {
+                        let (_, len) = col_metas.offset_length();
+                        self.statistics.read_bytes += len as usize;
+                    }
+                }
+            }
+            let virtual_block_meta = &block_meta_index.virtual_block_meta;
+            if let Some(virtual_column) = virtual_column {
+                if let Some(virtual_block_meta) = virtual_block_meta {
+                    // Add bytes of virtual columns
+                    for virtual_column_meta in virtual_block_meta.virtual_column_metas.values() {
+                        let (_, len) = virtual_column_meta.offset_length();
+                        self.statistics.read_bytes += len as usize;
+                    }
+
+                    // Check whether source columns can be ignored.
+                    // If not, add bytes of source columns.
+                    for source_column_id in &virtual_column.source_column_ids {
+                        if virtual_block_meta
+                            .ignored_source_column_ids
+                            .contains(source_column_id)
+                        {
+                            continue;
+                        }
+                        if let Some(col_metas) = block_meta.col_metas.get(source_column_id) {
+                            let (_, len) = col_metas.offset_length();
+                            self.statistics.read_bytes += len as usize;
+                        }
+                    }
+                } else {
+                    // If no virtual column meta exists, all source columns are needed.
+                    for source_column_id in &virtual_column.source_column_ids {
+                        if let Some(col_metas) = block_meta.col_metas.get(source_column_id) {
+                            let (_, len) = col_metas.offset_length();
+                            self.statistics.read_bytes += len as usize;
+                        }
+                    }
+                }
+            }
+
+            if rows >= previous_limit {
+                if rows != previous_limit {
+                    self.statistics.is_exact = false;
+                }
+                break;
+            }
+        }
+
+        parts
+    }
+}
diff --git a/src/query/storages/fuse/src/pruning_pipeline/sync_block_prune_transform.rs b/src/query/storages/fuse/src/pruning_pipeline/sync_block_prune_transform.rs
new file mode 100644
index 000000000000..4b0d84e904c6
--- /dev/null
+++ b/src/query/storages/fuse/src/pruning_pipeline/sync_block_prune_transform.rs
@@ -0,0 +1,66 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
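+
+//! `SyncBlockPruneTransform` performs the synchronous block-level pruning
+//! step: for each `BlockMetasMeta` produced by segment pruning, it applies
+//! internal-column pruning followed by `block_pruning_sync`, and forwards the
+//! surviving blocks downstream as a `BlockPruneResult` meta.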
+
+use std::sync::Arc;
+
+use databend_common_exception::Result;
+use databend_common_expression::DataBlock;
+use databend_common_pipeline_core::processors::InputPort;
+use databend_common_pipeline_core::processors::OutputPort;
+use databend_common_pipeline_core::processors::ProcessorPtr;
+use databend_common_pipeline_transforms::processors::BlockMetaAccumulatingTransform;
+use databend_common_pipeline_transforms::processors::BlockMetaAccumulatingTransformer;
+
+use crate::pruning::BlockPruner;
+use crate::pruning_pipeline::block_metas_meta::BlockMetasMeta;
+use crate::pruning_pipeline::block_prune_result_meta::BlockPruneResult;
+
+pub struct SyncBlockPruneTransform {
+    block_pruner: Arc<BlockPruner>,
+}
+
+impl SyncBlockPruneTransform {
+    pub fn create(
+        input: Arc<InputPort>,
+        output: Arc<OutputPort>,
+        block_pruner: Arc<BlockPruner>,
+    ) -> Result<ProcessorPtr> {
+        Ok(ProcessorPtr::create(
+            BlockMetaAccumulatingTransformer::create(input, output, SyncBlockPruneTransform {
+                block_pruner,
+            }),
+        ))
+    }
+}
+
+impl BlockMetaAccumulatingTransform<BlockMetasMeta> for SyncBlockPruneTransform {
+    const NAME: &'static str = "SyncBlockPruneTransform";
+
+    fn transform(&mut self, data: BlockMetasMeta) -> Result<Option<DataBlock>> {
+        let block_meta_indexes = self.block_pruner.internal_column_pruning(&data.block_metas);
+
+        let result = self.block_pruner.block_pruning_sync(
+            data.segment_location,
+            data.block_metas,
+            block_meta_indexes,
+        )?;
+        if result.is_empty() {
+            return Ok(None);
+        }
+
+        Ok(Some(DataBlock::empty_with_meta(BlockPruneResult::create(
+            result,
+        ))))
+    }
+}
diff --git a/src/query/storages/fuse/src/pruning_pipeline/topn_prune_transform.rs b/src/query/storages/fuse/src/pruning_pipeline/topn_prune_transform.rs
new file mode 100644
index 000000000000..69e968118796
--- /dev/null
+++ b/src/query/storages/fuse/src/pruning_pipeline/topn_prune_transform.rs
@@ -0,0 +1,76 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use databend_common_exception::Result;
+use databend_common_expression::DataBlock;
+use databend_common_pipeline_core::processors::InputPort;
+use databend_common_pipeline_core::processors::OutputPort;
+use databend_common_pipeline_core::processors::ProcessorPtr;
+use databend_common_pipeline_transforms::BlockMetaAccumulatingTransform;
+use databend_common_pipeline_transforms::BlockMetaAccumulatingTransformer;
+use databend_storages_common_pruner::BlockMetaIndex;
+use databend_storages_common_pruner::TopNPrunner;
+use databend_storages_common_table_meta::meta::BlockMeta;
+
+use crate::pruning_pipeline::block_prune_result_meta::BlockPruneResult;
+
+// TopNPruneTransform accumulates block metas and pushes nothing downstream
+// until all input has been received and the top-n pruning has been applied.
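+// For example, with an `ORDER BY c LIMIT 10` push-down, whether a block can
+// be discarded depends on how its min/max statistics of `c` compare with
+// those of every other candidate block, so the pruning can only run in
+// `on_finish`, not per incoming batch.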
+pub struct TopNPruneTransform {
+    topn_pruner: TopNPrunner,
+    metas: Vec<(BlockMetaIndex, Arc<BlockMeta>)>,
+}
+
+impl BlockMetaAccumulatingTransform<BlockPruneResult> for TopNPruneTransform {
+    const NAME: &'static str = "TopNPruneTransform";
+
+    fn transform(&mut self, data: BlockPruneResult) -> Result<Option<DataBlock>> {
+        self.metas.extend(data.block_metas);
+        Ok(None)
+    }
+
+    fn on_finish(&mut self, _output: bool) -> Result<Option<DataBlock>> {
+        self.do_topn_prune()
+    }
+}
+
+impl TopNPruneTransform {
+    pub fn create(
+        input: Arc<InputPort>,
+        output: Arc<OutputPort>,
+        topn_pruner: TopNPrunner,
+    ) -> Result<ProcessorPtr> {
+        Ok(ProcessorPtr::create(
+            BlockMetaAccumulatingTransformer::create(input, output, TopNPruneTransform {
+                topn_pruner,
+                metas: vec![],
+            }),
+        ))
+    }
+
+    fn do_topn_prune(&self) -> Result<Option<DataBlock>> {
+        // If top-n pruning fails, fall back to the unpruned metas instead of
+        // failing the query: the result is still correct, just less pruned.
+        let pruned = self
+            .topn_pruner
+            .prune(self.metas.clone())
+            .unwrap_or_else(|_| self.metas.clone());
+        if pruned.is_empty() {
+            Ok(None)
+        } else {
+            Ok(Some(DataBlock::empty_with_meta(BlockPruneResult::create(
+                pruned,
+            ))))
+        }
+    }
+}
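+
+// A minimal usage sketch (illustrative, not part of this change): assuming a
+// `TopNPrunner` has already been built from the ORDER BY ... LIMIT push-down
+// and is cheaply cloneable, the transform is appended to the prune pipeline
+// like any other processor:
+//
+//     prune_pipeline.add_transform(|input, output| {
+//         TopNPruneTransform::create(input, output, topn_pruner.clone())
+//     })?;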