diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index 8a895ee447..6f9d4cccbc 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -3,6 +3,7 @@ import logging import multiprocessing.synchronize as ms import os +import pickle import queue import time from multiprocessing import Event @@ -18,7 +19,6 @@ from parsl.monitoring.types import TaggedMonitoringMessage from parsl.multiprocessing import ForkProcess, SizedQueue from parsl.process_loggers import wrap_with_logs -from parsl.serialize import deserialize from parsl.utils import RepresentationMixin, setproctitle _db_manager_excepts: Optional[Exception] @@ -278,7 +278,7 @@ def filesystem_receiver(q: Queue[TaggedMonitoringMessage], run_dir: str) -> None logger.info(f"Processing filesystem radio file {filename}") full_path_filename = f"{new_dir}/{filename}" with open(full_path_filename, "rb") as f: - message = deserialize(f.read()) + message = pickle.load(f) logger.debug(f"Message received is: {message}") assert isinstance(message, tuple) target_radio.send(cast(TaggedMonitoringMessage, message)) diff --git a/parsl/monitoring/radios.py b/parsl/monitoring/radios.py index 5b8de46919..14dc046557 100644 --- a/parsl/monitoring/radios.py +++ b/parsl/monitoring/radios.py @@ -8,8 +8,6 @@ import zmq -from parsl.serialize import serialize - logger = logging.getLogger(__name__) @@ -59,7 +57,7 @@ def send(self, message: object) -> None: # move it into new/, so that a partially written # file will never be observed in new/ with open(tmp_filename, "wb") as f: - f.write(serialize(buffer)) + pickle.dump(buffer, f) os.rename(tmp_filename, new_filename)