Skip to content

Commit

Permalink
feat: inject pause/resume for creating sink into table (#17651)
Browse files Browse the repository at this point in the history
Signed-off-by: Shanicky Chen <[email protected]>
  • Loading branch information
shanicky authored Aug 26, 2024
1 parent f00c82f commit ce7a5af
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 2 deletions.
12 changes: 11 additions & 1 deletion src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,8 @@ impl GlobalStreamManager {
"built actors finished"
);

let need_pause = replace_table_job_info.is_some();

if let Some((streaming_job, context, table_fragments)) = replace_table_job_info {
self.build_actors(
&table_fragments,
Expand Down Expand Up @@ -516,7 +518,15 @@ impl GlobalStreamManager {
}
};
let result: MetaResult<NotificationVersion> = try {
self.barrier_scheduler.run_command(command).await?;
if need_pause {
// Special handling is required when creating sink into table, we need to pause the stream to avoid data loss.
self.barrier_scheduler
.run_config_change_command_with_pause(command)
.await?;
} else {
self.barrier_scheduler.run_command(command).await?;
}

tracing::debug!(?streaming_job, "first barrier collected for stream job");
let result = self
.metadata_manager
Expand Down
5 changes: 4 additions & 1 deletion src/stream/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,8 +462,11 @@ impl Barrier {
Some(
Mutation::Update { .. } // new actors for scaling
| Mutation::Add(AddMutation { pause: true, .. }) // new streaming job, or recovery
| Mutation::AddAndUpdate(AddMutation { pause: true, .. }, _) // new actors for replacing table
) => true,
Some(Mutation::AddAndUpdate(AddMutation { pause, ..}, _)) => {
assert!(pause);
true
},
_ => false,
}
}
Expand Down

0 comments on commit ce7a5af

Please sign in to comment.