diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs
index 69d4ff3be4898..037b2d60baa21 100644
--- a/src/query/catalog/src/table_context.rs
+++ b/src/query/catalog/src/table_context.rs
@@ -374,6 +374,10 @@ pub trait TableContext: Send + Sync {
     fn get_shared_settings(&self) -> Arc<Settings>;

     fn get_runtime(&self) -> Result<Arc<Runtime>>;
+
+    fn add_m_cte_temp_table(&self, database_name: &str, table_name: &str);
+
+    async fn drop_m_cte_temp_table(&self) -> Result<()>;
 }

 pub type AbortChecker = Arc<dyn CheckAbort + Send + Sync>;
diff --git a/src/query/service/src/interpreters/interpreter.rs b/src/query/service/src/interpreters/interpreter.rs
index 861d9d754f3df..30a80033f976e 100644
--- a/src/query/service/src/interpreters/interpreter.rs
+++ b/src/query/service/src/interpreters/interpreter.rs
@@ -85,7 +85,14 @@ pub trait Interpreter: Sync + Send {
     #[fastrace::trace]
     async fn execute(&self, ctx: Arc<QueryContext>) -> Result<SendableDataBlockStream> {
         log_query_start(&ctx);
-        match self.execute_inner(ctx.clone()).await {
+        let res = self.execute_inner(ctx.clone()).await;
+        // Drop the temp tables that are generated by materialized cte.
+        // NOTE(review): cleanup failure must not mask the query's own result,
+        // and the Err arm below must still reach log_query_finished.
+        if let Err(e) = ctx.drop_m_cte_temp_table().await {
+            log::warn!("Failed to drop materialized cte temp tables: {}", e);
+        }
+        match res {
             Ok(stream) => Ok(stream),
             Err(err) => {
                 log_query_finished(&ctx, Some(err.clone()), false);
@@ -230,7 +237,9 @@ async fn plan_sql(
 ) -> Result<(Plan, PlanExtras, AcquireQueueGuard)> {
     let mut planner = Planner::new_with_query_executor(
         ctx.clone(),
-        Arc::new(ServiceQueryExecutor::new(ctx.clone())),
+        Arc::new(ServiceQueryExecutor::new(QueryContext::create_from(
+            ctx.clone(),
+        ))),
     );

     // Parse the SQL query, get extract additional information.
diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index 4055fe53b9291..bea3646a44077 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -79,6 +79,7 @@ use databend_common_meta_app::principal::UserPrivilegeType; use databend_common_meta_app::principal::COPY_MAX_FILES_COMMIT_MSG; use databend_common_meta_app::principal::COPY_MAX_FILES_PER_COMMIT; use databend_common_meta_app::schema::CatalogType; +use databend_common_meta_app::schema::DropTableByIdReq; use databend_common_meta_app::schema::GetTableCopiedFileReq; use databend_common_meta_app::schema::TableInfo; use databend_common_meta_app::storage::StorageParams; @@ -107,9 +108,11 @@ use databend_common_storages_stage::StageTable; use databend_common_storages_stream::stream_table::StreamTable; use databend_common_users::GrantObjectVisibilityChecker; use databend_common_users::UserApiProvider; +use databend_storages_common_session::drop_table_by_id; use databend_storages_common_session::SessionState; use databend_storages_common_session::TxnManagerRef; use databend_storages_common_table_meta::meta::Location; +use databend_storages_common_table_meta::table::OPT_KEY_TEMP_PREFIX; use jiff::tz::TimeZone; use jiff::Zoned; use log::debug; @@ -149,6 +152,9 @@ pub struct QueryContext { fragment_id: Arc, // Used by synchronized generate aggregating indexes when new data written. inserted_segment_locs: Arc>>, + // Temp table for materialized CTE, first string is the database_name, second string is the table_name + // All temp tables' catalog is `CATALOG_DEFAULT`, so we don't need to store it. 
+ m_cte_temp_table: Arc>>, } impl QueryContext { @@ -174,6 +180,7 @@ impl QueryContext { fragment_id: Arc::new(AtomicUsize::new(0)), inserted_segment_locs: Arc::new(RwLock::new(HashSet::new())), block_threshold: Arc::new(RwLock::new(BlockThresholds::default())), + m_cte_temp_table: Arc::new(Default::default()), }) } @@ -1467,6 +1474,43 @@ impl TableContext for QueryContext { fn get_runtime(&self) -> Result> { self.shared.try_get_runtime() } + + fn add_m_cte_temp_table(&self, database_name: &str, table_name: &str) { + self.m_cte_temp_table + .write() + .push((database_name.to_string(), table_name.to_string())); + } + + async fn drop_m_cte_temp_table(&self) -> Result<()> { + let temp_tbl_mgr = self.shared.session.session_ctx.temp_tbl_mgr(); + let m_cte_temp_table = self.m_cte_temp_table.read().clone(); + let tenant = self.get_tenant(); + for (db_name, table_name) in m_cte_temp_table.iter() { + let table = self.get_table(CATALOG_DEFAULT, db_name, table_name).await?; + let db = self + .get_catalog(CATALOG_DEFAULT) + .await? 
+ .get_database(&tenant, db_name) + .await?; + let drop_table_req = DropTableByIdReq { + if_exists: true, + tenant: tenant.clone(), + tb_id: table.get_table_info().ident.table_id, + table_name: table_name.to_string(), + db_id: db.get_db_info().database_id.db_id, + engine: table.engine().to_string(), + session_id: table + .options() + .get(OPT_KEY_TEMP_PREFIX) + .cloned() + .unwrap_or_default(), + }; + drop_table_by_id(temp_tbl_mgr.clone(), drop_table_req).await?; + } + let mut m_cte_temp_table = self.m_cte_temp_table.write(); + m_cte_temp_table.clear(); + Ok(()) + } } impl TrySpawn for QueryContext { diff --git a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs index d07e1e81b0d93..3d3d2fc7e23fe 100644 --- a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs +++ b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs @@ -1000,6 +1000,14 @@ impl TableContext for CtxDelegation { fn get_runtime(&self) -> Result> { todo!() } + + fn add_m_cte_temp_table(&self, _database_name: &str, _table_name: &str) { + todo!() + } + + async fn drop_m_cte_temp_table(&self) -> Result<()> { + todo!() + } } #[tokio::test(flavor = "multi_thread")] diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs b/src/query/service/tests/it/storages/fuse/operations/commit.rs index 0840ac188ea77..8d494ca256f8a 100644 --- a/src/query/service/tests/it/storages/fuse/operations/commit.rs +++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs @@ -880,6 +880,14 @@ impl TableContext for CtxDelegation { fn get_runtime(&self) -> Result> { todo!() } + + fn add_m_cte_temp_table(&self, _database_name: &str, _table_name: &str) { + todo!() + } + + async fn drop_m_cte_temp_table(&self) -> Result<()> { + todo!() + } } #[derive(Clone, Debug)] diff --git a/src/query/sql/src/planner/binder/bind_context.rs b/src/query/sql/src/planner/binder/bind_context.rs index 93dd83d68d061..b52c3a733aa80 100644 --- 
a/src/query/sql/src/planner/binder/bind_context.rs +++ b/src/query/sql/src/planner/binder/bind_context.rs @@ -212,7 +212,6 @@ pub struct CteInfo { pub cte_idx: IndexType, // If cte is materialized, save its columns pub columns: Vec, - pub m_cte_name_to_temp_table: HashMap, } impl BindContext { diff --git a/src/query/sql/src/planner/binder/bind_query/bind.rs b/src/query/sql/src/planner/binder/bind_query/bind.rs index a13726f90fa24..d74b7e0aa0b1e 100644 --- a/src/query/sql/src/planner/binder/bind_query/bind.rs +++ b/src/query/sql/src/planner/binder/bind_query/bind.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; use std::sync::Arc; use databend_common_ast::ast::CreateOption; @@ -26,6 +25,7 @@ use databend_common_ast::ast::TableType; use databend_common_ast::ast::With; use databend_common_ast::ast::CTE; use databend_common_ast::Span; +use databend_common_catalog::catalog::CATALOG_DEFAULT; use databend_common_exception::ErrorCode; use databend_common_exception::Result; @@ -89,21 +89,17 @@ impl Binder { .iter() .map(|ident| self.normalize_identifier(ident).name) .collect(); - let mut cte_info = CteInfo { + let cte_info = CteInfo { columns_alias: column_name, query: *cte.query.clone(), recursive: with.recursive, cte_idx: idx, columns: vec![], materialized: cte.materialized, - m_cte_name_to_temp_table: HashMap::new(), }; // If the CTE is materialized, we'll construct a temp table for it. 
             if cte.materialized {
-                let temp_table_name = self.m_cte_to_temp_table(cte)?;
-                cte_info
-                    .m_cte_name_to_temp_table
-                    .insert(cte.alias.name.name.clone(), temp_table_name);
+                self.m_cte_to_temp_table(cte)?;
             }
             bind_context
                 .cte_context
@@ -183,27 +179,28 @@
     }

-    // The return value is temp_table name`
-    fn m_cte_to_temp_table(&self, cte: &CTE) -> Result<String> {
+    // Create the temp table that backs a materialized CTE and register it on
+    // the query context so it is dropped when the query finishes.
+    fn m_cte_to_temp_table(&self, cte: &CTE) -> Result<()> {
         let engine = if self.ctx.get_settings().get_persist_materialized_cte()? {
             Engine::Fuse
         } else {
             Engine::Memory
         };
         let database = self.ctx.get_current_database();
-        let catalog = self.ctx.get_current_catalog();
-        let query_id = self.ctx.get_id();
-        // Navigate the temp table for cte to `cte_name + query_id`
-        // Avoid the conflict of the temp table name with the same name of user's temp table.
-        let table_name = format!(
-            "{}_{}",
-            cte.alias.name.name,
-            query_id.split('-').next().unwrap_or(&query_id)
-        );
+        if self
+            .ctx
+            .is_temp_table(CATALOG_DEFAULT, &database, &cte.alias.name.name)
+        {
+            return Err(ErrorCode::Internal(format!(
+                "Temporary table {:?} already exists in current session, please change the materialized CTE name",
+                cte.alias.name.name
+            )));
+        }
         let create_table_stmt = CreateTableStmt {
             create_option: CreateOption::CreateOrReplace,
-            catalog: Some(Identifier::from_name(Span::None, catalog.clone())),
+            catalog: Some(Identifier::from_name(Span::None, CATALOG_DEFAULT)),
             database: Some(Identifier::from_name(Span::None, database.clone())),
-            table: Identifier::from_name(cte.alias.name.span, table_name.clone()),
+            table: cte.alias.name.clone(),
             source: None,
             engine: Some(engine),
             uri_location: None,
@@ -225,7 +222,10 @@
         };

         self.ctx
-            .remove_table_from_cache(&catalog, &database, &cte.alias.name.name);
-        Ok(table_name)
+            .add_m_cte_temp_table(&database, &cte.alias.name.name);
+
+        self.ctx
+            .remove_table_from_cache(CATALOG_DEFAULT, &database, &cte.alias.name.name);
+        Ok(())
     }
 }
diff --git 
a/src/query/sql/src/planner/binder/bind_table_reference/bind_table.rs b/src/query/sql/src/planner/binder/bind_table_reference/bind_table.rs index 574a4d081b473..93bb3bed0fc0e 100644 --- a/src/query/sql/src/planner/binder/bind_table_reference/bind_table.rs +++ b/src/query/sql/src/planner/binder/bind_table_reference/bind_table.rs @@ -97,19 +97,6 @@ impl Binder { }; } - let mut temp_table_name = None; - if let Some(cte_info) = cte_map.get(&table_name) - && cte_info.materialized - { - temp_table_name = Some( - cte_info - .m_cte_name_to_temp_table - .get(&table_name) - .unwrap() - .to_string(), - ); - } - let navigation = self.resolve_temporal_clause(bind_context, temporal)?; // Resolve table with catalog @@ -117,11 +104,7 @@ impl Binder { match self.resolve_data_source( catalog.as_str(), database.as_str(), - if let Some(temp_table_name) = temp_table_name.as_ref() { - temp_table_name.as_str() - } else { - table_name.as_str() - }, + table_name.as_str(), navigation.as_ref(), max_batch_size, self.ctx.clone().get_abort_checker(), @@ -167,7 +150,6 @@ impl Binder { database.clone(), table_meta, table_name_alias, - None, bind_context.view_info.is_some(), bind_context.planning_agg_index, false, @@ -246,7 +228,6 @@ impl Binder { database.clone(), table_meta, table_name_alias, - None, false, false, false, @@ -278,7 +259,6 @@ impl Binder { catalog, database.clone(), table_meta, - Some(table_name.clone()), table_name_alias, bind_context.view_info.is_some(), bind_context.planning_agg_index, diff --git a/src/query/sql/src/planner/binder/bind_table_reference/bind_table_function.rs b/src/query/sql/src/planner/binder/bind_table_reference/bind_table_function.rs index b965384221871..886627804681a 100644 --- a/src/query/sql/src/planner/binder/bind_table_reference/bind_table_function.rs +++ b/src/query/sql/src/planner/binder/bind_table_reference/bind_table_function.rs @@ -145,7 +145,6 @@ impl Binder { "system".to_string(), table.clone(), table_alias_name, - None, false, false, false, @@ 
-208,7 +207,6 @@ impl Binder { "system".to_string(), table.clone(), table_alias_name, - None, false, false, false, diff --git a/src/query/sql/src/planner/binder/table.rs b/src/query/sql/src/planner/binder/table.rs index 38039d873f5a0..901a3bff350bd 100644 --- a/src/query/sql/src/planner/binder/table.rs +++ b/src/query/sql/src/planner/binder/table.rs @@ -130,7 +130,6 @@ impl Binder { "system".to_string(), table.clone(), table_alias_name, - None, false, false, true, diff --git a/src/query/sql/src/planner/dataframe.rs b/src/query/sql/src/planner/dataframe.rs index 49403dabc84dc..a590b9bbad8df 100644 --- a/src/query/sql/src/planner/dataframe.rs +++ b/src/query/sql/src/planner/dataframe.rs @@ -98,7 +98,6 @@ impl Dataframe { database.to_string(), table_meta, None, - None, false, false, false, diff --git a/src/query/sql/src/planner/expression_parser.rs b/src/query/sql/src/planner/expression_parser.rs index 1dfe88cd79f7f..e6d7b20b87ace 100644 --- a/src/query/sql/src/planner/expression_parser.rs +++ b/src/query/sql/src/planner/expression_parser.rs @@ -68,7 +68,6 @@ pub fn bind_table(table_meta: Arc) -> Result<(BindContext, MetadataRe "default".to_string(), table_meta, None, - None, false, false, false, diff --git a/src/query/sql/src/planner/metadata.rs b/src/query/sql/src/planner/metadata.rs index e3f942e88c29c..042766c848d55 100644 --- a/src/query/sql/src/planner/metadata.rs +++ b/src/query/sql/src/planner/metadata.rs @@ -349,18 +349,13 @@ impl Metadata { catalog: String, database: String, table_meta: Arc, - table_name: Option, table_alias_name: Option, source_of_view: bool, source_of_index: bool, source_of_stage: bool, consume: bool, ) -> IndexType { - let table_name = if let Some(table_name) = table_name { - table_name - } else { - table_meta.name().to_string() - }; + let table_name = table_meta.name().to_string(); let table_index = self.tables.len(); // If exists table alias name, use it instead of origin name