diff --git a/parsl/executors/radical/executor.py b/parsl/executors/radical/executor.py index 93b4b38bbd..aa191f49e5 100644 --- a/parsl/executors/radical/executor.py +++ b/parsl/executors/radical/executor.py @@ -133,6 +133,7 @@ def __init__(self, self.resource = resource self._uid = RPEX.lower() self.bulk_mode = bulk_mode + self._terminate = mt.Event() self.working_dir = working_dir self.pilot_kwargs = rpex_pilot_kwargs self.future_tasks: Dict[str, Future] = {} @@ -532,7 +533,7 @@ def _bulk_collector(self): bulk = list() - while True: + while not self._terminate.is_set(): now = time.time() # time of last submission @@ -552,6 +553,9 @@ def _bulk_collector(self): if len(bulk) >= self._max_bulk_size: break + if self._terminate.is_set(): + break + if bulk: logger.debug('submit bulk: %d', len(bulk)) self.tmgr.submit_tasks(bulk) @@ -588,6 +592,13 @@ def submit(self, func, resource_specification, *args, **kwargs): def shutdown(self, hub=True, targets='all', block=False): """Shutdown the executor, including all RADICAL-Pilot components.""" logger.info("RadicalPilotExecutor is terminating...") + + self._terminate.set() + + # ensure we are in the bulk submssion mode + if self.bulk_mode: + self._bulk_thread.join() + self.session.close(download=True) logger.info("RadicalPilotExecutor is terminated.")