From aa9a972a9794ec1908b24fb48fae5e5377d5b92c Mon Sep 17 00:00:00 2001
From: zhya
Date: Thu, 19 Dec 2024 14:46:55 +0800
Subject: [PATCH] fix: change tracking stream failed (#17072)

Fix change tracking failure.
---
 src/query/ast/src/ast/query.rs                 | 22 +++++++
 src/query/catalog/src/table_context.rs         |  8 +++
 .../service/src/interpreters/common/stream.rs  | 35 +-----------
 .../interpreter_copy_into_location.rs          |  2 +-
 .../interpreter_copy_into_table.rs             |  2 +-
 .../src/interpreters/interpreter_insert.rs     |  3 +-
 .../interpreter_insert_multi_table.rs          |  4 +-
 .../src/interpreters/interpreter_mutation.rs   |  3 +-
 .../src/interpreters/interpreter_replace.rs    |  2 +-
 .../src/interpreters/interpreter_select.rs     |  2 +-
 src/query/service/src/sessions/query_ctx.rs    | 33 +++++++++++
 .../service/src/sessions/query_ctx_shared.rs   |  3 +-
 .../sql/src/planner/binder/bind_query/bind.rs  |  2 +-
 .../binder/bind_table_reference/bind_table.rs  | 16 ++++--
 .../bind_table_function.rs                     |  2 -
 src/query/sql/src/planner/binder/binder.rs     | 25 ++++----
 src/query/sql/src/planner/binder/table.rs      |  1 -
 src/query/sql/src/planner/dataframe.rs         |  1 -
 .../sql/src/planner/expression_parser.rs       |  1 -
 src/query/sql/src/planner/metadata.rs          | 13 +----
 .../table_meta/src/table/stream_keys.rs        | 15 +++++
 src/query/storages/fuse/src/fuse_table.rs      |  3 +-
 .../storages/fuse/src/operations/changes.rs    | 57 +++++++++++--------
 src/query/storages/stream/src/stream_table.rs  |  3 +-
 .../06_ee_stream/06_0007_stream_ddl_txn.test   | 14 +++++
 .../06_ee_stream/06_0008_stream_issue_17058    | 48 ++++++++++++++++
 26 files changed, 216 insertions(+), 104 deletions(-)
 create mode 100644 tests/sqllogictests/suites/ee/06_ee_stream/06_0008_stream_issue_17058

diff --git a/src/query/ast/src/ast/query.rs b/src/query/ast/src/ast/query.rs
index 165b66fd6b22..bcf0308d78bf 100644
--- a/src/query/ast/src/ast/query.rs
+++ b/src/query/ast/src/ast/query.rs
@@ -619,6 +619,28 @@ impl Display for WithOptions {
     }
 }
 
+impl WithOptions {
+    /// Used to build the change query.
+    pub fn to_change_query_with_clause(&self) -> String {
+        let mut result = String::from(" WITH (");
+        for (i, (k, v)) in self.options.iter().enumerate() {
+            if i > 0 {
+                result.push_str(", ");
+            }
+
+            if k == "consume" {
+                // The consumed stream will be recorded in the QueryContext.
+                // Skip 'consume' to avoid unnecessary operations.
+                result.push_str("consume = false");
+            } else {
+                result.push_str(&format!("{k} = '{v}'"));
+            }
+        }
+        result.push(')');
+        result
+    }
+}
+
 #[derive(Debug, Clone, PartialEq, Drive, DriveMut)]
 pub struct ChangesInterval {
     pub append_only: bool,

diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs
index 3ec2659deac7..c7f45b570184 100644
--- a/src/query/catalog/src/table_context.rs
+++ b/src/query/catalog/src/table_context.rs
@@ -376,6 +376,14 @@ pub trait TableContext: Send + Sync {
     fn add_m_cte_temp_table(&self, database_name: &str, table_name: &str);
 
     async fn drop_m_cte_temp_table(&self) -> Result<()>;
+
+    fn add_streams_ref(&self, _catalog: &str, _database: &str, _stream: &str, _consume: bool) {
+        unimplemented!()
+    }
+
+    fn get_consume_streams(&self, _query: bool) -> Result<Vec<Arc<dyn Table>>> {
+        unimplemented!()
+    }
 }
 
 pub type AbortChecker = Arc<dyn CheckAbort + Send + Sync>;

diff --git a/src/query/service/src/interpreters/common/stream.rs b/src/query/service/src/interpreters/common/stream.rs
index e4efbc6c84ba..f5eb20b9ae3a 100644
--- a/src/query/service/src/interpreters/common/stream.rs
+++ b/src/query/service/src/interpreters/common/stream.rs
@@ -12,7 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::HashSet;
 use std::sync::Arc;
 
 use chrono::Utc;
@@ -24,13 +23,10 @@ use databend_common_meta_app::schema::TableInfo;
 use databend_common_meta_app::schema::UpdateStreamMetaReq;
 use databend_common_meta_app::schema::UpdateTableMetaReq;
 use databend_common_meta_types::MatchSeq;
-use databend_common_sql::MetadataRef;
-use databend_common_sql::TableEntry;
 use databend_common_storages_factory::Table;
 use databend_common_storages_fuse::FuseTable;
 use databend_common_storages_fuse::TableContext;
 use databend_common_storages_stream::stream_table::StreamTable;
-use databend_common_storages_stream::stream_table::STREAM_ENGINE;
 use databend_storages_common_table_meta::table::OPT_KEY_DATABASE_ID;
 use databend_storages_common_table_meta::table::OPT_KEY_DATABASE_NAME;
 use databend_storages_common_table_meta::table::OPT_KEY_SNAPSHOT_LOCATION;
@@ -42,9 +38,8 @@ use crate::sessions::QueryContext;
 
 pub async fn dml_build_update_stream_req(
     ctx: Arc<QueryContext>,
-    metadata: &MetadataRef,
 ) -> Result<Vec<UpdateStreamMetaReq>> {
-    let tables = get_stream_table(metadata, |t| t.table().engine() == STREAM_ENGINE)?;
+    let tables = ctx.get_consume_streams(false)?;
     if tables.is_empty() {
         return Ok(vec![]);
     }
@@ -96,38 +91,14 @@ pub async fn dml_build_update_stream_req(
     Ok(reqs)
 }
 
-fn get_stream_table<F>(metadata: &MetadataRef, pred: F) -> Result<Vec<Arc<dyn Table>>>
-where F: Fn(&TableEntry) -> bool {
-    let r_lock = metadata.read();
-    let tables = r_lock.tables();
-    let mut streams = vec![];
-    let mut streams_ids = HashSet::new();
-    for t in tables {
-        if pred(t) {
-            let stream = t.table();
-
-            let stream_id = stream.get_table_info().ident.table_id;
-            if streams_ids.contains(&stream_id) {
-                continue;
-            }
-            streams_ids.insert(stream_id);
-
-            streams.push(stream);
-        }
-    }
-    Ok(streams)
-}
-
 pub struct StreamTableUpdates {
     pub update_table_metas: Vec<(UpdateTableMetaReq, TableInfo)>,
 }
+
 pub async fn query_build_update_stream_req(
     ctx: &Arc<QueryContext>,
-    metadata: &MetadataRef,
 ) -> Result<Option<StreamTableUpdates>> {
-    let streams = get_stream_table(metadata, |t| {
-        t.is_consume() && t.table().engine() == STREAM_ENGINE
-    })?;
+    let streams = ctx.get_consume_streams(true)?;
     if streams.is_empty() {
         return Ok(None);
     }

diff --git a/src/query/service/src/interpreters/interpreter_copy_into_location.rs b/src/query/service/src/interpreters/interpreter_copy_into_location.rs
index 7046f7ae3f17..6ec49b1ccbfa 100644
--- a/src/query/service/src/interpreters/interpreter_copy_into_location.rs
+++ b/src/query/service/src/interpreters/interpreter_copy_into_location.rs
@@ -75,7 +75,7 @@ impl CopyIntoLocationInterpreter {
             false,
         )?;
 
-        let update_stream_meta = dml_build_update_stream_req(self.ctx.clone(), metadata).await?;
+        let update_stream_meta = dml_build_update_stream_req(self.ctx.clone()).await?;
 
         Ok((select_interpreter, update_stream_meta))
     }

diff --git a/src/query/service/src/interpreters/interpreter_copy_into_table.rs b/src/query/service/src/interpreters/interpreter_copy_into_table.rs
index 1b2ecd128ebf..2b7a4f5d08ff 100644
--- a/src/query/service/src/interpreters/interpreter_copy_into_table.rs
+++ b/src/query/service/src/interpreters/interpreter_copy_into_table.rs
@@ -78,7 +78,7 @@ impl CopyIntoTableInterpreter {
             v => unreachable!("Input plan must be Query, but it's {}", v),
         };
 
-        let update_stream_meta = dml_build_update_stream_req(self.ctx.clone(), metadata).await?;
+        let update_stream_meta = dml_build_update_stream_req(self.ctx.clone()).await?;
 
         let select_interpreter = SelectInterpreter::try_create(
             self.ctx.clone(),

diff --git a/src/query/service/src/interpreters/interpreter_insert.rs b/src/query/service/src/interpreters/interpreter_insert.rs
index 1a057fdaf79e..c218c481351a 100644
--- a/src/query/service/src/interpreters/interpreter_insert.rs
+++ b/src/query/service/src/interpreters/interpreter_insert.rs
@@ -167,8 +167,7 @@ impl Interpreter for InsertInterpreter {
                     .format_pretty()?;
                 info!("Insert select plan: \n{}", explain_plan);
 
-                let update_stream_meta =
-                    dml_build_update_stream_req(self.ctx.clone(), metadata).await?;
+                let update_stream_meta = dml_build_update_stream_req(self.ctx.clone()).await?;
 
                 // here we remove the last exchange merge plan to trigger distribute insert
                 let insert_select_plan = match (select_plan, table.support_distributed_insert()) {

diff --git a/src/query/service/src/interpreters/interpreter_insert_multi_table.rs b/src/query/service/src/interpreters/interpreter_insert_multi_table.rs
index 11d566a34371..9b021672ad12 100644
--- a/src/query/service/src/interpreters/interpreter_insert_multi_table.rs
+++ b/src/query/service/src/interpreters/interpreter_insert_multi_table.rs
@@ -130,8 +130,8 @@ impl Interpreter for InsertMultiTableInterpreter {
 
 impl InsertMultiTableInterpreter {
     pub async fn build_physical_plan(&self) -> Result<PhysicalPlan> {
-        let (mut root, metadata) = self.build_source_physical_plan().await?;
-        let update_stream_meta = dml_build_update_stream_req(self.ctx.clone(), &metadata).await?;
+        let (mut root, _) = self.build_source_physical_plan().await?;
+        let update_stream_meta = dml_build_update_stream_req(self.ctx.clone()).await?;
         let source_schema = root.output_schema()?;
         let branches = self.build_insert_into_branches().await?;
         let serializable_tables = branches

diff --git a/src/query/service/src/interpreters/interpreter_mutation.rs b/src/query/service/src/interpreters/interpreter_mutation.rs
index fe1dde3c7c87..c22ccdf93d19 100644
--- a/src/query/service/src/interpreters/interpreter_mutation.rs
+++ b/src/query/service/src/interpreters/interpreter_mutation.rs
@@ -190,8 +190,7 @@ impl MutationInterpreter {
         table_snapshot: Option<Arc<TableSnapshot>>,
     ) -> Result<PhysicalPlan> {
         let table_info = fuse_table.get_table_info().clone();
-        let update_stream_meta =
-            dml_build_update_stream_req(self.ctx.clone(), &mutation.metadata).await?;
+        let update_stream_meta = dml_build_update_stream_req(self.ctx.clone()).await?;
         let partitions = self
             .mutation_source_partitions(mutation, fuse_table, table_snapshot.clone())
             .await?;

diff --git a/src/query/service/src/interpreters/interpreter_replace.rs b/src/query/service/src/interpreters/interpreter_replace.rs
index 6018939ae07a..9d35f62e3818 100644
--- a/src/query/service/src/interpreters/interpreter_replace.rs
+++ b/src/query/service/src/interpreters/interpreter_replace.rs
@@ -448,7 +448,7 @@ impl ReplaceInterpreter {
             v => unreachable!("Input plan must be Query, but it's {}", v),
         };
 
-        let update_stream_meta = dml_build_update_stream_req(self.ctx.clone(), metadata).await?;
+        let update_stream_meta = dml_build_update_stream_req(self.ctx.clone()).await?;
 
         let select_interpreter = SelectInterpreter::try_create(
             ctx.clone(),

diff --git a/src/query/service/src/interpreters/interpreter_select.rs b/src/query/service/src/interpreters/interpreter_select.rs
index fd124c444d61..c389780b5502 100644
--- a/src/query/service/src/interpreters/interpreter_select.rs
+++ b/src/query/service/src/interpreters/interpreter_select.rs
@@ -138,7 +138,7 @@ impl SelectInterpreter {
             .await?;
 
         // consume stream
-        let update_stream_metas = query_build_update_stream_req(&self.ctx, &self.metadata).await?;
+        let update_stream_metas = query_build_update_stream_req(&self.ctx).await?;
 
         let catalog = self.ctx.get_default_catalog()?;
         build_res

diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs
index 32fe54220adf..140efbf3eb29 100644
--- a/src/query/service/src/sessions/query_ctx.rs
+++ b/src/query/service/src/sessions/query_ctx.rs
@@ -1511,6 +1511,39 @@ impl TableContext for QueryContext {
         m_cte_temp_table.clear();
         Ok(())
     }
+
+    fn add_streams_ref(&self, catalog: &str, database: &str, stream: &str, consume: bool) {
+        let mut streams = self.shared.streams_refs.write();
+        let stream_key = (
+            catalog.to_string(),
+            database.to_string(),
+            stream.to_string(),
+        );
+        streams
+            .entry(stream_key)
+            .and_modify(|v| {
+                if consume {
+                    *v = true;
+                }
+            })
+            .or_insert(consume);
+    }
+
+    fn get_consume_streams(&self, query: bool) -> Result<Vec<Arc<dyn Table>>> {
+        let streams_refs = self.shared.streams_refs.read();
+        let tables = self.shared.tables_refs.lock();
+        let mut streams_meta = Vec::with_capacity(streams_refs.len());
+        for (stream_key, consume) in streams_refs.iter() {
+            if query && !consume {
+                continue;
+            }
+            let stream = tables
+                .get(stream_key)
+                .ok_or_else(|| ErrorCode::Internal("It's a bug"))?;
+            streams_meta.push(stream.clone());
+        }
+        Ok(streams_meta)
+    }
 }
 
 impl TrySpawn for QueryContext {

diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs
index 7864f495d39f..86ffdcfae17f 100644
--- a/src/query/service/src/sessions/query_ctx_shared.rs
+++ b/src/query/service/src/sessions/query_ctx_shared.rs
@@ -97,6 +97,7 @@ pub struct QueryContextShared {
     pub(in crate::sessions) running_query_parameterized_hash: Arc<RwLock<Option<String>>>,
     pub(in crate::sessions) aborting: Arc<AtomicBool>,
     pub(in crate::sessions) tables_refs: Arc<Mutex<HashMap<(String, String, String), Arc<dyn Table>>>>,
+    pub(in crate::sessions) streams_refs: Arc<RwLock<HashMap<(String, String, String), bool>>>,
     pub(in crate::sessions) affect: Arc<Mutex<Option<QueryAffect>>>,
     pub(in crate::sessions) catalog_manager: Arc<CatalogManager>,
     pub(in crate::sessions) data_operator: DataOperator,
@@ -168,6 +169,7 @@ impl QueryContextShared {
             running_query_parameterized_hash: Arc::new(RwLock::new(None)),
             aborting: Arc::new(AtomicBool::new(false)),
             tables_refs: Arc::new(Mutex::new(HashMap::new())),
+            streams_refs: Default::default(),
             affect: Arc::new(Mutex::new(None)),
             executor: Arc::new(RwLock::new(Weak::new())),
             stage_attachment: Arc::new(RwLock::new(None)),
@@ -337,7 +339,6 @@ impl QueryContextShared {
         max_batch_size: Option<u64>,
     ) -> Result<Arc<dyn Table>> {
         // Always get same table metadata in the same query
-
         let table_meta_key = (catalog.to_string(), database.to_string(), table.to_string());
 
         let already_in_cache = { self.tables_refs.lock().contains_key(&table_meta_key) };

diff --git a/src/query/sql/src/planner/binder/bind_query/bind.rs b/src/query/sql/src/planner/binder/bind_query/bind.rs
index 250950f37b4c..1339e9b923ba 100644
--- a/src/query/sql/src/planner/binder/bind_query/bind.rs
+++ b/src/query/sql/src/planner/binder/bind_query/bind.rs
@@ -189,7 +189,7 @@ impl Binder {
         let query_id = self.ctx.get_id();
         let database = self.ctx.get_current_database();
         let mut table_identifier = cte.alias.name.clone();
-        table_identifier.name = format!("{}_{}", table_identifier.name, query_id.replace("-", "_"));
+        table_identifier.name = format!("{}${}", table_identifier.name, query_id.replace("-", ""));
         let table_name = normalize_identifier(&table_identifier, &self.name_resolution_ctx).name;
         self.m_cte_table_name.insert(
             normalize_identifier(&cte.alias.name, &self.name_resolution_ctx).name,

diff --git a/src/query/sql/src/planner/binder/bind_table_reference/bind_table.rs b/src/query/sql/src/planner/binder/bind_table_reference/bind_table.rs
index a95132d04ec1..8dfaa916b411 100644
--- a/src/query/sql/src/planner/binder/bind_table_reference/bind_table.rs
+++ b/src/query/sql/src/planner/binder/bind_table_reference/bind_table.rs
@@ -63,7 +63,7 @@ impl Binder {
             check_with_opt_valid(with_options)?;
             let consume = get_with_opt_consume(with_options)?;
             let max_batch_size = get_with_opt_max_batch_size(with_options)?;
-            let with_opts_str = format!(" {with_options}");
+            let with_opts_str = with_options.to_change_query_with_clause();
             (consume, max_batch_size, with_opts_str)
         } else {
             (false, None, String::new())
@@ -74,7 +74,7 @@ impl Binder {
         let cte_map = bind_context.cte_context.cte_map.clone();
         if let Some(cte_info) = cte_map.get(&table_name) {
             if cte_info.materialized {
-                cte_suffix_name = Some(self.ctx.get_id().replace("-", "_"));
+                cte_suffix_name = Some(self.ctx.get_id().replace("-", ""));
             } else {
                 if self
                     .metadata
@@ -105,7 +105,7 @@ impl Binder {
         // Resolve table with catalog
         let table_meta = {
             let table_name = if let Some(cte_suffix_name) = cte_suffix_name.as_ref() {
-                format!("{}_{}", &table_name, cte_suffix_name)
+                format!("{}${}", &table_name, cte_suffix_name)
             } else {
                 table_name.clone()
             };
@@ -161,7 +161,6 @@ impl Binder {
                     bind_context.view_info.is_some(),
                     bind_context.planning_agg_index,
                     false,
-                    consume,
                     None,
                 );
                 let (s_expr, mut bind_context) = self.bind_base_table(
@@ -186,6 +185,10 @@ impl Binder {
                 &with_opts_str,
             ))?;
 
+            if table_meta.is_stream() {
+                self.ctx
+                    .add_streams_ref(&catalog, &database, &table_name, consume);
+            }
             let mut new_bind_context = BindContext::with_parent(Box::new(bind_context.clone()));
             let tokens = tokenize_sql(query.as_str())?;
             let (stmt, _) = parse_sql(&tokens, self.dialect)?;
@@ -193,6 +196,9 @@ impl Binder {
                 unreachable!()
             };
             let (s_expr, mut new_bind_context) = self.bind_query(&mut new_bind_context, query)?;
+            bind_context
+                .cte_context
+                .set_cte_context(new_bind_context.cte_context.clone());
 
             let cols = table_meta
                 .schema()
@@ -240,7 +246,6 @@ impl Binder {
                 false,
                 false,
                 false,
-                false,
                 None,
             );
             let (s_expr, mut new_bind_context) =
@@ -273,7 +278,6 @@ impl Binder {
             bind_context.view_info.is_some(),
             bind_context.planning_agg_index,
             false,
-            false,
             cte_suffix_name,
         );
 
diff --git a/src/query/sql/src/planner/binder/bind_table_reference/bind_table_function.rs b/src/query/sql/src/planner/binder/bind_table_reference/bind_table_function.rs
index d3cf4a7064be..df7ab473deb9 100644
--- a/src/query/sql/src/planner/binder/bind_table_reference/bind_table_function.rs
+++ b/src/query/sql/src/planner/binder/bind_table_reference/bind_table_function.rs
@@ -146,7 +146,6 @@ impl Binder {
                 false,
                 false,
                 false,
-                false,
                 None,
             );
 
@@ -209,7 +208,6 @@ impl Binder {
             false,
             false,
             false,
-            false,
             None,
         );
 
diff --git a/src/query/sql/src/planner/binder/binder.rs b/src/query/sql/src/planner/binder/binder.rs
index e90f9a7bb837..70279db2dc46 100644
--- a/src/query/sql/src/planner/binder/binder.rs
+++ b/src/query/sql/src/planner/binder/binder.rs
@@ -27,7 +27,6 @@ use databend_common_ast::parser::parse_sql;
 use databend_common_ast::parser::tokenize_sql;
 use databend_common_ast::parser::Dialect;
 use databend_common_catalog::catalog::CatalogManager;
-use databend_common_catalog::query_kind::QueryKind;
 use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
@@ -48,6 +47,7 @@ use databend_common_metrics::storage::metrics_inc_copy_purge_files_counter;
 use databend_common_storage::init_stage_operator;
 use databend_storages_common_io::Files;
 use databend_storages_common_session::TxnManagerRef;
+use databend_storages_common_table_meta::table::is_stream_name;
 use log::error;
 use log::info;
 use log::warn;
@@ -649,17 +649,20 @@ impl<'a> Binder {
             }
         };
 
-        match plan.kind() {
-            QueryKind::Query | QueryKind::Explain => {}
+        match &plan {
+            Plan::Explain { .. }
+            | Plan::ExplainAnalyze { .. }
+            | Plan::ExplainAst { .. }
+            | Plan::ExplainSyntax { .. }
+            | Plan::Query { .. } => {}
+            Plan::CreateTable(plan)
+                if is_stream_name(&plan.table, self.ctx.get_id().replace("-", "").as_str()) => {}
             _ => {
-                let meta_data_guard = self.metadata.read();
-                let tables = meta_data_guard.tables();
-                for t in tables {
-                    if t.is_consume() {
-                        return Err(ErrorCode::SyntaxException(
-                            "WITH CONSUME only allowed in query",
-                        ));
-                    }
+                let consume_streams = self.ctx.get_consume_streams(true)?;
+                if !consume_streams.is_empty() {
+                    return Err(ErrorCode::SyntaxException(
+                        "WITH CONSUME only allowed in query",
+                    ));
                 }
             }
         }

diff --git a/src/query/sql/src/planner/binder/table.rs b/src/query/sql/src/planner/binder/table.rs
index dc7bc681be7f..2511596844ba 100644
--- a/src/query/sql/src/planner/binder/table.rs
+++ b/src/query/sql/src/planner/binder/table.rs
@@ -140,7 +140,6 @@ impl Binder {
             false,
             false,
             true,
-            false,
             None,
         );
 
diff --git a/src/query/sql/src/planner/dataframe.rs b/src/query/sql/src/planner/dataframe.rs
index 8a246eab706c..314a16960308 100644
--- a/src/query/sql/src/planner/dataframe.rs
+++ b/src/query/sql/src/planner/dataframe.rs
@@ -101,7 +101,6 @@ impl Dataframe {
             false,
             false,
             false,
-            false,
             None,
         );
 
diff --git a/src/query/sql/src/planner/expression_parser.rs b/src/query/sql/src/planner/expression_parser.rs
index a7020fa7b492..e031ec01a406 100644
--- a/src/query/sql/src/planner/expression_parser.rs
+++ b/src/query/sql/src/planner/expression_parser.rs
@@ -71,7 +71,6 @@ pub fn bind_table(table_meta: Arc<dyn Table>) -> Result<(BindContext, MetadataRef)> {
         false,
         false,
         false,
-        false,
         None,
     );
 
diff --git a/src/query/sql/src/planner/metadata.rs b/src/query/sql/src/planner/metadata.rs
index 4f77eb806370..bd8507d0604c 100644
--- a/src/query/sql/src/planner/metadata.rs
+++ b/src/query/sql/src/planner/metadata.rs
@@ -298,7 +298,7 @@ impl Metadata {
         let table_index = base_column.table_index;
         let source_column_name = base_column.column_name.clone();
         let source_column_index = base_column.column_index;
-        // The type of source coumn is variant, not a nested type, must have `column_id`.
+        // The type of source column is variant, not a nested type, must have `column_id`.
         let source_column_id = base_column.column_id.unwrap();
 
         // If the function that generates the virtual column already has an index,
@@ -353,7 +353,6 @@ impl Metadata {
         source_of_view: bool,
         source_of_index: bool,
         source_of_stage: bool,
-        consume: bool,
         cte_suffix_name: Option<String>,
     ) -> IndexType {
         let table_name = table_meta.name().to_string();
@@ -371,7 +370,6 @@ impl Metadata {
             source_of_view,
             source_of_index,
             source_of_stage,
-            consume,
         };
         self.tables.push(table_entry);
         let table_schema = table_meta.schema_with_stream();
@@ -512,9 +510,6 @@ pub struct TableEntry {
     source_of_stage: bool,
 
     table: Arc<dyn Table>,
-
-    /// If this table need to be consumed.
-    consume: bool,
 }
 
 impl Debug for TableEntry {
@@ -547,7 +542,6 @@ impl TableEntry {
             source_of_view: false,
             source_of_index: false,
             source_of_stage: false,
-            consume: false,
         }
     }
 
@@ -596,11 +590,6 @@ impl TableEntry {
         self.source_of_index
     }
 
-    /// Return true if this table need to be consumed.
-    pub fn is_consume(&self) -> bool {
-        self.consume
-    }
-
     pub fn update_table_index(&mut self, table_index: IndexType) {
         self.index = table_index;
     }

diff --git a/src/query/storages/common/table_meta/src/table/stream_keys.rs b/src/query/storages/common/table_meta/src/table/stream_keys.rs
index f43277b17267..6fcb6224f66b 100644
--- a/src/query/storages/common/table_meta/src/table/stream_keys.rs
+++ b/src/query/storages/common/table_meta/src/table/stream_keys.rs
@@ -82,3 +82,18 @@ pub fn get_change_type(table_alias_name: &Option<String>) -> Option<ChangeType> {
     }
     change_type
 }
+
+pub fn is_stream_name(table_name: &str, suffix: &str) -> bool {
+    let parts = table_name.split('$').collect::<Vec<_>>();
+    if parts.len() != 3 || parts[0] != "_change" || parts[2] != suffix || parts[1].len() != 8 {
+        return false;
+    }
+    if let Ok(suffix) = i64::from_str_radix(parts[1], 16) {
+        // 2023-01-01 00:00:00.
+        let base_timestamp = 1672502400;
+        if suffix > base_timestamp {
+            return true;
+        }
+    }
+    false
+}

diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs
index f4607cca52da..b63b59dc4d5b 100644
--- a/src/query/storages/fuse/src/fuse_table.rs
+++ b/src/query/storages/fuse/src/fuse_table.rs
@@ -1082,7 +1082,7 @@ impl Table for FuseTable {
     #[async_backtrace::framed]
     async fn generate_changes_query(
         &self,
-        _ctx: Arc<dyn TableContext>,
+        ctx: Arc<dyn TableContext>,
         database_name: &str,
         table_name: &str,
         _with_options: &str,
@@ -1102,6 +1102,7 @@ impl Table for FuseTable {
         self.check_changes_valid(&db_tb_name, *seq)?;
 
         self.get_changes_query(
+            ctx,
             mode,
             location,
             format!("{}.{} {}", database_name, table_name, desc),

diff --git a/src/query/storages/fuse/src/operations/changes.rs b/src/query/storages/fuse/src/operations/changes.rs
index e4c10299fc1f..25e594260b2c 100644
--- a/src/query/storages/fuse/src/operations/changes.rs
+++ b/src/query/storages/fuse/src/operations/changes.rs
@@ -120,6 +120,7 @@ impl FuseTable {
 
     pub async fn get_changes_query(
         &self,
+        ctx: Arc<dyn TableContext>,
         mode: &StreamMode,
         base_location: &Option<String>,
         table_desc: String,
@@ -153,56 +154,64 @@ impl FuseTable {
                 )
             }
             StreamMode::Standard => {
-                let a_table_alias = format!("_change_insert${}", suffix);
-                let a_cols = cols.join(", ");
+                let quote = ctx.get_settings().get_sql_dialect()?.default_ident_quote();
+                let a_table_alias = format!("_change_insert${}", suffix);
                 let d_table_alias = format!("_change_delete${}", suffix);
-                let d_cols = cols
-                    .iter()
-                    .map(|s| format!("d_{}", s))
-                    .collect::<Vec<_>>()
-                    .join(", ");
+                let mut a_cols_vec = Vec::with_capacity(cols.len());
+                let mut d_alias_vec = Vec::with_capacity(cols.len());
+                let mut d_cols_vec = Vec::with_capacity(cols.len());
+                for col in cols {
+                    a_cols_vec.push(format!("{quote}{col}{quote}"));
+                    d_alias_vec.push(format!("{quote}{col}{quote} as d_{col}"));
+                    d_cols_vec.push(format!("d_{col}"));
+                }
+                let a_cols = a_cols_vec.join(", ");
+                let d_cols_alias = d_alias_vec.join(", ");
+                let d_cols = d_cols_vec.join(", ");
+
+                let cte_name = format!("_change${}", suffix);
                 format!(
-                    "with _change({a_cols}, change$action, change$row_id, \
-                    {d_cols}, d_change$action, d_change$row_id) as \
+                    "with {cte_name} as materialized \
                     ( \
-                        select * \
+                        select {a_cols}, a_change$action, a_change$row_id, \
+                        {d_cols}, d_change$action, d_change$row_id \
                         from ( \
-                            select *, \
+                            select {a_cols}, \
                             _row_version, \
-                            'INSERT' as change$action, \
+                            'INSERT' as a_change$action, \
                             if(is_not_null(_origin_block_id), \
                             concat(to_uuid(_origin_block_id), lpad(hex(_origin_block_row_num), 6, '0')), \
                             {a_table_alias}._base_row_id \
-                            ) as change$row_id \
+                            ) as a_change$row_id \
                             from {table_desc} as {a_table_alias} \
                         ) as A \
                         FULL OUTER JOIN ( \
-                            select *, \
+                            select {d_cols_alias}, \
                             _row_version, \
-                            'DELETE' as change$action, \
+                            'DELETE' as d_change$action, \
                             if(is_not_null(_origin_block_id), \
                             concat(to_uuid(_origin_block_id), lpad(hex(_origin_block_row_num), 6, '0')), \
                             {d_table_alias}._base_row_id \
-                            ) as change$row_id \
+                            ) as d_change$row_id \
                             from {table_desc} as {d_table_alias} \
                         ) as D \
-                        on A.change$row_id = D.change$row_id \
-                        where A.change$row_id is null or D.change$row_id is null or A._row_version > D._row_version \
+                        on A.a_change$row_id = D.d_change$row_id \
+                        where A.a_change$row_id is null or D.d_change$row_id is null or A._row_version > D._row_version \
                     ) \
                     select {a_cols}, \
-                    change$action, \
-                    change$row_id, \
+                    a_change$action as change$action, \
+                    a_change$row_id as change$row_id, \
                     d_change$action is not null as change$is_update \
-                    from _change \
-                    where change$action is not null \
+                    from {cte_name} \
+                    where a_change$action is not null \
                     union all \
                     select {d_cols}, \
                     d_change$action, \
                     d_change$row_id, \
-                    change$action is not null as change$is_update \
-                    from _change \
+                    a_change$action is not null \
+                    from {cte_name} \
                     where d_change$action is not null",
                 )
             }

diff --git a/src/query/storages/stream/src/stream_table.rs b/src/query/storages/stream/src/stream_table.rs
index e3f0bbec4c8d..67b01da262c7 100644
--- a/src/query/storages/stream/src/stream_table.rs
+++ b/src/query/storages/stream/src/stream_table.rs
@@ -427,11 +427,12 @@ impl Table for StreamTable {
         table_name: &str,
         with_options: &str,
     ) -> Result<String> {
-        let table = self.source_table(ctx).await?;
+        let table = self.source_table(ctx.clone()).await?;
         let fuse_table = FuseTable::try_from_table(table.as_ref())?;
         let table_desc = format!("{database_name}.{table_name}{with_options}");
         fuse_table
             .get_changes_query(
+                ctx,
                 &self.mode(),
                 &self.snapshot_loc(),
                 table_desc,

diff --git a/tests/sqllogictests/suites/ee/06_ee_stream/06_0007_stream_ddl_txn.test b/tests/sqllogictests/suites/ee/06_ee_stream/06_0007_stream_ddl_txn.test
index a2d001a08052..09daf98aaa7e 100644
--- a/tests/sqllogictests/suites/ee/06_ee_stream/06_0007_stream_ddl_txn.test
+++ b/tests/sqllogictests/suites/ee/06_ee_stream/06_0007_stream_ddl_txn.test
@@ -1,3 +1,17 @@
+## Copyright 2023 Databend Cloud
+##
+## Licensed under the Elastic License, Version 2.0 (the "License");
+## you may not use this file except in compliance with the License.
+## You may obtain a copy of the License at
+##
+##     https://www.elastic.co/licensing/elastic-license
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+
 statement ok
 create or replace table t(a int);
 
diff --git a/tests/sqllogictests/suites/ee/06_ee_stream/06_0008_stream_issue_17058 b/tests/sqllogictests/suites/ee/06_ee_stream/06_0008_stream_issue_17058
new file mode 100644
index 000000000000..a62ca152b937
--- /dev/null
+++ b/tests/sqllogictests/suites/ee/06_ee_stream/06_0008_stream_issue_17058
@@ -0,0 +1,48 @@
+## Copyright 2023 Databend Cloud
+##
+## Licensed under the Elastic License, Version 2.0 (the "License");
+## you may not use this file except in compliance with the License.
+## You may obtain a copy of the License at
+##
+##     https://www.elastic.co/licensing/elastic-license
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+
+statement ok
+create or replace table t_17058(a int not null);
+
+statement ok
+insert into t_17058 values(1),(2);
+
+statement ok
+create stream s_17058 on table t_17058 append_only=false;
+
+statement ok
+insert into t_17058 values(3);
+
+statement ok
+optimize table t_17058 compact;
+
+statement ok
+create or replace table t1_17058(a int not null);
+
+statement ok
+insert into t1_17058 select a from s_17058 where change$action = 'INSERT';
+
+query I
+select a from t1_17058;
+----
+3
+
+statement ok
+drop stream s_17058;
+
+statement ok
+drop table t1_17058 all;
+
+statement ok
+drop table t_17058 all;
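--
Reviewer note (appended after the patch, not part of the diff): the core of the fix is that consumed streams are no longer recovered from the binder's Metadata (where the materialized-CTE rewrite of the change query lost the consume flag) but are recorded directly on the query context via add_streams_ref()/get_consume_streams(). The following is a minimal, self-contained sketch of that bookkeeping; the names StreamRefs, add and consumed are illustrative and not the real QueryContext API:

    use std::collections::HashMap;
    use std::sync::RwLock;

    // Key: (catalog, database, stream); value: whether any reference consumed it.
    type StreamKey = (String, String, String);

    #[derive(Default)]
    struct StreamRefs {
        refs: RwLock<HashMap<StreamKey, bool>>,
    }

    impl StreamRefs {
        // Mirrors add_streams_ref: a later `consume = true` upgrades the entry,
        // while `consume = false` never downgrades an already-consumed stream.
        fn add(&self, catalog: &str, database: &str, stream: &str, consume: bool) {
            let mut refs = self.refs.write().unwrap();
            refs.entry((catalog.into(), database.into(), stream.into()))
                .and_modify(|v| *v |= consume)
                .or_insert(consume);
        }

        // Mirrors get_consume_streams: queries advance only consumed streams,
        // while DML (`query_only = false`) advances every referenced stream.
        fn consumed(&self, query_only: bool) -> Vec<StreamKey> {
            let refs = self.refs.read().unwrap();
            refs.iter()
                .filter(|(_, consume)| !query_only || **consume)
                .map(|(key, _)| key.clone())
                .collect()
        }
    }

    fn main() {
        let refs = StreamRefs::default();
        refs.add("default", "db", "s_17058", false); // plain read of the stream
        refs.add("default", "db", "s_17058", true);  // WITH (CONSUME = true) upgrades the flag
        assert_eq!(refs.consumed(true).len(), 1);    // visible to query-side consumption
        assert_eq!(refs.consumed(false).len(), 1);   // DML sees every referenced stream
    }

Because the stream set now lives on the context, to_change_query_with_clause() can rewrite the inner change query with consume = false, so the rewritten query no longer triggers a second consumption.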
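The other half of the fix is the guard exemption in binder.rs: the change query materializes its CTE into a hidden temp table whose name appears to follow the pattern _change$<8-hex-digit timestamp>$<query id with dashes removed>, and is_stream_name() lets exactly those CREATE TABLE statements through the "WITH CONSUME only allowed in query" check. The function body below is copied from the patch; the sample names in main() are hypothetical, constructed only to satisfy the checks the function performs:

    // Copied from stream_keys.rs in this patch.
    pub fn is_stream_name(table_name: &str, suffix: &str) -> bool {
        let parts = table_name.split('$').collect::<Vec<_>>();
        if parts.len() != 3 || parts[0] != "_change" || parts[2] != suffix || parts[1].len() != 8 {
            return false;
        }
        if let Ok(suffix) = i64::from_str_radix(parts[1], 16) {
            // 2023-01-01 00:00:00.
            let base_timestamp = 1672502400;
            if suffix > base_timestamp {
                return true;
            }
        }
        false
    }

    fn main() {
        // Hypothetical query id (dashes already stripped) and creation time.
        let query_id = "94a0cad8e10c4e46b1f9a7e4d2b6c8aa";
        let ts_hex = format!("{:08x}", 1734589615i64); // seconds since epoch, 8 hex digits
        assert!(is_stream_name(&format!("_change${ts_hex}${query_id}"), query_id));
        assert!(!is_stream_name("t$deadbeef$mismatch", query_id)); // wrong prefix and suffix
    }

The timestamp lower bound (2023-01-01) keeps arbitrary user tables that merely contain two '$' characters from slipping through the guard.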