diff --git a/updates_and_signals/safe_message_handlers/workflow.py b/updates_and_signals/safe_message_handlers/workflow.py index 8f04173d..7bfe993b 100644 --- a/updates_and_signals/safe_message_handlers/workflow.py +++ b/updates_and_signals/safe_message_handlers/workflow.py @@ -66,7 +66,6 @@ def __init__(self) -> None: # Protects workflow state from interleaved access self.nodes_lock = asyncio.Lock() self.max_history_length: Optional[int] = None - self.sleep_interval_seconds: int = 600 @workflow.signal async def start_cluster(self) -> None: @@ -176,7 +175,6 @@ def init(self, input: ClusterManagerInput) -> None: self.state = input.state if input.test_continue_as_new: self.max_history_length = 120 - self.sleep_interval_seconds = 1 def should_continue_as_new(self) -> bool: # We don't want to continue-as-new if we're in the middle of an update @@ -195,24 +193,16 @@ def should_continue_as_new(self) -> bool: @workflow.run async def run(self, input: ClusterManagerInput) -> ClusterManagerResult: self.init(input) - await workflow.wait_condition(lambda: self.state.cluster_started) - while True: - try: - await workflow.wait_condition( - lambda: self.state.cluster_shutdown - or self.should_continue_as_new(), - timeout=timedelta(seconds=self.sleep_interval_seconds), - ) - except asyncio.TimeoutError: - pass - if self.state.cluster_shutdown: - break - if self.should_continue_as_new(): - workflow.logger.info("Continuing as new") - workflow.continue_as_new( - ClusterManagerInput( - state=self.state, - test_continue_as_new=input.test_continue_as_new, - ) + await workflow.wait_condition( + lambda: self.state.cluster_shutdown or self.should_continue_as_new() + ) + if self.should_continue_as_new(): + workflow.logger.info("Continuing as new") + workflow.continue_as_new( + ClusterManagerInput( + state=self.state, + test_continue_as_new=input.test_continue_as_new, ) - return ClusterManagerResult(len(self.get_assigned_nodes())) + ) + else: + return ClusterManagerResult(len(self.get_assigned_nodes()))