diff --git a/src/query/catalog/src/table.rs b/src/query/catalog/src/table.rs index 44bea62722c1e..6a82b7ee04c4b 100644 --- a/src/query/catalog/src/table.rs +++ b/src/query/catalog/src/table.rs @@ -59,7 +59,8 @@ pub trait Table: Sync + Send { self.get_table_info().engine() } - fn support_internal_column_id(&self, _column_id: ColumnId) -> bool { + /// Whether the table engine supports the given internal column. + fn supported_internal_column(&self, _column_id: ColumnId) -> bool { false } @@ -129,12 +130,6 @@ pub trait Table: Sync + Send { }) } - async fn source_table(&self, ctx: Arc) -> Result>> { - let _ = ctx; - - Ok(None) - } - /// Whether the table engine supports prewhere optimization. /// only Fuse Engine supports this. fn support_prewhere(&self) -> bool { @@ -274,12 +269,22 @@ pub trait Table: Sync + Send { Ok(()) } - async fn table_statistics(&self) -> Result> { + async fn table_statistics( + &self, + ctx: Arc, + ) -> Result> { + let _ = ctx; + Ok(None) } #[async_backtrace::framed] - async fn column_statistics_provider(&self) -> Result> { + async fn column_statistics_provider( + &self, + ctx: Arc, + ) -> Result> { + let _ = ctx; + Ok(Box::new(DummyColumnStatisticsProvider)) } diff --git a/src/query/expression/src/schema.rs b/src/query/expression/src/schema.rs index e3f5b802f7829..1186775ea3518 100644 --- a/src/query/expression/src/schema.rs +++ b/src/query/expression/src/schema.rs @@ -86,11 +86,6 @@ pub fn is_internal_column_id(column_id: ColumnId) -> bool { column_id >= CHANGE_ROW_ID_COLUMN_ID } -#[inline] -pub fn is_internal_stream_column_id(column_id: ColumnId) -> bool { - (CHANGE_ROW_ID_COLUMN_ID..=BASE_ROW_ID_COLUMN_ID).contains(&column_id) -} - #[inline] pub fn is_internal_column(column_name: &str) -> bool { matches!( diff --git a/src/query/service/src/interpreters/common/stream.rs b/src/query/service/src/interpreters/common/stream.rs index 3cff3b5779dff..79d1daf6bb3d1 100644 --- a/src/query/service/src/interpreters/common/stream.rs +++ b/src/query/service/src/interpreters/common/stream.rs @@ -48,7 +48,7 @@ pub async fn build_update_stream_meta_seq( for table in tables.into_iter() { let stream = StreamTable::try_from_table(table.as_ref())?; let stream_info = stream.get_table_info(); - let source_table = stream.source_table(ctx.clone()).await?.unwrap(); + let source_table = stream.source_table(ctx.clone()).await?; let inner_fuse = FuseTable::try_from_table(source_table.as_ref())?; let table_version = inner_fuse.get_table_info().ident.seq; diff --git a/src/query/service/src/interpreters/interpreter_delete.rs b/src/query/service/src/interpreters/interpreter_delete.rs index f5cc53013aa66..eacb93f9cc703 100644 --- a/src/query/service/src/interpreters/interpreter_delete.rs +++ b/src/query/service/src/interpreters/interpreter_delete.rs @@ -119,7 +119,7 @@ impl Interpreter for DeleteInterpreter { tbl.check_mutable()?; let selection = if !self.plan.subquery_desc.is_empty() { - let support_row_id = tbl.support_internal_column_id(ROW_ID_COLUMN_ID); + let support_row_id = tbl.supported_internal_column(ROW_ID_COLUMN_ID); if !support_row_id { return Err(ErrorCode::from_string( "table doesn't support row_id, so it can't use delete with subquery" diff --git a/src/query/service/src/interpreters/interpreter_replace.rs b/src/query/service/src/interpreters/interpreter_replace.rs index 449ff5fd882dd..cc5e44f47d22e 100644 --- a/src/query/service/src/interpreters/interpreter_replace.rs +++ b/src/query/service/src/interpreters/interpreter_replace.rs @@ -301,7 +301,11 @@ impl ReplaceInterpreter { 
.get_replace_into_bloom_pruning_max_column_number()?; let bloom_filter_column_indexes = if !table.cluster_keys(self.ctx.clone()).is_empty() { fuse_table - .choose_bloom_filter_columns(&on_conflicts, max_num_pruning_columns) + .choose_bloom_filter_columns( + self.ctx.clone(), + &on_conflicts, + max_num_pruning_columns, + ) .await? } else { vec![] diff --git a/src/query/service/src/interpreters/interpreter_update.rs b/src/query/service/src/interpreters/interpreter_update.rs index 929af4c5a567f..83da20c6bddc7 100644 --- a/src/query/service/src/interpreters/interpreter_update.rs +++ b/src/query/service/src/interpreters/interpreter_update.rs @@ -108,7 +108,7 @@ impl Interpreter for UpdateInterpreter { tbl.check_mutable()?; let selection = if !self.plan.subquery_desc.is_empty() { - let support_row_id = tbl.support_internal_column_id(ROW_ID_COLUMN_ID); + let support_row_id = tbl.supported_internal_column(ROW_ID_COLUMN_ID); if !support_row_id { return Err(ErrorCode::from_string( "table doesn't support row_id, so it can't use delete with subquery" diff --git a/src/query/service/src/table_functions/numbers/numbers_table.rs b/src/query/service/src/table_functions/numbers/numbers_table.rs index 63b5a13d4d2a2..1e8ef0a3ec7bc 100644 --- a/src/query/service/src/table_functions/numbers/numbers_table.rs +++ b/src/query/service/src/table_functions/numbers/numbers_table.rs @@ -210,7 +210,10 @@ impl Table for NumbersTable { Ok(()) } - async fn table_statistics(&self) -> Result> { + async fn table_statistics( + &self, + _ctx: Arc, + ) -> Result> { Ok(Some(TableStatistics { num_rows: Some(self.total), data_size: Some(self.total * 8), diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs index 057c65a159c08..71e13fbc3f7c1 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs @@ -158,7 +158,10 @@ async fn test_compact_segment_resolvable_conflict() -> Result<()> { let ctx = fixture.new_query_ctx().await?; let latest = table.refresh(ctx.as_ref()).await?; let latest_fuse_table = FuseTable::try_from_table(latest.as_ref())?; - let table_statistics = latest_fuse_table.table_statistics().await?.unwrap(); + let table_statistics = latest_fuse_table + .table_statistics(ctx.clone()) + .await? + .unwrap(); assert_eq!(table_statistics.num_rows.unwrap() as usize, num_inserts * 2); diff --git a/src/query/service/tests/it/storages/fuse/operations/table_analyze.rs b/src/query/service/tests/it/storages/fuse/operations/table_analyze.rs index cc53f55d7eeca..4e68a763da9dd 100644 --- a/src/query/service/tests/it/storages/fuse/operations/table_analyze.rs +++ b/src/query/service/tests/it/storages/fuse/operations/table_analyze.rs @@ -60,7 +60,7 @@ async fn test_table_modify_column_ndv_statistics() -> Result<()> { assert_eq!(num_inserts, query_count(stream).await? as usize); let expected = HashMap::from([(0, num_inserts as u64)]); - check_column_ndv_statistics(table.clone(), expected.clone()).await?; + check_column_ndv_statistics(ctx.clone(), table.clone(), expected.clone()).await?; // append the same values again, and ndv does changed. 
append_rows(ctx.clone(), num_inserts).await?; @@ -73,7 +73,7 @@ async fn test_table_modify_column_ndv_statistics() -> Result<()> { let stream = fixture.execute_query(count_qry).await?; assert_eq!(num_inserts * 2, query_count(stream).await? as usize); - check_column_ndv_statistics(table.clone(), expected.clone()).await?; + check_column_ndv_statistics(ctx.clone(), table.clone(), expected.clone()).await?; // delete ctx.evict_table_from_cache("default", "default", "t")?; @@ -87,7 +87,7 @@ async fn test_table_modify_column_ndv_statistics() -> Result<()> { fixture.execute_command(statistics_sql).await?; // check count: delete not affect counts - check_column_ndv_statistics(table.clone(), expected).await?; + check_column_ndv_statistics(ctx, table.clone(), expected).await?; Ok(()) } @@ -165,10 +165,11 @@ async fn test_table_update_analyze_statistics() -> Result<()> { } async fn check_column_ndv_statistics( + ctx: Arc, table: Arc, expected: HashMap, ) -> Result<()> { - let provider = table.column_statistics_provider().await?; + let provider = table.column_statistics_provider(ctx).await?; for (i, num) in expected.iter() { let stat = provider.column_statistics(*i); diff --git a/src/query/sql/src/planner/binder/bind_context.rs b/src/query/sql/src/planner/binder/bind_context.rs index f225659676d92..c1f80a89868f3 100644 --- a/src/query/sql/src/planner/binder/bind_context.rs +++ b/src/query/sql/src/planner/binder/bind_context.rs @@ -26,7 +26,6 @@ use databend_common_catalog::plan::InternalColumn; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_exception::Span; -use databend_common_expression::is_internal_stream_column_id; use databend_common_expression::ColumnId; use databend_common_expression::DataField; use databend_common_expression::DataSchemaRef; @@ -556,7 +555,7 @@ impl BindContext { let metadata = metadata.read(); let table = metadata.table(table_index); - if table.table().engine() != "STREAM" && is_internal_stream_column_id(column_id) { + if !table.table().supported_internal_column(column_id) { return Err(ErrorCode::SemanticError(format!( "Internal column `{}` is not allowed in table `{}`", column_binding.internal_column.column_name(), diff --git a/src/query/sql/src/planner/binder/select.rs b/src/query/sql/src/planner/binder/select.rs index 2acde501a9a69..2c966bcf9fbc4 100644 --- a/src/query/sql/src/planner/binder/select.rs +++ b/src/query/sql/src/planner/binder/select.rs @@ -846,7 +846,7 @@ impl Binder { if !metadata .table(0) .table() - .support_internal_column_id(ROW_ID_COLUMN_ID) + .supported_internal_column(ROW_ID_COLUMN_ID) { return Ok(()); } diff --git a/src/query/sql/src/planner/binder/table.rs b/src/query/sql/src/planner/binder/table.rs index 3cb3b60c90a24..5c4a3148b4ddb 100644 --- a/src/query/sql/src/planner/binder/table.rs +++ b/src/query/sql/src/planner/binder/table.rs @@ -1203,12 +1203,7 @@ impl Binder { let table = self.metadata.read().table(table_index).clone(); let table_name = table.name(); let table = table.table(); - let source = table.source_table(self.ctx.clone()).await?; - let statistics_provider = if let Some(source) = source { - source.column_statistics_provider().await? - } else { - table.column_statistics_provider().await? 
- }; + let statistics_provider = table.column_statistics_provider(self.ctx.clone()).await?; let table_version = if table.engine() == "STREAM" { let options = table.options(); let table_version = options @@ -1345,7 +1340,7 @@ impl Binder { predicates.push(predicate); } - let stat = table.table_statistics().await?; + let stat = table.table_statistics(self.ctx.clone()).await?; let scan = SExpr::create_leaf(Arc::new( Scan { table_index, diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs index 43187ff4f560a..3d520159652fd 100644 --- a/src/query/storages/fuse/src/fuse_table.rs +++ b/src/query/storages/fuse/src/fuse_table.rs @@ -438,7 +438,7 @@ impl Table for FuseTable { Some(self.data_metrics.clone()) } - fn support_internal_column_id(&self, column_id: ColumnId) -> bool { + fn supported_internal_column(&self, column_id: ColumnId) -> bool { column_id >= SNAPSHOT_NAME_COLUMN_ID } @@ -688,7 +688,10 @@ impl Table for FuseTable { self.do_analyze(&ctx).await } - async fn table_statistics(&self) -> Result> { + async fn table_statistics( + &self, + _ctx: Arc, + ) -> Result> { let stats = match self.table_type { FuseTableType::AttachedReadOnly => { let snapshot = self.read_table_snapshot().await?.ok_or_else(|| { @@ -723,7 +726,10 @@ impl Table for FuseTable { } #[async_backtrace::framed] - async fn column_statistics_provider(&self) -> Result> { + async fn column_statistics_provider( + &self, + _ctx: Arc, + ) -> Result> { let provider = if let Some(snapshot) = self.read_table_snapshot().await? { let stats = &snapshot.summary.col_stats; let table_statistics = self.read_table_snapshot_statistics(Some(&snapshot)).await?; diff --git a/src/query/storages/fuse/src/operations/replace.rs b/src/query/storages/fuse/src/operations/replace.rs index 87a5142b2d682..f95a241122327 100644 --- a/src/query/storages/fuse/src/operations/replace.rs +++ b/src/query/storages/fuse/src/operations/replace.rs @@ -177,10 +177,11 @@ impl FuseTable { // are supported by bloom index. 
pub async fn choose_bloom_filter_columns( &self, + ctx: Arc, on_conflicts: &[OnConflictField], max_num_columns: u64, ) -> Result> { - let col_stats_provider = self.column_statistics_provider().await?; + let col_stats_provider = self.column_statistics_provider(ctx).await?; let mut cols = on_conflicts .iter() .enumerate() diff --git a/src/query/storages/hive/hive/src/hive_table.rs b/src/query/storages/hive/hive/src/hive_table.rs index 0afd842bfb068..24fc7dd00e5d6 100644 --- a/src/query/storages/hive/hive/src/hive_table.rs +++ b/src/query/storages/hive/hive/src/hive_table.rs @@ -619,7 +619,10 @@ impl Table for HiveTable { Ok(None) } - async fn table_statistics(&self) -> Result> { + async fn table_statistics( + &self, + _ctx: Arc, + ) -> Result> { Ok(None) } diff --git a/src/query/storages/parquet/src/parquet2/parquet_table/table.rs b/src/query/storages/parquet/src/parquet2/parquet_table/table.rs index cadde022be7ef..43aa188bf62f1 100644 --- a/src/query/storages/parquet/src/parquet2/parquet_table/table.rs +++ b/src/query/storages/parquet/src/parquet2/parquet_table/table.rs @@ -115,7 +115,10 @@ impl Table for Parquet2Table { true } - async fn table_statistics(&self) -> Result> { + async fn table_statistics( + &self, + _ctx: Arc, + ) -> Result> { let s = &self.table_info.meta.statistics; Ok(Some(TableStatistics { num_rows: Some(s.number_of_rows), @@ -128,7 +131,10 @@ impl Table for Parquet2Table { } #[async_backtrace::framed] - async fn column_statistics_provider(&self) -> Result> { + async fn column_statistics_provider( + &self, + _ctx: Arc, + ) -> Result> { Ok(Box::new(self.column_statistics_provider.clone())) } diff --git a/src/query/storages/parquet/src/parquet_rs/parquet_table/table.rs b/src/query/storages/parquet/src/parquet_rs/parquet_table/table.rs index aaeca9c14278e..904c9d4343d78 100644 --- a/src/query/storages/parquet/src/parquet_rs/parquet_table/table.rs +++ b/src/query/storages/parquet/src/parquet_rs/parquet_table/table.rs @@ -250,7 +250,10 @@ impl Table for ParquetRSTable { true } - async fn column_statistics_provider(&self) -> Result> { + async fn column_statistics_provider( + &self, + _ctx: Arc, + ) -> Result> { if !self.need_stats_provider { return Ok(Box::new(DummyColumnStatisticsProvider)); } @@ -302,7 +305,10 @@ impl Table for ParquetRSTable { Ok(Box::new(provider)) } - async fn table_statistics(&self) -> Result> { + async fn table_statistics( + &self, + _ctx: Arc, + ) -> Result> { // Unwrap safety: no other thread will hold this lock. let parquet_metas = self.parquet_metas.try_lock().unwrap(); if parquet_metas.is_empty() { diff --git a/src/query/storages/stream/src/stream_table.rs b/src/query/storages/stream/src/stream_table.rs index 76cb10460ace3..d18da9dc892de 100644 --- a/src/query/storages/stream/src/stream_table.rs +++ b/src/query/storages/stream/src/stream_table.rs @@ -13,6 +13,7 @@ // limitations under the License. 
use std::any::Any; +use std::collections::HashMap; use std::collections::HashSet; use std::fmt::Display; use std::str::FromStr; @@ -30,7 +31,9 @@ use databend_common_catalog::plan::PartitionsShuffleKind; use databend_common_catalog::plan::PushDownInfo; use databend_common_catalog::plan::StreamColumn; use databend_common_catalog::plan::StreamTablePart; +use databend_common_catalog::table::ColumnStatisticsProvider; use databend_common_catalog::table::Table; +use databend_common_catalog::table::TableStatistics; use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; use databend_common_exception::Result; @@ -51,6 +54,7 @@ use databend_common_sql::binder::STREAM_COLUMN_FACTORY; use databend_common_storages_fuse::io::SegmentsIO; use databend_common_storages_fuse::io::SnapshotsIO; use databend_common_storages_fuse::FuseTable; +use databend_storages_common_table_meta::meta::BlockMeta; use databend_storages_common_table_meta::meta::SegmentInfo; use databend_storages_common_table_meta::table::OPT_KEY_SNAPSHOT_LOCATION; @@ -160,6 +164,33 @@ impl StreamTable { }) } + pub async fn source_table(&self, ctx: Arc) -> Result> { + let table = ctx + .get_table( + self.stream_info.catalog(), + &self.table_database, + &self.table_name, + ) + .await?; + + if table.get_table_info().ident.table_id != self.table_id { + return Err(ErrorCode::IllegalStream(format!( + "Table id mismatch, expect {}, got {}", + self.table_id, + table.get_table_info().ident.table_id + ))); + } + + if !table.change_tracking_enabled() { + return Err(ErrorCode::IllegalStream(format!( + "Change tracking is not enabled for table '{}.{}'", + self.table_database, self.table_name + ))); + } + + Ok(table) + } + pub fn offset(&self) -> u64 { self.table_version } @@ -184,26 +215,18 @@ impl StreamTable { &self.table_database } - #[async_backtrace::framed] - async fn do_read_partitions( + async fn collect_incremental_blocks( &self, ctx: Arc, - push_downs: Option, - ) -> Result<(PartStatistics, Partitions)> { - let start = Instant::now(); - let table = self.source_table(ctx.clone()).await?.unwrap(); - let fuse_table = FuseTable::try_from_table(table.as_ref())?; - - let latest_snapshot = fuse_table.read_table_snapshot().await?; - if latest_snapshot.is_none() { - return Ok((PartStatistics::default(), Partitions::default())); - } - let latest_snapshot = latest_snapshot.unwrap(); - let latest_segments = HashSet::from_iter(latest_snapshot.segments.clone()); - - let summary = latest_snapshot.summary.block_count as usize; - drop(latest_snapshot); + fuse_table: &FuseTable, + ) -> Result<(Vec>, Vec>)> { let operator = fuse_table.get_operator(); + let latest_segments = if let Some(snapshot) = fuse_table.read_table_snapshot().await? { + HashSet::from_iter(snapshot.segments.clone()) + } else { + HashSet::new() + }; + let base_segments = if let Some(snapshot_location) = &self.snapshot_location { let (base_snapshot, _) = SnapshotsIO::read_snapshot(snapshot_location.clone(), operator.clone()).await?; @@ -212,57 +235,74 @@ impl StreamTable { HashSet::new() }; - let mut base_blocks = HashSet::new(); - let mut latest_blocks = Vec::new(); - { - let fuse_segment_io = - SegmentsIO::create(ctx.clone(), operator.clone(), fuse_table.schema()); - let chunk_size = ctx.get_settings().get_max_threads()? 
as usize * 4; - - let diff_in_base = base_segments - .difference(&latest_segments) - .cloned() - .collect::>(); - for chunk in diff_in_base.chunks(chunk_size) { - let segments = fuse_segment_io - .read_segments::(chunk, true) - .await?; - for segment in segments { - let segment = segment?; - segment.blocks.into_iter().for_each(|block| { - base_blocks.insert(block.location.clone()); - }) - } + let fuse_segment_io = + SegmentsIO::create(ctx.clone(), operator.clone(), fuse_table.schema()); + let chunk_size = ctx.get_settings().get_max_threads()? as usize * 4; + + let mut base_blocks = HashMap::new(); + let diff_in_base = base_segments + .difference(&latest_segments) + .cloned() + .collect::>(); + for chunk in diff_in_base.chunks(chunk_size) { + let segments = fuse_segment_io + .read_segments::(chunk, true) + .await?; + for segment in segments { + let segment = segment?; + segment.blocks.into_iter().for_each(|block| { + base_blocks.insert(block.location.clone(), block); + }) } + } - let diff_in_latest = latest_segments - .difference(&base_segments) - .cloned() - .collect::>(); - for chunk in diff_in_latest.chunks(chunk_size) { - let segments = fuse_segment_io - .read_segments::(chunk, true) - .await?; - - for segment in segments { - let segment = segment?; - segment.blocks.into_iter().for_each(|block| { - if base_blocks.contains(&block.location) { - base_blocks.remove(&block.location); - } else { - latest_blocks.push(block); - } - }); - } + let mut add_blocks = Vec::new(); + let diff_in_latest = latest_segments + .difference(&base_segments) + .cloned() + .collect::>(); + for chunk in diff_in_latest.chunks(chunk_size) { + let segments = fuse_segment_io + .read_segments::(chunk, true) + .await?; + + for segment in segments { + let segment = segment?; + segment.blocks.into_iter().for_each(|block| { + if base_blocks.contains_key(&block.location) { + base_blocks.remove(&block.location); + } else { + add_blocks.push(block); + } + }); } } - if latest_blocks.is_empty() { + + let del_blocks = base_blocks.into_values().collect::>(); + Ok((del_blocks, add_blocks)) + } + + #[async_backtrace::framed] + async fn do_read_partitions( + &self, + ctx: Arc, + push_downs: Option, + ) -> Result<(PartStatistics, Partitions)> { + let start = Instant::now(); + let table = self.source_table(ctx.clone()).await?; + let fuse_table = FuseTable::try_from_table(table.as_ref())?; + + let (del_blocks, add_blocks) = self + .collect_incremental_blocks(ctx.clone(), fuse_table) + .await?; + let summary = add_blocks.len(); + if summary == 0 { return Ok((PartStatistics::default(), Partitions::default())); } - let mut base_block_ids = Vec::with_capacity(base_blocks.len()); - for base_block in base_blocks { - let block_id = block_id_from_location(&base_block.0)?; + let mut base_block_ids = Vec::with_capacity(del_blocks.len()); + for base_block in del_blocks { + let block_id = block_id_from_location(&base_block.location.0)?; base_block_ids.push(block_id); } let base_block_ids_scalar = Scalar::Array(Decimal128Type::from_data(base_block_ids)); @@ -281,7 +321,7 @@ impl StreamTable { }; let stream_pruner = StreamPruner::create( &ctx, - operator, + fuse_table.get_operator(), table_schema.clone(), push_downs.clone(), cluster_key_meta, @@ -289,7 +329,7 @@ impl StreamTable { bloom_index_cols, )?; - let block_metas = stream_pruner.pruning(latest_blocks).await?; + let block_metas = stream_pruner.pruning(add_blocks).await?; let pruning_stats = stream_pruner.pruning_stats(); log::info!( @@ -321,7 +361,7 @@ impl StreamTable { #[minitrace::trace] 
pub async fn check_stream_status(&self, ctx: Arc) -> Result { - let base_table = self.source_table(ctx).await?.unwrap(); + let base_table = self.source_table(ctx).await?; let status = if base_table.get_table_info().ident.seq == self.table_version { StreamStatus::NoData } else { @@ -341,7 +381,7 @@ impl Table for StreamTable { &self.stream_info } - fn support_internal_column_id(&self, column_id: ColumnId) -> bool { + fn supported_internal_column(&self, column_id: ColumnId) -> bool { (CHANGE_ROW_ID_COLUMN_ID..=BASE_ROW_ID_COLUMN_ID).contains(&column_id) } @@ -364,33 +404,6 @@ impl Table for StreamTable { ] } - async fn source_table(&self, ctx: Arc) -> Result>> { - let table = ctx - .get_table( - self.stream_info.catalog(), - &self.table_database, - &self.table_name, - ) - .await?; - - if table.get_table_info().ident.table_id != self.table_id { - return Err(ErrorCode::IllegalStream(format!( - "Table id mismatch, expect {}, got {}", - self.table_id, - table.get_table_info().ident.table_id - ))); - } - - if !table.change_tracking_enabled() { - return Err(ErrorCode::IllegalStream(format!( - "Change tracking is not enabled for table '{}.{}'", - self.table_database, self.table_name - ))); - } - - Ok(Some(table)) - } - #[minitrace::trace] #[async_backtrace::framed] async fn read_partitions( @@ -419,6 +432,52 @@ impl Table for StreamTable { })?; table.read_data(ctx, plan, pipeline, put_cache) } + + async fn table_statistics( + &self, + ctx: Arc, + ) -> Result> { + let table = self.source_table(ctx.clone()).await?; + let fuse_table = FuseTable::try_from_table(table.as_ref())?; + + let (_, add_blocks) = self + .collect_incremental_blocks(ctx.clone(), fuse_table) + .await?; + if add_blocks.is_empty() { + return Ok(None); + } + + let mut num_rows = 0; + let mut data_size = 0; + let mut data_size_compressed = 0; + let mut index_size = 0; + let number_of_blocks = add_blocks.len() as u64; + for block in add_blocks { + num_rows += block.row_count; + data_size += block.block_size; + data_size_compressed += block.file_size; + index_size += block.bloom_filter_index_size; + } + + Ok(Some(TableStatistics { + num_rows: Some(num_rows), + data_size: Some(data_size), + data_size_compressed: Some(data_size_compressed), + index_size: Some(index_size), + number_of_blocks: Some(number_of_blocks), + number_of_segments: None, + })) + } + + #[async_backtrace::framed] + async fn column_statistics_provider( + &self, + ctx: Arc, + ) -> Result> { + let table = self.source_table(ctx.clone()).await?; + + table.column_statistics_provider(ctx).await + } } fn replace_push_downs( diff --git a/src/query/storages/system/src/streams_table.rs b/src/query/storages/system/src/streams_table.rs index 02a2d8373b34f..cfd00abb4233f 100644 --- a/src/query/storages/system/src/streams_table.rs +++ b/src/query/storages/system/src/streams_table.rs @@ -182,8 +182,7 @@ impl AsyncSystemTable for StreamsTable { let mut reason = "".to_string(); match stream_table.source_table(ctx.clone()).await { Ok(source) => { - let table = source.unwrap(); - let fuse_table = FuseTable::try_from_table(table.as_ref())?; + let fuse_table = FuseTable::try_from_table(source.as_ref())?; if let Some(location) = stream_table.snapshot_loc() { reason = SnapshotsIO::read_snapshot( location, diff --git a/src/query/storages/system/src/tables_table.rs b/src/query/storages/system/src/tables_table.rs index 49ef1ef8dd774..7e1288fd41d9a 100644 --- a/src/query/storages/system/src/tables_table.rs +++ b/src/query/storages/system/src/tables_table.rs @@ -278,7 +278,7 @@ where TablesTable: 
HistoryAware .as_ref() .map(|v| v.owner_role_name.as_bytes().to_vec()), ); - let stats = match tbl.table_statistics().await { + let stats = match tbl.table_statistics(ctx.clone()).await { Ok(stats) => stats, Err(err) => { ctx.push_warning(format!(
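The hunks in `src/query/catalog/src/table.rs` rename `support_internal_column_id` to `supported_internal_column` and thread an `Arc<dyn TableContext>` through `table_statistics` and `column_statistics_provider`. Below is a minimal, synchronous sketch of that trait shape — `Ctx`, `ColumnId`, and `TableStatistics` here are simplified placeholders, not the Databend definitions, and async plus error handling are omitted — meant only to show what an engine with no context-dependent statistics implements after this change.

```rust
// Simplified stand-ins; the real trait lives in databend_common_catalog::table.
type ColumnId = u32;
struct Ctx; // placeholder for Arc<dyn TableContext>

#[derive(Debug, Default)]
struct TableStatistics {
    num_rows: Option<u64>,
    data_size: Option<u64>,
}

trait Table {
    /// Whether the engine supports the given internal column (default: no).
    fn supported_internal_column(&self, _column_id: ColumnId) -> bool {
        false
    }

    /// Statistics now receive the query context; engines that do not need it
    /// bind it to `_ctx`, and the default is "no statistics".
    fn table_statistics(&self, _ctx: &Ctx) -> Option<TableStatistics> {
        None
    }
}

/// A toy engine that, like NumbersTable in the diff, derives statistics
/// from its own metadata and ignores the context.
struct CountingTable {
    total: u64,
}

impl Table for CountingTable {
    fn table_statistics(&self, _ctx: &Ctx) -> Option<TableStatistics> {
        Some(TableStatistics {
            num_rows: Some(self.total),
            data_size: Some(self.total * 8),
        })
    }
}

fn main() {
    let t = CountingTable { total: 42 };
    assert!(!t.supported_internal_column(0));
    println!("{:?}", t.table_statistics(&Ctx));
}
```

Callers change in the same direction: in the binder and interpreters above, `table.table_statistics().await?` becomes `table.table_statistics(self.ctx.clone()).await?`, and the old `source_table`-or-self branch for column statistics collapses into a single `column_statistics_provider(ctx)` call.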
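`StreamTable::collect_incremental_blocks` (extracted from `do_read_partitions`) classifies blocks by diffing the segment sets of the stream's base snapshot and the source table's latest snapshot: blocks reached only through base-only segments are deletions, blocks reached only through latest-only segments are additions, and blocks present on both sides cancel out. The following is a self-contained sketch of that set logic over plain string locations, assuming block locations are unique keys; the real code reads `SegmentInfo`s through `SegmentsIO`, works with `Arc<BlockMeta>`, and chunks segment reads by `max_threads * 4`.

```rust
use std::collections::{HashMap, HashSet};

/// Classify blocks into (deleted, added) by diffing a base and a latest
/// snapshot. String locations stand in for BlockMeta; segment I/O is elided.
fn diff_blocks(
    base: &HashMap<&'static str, Vec<&'static str>>,   // segment -> block locations
    latest: &HashMap<&'static str, Vec<&'static str>>,
) -> (Vec<&'static str>, Vec<&'static str>) {
    let base_segs: HashSet<_> = base.keys().copied().collect();
    let latest_segs: HashSet<_> = latest.keys().copied().collect();

    // Blocks that appear in segments only present in the base snapshot.
    let mut base_blocks: HashMap<&'static str, &'static str> = HashMap::new();
    for seg in base_segs.difference(&latest_segs) {
        for loc in &base[seg] {
            base_blocks.insert(*loc, *loc);
        }
    }

    // Walk segments only present in the latest snapshot: a block already seen
    // on the base side is unchanged (drop it), otherwise it was newly added.
    let mut add_blocks = Vec::new();
    for seg in latest_segs.difference(&base_segs) {
        for loc in &latest[seg] {
            if base_blocks.remove(loc).is_none() {
                add_blocks.push(*loc);
            }
        }
    }

    let del_blocks = base_blocks.into_values().collect();
    (del_blocks, add_blocks)
}

fn main() {
    let base = HashMap::from([("s1", vec!["b1", "b2"]), ("s2", vec!["b3"])]);
    let latest = HashMap::from([("s2", vec!["b3"]), ("s3", vec!["b2", "b4"])]);
    let (del, add) = diff_blocks(&base, &latest);
    assert_eq!(del, vec!["b1"]); // b1 disappeared together with segment s1
    assert_eq!(add, vec!["b4"]); // b4 arrived with segment s3; b2 only moved
}
```

In the diff, the deleted side feeds the `base_block_ids` pushed into the pruner's push-downs, while the added side is what the stream pruner and the new `table_statistics` implementation operate on.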
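The new `StreamTable::table_statistics` turns the added blocks into table-level numbers. Below is a condensed, synchronous sketch of that aggregation with a stand-in `BlockMeta` carrying only the fields the diff actually sums (row count, uncompressed size, compressed size, bloom index size); in the real method the blocks come from `collect_incremental_blocks`, the result is wrapped in `Ok(...)`, and `number_of_segments` stays `None`.

```rust
/// Stand-in for the per-block metadata fields the stream statistics use.
struct BlockMeta {
    row_count: u64,
    block_size: u64,
    file_size: u64,
    bloom_filter_index_size: u64,
}

#[derive(Debug, Default)]
struct TableStatistics {
    num_rows: Option<u64>,
    data_size: Option<u64>,
    data_size_compressed: Option<u64>,
    index_size: Option<u64>,
    number_of_blocks: Option<u64>,
}

/// Sum the added blocks into table-level statistics; an empty change set
/// yields None, matching the behaviour shown in the diff.
fn stream_statistics(add_blocks: &[BlockMeta]) -> Option<TableStatistics> {
    if add_blocks.is_empty() {
        return None;
    }
    let (mut rows, mut size, mut compressed, mut index) = (0, 0, 0, 0);
    for b in add_blocks {
        rows += b.row_count;
        size += b.block_size;
        compressed += b.file_size;
        index += b.bloom_filter_index_size;
    }
    Some(TableStatistics {
        num_rows: Some(rows),
        data_size: Some(size),
        data_size_compressed: Some(compressed),
        index_size: Some(index),
        number_of_blocks: Some(add_blocks.len() as u64),
    })
}

fn main() {
    let blocks = vec![
        BlockMeta { row_count: 10, block_size: 800, file_size: 300, bloom_filter_index_size: 16 },
        BlockMeta { row_count: 5, block_size: 400, file_size: 150, bloom_filter_index_size: 16 },
    ];
    println!("{:?}", stream_statistics(&blocks));
    assert!(stream_statistics(&[]).is_none());
}
```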