diff --git a/src/query/catalog/src/plan/internal_column.rs b/src/query/catalog/src/plan/internal_column.rs index 82c8eaebd86d..6caa6ff21e06 100644 --- a/src/query/catalog/src/plan/internal_column.rs +++ b/src/query/catalog/src/plan/internal_column.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::path::Path; + use common_exception::ErrorCode; use common_exception::Result; use common_expression::types::string::StringColumnBuilder; @@ -19,6 +21,7 @@ use common_expression::types::DataType; use common_expression::types::DecimalDataType; use common_expression::types::DecimalSize; use common_expression::types::NumberDataType; +use common_expression::types::StringType; use common_expression::types::UInt64Type; use common_expression::BlockEntry; use common_expression::BlockMetaInfo; @@ -30,7 +33,11 @@ use common_expression::Scalar; use common_expression::TableDataType; use common_expression::Value; use common_expression::BASE_BLOCK_IDS_COLUMN_ID; +use common_expression::BASE_ROW_ID_COLUMN_ID; use common_expression::BLOCK_NAME_COLUMN_ID; +use common_expression::CHANGE_ACTION_COLUMN_ID; +use common_expression::CHANGE_IS_UPDATE_COLUMN_ID; +use common_expression::CHANGE_ROW_ID_COLUMN_ID; use common_expression::ROW_ID_COLUMN_ID; use common_expression::SEGMENT_NAME_COLUMN_ID; use common_expression::SNAPSHOT_NAME_COLUMN_ID; @@ -121,7 +128,13 @@ pub enum InternalColumnType { BlockName, SegmentName, SnapshotName, + + // stream columns + BaseRowId, BaseBlockIds, + ChangeAction, + ChangeIsUpdate, + ChangeRowId, } #[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)] @@ -148,12 +161,16 @@ impl InternalColumn { InternalColumnType::BlockName => TableDataType::String, InternalColumnType::SegmentName => TableDataType::String, InternalColumnType::SnapshotName => TableDataType::String, + InternalColumnType::BaseRowId => TableDataType::String, InternalColumnType::BaseBlockIds => TableDataType::Array(Box::new( TableDataType::Decimal(DecimalDataType::Decimal128(DecimalSize { precision: 38, scale: 0, })), )), + InternalColumnType::ChangeAction => TableDataType::String, + InternalColumnType::ChangeIsUpdate => TableDataType::Boolean, + InternalColumnType::ChangeRowId => TableDataType::String, } } @@ -172,7 +189,21 @@ impl InternalColumn { InternalColumnType::BlockName => BLOCK_NAME_COLUMN_ID, InternalColumnType::SegmentName => SEGMENT_NAME_COLUMN_ID, InternalColumnType::SnapshotName => SNAPSHOT_NAME_COLUMN_ID, + InternalColumnType::BaseRowId => BASE_ROW_ID_COLUMN_ID, InternalColumnType::BaseBlockIds => BASE_BLOCK_IDS_COLUMN_ID, + InternalColumnType::ChangeAction => CHANGE_ACTION_COLUMN_ID, + InternalColumnType::ChangeIsUpdate => CHANGE_IS_UPDATE_COLUMN_ID, + InternalColumnType::ChangeRowId => CHANGE_ROW_ID_COLUMN_ID, + } + } + + pub fn virtual_computed_expr(&self) -> Option { + match &self.column_type { + InternalColumnType::ChangeRowId => Some( + "sha1(if(is_not_null(_origin_block_id), concat(to_uuid(_origin_block_id), hex(_origin_block_row_num)), _base_row_id))" + .to_string(), + ), + _ => None, } } @@ -234,6 +265,31 @@ impl InternalColumn { Value::Scalar(Scalar::String(builder.build_scalar())), ) } + InternalColumnType::BaseRowId => { + let file_stem = Path::new(&meta.block_location).file_stem().unwrap(); + let file_strs = file_stem + .to_str() + .unwrap_or("") + .split('_') + .collect::>(); + let uuid = file_strs[0]; + let mut row_ids = Vec::with_capacity(num_rows); + if let Some(offsets) = &meta.offsets { + for i in offsets { + let row_id = format!("{}{:x}", uuid, *i).as_bytes().to_vec(); + row_ids.push(row_id); + } + } else { + for i in 0..num_rows { + let row_id = format!("{}{:x}", uuid, i).as_bytes().to_vec(); + row_ids.push(row_id); + } + } + BlockEntry::new( + DataType::String, + Value::Column(StringType::from_data(row_ids)), + ) + } InternalColumnType::BaseBlockIds => { assert!(meta.base_block_ids.is_some()); BlockEntry::new( @@ -246,6 +302,14 @@ impl InternalColumn { Value::Scalar(meta.base_block_ids.clone().unwrap()), ) } + InternalColumnType::ChangeAction => BlockEntry::new( + DataType::String, + Value::Scalar(Scalar::String("INSERT".as_bytes().to_vec())), + ), + InternalColumnType::ChangeIsUpdate => { + BlockEntry::new(DataType::Boolean, Value::Scalar(Scalar::Boolean(false))) + } + InternalColumnType::ChangeRowId => unreachable!(), } } } diff --git a/src/query/expression/src/schema.rs b/src/query/expression/src/schema.rs index 16b44ea2b83d..f2ca3a8cb987 100644 --- a/src/query/expression/src/schema.rs +++ b/src/query/expression/src/schema.rs @@ -50,13 +50,25 @@ pub const ROW_ID_COLUMN_ID: u32 = u32::MAX; pub const BLOCK_NAME_COLUMN_ID: u32 = u32::MAX - 1; pub const SEGMENT_NAME_COLUMN_ID: u32 = u32::MAX - 2; pub const SNAPSHOT_NAME_COLUMN_ID: u32 = u32::MAX - 3; -pub const BASE_BLOCK_IDS_COLUMN_ID: u32 = u32::MAX - 4; +// internal stream column id. +pub const BASE_ROW_ID_COLUMN_ID: u32 = u32::MAX - 5; +pub const BASE_BLOCK_IDS_COLUMN_ID: u32 = u32::MAX - 6; +pub const CHANGE_ACTION_COLUMN_ID: u32 = u32::MAX - 7; +pub const CHANGE_IS_UPDATE_COLUMN_ID: u32 = u32::MAX - 8; +pub const CHANGE_ROW_ID_COLUMN_ID: u32 = u32::MAX - 9; + // internal column name. pub const ROW_ID_COL_NAME: &str = "_row_id"; pub const SNAPSHOT_NAME_COL_NAME: &str = "_snapshot_name"; pub const SEGMENT_NAME_COL_NAME: &str = "_segment_name"; pub const BLOCK_NAME_COL_NAME: &str = "_block_name"; +// internal stream column name. +pub const BASE_ROW_ID_COL_NAME: &str = "_base_row_id"; pub const BASE_BLOCK_IDS_COL_NAME: &str = "_base_block_ids"; +pub const CHANGE_ACTION_COL_NAME: &str = "change$action"; +pub const CHANGE_IS_UPDATE_COL_NAME: &str = "change$is_update"; +pub const CHANGE_ROW_ID_COL_NAME: &str = "change$row_id"; + pub const ROW_NUMBER_COL_NAME: &str = "_row_number"; pub const PREDICATE_COLUMN_NAME: &str = "_predicate"; @@ -71,7 +83,12 @@ pub const ORIGIN_BLOCK_ROW_NUM_COL_NAME: &str = "_origin_block_row_num"; #[inline] pub fn is_internal_column_id(column_id: ColumnId) -> bool { - column_id >= BASE_BLOCK_IDS_COLUMN_ID + column_id >= CHANGE_ROW_ID_COLUMN_ID +} + +#[inline] +pub fn is_internal_stream_column_id(column_id: ColumnId) -> bool { + (CHANGE_ROW_ID_COLUMN_ID..=BASE_ROW_ID_COLUMN_ID).contains(&column_id) } #[inline] diff --git a/src/query/functions/src/scalars/string.rs b/src/query/functions/src/scalars/string.rs index cbbec9c29a07..da6f3bb5627e 100644 --- a/src/query/functions/src/scalars/string.rs +++ b/src/query/functions/src/scalars/string.rs @@ -18,6 +18,8 @@ use std::io::Write; use base64::engine::general_purpose; use base64::prelude::*; use bstr::ByteSlice; +use common_base::base::uuid::Uuid; +use common_expression::types::decimal::Decimal128Type; use common_expression::types::number::SimpleDomain; use common_expression::types::number::UInt64Type; use common_expression::types::string::StringColumn; @@ -263,6 +265,17 @@ pub fn register(registry: &mut FunctionRegistry) { }), ); + registry.register_passthrough_nullable_1_arg::( + "to_uuid", + |_, _| FunctionDomain::Full, + vectorize_with_builder_1_arg::(|arg, output, _| { + let uuid = Uuid::from_u128(arg as u128); + let str = uuid.as_simple().to_string(); + output.put_slice(str.as_bytes()); + output.commit_row(); + }), + ); + registry.register_2_arg::, _, _>( "strcmp", |_, _, _| FunctionDomain::Full, diff --git a/src/query/functions/tests/it/scalars/testdata/function_list.txt b/src/query/functions/tests/it/scalars/testdata/function_list.txt index b932d1b05533..1c5ad6a0dfd4 100644 --- a/src/query/functions/tests/it/scalars/testdata/function_list.txt +++ b/src/query/functions/tests/it/scalars/testdata/function_list.txt @@ -3607,6 +3607,8 @@ Functions overloads: 25 to_uint8(Boolean NULL) :: UInt8 NULL 0 to_unix_timestamp(Timestamp) :: Int64 1 to_unix_timestamp(Timestamp NULL) :: Int64 NULL +0 to_uuid(Decimal(38, 0)) :: String +1 to_uuid(Decimal(38, 0) NULL) :: String NULL 0 to_variant(T0) :: Variant 1 to_variant(T0 NULL) :: Variant NULL 0 to_week_of_year(Date) :: UInt32 diff --git a/src/query/service/src/pipelines/builders/builder_scan.rs b/src/query/service/src/pipelines/builders/builder_scan.rs index 10d50c193a06..d5baa887b9c5 100644 --- a/src/query/service/src/pipelines/builders/builder_scan.rs +++ b/src/query/service/src/pipelines/builders/builder_scan.rs @@ -29,9 +29,9 @@ use common_sql::evaluator::CompoundBlockOperator; use common_sql::executor::physical_plans::ConstantTableScan; use common_sql::executor::physical_plans::CteScan; use common_sql::executor::physical_plans::TableScan; -use common_storages_fuse::operations::FillInternalColumnProcessor; use crate::pipelines::processors::transforms::MaterializedCteSource; +use crate::pipelines::processors::transforms::TransformAddInternalColumns; use crate::pipelines::PipelineBuilder; impl PipelineBuilder { @@ -74,13 +74,7 @@ impl PipelineBuilder { if let Some(internal_columns) = &scan.internal_column { if table.support_row_id_column() { self.main_pipeline.add_transform(|input, output| { - Ok(ProcessorPtr::create(Box::new( - FillInternalColumnProcessor::create( - internal_columns.clone(), - input, - output, - ), - ))) + TransformAddInternalColumns::try_create(input, output, internal_columns.clone()) })?; } else { return Err(ErrorCode::TableEngineNotSupported(format!( diff --git a/src/query/service/src/pipelines/processors/transforms/mod.rs b/src/query/service/src/pipelines/processors/transforms/mod.rs index 91423a8c8b6c..bce4bad5adb5 100644 --- a/src/query/service/src/pipelines/processors/transforms/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/mod.rs @@ -21,6 +21,7 @@ mod processor_extract_hash_table_by_row_number; pub(crate) mod range_join; mod transform_add_computed_columns; mod transform_add_const_columns; +mod transform_add_internal_columns; mod transform_add_stream_columns; mod transform_cast_schema; mod transform_create_sets; @@ -42,6 +43,7 @@ pub use processor_extract_hash_table_by_row_number::ExtractHashTableByRowNumber; pub use range_join::RangeJoinState; pub use transform_add_computed_columns::TransformAddComputedColumns; pub use transform_add_const_columns::TransformAddConstColumns; +pub use transform_add_internal_columns::TransformAddInternalColumns; pub use transform_add_stream_columns::TransformAddStreamColumns; pub use transform_cast_schema::TransformCastSchema; pub use transform_create_sets::SubqueryReceiver; diff --git a/src/query/service/src/pipelines/processors/transforms/transform_add_internal_columns.rs b/src/query/service/src/pipelines/processors/transforms/transform_add_internal_columns.rs new file mode 100644 index 000000000000..bff8364ef990 --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/transform_add_internal_columns.rs @@ -0,0 +1,71 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeMap; +use std::sync::Arc; + +use common_catalog::plan::InternalColumn; +use common_catalog::plan::InternalColumnMeta; +use common_exception::ErrorCode; +use common_exception::Result; +use common_expression::BlockMetaInfoDowncast; +use common_expression::DataBlock; +use common_expression::FieldIndex; +use common_expression::CHANGE_ROW_ID_COLUMN_ID; +use common_pipeline_core::processors::InputPort; +use common_pipeline_core::processors::OutputPort; +use common_pipeline_core::processors::ProcessorPtr; +use common_pipeline_transforms::processors::Transform; +use common_pipeline_transforms::processors::Transformer; + +pub struct TransformAddInternalColumns { + internal_columns: BTreeMap, +} + +impl TransformAddInternalColumns +where Self: Transform +{ + pub fn try_create( + input: Arc, + output: Arc, + internal_columns: BTreeMap, + ) -> Result { + Ok(ProcessorPtr::create(Transformer::create( + input, + output, + Self { internal_columns }, + ))) + } +} + +impl Transform for TransformAddInternalColumns { + const NAME: &'static str = "AddInternalColumnsTransform"; + + fn transform(&mut self, mut block: DataBlock) -> Result { + if let Some(meta) = block.take_meta() { + let internal_column_meta = + InternalColumnMeta::downcast_from(meta).ok_or(ErrorCode::Internal("It's a bug"))?; + let num_rows = block.num_rows(); + for internal_column in self.internal_columns.values() { + if internal_column.column_id() == CHANGE_ROW_ID_COLUMN_ID { + continue; + } + let column = + internal_column.generate_column_values(&internal_column_meta, num_rows); + block.add_column(column); + } + } + Ok(block) + } +} diff --git a/src/query/sql/src/executor/physical_plans/physical_table_scan.rs b/src/query/sql/src/executor/physical_plans/physical_table_scan.rs index 7346788061c8..173faaf38380 100644 --- a/src/query/sql/src/executor/physical_plans/physical_table_scan.rs +++ b/src/query/sql/src/executor/physical_plans/physical_table_scan.rs @@ -162,17 +162,21 @@ impl PhysicalPlanBuilder { continue; } let column = metadata.column(*index); - if let ColumnEntry::BaseTableColumn(BaseTableColumn { path_indices, .. }) = column { - if path_indices.is_some() { - has_inner_column = true; + match column { + ColumnEntry::BaseTableColumn(BaseTableColumn { path_indices, .. }) => { + if path_indices.is_some() { + has_inner_column = true; + } } - } else if let ColumnEntry::InternalColumn(TableInternalColumn { - internal_column, .. - }) = column - { - project_internal_columns.insert(*index, internal_column.to_owned()); - } else if let ColumnEntry::VirtualColumn(_) = column { - has_virtual_column = true; + ColumnEntry::InternalColumn(TableInternalColumn { + internal_column, .. + }) => { + project_internal_columns.insert(*index, internal_column.to_owned()); + } + ColumnEntry::VirtualColumn(_) => { + has_virtual_column = true; + } + _ => {} } if let Some(prewhere) = &scan.prewhere { diff --git a/src/query/sql/src/planner/binder/bind_context.rs b/src/query/sql/src/planner/binder/bind_context.rs index ee2fb64fcbac..f6aef741a322 100644 --- a/src/query/sql/src/planner/binder/bind_context.rs +++ b/src/query/sql/src/planner/binder/bind_context.rs @@ -25,6 +25,7 @@ use common_catalog::plan::InternalColumn; use common_exception::ErrorCode; use common_exception::Result; use common_exception::Span; +use common_expression::is_internal_stream_column_id; use common_expression::ColumnId; use common_expression::DataField; use common_expression::DataSchemaRef; @@ -527,6 +528,7 @@ impl BindContext { &mut self, column_binding: &InternalColumnBinding, metadata: MetadataRef, + visible: bool, ) -> Result { if !self.allow_internal_columns { return Err(ErrorCode::SemanticError(format!( @@ -554,16 +556,30 @@ impl BindContext { let metadata = metadata.read(); let table = metadata.table(table_index); + if table.table().engine() != "STREAM" && is_internal_stream_column_id(column_id) { + return Err(ErrorCode::SemanticError(format!( + "Internal column `{}` is not allowed in table `{}`", + column_binding.internal_column.column_name(), + table.table().name() + ))); + } + let column = metadata.column(column_index); + let virtual_computed_expr = column_binding.internal_column.virtual_computed_expr(); let column_binding = ColumnBindingBuilder::new( column.name(), column_index, Box::new(column.data_type()), - Visibility::Visible, + if visible { + Visibility::Visible + } else { + Visibility::InVisible + }, ) .database_name(Some(table.database().to_string())) .table_name(Some(table.name().to_string())) .table_index(Some(table_index)) + .virtual_computed_expr(virtual_computed_expr) .build(); if new { diff --git a/src/query/sql/src/planner/binder/internal_column_factory.rs b/src/query/sql/src/planner/binder/internal_column_factory.rs index 8f259c6b1639..87c97081d817 100644 --- a/src/query/sql/src/planner/binder/internal_column_factory.rs +++ b/src/query/sql/src/planner/binder/internal_column_factory.rs @@ -17,7 +17,11 @@ use std::collections::BTreeMap; use common_catalog::plan::InternalColumn; use common_catalog::plan::InternalColumnType; use common_expression::BASE_BLOCK_IDS_COL_NAME; +use common_expression::BASE_ROW_ID_COL_NAME; use common_expression::BLOCK_NAME_COL_NAME; +use common_expression::CHANGE_ACTION_COL_NAME; +use common_expression::CHANGE_IS_UPDATE_COL_NAME; +use common_expression::CHANGE_ROW_ID_COL_NAME; use common_expression::ROW_ID_COL_NAME; use common_expression::SEGMENT_NAME_COL_NAME; use common_expression::SNAPSHOT_NAME_COL_NAME; @@ -54,11 +58,34 @@ impl InternalColumnFactory { InternalColumn::new(SNAPSHOT_NAME_COL_NAME, InternalColumnType::SnapshotName), ); + internal_columns.insert( + BASE_ROW_ID_COL_NAME.to_string(), + InternalColumn::new(BASE_ROW_ID_COL_NAME, InternalColumnType::BaseRowId), + ); + internal_columns.insert( BASE_BLOCK_IDS_COL_NAME.to_string(), InternalColumn::new(BASE_BLOCK_IDS_COL_NAME, InternalColumnType::BaseBlockIds), ); + internal_columns.insert( + CHANGE_ACTION_COL_NAME.to_string(), + InternalColumn::new(CHANGE_ACTION_COL_NAME, InternalColumnType::ChangeAction), + ); + + internal_columns.insert( + CHANGE_IS_UPDATE_COL_NAME.to_string(), + InternalColumn::new( + CHANGE_IS_UPDATE_COL_NAME, + InternalColumnType::ChangeIsUpdate, + ), + ); + + internal_columns.insert( + CHANGE_ROW_ID_COL_NAME.to_string(), + InternalColumn::new(CHANGE_ROW_ID_COL_NAME, InternalColumnType::ChangeRowId), + ); + InternalColumnFactory { internal_columns } } diff --git a/src/query/sql/src/planner/binder/merge_into.rs b/src/query/sql/src/planner/binder/merge_into.rs index 4ec39a3ca970..f855005aefe6 100644 --- a/src/query/sql/src/planner/binder/merge_into.rs +++ b/src/query/sql/src/planner/binder/merge_into.rs @@ -288,8 +288,11 @@ impl Binder { }, }; - let column_binding = target_context - .add_internal_column_binding(&row_id_column_binding, self.metadata.clone())?; + let column_binding = target_context.add_internal_column_binding( + &row_id_column_binding, + self.metadata.clone(), + true, + )?; target_expr = SExpr::add_internal_column_index(&target_expr, table_index, column_binding.index); diff --git a/src/query/sql/src/planner/binder/table.rs b/src/query/sql/src/planner/binder/table.rs index c8c79bc37cf3..9a8bd58930d9 100644 --- a/src/query/sql/src/planner/binder/table.rs +++ b/src/query/sql/src/planner/binder/table.rs @@ -88,6 +88,8 @@ use dashmap::DashMap; use log::info; use parking_lot::RwLock; +use super::InternalColumnBinding; +use super::INTERNAL_COLUMN_FACTORY; use crate::binder::copy_into_table::resolve_file_location; use crate::binder::scalar::ScalarBinder; use crate::binder::table_args::bind_table_args; @@ -96,7 +98,6 @@ use crate::binder::ColumnBindingBuilder; use crate::binder::CteInfo; use crate::binder::ExprContext; use crate::binder::Visibility; -use crate::binder::INTERNAL_COLUMN_FACTORY; use crate::optimizer::RelExpr; use crate::optimizer::SExpr; use crate::planner::semantic::normalize_identifier; @@ -1308,24 +1309,17 @@ impl Binder { .map(|col| col.index()) .collect::>(); if let Some(origin_block_id) = origin_block_id { - let column_index = self.metadata.write().add_internal_column( - table_index, - INTERNAL_COLUMN_FACTORY - .get_internal_column(BASE_BLOCK_IDS_COL_NAME) - .unwrap(), - ); - let column = self.metadata.read().column(column_index).clone(); - let base_block_ids = ColumnBindingBuilder::new( - column.name(), - column_index, - Box::new(column.data_type()), - Visibility::InVisible, - ) - .table_name(Some(table_name.to_string())) - .database_name(Some(database_name.to_string())) - .table_index(Some(table_index)) - .build(); - + let base_block_ids = bind_context.add_internal_column_binding( + &InternalColumnBinding { + database_name: Some(database_name.to_string()), + table_name: Some(table_name.to_string()), + internal_column: INTERNAL_COLUMN_FACTORY + .get_internal_column(BASE_BLOCK_IDS_COL_NAME) + .unwrap(), + }, + self.metadata.clone(), + false, + )?; columns.insert(base_block_ids.index); let predicate = ScalarExpr::FunctionCall(FunctionCall { diff --git a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_filter_scan.rs b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_filter_scan.rs index f9c037858904..ba3e12e0bd63 100644 --- a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_filter_scan.rs +++ b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_filter_scan.rs @@ -283,18 +283,13 @@ impl RulePushDownFilterScan { let used_columns = predicate.used_columns(); let mut contain_derived_column = false; for column_entry in column_entries { - match column_entry { - ColumnEntry::BaseTableColumn(_) => {} - ColumnEntry::InternalColumn(_) => {} - ColumnEntry::DerivedColumn(column) => { - // Don't push down predicate that contains derived column - // Because storage can't know such columns. - if used_columns.contains(&column.column_index) { - contain_derived_column = true; - break; - } + if let ColumnEntry::DerivedColumn(column) = column_entry { + // Don't push down predicate that contains derived column + // Because storage can't know such columns. + if used_columns.contains(&column.column_index) { + contain_derived_column = true; + break; } - ColumnEntry::VirtualColumn(_) => {} } } if !contain_derived_column { diff --git a/src/query/sql/src/planner/semantic/type_check.rs b/src/query/sql/src/planner/semantic/type_check.rs index 76ec1d7b5841..05f0c839deab 100644 --- a/src/query/sql/src/planner/semantic/type_check.rs +++ b/src/query/sql/src/planner/semantic/type_check.rs @@ -268,18 +268,26 @@ impl<'a> TypeChecker<'a> { } NameResolutionResult::InternalColumn(column) => { // add internal column binding into `BindContext` - let column = self - .bind_context - .add_internal_column_binding(&column, self.metadata.clone())?; - let data_type = *column.data_type.clone(); - ( - BoundColumnRef { - span: *span, - column, - } - .into(), - data_type, - ) + let column = self.bind_context.add_internal_column_binding( + &column, + self.metadata.clone(), + true, + )?; + if let Some(virtual_computed_expr) = column.virtual_computed_expr { + let sql_tokens = tokenize_sql(virtual_computed_expr.as_str())?; + let expr = parse_expr(&sql_tokens, self.dialect)?; + return self.resolve(&expr).await; + } else { + let data_type = *column.data_type.clone(); + ( + BoundColumnRef { + span: *span, + column, + } + .into(), + data_type, + ) + } } NameResolutionResult::Alias { scalar, .. } => { (scalar.clone(), scalar.data_type()?) diff --git a/src/query/storages/fuse/src/operations/common/processors/fill_internal_columns.rs b/src/query/storages/fuse/src/operations/common/processors/fill_internal_columns.rs deleted file mode 100644 index 7f1e886dd478..000000000000 --- a/src/query/storages/fuse/src/operations/common/processors/fill_internal_columns.rs +++ /dev/null @@ -1,120 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::any::Any; -use std::collections::BTreeMap; -use std::collections::VecDeque; -use std::sync::Arc; - -use common_catalog::plan::InternalColumn; -use common_catalog::plan::InternalColumnMeta; -use common_exception::Result; -use common_expression::BlockMetaInfoDowncast; -use common_expression::DataBlock; -use common_expression::FieldIndex; -use common_pipeline_core::processors::Event; -use common_pipeline_core::processors::InputPort; -use common_pipeline_core::processors::OutputPort; -use common_pipeline_core::processors::Processor; - -pub struct FillInternalColumnProcessor { - internal_columns: BTreeMap, - data_blocks: VecDeque<(InternalColumnMeta, DataBlock)>, - input: Arc, - output: Arc, - output_data: Option, -} - -impl FillInternalColumnProcessor { - pub fn create( - internal_columns: BTreeMap, - input: Arc, - output: Arc, - ) -> Self { - Self { - internal_columns, - data_blocks: VecDeque::new(), - input, - output, - output_data: None, - } - } -} - -#[async_trait::async_trait] -impl Processor for FillInternalColumnProcessor { - fn name(&self) -> String { - "FillInternalColumnProcessor".to_string() - } - - fn as_any(&mut self) -> &mut dyn Any { - self - } - - fn event(&mut self) -> Result { - if self.output.is_finished() { - self.input.finish(); - return Ok(Event::Finished); - } - - if !self.output.can_push() { - self.input.set_not_need_data(); - return Ok(Event::NeedConsume); - } - - if let Some(data_block) = self.output_data.take() { - self.output.push_data(Ok(data_block)); - return Ok(Event::NeedConsume); - } - - if self.input.has_data() { - let mut data_block = self.input.pull_data().unwrap()?; - if let Some(source_meta) = data_block.take_meta() { - if let Some(internal_column_meta) = InternalColumnMeta::downcast_from(source_meta) { - self.data_blocks - .push_back((internal_column_meta, data_block)); - return Ok(Event::Sync); - } - } - - unreachable!(); - } - - if self.input.is_finished() { - self.output.finish(); - return Ok(Event::Finished); - } - - self.input.set_need_data(); - Ok(Event::NeedData) - } - - fn process(&mut self) -> Result<()> { - if let Some((internal_column_meta, data_block)) = self.data_blocks.pop_front() { - let mut data_block = data_block; - let num_rows = data_block.num_rows(); - for internal_column in self.internal_columns.values() { - let column = - internal_column.generate_column_values(&internal_column_meta, num_rows); - data_block.add_column(column); - } - // output datablock MUST with empty meta - self.output_data = Some(DataBlock::new( - data_block.columns().to_vec(), - data_block.num_rows(), - )); - } - Ok(()) - } -} diff --git a/src/query/storages/fuse/src/operations/common/processors/mod.rs b/src/query/storages/fuse/src/operations/common/processors/mod.rs index c2eebd102d3d..ba5f27a7fe4f 100644 --- a/src/query/storages/fuse/src/operations/common/processors/mod.rs +++ b/src/query/storages/fuse/src/operations/common/processors/mod.rs @@ -12,13 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod fill_internal_columns; mod sink_commit; mod transform_mutation_aggregator; mod transform_serialize_block; mod transform_serialize_segment; -pub use fill_internal_columns::FillInternalColumnProcessor; pub use sink_commit::CommitSink; pub use transform_mutation_aggregator::TableMutationAggregator; pub use transform_serialize_block::TransformSerializeBlock; diff --git a/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs b/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs index 5987d5a7b3a2..daa5b7e25082 100644 --- a/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs +++ b/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs @@ -813,7 +813,7 @@ impl Processor for NativeDeserializeDataTransform { }; // Step 8: Fill `InternalColumnMeta` as `DataBlock.meta` if query internal columns, - // `FillInternalColumnProcessor` will generate internal columns using `InternalColumnMeta` in next pipeline. + // `TransformAddInternalColumns` will generate internal columns using `InternalColumnMeta` in next pipeline. let mut block = block.resort(&self.src_schema, &self.output_schema)?; if self.block_reader.query_internal_columns() { let offsets = if let Some(Value::Column(bitmap)) = filter.as_ref() { diff --git a/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs b/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs index 3af410830d07..aa5331b77685 100644 --- a/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs +++ b/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs @@ -233,7 +233,7 @@ impl Processor for DeserializeDataTransform { data_block.resort(&self.src_schema, &self.output_schema)?; // Fill `BlockMetaIndex` as `DataBlock.meta` if query internal columns, - // `FillInternalColumnProcessor` will generate internal columns using `BlockMetaIndex` in next pipeline. + // `TransformAddInternalColumns` will generate internal columns using `BlockMetaIndex` in next pipeline. if self.block_reader.query_internal_columns() { data_block = fill_internal_column_meta( data_block, diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0011_change_tracking.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0011_change_tracking.test index 56d78cffd5b5..fa311314bb38 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0011_change_tracking.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0011_change_tracking.test @@ -68,6 +68,9 @@ select a, _origin_version is null, _origin_block_id is null, _origin_block_row_n 5 0 0 0 6 0 0 1 +statement error 1065 +select change$is_update from t + statement error 1002 replace into t on(a) values(7) diff --git a/tests/sqllogictests/suites/ee/01_ee_system/01_0003_stream.test b/tests/sqllogictests/suites/ee/01_ee_system/01_0003_stream.test index 9eb6b78977bd..1406a8919cc4 100644 --- a/tests/sqllogictests/suites/ee/01_ee_system/01_0003_stream.test +++ b/tests/sqllogictests/suites/ee/01_ee_system/01_0003_stream.test @@ -67,10 +67,10 @@ select * from s 2 3 -query I -select * from s1 +query ITB +select a, change$action, change$is_update from s1 ---- -3 +3 INSERT 0 statement ok create stream s2 on table t at (stream => s) comment='this is a stream' diff --git a/tests/suites/5_ee/05_stream/05_0000_ee_stream.result b/tests/suites/5_ee/05_stream/05_0000_ee_stream.result index c510f4b388bd..efc4b56d18cf 100755 --- a/tests/suites/5_ee/05_stream/05_0000_ee_stream.result +++ b/tests/suites/5_ee/05_stream/05_0000_ee_stream.result @@ -1,3 +1,5 @@ +1 +2 INSERT 0 1 test_s default default db_stream.t test_s default default db_stream.t NULL test append_only test_s default default db_stream.t NULL test append_only diff --git a/tests/suites/5_ee/05_stream/05_0000_ee_stream.sh b/tests/suites/5_ee/05_stream/05_0000_ee_stream.sh index c86684b7831f..fcba56ca7148 100755 --- a/tests/suites/5_ee/05_stream/05_0000_ee_stream.sh +++ b/tests/suites/5_ee/05_stream/05_0000_ee_stream.sh @@ -7,7 +7,14 @@ echo "drop database if exists db_stream" | $BENDSQL_CLIENT_CONNECT echo "CREATE DATABASE db_stream" | $BENDSQL_CLIENT_CONNECT echo "create table db_stream.t(a int) change_tracking = true" | $BENDSQL_CLIENT_CONNECT +echo "insert into db_stream.t values(1)" | $BENDSQL_CLIENT_CONNECT echo "create stream default.test_s on table db_stream.t comment = 'test'" | $BENDSQL_CLIENT_CONNECT +echo "insert into db_stream.t values(2)" | $BENDSQL_CLIENT_CONNECT + +BASE_ROW_ID=$(echo "select _base_row_id from default.test_s" | $BENDSQL_CLIENT_CONNECT) +echo "select change\$row_id=$BASE_ROW_ID from default.test_s" | $BENDSQL_CLIENT_CONNECT +echo "optimize table db_stream.t compact" | $BENDSQL_CLIENT_CONNECT +echo "select a, change\$action, change\$is_update, change\$row_id=$BASE_ROW_ID from default.test_s" | $BENDSQL_CLIENT_CONNECT echo "show streams like 'test_s'" | $BENDSQL_CLIENT_CONNECT | awk '{print $(NF-3), $(NF-2), $(NF-1), $NF}' echo "show full streams like 'test_s'" | $BENDSQL_CLIENT_CONNECT | awk '{print $(NF-6), $(NF-5), $(NF-4), $(NF-3), $(NF-2), $(NF-1), $NF}'