diff --git a/src/plumpy/process_states.py b/src/plumpy/process_states.py index d962eb77..3407412d 100644 --- a/src/plumpy/process_states.py +++ b/src/plumpy/process_states.py @@ -330,6 +330,10 @@ async def execute(self) -> State: # type: ignore # pylint: disable=invalid-over def resume(self, value: Any = NULL) -> None: assert self._waiting_future is not None, 'Not yet waiting' + + if self._waiting_future.done(): + return + self._waiting_future.set_result(value) diff --git a/test/rmq/docker-compose.yml b/test/rmq/docker-compose.yml index 6e743f7d..456690e0 100644 --- a/test/rmq/docker-compose.yml +++ b/test/rmq/docker-compose.yml @@ -12,16 +12,15 @@ version: '3.4' services: - rabbit: - image: rabbitmq:3.8.3-management - container_name: plumpy-rmq + image: rabbitmq:3-management-alpine + container_name: plumpy_rmq + ports: + - 5672:5672 + - 15672:15672 environment: RABBITMQ_DEFAULT_USER: guest RABBITMQ_DEFAULT_PASS: guest - ports: - - '5672:5672' - - '15672:15672' healthcheck: test: rabbitmq-diagnostics -q ping interval: 30s diff --git a/test/test_processes.py b/test/test_processes.py index 158cce66..0cb4161b 100644 --- a/test/test_processes.py +++ b/test/test_processes.py @@ -836,6 +836,31 @@ async def async_test(): loop.create_task(proc.step_until_terminated()) loop.run_until_complete(async_test()) + def test_double_restart(self): + """Test that consecutive restarts do not cause any issues, this is tested for concurrency reasons.""" + loop = asyncio.get_event_loop() + proc = _RestartProcess() + + async def async_test(): + await utils.run_until_waiting(proc) + + # Save the state of the process + saved_state = plumpy.Bundle(proc) + + # Load a process from the saved state + loaded_proc = saved_state.unbundle() + self.assertEqual(loaded_proc.state, ProcessState.WAITING) + + # Now resume it twice in succession + loaded_proc.resume() + loaded_proc.resume() + + await loaded_proc.step_until_terminated() + self.assertEqual(loaded_proc.outputs, {'finished': True}) + + loop.create_task(proc.step_until_terminated()) + loop.run_until_complete(async_test()) + def test_wait_save_continue(self): """ Test that process saved while in WAITING state restarts correctly when loaded """ loop = asyncio.get_event_loop()