Skip to content

Commit

Permalink
fix: only fill state table id of materialize node in table fragments (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 authored Jan 23, 2024
1 parent f804ed0 commit 7853be6
Showing 1 changed file with 10 additions and 4 deletions.
14 changes: 10 additions & 4 deletions src/meta/src/stream/stream_graph/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub(super) struct BuildingFragment {
/// The fragment structure from the frontend, with the global fragment ID.
inner: StreamFragment,

/// The ID of the job if it's materialized in this fragment.
/// The ID of the job if it contains the streaming job node.
table_id: Option<u32>,

/// The required columns of each upstream table.
Expand Down Expand Up @@ -457,14 +457,14 @@ impl StreamFragmentGraph {
Ok(())
}

/// Returns the fragment id where the table is materialized.
/// Returns the fragment id where the streaming job node located.
pub fn table_fragment_id(&self) -> FragmentId {
self.fragments
.values()
.filter(|b| b.table_id.is_some())
.map(|b| b.fragment_id)
.exactly_one()
.expect("require exactly 1 materialize/sink node when creating the streaming job")
.expect("require exactly 1 materialize/sink/cdc source node when creating the streaming job")
}

/// Returns the fragment id where the table dml is received.
Expand Down Expand Up @@ -896,11 +896,17 @@ impl CompleteStreamFragmentGraph {
} = building_fragment;

let distribution_type = distribution.to_distribution_type() as i32;
let materialized_fragment_id =
if inner.fragment_type_mask & FragmentTypeFlag::Mview as u32 != 0 {
table_id
} else {
None
};

let state_table_ids = internal_tables
.iter()
.map(|t| t.id)
.chain(table_id)
.chain(materialized_fragment_id)
.collect();

let upstream_fragment_ids = self
Expand Down

0 comments on commit 7853be6

Please sign in to comment.