Skip to content

Commit

Permalink
fix: only notify creating materialized views to FE when using sql backend (#18476)
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 authored Sep 13, 2024
1 parent 8eed769 commit 72396b2
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 114 deletions.
4 changes: 1 addition & 3 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::schedule::ScheduledBarriers;
use crate::barrier::state::BarrierManagerState;
use crate::barrier::{BarrierKind, GlobalBarrierManager, GlobalBarrierManagerContext};
use crate::controller::catalog::ReleaseContext;
use crate::manager::{ActiveStreamingWorkerNodes, MetadataManager, WorkerId};
use crate::model::{MetadataModel, MigrationPlan, TableFragments, TableParallelism};
use crate::stream::{build_actor_connector_splits, RescheduleOptions, TableResizePolicy};
Expand Down Expand Up @@ -100,8 +99,7 @@ impl GlobalBarrierManagerContext {
}
MetadataManager::V2(mgr) => {
mgr.catalog_controller.clean_dirty_subscription().await?;
let ReleaseContext { source_ids, .. } =
mgr.catalog_controller.clean_dirty_creating_jobs().await?;
let source_ids = mgr.catalog_controller.clean_dirty_creating_jobs().await?;

// unregister cleaned sources.
self.source_manager
Expand Down
96 changes: 64 additions & 32 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use risingwave_pb::catalog::subscription::SubscriptionState;
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::catalog::{
PbComment, PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSecret, PbSink, PbSource,
PbSubscription, PbTable, PbView,
PbStreamJobStatus, PbSubscription, PbTable, PbView,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbCreatingJobInfo;
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
Expand Down Expand Up @@ -728,11 +728,11 @@ impl CatalogController {
}

/// `clean_dirty_creating_jobs` cleans up streaming jobs that are still being created in Foreground mode or are in Initial status.
pub async fn clean_dirty_creating_jobs(&self) -> MetaResult<ReleaseContext> {
pub async fn clean_dirty_creating_jobs(&self) -> MetaResult<Vec<SourceId>> {
let inner = self.inner.write().await;
let txn = inner.db.begin().await?;

let mut dirty_objs: Vec<PartialObject> = streaming_job::Entity::find()
let dirty_job_objs: Vec<PartialObject> = streaming_job::Entity::find()
.select_only()
.column(streaming_job::Column::JobId)
.columns([
Expand All @@ -755,36 +755,46 @@ impl CatalogController {

let changed = Self::clean_dirty_sink_downstreams(&txn).await?;

if dirty_objs.is_empty() {
if dirty_job_objs.is_empty() {
if changed {
txn.commit().await?;
}

return Ok(ReleaseContext::default());
return Ok(vec![]);
}

self.log_cleaned_dirty_jobs(&dirty_objs, &txn).await?;
self.log_cleaned_dirty_jobs(&dirty_job_objs, &txn).await?;

let dirty_job_ids = dirty_objs.iter().map(|obj| obj.oid).collect::<Vec<_>>();
let dirty_job_ids = dirty_job_objs.iter().map(|obj| obj.oid).collect::<Vec<_>>();

// Filter out dummy objs for replacement.
// todo: we'd better introduce a new dummy object type for replacement.
let all_dirty_table_ids = dirty_objs
let all_dirty_table_ids = dirty_job_objs
.iter()
.filter(|obj| obj.obj_type == ObjectType::Table)
.map(|obj| obj.oid)
.collect_vec();
let dirty_table_ids: HashSet<ObjectId> = Table::find()
let dirty_table_type_map: HashMap<ObjectId, TableType> = Table::find()
.select_only()
.column(table::Column::TableId)
.column(table::Column::TableType)
.filter(table::Column::TableId.is_in(all_dirty_table_ids))
.into_tuple::<ObjectId>()
.into_tuple::<(ObjectId, TableType)>()
.all(&txn)
.await?
.into_iter()
.collect();
dirty_objs
.retain(|obj| obj.obj_type != ObjectType::Table || dirty_table_ids.contains(&obj.oid));

// Only notify delete for failed materialized views.
let dirty_mview_objs = dirty_job_objs
.into_iter()
.filter(|obj| {
matches!(
dirty_table_type_map.get(&obj.oid),
Some(TableType::MaterializedView)
)
})
.collect_vec();

let associated_source_ids: Vec<SourceId> = Table::find()
.select_only()
Expand All @@ -797,15 +807,16 @@ impl CatalogController {
.into_tuple()
.all(&txn)
.await?;
let dirty_source_objs: Vec<PartialObject> = Object::find()
.filter(object::Column::Oid.is_in(associated_source_ids.clone()))
.into_partial_model()

let dirty_state_table_ids: Vec<TableId> = Table::find()
.select_only()
.column(table::Column::TableId)
.filter(table::Column::BelongsToJobId.is_in(dirty_job_ids.clone()))
.into_tuple()
.all(&txn)
.await?;
dirty_objs.extend(dirty_source_objs);

let mut dirty_state_table_ids = vec![];
let to_drop_internal_table_objs: Vec<PartialObject> = Object::find()
let dirty_mview_internal_table_objs = Object::find()
.select_only()
.columns([
object::Column::Oid,
Expand All @@ -814,17 +825,15 @@ impl CatalogController {
object::Column::DatabaseId,
])
.join(JoinType::InnerJoin, object::Relation::Table.def())
.filter(table::Column::BelongsToJobId.is_in(dirty_job_ids.clone()))
.filter(table::Column::BelongsToJobId.is_in(dirty_mview_objs.iter().map(|obj| obj.oid)))
.into_partial_model()
.all(&txn)
.await?;
dirty_state_table_ids.extend(to_drop_internal_table_objs.iter().map(|obj| obj.oid));
dirty_objs.extend(to_drop_internal_table_objs);

let to_delete_objs: HashSet<ObjectId> = dirty_job_ids
.clone()
.into_iter()
.chain(dirty_state_table_ids.clone().into_iter())
.chain(dirty_state_table_ids.into_iter())
.chain(associated_source_ids.clone().into_iter())
.collect();

Expand All @@ -836,17 +845,18 @@ impl CatalogController {

txn.commit().await?;

let relation_group = build_relation_group(dirty_objs);
let relation_group = build_relation_group(
dirty_mview_objs
.into_iter()
.chain(dirty_mview_internal_table_objs.into_iter())
.collect_vec(),
);

let _version = self
.notify_frontend(NotificationOperation::Delete, relation_group)
.await;

Ok(ReleaseContext {
state_table_ids: dirty_state_table_ids,
source_ids: associated_source_ids,
..Default::default()
})
Ok(associated_source_ids)
}

async fn log_cleaned_dirty_jobs(
Expand Down Expand Up @@ -3137,12 +3147,16 @@ impl CatalogControllerInner {
Ok(table_ids)
}

/// `list_tables` return all `CREATED` tables and internal tables that belong to `CREATED` streaming jobs.
/// `list_tables` returns all `CREATED` tables, all `CREATING` materialized views, and the internal tables that belong to them.
async fn list_tables(&self) -> MetaResult<Vec<PbTable>> {
let table_objs = Table::find()
.find_also_related(Object)
.join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
.filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
.filter(
streaming_job::Column::JobStatus
.eq(JobStatus::Created)
.or(table::Column::TableType.eq(TableType::MaterializedView)),
)
.all(&self.db)
.await?;

Expand All @@ -3154,20 +3168,38 @@ impl CatalogControllerInner {
.all(&self.db)
.await?;

let job_ids: HashSet<ObjectId> = table_objs
.iter()
.map(|(t, _)| t.table_id)
.chain(created_streaming_job_ids.iter().cloned())
.collect();

let internal_table_objs = Table::find()
.find_also_related(Object)
.filter(
table::Column::TableType
.eq(TableType::Internal)
.and(table::Column::BelongsToJobId.is_in(created_streaming_job_ids)),
.and(table::Column::BelongsToJobId.is_in(job_ids)),
)
.all(&self.db)
.await?;

Ok(table_objs
.into_iter()
.chain(internal_table_objs.into_iter())
.map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
.map(|(table, obj)| {
// Correctly set the stream job status for creating materialized views and internal tables.
let is_created = created_streaming_job_ids.contains(&table.table_id)
|| (table.table_type == TableType::Internal
&& created_streaming_job_ids.contains(&table.belongs_to_job_id.unwrap()));
let mut pb_table: PbTable = ObjectModel(table, obj.unwrap()).into();
pb_table.stream_job_status = if is_created {
PbStreamJobStatus::Created.into()
} else {
PbStreamJobStatus::Creating.into()
};
pb_table
})
.collect())
}

Expand Down
6 changes: 3 additions & 3 deletions src/meta/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ impl From<ObjectModel<table::Model>> for PbTable {
Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
),
cleaned_by_watermark: value.0.cleaned_by_watermark,
stream_job_status: PbStreamJobStatus::Created as _, // todo: deprecate it.
stream_job_status: PbStreamJobStatus::Created as _,
create_type: PbCreateType::Foreground as _,
version: value.0.version.map(|v| v.to_protobuf()),
optional_associated_source_id: value
Expand Down Expand Up @@ -236,7 +236,7 @@ impl From<ObjectModel<sink::Model>> for PbSink {
),
db_name: value.0.db_name,
sink_from_name: value.0.sink_from_name,
stream_job_status: PbStreamJobStatus::Created as _, // todo: deprecate it.
stream_job_status: PbStreamJobStatus::Created as _,
format_desc: value.0.sink_format_desc.map(|desc| desc.to_protobuf()),
target_table: value.0.target_table.map(|id| id as _),
initialized_at_cluster_version: value.1.initialized_at_cluster_version,
Expand Down Expand Up @@ -299,7 +299,7 @@ impl From<ObjectModel<index::Model>> for PbIndex {
created_at_epoch: Some(
Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
),
stream_job_status: PbStreamJobStatus::Created as _, // todo: deprecate it.
stream_job_status: PbStreamJobStatus::Created as _,
initialized_at_cluster_version: value.1.initialized_at_cluster_version,
created_at_cluster_version: value.1.created_at_cluster_version,
}
Expand Down
Loading

0 comments on commit 72396b2

Please sign in to comment.