diff --git a/updates_and_signals/safe_message_handlers/workflow.py b/updates_and_signals/safe_message_handlers/workflow.py
index b8230578..5d441637 100644
--- a/updates_and_signals/safe_message_handlers/workflow.py
+++ b/updates_and_signals/safe_message_handlers/workflow.py
@@ -212,9 +212,6 @@ def init(self, input: ClusterManagerInput) -> None:
             self.sleep_interval_seconds = 1
 
     def should_continue_as_new(self) -> bool:
-        # We don't want to continue-as-new if we're in the middle of an update
-        if self.nodes_lock.locked():
-            return False
         if workflow.info().is_continue_as_new_suggested():
             return True
         # This is just for ease-of-testing. In production, we trust temporal to tell us when to continue as new.
@@ -243,6 +240,8 @@ async def run(self, input: ClusterManagerInput) -> ClusterManagerResult:
             if self.state.cluster_shutdown:
                 break
             if self.should_continue_as_new():
+                # We don't want to leave any job assignment or deletion handlers half-finished when we continue as new.
+                await workflow.wait_condition(lambda: workflow.all_handlers_finished())
                 workflow.logger.info("Continuing as new")
                 workflow.continue_as_new(
                     ClusterManagerInput(
@@ -250,6 +249,8 @@
                         test_continue_as_new=input.test_continue_as_new,
                     )
                 )
+        # Make sure we finish off handlers such as deleting jobs before we complete the workflow.
+        await workflow.wait_condition(lambda: workflow.all_handlers_finished())
         return ClusterManagerResult(
             len(self.get_assigned_nodes()),
             len(self.get_bad_nodes()),
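
Below is a minimal standalone sketch, not part of this diff, of the drain pattern the change adopts: waiting for all in-flight update and signal handlers before the workflow returns or continues as new. It assumes the Temporal Python SDK (temporalio 1.7+, which provides workflow.all_handlers_finished()); the JobTrackerWorkflow class, its submit_job update, and its finish signal are hypothetical names used only for illustration.

# Hypothetical illustration of draining message handlers before a workflow returns,
# assuming temporalio >= 1.7 for workflow.all_handlers_finished().
import asyncio

from temporalio import workflow


@workflow.defn
class JobTrackerWorkflow:
    def __init__(self) -> None:
        self.jobs: list[str] = []
        self.shutdown_requested = False

    @workflow.update
    async def submit_job(self, name: str) -> int:
        # A slow handler: without the drain below, the workflow could complete
        # (or continue-as-new) while this await is still pending.
        await asyncio.sleep(1)
        self.jobs.append(name)
        return len(self.jobs)

    @workflow.signal
    def finish(self) -> None:
        self.shutdown_requested = True

    @workflow.run
    async def run(self) -> list[str]:
        await workflow.wait_condition(lambda: self.shutdown_requested)
        # Same call the diff adds: block until every in-flight update/signal
        # handler has finished before completing the workflow.
        await workflow.wait_condition(lambda: workflow.all_handlers_finished())
        return self.jobs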