diff --git a/Cargo.lock b/Cargo.lock
index c02ddbbcf463..035eac5adf3c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5489,6 +5489,7 @@ dependencies = [
 name = "databend-storages-common-table-meta"
 version = "0.1.0"
 dependencies = [
+ "arrow",
  "bincode 1.3.3",
  "chrono",
  "databend-common-arrow",
diff --git a/src/query/ee/tests/it/inverted_index/pruning.rs b/src/query/ee/tests/it/inverted_index/pruning.rs
index 504fe405fe06..7682bdee5194 100644
--- a/src/query/ee/tests/it/inverted_index/pruning.rs
+++ b/src/query/ee/tests/it/inverted_index/pruning.rs
@@ -72,6 +72,11 @@ async fn apply_block_pruning(
     FusePruner::create(&ctx, dal, schema, push_down, bloom_index_cols, None)?
         .read_pruning(segment_locs)
         .await
+        .map(|v| {
+            v.into_iter()
+                .map(|(block_meta_index, block_meta, _)| (block_meta_index, block_meta))
+                .collect()
+        })
 }
 
 #[tokio::test(flavor = "multi_thread")]
diff --git a/src/query/expression/src/block.rs b/src/query/expression/src/block.rs
index 820d5841d4a3..5feb64decb8a 100644
--- a/src/query/expression/src/block.rs
+++ b/src/query/expression/src/block.rs
@@ -150,7 +150,9 @@ impl DataBlock {
     #[inline]
     pub fn new_from_columns(columns: Vec<Column>) -> Self {
-        assert!(!columns.is_empty());
+        if columns.is_empty() {
+            return DataBlock::empty();
+        }
         let num_rows = columns[0].len();
         debug_assert!(columns.iter().all(|c| c.len() == num_rows));
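Why the `new_from_columns` guard matters for this PR: `block_metas_to_columnar` (added below in `columnar_segment.rs`) builds a `DataBlock` with one column per range-index-supported field, so a schema with no such fields legitimately produces zero columns, and the old `assert!` would have panicked there. A minimal sketch of the pattern, using toy types rather than the real `databend-common-expression` API:

```rust
// Toy illustration of the guard above: degrade to an empty block instead of
// panicking when the caller has no columns to offer.
#[derive(Default)]
struct DataBlock {
    columns: Vec<Vec<u64>>,
    num_rows: usize,
}

impl DataBlock {
    fn empty() -> Self {
        Self::default()
    }

    fn new_from_columns(columns: Vec<Vec<u64>>) -> Self {
        if columns.is_empty() {
            // Previously: assert!(!columns.is_empty()); a fully-filtered
            // column set would have aborted the process here.
            return DataBlock::empty();
        }
        let num_rows = columns[0].len();
        debug_assert!(columns.iter().all(|c| c.len() == num_rows));
        DataBlock { columns, num_rows }
    }
}

fn main() {
    let block = DataBlock::new_from_columns(vec![]);
    assert_eq!(block.num_rows, 0);
    let block = DataBlock::new_from_columns(vec![vec![1, 2, 3]]);
    assert_eq!(block.num_rows, 3);
}
```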
diff --git a/src/query/expression/src/converts/arrow/mod.rs b/src/query/expression/src/converts/arrow/mod.rs
index 1774ba3ed7d3..a57f9834a86b 100644
--- a/src/query/expression/src/converts/arrow/mod.rs
+++ b/src/query/expression/src/converts/arrow/mod.rs
@@ -18,3 +18,4 @@ mod to;
 pub const EXTENSION_KEY: &str = "Extension";
 
 pub use to::table_schema_to_arrow_schema;
+pub use to::table_type_to_arrow_type;
diff --git a/src/query/expression/src/converts/arrow/to.rs b/src/query/expression/src/converts/arrow/to.rs
index 98379b1cd1f0..b2c238d15e9e 100644
--- a/src/query/expression/src/converts/arrow/to.rs
+++ b/src/query/expression/src/converts/arrow/to.rs
@@ -35,6 +35,7 @@ use crate::Column;
 use crate::DataBlock;
 use crate::DataField;
 use crate::DataSchema;
+use crate::TableDataType;
 use crate::TableField;
 use crate::TableSchema;
@@ -197,3 +198,7 @@ fn arrow_field_from_arrow2_field(field: Arrow2Field) -> ArrowField {
 
     ArrowField::new(field.name, data_type, field.is_nullable).with_metadata(metadata)
 }
+
+pub fn table_type_to_arrow_type(ty: &TableDataType) -> ArrowDataType {
+    arrow_schema::DataType::from(crate::converts::arrow2::table_type_to_arrow_type(ty))
+}
diff --git a/src/query/expression/src/converts/arrow2/mod.rs b/src/query/expression/src/converts/arrow2/mod.rs
index 36210f307103..94640d656ecc 100644
--- a/src/query/expression/src/converts/arrow2/mod.rs
+++ b/src/query/expression/src/converts/arrow2/mod.rs
@@ -23,3 +23,4 @@ pub const ARROW_EXT_TYPE_GEOMETRY: &str = "Geometry";
 pub const ARROW_EXT_TYPE_GEOGRAPHY: &str = "Geography";
 
 pub use to::set_validities;
+pub use to::table_type_to_arrow_type;
diff --git a/src/query/expression/src/converts/arrow2/to.rs b/src/query/expression/src/converts/arrow2/to.rs
index 5d01b76fe873..f92517d463f8 100644
--- a/src/query/expression/src/converts/arrow2/to.rs
+++ b/src/query/expression/src/converts/arrow2/to.rs
@@ -78,7 +78,7 @@ impl From<&DataField> for ArrowField {
 
 // Note: Arrow's data type is not nullable, so we need to explicitly
 // add nullable information to Arrow's field afterwards.
-fn table_type_to_arrow_type(ty: &TableDataType) -> ArrowDataType {
+pub fn table_type_to_arrow_type(ty: &TableDataType) -> ArrowDataType {
     match ty {
         TableDataType::Null => ArrowDataType::Null,
         TableDataType::EmptyArray => ArrowDataType::Extension(
diff --git a/src/query/service/tests/it/storages/fuse/bloom_index_meta_size.rs b/src/query/service/tests/it/storages/fuse/bloom_index_meta_size.rs
index 9f4997981964..04fca4ff1af7 100644
--- a/src/query/service/tests/it/storages/fuse/bloom_index_meta_size.rs
+++ b/src/query/service/tests/it/storages/fuse/bloom_index_meta_size.rs
@@ -27,6 +27,7 @@ use databend_common_expression::FromData;
 use databend_common_expression::Scalar;
 use databend_common_expression::TableDataType;
 use databend_common_expression::TableField;
+use databend_common_expression::TableSchema;
 use databend_common_expression::TableSchemaRefExt;
 use databend_common_storages_fuse::io::TableMetaLocationGenerator;
 use databend_common_storages_fuse::statistics::gen_columns_statistics;
@@ -39,6 +40,7 @@ use databend_storages_common_cache::InMemoryLruCache;
 use databend_storages_common_table_meta::meta::BlockMeta;
 use databend_storages_common_table_meta::meta::ColumnMeta;
 use databend_storages_common_table_meta::meta::ColumnStatistics;
+use databend_storages_common_table_meta::meta::ColumnarSegmentInfo;
 use databend_storages_common_table_meta::meta::CompactSegmentInfo;
 use databend_storages_common_table_meta::meta::Compression;
 use databend_storages_common_table_meta::meta::Location;
@@ -48,6 +50,7 @@ use databend_storages_common_table_meta::meta::Statistics;
 use databend_storages_common_table_meta::meta::Versioned;
 use opendal::Operator;
 use parquet::format::FileMetaData;
+use rand::Rng;
 use sysinfo::get_current_pid;
 use sysinfo::System;
 use uuid::Uuid;
@@ -160,7 +163,46 @@ async fn test_segment_info_size() -> databend_common_exception::Result<()> {
     let cache_number = 3000;
     let num_block_per_seg = 1000;
 
-    let segment_info = build_test_segment_info(num_block_per_seg)?;
+    let (segment_info, _) = build_test_segment_info(num_block_per_seg)?;
+
+    let sys = System::new_all();
+    let pid = get_current_pid().unwrap();
+    let process = sys.process(pid).unwrap();
+    let base_memory_usage = process.memory();
+    let scenario = format!(
+        "{} SegmentInfo, {} blocks per seg",
+        cache_number, num_block_per_seg
+    );
+
+    eprintln!(
+        "scenario {}, pid {}, base memory {}",
+        scenario, pid, base_memory_usage
+    );
+
+    let cache = InMemoryLruCache::with_items_capacity(String::from(""), cache_number);
+    for _ in 0..cache_number {
+        let uuid = Uuid::new_v4();
+        let block_metas = segment_info
+            .blocks
+            .iter()
+            .map(|b: &Arc<BlockMeta>| Arc::new(b.as_ref().clone()))
+            .collect::<Vec<Arc<BlockMeta>>>();
+        let statistics = segment_info.summary.clone();
+        let segment_info = SegmentInfo::new(block_metas, statistics);
+        cache.insert(format!("{}", uuid.simple()), segment_info);
+    }
+    show_memory_usage("SegmentInfoCache", base_memory_usage, cache_number);
+
+    Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread")]
+#[ignore]
+async fn test_columnar_segment_info_size() -> databend_common_exception::Result<()> {
+    let cache_number = 3000;
+    let num_block_per_seg = 1000;
+
+    let (segment_info, table_schema) = build_test_segment_info(num_block_per_seg)?;
 
     let sys = System::new_all();
     let pid = get_current_pid().unwrap();
@@ -180,20 +222,16 @@ async fn test_columnar_segment_info_size() -> databend_common_exception::Result<()> {
     {
         for _ in 0..cache_number {
             let uuid = Uuid::new_v4();
-            let block_metas = segment_info
-                .blocks
-                .iter()
-                .map(|b: &Arc<BlockMeta>| Arc::new(b.as_ref().clone()))
-                .collect::<Vec<Arc<BlockMeta>>>();
-            let statistics = segment_info.summary.clone();
-            let segment_info = SegmentInfo::new(block_metas, statistics);
             cache.insert(
                 format!("{}", uuid.simple()),
-                CompactSegmentInfo::try_from(segment_info)?,
+                ColumnarSegmentInfo::try_from_segment_info_and_schema(
+                    segment_info.clone(),
+                    &table_schema,
+                )?,
             );
         }
     }
-    show_memory_usage("SegmentInfoCache", base_memory_usage, cache_number);
+    show_memory_usage("ColumnarSegmentInfoCache", base_memory_usage, cache_number);
 
     Ok(())
 }
@@ -205,7 +243,7 @@ async fn test_segment_raw_bytes_size() -> databend_common_exception::Result<()> {
     let cache_number = 3000;
     let num_block_per_seg = 1000;
 
-    let segment_info = build_test_segment_info(num_block_per_seg)?;
+    let (segment_info, _) = build_test_segment_info(num_block_per_seg)?;
     let segment_info_bytes = CompactSegmentInfo::try_from(segment_info)?;
 
     let sys = System::new_all();
@@ -245,7 +283,7 @@ async fn test_segment_raw_repr_bytes_size() -> databend_common_exception::Result<()> {
     let cache_number = 3000;
     let num_block_per_seg = 1000;
 
-    let segment_info = build_test_segment_info(num_block_per_seg)?;
+    let (segment_info, _) = build_test_segment_info(num_block_per_seg)?;
     let segment_raw = CompactSegmentInfo::try_from(&segment_info)?;
 
     let sys = System::new_all();
@@ -280,80 +318,95 @@ async fn test_segment_raw_repr_bytes_size() -> databend_common_exception::Result<()> {
 
 fn build_test_segment_info(
     num_blocks_per_seg: usize,
-) -> databend_common_exception::Result<SegmentInfo> {
-    let col_meta = ColumnMeta::Parquet(SingleColumnMeta {
-        offset: 0,
-        len: 0,
-        num_values: 0,
-    });
-
-    let col_stat = ColumnStatistics::new(
-        Scalar::String(String::from_utf8(vec![b'a'; STATS_STRING_PREFIX_LEN])?),
-        Scalar::String(String::from_utf8(vec![b'a'; STATS_STRING_PREFIX_LEN])?),
-        0,
-        0,
-        None,
-    );
-
-    let number_col_stat = ColumnStatistics::new(
-        Scalar::Number(NumberScalar::Int32(0)),
-        Scalar::Number(NumberScalar::Int32(0)),
-        0,
-        0,
-        None,
-    );
-
-    // 20 string columns, 5 number columns
+) -> databend_common_exception::Result<(SegmentInfo, TableSchema)> {
+    let mut rng = rand::thread_rng();
     let num_string_columns = 20;
     let num_number_columns = 5;
-    let col_metas = (0..num_string_columns + num_number_columns)
-        .map(|id| (id as ColumnId, col_meta.clone()))
-        .collect::<HashMap<ColumnId, ColumnMeta>>();
-
-    assert_eq!(num_number_columns + num_string_columns, col_metas.len());
-
-    let mut col_stats = (0..num_string_columns)
-        .map(|id| (id as ColumnId, col_stat.clone()))
-        .collect::<HashMap<ColumnId, ColumnStatistics>>();
-    for idx in num_string_columns..num_string_columns + num_number_columns {
-        col_stats.insert(idx as ColumnId, number_col_stat.clone());
+    let location_gen = TableMetaLocationGenerator::with_prefix("/root/12345/67890".to_owned());
+    let mut block_metas = Vec::with_capacity(num_blocks_per_seg);
+    for _ in 0..num_blocks_per_seg {
+        let (block_location, block_uuid) = location_gen.gen_block_location();
+        let mut col_stats = HashMap::new();
+        let mut col_metas = HashMap::new();
+        for id in 0..num_string_columns + num_number_columns {
+            col_metas.insert(
+                id as ColumnId,
+                ColumnMeta::Parquet(SingleColumnMeta {
+                    offset: rng.gen_range(0..150 * 1024 * 1024),
+                    len: rng.gen_range(10 * 1024..10 * 1024 * 1024),
+                    num_values: rng.gen_range(100_000..1_000_000),
+                }),
+            );
+        }
+        for id in 0..num_string_columns {
+            col_stats.insert(
+                id as ColumnId,
+                ColumnStatistics::new(
+                    Scalar::String(
+                        (0..STATS_STRING_PREFIX_LEN)
+                            .map(|_| rng.gen_range(b'a'..=b'z') as char)
+                            .collect(),
+                    ),
+                    Scalar::String(
+                        (0..STATS_STRING_PREFIX_LEN)
+                            .map(|_| rng.gen_range(b'a'..=b'z') as char)
+                            .collect(),
+                    ),
+                    rng.gen_range(100_000..1_000_000),
+                    rng.gen_range(100_000..1_000_000),
+                    Some(rng.gen_range(10_000..100_000)),
+                ),
+            );
+        }
+        for id in num_string_columns..num_string_columns + num_number_columns {
+            col_stats.insert(
+                id as ColumnId,
+                ColumnStatistics::new(
+                    Scalar::Number(NumberScalar::Int32(rng.gen_range(-100_000..100_000))),
+                    Scalar::Number(NumberScalar::Int32(rng.gen_range(-100_000..100_000))),
+                    rng.gen_range(100_000..1_000_000),
+                    rng.gen_range(100_000..1_000_000),
+                    Some(rng.gen_range(10_000..100_000)),
+                ),
+            );
+        }
+        assert_eq!(col_metas.len(), num_string_columns + num_number_columns);
+        assert_eq!(col_stats.len(), num_string_columns + num_number_columns);
+        let block_meta = BlockMeta {
+            row_count: rng.gen_range(100_000..1_000_000),
+            block_size: rng.gen_range(50 * 1024 * 1024..150 * 1024 * 1024),
+            file_size: rng.gen_range(10 * 1024 * 1024..50 * 1024 * 1024),
+            col_stats,
+            col_metas,
+            cluster_stats: None,
+            location: block_location,
+            bloom_filter_index_location: Some(location_gen.block_bloom_index_location(&block_uuid)),
+            bloom_filter_index_size: rng.gen_range(1024 * 1024..5 * 1024 * 1024),
+            inverted_index_size: None,
+            compression: Compression::Lz4,
+            create_on: Some(Utc::now()),
+        };
+        block_metas.push(Arc::new(block_meta));
     }
-    assert_eq!(num_number_columns + num_string_columns, col_stats.len());
 
-    let location_gen = TableMetaLocationGenerator::with_prefix("/root/12345/67890".to_owned());
+    let mut fields = vec![];
+    for id in 0..num_string_columns {
+        fields.push(TableField::new(
+            &format!("col_{}", id),
+            TableDataType::String,
+        ));
+    }
+    for id in num_string_columns..num_string_columns + num_number_columns {
+        fields.push(TableField::new(
+            &format!("col_{}", id),
+            TableDataType::Number(NumberDataType::Int32),
+        ));
+    }
+    let table_schema = TableSchema::new(fields);
 
-    let (block_location, block_uuid) = location_gen.gen_block_location();
-    let block_meta = BlockMeta {
-        row_count: 0,
-        block_size: 0,
-        file_size: 0,
-        col_stats: col_stats.clone(),
-        col_metas,
-        cluster_stats: None,
-        location: block_location,
-        bloom_filter_index_location: Some(location_gen.block_bloom_index_location(&block_uuid)),
-        bloom_filter_index_size: 0,
-        inverted_index_size: None,
-        compression: Compression::Lz4,
-        create_on: Some(Utc::now()),
-    };
-
-    let block_metas = (0..num_blocks_per_seg)
-        .map(|_| Arc::new(block_meta.clone()))
-        .collect::<Vec<_>>();
+    let statistics = Statistics::default();
 
-    let statistics = Statistics {
-        row_count: 0,
-        block_count: 0,
-        perfect_block_count: 0,
-        uncompressed_byte_size: 0,
-        compressed_byte_size: 0,
-        index_size: 0,
-        col_stats: col_stats.clone(),
-        cluster_stats: None,
-    };
-
-    Ok(SegmentInfo::new(block_metas, statistics))
+    Ok((SegmentInfo::new(block_metas, statistics), table_schema))
 }
 
 #[allow(dead_code)]
diff --git a/src/query/service/tests/it/storages/fuse/operations/read_plan.rs b/src/query/service/tests/it/storages/fuse/operations/read_plan.rs
index ae769f50dc25..71b02a819f4f 100644
--- a/src/query/service/tests/it/storages/fuse/operations/read_plan.rs
+++ b/src/query/service/tests/it/storages/fuse/operations/read_plan.rs
@@ -105,7 +105,7 @@ fn test_to_partitions() -> Result<()> {
     ));
 
     let blocks_metas = (0..num_of_block)
-        .map(|_| (None, block_meta.clone()))
+        .map(|_| (None, block_meta.clone(), None))
         .collect::<Vec<_>>();
 
     let column_nodes = (0..num_of_col).map(col_nodes_gen).collect::<Vec<_>>();
diff --git a/src/query/service/tests/it/storages/fuse/pruning.rs b/src/query/service/tests/it/storages/fuse/pruning.rs
index 0baddad87ded..927dcecf6df1 100644
--- a/src/query/service/tests/it/storages/fuse/pruning.rs
+++ b/src/query/service/tests/it/storages/fuse/pruning.rs
@@ -66,7 +66,7 @@ async fn apply_block_pruning(
     FusePruner::create(&ctx, op, schema, push_down, bloom_index_cols, None)?
         .read_pruning(segment_locs)
         .await
-        .map(|v| v.into_iter().map(|(_, v)| v).collect())
+        .map(|v| v.into_iter().map(|(_, v, _)| v).collect())
 }
 
 #[tokio::test(flavor = "multi_thread")]
diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs
index 50f9aeb27e5b..cef42fda470a 100644
--- a/src/query/settings/src/settings_default.rs
+++ b/src/query/settings/src/settings_default.rs
@@ -399,6 +399,12 @@ impl DefaultSettings {
             mode: SettingMode::Both,
             range: Some(SettingRange::Numeric(0..=1)),
         }),
+        ("enable_columnar_segment_info", DefaultSettingValue {
+            value: UserSettingValue::UInt64(1),
+            desc: "Enables caching segment info in an in-memory columnar form to speed up block pruning.",
+            mode: SettingMode::Both,
+            range: Some(SettingRange::Numeric(0..=1)),
+        }),
         ("enable_query_result_cache", DefaultSettingValue {
             value: UserSettingValue::UInt64(0),
             desc: "Enables caching query results to improve performance for identical queries.",
diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs
index 55c2e46cd7e5..48ac59a15d29 100644
--- a/src/query/settings/src/settings_getter_setter.rs
+++ b/src/query/settings/src/settings_getter_setter.rs
@@ -764,4 +764,8 @@ impl Settings {
     pub fn set_short_sql_max_length(&self, val: u64) -> Result<()> {
         self.try_set_u64("short_sql_max_length", val)
     }
+
+    pub fn get_enable_columnar_segment_info(&self) -> Result<bool> {
+        Ok(self.try_get_u64("enable_columnar_segment_info")? == 1)
+    }
 }
diff --git a/src/query/storages/common/cache/src/caches.rs b/src/query/storages/common/cache/src/caches.rs
index 8d36618e42d2..d06b0e45a1c0 100644
--- a/src/query/storages/common/cache/src/caches.rs
+++ b/src/query/storages/common/cache/src/caches.rs
@@ -23,6 +23,7 @@ use databend_storages_common_index::BloomIndexMeta;
 use databend_storages_common_index::InvertedIndexFile;
 use databend_storages_common_index::InvertedIndexMeta;
 use databend_storages_common_table_meta::meta::BlockMeta;
+use databend_storages_common_table_meta::meta::ColumnarSegmentInfo;
 use databend_storages_common_table_meta::meta::CompactSegmentInfo;
 use databend_storages_common_table_meta::meta::SegmentInfo;
 use databend_storages_common_table_meta::meta::TableSnapshot;
@@ -34,6 +35,7 @@ use crate::InMemoryLruCache;
 
 /// In memory object cache of SegmentInfo
 pub type CompactSegmentInfoCache = InMemoryLruCache<CompactSegmentInfo>;
+pub type ColumnarSegmentInfoCache = InMemoryLruCache<ColumnarSegmentInfo>;
 
 pub type BlockMetaCache = InMemoryLruCache<Vec<Arc<BlockMeta>>>;
@@ -171,6 +173,24 @@ impl From<CompactSegmentInfo> for CacheValue<CompactSegmentInfo> {
     }
 }
 
+impl From<SegmentInfo> for CacheValue<SegmentInfo> {
+    fn from(value: SegmentInfo) -> Self {
+        CacheValue {
+            mem_bytes: 0, // TODO
+            inner: Arc::new(value),
+        }
+    }
+}
+
+impl From<ColumnarSegmentInfo> for CacheValue<ColumnarSegmentInfo> {
+    fn from(value: ColumnarSegmentInfo) -> Self {
+        CacheValue {
+            mem_bytes: 0, // TODO
+            inner: Arc::new(value),
+        }
+    }
+}
+
 impl From<Vec<Arc<BlockMeta>>> for CacheValue<Vec<Arc<BlockMeta>>> {
     fn from(value: Vec<Arc<BlockMeta>>) -> Self {
         CacheValue {
diff --git a/src/query/storages/common/cache/src/manager.rs b/src/query/storages/common/cache/src/manager.rs
index 6553f64f30fd..4da4861152f0 100644
--- a/src/query/storages/common/cache/src/manager.rs
+++ b/src/query/storages/common/cache/src/manager.rs
@@ -27,6 +27,7 @@ use crate::caches::BloomIndexFilterCache;
 use crate::caches::BloomIndexMetaCache;
 use crate::caches::CacheValue;
 use crate::caches::ColumnArrayCache;
+use crate::caches::ColumnarSegmentInfoCache;
 use crate::caches::CompactSegmentInfoCache;
 use crate::caches::FileMetaDataCache;
 use crate::caches::InvertedIndexFileCache;
@@ -45,6 +46,7 @@ pub struct CacheManager {
     table_snapshot_cache: Option<TableSnapshotCache>,
     table_statistic_cache: Option<TableSnapshotStatisticCache>,
     compact_segment_info_cache: Option<CompactSegmentInfoCache>,
+    columnar_segment_info_cache: Option<ColumnarSegmentInfoCache>,
     bloom_index_filter_cache: Option<BloomIndexFilterCache>,
     bloom_index_meta_cache: Option<BloomIndexMetaCache>,
     inverted_index_meta_cache: Option<InvertedIndexMetaCache>,
@@ -126,6 +128,7 @@ impl CacheManager {
                 table_statistic_cache: None,
                 table_data_cache,
                 in_memory_table_data_cache,
+                columnar_segment_info_cache: None,
                 block_meta_cache: None,
             }));
         } else {
@@ -141,6 +144,10 @@ impl CacheManager {
                 MEMORY_CACHE_COMPACT_SEGMENT_INFO,
                 config.table_meta_segment_bytes as usize,
             );
+            let columnar_segment_info_cache = Self::new_named_bytes_cache(
+                MEMORY_CACHE_COLUMNAR_SEGMENT_INFO,
+                config.table_meta_segment_bytes as usize,
+            );
             let bloom_index_filter_cache = Self::new_named_bytes_cache(
                 MEMORY_CACHE_BLOOM_INDEX_FILTER,
                 config.table_bloom_index_filter_size as usize,
             );
@@ -194,6 +201,7 @@ impl CacheManager {
                 table_data_cache,
                 in_memory_table_data_cache,
                 block_meta_cache,
+                columnar_segment_info_cache,
             }));
         }
@@ -220,6 +228,10 @@ impl CacheManager {
         self.compact_segment_info_cache.clone()
     }
 
+    pub fn get_columnar_segment_cache(&self) -> Option<ColumnarSegmentInfoCache> {
+        self.columnar_segment_info_cache.clone()
+    }
+
     pub fn get_bloom_index_filter_cache(&self) -> Option<BloomIndexFilterCache> {
         self.bloom_index_filter_cache.clone()
     }
@@ -307,6 +319,7 @@ const MEMORY_CACHE_INVERTED_INDEX_FILE_META_DATA: &str =
 const MEMORY_CACHE_BLOOM_INDEX_FILE_META_DATA: &str = "memory_cache_bloom_index_file_meta_data";
 const MEMORY_CACHE_BLOOM_INDEX_FILTER: &str = "memory_cache_bloom_index_filter";
 const MEMORY_CACHE_COMPACT_SEGMENT_INFO: &str = "memory_cache_compact_segment_info";
+const MEMORY_CACHE_COLUMNAR_SEGMENT_INFO: &str = "memory_cache_columnar_segment_info";
 const MEMORY_CACHE_TABLE_STATISTICS: &str = "memory_cache_table_statistics";
 const MEMORY_CACHE_TABLE_SNAPSHOT: &str = "memory_cache_table_snapshot";
 const MEMORY_CACHE_BLOCK_META: &str = "memory_cache_block_meta";
diff --git a/src/query/storages/common/index/src/range_index.rs b/src/query/storages/common/index/src/range_index.rs
index 3a7e4e10b2e0..3393bcd68285 100644
--- a/src/query/storages/common/index/src/range_index.rs
+++ b/src/query/storages/common/index/src/range_index.rs
@@ -46,9 +46,9 @@ use crate::Index;
 
 #[derive(Clone)]
 pub struct RangeIndex {
-    expr: Expr<String>,
-    func_ctx: FunctionContext,
-    schema: TableSchemaRef,
+    pub expr: Expr<String>,
+    pub func_ctx: FunctionContext,
+    pub schema: TableSchemaRef,
 
     // Default stats for each column if no stats are available (e.g. for new-add columns)
     default_stats: StatisticsOfColumns,
diff --git a/src/query/storages/common/pruner/src/lib.rs b/src/query/storages/common/pruner/src/lib.rs
index 04bc0341fc43..03adead72928 100644
--- a/src/query/storages/common/pruner/src/lib.rs
+++ b/src/query/storages/common/pruner/src/lib.rs
@@ -30,4 +30,5 @@ pub use page_pruner::PagePruner;
 pub use page_pruner::PagePrunerCreator;
 pub use range_pruner::RangePruner;
 pub use range_pruner::RangePrunerCreator;
+pub use topn_pruner::PruneResult;
 pub use topn_pruner::TopNPrunner;
diff --git a/src/query/storages/common/pruner/src/topn_pruner.rs b/src/query/storages/common/pruner/src/topn_pruner.rs
index 3ed19225be9e..f1bf9e906675 100644
--- a/src/query/storages/common/pruner/src/topn_pruner.rs
+++ b/src/query/storages/common/pruner/src/topn_pruner.rs
@@ -12,17 +12,26 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashMap;
 use std::sync::Arc;
 
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
+use databend_common_expression::ColumnId;
 use databend_common_expression::RemoteExpr;
 use databend_common_expression::TableDataType;
 use databend_common_expression::TableSchemaRef;
 use databend_storages_common_table_meta::meta::BlockMeta;
+use databend_storages_common_table_meta::meta::ColumnStatistics;
 
 use crate::BlockMetaIndex;
 
+pub type PruneResult = Vec<(
+    BlockMetaIndex,
+    Arc<BlockMeta>,
+    Option<HashMap<ColumnId, ColumnStatistics>>,
+)>;
+
 /// TopN pruner.
 /// Pruning for order by x limit N.
 pub struct TopNPrunner {
@@ -46,10 +55,7 @@ impl TopNPrunner {
     }
 }
 
 impl TopNPrunner {
-    pub fn prune(
-        &self,
-        metas: Vec<(BlockMetaIndex, Arc<BlockMeta>)>,
-    ) -> Result<Vec<(BlockMetaIndex, Arc<BlockMeta>)>> {
+    pub fn prune(&self, metas: PruneResult) -> Result<PruneResult> {
         if self.sort.len() != 1 {
             return Ok(metas);
         }
@@ -82,15 +88,21 @@ impl TopNPrunner {
         }
 
         let mut id_stats = metas
-            .iter()
-            .map(|(id, meta)| {
-                let stat = meta.col_stats.get(&sort_column_id).ok_or_else(|| {
-                    ErrorCode::UnknownException(format!(
-                        "Unable to get the colStats by ColumnId: {}",
-                        sort_column_id
-                    ))
-                })?;
-                Ok((id.clone(), stat.clone(), meta.clone()))
+            .into_iter()
+            .map(|(id, meta, col_stats)| {
+                let stat = col_stats
+                    .as_ref()
+                    .and_then(|col_stats| col_stats.get(&sort_column_id));
+                let stat = match stat {
+                    Some(stat) => stat,
+                    None => meta.col_stats.get(&sort_column_id).ok_or_else(|| {
+                        ErrorCode::UnknownException(format!(
+                            "Unable to get the colStats by ColumnId: {}",
+                            sort_column_id
+                        ))
+                    })?,
+                };
+                Ok((id, stat.clone(), meta, col_stats))
             })
            .collect::<Result<Vec<_>>>()?;
@@ -106,8 +118,8 @@ impl TopNPrunner {
             }
         });
         Ok(id_stats
-            .iter()
-            .map(|s| (s.0.clone(), s.2.clone()))
+            .into_iter()
+            .map(|s| (s.0.clone(), s.2.clone(), s.3))
             .take(self.limit)
             .collect())
     }
diff --git a/src/query/storages/common/table_meta/Cargo.toml b/src/query/storages/common/table_meta/Cargo.toml
index a9f27c7db3f2..8c342215db19 100644
--- a/src/query/storages/common/table_meta/Cargo.toml
+++ b/src/query/storages/common/table_meta/Cargo.toml
@@ -10,6 +10,7 @@ edition = { workspace = true }
 dev = ["snap"]
 
 [dependencies]
+arrow = { workspace = true }
 bincode = "1.3.3"
 chrono = { workspace = true }
 databend-common-arrow = { workspace = true }
@@ -28,7 +29,6 @@ simple_hll = { version = "0.0.1", features = ["serde_borsh"] }
 snap = { version = "1.1.0", optional = true }
 typetag = { workspace = true }
 zstd = "0.12.3"
-
 [dev-dependencies]
 pot = "2.0.0"
diff --git a/src/query/storages/common/table_meta/src/meta/compression.rs b/src/query/storages/common/table_meta/src/meta/compression.rs
index 3266baa38ed7..dc4cfe5b1e35 100644
--- a/src/query/storages/common/table_meta/src/meta/compression.rs
+++ b/src/query/storages/common/table_meta/src/meta/compression.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 #[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq, Copy, Clone, Debug)]
+#[repr(u8)]
 pub enum Compression {
     // Lz4 will be deprecated.
     Lz4,
diff --git a/src/query/storages/common/table_meta/src/meta/current/mod.rs b/src/query/storages/common/table_meta/src/meta/current/mod.rs
index 22d2a1441535..0521ce6e44d2 100644
--- a/src/query/storages/common/table_meta/src/meta/current/mod.rs
+++ b/src/query/storages/common/table_meta/src/meta/current/mod.rs
@@ -20,6 +20,8 @@ pub use v2::ColumnStatistics;
 pub use v2::MetaHLL;
 pub use v2::Statistics;
 pub use v3::TableSnapshotStatistics;
+pub use v4::ColumnarBlockMeta;
+pub use v4::ColumnarSegmentInfo;
 pub use v4::CompactSegmentInfo;
 pub use v4::SegmentInfo;
 pub use v4::TableSnapshot;
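The new `v4/columnar_segment.rs` below stores per-block `ColumnStatistics` column-wise: one nullable tuple column per table column, one row per block, instead of one `HashMap` per block. A toy sketch of that row-to-column transpose (the real code builds `Column` values plus a null bitmap; here `None` plays the bitmap's role):

```rust
use std::collections::HashMap;

// Minimal sketch (toy types) of the transpose performed by
// `ColumnarSegmentInfo::block_metas_to_columnar`: per-block stat maps become
// one Vec<Option<Stat>> per column, one slot per block.
#[derive(Clone, Debug, PartialEq)]
struct Stat {
    min: i32,
    max: i32,
}

fn to_columnar(
    blocks: &[HashMap<u32, Stat>],
    column_ids: &[u32],
) -> HashMap<u32, Vec<Option<Stat>>> {
    let mut out = HashMap::new();
    for &id in column_ids {
        let col: Vec<Option<Stat>> = blocks
            .iter()
            // None stands in for a "null" bitmap entry: this block has no
            // stats for the column.
            .map(|b| b.get(&id).cloned())
            .collect();
        out.insert(id, col);
    }
    out
}

fn main() {
    let b0 = HashMap::from([(7, Stat { min: 1, max: 9 })]);
    let b1 = HashMap::new(); // no stats for column 7 in this block
    let columnar = to_columnar(&[b0, b1], &[7]);
    assert_eq!(columnar[&7], vec![Some(Stat { min: 1, max: 9 }), None]);
}
```

Compared with thousands of per-block `HashMap`s, this keeps a handful of contiguous allocations per column, which is exactly what the `test_columnar_segment_info_size` benchmark above measures against the `SegmentInfo` baseline.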
diff --git a/src/query/storages/common/table_meta/src/meta/v4/columnar_segment.rs b/src/query/storages/common/table_meta/src/meta/v4/columnar_segment.rs
new file mode 100644
index 000000000000..063c1c7169ad
--- /dev/null
+++ b/src/query/storages/common/table_meta/src/meta/v4/columnar_segment.rs
@@ -0,0 +1,222 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use databend_common_arrow::arrow::bitmap::Bitmap;
+use databend_common_arrow::arrow::bitmap::MutableBitmap;
+use databend_common_exception::ErrorCode;
+use databend_common_exception::Result;
+use databend_common_expression::types::DataType;
+use databend_common_expression::types::NullableColumn;
+use databend_common_expression::types::NumberDataType;
+use databend_common_expression::types::UInt64Type;
+use databend_common_expression::Column;
+use databend_common_expression::ColumnBuilder;
+use databend_common_expression::ColumnId;
+use databend_common_expression::DataBlock;
+use databend_common_expression::FromData;
+use databend_common_expression::TableDataType;
+use databend_common_expression::TableField;
+use databend_common_expression::TableSchema;
+use databend_common_expression::TableSchemaRef;
+
+use crate::meta::v2::BlockMeta;
+use crate::meta::ColumnStatistics;
+use crate::meta::FormatVersion;
+use crate::meta::SegmentInfo;
+use crate::meta::Statistics;
+
+/// In this representation, BlockMeta.col_stats is left empty; the
+/// ColumnStatistics are stored in columnar_block_metas instead.
+///
+/// This is a minimal implementation for now; other fields of the block metas
+/// (col_metas, cluster_stats) will be moved into columnar_block_metas later.
+#[derive(Clone)]
+pub struct ColumnarSegmentInfo {
+    pub format_version: FormatVersion,
+    pub summary: Statistics,
+    pub block_metas: Vec<Arc<BlockMeta>>,
+    pub columnar_block_metas: ColumnarBlockMeta,
+}
+
+#[derive(Clone)]
+pub struct ColumnarBlockMeta {
+    pub data: DataBlock,
+    pub schema: TableSchemaRef,
+}
+
+const FIELDS_OF_COL_STATS: &[&str] = &[
+    "min",
+    "max",
+    "null_count",
+    "in_memory_size",
+    "distinct_of_values",
+];
+
+impl ColumnarSegmentInfo {
+    pub fn try_from_segment_info_and_schema(
+        value: SegmentInfo,
+        schema: &TableSchema,
+    ) -> std::result::Result<Self, ErrorCode> {
+        let block_metas = Self::build_lite_block_metas(&value.blocks)?;
+        let columnar_block_metas = Self::block_metas_to_columnar(&value.blocks, schema)?;
+        Ok(Self {
+            format_version: value.format_version,
+            summary: value.summary,
+            block_metas,
+            columnar_block_metas,
+        })
+    }
+
+    fn build_lite_block_metas(blocks: &[Arc<BlockMeta>]) -> Result<Vec<Arc<BlockMeta>>> {
+        let mut block_metas = Vec::with_capacity(blocks.len());
+        for block in blocks {
+            let new_block_meta = BlockMeta {
+                col_stats: HashMap::new(),
+                ..block.as_ref().clone()
+            };
+            block_metas.push(Arc::new(new_block_meta));
+        }
+        Ok(block_metas)
+    }
+
+    fn block_metas_to_columnar(
+        blocks: &[Arc<BlockMeta>],
+        table_schema: &TableSchema,
+    ) -> Result<ColumnarBlockMeta> {
+        let mut fields = Vec::with_capacity(table_schema.fields.len());
+        let mut columns: Vec<Column> = Vec::with_capacity(table_schema.fields.len());
+
+        for table_field in table_schema.fields.iter() {
+            if !range_index_supported_type(&table_field.data_type) {
+                continue;
+            }
+
+            fields.push(TableField::new(
+                &table_field.column_id.to_string(),
+                TableDataType::Nullable(Box::new(TableDataType::Tuple {
+                    fields_name: FIELDS_OF_COL_STATS.iter().map(|s| s.to_string()).collect(),
+                    fields_type: Self::types_of_col_stats(&table_field.data_type),
+                })),
+            ));
+
+            let data_type = DataType::from(table_field.data_type());
+            let mut nulls = MutableBitmap::with_capacity(blocks.len());
+            let mut mins = ColumnBuilder::with_capacity(&data_type, blocks.len());
+            let mut maxs = ColumnBuilder::with_capacity(&data_type, blocks.len());
+            let mut null_counts = Vec::with_capacity(blocks.len());
+            let mut in_memory_sizes = Vec::with_capacity(blocks.len());
+            let mut distinct_of_values = Vec::with_capacity(blocks.len());
+            for block in blocks {
+                match block.col_stats.get(&table_field.column_id) {
+                    Some(col_stats) => {
+                        nulls.push(true);
+                        mins.push(col_stats.min.as_ref());
+                        maxs.push(col_stats.max.as_ref());
+                        null_counts.push(col_stats.null_count);
+                        in_memory_sizes.push(col_stats.in_memory_size);
+                        distinct_of_values.push(col_stats.distinct_of_values);
+                    }
+                    None => {
+                        nulls.push(false);
+                        mins.push_default();
+                        maxs.push_default();
+                        null_counts.push(Default::default());
+                        in_memory_sizes.push(Default::default());
+                        distinct_of_values.push(Default::default());
+                    }
+                }
+            }
+
+            columns.push(Column::Nullable(Box::new(NullableColumn::new(
+                Column::Tuple(vec![
+                    mins.build(),
+                    maxs.build(),
+                    UInt64Type::from_data(null_counts),
+                    UInt64Type::from_data(in_memory_sizes),
+                    UInt64Type::from_opt_data(distinct_of_values),
+                ]),
+                Bitmap::from(nulls),
+            ))));
+        }
+
+        let schema = Arc::new(TableSchema::new(fields));
+        let data = DataBlock::new_from_columns(columns);
+        Ok(ColumnarBlockMeta { data, schema })
+    }
+
+    fn types_of_col_stats(data_type: &TableDataType) -> Vec<TableDataType> {
+        vec![
+            data_type.clone(),
+            data_type.clone(),
+            TableDataType::Number(NumberDataType::UInt64),
+            TableDataType::Number(NumberDataType::UInt64),
+            TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))),
+        ]
+    }
+
+    pub fn to_lite_segment_info(&self) -> SegmentInfo {
+        SegmentInfo {
+            format_version: self.format_version,
+            summary: self.summary.clone(),
+            blocks: self.block_metas.clone(),
+        }
+    }
+
+    pub fn col_stats(
+        columnar_block_metas: ColumnarBlockMeta,
+        block_id: usize,
+    ) -> Result<HashMap<ColumnId, ColumnStatistics>> {
+        let mut col_stats = HashMap::with_capacity(columnar_block_metas.data.num_columns());
+        for (block_entry, table_field) in columnar_block_metas
+            .data
+            .columns()
+            .iter()
+            .zip(columnar_block_metas.schema.fields())
+        {
+            let col_id = table_field.name.parse::<ColumnId>().unwrap();
+            let column = block_entry.to_column(columnar_block_metas.data.num_rows());
+            let nullable_column = column.into_nullable().unwrap();
+            match nullable_column.index(block_id) {
+                Some(Some(scalar)) => {
+                    let tuple = scalar.as_tuple().unwrap();
+                    col_stats.insert(col_id, ColumnStatistics {
+                        min: tuple[0].to_owned(),
+                        max: tuple[1].to_owned(),
+                        null_count: *tuple[2].as_number().unwrap().as_u_int64().unwrap(),
+                        in_memory_size: *tuple[3].as_number().unwrap().as_u_int64().unwrap(),
+                        distinct_of_values: tuple[4].as_number().map(|n| *n.as_u_int64().unwrap()),
+                    });
+                }
+                Some(None) => {
+                    continue;
+                }
+                None => unreachable!(),
+            }
+        }
+        Ok(col_stats)
+    }
+}
+
+fn range_index_supported_type(data_type: &TableDataType) -> bool {
+    let inner_type = data_type.remove_nullable();
+    matches!(
+        inner_type,
+        TableDataType::Number(_)
+            | TableDataType::Date
+            | TableDataType::Timestamp
+            | TableDataType::String
+            | TableDataType::Decimal(_)
+    )
+}
diff --git a/src/query/storages/common/table_meta/src/meta/v4/mod.rs b/src/query/storages/common/table_meta/src/meta/v4/mod.rs
index ca6e09db166d..2188ab39dc29 100644
--- a/src/query/storages/common/table_meta/src/meta/v4/mod.rs
+++ b/src/query/storages/common/table_meta/src/meta/v4/mod.rs
@@ -12,9 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+mod columnar_segment;
 mod segment;
 mod snapshot;
 
+pub use columnar_segment::ColumnarBlockMeta;
+pub use columnar_segment::ColumnarSegmentInfo;
 pub use segment::CompactSegmentInfo;
 pub use segment::SegmentInfo;
 pub use snapshot::TableSnapshot;
diff --git a/src/query/storages/common/table_meta/src/meta/v4/segment.rs b/src/query/storages/common/table_meta/src/meta/v4/segment.rs
index 81a8a524aa44..19e09792ce2e 100644
--- a/src/query/storages/common/table_meta/src/meta/v4/segment.rs
+++ b/src/query/storages/common/table_meta/src/meta/v4/segment.rs
@@ -38,7 +38,7 @@ use crate::meta::Versioned;
 
 /// A segment comprises one or more blocks
 /// The structure of the segment is the same as that of v2, but the serialization and deserialization methods are different
-#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
+#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)]
 pub struct SegmentInfo {
     /// format version of SegmentInfo table meta data
     ///
diff --git a/src/query/storages/fuse/src/io/mod.rs b/src/query/storages/fuse/src/io/mod.rs
index 53b2809a3c01..9737e7969b5f 100644
--- a/src/query/storages/fuse/src/io/mod.rs
+++ b/src/query/storages/fuse/src/io/mod.rs
@@ -22,6 +22,7 @@ pub use locations::TableMetaLocationGenerator;
 pub use read::AggIndexReader;
 pub use read::BlockReader;
 pub use read::BloomBlockFilterReader;
+pub use read::ColumnarSegmentInfoReader;
 pub use read::CompactSegmentInfoReader;
 pub use read::InvertedIndexReader;
 pub use read::MergeIOReadResult;
diff --git a/src/query/storages/fuse/src/io/read/meta/meta_readers.rs b/src/query/storages/fuse/src/io/read/meta/meta_readers.rs
index ddb2dabbcf56..57c83b9c4e90 100644
--- a/src/query/storages/fuse/src/io/read/meta/meta_readers.rs
+++ b/src/query/storages/fuse/src/io/read/meta/meta_readers.rs
@@ -26,6 +26,7 @@ use databend_storages_common_cache::LoadParams;
 use databend_storages_common_cache::Loader;
 use databend_storages_common_index::BloomIndexMeta;
 use databend_storages_common_index::InvertedIndexMeta;
+use databend_storages_common_table_meta::meta::ColumnarSegmentInfo;
 use databend_storages_common_table_meta::meta::CompactSegmentInfo;
 use databend_storages_common_table_meta::meta::SegmentInfoVersion;
 use databend_storages_common_table_meta::meta::SingleColumnMeta;
@@ -49,6 +50,8 @@ pub type BloomIndexMetaReader = InMemoryItemCacheReader<BloomIndexMeta, LoaderWrapper<Operator>>;
 pub type CompactSegmentInfoReader =
     InMemoryItemCacheReader<CompactSegmentInfo, LoaderWrapper<(Operator, TableSchemaRef)>>;
+pub type ColumnarSegmentInfoReader =
+    InMemoryItemCacheReader<ColumnarSegmentInfo, LoaderWrapper<(Operator, TableSchemaRef)>>;
 pub type InvertedIndexMetaReader = InMemoryItemCacheReader<InvertedIndexMeta, LoaderWrapper<Operator>>;
@@ -62,6 +65,16 @@ impl MetaReaders {
         )
     }
 
+    pub fn columnar_segment_info_reader(
+        dal: Operator,
+        schema: TableSchemaRef,
+    ) -> ColumnarSegmentInfoReader {
+        ColumnarSegmentInfoReader::new(
+            CacheManager::instance().get_columnar_segment_cache(),
+            LoaderWrapper((dal, schema)),
+        )
+    }
+
     pub fn segment_info_reader_without_cache(
         dal: Operator,
         schema: TableSchemaRef,
@@ -137,6 +150,19 @@ impl Loader<CompactSegmentInfo> for LoaderWrapper<(Operator, TableSchemaRef)> {
     }
 }
 
+#[async_trait::async_trait]
+impl Loader<ColumnarSegmentInfo> for LoaderWrapper<(Operator, TableSchemaRef)> {
+    #[async_backtrace::framed]
+    async fn load(&self, params: &LoadParams) -> Result<ColumnarSegmentInfo> {
+        let compact_segment_info: CompactSegmentInfo = self.load(params).await?;
+        let schema = &self.0.1;
+        let segment_info = databend_storages_common_table_meta::meta::SegmentInfo::try_from(
+            &compact_segment_info,
+        )?;
+        ColumnarSegmentInfo::try_from_segment_info_and_schema(segment_info, schema)
+    }
+}
+
 #[async_trait::async_trait]
 impl Loader for LoaderWrapper {
     #[async_backtrace::framed]
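Usage sketch for the new reader (a hypothetical call site; the `LoadParams` field set and the `read` signature are assumed from how the other meta readers in this codebase are driven, and are not part of the hunks shown). The conversion chain `CompactSegmentInfo` -> `SegmentInfo` -> `ColumnarSegmentInfo` only runs on a cache miss; hits are served from `memory_cache_columnar_segment_info`:

```rust
use std::sync::Arc;

use databend_common_exception::Result;
use databend_common_expression::TableSchemaRef;
use databend_storages_common_cache::LoadParams;
use databend_storages_common_table_meta::meta::ColumnarSegmentInfo;
use opendal::Operator;

use crate::io::MetaReaders;

// Hypothetical helper inside databend-common-storages-fuse; `ver` is the
// segment format version recorded in the snapshot.
async fn load_columnar_segment(
    dal: Operator,
    schema: TableSchemaRef,
    location: String,
    ver: u64,
) -> Result<Arc<ColumnarSegmentInfo>> {
    let reader = MetaReaders::columnar_segment_info_reader(dal, schema);
    let params = LoadParams {
        location,
        len_hint: None,
        ver,
        put_cache: true, // populate memory_cache_columnar_segment_info on miss
    };
    reader.read(&params).await
}
```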
diff --git a/src/query/storages/fuse/src/io/read/meta/mod.rs b/src/query/storages/fuse/src/io/read/meta/mod.rs
index be275c173b5c..291ba0e043ed 100644
--- a/src/query/storages/fuse/src/io/read/meta/mod.rs
+++ b/src/query/storages/fuse/src/io/read/meta/mod.rs
@@ -14,6 +14,7 @@
 
 mod meta_readers;
 
+pub use meta_readers::ColumnarSegmentInfoReader;
 pub use meta_readers::CompactSegmentInfoReader;
 pub use meta_readers::MetaReaders;
 pub use meta_readers::TableSnapshotReader;
diff --git a/src/query/storages/fuse/src/io/read/mod.rs b/src/query/storages/fuse/src/io/read/mod.rs
index 288220c73cc3..7c8515b61d9b 100644
--- a/src/query/storages/fuse/src/io/read/mod.rs
+++ b/src/query/storages/fuse/src/io/read/mod.rs
@@ -29,6 +29,7 @@ pub use block::NativeReaderExt;
 pub use block::NativeSourceData;
 pub use bloom::BloomBlockFilterReader;
 pub use inverted_index::InvertedIndexReader;
+pub use meta::ColumnarSegmentInfoReader;
 pub use meta::CompactSegmentInfoReader;
 pub use meta::MetaReaders;
 pub use meta::TableSnapshotReader;
diff --git a/src/query/storages/fuse/src/operations/changes.rs b/src/query/storages/fuse/src/operations/changes.rs
index 2c2b17d3c39a..e41f85ab91f3 100644
--- a/src/query/storages/fuse/src/operations/changes.rs
+++ b/src/query/storages/fuse/src/operations/changes.rs
@@ -309,7 +309,9 @@ impl FuseTable {
 
         let block_metas = block_metas
             .into_iter()
-            .map(|(block_meta_index, block_meta)| (Some(block_meta_index), block_meta))
+            .map(|(block_meta_index, block_meta, col_stats)| {
+                (Some(block_meta_index), block_meta, col_stats)
+            })
             .collect::<Vec<_>>();
 
         let (stats, parts) = self.read_partitions_with_metas(
diff --git a/src/query/storages/fuse/src/operations/mutation/meta/compact_part.rs b/src/query/storages/fuse/src/operations/mutation/meta/compact_part.rs
index c69b144252dc..145cdc45a9e7 100644
--- a/src/query/storages/fuse/src/operations/mutation/meta/compact_part.rs
+++ b/src/query/storages/fuse/src/operations/mutation/meta/compact_part.rs
@@ -25,17 +25,41 @@ use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_storages_common_table_meta::meta::BlockMeta;
 use databend_storages_common_table_meta::meta::CompactSegmentInfo;
+use databend_storages_common_table_meta::meta::SegmentInfo;
 use databend_storages_common_table_meta::meta::Statistics;
 
 use crate::operations::common::BlockMetaIndex;
 use crate::operations::mutation::BlockIndex;
 use crate::operations::mutation::SegmentIndex;
+use crate::pruning::SegmentInfoVariant;
 
 /// Compact segment part information.
 #[derive(serde::Serialize, serde::Deserialize, PartialEq, Clone)]
 pub struct CompactLazyPartInfo {
     pub segment_indices: Vec<SegmentIndex>,
-    pub compact_segments: Vec<Arc<CompactSegmentInfo>>,
+    pub compact_segments: Vec<SegmentInfoWithoutColumnarBlockMeta>,
+}
+
+#[derive(serde::Serialize, serde::Deserialize, PartialEq, Clone)]
+pub enum SegmentInfoWithoutColumnarBlockMeta {
+    Compact(Arc<CompactSegmentInfo>),
+    Columnar(Arc<SegmentInfo>),
+}
+
+impl SegmentInfoWithoutColumnarBlockMeta {
+    pub fn block_metas(&self) -> Result<Vec<Arc<BlockMeta>>> {
+        match self {
+            SegmentInfoWithoutColumnarBlockMeta::Compact(segment) => segment.block_metas(),
+            SegmentInfoWithoutColumnarBlockMeta::Columnar(segment) => Ok(segment.blocks.clone()),
+        }
+    }
+
+    pub fn summary(&self) -> Statistics {
+        match self {
+            SegmentInfoWithoutColumnarBlockMeta::Compact(segment) => segment.summary.clone(),
+            SegmentInfoWithoutColumnarBlockMeta::Columnar(segment) => segment.summary.clone(),
+        }
+    }
 }
 
 #[typetag::serde(name = "compact_lazy")]
@@ -64,11 +88,26 @@ impl PartInfo for CompactLazyPartInfo {
 impl CompactLazyPartInfo {
     pub fn create(
         segment_indices: Vec<SegmentIndex>,
-        compact_segments: Vec<Arc<CompactSegmentInfo>>,
+        compact_segments: Vec<SegmentInfoVariant>,
     ) -> PartInfoPtr {
+        let segments = compact_segments
+            .into_iter()
+            .map(|segment| match segment {
+                SegmentInfoVariant::Compact(segment) => {
+                    SegmentInfoWithoutColumnarBlockMeta::Compact(segment)
+                }
+                SegmentInfoVariant::Columnar(segment) => {
+                    // columnar_block_metas is not needed for the compact task,
+                    // since pruning is already done, so it is dropped here.
+                    SegmentInfoWithoutColumnarBlockMeta::Columnar(Arc::new(
+                        segment.to_lite_segment_info(),
+                    ))
+                }
+            })
+            .collect();
         Arc::new(Box::new(CompactLazyPartInfo {
             segment_indices,
-            compact_segments,
+            compact_segments: segments,
         }))
     }
 }
diff --git a/src/query/storages/fuse/src/operations/mutation/meta/mod.rs b/src/query/storages/fuse/src/operations/mutation/meta/mod.rs
index 0a14e6b7ad28..c67db01e9e53 100644
--- a/src/query/storages/fuse/src/operations/mutation/meta/mod.rs
+++ b/src/query/storages/fuse/src/operations/mutation/meta/mod.rs
@@ -20,6 +20,7 @@ pub use compact_part::CompactBlockPartInfo;
 pub use compact_part::CompactExtraInfo;
 pub use compact_part::CompactLazyPartInfo;
 pub use compact_part::CompactTaskInfo;
+pub use compact_part::SegmentInfoWithoutColumnarBlockMeta;
 pub use mutation_meta::ClusterStatsGenType;
 pub use mutation_meta::CompactSourceMeta;
 pub use mutation_meta::SerializeBlock;
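`SegmentInfoVariant` is referenced throughout these hunks but defined elsewhere in the PR. From its call sites (`Compact`/`Columnar` constructors, `summary()`, `block_metas()`), a plausible shape is the enum below — an inference, not the PR's literal code. It also explains why `CompactLazyPartInfo::create` converts to `SegmentInfoWithoutColumnarBlockMeta`: part infos must be serde-serializable, while `ColumnarSegmentInfo` holds a `DataBlock` and derives only `Clone`.

```rust
use std::sync::Arc;

use databend_storages_common_table_meta::meta::ColumnarSegmentInfo;
use databend_storages_common_table_meta::meta::CompactSegmentInfo;
use databend_storages_common_table_meta::meta::Statistics;

// Inferred shape of the variant type used by the pruner and the compact /
// recluster mutators in this PR (assumption: the real definition may differ
// in details such as returning `&Statistics`).
#[derive(Clone)]
pub enum SegmentInfoVariant {
    Compact(Arc<CompactSegmentInfo>),
    Columnar(Arc<ColumnarSegmentInfo>),
}

impl SegmentInfoVariant {
    pub fn summary(&self) -> Statistics {
        match self {
            // Both representations carry the same segment-level summary.
            SegmentInfoVariant::Compact(s) => s.summary.clone(),
            SegmentInfoVariant::Columnar(s) => s.summary.clone(),
        }
    }
}
```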
// Size of compacted segment should be in range R == [threshold, 2 * threshold) for (segment_idx, compact_segment) in segment_infos.into_iter() { - let segments_vec = checker.add(segment_idx, compact_segment); + let segment_info = SegmentInfoVariant::Compact(compact_segment); + let segments_vec = checker.add(segment_idx, segment_info); for segments in segments_vec { checker.generate_part(segments, &mut parts); } @@ -294,7 +297,7 @@ impl BlockCompactMutator { } pub struct SegmentCompactChecker { - segments: Vec<(SegmentIndex, Arc)>, + segments: Vec<(SegmentIndex, SegmentInfoVariant)>, total_block_count: u64, block_threshold: u64, cluster_key_id: Option, @@ -315,13 +318,13 @@ impl SegmentCompactChecker { } } - fn check_for_compact(&mut self, segments: &[(SegmentIndex, Arc)]) -> bool { + fn check_for_compact(&mut self, segments: &[(SegmentIndex, SegmentInfoVariant)]) -> bool { if segments.is_empty() { return false; } if segments.len() == 1 { - let summary = &segments[0].1.summary; + let summary = &segments[0].1.summary(); if (summary.block_count == 1 || summary.perfect_block_count == summary.block_count) && (self.cluster_key_id.is_none() || self.cluster_key_id @@ -334,16 +337,16 @@ impl SegmentCompactChecker { self.compacted_segment_cnt += segments.len(); self.compacted_block_cnt += segments .iter() - .fold(0, |acc, x| acc + x.1.summary.block_count); + .fold(0, |acc, x| acc + x.1.summary().block_count); true } pub fn add( &mut self, idx: SegmentIndex, - segment: Arc, - ) -> Vec)>> { - self.total_block_count += segment.summary.block_count; + segment: SegmentInfoVariant, + ) -> Vec> { + self.total_block_count += segment.summary().block_count; if self.total_block_count < self.block_threshold { self.segments.push((idx, segment)); return vec![]; @@ -366,7 +369,7 @@ impl SegmentCompactChecker { pub fn generate_part( &mut self, - segments: Vec<(SegmentIndex, Arc)>, + segments: Vec<(SegmentIndex, SegmentInfoVariant)>, parts: &mut Vec, ) { if !segments.is_empty() && self.check_for_compact(&segments) { @@ -392,7 +395,7 @@ impl SegmentCompactChecker { let residual_block_cnt = self .segments .iter() - .fold(0, |acc, e| acc + e.1.summary.block_count); + .fold(0, |acc, e| acc + e.1.summary().block_count); self.compacted_segment_cnt + residual_segment_cnt >= num_segment_limit || self.compacted_block_cnt + residual_block_cnt >= num_block_limit as u64 } @@ -508,7 +511,7 @@ impl CompactTaskBuilder { async fn build_tasks( &mut self, segment_indices: Vec, - compact_segments: Vec>, + compact_segments: Vec, semaphore: Arc, ) -> Result> { let mut block_idx = 0; @@ -524,7 +527,7 @@ impl CompactTaskBuilder { let handler = runtime.spawn(async move { let blocks = segment.block_metas()?; drop(permit); - Ok::<_, ErrorCode>((blocks, segment.summary.clone())) + Ok::<_, ErrorCode>((blocks, segment.summary().clone())) }); handlers.push(handler); } diff --git a/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs b/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs index a5c7099bbce8..2de146f3fdef 100644 --- a/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs @@ -36,7 +36,6 @@ use databend_common_expression::TableSchemaRef; use databend_common_storage::ColumnNodes; use databend_storages_common_pruner::BlockMetaIndex; use databend_storages_common_table_meta::meta::BlockMeta; -use databend_storages_common_table_meta::meta::CompactSegmentInfo; use 
databend_storages_common_table_meta::meta::Statistics; use databend_storages_common_table_meta::meta::TableSnapshot; use fastrace::func_path; @@ -49,6 +48,7 @@ use log::warn; use crate::operations::mutation::SegmentCompactChecker; use crate::operations::BlockCompactMutator; use crate::operations::CompactLazyPartInfo; +use crate::pruning::SegmentInfoVariant; use crate::statistics::reducers::merge_statistics_mut; use crate::statistics::sort_by_cluster_stats; use crate::FuseTable; @@ -148,7 +148,7 @@ impl ReclusterMutator { #[async_backtrace::framed] pub async fn target_select( &self, - compact_segments: Vec<(SegmentLocation, Arc)>, + compact_segments: Vec<(SegmentLocation, SegmentInfoVariant)>, mode: ReclusterMode, ) -> Result<(u64, ReclusterParts)> { match mode { @@ -160,14 +160,14 @@ impl ReclusterMutator { #[async_backtrace::framed] pub async fn generate_recluster_tasks( &self, - compact_segments: Vec<(SegmentLocation, Arc)>, + compact_segments: Vec<(SegmentLocation, SegmentInfoVariant)>, ) -> Result<(u64, ReclusterParts)> { // Sort segments by cluster statistics let mut compact_segments = compact_segments; compact_segments.sort_by(|a, b| { sort_by_cluster_stats( - &a.1.summary.cluster_stats, - &b.1.summary.cluster_stats, + &a.1.summary().cluster_stats, + &b.1.summary().cluster_stats, self.cluster_key_id, ) }); @@ -178,7 +178,7 @@ impl ReclusterMutator { let selected_segments = compact_segments .into_iter() .map(|(loc, info)| { - selected_statistics.push(info.summary.clone()); + selected_statistics.push(info.summary().clone()); selected_segs_idx.push(loc.segment_idx); info }) @@ -372,7 +372,7 @@ impl ReclusterMutator { async fn generate_compact_tasks( &self, - compact_segments: Vec<(SegmentLocation, Arc)>, + compact_segments: Vec<(SegmentLocation, SegmentInfoVariant)>, ) -> Result<(u64, ReclusterParts)> { debug!("recluster: generate compact tasks"); let settings = self.ctx.get_settings(); @@ -385,7 +385,7 @@ impl ReclusterMutator { SegmentCompactChecker::new(self.block_per_seg as u64, Some(self.cluster_key_id)); for (loc, compact_segment) in compact_segments.into_iter() { - recluster_blocks_count += compact_segment.summary.block_count; + recluster_blocks_count += compact_segment.summary().block_count; let segments_vec = checker.add(loc.segment_idx, compact_segment); for segments in segments_vec { checker.generate_part(segments, &mut parts); @@ -451,8 +451,13 @@ impl ReclusterMutator { ); } + let block_metas = block_metas + .iter() + .map(|v| (v.0.clone(), v.1.clone(), None)) + .collect::>(); + let (stats, parts) = - FuseTable::to_partitions(Some(&self.schema), block_metas, column_nodes, None, None); + FuseTable::to_partitions(Some(&self.schema), &block_metas, column_nodes, None, None); ReclusterTask { parts, stats, @@ -464,7 +469,7 @@ impl ReclusterMutator { pub fn select_segments( &self, - compact_segments: &[(SegmentLocation, Arc)], + compact_segments: &[(SegmentLocation, SegmentInfoVariant)], max_len: usize, ) -> Result<(ReclusterMode, IndexSet)> { let mut blocks_num = 0; @@ -478,7 +483,7 @@ impl ReclusterMutator { let mut level = -1; // Check if the segment is clustered let is_clustered = compact_segment - .summary + .summary() .cluster_stats .as_ref() .is_some_and(|v| { @@ -497,13 +502,13 @@ impl ReclusterMutator { } // Skip if segment has more blocks than required and no reclustering is needed - if level < 0 && compact_segment.summary.block_count as usize >= self.block_per_seg { + if level < 0 && compact_segment.summary().block_count as usize >= self.block_per_seg { continue; } // 
Process clustered segment - if let Some(stats) = &compact_segment.summary.cluster_stats { - blocks_num += compact_segment.summary.block_count as usize; + if let Some(stats) = &compact_segment.summary().cluster_stats { + blocks_num += compact_segment.summary().block_count as usize; // Track small segments for special handling later if blocks_num < self.block_per_seg { small_segments.insert(i); @@ -554,7 +559,7 @@ impl ReclusterMutator { #[async_backtrace::framed] async fn gather_blocks( &self, - compact_segments: Vec>, + compact_segments: Vec, ) -> Result>> { // combine all the tasks. let mut iter = compact_segments.into_iter(); diff --git a/src/query/storages/fuse/src/operations/mutation_source.rs b/src/query/storages/fuse/src/operations/mutation_source.rs index ea6ad2213ec6..bc06d594723b 100644 --- a/src/query/storages/fuse/src/operations/mutation_source.rs +++ b/src/query/storages/fuse/src/operations/mutation_source.rs @@ -270,8 +270,9 @@ impl FuseTable { if !block_metas.is_empty() { if let Some(range_index) = pruner.get_inverse_range_index() { - for (block_meta_idx, block_meta) in &block_metas { - if !range_index.should_keep(&block_meta.as_ref().col_stats, None) { + for (block_meta_idx, block_meta, col_stats) in &block_metas { + let col_stats = col_stats.as_ref().unwrap_or(&block_meta.col_stats); + if !range_index.should_keep(col_stats, None) { // this block should be deleted completely whole_block_deletions .insert((block_meta_idx.segment_idx, block_meta_idx.block_idx)); @@ -283,7 +284,9 @@ impl FuseTable { let range_block_metas = block_metas .clone() .into_iter() - .map(|(block_meta_index, block_meta)| (Some(block_meta_index), block_meta)) + .map(|(block_meta_index, block_meta, col_stats)| { + (Some(block_meta_index), block_meta, col_stats) + }) .collect::>(); let (_, inner_parts) = self.read_partitions_with_metas( @@ -300,7 +303,7 @@ impl FuseTable { block_metas .into_iter() .zip(inner_parts.partitions.into_iter()) - .map(|((index, block_meta), inner_part)| { + .map(|((index, block_meta, _col_stats), inner_part)| { let cluster_stats = if with_origin { block_meta.cluster_stats.clone() } else { diff --git a/src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs b/src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs index 791c6544ceea..9d8b573f24d5 100644 --- a/src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs +++ b/src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs @@ -218,6 +218,7 @@ impl NativeRowsFetcher { let part_info = FuseTable::projection_part( block_meta, &None, + &None, &column_nodes, None, &self.projection, diff --git a/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs b/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs index 7a8313c417b9..1b40f2e99dc7 100644 --- a/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs +++ b/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs @@ -213,6 +213,7 @@ impl ParquetRowsFetcher { let part_info = FuseTable::projection_part( block_meta, &None, + &None, &column_nodes, None, &self.projection, diff --git a/src/query/storages/fuse/src/operations/read_partitions.rs b/src/query/storages/fuse/src/operations/read_partitions.rs index 5a00dd102b0a..d434598987b9 100644 --- a/src/query/storages/fuse/src/operations/read_partitions.rs +++ b/src/query/storages/fuse/src/operations/read_partitions.rs @@ -27,6 +27,7 @@ use databend_common_catalog::plan::TopK; use databend_common_catalog::table::Table; use 
databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; +use databend_common_expression::ColumnId; use databend_common_expression::Scalar; use databend_common_expression::TableSchemaRef; use databend_common_sql::field_default_value; @@ -51,6 +52,12 @@ use crate::pruning::SegmentLocation; use crate::FuseLazyPartInfo; use crate::FuseTable; +type BlockMetas<'a> = &'a [( + Option, + Arc, + Option>, +)]; + impl FuseTable { #[fastrace::trace] #[async_backtrace::framed] @@ -211,7 +218,7 @@ impl FuseTable { )? }; let block_metas = pruner.read_pruning(segments_location).await?; - let pruning_stats = pruner.pruning_stats(); + let pruning_stats: PruningStatistics = pruner.pruning_stats(); info!( "prune snapshot block end, final block numbers:{}, cost:{:?}", @@ -221,7 +228,9 @@ impl FuseTable { let block_metas = block_metas .into_iter() - .map(|(block_meta_index, block_meta)| (Some(block_meta_index), block_meta)) + .map(|(block_meta_index, block_meta, col_stats)| { + (Some(block_meta_index), block_meta, col_stats) + }) .collect::>(); let schema = self.schema_with_stream(); @@ -247,7 +256,7 @@ impl FuseTable { ctx: Arc, schema: TableSchemaRef, push_downs: Option, - block_metas: &[(Option, Arc)], + block_metas: BlockMetas, partitions_total: usize, pruning_stats: PruningStatistics, ) -> Result<(PartStatistics, Partitions)> { @@ -282,7 +291,7 @@ impl FuseTable { pub fn to_partitions( schema: Option<&TableSchemaRef>, - block_metas: &[(Option, Arc)], + block_metas: BlockMetas, column_nodes: &ColumnNodes, top_k: Option<(TopK, Scalar)>, push_downs: Option, @@ -305,26 +314,26 @@ impl FuseTable { }; if top_k.asc { block_metas.sort_by(|a, b| { - let a = - a.1.col_stats - .get(&top_k.field.column_id) - .unwrap_or(&default_stats); - let b = - b.1.col_stats - .get(&top_k.field.column_id) - .unwrap_or(&default_stats); + let a_col_stats = a.2.as_ref().unwrap_or(&a.1.col_stats); + let b_col_stats = b.2.as_ref().unwrap_or(&b.1.col_stats); + let a = a_col_stats + .get(&top_k.field.column_id) + .unwrap_or(&default_stats); + let b = b_col_stats + .get(&top_k.field.column_id) + .unwrap_or(&default_stats); (a.min().as_ref(), a.max().as_ref()).cmp(&(b.min().as_ref(), b.max().as_ref())) }); } else { block_metas.sort_by(|a, b| { - let a = - a.1.col_stats - .get(&top_k.field.column_id) - .unwrap_or(&default_stats); - let b = - b.1.col_stats - .get(&top_k.field.column_id) - .unwrap_or(&default_stats); + let a_col_stats = a.2.as_ref().unwrap_or(&a.1.col_stats); + let b_col_stats = b.2.as_ref().unwrap_or(&b.1.col_stats); + let a = a_col_stats + .get(&top_k.field.column_id) + .unwrap_or(&default_stats); + let b = b_col_stats + .get(&top_k.field.column_id) + .unwrap_or(&default_stats); (b.max().as_ref(), b.min().as_ref()).cmp(&(a.max().as_ref(), a.min().as_ref())) }); } @@ -360,7 +369,7 @@ impl FuseTable { fn all_columns_partitions( schema: Option<&TableSchemaRef>, - block_metas: &[(Option, Arc)], + block_metas: BlockMetas, top_k: Option<(TopK, Scalar)>, limit: usize, ) -> (PartStatistics, Partitions) { @@ -372,13 +381,14 @@ impl FuseTable { } let mut remaining = limit; - for (block_meta_index, block_meta) in block_metas.iter() { + for (block_meta_index, block_meta, col_stats) in block_metas.iter() { let rows = block_meta.row_count as usize; partitions.partitions.push(Self::all_columns_part( schema, block_meta_index, &top_k, block_meta, + col_stats, )); statistics.read_rows += rows; statistics.read_bytes += block_meta.block_size as usize; @@ -398,7 +408,7 @@ impl FuseTable { } fn 
projection_partitions( - block_metas: &[(Option, Arc)], + block_metas: BlockMetas, column_nodes: &ColumnNodes, projection: &Projection, top_k: Option<(TopK, Scalar)>, @@ -414,9 +424,10 @@ impl FuseTable { let columns = projection.project_column_nodes(column_nodes).unwrap(); let mut remaining = limit; - for (block_meta_index, block_meta) in block_metas.iter() { + for (block_meta_index, block_meta, col_stats) in block_metas.iter() { partitions.partitions.push(Self::projection_part( block_meta, + col_stats, block_meta_index, column_nodes, top_k.clone(), @@ -454,9 +465,11 @@ impl FuseTable { block_meta_index: &Option, top_k: &Option<(TopK, Scalar)>, meta: &BlockMeta, + col_stats: &Option>, ) -> PartInfoPtr { let mut columns_meta = HashMap::with_capacity(meta.col_metas.len()); let mut columns_stats = HashMap::with_capacity(meta.col_stats.len()); + let col_stats = col_stats.as_ref().unwrap_or(&meta.col_stats); for column_id in meta.col_metas.keys() { // ignore all deleted field @@ -471,7 +484,7 @@ impl FuseTable { columns_meta.insert(*column_id, meta.clone()); } - if let Some(stat) = meta.col_stats.get(column_id) { + if let Some(stat) = col_stats.get(column_id) { columns_stats.insert(*column_id, stat.clone()); } } @@ -481,7 +494,7 @@ impl FuseTable { let create_on = meta.create_on; let sort_min_max = top_k.as_ref().map(|(top_k, default)| { - meta.col_stats + col_stats .get(&top_k.field.column_id) .map(|stat| (stat.min().clone(), stat.max().clone())) .unwrap_or((default.clone(), default.clone())) @@ -501,6 +514,7 @@ impl FuseTable { pub(crate) fn projection_part( meta: &BlockMeta, + col_stats: &Option>, block_meta_index: &Option, column_nodes: &ColumnNodes, top_k: Option<(TopK, Scalar)>, @@ -508,6 +522,7 @@ impl FuseTable { ) -> PartInfoPtr { let mut columns_meta = HashMap::with_capacity(projection.len()); let mut columns_stat = HashMap::with_capacity(projection.len()); + let col_stats = col_stats.as_ref().unwrap_or(&meta.col_stats); let columns = projection.project_column_nodes(column_nodes).unwrap(); for column in &columns { @@ -516,7 +531,7 @@ impl FuseTable { if let Some(column_meta) = meta.col_metas.get(column_id) { columns_meta.insert(*column_id, column_meta.clone()); } - if let Some(column_stat) = meta.col_stats.get(column_id) { + if let Some(column_stat) = col_stats.get(column_id) { columns_stat.insert(*column_id, column_stat.clone()); } } diff --git a/src/query/storages/fuse/src/operations/recluster.rs b/src/query/storages/fuse/src/operations/recluster.rs index 14c9cfab6e06..9d7c1e8c53fd 100644 --- a/src/query/storages/fuse/src/operations/recluster.rs +++ b/src/query/storages/fuse/src/operations/recluster.rs @@ -30,7 +30,6 @@ use databend_common_expression::TableSchemaRef; use databend_common_metrics::storage::metrics_inc_recluster_build_task_milliseconds; use databend_common_metrics::storage::metrics_inc_recluster_segment_nums_scheduled; use databend_common_sql::BloomIndexColumns; -use databend_storages_common_table_meta::meta::CompactSegmentInfo; use databend_storages_common_table_meta::meta::TableSnapshot; use log::warn; use opendal::Operator; @@ -40,6 +39,7 @@ use crate::operations::mutation::ReclusterMode; use crate::operations::ReclusterMutator; use crate::pruning::create_segment_location_vector; use crate::pruning::PruningContext; +use crate::pruning::SegmentInfoVariant; use crate::pruning::SegmentPruner; use crate::FuseTable; use crate::SegmentLocation; @@ -169,7 +169,7 @@ impl FuseTable { pub async fn generate_recluster_parts( mutator: Arc, - compact_segments: 
diff --git a/src/query/storages/fuse/src/operations/recluster.rs b/src/query/storages/fuse/src/operations/recluster.rs
index 14c9cfab6e06..9d7c1e8c53fd 100644
--- a/src/query/storages/fuse/src/operations/recluster.rs
+++ b/src/query/storages/fuse/src/operations/recluster.rs
@@ -30,7 +30,6 @@ use databend_common_expression::TableSchemaRef;
 use databend_common_metrics::storage::metrics_inc_recluster_build_task_milliseconds;
 use databend_common_metrics::storage::metrics_inc_recluster_segment_nums_scheduled;
 use databend_common_sql::BloomIndexColumns;
-use databend_storages_common_table_meta::meta::CompactSegmentInfo;
 use databend_storages_common_table_meta::meta::TableSnapshot;
 use log::warn;
 use opendal::Operator;
@@ -40,6 +39,7 @@ use crate::operations::mutation::ReclusterMode;
 use crate::operations::ReclusterMutator;
 use crate::pruning::create_segment_location_vector;
 use crate::pruning::PruningContext;
+use crate::pruning::SegmentInfoVariant;
 use crate::pruning::SegmentPruner;
 use crate::FuseTable;
 use crate::SegmentLocation;
@@ -169,7 +169,7 @@
     pub async fn generate_recluster_parts(
         mutator: Arc<ReclusterMutator>,
-        compact_segments: Vec<(SegmentLocation, Arc<CompactSegmentInfo>)>,
+        compact_segments: Vec<(SegmentLocation, SegmentInfoVariant)>,
     ) -> Result<…> {
         let mut selected_segs = vec![];
         let mut block_count = 0;
@@ -182,11 +182,11 @@ impl FuseTable {
         let latest = compact_segments.len() - 1;
         for (idx, compact_segment) in compact_segments.into_iter().enumerate() {
-            if !mutator.segment_can_recluster(&compact_segment.1.summary) {
+            if !mutator.segment_can_recluster(compact_segment.1.summary()) {
                 continue;
             }
 
-            block_count += compact_segment.1.summary.block_count as usize;
+            block_count += compact_segment.1.summary().block_count as usize;
             selected_segs.push(compact_segment);
             if block_count >= mutator.block_per_seg || idx == latest {
                 let selected_segs = std::mem::take(&mut selected_segs);
@@ -226,7 +226,7 @@
         dal: Operator,
         push_down: &Option<PushDownInfo>,
         mut segment_locs: Vec<SegmentLocation>,
-    ) -> Result<Vec<(SegmentLocation, Arc<CompactSegmentInfo>)>> {
+    ) -> Result<Vec<(SegmentLocation, SegmentInfoVariant)>> {
         let max_concurrency = {
             let max_threads = ctx.get_settings().get_max_threads()? as usize;
             let v = std::cmp::max(max_threads, 10);
@@ -281,7 +281,6 @@
             let res = worker?;
             metas.extend(res);
         }
-
         Ok(metas)
     }
 }
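generate_recluster_parts keeps the batching rule it had before this patch: accumulate reclusterable segments until the selected blocks reach block_per_seg (or the last segment is consumed), then flush the batch as one recluster unit. A self-contained sketch of that accumulation with stand-in names; the real loop additionally skips segments that fail segment_can_recluster:

// block_counts[i]: number of blocks in segment i (after the can-recluster filter).
fn batch_segments(block_counts: &[usize], block_per_seg: usize) -> Vec<Vec<usize>> {
    let mut batches = Vec::new();
    let mut selected = Vec::new();
    let mut block_count = 0;
    let latest = block_counts.len().saturating_sub(1);
    for (idx, count) in block_counts.iter().enumerate() {
        block_count += count;
        selected.push(idx);
        if block_count >= block_per_seg || idx == latest {
            // Flush the current batch as one recluster unit and start over.
            batches.push(std::mem::take(&mut selected));
            block_count = 0;
        }
    }
    batches
}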
diff --git a/src/query/storages/fuse/src/pruning/block_pruner.rs b/src/query/storages/fuse/src/pruning/block_pruner.rs
index 02b059ad873d..65defc14036d 100644
--- a/src/query/storages/fuse/src/pruning/block_pruner.rs
+++ b/src/query/storages/fuse/src/pruning/block_pruner.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashMap;
 use std::future::Future;
 use std::ops::Range;
 use std::pin::Pin;
@@ -26,7 +27,11 @@ use databend_common_expression::types::F32;
 use databend_common_expression::BLOCK_NAME_COL_NAME;
 use databend_common_metrics::storage::*;
 use databend_storages_common_pruner::BlockMetaIndex;
+use databend_storages_common_pruner::PruneResult;
 use databend_storages_common_table_meta::meta::BlockMeta;
+use databend_storages_common_table_meta::meta::ColumnStatistics;
+use databend_storages_common_table_meta::meta::ColumnarBlockMeta;
+use databend_storages_common_table_meta::meta::ColumnarSegmentInfo;
 use futures_util::future;
 use log::info;
@@ -47,7 +52,8 @@
         &self,
         segment_location: SegmentLocation,
         block_metas: Arc<Vec<Arc<BlockMeta>>>,
-    ) -> Result<Vec<(BlockMetaIndex, Arc<BlockMeta>)>> {
+        columnar_block_metas: Option<ColumnarBlockMeta>,
+    ) -> Result<PruneResult> {
         // Apply internal column pruning.
         let block_meta_indexes = self.internal_column_pruning(&block_metas);
 
@@ -56,11 +62,21 @@
             || self.pruning_ctx.inverted_index_pruner.is_some()
         {
             // async pruning with bloom index or inverted index.
-            self.block_pruning(segment_location, block_metas, block_meta_indexes)
-                .await
+            self.block_pruning(
+                segment_location,
+                block_metas,
+                block_meta_indexes,
+                columnar_block_metas,
+            )
+            .await
         } else {
             // sync pruning without a bloom index and inverted index.
-            self.block_pruning_sync(segment_location, block_metas, block_meta_indexes)
+            self.block_pruning_sync(
+                segment_location,
+                block_metas,
+                block_meta_indexes,
+                columnar_block_metas,
+            )
         }
     }
@@ -93,7 +109,8 @@
         segment_location: SegmentLocation,
         block_metas: Arc<Vec<Arc<BlockMeta>>>,
         block_meta_indexes: Vec<(usize, Arc<BlockMeta>)>,
-    ) -> Result<Vec<(BlockMetaIndex, Arc<BlockMeta>)>> {
+        columnar_block_metas: Option<ColumnarBlockMeta>,
+    ) -> Result<PruneResult> {
         let pruning_stats = self.pruning_ctx.pruning_stats.clone();
         let pruning_runtime = &self.pruning_ctx.pruning_runtime;
         let pruning_semaphore = &self.pruning_ctx.pruning_semaphore;
@@ -125,16 +142,20 @@
                         pruning_stats.set_blocks_range_pruning_before(1);
                     }
 
-                    let mut prune_result = BlockPruneResult::new(
+                    let mut prune_result = BlockPruneResult {
                         block_idx,
-                        block_meta.location.0.clone(),
-                        false,
-                        None,
-                        None,
-                    );
+                        block_location: block_meta.location.0.clone(),
+                        col_stats: columnar_block_metas
+                            .as_ref()
+                            .map(|c| ColumnarSegmentInfo::col_stats(c.clone(), block_idx).unwrap()),
+                        ..Default::default()
+                    };
 
                     let block_meta = block_meta.clone();
                     let row_count = block_meta.row_count;
-                    let should_keep =
-                        range_pruner.should_keep(&block_meta.col_stats, Some(&block_meta.col_metas));
+                    let col_stats = match &prune_result.col_stats {
+                        Some(col_stats) => col_stats,
+                        None => &block_meta.col_stats,
+                    };
+                    let should_keep =
+                        range_pruner.should_keep(col_stats, Some(&block_meta.col_metas));
                     if should_keep {
                         // Perf.
                         {
@@ -168,8 +189,13 @@
                                 pruning_stats.set_blocks_bloom_pruning_before(1);
                             }
 
+                            let col_stats = match &prune_result.col_stats {
+                                Some(col_stats) => col_stats,
+                                None => &block_meta.col_stats,
+                            };
+
                             let keep_by_bloom = bloom_pruner
-                                .should_keep(&index_location, index_size, &block_meta.col_stats, column_ids, &block_meta)
+                                .should_keep(&index_location, index_size, col_stats, column_ids, &block_meta)
                                 .await;
 
                             let keep = keep_by_bloom && limit_pruner.within_limit(row_count);
@@ -275,6 +301,7 @@
                     matched_rows: prune_result.matched_rows.clone(),
                 },
                 block,
+                prune_result.col_stats,
             ))
         }
     }
@@ -292,7 +319,8 @@
         segment_location: SegmentLocation,
         block_metas: Arc<Vec<Arc<BlockMeta>>>,
         block_meta_indexes: Vec<(usize, Arc<BlockMeta>)>,
-    ) -> Result<Vec<(BlockMetaIndex, Arc<BlockMeta>)>> {
+        columnar_block_metas: Option<ColumnarBlockMeta>,
+    ) -> Result<PruneResult> {
         let pruning_stats = self.pruning_ctx.pruning_stats.clone();
         let limit_pruner = self.pruning_ctx.limit_pruner.clone();
         let range_pruner = self.pruning_ctx.range_pruner.clone();
@@ -316,9 +344,14 @@
                 break;
             }
             let row_count = block_meta.row_count;
-            if range_pruner.should_keep(&block_meta.col_stats, Some(&block_meta.col_metas))
-                && limit_pruner.within_limit(row_count)
-            {
+            let col_stats = columnar_block_metas
+                .as_ref()
+                .map(|c| ColumnarSegmentInfo::col_stats(c.clone(), block_idx).unwrap());
+            let should_keep = range_pruner.should_keep(
+                col_stats.as_ref().unwrap_or_else(|| &block_meta.col_stats),
+                Some(&block_meta.col_metas),
+            );
+            if should_keep && limit_pruner.within_limit(row_count) {
                 // Perf.
                {
                    metrics_inc_blocks_range_pruning_after(1);
@@ -342,6 +375,7 @@
                        matched_rows: None,
                    },
                    block_meta.clone(),
+                    col_stats,
                ))
            }
        }
@@ -357,34 +391,18 @@
 }
 
 // result of block pruning
+#[derive(Default)]
 struct BlockPruneResult {
     // the block index in segment
-    block_idx: usize,
+    pub block_idx: usize,
     // the location of the block
-    block_location: String,
+    pub block_location: String,
     // whether keep the block after pruning
-    keep: bool,
+    pub keep: bool,
     // the page ranges should keeped in the block
-    range: Option<Range<usize>>,
+    pub range: Option<Range<usize>>,
     // the matched rows and scores should keeped in the block
     // only used by inverted index search
-    matched_rows: Option<Vec<(usize, Option<F32>)>>,
-}
-
-impl BlockPruneResult {
-    fn new(
-        block_idx: usize,
-        block_location: String,
-        keep: bool,
-        range: Option<Range<usize>>,
-        matched_rows: Option<Vec<(usize, Option<F32>)>>,
-    ) -> Self {
-        Self {
-            block_idx,
-            keep,
-            range,
-            block_location,
-            matched_rows,
-        }
-    }
+    pub matched_rows: Option<Vec<(usize, Option<F32>)>>,
+    pub col_stats: Option<HashMap<ColumnId, ColumnStatistics>>,
 }
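The BlockPruneResult rewrite above trades a positional new constructor for #[derive(Default)] plus struct-update syntax, so each call site names only the fields it cares about. A reduced sketch of the idiom (the real struct also carries range, matched_rows, and col_stats):

#[derive(Default)]
struct BlockPruneResult {
    block_idx: usize,
    block_location: String,
    keep: bool, // starts as false, just as the old `new(.., false, ..)` call did
}

fn prune_result_for(block_idx: usize, location: &str) -> BlockPruneResult {
    BlockPruneResult {
        block_idx,
        block_location: location.to_string(),
        // every omitted field is filled in from its Default value
        ..Default::default()
    }
}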
diff --git a/src/query/storages/fuse/src/pruning/fuse_pruner.rs b/src/query/storages/fuse/src/pruning/fuse_pruner.rs
index 093cbf17ced8..11f2b20af92e 100644
--- a/src/query/storages/fuse/src/pruning/fuse_pruner.rs
+++ b/src/query/storages/fuse/src/pruning/fuse_pruner.rs
@@ -14,6 +14,8 @@
 use std::sync::Arc;
 
+use databend_common_arrow::arrow::bitmap::Bitmap;
+use databend_common_arrow::arrow::bitmap::MutableBitmap;
 use databend_common_base::base::tokio::sync::Semaphore;
 use databend_common_base::runtime::Runtime;
 use databend_common_base::runtime::TrySpawn;
@@ -30,19 +32,19 @@ use databend_storages_common_cache::BlockMetaCache;
 use databend_storages_common_cache::CacheAccessor;
 use databend_storages_common_cache::CacheManager;
 use databend_storages_common_index::RangeIndex;
-use databend_storages_common_pruner::BlockMetaIndex;
 use databend_storages_common_pruner::InternalColumnPruner;
 use databend_storages_common_pruner::Limiter;
 use databend_storages_common_pruner::LimiterPrunerCreator;
 use databend_storages_common_pruner::PagePruner;
 use databend_storages_common_pruner::PagePrunerCreator;
+use databend_storages_common_pruner::PruneResult;
 use databend_storages_common_pruner::RangePruner;
 use databend_storages_common_pruner::RangePrunerCreator;
 use databend_storages_common_pruner::TopNPrunner;
 use databend_storages_common_table_meta::meta::BlockMeta;
 use databend_storages_common_table_meta::meta::ClusterKey;
 use databend_storages_common_table_meta::meta::ColumnStatistics;
-use databend_storages_common_table_meta::meta::CompactSegmentInfo;
+use databend_storages_common_table_meta::meta::ColumnarBlockMeta;
 use databend_storages_common_table_meta::meta::StatisticsOfColumns;
 use log::info;
 use log::warn;
@@ -51,6 +53,7 @@ use rand::distributions::Bernoulli;
 use rand::distributions::Distribution;
 use rand::thread_rng;
 
+use super::segment_pruner::SegmentInfoVariant;
 use crate::io::BloomIndexBuilder;
 use crate::operations::DeletedSegmentInfo;
 use crate::pruning::segment_pruner::SegmentPruner;
@@ -273,7 +276,7 @@
     pub async fn read_pruning(
         &mut self,
         segment_locs: Vec<SegmentLocation>,
-    ) -> Result<Vec<(BlockMetaIndex, Arc<BlockMeta>)>> {
+    ) -> Result<PruneResult> {
         self.pruning(segment_locs, false).await
     }
 
@@ -281,7 +284,7 @@
     pub async fn delete_pruning(
         &mut self,
         segment_locs: Vec<SegmentLocation>,
-    ) -> Result<Vec<(BlockMetaIndex, Arc<BlockMeta>)>> {
+    ) -> Result<PruneResult> {
         self.pruning(segment_locs, true).await
     }
 
@@ -292,7 +295,7 @@
         &mut self,
         mut segment_locs: Vec<SegmentLocation>,
         delete_pruning: bool,
-    ) -> Result<Vec<(BlockMetaIndex, Arc<BlockMeta>)>> {
+    ) -> Result<PruneResult> {
         // Segment pruner.
         let segment_pruner =
             SegmentPruner::create(self.pruning_ctx.clone(), self.table_schema.clone())?;
@@ -332,14 +335,14 @@
                     let pruned_segments = segment_pruner.pruning(batch).await?;
 
                     if delete_pruning {
-                        for (segment_location, compact_segment_info) in &pruned_segments {
+                        for (segment_location, segment_info_variant) in &pruned_segments {
                             if let Some(range_index) = &inverse_range_index {
                                 if !range_index
-                                    .should_keep(&compact_segment_info.summary.col_stats, None)
+                                    .should_keep(&segment_info_variant.summary().col_stats, None)
                                 {
                                     deleted_segments.push(DeletedSegmentInfo {
                                         index: segment_location.segment_idx,
-                                        summary: compact_segment_info.summary.clone(),
+                                        summary: segment_info_variant.summary().clone(),
                                     });
                                     continue;
                                 };
@@ -348,34 +351,55 @@
                             }
                             // since block metas touched by deletion are not likely to
                             // be accessed soon.
                             let populate_block_meta_cache = false;
-                            let block_metas = Self::extract_block_metas(
+                            let (block_metas, columnar_block_metas) = Self::extract_block_metas(
                                 &segment_location.location.0,
-                                compact_segment_info,
+                                segment_info_variant,
                                 populate_block_meta_cache,
                             )?;
                             res.extend(
                                 block_pruner
-                                    .pruning(segment_location.clone(), block_metas)
+                                    .pruning(
+                                        segment_location.clone(),
+                                        block_metas,
+                                        columnar_block_metas,
+                                    )
                                     .await?,
                             );
                         }
                     } else {
                         let sample_probability = table_sample(&push_down);
                         for (location, info) in pruned_segments {
-                            let mut block_metas =
+                            let (mut block_metas, mut columnar_block_metas) =
                                 Self::extract_block_metas(&location.location.0, &info, true)?;
                             if let Some(probability) = sample_probability {
                                 let mut sample_block_metas = Vec::with_capacity(block_metas.len());
                                 let mut rng = thread_rng();
                                 let bernoulli = Bernoulli::new(probability).unwrap();
+                                let mut sampled_bitmap =
+                                    MutableBitmap::with_capacity(block_metas.len());
                                 for block in block_metas.iter() {
                                     if bernoulli.sample(&mut rng) {
                                         sample_block_metas.push(block.clone());
+                                        sampled_bitmap.push(true);
+                                    } else {
+                                        sampled_bitmap.push(false);
                                     }
                                 }
                                 block_metas = Arc::new(sample_block_metas);
+                                if let Some(c) = std::mem::take(&mut columnar_block_metas) {
+                                    columnar_block_metas = Some(ColumnarBlockMeta {
+                                        data: c
+                                            .data
+                                            .filter_with_bitmap(&Bitmap::from(sampled_bitmap))?,
+                                        schema: c.schema,
+                                    });
+                                }
                             }
-                            res.extend(block_pruner.pruning(location.clone(), block_metas).await?);
+                            res.extend(
+                                block_pruner
+                                    .pruning(location.clone(), block_metas, columnar_block_metas)
+                                    .await?,
+                            );
                         }
                     }
                     Result::<_>::Ok((res, deleted_segments))
@@ -385,7 +409,7 @@
 
         let workers = futures::future::try_join_all(works).await?;
 
-        let mut metas = vec![];
+        let mut metas: PruneResult = vec![];
         for worker in workers {
             let mut res = worker?;
             metas.extend(res.0);
@@ -401,22 +425,34 @@
         }
     }
 
+    #[allow(clippy::type_complexity)]
     fn extract_block_metas(
         segment_path: &str,
-        segment: &CompactSegmentInfo,
+        segment: &SegmentInfoVariant,
         populate_cache: bool,
-    ) -> Result<Arc<Vec<Arc<BlockMeta>>>> {
-        if let Some(cache) = CacheManager::instance().get_block_meta_cache() {
-            if let Some(metas) = cache.get(segment_path) {
-                Ok(metas)
-            } else {
-                match populate_cache {
-                    true => Ok(cache.insert(segment_path.to_string(), segment.block_metas()?)),
-                    false => Ok(Arc::new(segment.block_metas()?)),
-                }
+    ) -> Result<(Arc<Vec<Arc<BlockMeta>>>, Option<ColumnarBlockMeta>)> {
+        match segment {
+            SegmentInfoVariant::Compact(segment) => {
+                let block_metas = if let Some(cache) =
+                    CacheManager::instance().get_block_meta_cache()
+                {
+                    if let Some(metas) = cache.get(segment_path) {
+                        metas
+                    } else {
+                        match populate_cache {
+                            true => cache.insert(segment_path.to_string(), segment.block_metas()?),
+                            false => Arc::new(segment.block_metas()?),
+                        }
+                    }
+                } else {
+                    Arc::new(segment.block_metas()?)
+                };
+                Ok((block_metas, None))
             }
-        } else {
-            Ok(Arc::new(segment.block_metas()?))
+            SegmentInfoVariant::Columnar(segment) => Ok((
+                Arc::new(segment.block_metas.clone()),
+                Some(segment.columnar_block_metas.clone()),
+            )),
         }
     }
@@ -424,7 +460,7 @@
     pub async fn stream_pruning(
         &mut self,
         mut block_metas: Vec<Arc<BlockMeta>>,
-    ) -> Result<Vec<(BlockMetaIndex, Arc<BlockMeta>)>> {
+    ) -> Result<PruneResult> {
         let mut remain = block_metas.len() % self.max_concurrency;
         let batch_size = block_metas.len() / self.max_concurrency;
         let mut works = Vec::with_capacity(self.max_concurrency);
@@ -450,6 +486,7 @@
                             snapshot_loc: None,
                         },
                         Arc::new(batch),
+                        None, // stream pruning does not use columnar segment info cache for now, so pass None.
                     )
                     .await?;
@@ -474,10 +511,7 @@
 
     // topn pruner:
     // if there are ordering + limit clause and no filters, use topn pruner
-    fn topn_pruning(
-        &self,
-        metas: Vec<(BlockMetaIndex, Arc<BlockMeta>)>,
-    ) -> Result<Vec<(BlockMetaIndex, Arc<BlockMeta>)>> {
+    fn topn_pruning(&self, metas: PruneResult) -> Result<PruneResult> {
         let push_down = self.push_down.clone();
         if push_down
             .as_ref()
diff --git a/src/query/storages/fuse/src/pruning/mod.rs b/src/query/storages/fuse/src/pruning/mod.rs
index e619c2b16e24..073e9eb8d761 100644
--- a/src/query/storages/fuse/src/pruning/mod.rs
+++ b/src/query/storages/fuse/src/pruning/mod.rs
@@ -30,4 +30,5 @@ pub use inverted_index_pruner::InvertedIndexPruner;
 pub use pruner_location::create_segment_location_vector;
 pub use pruner_location::SegmentLocation;
 pub use pruning_statistics::FusePruningStatistics;
+pub use segment_pruner::SegmentInfoVariant;
 pub use segment_pruner::SegmentPruner;
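One invariant in the sampling change above is worth spelling out: the same Bernoulli decision must be applied to the Vec<Arc<BlockMeta>> and to the columnar block metas, otherwise the block indexes used by ColumnarSegmentInfo::col_stats go stale. A sketch of that alignment, using plain bools in place of arrow2's MutableBitmap/Bitmap:

fn sample_aligned<T: Clone, U: Clone>(
    block_metas: &[T],
    columnar_rows: &[U],
    decisions: &[bool], // one Bernoulli draw per block
) -> (Vec<T>, Vec<U>) {
    assert_eq!(block_metas.len(), columnar_rows.len());
    assert_eq!(block_metas.len(), decisions.len());
    // Filter both views with the same decision vector so that block i in the
    // sampled metas still lines up with row i of the sampled columnar data.
    let sampled_metas = block_metas
        .iter()
        .zip(decisions)
        .filter(|(_, keep)| **keep)
        .map(|(m, _)| m.clone())
        .collect();
    let sampled_columnar = columnar_rows
        .iter()
        .zip(decisions)
        .filter(|(_, keep)| **keep)
        .map(|(c, _)| c.clone())
        .collect();
    (sampled_metas, sampled_columnar)
}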
diff --git a/src/query/storages/fuse/src/pruning/segment_pruner.rs b/src/query/storages/fuse/src/pruning/segment_pruner.rs
index d8701a9a8821..c73267469401 100644
--- a/src/query/storages/fuse/src/pruning/segment_pruner.rs
+++ b/src/query/storages/fuse/src/pruning/segment_pruner.rs
@@ -17,8 +17,15 @@ use std::sync::Arc;
 use databend_common_exception::Result;
 use databend_common_expression::TableSchemaRef;
 use databend_common_metrics::storage::*;
+use databend_storages_common_cache::LoadParams;
+use databend_storages_common_table_meta::meta::BlockMeta;
+use databend_storages_common_table_meta::meta::ColumnarSegmentInfo;
 use databend_storages_common_table_meta::meta::CompactSegmentInfo;
+use databend_storages_common_table_meta::meta::Location;
+use databend_storages_common_table_meta::meta::Statistics;
+use opendal::Operator;
 
+use crate::io::MetaReaders;
 use crate::io::SegmentsIO;
 use crate::pruning::PruningContext;
 use crate::pruning::SegmentLocation;
@@ -43,7 +50,7 @@
     pub async fn pruning(
         &self,
         segment_locs: Vec<SegmentLocation>,
-    ) -> Result<Vec<(SegmentLocation, Arc<CompactSegmentInfo>)>> {
+    ) -> Result<Vec<(SegmentLocation, SegmentInfoVariant)>> {
         if segment_locs.is_empty() {
             return Ok(vec![]);
         }
@@ -54,15 +61,18 @@
         let range_pruner = self.pruning_ctx.range_pruner.clone();
 
         for segment_location in segment_locs {
-            let info = SegmentsIO::read_compact_segment(
+            let info = SegmentInfoVariant::read(
                 self.pruning_ctx.dal.clone(),
                 segment_location.location.clone(),
                 self.table_schema.clone(),
+                self.pruning_ctx
+                    .ctx
+                    .get_settings()
+                    .get_enable_columnar_segment_info()?,
                 true,
             )
             .await?;
-
-            let total_bytes = info.summary.uncompressed_byte_size;
+            let total_bytes = info.summary().uncompressed_byte_size;
 
             // Perf.
             {
                 metrics_inc_segments_range_pruning_before(1);
@@ -71,7 +81,7 @@
                 pruning_stats.set_segments_range_pruning_before(1);
             }
 
-            if range_pruner.should_keep(&info.summary.col_stats, None) {
+            if range_pruner.should_keep(&info.summary().col_stats, None) {
                 // Perf.
                 {
                     metrics_inc_segments_range_pruning_after(1);
@@ -86,3 +96,51 @@
         Ok(res)
     }
 }
+
+#[derive(Clone)]
+pub enum SegmentInfoVariant {
+    Compact(Arc<CompactSegmentInfo>),
+    Columnar(Arc<ColumnarSegmentInfo>),
+}
+
+impl SegmentInfoVariant {
+    pub async fn read(
+        op: Operator,
+        location: Location,
+        table_schema: TableSchemaRef,
+        enable_columnar_segment_info: bool,
+        put_cache: bool,
+    ) -> Result<Self> {
+        if enable_columnar_segment_info {
+            let (path, ver) = location;
+            let reader = MetaReaders::columnar_segment_info_reader(op, table_schema);
+
+            let load_params = LoadParams {
+                location: path,
+                len_hint: None,
+                ver,
+                put_cache,
+            };
+
+            let info = reader.read(&load_params).await?;
+            Ok(SegmentInfoVariant::Columnar(info))
+        } else {
+            let info =
+                SegmentsIO::read_compact_segment(op, location, table_schema, put_cache).await?;
+            Ok(SegmentInfoVariant::Compact(info))
+        }
+    }
+
+    pub fn summary(&self) -> &Statistics {
+        match self {
+            SegmentInfoVariant::Compact(info) => &info.summary,
+            SegmentInfoVariant::Columnar(info) => &info.summary,
+        }
+    }
+
+    pub fn block_metas(&self) -> Result<Vec<Arc<BlockMeta>>> {
+        match self {
+            SegmentInfoVariant::Compact(info) => info.block_metas(),
+            SegmentInfoVariant::Columnar(info) => Ok(info.block_metas.clone()),
+        }
+    }
+}
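A hypothetical caller of the new API, sketched on the assumption that op, location, and schema are already at hand and that the flag comes from get_enable_columnar_segment_info; load_and_check itself is not part of the patch:

async fn load_and_check(
    op: Operator,
    location: Location,
    schema: TableSchemaRef,
    enable_columnar: bool,
) -> Result<Option<SegmentInfoVariant>> {
    // put_cache = true: populate the segment cache on read.
    let info = SegmentInfoVariant::read(op, location, schema, enable_columnar, true).await?;
    // Both variants expose the summary uniformly, so the caller stays
    // agnostic of the on-disk encoding.
    if info.summary().block_count == 0 {
        return Ok(None);
    }
    Ok(Some(info))
}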