Skip to content

Commit

Permalink
feat(meta): support replace source
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Dec 3, 2024
1 parent 2a4081e commit e7756d6
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 10 deletions.
23 changes: 20 additions & 3 deletions src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1009,7 +1009,7 @@ impl CatalogController {
Ok(version)
}

/// TODO: make it general for other streaming jobs.
/// TODO(alter-source): make it general for other streaming jobs.
/// Currently only for replacing table.
pub async fn finish_replace_streaming_job_inner(
tmp_id: ObjectId,
Expand Down Expand Up @@ -1065,7 +1065,11 @@ impl CatalogController {
table.incoming_sinks = Set(incoming_sinks.into());
table.update(txn).await?;
}
// TODO: support other streaming jobs
StreamingJob::Source(source) => {
// Update the source catalog with the new one.
let source = source::ActiveModel::from(source);
source.update(txn).await?;
}
_ => unreachable!(
"invalid streaming job type: {:?}",
streaming_job.job_type_str()
Expand Down Expand Up @@ -1223,8 +1227,21 @@ impl CatalogController {
)),
})
}
_ => unreachable!("invalid streaming job type: {:?}", job_type),
StreamingJobType::Source => {
let (source, source_obj) = Source::find_by_id(original_job_id)
.find_also_related(Object)
.one(txn)
.await?
.ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
relations.push(PbRelation {
relation_info: Some(PbRelationInfo::Source(
ObjectModel(source, source_obj.unwrap()).into(),
)),
})
}
_ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
}
// TODO(alter-source) what is this
if let Some(table_col_index_mapping) = table_col_index_mapping {
let expr_rewriter = ReplaceTableExprRewriter {
table_col_index_mapping,
Expand Down
18 changes: 11 additions & 7 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1299,6 +1299,7 @@ impl DdlController {
&self,
mut streaming_job: StreamingJob,
fragment_graph: StreamFragmentGraphProto,
// TODO(alter-source): what is this
table_col_index_mapping: Option<ColIndexMapping>,
) -> MetaResult<NotificationVersion> {
match &mut streaming_job {
Expand Down Expand Up @@ -1695,9 +1696,8 @@ impl DdlController {
tmp_job_id: TableId,
) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragments)> {
match &stream_job {
StreamingJob::Table(..) => {}
StreamingJob::Source(..)
| StreamingJob::MaterializedView(..)
StreamingJob::Table(..) | StreamingJob::Source(..) => {}
StreamingJob::MaterializedView(..)
| StreamingJob::Sink(..)
| StreamingJob::Index(..) => {
bail_not_implemented!("schema change for {}", stream_job.job_type_str())
Expand Down Expand Up @@ -1742,8 +1742,8 @@ impl DdlController {
}

// build complete graph based on the table job type
let complete_graph = match job_type {
StreamingJobType::Table(TableJobType::General) => {
let complete_graph = match &job_type {
StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => {
CompleteStreamFragmentGraph::with_downstreams(
fragment_graph,
original_mview_fragment.fragment_id,
Expand Down Expand Up @@ -1791,8 +1791,11 @@ impl DdlController {
merge_updates,
} = actor_graph_builder.generate_graph(&self.env, stream_job, expr_context)?;

// general table job type does not have upstream job, so the dispatchers should be empty
if matches!(job_type, StreamingJobType::Table(TableJobType::General)) {
// general table & source does not have upstream job, so the dispatchers should be empty
if matches!(
job_type,
StreamingJobType::Source | StreamingJobType::Table(TableJobType::General)
) {
assert!(dispatchers.is_empty());
}

Expand All @@ -1810,6 +1813,7 @@ impl DdlController {

// Note: no need to set `vnode_count` as it's already set by the frontend.
// See `get_replace_table_plan`.
// TODO(alter-source): check what does this mean

let ctx = ReplaceStreamJobContext {
old_fragments,
Expand Down

0 comments on commit e7756d6

Please sign in to comment.