Skip to content

Commit

Permalink
refactor: add some comments for source splits (#18034)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan authored Sep 4, 2024
1 parent c4b1dd4 commit 6402328
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 2 deletions.
2 changes: 2 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ message TableFragments {
State state = 2;
map<uint32, Fragment> fragments = 3;
map<uint32, ActorStatus> actor_status = 4;
// `Source` and `SourceBackfill` are handled together here.
map<uint32, source.ConnectorSplits> actor_splits = 5;

stream_plan.StreamContext ctx = 6;
Expand Down Expand Up @@ -513,6 +514,7 @@ message GetClusterInfoRequest {}
message GetClusterInfoResponse {
repeated common.WorkerNode worker_nodes = 1;
repeated TableFragments table_fragments = 2;
// `Source` and `SourceBackfill` are handled together here.
map<uint32, source.ConnectorSplits> actor_splits = 3;
map<uint32, catalog.Source> source_infos = 4;
uint64 revision = 5;
Expand Down
3 changes: 3 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ message AddMutation {
// All actors to be added (to the main connected component of the graph) in this update.
repeated uint32 added_actors = 3;
// We may embed a source change split mutation here.
// `Source` and `SourceBackfill` are handled together here.
// TODO: we may allow multiple mutations in a single barrier.
map<uint32, source.ConnectorSplits> actor_splits = 2;
// We may embed a pause mutation here.
Expand Down Expand Up @@ -70,13 +71,15 @@ message UpdateMutation {
// All actors to be dropped in this update.
repeated uint32 dropped_actors = 4;
// Source updates.
// `Source` and `SourceBackfill` are handled together here.
map<uint32, source.ConnectorSplits> actor_splits = 5;
// When modifying the Materialized View, we need to recreate the Dispatcher from the old upstream to the new TableFragment.
// Consistent with the semantics in AddMutation.
map<uint32, Dispatchers> actor_new_dispatchers = 6;
}

message SourceChangeSplitMutation {
// `Source` and `SourceBackfill` are handled together here.
map<uint32, source.ConnectorSplits> actor_splits = 2;
}

Expand Down
4 changes: 4 additions & 0 deletions src/connector/src/with_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ pub trait WithPropertiesExt: Get + Sized {
CdcTableType::from_properties(self).enable_transaction_metadata()
}

fn is_shareable_non_cdc_connector(&self) -> bool {
self.is_kafka_connector()
}

#[inline(always)]
fn is_iceberg_connector(&self) -> bool {
let Some(connector) = self.get_connector() else {
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1640,7 +1640,8 @@ pub async fn handle_create_source(

let create_cdc_source_job = with_properties.is_shareable_cdc_connector();
let is_shared = create_cdc_source_job
|| (with_properties.is_kafka_connector() && session.config().rw_enable_shared_source());
|| (with_properties.is_shareable_non_cdc_connector()
&& session.config().rw_enable_shared_source());

let (columns_from_resolve_source, mut source_info) = if create_cdc_source_job {
bind_columns_from_source_for_cdc(&session, &source_schema)?
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ pub struct Reschedule {

/// Reassigned splits for source actors.
/// It becomes the `actor_splits` in [`UpdateMutation`].
/// `Source` and `SourceBackfill` are handled together here.
pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

/// Whether this fragment is injectable. The injectable means whether the fragment contains
Expand Down
3 changes: 2 additions & 1 deletion src/meta/src/model/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ pub struct TableFragments {
/// The status of actors
pub actor_status: BTreeMap<ActorId, ActorStatus>,

/// The splits of actors
/// The splits of actors,
/// incl. both `Source` and `SourceBackfill` actors.
pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,

/// The streaming context associated with this stream plan and its fragments
Expand Down

0 comments on commit 6402328

Please sign in to comment.