diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 4b4180b46b..87f90fa234 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -1,4 +1,3 @@ -import base64 import logging import math import pickle @@ -9,7 +8,7 @@ from collections import defaultdict from concurrent.futures import Future from dataclasses import dataclass -from typing import Callable, Dict, List, Optional, Sequence, Tuple, Union +from typing import IO, Callable, Dict, List, Optional, Sequence, Tuple, Union import typeguard @@ -543,18 +542,22 @@ def _start_local_interchange_process(self) -> None: "cert_dir": self.cert_dir, } - encoded = base64.b64encode(pickle.dumps(interchange_config)) + config_pickle = pickle.dumps(interchange_config) - cmd: List[bytes] = [b"interchange.py", - encoded - ] - self.interchange_proc = subprocess.Popen(cmd) + self.interchange_proc = subprocess.Popen(b"interchange.py", stdin=subprocess.PIPE) + stdin = self.interchange_proc.stdin + assert isinstance(stdin, IO) + logger.debug("Popened interchange process. Writing config object") + stdin.write(config_pickle) + stdin.flush() + logger.debug("Sent config object. Requesting worker ports") try: (self.worker_task_port, self.worker_result_port) = self.command_client.run("WORKER_PORTS", timeout_s=120) except CommandClientTimeoutError: - logger.error("Interchange has not completed initialization in 120s. Aborting") + logger.error("Interchange has not completed initialization. Aborting") raise Exception("Interchange failed to start") + logger.debug("Got worker ports") def _start_queue_management_thread(self): """Method to start the management thread as a daemon. diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index b196c49814..9fe94dbabd 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -1,5 +1,4 @@ #!/usr/bin/env python -import base64 import datetime import json import logging @@ -676,7 +675,7 @@ def start_file_logger(filename: str, level: int = logging.DEBUG, format_string: if __name__ == "__main__": setproctitle("parsl: HTEX interchange") - config = pickle.loads(base64.b64decode(sys.argv[1])) + config = pickle.load(sys.stdin.buffer) ic = Interchange(**config) ic.start()