diff --git a/src/meta/model_v2/migration/src/m20230908_072257_init.rs b/src/meta/model_v2/migration/src/m20230908_072257_init.rs
index 0c3d9d36c485..444433d84c31 100644
--- a/src/meta/model_v2/migration/src/m20230908_072257_init.rs
+++ b/src/meta/model_v2/migration/src/m20230908_072257_init.rs
@@ -571,6 +571,7 @@ impl MigrationTrait for Migration {
                     .col(ColumnDef::new(Table::Description).string())
                     .col(ColumnDef::new(Table::Version).json())
                     .col(ColumnDef::new(Table::RetentionSeconds).integer())
+                    .col(ColumnDef::new(Table::IncomingSinks).json().not_null())
                     .foreign_key(
                         &mut ForeignKey::create()
                             .name("FK_table_object_id")
@@ -628,6 +629,7 @@ impl MigrationTrait for Migration {
                     .col(ColumnDef::new(Sink::DbName).string().not_null())
                     .col(ColumnDef::new(Sink::SinkFromName).string().not_null())
                     .col(ColumnDef::new(Sink::SinkFormatDesc).json())
+                    .col(ColumnDef::new(Sink::TargetTable).integer())
                     .foreign_key(
                         &mut ForeignKey::create()
                             .name("FK_sink_object_id")
@@ -643,6 +645,13 @@ impl MigrationTrait for Migration {
                             .to(Connection::Table, Connection::ConnectionId)
                             .to_owned(),
                     )
+                    .foreign_key(
+                        &mut ForeignKey::create()
+                            .name("FK_sink_target_table_id")
+                            .from(Sink::Table, Sink::TargetTable)
+                            .to(Table::Table, Table::TableId)
+                            .to_owned(),
+                    )
                     .to_owned(),
             )
             .await?;
@@ -1034,6 +1043,7 @@ enum Table {
     Description,
     Version,
     RetentionSeconds,
+    IncomingSinks,
 }
 
 #[derive(DeriveIden)]
@@ -1069,6 +1079,7 @@ enum Sink {
     DbName,
     SinkFromName,
     SinkFormatDesc,
+    TargetTable,
 }
 
 #[derive(DeriveIden)]
diff --git a/src/meta/model_v2/src/sink.rs b/src/meta/model_v2/src/sink.rs
index 0d93b4380305..f5fa59c85ff5 100644
--- a/src/meta/model_v2/src/sink.rs
+++ b/src/meta/model_v2/src/sink.rs
@@ -18,6 +18,7 @@ use sea_orm::ActiveValue::Set;
 use crate::{
     ColumnCatalogArray, ColumnOrderArray, ConnectionId, I32Array, Property, SinkFormatDesc, SinkId,
+    TableId,
 };
 
 #[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)]
@@ -69,6 +70,7 @@ pub struct Model {
     pub db_name: String,
     pub sink_from_name: String,
     pub sink_format_desc: Option<SinkFormatDesc>,
+    pub target_table: Option<TableId>,
 }
 
 #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
@@ -123,6 +125,7 @@ impl From<PbSink> for ActiveModel {
             db_name: Set(pb_sink.db_name),
             sink_from_name: Set(pb_sink.sink_from_name),
             sink_format_desc: Set(pb_sink.format_desc.map(|x| x.into())),
+            target_table: Set(pb_sink.target_table.map(|x| x as _)),
         }
     }
 }
diff --git a/src/meta/model_v2/src/table.rs b/src/meta/model_v2/src/table.rs
index 963b683f4ef5..06710d42d9d2 100644
--- a/src/meta/model_v2/src/table.rs
+++ b/src/meta/model_v2/src/table.rs
@@ -123,6 +123,7 @@ pub struct Model {
     pub description: Option<String>,
     pub version: Option<TableVersion>,
     pub retention_seconds: Option<i32>,
+    pub incoming_sinks: I32Array,
 }
 
 #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
@@ -232,6 +233,7 @@ impl From<PbTable> for ActiveModel {
             description: Set(pb_table.description),
             version: Set(pb_table.version.map(|v| v.into())),
             retention_seconds: Set(pb_table.retention_seconds.map(|i| i as _)),
+            incoming_sinks: Set(pb_table.incoming_sinks.into()),
         }
     }
 }
diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs
index 59e520a3316f..479229386323 100644
--- a/src/meta/node/src/server.rs
+++ b/src/meta/node/src/server.rs
@@ -186,6 +186,7 @@ pub async fn rpc_serve(
             let mut options = sea_orm::ConnectOptions::new(endpoint);
             options
                 .max_connections(max_connection)
+                .sqlx_logging(false)
                 .connect_timeout(Duration::from_secs(10))
                 .idle_timeout(Duration::from_secs(30));
             let conn = sea_orm::Database::connect(options).await?;
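Note: `Table::IncomingSinks` is stored as a non-null JSON column backing the `I32Array` model field, while `Sink::TargetTable` is a nullable integer with a foreign key onto `Table::TableId`. Below is a minimal standalone sketch of the two conversions the models rely on (`From<Vec<u32>>` for `Set(pb_table.incoming_sinks.into())` and `into_u32_array` for the reverse direction); the real `I32Array` lives elsewhere in `model_v2` and is JSON-(de)serialized by sea-orm, which is omitted here:

```rust
// Sketch only: a stand-in for model_v2's `I32Array` newtype, assuming it wraps
// `Vec<i32>`. Protobuf catalogs carry u32 ids; the SQL column stores i32.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct I32Array(pub Vec<i32>);

impl From<Vec<u32>> for I32Array {
    // Enables `Set(pb_table.incoming_sinks.into())` in the ActiveModel conversion.
    fn from(ids: Vec<u32>) -> Self {
        Self(ids.into_iter().map(|id| id as i32).collect())
    }
}

impl I32Array {
    // Enables `value.0.incoming_sinks.into_u32_array()` when rebuilding `PbTable`.
    pub fn into_u32_array(self) -> Vec<u32> {
        self.0.into_iter().map(|id| id as u32).collect()
    }
}

fn main() {
    let incoming_sinks = I32Array::from(vec![7u32, 11]);
    assert_eq!(incoming_sinks.clone().into_u32_array(), vec![7, 11]);
    println!("{incoming_sinks:?}");
}
```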
diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs
index 6b1b73d6ca69..07765fe840c3 100644
--- a/src/meta/src/barrier/command.rs
+++ b/src/meta/src/barrier/command.rs
@@ -928,6 +928,27 @@ impl CommandContext {
                         init_split_assignment,
                     )
                     .await?;
+
+                if let Some(ReplaceTablePlan {
+                    new_table_fragments,
+                    dispatchers,
+                    init_split_assignment,
+                    old_table_fragments,
+                    ..
+                }) = replace_table
+                {
+                    // Tell compute nodes to drop actors.
+                    self.clean_up(old_table_fragments.actor_ids()).await?;
+
+                    mgr.catalog_controller
+                        .post_collect_table_fragments(
+                            new_table_fragments.table_id().table_id as _,
+                            new_table_fragments.actor_ids(),
+                            dispatchers.clone(),
+                            init_split_assignment,
+                        )
+                        .await?;
+                }
             }
         }
diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs
index 4873a42809b0..7fd70318a1ff 100644
--- a/src/meta/src/controller/mod.rs
+++ b/src/meta/src/controller/mod.rs
@@ -134,8 +134,7 @@ impl From<ObjectModel<table::Model>> for PbTable {
                 .optional_associated_source_id
                 .map(|id| PbOptionalAssociatedSourceId::AssociatedSourceId(id as _)),
             description: value.0.description,
-            // TODO: fix it for model v2.
-            incoming_sinks: vec![],
+            incoming_sinks: value.0.incoming_sinks.into_u32_array(),
             initialized_at_cluster_version: value.1.initialized_at_cluster_version,
             created_at_cluster_version: value.1.created_at_cluster_version,
             retention_seconds: value.0.retention_seconds.map(|id| id as u32),
@@ -204,8 +203,7 @@ impl From<ObjectModel<sink::Model>> for PbSink {
             sink_from_name: value.0.sink_from_name,
             stream_job_status: PbStreamJobStatus::Created as _, // todo: deprecate it.
             format_desc: value.0.sink_format_desc.map(|desc| desc.0),
-            // todo: fix this for model v2
-            target_table: None,
+            target_table: value.0.target_table.map(|id| id as _),
             initialized_at_cluster_version: value.1.initialized_at_cluster_version,
             created_at_cluster_version: value.1.created_at_cluster_version,
         }
diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs
index 4fc84dda21d5..9bb8af617246 100644
--- a/src/meta/src/controller/streaming_job.rs
+++ b/src/meta/src/controller/streaming_job.rs
@@ -552,8 +552,8 @@ impl CatalogController {
         streaming_job: StreamingJob,
         merge_updates: Vec<PbMergeUpdate>,
         table_col_index_mapping: Option<ColIndexMapping>,
-        _creating_sink_id: Option<SinkId>,
-        _dropping_sink_id: Option<SinkId>,
+        creating_sink_id: Option<SinkId>,
+        dropping_sink_id: Option<SinkId>,
     ) -> MetaResult<NotificationVersion> {
         // Question: The source catalog should be remain unchanged?
         let StreamingJob::Table(_, table, ..) = streaming_job else {
@@ -564,7 +564,22 @@ impl CatalogController {
         let txn = inner.db.begin().await?;
         let job_id = table.id as ObjectId;
 
-        let table = table::ActiveModel::from(table).update(&txn).await?;
+        let mut table = table::ActiveModel::from(table);
+        let mut incoming_sinks = table.incoming_sinks.as_ref().inner_ref().clone();
+        if let Some(sink_id) = creating_sink_id {
+            debug_assert!(!incoming_sinks.contains(&(sink_id as i32)));
+            incoming_sinks.push(sink_id as _);
+        }
+
+        if let Some(sink_id) = dropping_sink_id {
+            let drained = incoming_sinks
+                .extract_if(|id| *id == sink_id as i32)
+                .collect_vec();
+            debug_assert_eq!(drained, vec![sink_id as i32]);
+        }
+
+        table.incoming_sinks = Set(incoming_sinks.into());
+        let table = table.update(&txn).await?;
 
         // let old_fragment_mappings = get_fragment_mappings(&txn, job_id).await?;
         // 1. replace old fragments/actors with new ones.
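Note: `finish_replace_streaming_job` now maintains the target table's `incoming_sinks` list in the same transaction as the fragment swap: the creating sink id is appended, and the dropping sink id is drained out via the nightly `Vec::extract_if`, with a debug assertion that exactly one entry matched. A standalone sketch of that bookkeeping using stable `retain` instead (ids are illustrative):

```rust
// Stable-Rust equivalent of the `extract_if` bookkeeping above.
fn update_incoming_sinks(
    incoming_sinks: &mut Vec<i32>,
    creating_sink_id: Option<u32>,
    dropping_sink_id: Option<u32>,
) {
    if let Some(sink_id) = creating_sink_id {
        // A sink may be registered at most once per table.
        debug_assert!(!incoming_sinks.contains(&(sink_id as i32)));
        incoming_sinks.push(sink_id as i32);
    }
    if let Some(sink_id) = dropping_sink_id {
        let before = incoming_sinks.len();
        incoming_sinks.retain(|id| *id != sink_id as i32);
        // Exactly one entry should have been removed.
        debug_assert_eq!(before - incoming_sinks.len(), 1);
    }
}

fn main() {
    let mut sinks = vec![1, 2];
    update_incoming_sinks(&mut sinks, Some(3), Some(1));
    assert_eq!(sinks, vec![2, 3]);
}
```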
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index 25b6ed25464a..7cff9dc4a9b7 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -389,8 +389,13 @@ impl DdlController {
                     .await
             }
             MetadataManager::V2(_) => {
-                self.drop_object(ObjectType::Database, database_id as _, DropMode::Cascade)
-                    .await
+                self.drop_object(
+                    ObjectType::Database,
+                    database_id as _,
+                    DropMode::Cascade,
+                    None,
+                )
+                .await
             }
         }
     }
@@ -409,7 +414,7 @@ impl DdlController {
         match &self.metadata_manager {
             MetadataManager::V1(mgr) => mgr.catalog_manager.drop_schema(schema_id).await,
             MetadataManager::V2(_) => {
-                self.drop_object(ObjectType::Schema, schema_id as _, DropMode::Restrict)
+                self.drop_object(ObjectType::Schema, schema_id as _, DropMode::Restrict, None)
                     .await
             }
         }
@@ -453,7 +458,7 @@
     ) -> MetaResult<NotificationVersion> {
         let MetadataManager::V1(mgr) = &self.metadata_manager else {
             return self
-                .drop_object(ObjectType::Source, source_id as _, drop_mode)
+                .drop_object(ObjectType::Source, source_id as _, drop_mode, None)
                 .await;
         };
         // 1. Drop source in catalog.
@@ -523,7 +528,7 @@
     ) -> MetaResult<NotificationVersion> {
         let MetadataManager::V1(mgr) = &self.metadata_manager else {
             return self
-                .drop_object(ObjectType::View, view_id as _, drop_mode)
+                .drop_object(ObjectType::View, view_id as _, drop_mode, None)
                 .await;
         };
         let (version, streaming_job_ids) = mgr
@@ -569,6 +574,7 @@
                     ObjectType::Connection,
                     connection_id as _,
                     DropMode::Restrict,
+                    None,
                 )
                 .await
             }
@@ -616,7 +622,7 @@
     ) -> MetaResult<NotificationVersion> {
         let MetadataManager::V1(mgr) = &self.metadata_manager else {
             return self
-                .create_streaming_job_v2(stream_job, fragment_graph)
+                .create_streaming_job_v2(stream_job, fragment_graph, affected_table_replace_info)
                 .await;
         };
         let id = self.gen_unique_id::<{ IdCategory::Table }>().await?;
@@ -843,9 +849,10 @@
     // Here we modify the union node of the downstream table by the TableFragments of the to-be-created sink upstream.
     // The merge in the union has already been set up in the frontend and will be filled with specific upstream actors in this function.
    // Meanwhile, the Dispatcher corresponding to the upstream of the merge will also be added to the replace table context here.
-    async fn inject_replace_table_job_for_table_sink(
+    pub(crate) async fn inject_replace_table_job_for_table_sink(
         &self,
-        mgr: &MetadataManagerV1,
+        dummy_id: u32,
+        mgr: &MetadataManager,
         stream_ctx: StreamContext,
         sink: Option<&Sink>,
         creating_sink_table_fragments: Option<&TableFragments>,
@@ -853,12 +860,6 @@
         streaming_job: &StreamingJob,
         fragment_graph: StreamFragmentGraph,
     ) -> MetaResult<(ReplaceTableContext, TableFragments)> {
-        let dummy_id = self
-            .env
-            .id_gen_manager()
-            .generate::<{ IdCategory::Table }>()
-            .await? as u32;
-
         let (mut replace_table_ctx, mut table_fragments) = self
             .build_replace_table(stream_ctx, streaming_job, fragment_graph, None, dummy_id)
             .await?;
@@ -901,17 +902,14 @@
         }
 
         let [table_catalog]: [_; 1] = mgr
-            .catalog_manager
-            .get_tables(&[table.id])
-            .await
+            .get_table_catalog_by_ids(vec![table.id])
+            .await?
             .try_into()
             .expect("Target table should exist in sink into table");
 
         assert_eq!(table_catalog.incoming_sinks, table.incoming_sinks);
 
         {
-            let guard = mgr.fragment_manager.get_fragment_read_guard().await;
-
             for sink_id in &table_catalog.incoming_sinks {
                 if let Some(dropping_sink_id) = dropping_sink_id
                     && *sink_id == dropping_sink_id
@@ -919,10 +917,9 @@
                 {
                     continue;
                 };
 
-                let sink_table_fragments = guard
-                    .table_fragments()
-                    .get(&risingwave_common::catalog::TableId::new(*sink_id))
-                    .unwrap();
+                let sink_table_fragments = mgr
+                    .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(*sink_id))
+                    .await?;
 
                 let sink_fragment = sink_table_fragments.sink_fragment().unwrap();
 
@@ -1133,16 +1130,18 @@
                 .await
             }
             MetadataManager::V2(_) => {
-                if target_replace_info.is_some() {
-                    unimplemented!("support replace table for drop in v2");
-                }
                 let (object_id, object_type) = match job_id {
                     StreamingJobId::MaterializedView(id) => (id as _, ObjectType::Table),
                     StreamingJobId::Sink(id) => (id as _, ObjectType::Sink),
                     StreamingJobId::Table(_, id) => (id as _, ObjectType::Table),
                     StreamingJobId::Index(idx) => (idx as _, ObjectType::Index),
                 };
-                self.drop_object(object_type, object_id, drop_mode).await
+
+                let version = self
+                    .drop_object(object_type, object_id, drop_mode, target_replace_info)
+                    .await?;
+
+                Ok(version)
             }
         }
     }
@@ -1218,9 +1217,17 @@
         let result: MetaResult<()> = try {
             tracing::debug!(id = streaming_job.id(), "replacing table for dropped sink");
+
+            let dummy_id = self
+                .env
+                .id_gen_manager()
+                .generate::<{ IdCategory::Table }>()
+                .await? as u32;
+
             let (context, table_fragments) = self
                 .inject_replace_table_job_for_table_sink(
-                    mgr,
+                    dummy_id,
+                    &self.metadata_manager,
                     stream_ctx,
                     None,
                     None,
@@ -1387,13 +1394,31 @@
         let StreamingJob::Sink(s, _) = stream_job else {
             bail!("additional replace table event only occurs when sinking into table");
         };
-        let MetadataManager::V1(mgr) = &self.metadata_manager else {
-            unimplemented!("support create sink into table in v2");
+
+        let dummy_id = match &self.metadata_manager {
+            MetadataManager::V1(_) => {
+                self.env
+                    .id_gen_manager()
+                    .generate::<{ IdCategory::Table }>()
+                    .await? as u32
+            }
+            MetadataManager::V2(mgr) => {
+                let table = streaming_job.table().unwrap();
+                mgr.catalog_controller
+                    .create_job_catalog_for_replace(
+                        &streaming_job,
+                        &stream_ctx,
+                        table.get_version()?,
+                        &fragment_graph.default_parallelism(),
+                    )
+                    .await? as u32
+            }
         };
 
         let (context, table_fragments) = self
             .inject_replace_table_job_for_table_sink(
-                mgr,
+                dummy_id,
+                &self.metadata_manager,
                 stream_ctx,
                 Some(s),
                 Some(&table_fragments),
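Note: the key refactor above is that `inject_replace_table_job_for_table_sink` no longer pulls a dummy table id from the V1 id generator itself. Each caller now allocates the id for its own backend (V1 through `IdGeneratorManager`, V2 through `create_job_catalog_for_replace`, which also creates the catalog entry), and the helper takes the backend-agnostic `&MetadataManager`. A simplified sketch of that dispatch pattern (types are stand-ins, not the real managers):

```rust
// Sketch only: simplified stand-ins for the two metadata backends.
enum MetadataManager {
    V1 { next_id: u32 },
    V2 { next_catalog_id: u32 },
}

impl MetadataManager {
    // V1 draws a fresh id from its generator; V2 creates a catalog entry for
    // the replacement job and reuses that entry's id (as
    // `create_job_catalog_for_replace` does in the real code).
    fn allocate_dummy_table_id(&mut self) -> u32 {
        match self {
            MetadataManager::V1 { next_id } => {
                *next_id += 1;
                *next_id
            }
            MetadataManager::V2 { next_catalog_id } => {
                *next_catalog_id += 1;
                *next_catalog_id
            }
        }
    }
}

// The shared helper no longer cares which backend produced the id.
fn inject_replace_table_job_for_table_sink(dummy_id: u32) {
    println!("building replacement table fragments under dummy id {dummy_id}");
}

fn main() {
    let mut mgr = MetadataManager::V2 { next_catalog_id: 100 };
    let dummy_id = mgr.allocate_dummy_table_id();
    inject_replace_table_job_for_table_sink(dummy_id);
}
```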
diff --git a/src/meta/src/rpc/ddl_controller_v2.rs b/src/meta/src/rpc/ddl_controller_v2.rs
index 748857d2c3da..126d040997a6 100644
--- a/src/meta/src/rpc/ddl_controller_v2.rs
+++ b/src/meta/src/rpc/ddl_controller_v2.rs
@@ -29,7 +29,9 @@ use crate::manager::{
     MetadataManagerV2, NotificationVersion, StreamingJob, IGNORED_NOTIFICATION_VERSION,
 };
 use crate::model::{MetadataModel, StreamContext};
-use crate::rpc::ddl_controller::{fill_table_stream_graph_info, DdlController, DropMode};
+use crate::rpc::ddl_controller::{
+    fill_table_stream_graph_info, DdlController, DropMode, ReplaceTableInfo,
+};
 use crate::stream::{validate_sink, StreamFragmentGraph};
 use crate::MetaResult;
 
@@ -38,6 +40,7 @@ impl DdlController {
         &self,
         mut streaming_job: StreamingJob,
         mut fragment_graph: StreamFragmentGraphProto,
+        affected_table_replace_info: Option<ReplaceTableInfo>,
     ) -> MetaResult<NotificationVersion> {
         let mgr = self.metadata_manager.as_v2_ref();
 
@@ -80,7 +83,13 @@ impl DdlController {
 
         // create streaming job.
         match self
-            .create_streaming_job_inner_v2(mgr, ctx, &mut streaming_job, fragment_graph)
+            .create_streaming_job_inner_v2(
+                mgr,
+                ctx,
+                &mut streaming_job,
+                fragment_graph,
+                affected_table_replace_info,
+            )
             .await
         {
             Ok(version) => Ok(version),
@@ -110,6 +119,7 @@
         ctx: StreamContext,
         streaming_job: &mut StreamingJob,
         fragment_graph: StreamFragmentGraphProto,
+        affected_table_replace_info: Option<ReplaceTableInfo>,
     ) -> MetaResult<NotificationVersion> {
         let mut fragment_graph =
             StreamFragmentGraph::new(&self.env, fragment_graph, streaming_job).await?;
@@ -124,10 +134,34 @@
             .await?;
         fragment_graph.refill_internal_table_ids(table_id_map);
 
+        let affected_table_replace_info = match affected_table_replace_info {
+            Some(replace_table_info) => {
+                let ReplaceTableInfo {
+                    mut streaming_job,
+                    fragment_graph,
+                    ..
+                } = replace_table_info;
+
+                let fragment_graph =
+                    StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job).await?;
+                streaming_job.set_table_fragment_id(fragment_graph.table_fragment_id());
+                streaming_job.set_dml_fragment_id(fragment_graph.dml_fragment_id());
+                let streaming_job = streaming_job;
+
+                Some((streaming_job, fragment_graph))
+            }
+            None => None,
+        };
+
         // create fragment and actor catalogs.
         tracing::debug!(id = streaming_job.id(), "building streaming job");
         let (ctx, table_fragments) = self
-            .build_stream_job(ctx, streaming_job, fragment_graph, None)
+            .build_stream_job(
+                ctx,
+                streaming_job,
+                fragment_graph,
+                affected_table_replace_info,
+            )
             .await?;
 
         match streaming_job {
@@ -139,9 +173,12 @@
                 self.source_manager.register_source(source).await?;
             }
             StreamingJob::Sink(sink, target_table) => {
-                if target_table.is_some() {
-                    unimplemented!("support create sink into table in v2");
+                if let Some((StreamingJob::Table(source, table, _), ..)) =
+                    &ctx.replace_table_job_info
+                {
+                    *target_table = Some((table.clone(), source.clone()));
                 }
+
                 // Validate the sink on the connector node.
                 validate_sink(sink).await?;
             }
@@ -160,13 +197,39 @@
         let stream_job_id = streaming_job.id();
         match streaming_job.create_type() {
             CreateType::Unspecified | CreateType::Foreground => {
+                let replace_table_job_info = ctx.replace_table_job_info.as_ref().map(
+                    |(streaming_job, ctx, table_fragments)| {
+                        (
+                            streaming_job.clone(),
+                            ctx.merge_updates.clone(),
+                            table_fragments.table_id(),
+                        )
+                    },
+                );
+
                 self.stream_manager
                     .create_streaming_job(table_fragments, ctx)
                     .await?;
-                let version = mgr
+
+                let mut version = mgr
                     .catalog_controller
                     .finish_streaming_job(stream_job_id as _)
                     .await?;
+
+                if let Some((streaming_job, merge_updates, table_id)) = replace_table_job_info {
+                    version = mgr
+                        .catalog_controller
+                        .finish_replace_streaming_job(
+                            table_id.table_id as _,
+                            streaming_job,
+                            merge_updates,
+                            None,
+                            Some(stream_job_id),
+                            None,
+                        )
+                        .await?;
+                }
+
                 Ok(version)
             }
             CreateType::Background => {
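Note the clone-before-move at the top of the foreground arm: `ctx` is consumed by `create_streaming_job`, so the pieces that `finish_replace_streaming_job` needs afterwards (job, merge updates, table id) are cloned out first. A minimal illustration of the pattern with stand-in types:

```rust
// Stand-in types; the real ones are `StreamingJob`, `ReplaceTableContext`
// and `TableFragments`.
#[derive(Clone)]
struct MergeUpdate;

struct ReplaceCtx {
    merge_updates: Vec<MergeUpdate>,
}

struct CreateCtx {
    replace_table_job_info: Option<(String, ReplaceCtx)>,
}

// Consumes the context, as the real stream manager call does.
fn create_streaming_job(_ctx: CreateCtx) {}

fn main() {
    let ctx = CreateCtx {
        replace_table_job_info: Some((
            "sink-into-table".to_owned(),
            ReplaceCtx { merge_updates: vec![MergeUpdate] },
        )),
    };

    // Capture what the post-create finish step will need...
    let replace_info = ctx
        .replace_table_job_info
        .as_ref()
        .map(|(job, rctx)| (job.clone(), rctx.merge_updates.clone()));

    // ...before ownership of `ctx` moves here.
    create_streaming_job(ctx);

    if let Some((job, merge_updates)) = replace_info {
        println!("finishing replace for `{job}` with {} merge update(s)", merge_updates.len());
    }
}
```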
as u32 + } }; let (context, table_fragments) = self .inject_replace_table_job_for_table_sink( - mgr, + dummy_id, + &self.metadata_manager, stream_ctx, Some(s), Some(&table_fragments), diff --git a/src/meta/src/rpc/ddl_controller_v2.rs b/src/meta/src/rpc/ddl_controller_v2.rs index 748857d2c3da..126d040997a6 100644 --- a/src/meta/src/rpc/ddl_controller_v2.rs +++ b/src/meta/src/rpc/ddl_controller_v2.rs @@ -29,7 +29,9 @@ use crate::manager::{ MetadataManagerV2, NotificationVersion, StreamingJob, IGNORED_NOTIFICATION_VERSION, }; use crate::model::{MetadataModel, StreamContext}; -use crate::rpc::ddl_controller::{fill_table_stream_graph_info, DdlController, DropMode}; +use crate::rpc::ddl_controller::{ + fill_table_stream_graph_info, DdlController, DropMode, ReplaceTableInfo, +}; use crate::stream::{validate_sink, StreamFragmentGraph}; use crate::MetaResult; @@ -38,6 +40,7 @@ impl DdlController { &self, mut streaming_job: StreamingJob, mut fragment_graph: StreamFragmentGraphProto, + affected_table_replace_info: Option, ) -> MetaResult { let mgr = self.metadata_manager.as_v2_ref(); @@ -80,7 +83,13 @@ impl DdlController { // create streaming job. match self - .create_streaming_job_inner_v2(mgr, ctx, &mut streaming_job, fragment_graph) + .create_streaming_job_inner_v2( + mgr, + ctx, + &mut streaming_job, + fragment_graph, + affected_table_replace_info, + ) .await { Ok(version) => Ok(version), @@ -110,6 +119,7 @@ impl DdlController { ctx: StreamContext, streaming_job: &mut StreamingJob, fragment_graph: StreamFragmentGraphProto, + affected_table_replace_info: Option, ) -> MetaResult { let mut fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, streaming_job).await?; @@ -124,10 +134,34 @@ impl DdlController { .await?; fragment_graph.refill_internal_table_ids(table_id_map); + let affected_table_replace_info = match affected_table_replace_info { + Some(replace_table_info) => { + let ReplaceTableInfo { + mut streaming_job, + fragment_graph, + .. + } = replace_table_info; + + let fragment_graph = + StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job).await?; + streaming_job.set_table_fragment_id(fragment_graph.table_fragment_id()); + streaming_job.set_dml_fragment_id(fragment_graph.dml_fragment_id()); + let streaming_job = streaming_job; + + Some((streaming_job, fragment_graph)) + } + None => None, + }; + // create fragment and actor catalogs. tracing::debug!(id = streaming_job.id(), "building streaming job"); let (ctx, table_fragments) = self - .build_stream_job(ctx, streaming_job, fragment_graph, None) + .build_stream_job( + ctx, + streaming_job, + fragment_graph, + affected_table_replace_info, + ) .await?; match streaming_job { @@ -139,9 +173,12 @@ impl DdlController { self.source_manager.register_source(source).await?; } StreamingJob::Sink(sink, target_table) => { - if target_table.is_some() { - unimplemented!("support create sink into table in v2"); + if let Some((StreamingJob::Table(source, table, _), ..)) = + &ctx.replace_table_job_info + { + *target_table = Some((table.clone(), source.clone())); } + // Validate the sink on the connector node. 
diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs
index e0f4f8cb82b5..a949780e06ed 100644
--- a/src/meta/src/stream/stream_manager.rs
+++ b/src/meta/src/stream/stream_manager.rs
@@ -31,7 +31,7 @@ use super::{Locations, RescheduleOptions, ScaleControllerRef, TableResizePolicy}
 use crate::barrier::{BarrierScheduler, Command, ReplaceTablePlan, StreamRpcManager};
 use crate::hummock::HummockManagerRef;
 use crate::manager::{DdlType, MetaSrvEnv, MetadataManager, StreamingJob};
-use crate::model::{ActorId, TableFragments, TableParallelism};
+use crate::model::{ActorId, MetadataModel, TableFragments, TableParallelism};
 use crate::stream::SourceManagerRef;
 use crate::{MetaError, MetaResult};
@@ -430,10 +430,7 @@ impl GlobalStreamManager {
         self.build_actors(&table_fragments, &building_locations, &existing_locations)
             .await?;
 
-        if let Some((_, context, table_fragments)) = replace_table_job_info {
-            let MetadataManager::V1(mgr) = &self.metadata_manager else {
-                unimplemented!("support create sink into table in v2");
-            };
+        if let Some((streaming_job, context, table_fragments)) = replace_table_job_info {
             self.build_actors(
                 &table_fragments,
                 &context.building_locations,
                 &context.existing_locations,
             )
             .await?;
 
-            // Add table fragments to meta store with state: `State::Initial`.
-            mgr.fragment_manager
-                .start_create_table_fragments(table_fragments.clone())
-                .await?;
+            match &self.metadata_manager {
+                MetadataManager::V1(mgr) => {
+                    // Add table fragments to meta store with state: `State::Initial`.
+                    mgr.fragment_manager
+                        .start_create_table_fragments(table_fragments.clone())
+                        .await?
+                }
+                MetadataManager::V2(mgr) => {
+                    mgr.catalog_controller
+                        .prepare_streaming_job(table_fragments.to_protobuf(), &streaming_job, true)
+                        .await?
+                }
+            }
 
             let dummy_table_id = table_fragments.table_id();
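Note: both branches persist the replacement job's fragments before the barrier-driven swap; only the store differs (V1 fragment manager in `State::Initial` vs. V2 catalog controller with `for_replace = true`). A simplified sketch of the dispatch with stand-in types:

```rust
// Stand-ins for the two backends and the fragments being persisted.
struct TableFragments {
    table_id: u32,
}

enum MetadataManager {
    V1,
    V2,
}

fn prepare_replacement(mgr: &MetadataManager, fragments: &TableFragments) {
    match mgr {
        // V1 writes the fragments to the meta store in `State::Initial`.
        MetadataManager::V1 => {
            println!("start_create_table_fragments({})", fragments.table_id);
        }
        // V2 persists them through the catalog controller instead.
        MetadataManager::V2 => {
            println!("prepare_streaming_job({}, for_replace = true)", fragments.table_id);
        }
    }
}

fn main() {
    let fragments = TableFragments { table_id: 7 };
    prepare_replacement(&MetadataManager::V1, &fragments);
    prepare_replacement(&MetadataManager::V2, &fragments);
}
```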