diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs
index 63cd4c16d9aaf..9fe0033f7e91b 100644
--- a/src/meta/src/barrier/recovery.rs
+++ b/src/meta/src/barrier/recovery.rs
@@ -40,7 +40,6 @@ use crate::barrier::rpc::ControlStreamManager;
 use crate::barrier::schedule::ScheduledBarriers;
 use crate::barrier::state::BarrierManagerState;
 use crate::barrier::{BarrierKind, GlobalBarrierManager, GlobalBarrierManagerContext};
-use crate::controller::catalog::ReleaseContext;
 use crate::manager::{ActiveStreamingWorkerNodes, MetadataManager, WorkerId};
 use crate::model::{MetadataModel, MigrationPlan, TableFragments, TableParallelism};
 use crate::stream::{build_actor_connector_splits, RescheduleOptions, TableResizePolicy};
@@ -100,8 +99,7 @@ impl GlobalBarrierManagerContext {
             }
             MetadataManager::V2(mgr) => {
                 mgr.catalog_controller.clean_dirty_subscription().await?;
-                let ReleaseContext { source_ids, .. } =
-                    mgr.catalog_controller.clean_dirty_creating_jobs().await?;
+                let source_ids = mgr.catalog_controller.clean_dirty_creating_jobs().await?;
 
                 // unregister cleaned sources.
                 self.source_manager
diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs
index ffab5160b5d9f..c9c3210dd6c67 100644
--- a/src/meta/src/controller/catalog.rs
+++ b/src/meta/src/controller/catalog.rs
@@ -40,7 +40,7 @@ use risingwave_pb::catalog::subscription::SubscriptionState;
 use risingwave_pb::catalog::table::PbTableType;
 use risingwave_pb::catalog::{
     PbComment, PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
-    PbSubscription, PbTable, PbView,
+    PbStreamJobStatus, PbSubscription, PbTable, PbView,
 };
 use risingwave_pb::meta::cancel_creating_jobs_request::PbCreatingJobInfo;
 use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
@@ -728,11 +728,11 @@ impl CatalogController {
     }
 
     /// `clean_dirty_creating_jobs` cleans up creating jobs that are creating in Foreground mode or in Initial status.
-    pub async fn clean_dirty_creating_jobs(&self) -> MetaResult<ReleaseContext> {
+    pub async fn clean_dirty_creating_jobs(&self) -> MetaResult<Vec<SourceId>> {
         let inner = self.inner.write().await;
         let txn = inner.db.begin().await?;
 
-        let mut dirty_objs: Vec<PartialObject> = streaming_job::Entity::find()
+        let dirty_job_objs: Vec<PartialObject> = streaming_job::Entity::find()
             .select_only()
             .column(streaming_job::Column::JobId)
             .columns([
@@ -755,36 +755,46 @@ impl CatalogController {
 
         let changed = Self::clean_dirty_sink_downstreams(&txn).await?;
 
-        if dirty_objs.is_empty() {
+        if dirty_job_objs.is_empty() {
             if changed {
                 txn.commit().await?;
             }
 
-            return Ok(ReleaseContext::default());
+            return Ok(vec![]);
         }
 
-        self.log_cleaned_dirty_jobs(&dirty_objs, &txn).await?;
+        self.log_cleaned_dirty_jobs(&dirty_job_objs, &txn).await?;
 
-        let dirty_job_ids = dirty_objs.iter().map(|obj| obj.oid).collect::<Vec<_>>();
+        let dirty_job_ids = dirty_job_objs.iter().map(|obj| obj.oid).collect::<Vec<_>>();
 
         // Filter out dummy objs for replacement.
        // todo: we'd better introduce a new dummy object type for replacement.
-        let all_dirty_table_ids = dirty_objs
+        let all_dirty_table_ids = dirty_job_objs
             .iter()
             .filter(|obj| obj.obj_type == ObjectType::Table)
             .map(|obj| obj.oid)
             .collect_vec();
-        let dirty_table_ids: HashSet<ObjectId> = Table::find()
+        let dirty_table_type_map: HashMap<ObjectId, TableType> = Table::find()
             .select_only()
             .column(table::Column::TableId)
+            .column(table::Column::TableType)
             .filter(table::Column::TableId.is_in(all_dirty_table_ids))
-            .into_tuple::<ObjectId>()
+            .into_tuple::<(ObjectId, TableType)>()
             .all(&txn)
             .await?
             .into_iter()
             .collect();
-        dirty_objs
-            .retain(|obj| obj.obj_type != ObjectType::Table || dirty_table_ids.contains(&obj.oid));
+
+        // Only notify delete for failed materialized views.
+        let dirty_mview_objs = dirty_job_objs
+            .into_iter()
+            .filter(|obj| {
+                matches!(
+                    dirty_table_type_map.get(&obj.oid),
+                    Some(TableType::MaterializedView)
+                )
+            })
+            .collect_vec();
 
         let associated_source_ids: Vec<SourceId> = Table::find()
             .select_only()
             .column(table::Column::OptionalAssociatedSourceId)
             .filter(
                 table::Column::TableId
                     .is_in(dirty_job_ids.clone())
                     .and(table::Column::OptionalAssociatedSourceId.is_not_null()),
             )
             .into_tuple()
             .all(&txn)
             .await?;
-        let dirty_source_objs: Vec<PartialObject> = Object::find()
-            .filter(object::Column::Oid.is_in(associated_source_ids.clone()))
-            .into_partial_model()
+
+        let dirty_state_table_ids: Vec<ObjectId> = Table::find()
+            .select_only()
+            .column(table::Column::TableId)
+            .filter(table::Column::BelongsToJobId.is_in(dirty_job_ids.clone()))
+            .into_tuple()
             .all(&txn)
             .await?;
-        dirty_objs.extend(dirty_source_objs);
 
-        let mut dirty_state_table_ids = vec![];
-        let to_drop_internal_table_objs: Vec<PartialObject> = Object::find()
+        let dirty_mview_internal_table_objs = Object::find()
             .select_only()
             .columns([
                 object::Column::Oid,
                 object::Column::ObjType,
                 object::Column::SchemaId,
                 object::Column::DatabaseId,
             ])
             .join(JoinType::InnerJoin, object::Relation::Table.def())
-            .filter(table::Column::BelongsToJobId.is_in(dirty_job_ids.clone()))
+            .filter(table::Column::BelongsToJobId.is_in(dirty_mview_objs.iter().map(|obj| obj.oid)))
             .into_partial_model()
             .all(&txn)
             .await?;
-        dirty_state_table_ids.extend(to_drop_internal_table_objs.iter().map(|obj| obj.oid));
-        dirty_objs.extend(to_drop_internal_table_objs);
 
         let to_delete_objs: HashSet<ObjectId> = dirty_job_ids
             .clone()
             .into_iter()
-            .chain(dirty_state_table_ids.clone().into_iter())
+            .chain(dirty_state_table_ids.into_iter())
             .chain(associated_source_ids.clone().into_iter())
             .collect();
@@ -836,17 +845,18 @@ impl CatalogController {
 
         txn.commit().await?;
 
-        let relation_group = build_relation_group(dirty_objs);
+        let relation_group = build_relation_group(
+            dirty_mview_objs
+                .into_iter()
+                .chain(dirty_mview_internal_table_objs.into_iter())
+                .collect_vec(),
+        );
 
         let _version = self
             .notify_frontend(NotificationOperation::Delete, relation_group)
             .await;
 
-        Ok(ReleaseContext {
-            state_table_ids: dirty_state_table_ids,
-            source_ids: associated_source_ids,
-            ..Default::default()
-        })
+        Ok(associated_source_ids)
     }
 
     async fn log_cleaned_dirty_jobs(
@@ -3137,12 +3147,16 @@ impl CatalogControllerInner {
         Ok(table_ids)
     }
 
-    /// `list_tables` return all `CREATED` tables and internal tables that belong to `CREATED` streaming jobs.
+    /// `list_tables` returns all `CREATED` tables, `CREATING` materialized views, and the internal tables that belong to them.
     async fn list_tables(&self) -> MetaResult<Vec<PbTable>> {
         let table_objs = Table::find()
             .find_also_related(Object)
             .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
-            .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
+            .filter(
+                streaming_job::Column::JobStatus
+                    .eq(JobStatus::Created)
+                    .or(table::Column::TableType.eq(TableType::MaterializedView)),
+            )
             .all(&self.db)
             .await?;
@@ -3154,12 +3168,18 @@ impl CatalogControllerInner {
             .all(&self.db)
             .await?;
 
+        let job_ids: HashSet<ObjectId> = table_objs
+            .iter()
+            .map(|(t, _)| t.table_id)
+            .chain(created_streaming_job_ids.iter().cloned())
+            .collect();
+
         let internal_table_objs = Table::find()
             .find_also_related(Object)
             .filter(
                 table::Column::TableType
                     .eq(TableType::Internal)
-                    .and(table::Column::BelongsToJobId.is_in(created_streaming_job_ids)),
+                    .and(table::Column::BelongsToJobId.is_in(job_ids)),
             )
             .all(&self.db)
             .await?;
@@ -3167,7 +3187,19 @@ impl CatalogControllerInner {
         Ok(table_objs
             .into_iter()
             .chain(internal_table_objs.into_iter())
-            .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
+            .map(|(table, obj)| {
+                // Correctly set the stream job status for creating materialized views and internal tables.
+                let is_created = created_streaming_job_ids.contains(&table.table_id)
+                    || (table.table_type == TableType::Internal
+                        && created_streaming_job_ids.contains(&table.belongs_to_job_id.unwrap()));
+                let mut pb_table: PbTable = ObjectModel(table, obj.unwrap()).into();
+                pb_table.stream_job_status = if is_created {
+                    PbStreamJobStatus::Created.into()
+                } else {
+                    PbStreamJobStatus::Creating.into()
+                };
+                pb_table
+            })
             .collect())
     }
diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs
index e22b0f20ee86e..3e903802b86ee 100644
--- a/src/meta/src/controller/mod.rs
+++ b/src/meta/src/controller/mod.rs
@@ -150,7 +150,7 @@ impl From<ObjectModel<table::Model>> for PbTable {
                 Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
             ),
             cleaned_by_watermark: value.0.cleaned_by_watermark,
-            stream_job_status: PbStreamJobStatus::Created as _, // todo: deprecate it.
+            stream_job_status: PbStreamJobStatus::Created as _,
             create_type: PbCreateType::Foreground as _,
             version: value.0.version.map(|v| v.to_protobuf()),
             optional_associated_source_id: value
@@ -236,7 +236,7 @@ impl From<ObjectModel<sink::Model>> for PbSink {
             ),
             db_name: value.0.db_name,
             sink_from_name: value.0.sink_from_name,
-            stream_job_status: PbStreamJobStatus::Created as _, // todo: deprecate it.
+            stream_job_status: PbStreamJobStatus::Created as _,
             format_desc: value.0.sink_format_desc.map(|desc| desc.to_protobuf()),
             target_table: value.0.target_table.map(|id| id as _),
             initialized_at_cluster_version: value.1.initialized_at_cluster_version,
@@ -299,7 +299,7 @@ impl From<ObjectModel<index::Model>> for PbIndex {
             created_at_epoch: Some(
                 Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
             ),
-            stream_job_status: PbStreamJobStatus::Created as _, // todo: deprecate it.
+            stream_job_status: PbStreamJobStatus::Created as _,
             initialized_at_cluster_version: value.1.initialized_at_cluster_version,
             created_at_cluster_version: value.1.created_at_cluster_version,
         }
diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs
index fd12630fd1649..c0035f0b2bc7d 100644
--- a/src/meta/src/controller/streaming_job.rs
+++ b/src/meta/src/controller/streaming_job.rs
@@ -26,6 +26,7 @@ use risingwave_meta_model_v2::prelude::{
     Actor, ActorDispatcher, Fragment, Index, Object, ObjectDependency, Sink, Source,
     StreamingJob as StreamingJobModel, Table,
 };
+use risingwave_meta_model_v2::table::TableType;
 use risingwave_meta_model_v2::{
     actor, actor_dispatcher, fragment, index, object, object_dependency, sink, source,
     streaming_job, table, ActorId, ActorUpstreamActors, ColumnCatalogArray, CreateType, DatabaseId,
@@ -208,9 +209,6 @@ impl CatalogController {
                 sink.id = job_id as _;
                 let sink_model: sink::ActiveModel = sink.clone().into();
                 Sink::insert(sink_model).exec(&txn).await?;
-                relations.push(Relation {
-                    relation_info: Some(RelationInfo::Sink(sink.to_owned())),
-                });
             }
             StreamingJob::Table(src, table, _) => {
                 let job_id = Self::create_streaming_job_obj(
@@ -242,15 +240,9 @@ impl CatalogController {
                     );
                     let source: source::ActiveModel = src.clone().into();
                     Source::insert(source).exec(&txn).await?;
-                    relations.push(Relation {
-                        relation_info: Some(RelationInfo::Source(src.to_owned())),
-                    });
                 }
                 let table_model: table::ActiveModel = table.clone().into();
                 Table::insert(table_model).exec(&txn).await?;
-                relations.push(Relation {
-                    relation_info: Some(RelationInfo::Table(table.to_owned())),
-                });
             }
             StreamingJob::Index(index, table) => {
                 ensure_object_id(ObjectType::Table, index.primary_table_id as _, &txn).await?;
@@ -282,12 +274,6 @@ impl CatalogController {
                 Table::insert(table_model).exec(&txn).await?;
                 let index_model: index::ActiveModel = index.clone().into();
                 Index::insert(index_model).exec(&txn).await?;
-                relations.push(Relation {
-                    relation_info: Some(RelationInfo::Table(table.to_owned())),
-                });
-                relations.push(Relation {
-                    relation_info: Some(RelationInfo::Index(index.to_owned())),
-                });
             }
             StreamingJob::Source(src) => {
                 let job_id = Self::create_streaming_job_obj(
@@ -304,9 +290,6 @@ impl CatalogController {
                 src.id = job_id as _;
                 let source_model: source::ActiveModel = src.clone().into();
                 Source::insert(source_model).exec(&txn).await?;
-                relations.push(Relation {
-                    relation_info: Some(RelationInfo::Source(src.to_owned())),
-                });
             }
         }
@@ -331,21 +314,23 @@ impl CatalogController {
 
         txn.commit().await?;
 
-        let _version = self
-            .notify_frontend(
+        if !relations.is_empty() {
+            self.notify_frontend(
                 Operation::Add,
                 Info::RelationGroup(RelationGroup { relations }),
             )
             .await;
+        }
 
         Ok(())
     }
 
     pub async fn create_internal_table_catalog(
         &self,
-        job_id: ObjectId,
+        job: &StreamingJob,
         mut internal_tables: Vec<PbTable>,
     ) -> MetaResult<HashMap<u32, u32>> {
+        let job_id = job.id() as ObjectId;
         let inner = self.inner.write().await;
         let txn = inner.db.begin().await?;
         let mut table_id_map = HashMap::new();
@@ -363,13 +348,14 @@ impl CatalogController {
             table.id = table_id as _;
             let mut table_model: table::ActiveModel = table.clone().into();
             table_model.table_id = Set(table_id as _);
-            table_model.belongs_to_job_id = Set(Some(job_id as _));
+            table_model.belongs_to_job_id = Set(Some(job_id));
             table_model.fragment_id = NotSet;
             Table::insert(table_model).exec(&txn).await?;
         }
         txn.commit().await?;
-        let _version = self
-            .notify_frontend(
+
+        if job.is_materialized_view() {
+            self.notify_frontend(
                 Operation::Add,
                 Info::RelationGroup(RelationGroup {
                     relations: internal_tables
@@ -381,6 +367,8 @@ impl CatalogController {
                 }),
             )
             .await;
+        }
+
         Ok(table_id_map)
     }
 
@@ -497,64 +485,52 @@ impl CatalogController {
            .all(&txn)
            .await?;
 
-        let associated_source_id: Option<SourceId> = Table::find_by_id(job_id)
-            .select_only()
-            .column(table::Column::OptionalAssociatedSourceId)
-            .filter(table::Column::OptionalAssociatedSourceId.is_not_null())
-            .into_tuple()
-            .one(&txn)
-            .await?;
-
-        // Get notification info
+        // Get the notification info if the job is a materialized view.
+        let table_obj = Table::find_by_id(job_id).one(&txn).await?;
         let mut objs = vec![];
-        let obj: Option<PartialObject> = Object::find_by_id(job_id)
-            .select_only()
-            .columns([
-                object::Column::Oid,
-                object::Column::ObjType,
-                object::Column::SchemaId,
-                object::Column::DatabaseId,
-            ])
-            .into_partial_model()
-            .one(&txn)
-            .await?;
-        let obj = obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
-        objs.push(obj);
-        let internal_table_objs: Vec<PartialObject> = Object::find()
-            .select_only()
-            .columns([
-                object::Column::Oid,
-                object::Column::ObjType,
-                object::Column::SchemaId,
-                object::Column::DatabaseId,
-            ])
-            .join(JoinType::InnerJoin, object::Relation::Table.def())
-            .filter(table::Column::BelongsToJobId.eq(job_id))
-            .into_partial_model()
-            .all(&txn)
-            .await?;
-        objs.extend(internal_table_objs);
-        if let Some(source_id) = associated_source_id {
-            let source_obj = Object::find_by_id(source_id)
+        if let Some(table) = &table_obj
+            && table.table_type == TableType::MaterializedView
+        {
+            let obj: Option<PartialObject> = Object::find_by_id(job_id)
                 .select_only()
-                .column(object::Column::ObjType)
+                .columns([
+                    object::Column::Oid,
+                    object::Column::ObjType,
+                    object::Column::SchemaId,
+                    object::Column::DatabaseId,
+                ])
                 .into_partial_model()
                 .one(&txn)
-                .await?
-                .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
-            objs.push(source_obj);
+                .await?;
+            let obj =
+                obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
+            objs.push(obj);
+            let internal_table_objs: Vec<PartialObject> = Object::find()
+                .select_only()
+                .columns([
+                    object::Column::Oid,
+                    object::Column::ObjType,
+                    object::Column::SchemaId,
+                    object::Column::DatabaseId,
+                ])
+                .join(JoinType::InnerJoin, object::Relation::Table.def())
+                .filter(table::Column::BelongsToJobId.eq(job_id))
+                .into_partial_model()
+                .all(&txn)
+                .await?;
+            objs.extend(internal_table_objs);
         }
-        let relation_group = build_relation_group(objs);
-        // Can delete objects after queried notification info
 
         Object::delete_by_id(job_id).exec(&txn).await?;
         if !internal_table_ids.is_empty() {
             Object::delete_many()
-                .filter(object::Column::Oid.is_in(internal_table_ids.iter().cloned()))
+                .filter(object::Column::Oid.is_in(internal_table_ids))
                 .exec(&txn)
                 .await?;
         }
-        if let Some(source_id) = associated_source_id {
+        if let Some(t) = &table_obj
+            && let Some(source_id) = t.optional_associated_source_id
+        {
             Object::delete_by_id(source_id).exec(&txn).await?;
         }
@@ -576,9 +552,10 @@ impl CatalogController {
         }
         txn.commit().await?;
 
-        let _version = self
-            .notify_frontend(Operation::Delete, relation_group)
-            .await;
+        if !objs.is_empty() {
+            self.notify_frontend(Operation::Delete, build_relation_group(objs))
+                .await;
+        }
 
         Ok(true)
     }
@@ -778,6 +755,7 @@ impl CatalogController {
                 )),
             })
             .collect_vec();
+        let mut notification_op = NotificationOperation::Add;
 
         match job_type {
             ObjectType::Table => {
                 let table = Table::find_by_id(job_id)
                     .one(&txn)
                     .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
+                if table.table_type == TableType::MaterializedView {
+                    notification_op = NotificationOperation::Update;
+                }
+
                 if let Some(source_id) = table.optional_associated_source_id {
                     let (src, obj) = Source::find_by_id(source_id)
                         .find_also_related(Object)
@@ -892,7 +874,7 @@ impl CatalogController {
 
         let mut version = self
             .notify_frontend(
-                NotificationOperation::Update,
+                notification_op,
                 NotificationInfo::RelationGroup(PbRelationGroup { relations }),
             )
             .await;
diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs
index 2d517272a3d00..43fed3380d6bc 100644
--- a/src/meta/src/controller/utils.rs
+++ b/src/meta/src/controller/utils.rs
@@ -230,7 +230,7 @@ pub fn construct_sink_cycle_check_query(
         .to_owned()
 }
 
-#[derive(Clone, DerivePartialModel, FromQueryResult)]
+#[derive(Clone, DerivePartialModel, FromQueryResult, Debug)]
 #[sea_orm(entity = "Object")]
 pub struct PartialObject {
     pub oid: ObjectId,
diff --git a/src/meta/src/rpc/ddl_controller_v2.rs b/src/meta/src/rpc/ddl_controller_v2.rs
index c097fa5acb5c6..5e83e49b767a7 100644
--- a/src/meta/src/rpc/ddl_controller_v2.rs
+++ b/src/meta/src/rpc/ddl_controller_v2.rs
@@ -146,7 +146,7 @@ impl DdlController {
         let internal_tables = fragment_graph.internal_tables().into_values().collect_vec();
         let table_id_map = mgr
             .catalog_controller
-            .create_internal_table_catalog(streaming_job.id() as _, internal_tables)
+            .create_internal_table_catalog(&streaming_job, internal_tables)
             .await?;
         fragment_graph.refill_internal_table_ids(table_id_map);
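
Taken together, the patch narrows frontend catalog notifications to materialized views: an MV is announced while it is still `Creating` (so finishing it is an `Update`, and cleaning it up after a failed recovery is a `Delete`), while tables, sinks, indexes, and sources are only announced once their job finishes, as an `Add`. The sketch below is a self-contained illustration of that selection logic; `TableType`, `NotificationOp`, and `pick_finish_op` here are simplified stand-ins for the meta-service types, not the actual RisingWave API.

```rust
// Illustrative sketch only: simplified stand-ins for the meta-service types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TableType {
    MaterializedView,
    Table,
    Internal,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum NotificationOp {
    /// The frontend learns about the relation for the first time.
    Add,
    /// The frontend already saw the `Creating` version and only needs the final state.
    Update,
}

/// Mirrors the `notification_op` selection added in `streaming_job.rs`:
/// a materialized view was already announced during creation, so finishing
/// it is an `Update`; every other job is first announced on finish.
fn pick_finish_op(table_type: Option<TableType>) -> NotificationOp {
    match table_type {
        Some(TableType::MaterializedView) => NotificationOp::Update,
        _ => NotificationOp::Add,
    }
}

fn main() {
    assert_eq!(
        pick_finish_op(Some(TableType::MaterializedView)),
        NotificationOp::Update
    );
    assert_eq!(pick_finish_op(Some(TableType::Table)), NotificationOp::Add);
    assert_eq!(pick_finish_op(Some(TableType::Internal)), NotificationOp::Add);
    assert_eq!(pick_finish_op(None), NotificationOp::Add); // e.g. a sink or source job
    println!("notification policy sketch ok");
}
```

The same policy explains why `create_internal_table_catalog` now takes the whole `&StreamingJob` instead of a bare `job_id`: it needs `job.is_materialized_view()` to decide whether the internal tables should be announced to the frontend at creation time at all.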