Skip to content

Commit

Permalink
Demonstrate workflow.all_handlers_finished
Browse files Browse the repository at this point in the history
  • Loading branch information
drewhoskins-temporal committed Aug 31, 2024
1 parent ccf0945 commit b8ab80d
Showing 1 changed file with 4 additions and 3 deletions.
7 changes: 4 additions & 3 deletions updates_and_signals/safe_message_handlers/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,6 @@ def init(self, input: ClusterManagerInput) -> None:
self.sleep_interval_seconds = 1

def should_continue_as_new(self) -> bool:
# We don't want to continue-as-new if we're in the middle of an update
if self.nodes_lock.locked():
return False
if workflow.info().is_continue_as_new_suggested():
return True
# This is just for ease-of-testing. In production, we trust temporal to tell us when to continue as new.
Expand Down Expand Up @@ -243,13 +240,17 @@ async def run(self, input: ClusterManagerInput) -> ClusterManagerResult:
if self.state.cluster_shutdown:
break
if self.should_continue_as_new():
# We don't want to leave any job assignment or deletion handlers half-finished when we continue as new.
await workflow.wait_condition(lambda: workflow.all_handlers_finished())
workflow.logger.info("Continuing as new")
workflow.continue_as_new(
ClusterManagerInput(
state=self.state,
test_continue_as_new=input.test_continue_as_new,
)
)
# Make sure we finish off handlers such as deleting jobs before we complete the workflow.
await workflow.wait_condition(lambda: workflow.all_handlers_finished())
return ClusterManagerResult(
len(self.get_assigned_nodes()),
len(self.get_bad_nodes()),
Expand Down

0 comments on commit b8ab80d

Please sign in to comment.