From 7853be6cd209c71cb4235c8ac106d5c2fbefb009 Mon Sep 17 00:00:00 2001 From: August Date: Tue, 23 Jan 2024 20:37:12 +0800 Subject: [PATCH] fix: only fill state table id of materialize node in table fragments (#14754) --- src/meta/src/stream/stream_graph/fragment.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index 925b01c8cbdcf..ccea576f03939 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -51,7 +51,7 @@ pub(super) struct BuildingFragment { /// The fragment structure from the frontend, with the global fragment ID. inner: StreamFragment, - /// The ID of the job if it's materialized in this fragment. + /// The ID of the job if it contains the streaming job node. table_id: Option, /// The required columns of each upstream table. @@ -457,14 +457,14 @@ impl StreamFragmentGraph { Ok(()) } - /// Returns the fragment id where the table is materialized. + /// Returns the fragment id where the streaming job node located. pub fn table_fragment_id(&self) -> FragmentId { self.fragments .values() .filter(|b| b.table_id.is_some()) .map(|b| b.fragment_id) .exactly_one() - .expect("require exactly 1 materialize/sink node when creating the streaming job") + .expect("require exactly 1 materialize/sink/cdc source node when creating the streaming job") } /// Returns the fragment id where the table dml is received. @@ -896,11 +896,17 @@ impl CompleteStreamFragmentGraph { } = building_fragment; let distribution_type = distribution.to_distribution_type() as i32; + let materialized_fragment_id = + if inner.fragment_type_mask & FragmentTypeFlag::Mview as u32 != 0 { + table_id + } else { + None + }; let state_table_ids = internal_tables .iter() .map(|t| t.id) - .chain(table_id) + .chain(materialized_fragment_id) .collect(); let upstream_fragment_ids = self