diff --git a/proto/meta.proto b/proto/meta.proto index d75494625edd..8932dcbc9e03 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -99,6 +99,7 @@ message TableFragments { State state = 2; map fragments = 3; map actor_status = 4; + // `Source` and `SourceBackfill` are handled together here. map actor_splits = 5; stream_plan.StreamContext ctx = 6; @@ -513,6 +514,7 @@ message GetClusterInfoRequest {} message GetClusterInfoResponse { repeated common.WorkerNode worker_nodes = 1; repeated TableFragments table_fragments = 2; + // `Source` and `SourceBackfill` are handled together here. map actor_splits = 3; map source_infos = 4; uint64 revision = 5; diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index 5ea2f018eee2..a96f54818146 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -23,6 +23,7 @@ message AddMutation { // All actors to be added (to the main connected component of the graph) in this update. repeated uint32 added_actors = 3; // We may embed a source change split mutation here. + // `Source` and `SourceBackfill` are handled together here. // TODO: we may allow multiple mutations in a single barrier. map actor_splits = 2; // We may embed a pause mutation here. @@ -70,6 +71,7 @@ message UpdateMutation { // All actors to be dropped in this update. repeated uint32 dropped_actors = 4; // Source updates. + // `Source` and `SourceBackfill` are handled together here. map actor_splits = 5; // When modifying the Materialized View, we need to recreate the Dispatcher from the old upstream to the new TableFragment. // Consistent with the semantics in AddMutation. @@ -77,6 +79,7 @@ message UpdateMutation { } message SourceChangeSplitMutation { + // `Source` and `SourceBackfill` are handled together here. map actor_splits = 2; } diff --git a/src/connector/src/with_options.rs b/src/connector/src/with_options.rs index ae2d432fdfd7..065c9394b8a4 100644 --- a/src/connector/src/with_options.rs +++ b/src/connector/src/with_options.rs @@ -126,6 +126,10 @@ pub trait WithPropertiesExt: Get + Sized { CdcTableType::from_properties(self).enable_transaction_metadata() } + fn is_shareable_non_cdc_connector(&self) -> bool { + self.is_kafka_connector() + } + #[inline(always)] fn is_iceberg_connector(&self) -> bool { let Some(connector) = self.get_connector() else { diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index f006ca929f54..432f814cd4c4 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -1640,7 +1640,8 @@ pub async fn handle_create_source( let create_cdc_source_job = with_properties.is_shareable_cdc_connector(); let is_shared = create_cdc_source_job - || (with_properties.is_kafka_connector() && session.config().rw_enable_shared_source()); + || (with_properties.is_shareable_non_cdc_connector() + && session.config().rw_enable_shared_source()); let (columns_from_resolve_source, mut source_info) = if create_cdc_source_job { bind_columns_from_source_for_cdc(&session, &source_schema)? diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 0bea5f37940d..cf6f251b359c 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -78,6 +78,7 @@ pub struct Reschedule { /// Reassigned splits for source actors. /// It becomes the `actor_splits` in [`UpdateMutation`]. + /// `Source` and `SourceBackfill` are handled together here. pub actor_splits: HashMap>, /// Whether this fragment is injectable. The injectable means whether the fragment contains diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index bec6b95cfb0f..447cf5cf8564 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -106,7 +106,8 @@ pub struct TableFragments { /// The status of actors pub actor_status: BTreeMap, - /// The splits of actors + /// The splits of actors, + /// incl. both `Source` and `SourceBackfill` actors. pub actor_splits: HashMap>, /// The streaming context associated with this stream plan and its fragments