Skip to content

Commit

Permalink
root
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan committed Dec 10, 2024
1 parent 532231c commit 835348d
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 16 deletions.
11 changes: 7 additions & 4 deletions src/meta/src/controller/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1352,25 +1352,28 @@ impl CatalogController {
/// Root fragment connects to downstream jobs.
///
/// ## What can be the root fragment
/// - For sink, it should have one `Sink` fragment.
/// - For MV, it should have one `MView` fragment.
/// - For table, it should have one `MView` fragment and one or two `Source` fragments. `MView` should be the root.
/// - For source, it should have one `Source` fragment.
///
/// In other words, it's the `MView` fragment if it exists, otherwise it's the `Source` fragment.
/// In other words, it's the `MView` or `Sink` fragment if it exists, otherwise it's the `Source` fragment.
pub async fn get_root_fragments(
&self,
job_ids: Vec<ObjectId>,
) -> MetaResult<(HashMap<ObjectId, PbFragment>, Vec<(ActorId, WorkerId)>)> {
let inner = self.inner.read().await;

let all_upstream_fragments = Fragment::find()
let all_fragments = Fragment::find()
.filter(fragment::Column::JobId.is_in(job_ids))
.all(&inner.db)
.await?;
// job_id -> fragment
let mut root_fragments = HashMap::<ObjectId, fragment::Model>::new();
for fragment in all_upstream_fragments {
if fragment.fragment_type_mask & PbFragmentTypeFlag::Mview as i32 != 0 {
for fragment in all_fragments {
if (fragment.fragment_type_mask & PbFragmentTypeFlag::Mview as i32) != 0
|| (fragment.fragment_type_mask & PbFragmentTypeFlag::Sink as i32) != 0
{
_ = root_fragments.insert(fragment.job_id, fragment);
} else if fragment.fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0 {
// look for Source fragment only if there's no MView fragment
Expand Down
12 changes: 1 addition & 11 deletions src/meta/src/manager/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,17 +409,7 @@ impl MetadataManager {
/// Get and filter the "**root**" fragments of the specified relations.
/// The root fragment is the bottom-most fragment of its fragment graph, and can be a `MView` or a `Source`.
///
/// ## What can be the root fragment
/// - For MV, it should have one `MView` fragment.
/// - For table, it should have one `MView` fragment and one or two `Source` fragments. `MView` should be the root.
/// - For source, it should have one `Source` fragment.
///
/// In other words, it's the `MView` fragment if it exists, otherwise it's the `Source` fragment.
///
/// ## What do we expect to get for different creating streaming job
/// - MV/Sink/Index should have MV upstream fragments for upstream MV/Tables, and Source upstream fragments for upstream shared sources.
/// - CDC Table has a Source upstream fragment.
/// - Sources and other Tables shouldn't have an upstream fragment.
/// See also [`crate::controller::catalog::CatalogController::get_root_fragments`].
pub async fn get_upstream_root_fragments(
&self,
upstream_table_ids: &HashSet<TableId>,
Expand Down
4 changes: 3 additions & 1 deletion src/meta/src/model/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,9 @@ impl StreamJobFragments {
}

pub fn root_fragment(&self) -> Option<Fragment> {
self.mview_fragment().or_else(|| self.source_fragment())
self.mview_fragment()
.or_else(|| self.sink_fragment())
.or_else(|| self.source_fragment())
}

/// Returns the fragment with the `Mview` type flag.
Expand Down

0 comments on commit 835348d

Please sign in to comment.