Skip to content

Commit

Permalink
fix(metadata v2): existing actor splits were not updated (#18553)
Browse files Browse the repository at this point in the history
  • Loading branch information
fuyufjh authored Sep 17, 2024
1 parent 9167768 commit 11ad34b
Showing 1 changed file with 18 additions and 0 deletions.
18 changes: 18 additions & 0 deletions src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1447,6 +1447,7 @@ impl CatalogController {
.exec(&txn)
.await?;

// add new actors
for (
PbStreamActor {
actor_id,
Expand Down Expand Up @@ -1554,6 +1555,23 @@ impl CatalogController {
actor.update(&txn).await?;
}

// Update actor_splits for existing actors
for (actor_id, splits) in actor_splits {
if new_created_actors.contains(&(actor_id as ActorId)) {
continue;
}

let actor = Actor::find_by_id(actor_id as ActorId)
.one(&txn)
.await?
.ok_or_else(|| MetaError::catalog_id_not_found("actor", actor_id))?;

let mut actor = actor.into_active_model();
let splits = splits.iter().map(PbConnectorSplit::from).collect_vec();
actor.splits = Set(Some((&PbConnectorSplits { splits }).into()));
actor.update(&txn).await?;
}

// fragment update
let fragment = Fragment::find_by_id(fragment_id)
.one(&txn)
Expand Down

0 comments on commit 11ad34b

Please sign in to comment.