diff --git a/src/query/catalog/src/table.rs b/src/query/catalog/src/table.rs
index 865c78e4c5a04..44bea62722c1e 100644
--- a/src/query/catalog/src/table.rs
+++ b/src/query/catalog/src/table.rs
@@ -59,6 +59,10 @@ pub trait Table: Sync + Send {
         self.get_table_info().engine()
     }
 
+    fn support_internal_column_id(&self, _column_id: ColumnId) -> bool {
+        false
+    }
+
     fn schema(&self) -> Arc<TableSchema> {
         self.get_table_info().schema()
     }
@@ -125,6 +129,12 @@ pub trait Table: Sync + Send {
         })
     }
 
+    async fn source_table(&self, ctx: Arc<dyn TableContext>) -> Result<Option<Arc<dyn Table>>> {
+        let _ = ctx;
+
+        Ok(None)
+    }
+
     /// Whether the table engine supports prewhere optimization.
     /// only Fuse Engine supports this.
     fn support_prewhere(&self) -> bool {
@@ -140,11 +150,6 @@ pub trait Table: Sync + Send {
         false
     }
 
-    /// Whether the table engine supports virtual column `_row_id`.
-    fn support_row_id_column(&self) -> bool {
-        false
-    }
-
     #[async_backtrace::framed]
     async fn alter_table_cluster_keys(
         &self,
diff --git a/src/query/service/src/interpreters/common/stream.rs b/src/query/service/src/interpreters/common/stream.rs
index 79d1daf6bb3d1..3cff3b5779dff 100644
--- a/src/query/service/src/interpreters/common/stream.rs
+++ b/src/query/service/src/interpreters/common/stream.rs
@@ -48,7 +48,7 @@ pub async fn build_update_stream_meta_seq(
     for table in tables.into_iter() {
         let stream = StreamTable::try_from_table(table.as_ref())?;
         let stream_info = stream.get_table_info();
-        let source_table = stream.source_table(ctx.clone()).await?;
+        let source_table = stream.source_table(ctx.clone()).await?.unwrap();
         let inner_fuse = FuseTable::try_from_table(source_table.as_ref())?;
 
         let table_version = inner_fuse.get_table_info().ident.seq;
diff --git a/src/query/service/src/interpreters/interpreter_delete.rs b/src/query/service/src/interpreters/interpreter_delete.rs
index 0f9350dd097e4..f5cc53013aa66 100644
--- a/src/query/service/src/interpreters/interpreter_delete.rs
+++ b/src/query/service/src/interpreters/interpreter_delete.rs
@@ -24,6 +24,7 @@ use databend_common_exception::Result;
 use databend_common_expression::types::DataType;
 use databend_common_expression::types::NumberDataType;
 use databend_common_expression::DataBlock;
+use databend_common_expression::ROW_ID_COLUMN_ID;
 use databend_common_expression::ROW_ID_COL_NAME;
 use databend_common_functions::BUILTIN_FUNCTIONS;
 use databend_common_meta_app::schema::CatalogInfo;
@@ -118,7 +119,7 @@ impl Interpreter for DeleteInterpreter {
         tbl.check_mutable()?;
 
         let selection = if !self.plan.subquery_desc.is_empty() {
-            let support_row_id = tbl.support_row_id_column();
+            let support_row_id = tbl.support_internal_column_id(ROW_ID_COLUMN_ID);
             if !support_row_id {
                 return Err(ErrorCode::from_string(
                     "table doesn't support row_id, so it can't use delete with subquery"
diff --git a/src/query/service/src/interpreters/interpreter_update.rs b/src/query/service/src/interpreters/interpreter_update.rs
index 6597baaa725b1..929af4c5a567f 100644
--- a/src/query/service/src/interpreters/interpreter_update.rs
+++ b/src/query/service/src/interpreters/interpreter_update.rs
@@ -26,6 +26,7 @@ use databend_common_expression::types::DataType;
 use databend_common_expression::types::NumberDataType;
 use databend_common_expression::FieldIndex;
 use databend_common_expression::RemoteExpr;
+use databend_common_expression::ROW_ID_COLUMN_ID;
 use databend_common_expression::ROW_ID_COL_NAME;
 use databend_common_functions::BUILTIN_FUNCTIONS;
 use databend_common_license::license::Feature::ComputedColumn;
@@ -107,7 +108,7 @@ impl Interpreter for UpdateInterpreter {
         tbl.check_mutable()?;
 
         let selection = if !self.plan.subquery_desc.is_empty() {
-            let support_row_id = tbl.support_row_id_column();
+            let support_row_id = tbl.support_internal_column_id(ROW_ID_COLUMN_ID);
             if !support_row_id {
                 return Err(ErrorCode::from_string(
                     "table doesn't support row_id, so it can't use delete with subquery"
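
(Annotation, not part of the patch: a minimal self-contained sketch of the trait-default dispatch introduced above, replacing the boolean support_row_id_column with a per-column-id check. The constant values and the MemoryTable stub are stand-ins for illustration; the real definitions live in databend_common_expression and the catalog crate.)

// Sketch only: internal columns are allocated from the top of the u32 range,
// so an engine can advertise support for a contiguous band of ids.
type ColumnId = u32;

const ROW_ID_COLUMN_ID: ColumnId = u32::MAX; // stand-in value
const SNAPSHOT_NAME_COLUMN_ID: ColumnId = u32::MAX - 3; // stand-in value

trait Table {
    // Default mirrors the patch: an engine supports no internal columns
    // unless it opts in.
    fn support_internal_column_id(&self, _column_id: ColumnId) -> bool {
        false
    }
}

struct FuseTable;
impl Table for FuseTable {
    // Fuse supports every internal column at or above the snapshot-name id.
    fn support_internal_column_id(&self, column_id: ColumnId) -> bool {
        column_id >= SNAPSHOT_NAME_COLUMN_ID
    }
}

struct MemoryTable; // hypothetical engine keeping the default: nothing supported
impl Table for MemoryTable {}

fn main() {
    // Mirrors the interpreter checks: DELETE/UPDATE with a subquery ask for _row_id.
    let tables: Vec<(&str, Box<dyn Table>)> =
        vec![("fuse", Box::new(FuseTable)), ("memory", Box::new(MemoryTable))];
    for (name, t) in &tables {
        println!(
            "{name}: _row_id supported = {}",
            t.support_internal_column_id(ROW_ID_COLUMN_ID)
        );
    }
}
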
diff --git a/src/query/service/src/pipelines/builders/builder_scan.rs b/src/query/service/src/pipelines/builders/builder_scan.rs
index 799386af78d73..3c4d87a93c2c6 100644
--- a/src/query/service/src/pipelines/builders/builder_scan.rs
+++ b/src/query/service/src/pipelines/builders/builder_scan.rs
@@ -17,7 +17,6 @@ use std::sync::Mutex;
 use std::time::Instant;
 
 use databend_common_catalog::table_context::TableContext;
-use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::DataBlock;
 use databend_common_pipeline_core::processors::ProcessorPtr;
@@ -72,16 +71,9 @@ impl PipelineBuilder {
 
         // Fill internal columns if needed.
         if let Some(internal_columns) = &scan.internal_column {
-            if table.support_row_id_column() {
-                self.main_pipeline.add_transform(|input, output| {
-                    TransformAddInternalColumns::try_create(input, output, internal_columns.clone())
-                })?;
-            } else {
-                return Err(ErrorCode::TableEngineNotSupported(format!(
-                    "Table engine `{}` does not support virtual column _row_id",
-                    table.engine()
-                )));
-            }
+            self.main_pipeline.add_transform(|input, output| {
+                TransformAddInternalColumns::try_create(input, output, internal_columns.clone())
+            })?;
         }
 
         let schema = scan.source.schema();
diff --git a/src/query/sql/src/planner/binder/select.rs b/src/query/sql/src/planner/binder/select.rs
index abd5558f01969..2acde501a9a69 100644
--- a/src/query/sql/src/planner/binder/select.rs
+++ b/src/query/sql/src/planner/binder/select.rs
@@ -41,6 +41,7 @@ use databend_common_exception::Result;
 use databend_common_exception::Span;
 use databend_common_expression::type_check::common_super_type;
 use databend_common_expression::types::DataType;
+use databend_common_expression::ROW_ID_COLUMN_ID;
 use databend_common_expression::ROW_ID_COL_NAME;
 use databend_common_functions::BUILTIN_FUNCTIONS;
 use log::warn;
@@ -842,7 +843,11 @@ impl Binder {
             return Ok(());
         }
 
-        if !metadata.table(0).table().support_row_id_column() {
+        if !metadata
+            .table(0)
+            .table()
+            .support_internal_column_id(ROW_ID_COLUMN_ID)
+        {
             return Ok(());
         }
 
diff --git a/src/query/sql/src/planner/binder/table.rs b/src/query/sql/src/planner/binder/table.rs
index 6a3646c578799..3cb3b60c90a24 100644
--- a/src/query/sql/src/planner/binder/table.rs
+++ b/src/query/sql/src/planner/binder/table.rs
@@ -1203,7 +1203,12 @@ impl Binder {
         let table = self.metadata.read().table(table_index).clone();
         let table_name = table.name();
         let table = table.table();
-        let statistics_provider = table.column_statistics_provider().await?;
+        let source = table.source_table(self.ctx.clone()).await?;
+        let statistics_provider = if let Some(source) = source {
+            source.column_statistics_provider().await?
+        } else {
+            table.column_statistics_provider().await?
+        };
         let table_version = if table.engine() == "STREAM" {
             let options = table.options();
             let table_version = options
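
(Annotation, not part of the patch: a self-contained sketch of the fallback the binder now uses. source_table yields Some(inner table) for a stream and None for every other engine, so column statistics come from the stream's source when one exists. All types below are stand-ins.)

struct Stats(&'static str);

struct Table {
    source: Option<Box<Table>>,
    stats: Stats,
}

impl Table {
    // Mirrors the new trait method: Some(inner) for streams, None otherwise.
    fn source_table(&self) -> Option<&Table> {
        self.source.as_deref()
    }

    fn column_statistics_provider(&self) -> &Stats {
        &self.stats
    }
}

fn main() {
    let fuse = Table { source: None, stats: Stats("fuse stats") };
    let stream = Table { source: Some(Box::new(fuse)), stats: Stats("stream stats") };

    // Same shape as the patched binder: prefer the source's statistics.
    let provider = if let Some(source) = stream.source_table() {
        source.column_statistics_provider()
    } else {
        stream.column_statistics_provider()
    };
    println!("{}", provider.0); // prints "fuse stats"
}
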
diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs
index c26218e2aebdd..43187ff4f560a 100644
--- a/src/query/storages/fuse/src/fuse_table.rs
+++ b/src/query/storages/fuse/src/fuse_table.rs
@@ -32,10 +32,12 @@ use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::BlockThresholds;
+use databend_common_expression::ColumnId;
 use databend_common_expression::RemoteExpr;
 use databend_common_expression::ORIGIN_BLOCK_ID_COL_NAME;
 use databend_common_expression::ORIGIN_BLOCK_ROW_NUM_COL_NAME;
 use databend_common_expression::ORIGIN_VERSION_COL_NAME;
+use databend_common_expression::SNAPSHOT_NAME_COLUMN_ID;
 use databend_common_io::constants::DEFAULT_BLOCK_BUFFER_SIZE;
 use databend_common_io::constants::DEFAULT_BLOCK_MAX_ROWS;
 use databend_common_meta_app::schema::DatabaseType;
@@ -436,6 +438,10 @@ impl Table for FuseTable {
         Some(self.data_metrics.clone())
     }
 
+    fn support_internal_column_id(&self, column_id: ColumnId) -> bool {
+        column_id >= SNAPSHOT_NAME_COLUMN_ID
+    }
+
     fn support_column_projection(&self) -> bool {
         true
     }
@@ -813,10 +819,6 @@ impl Table for FuseTable {
         true
     }
 
-    fn support_row_id_column(&self) -> bool {
-        true
-    }
-
     fn result_can_be_cached(&self) -> bool {
         true
     }
diff --git a/src/query/storages/stream/src/stream_pruner.rs b/src/query/storages/stream/src/stream_pruner.rs
index c6163ed7be534..7cba640356faa 100644
--- a/src/query/storages/stream/src/stream_pruner.rs
+++ b/src/query/storages/stream/src/stream_pruner.rs
@@ -90,11 +90,12 @@ impl StreamPruner {
     #[async_backtrace::framed]
     pub async fn pruning(
         self: &Arc<Self>,
-        mut block_metas: Vec<Arc<BlockMeta>>,
+        block_metas: Vec<Arc<BlockMeta>>,
     ) -> Result<Vec<(BlockMetaIndex, Arc<BlockMeta>)>> {
         let mut remain = block_metas.len() % self.max_concurrency;
         let batch_size = block_metas.len() / self.max_concurrency;
         let mut works = Vec::with_capacity(self.max_concurrency);
+        let mut block_metas = block_metas.into_iter().enumerate().collect::<Vec<_>>();
 
         while !block_metas.is_empty() {
             let gap_size = std::cmp::min(1, remain);
@@ -194,7 +195,7 @@ impl StreamPruner {
     pub async fn block_pruning(
         &self,
         bloom_pruner: &Arc<dyn BloomPruner + Send + Sync>,
-        block_metas: Vec<Arc<BlockMeta>>,
+        block_metas: Vec<(usize, Arc<BlockMeta>)>,
     ) -> Result<Vec<(BlockMetaIndex, Arc<BlockMeta>)>> {
         let pruning_stats = self.pruning_ctx.pruning_stats.clone();
         let pruning_runtime = &self.pruning_ctx.pruning_runtime;
@@ -203,15 +204,18 @@ impl StreamPruner {
         let range_pruner = self.pruning_ctx.range_pruner.clone();
         let page_pruner = self.pruning_ctx.page_pruner.clone();
 
-        let mut blocks = block_metas.iter().enumerate();
+        let mut blocks = block_metas.into_iter();
         let pruning_tasks = std::iter::from_fn(|| {
            // check limit speculatively
            if limit_pruner.exceeded() {
                return None;
            }
 
-            type BlockPruningFutureReturn =
-                Pin<Box<dyn Future<Output = (usize, bool, Option<Range<usize>>, String)> + Send>>;
+            type BlockPruningFutureReturn = Pin<
+                Box<
+                    dyn Future<Output = (usize, bool, Option<Range<usize>>, Arc<BlockMeta>)> + Send,
+                >,
+            >;
             type BlockPruningFuture =
                 Box<dyn FnOnce(OwnedSemaphorePermit) -> BlockPruningFutureReturn + Send + 'static>;
 
@@ -273,9 +277,9 @@ impl StreamPruner {
                             let (keep, range) =
                                 page_pruner.should_keep(&block_meta.cluster_stats);
 
-                            (block_idx, keep, range, block_meta.location.0.clone())
+                            (block_idx, keep, range, block_meta)
                         } else {
-                            (block_idx, keep, None, block_meta.location.0.clone())
+                            (block_idx, keep, None, block_meta)
                         }
                     })
                 });
@@ -284,7 +288,7 @@ impl StreamPruner {
                 let v: BlockPruningFuture = Box::new(move |permit: OwnedSemaphorePermit| {
                     Box::pin(async move {
                         let _permit = permit;
-                        (block_idx, false, None, block_meta.location.0.clone())
+                        (block_idx, false, None, block_meta)
                     })
                 });
                 v
@@ -304,12 +308,8 @@ impl StreamPruner {
 
         let mut result = Vec::with_capacity(joint.len());
         for item in joint {
-            let (block_idx, keep, range, block_location) = item;
+            let (block_idx, keep, range, block) = item;
             if keep {
-                let block = block_metas[block_idx].clone();
-
-                debug_assert_eq!(block_location, block.location.0);
-
                 result.push((
                     BlockMetaIndex {
                         segment_idx: 0,
@@ -317,7 +317,7 @@ impl StreamPruner {
                         range,
                         page_size: block.page_size() as usize,
                         block_id: 0,
-                        block_location: block_location.clone(),
+                        block_location: block.location.0.clone(),
                         segment_location: "".to_string(),
                         snapshot_location: None,
                     },
@@ -336,7 +336,7 @@ impl StreamPruner {
 
     fn block_pruning_sync(
         &self,
-        block_metas: Vec<Arc<BlockMeta>>,
+        block_metas: Vec<(usize, Arc<BlockMeta>)>,
     ) -> Result<Vec<(BlockMetaIndex, Arc<BlockMeta>)>> {
         let pruning_stats = self.pruning_ctx.pruning_stats.clone();
         let limit_pruner = self.pruning_ctx.limit_pruner.clone();
@@ -346,7 +346,7 @@ impl StreamPruner {
         let start = Instant::now();
 
         let mut result = Vec::with_capacity(block_metas.len());
-        for (block_idx, block_meta) in block_metas.into_iter().enumerate() {
+        for (block_idx, block_meta) in block_metas.into_iter() {
             // Perf.
             {
                 metrics_inc_blocks_range_pruning_before(1);
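
(Annotation, not part of the patch: a self-contained sketch of why the pruner now enumerates up front. Splitting the input into per-worker batches loses global positions, so each block meta carries its original index through the pipeline instead of being looked up again at the end. The data and batch logic below are simplified stand-ins.)

fn main() {
    let metas = vec!["b0", "b1", "b2", "b3", "b4"];
    let max_concurrency = 2;

    // Pair every block meta with its global index before batching,
    // as StreamPruner::pruning now does.
    let mut indexed: Vec<(usize, &str)> = metas.into_iter().enumerate().collect();

    let batch_size = (indexed.len() / max_concurrency).max(1);
    let mut batches = Vec::new();
    while !indexed.is_empty() {
        let take = batch_size.min(indexed.len());
        batches.push(indexed.drain(0..take).collect::<Vec<_>>());
    }

    // Workers can now report keep/drop decisions with stable global indices,
    // so results no longer need a lookup back into the source vector.
    for (worker, batch) in batches.iter().enumerate() {
        for (block_idx, meta) in batch {
            println!("worker {worker}: block {block_idx} = {meta}");
        }
    }
}
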
diff --git a/src/query/storages/stream/src/stream_table.rs b/src/query/storages/stream/src/stream_table.rs
index 16bc7ba850045..76cb10460ace3 100644
--- a/src/query/storages/stream/src/stream_table.rs
+++ b/src/query/storages/stream/src/stream_table.rs
@@ -35,10 +35,13 @@ use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::types::decimal::Decimal128Type;
+use databend_common_expression::ColumnId;
 use databend_common_expression::FromData;
 use databend_common_expression::RemoteExpr;
 use databend_common_expression::Scalar;
 use databend_common_expression::BASE_BLOCK_IDS_COL_NAME;
+use databend_common_expression::BASE_ROW_ID_COLUMN_ID;
+use databend_common_expression::CHANGE_ROW_ID_COLUMN_ID;
 use databend_common_expression::ORIGIN_BLOCK_ID_COL_NAME;
 use databend_common_expression::ORIGIN_BLOCK_ROW_NUM_COL_NAME;
 use databend_common_expression::ORIGIN_VERSION_COL_NAME;
@@ -157,33 +160,6 @@ impl StreamTable {
         })
     }
 
-    pub async fn source_table(&self, ctx: Arc<dyn TableContext>) -> Result<Arc<dyn Table>> {
-        let table = ctx
-            .get_table(
-                self.stream_info.catalog(),
-                &self.table_database,
-                &self.table_name,
-            )
-            .await?;
-
-        if table.get_table_info().ident.table_id != self.table_id {
-            return Err(ErrorCode::IllegalStream(format!(
-                "Table id mismatch, expect {}, got {}",
-                self.table_id,
-                table.get_table_info().ident.table_id
-            )));
-        }
-
-        if !table.change_tracking_enabled() {
-            return Err(ErrorCode::IllegalStream(format!(
-                "Change tracking is not enabled for table '{}.{}'",
-                self.table_database, self.table_name
-            )));
-        }
-
-        Ok(table)
-    }
-
     pub fn offset(&self) -> u64 {
         self.table_version
     }
@@ -215,13 +191,13 @@ impl StreamTable {
         push_downs: Option<PushDownInfo>,
     ) -> Result<(PartStatistics, Partitions)> {
         let start = Instant::now();
-        let table = self.source_table(ctx.clone()).await?;
+        let table = self.source_table(ctx.clone()).await?.unwrap();
         let fuse_table = FuseTable::try_from_table(table.as_ref())?;
+
         let latest_snapshot = fuse_table.read_table_snapshot().await?;
         if latest_snapshot.is_none() {
             return Ok((PartStatistics::default(), Partitions::default()));
         }
-
         let latest_snapshot = latest_snapshot.unwrap();
         let latest_segments =
             HashSet::from_iter(latest_snapshot.segments.clone());
@@ -345,7 +321,7 @@ impl StreamTable {
 
     #[minitrace::trace]
     pub async fn check_stream_status(&self, ctx: Arc<dyn TableContext>) -> Result<StreamStatus> {
-        let base_table = self.source_table(ctx).await?;
+        let base_table = self.source_table(ctx).await?.unwrap();
         let status = if base_table.get_table_info().ident.seq == self.table_version {
             StreamStatus::NoData
         } else {
@@ -365,12 +341,12 @@ impl Table for StreamTable {
         &self.stream_info
     }
 
-    /// whether column prune(projection) can help in table read
-    fn support_column_projection(&self) -> bool {
-        true
+    fn support_internal_column_id(&self, column_id: ColumnId) -> bool {
+        (CHANGE_ROW_ID_COLUMN_ID..=BASE_ROW_ID_COLUMN_ID).contains(&column_id)
     }
 
-    fn support_row_id_column(&self) -> bool {
+    /// whether column prune(projection) can help in table read
+    fn support_column_projection(&self) -> bool {
         true
     }
 
@@ -388,6 +364,33 @@ impl Table for StreamTable {
         ]
     }
 
+    async fn source_table(&self, ctx: Arc<dyn TableContext>) -> Result<Option<Arc<dyn Table>>> {
+        let table = ctx
+            .get_table(
+                self.stream_info.catalog(),
+                &self.table_database,
+                &self.table_name,
+            )
+            .await?;
+
+        if table.get_table_info().ident.table_id != self.table_id {
+            return Err(ErrorCode::IllegalStream(format!(
+                "Table id mismatch, expect {}, got {}",
+                self.table_id,
+                table.get_table_info().ident.table_id
+            )));
+        }
+
+        if !table.change_tracking_enabled() {
+            return Err(ErrorCode::IllegalStream(format!(
+                "Change tracking is not enabled for table '{}.{}'",
+                self.table_database, self.table_name
+            )));
+        }
+
+        Ok(Some(table))
+    }
+
     #[minitrace::trace]
     #[async_backtrace::framed]
     async fn read_partitions(
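
(Annotation, not part of the patch: a self-contained sketch of the validation StreamTable::source_table performs now that it has moved into the Table trait. The error type and struct fields are stand-ins for the real ErrorCode::IllegalStream and catalog types.)

#[derive(Debug)]
struct IllegalStream(String);

struct SourceTable {
    table_id: u64,
    change_tracking: bool,
}

struct StreamTable {
    table_id: u64,
}

impl StreamTable {
    // Returns Some(source) only if the id matches and change tracking is on;
    // the trait default for non-stream tables returns Ok(None) instead.
    fn source_table(&self, t: SourceTable) -> Result<Option<SourceTable>, IllegalStream> {
        if t.table_id != self.table_id {
            return Err(IllegalStream(format!(
                "Table id mismatch, expect {}, got {}",
                self.table_id, t.table_id
            )));
        }
        if !t.change_tracking {
            return Err(IllegalStream("Change tracking is not enabled".to_string()));
        }
        Ok(Some(t))
    }
}

fn main() {
    let stream = StreamTable { table_id: 42 };
    let ok = stream.source_table(SourceTable { table_id: 42, change_tracking: true });
    assert!(matches!(ok, Ok(Some(_))));
    let err = stream.source_table(SourceTable { table_id: 7, change_tracking: true });
    assert!(err.is_err());
    println!("validation behaves as expected");
}
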
diff --git a/src/query/storages/system/src/streams_table.rs b/src/query/storages/system/src/streams_table.rs
index cfd00abb4233f..02a2d8373b34f 100644
--- a/src/query/storages/system/src/streams_table.rs
+++ b/src/query/storages/system/src/streams_table.rs
@@ -182,7 +182,8 @@ impl AsyncSystemTable for StreamsTable {
                 let mut reason = "".to_string();
                 match stream_table.source_table(ctx.clone()).await {
                     Ok(source) => {
-                        let fuse_table = FuseTable::try_from_table(source.as_ref())?;
+                        let table = source.unwrap();
+                        let fuse_table = FuseTable::try_from_table(table.as_ref())?;
                         if let Some(location) = stream_table.snapshot_loc() {
                             reason = SnapshotsIO::read_snapshot(
                                 location,
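
(Annotation, not part of the patch: a minimal model of why the stream call sites above unwrap. A stream's source_table never returns Ok(None) — it either yields the source table or fails with IllegalStream — while the trait default for other engines is Ok(None). The function below is a stand-in for that invariant.)

fn source_table(is_stream: bool) -> Result<Option<&'static str>, String> {
    if is_stream {
        Ok(Some("inner fuse table")) // streams always resolve to their source
    } else {
        Ok(None) // trait default for every other engine
    }
}

fn main() -> Result<(), String> {
    // Mirrors build_update_stream_meta_seq / check_stream_status: the caller
    // already knows it holds a stream, so the Option is safe to unwrap.
    let source = source_table(true)?.unwrap();
    println!("resolved source: {source}");
    Ok(())
}
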