Skip to content

Commit

Permalink
feat: throw an error if unable to find downstream actor index during …
Browse files Browse the repository at this point in the history
…the scaling process. (#15720)

Co-authored-by: lmatz <[email protected]>
  • Loading branch information
shanicky and lmatz authored Mar 20, 2024
1 parent af47580 commit 7e4cf71
Showing 1 changed file with 13 additions and 9 deletions.
22 changes: 13 additions & 9 deletions src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1828,16 +1828,17 @@ impl ScaleController {
table_fragment_id_map: &mut HashMap<u32, HashSet<FragmentId>>,
fragment_actor_id_map: &mut HashMap<FragmentId, HashSet<u32>>,
table_fragments: &BTreeMap<TableId, TableFragments>,
) {
) -> MetaResult<()> {
// This is only for assertion purposes and will be removed once the dispatcher_id is guaranteed to always correspond to the downstream fragment_id,
// such as through the foreign key constraints in the SQL backend.
let mut actor_fragment_id_map_for_check = HashMap::new();
for table_fragments in table_fragments.values() {
for (fragment_id, fragment) in &table_fragments.fragments {
for actor in &fragment.actors {
debug_assert!(actor_fragment_id_map_for_check
.insert(actor.actor_id, *fragment_id)
.is_none());
let prev =
actor_fragment_id_map_for_check.insert(actor.actor_id, *fragment_id);

debug_assert!(prev.is_none());
}
}
}
Expand Down Expand Up @@ -1870,9 +1871,10 @@ impl ScaleController {
dispatcher.dispatcher_id as FragmentId
);
} else {
tracing::warn!(
"downstream actor id {} not found in fragment_actor_id_map",
downstream_actor_id
bail!(
"downstream actor id {} from actor {} not found in fragment_actor_id_map",
downstream_actor_id,
actor.actor_id,
);
}

Expand All @@ -1892,6 +1894,8 @@ impl ScaleController {

actor_status.extend(table_fragments.actor_status.clone());
}

Ok(())
}

match &self.metadata_manager {
Expand All @@ -1905,7 +1909,7 @@ impl ScaleController {
&mut table_fragment_id_map,
&mut fragment_actor_id_map,
guard.table_fragments(),
);
)?;
}
MetadataManager::V2(_) => {
let all_table_fragments = self.list_all_table_fragments().await?;
Expand All @@ -1922,7 +1926,7 @@ impl ScaleController {
&mut table_fragment_id_map,
&mut fragment_actor_id_map,
&all_table_fragments,
);
)?;
}
}

Expand Down

0 comments on commit 7e4cf71

Please sign in to comment.