From 20e5898e0c9037624988fe321e784f4fe38a2e8d Mon Sep 17 00:00:00 2001 From: Sebastiano Bisacchi <33641204+sebaB003@users.noreply.github.com> Date: Mon, 24 Jun 2024 11:20:53 +0200 Subject: [PATCH] Make `Waiting.resume()` idempotent (#285) Calling `Waiting.resume()` when it had already been resumed would raise an exception. Here, the method is made idempotent by checking first whether the future has already been resolved. This fix ensures the behavior matches the behavior of the other state transitions: calling `play` on an already running process and calling `pause` on an already paused process isn't rising any error. --- src/plumpy/process_states.py | 4 ++++ test/rmq/docker-compose.yml | 11 +++++------ test/test_processes.py | 25 +++++++++++++++++++++++++ 3 files changed, 34 insertions(+), 6 deletions(-) diff --git a/src/plumpy/process_states.py b/src/plumpy/process_states.py index d962eb77..3407412d 100644 --- a/src/plumpy/process_states.py +++ b/src/plumpy/process_states.py @@ -330,6 +330,10 @@ async def execute(self) -> State: # type: ignore # pylint: disable=invalid-over def resume(self, value: Any = NULL) -> None: assert self._waiting_future is not None, 'Not yet waiting' + + if self._waiting_future.done(): + return + self._waiting_future.set_result(value) diff --git a/test/rmq/docker-compose.yml b/test/rmq/docker-compose.yml index 6e743f7d..456690e0 100644 --- a/test/rmq/docker-compose.yml +++ b/test/rmq/docker-compose.yml @@ -12,16 +12,15 @@ version: '3.4' services: - rabbit: - image: rabbitmq:3.8.3-management - container_name: plumpy-rmq + image: rabbitmq:3-management-alpine + container_name: plumpy_rmq + ports: + - 5672:5672 + - 15672:15672 environment: RABBITMQ_DEFAULT_USER: guest RABBITMQ_DEFAULT_PASS: guest - ports: - - '5672:5672' - - '15672:15672' healthcheck: test: rabbitmq-diagnostics -q ping interval: 30s diff --git a/test/test_processes.py b/test/test_processes.py index 158cce66..0cb4161b 100644 --- a/test/test_processes.py +++ b/test/test_processes.py @@ -836,6 +836,31 @@ async def async_test(): loop.create_task(proc.step_until_terminated()) loop.run_until_complete(async_test()) + def test_double_restart(self): + """Test that consecutive restarts do not cause any issues, this is tested for concurrency reasons.""" + loop = asyncio.get_event_loop() + proc = _RestartProcess() + + async def async_test(): + await utils.run_until_waiting(proc) + + # Save the state of the process + saved_state = plumpy.Bundle(proc) + + # Load a process from the saved state + loaded_proc = saved_state.unbundle() + self.assertEqual(loaded_proc.state, ProcessState.WAITING) + + # Now resume it twice in succession + loaded_proc.resume() + loaded_proc.resume() + + await loaded_proc.step_until_terminated() + self.assertEqual(loaded_proc.outputs, {'finished': True}) + + loop.create_task(proc.step_until_terminated()) + loop.run_until_complete(async_test()) + def test_wait_save_continue(self): """ Test that process saved while in WAITING state restarts correctly when loaded """ loop = asyncio.get_event_loop()