From 51d3c63f702ddf8860f6223a1e9a425e034d3375 Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 3 Sep 2024 16:03:15 +0800 Subject: [PATCH] fix: `DROP DATABASE` doesn't clean up the source stream job (in v1) (#18033) Signed-off-by: xxchan --- src/meta/src/manager/catalog/mod.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 81e4f1c4d96c3..12c1596841f67 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -625,20 +625,21 @@ impl CatalogManager { .notify_frontend(Operation::Delete, Info::Database(database)) .await; - let catalog_deleted_ids = tables_to_drop + let streaming_job_deleted_ids = tables_to_drop .into_iter() .filter(|table| valid_table_name(&table.name)) .map(|table| StreamingJobId::new(table.id)) + .chain(sources_to_drop.iter().filter_map(|source| { + source + .info + .as_ref() + .and_then(|info| info.is_shared().then(|| StreamingJobId::new(source.id))) + })) .chain( sinks_to_drop .into_iter() .map(|sink| StreamingJobId::new(sink.id)), ) - .chain( - subscriptions_to_drop - .into_iter() - .map(|subscription| StreamingJobId::new(subscription.id)), - ) .collect_vec(); let source_deleted_ids = sources_to_drop .into_iter() @@ -647,7 +648,7 @@ impl CatalogManager { Ok(( version, - catalog_deleted_ids, + streaming_job_deleted_ids, source_deleted_ids, connections_dropped, ))