From 835348d282f2df161278557740898d9d0a4d9ae5 Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 10 Dec 2024 13:47:57 +0800 Subject: [PATCH] root Signed-off-by: xxchan --- src/meta/src/controller/fragment.rs | 11 +++++++---- src/meta/src/manager/metadata.rs | 12 +----------- src/meta/src/model/stream.rs | 4 +++- 3 files changed, 11 insertions(+), 16 deletions(-) diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index 73ff00d4c506..14be4ad5a288 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -1352,25 +1352,28 @@ impl CatalogController { /// Root fragment connects to downstream jobs. /// /// ## What can be the root fragment + /// - For sink, it should have one `Sink` fragment. /// - For MV, it should have one `MView` fragment. /// - For table, it should have one `MView` fragment and one or two `Source` fragments. `MView` should be the root. /// - For source, it should have one `Source` fragment. /// - /// In other words, it's the `MView` fragment if it exists, otherwise it's the `Source` fragment. + /// In other words, it's the `MView` or `Sink` fragment if it exists, otherwise it's the `Source` fragment. pub async fn get_root_fragments( &self, job_ids: Vec, ) -> MetaResult<(HashMap, Vec<(ActorId, WorkerId)>)> { let inner = self.inner.read().await; - let all_upstream_fragments = Fragment::find() + let all_fragments = Fragment::find() .filter(fragment::Column::JobId.is_in(job_ids)) .all(&inner.db) .await?; // job_id -> fragment let mut root_fragments = HashMap::::new(); - for fragment in all_upstream_fragments { - if fragment.fragment_type_mask & PbFragmentTypeFlag::Mview as i32 != 0 { + for fragment in all_fragments { + if (fragment.fragment_type_mask & PbFragmentTypeFlag::Mview as i32) != 0 + || (fragment.fragment_type_mask & PbFragmentTypeFlag::Sink as i32) != 0 + { _ = root_fragments.insert(fragment.job_id, fragment); } else if fragment.fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0 { // look for Source fragment only if there's no MView fragment diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index 1431e204899b..c31a47a17d4d 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -409,17 +409,7 @@ impl MetadataManager { /// Get and filter the "**root**" fragments of the specified relations. /// The root fragment is the bottom-most fragment of its fragment graph, and can be a `MView` or a `Source`. /// - /// ## What can be the root fragment - /// - For MV, it should have one `MView` fragment. - /// - For table, it should have one `MView` fragment and one or two `Source` fragments. `MView` should be the root. - /// - For source, it should have one `Source` fragment. - /// - /// In other words, it's the `MView` fragment if it exists, otherwise it's the `Source` fragment. - /// - /// ## What do we expect to get for different creating streaming job - /// - MV/Sink/Index should have MV upstream fragments for upstream MV/Tables, and Source upstream fragments for upstream shared sources. - /// - CDC Table has a Source upstream fragment. - /// - Sources and other Tables shouldn't have an upstream fragment. + /// See also [`crate::controller::catalog::CatalogController::get_root_fragments`]. pub async fn get_upstream_root_fragments( &self, upstream_table_ids: &HashSet, diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index b169af902797..3bfbfc7a03d9 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -370,7 +370,9 @@ impl StreamJobFragments { } pub fn root_fragment(&self) -> Option { - self.mview_fragment().or_else(|| self.source_fragment()) + self.mview_fragment() + .or_else(|| self.sink_fragment()) + .or_else(|| self.source_fragment()) } /// Returns the fragment with the `Mview` type flag.