diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index e9bf934cf1..87aad67545 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -90,7 +90,7 @@ def __init__(self, config: Config) -> None: self.run_dir = make_rundir(config.run_dir) if config.initialize_logging: - parsl.set_file_logger("{}/parsl.log".format(self.run_dir), level=logging.DEBUG) + _, self.logging_handler = parsl.set_file_logger("{}/parsl.log".format(self.run_dir), level=logging.DEBUG) logger.info("Starting DataFlowKernel with config\n{}".format(config)) @@ -1262,7 +1262,13 @@ def cleanup(self) -> None: else: logger.debug("Cleaning up non-default DFK - not unregistering") - logger.info("DFK cleanup complete") + # TODO: do this in parsl/logutils.py + logger.info("DFK cleanup complete - removing parsl.log handler") + logger_to_remove = logging.getLogger("parsl") + logger_to_remove.removeHandler(self.logging_handler) + self.logging_handler.close() + + logger.info("handler closed - is this going to break things?") def checkpoint(self, tasks: Optional[Sequence[TaskRecord]] = None) -> str: """Checkpoint the dfk incrementally to a checkpoint file. diff --git a/parsl/log_utils.py b/parsl/log_utils.py index ff25249b1b..b5d483b8e6 100644 --- a/parsl/log_utils.py +++ b/parsl/log_utils.py @@ -12,7 +12,7 @@ """ import io import logging -from typing import Optional +from typing import Optional, Tuple import typeguard @@ -65,7 +65,7 @@ def set_stream_logger(name: str = 'parsl', def set_file_logger(filename: str, name: str = 'parsl', level: int = logging.DEBUG, - format_string: Optional[str] = None) -> logging.Logger: + format_string: Optional[str] = None) -> Tuple[logging.Logger, logging.FileHandler]: """Add a file log handler. Args: @@ -93,4 +93,4 @@ def set_file_logger(filename: str, futures_logger = logging.getLogger("concurrent.futures") futures_logger.addHandler(handler) - return logger + return (logger, handler) diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index 3fbe5736ba..3a585e3c6d 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -252,9 +252,9 @@ def close(self) -> None: @wrap_with_logs def filesystem_receiver(q: Queue[TaggedMonitoringMessage], run_dir: str) -> None: - logger = set_file_logger(f"{run_dir}/monitoring_filesystem_radio.log", - name="monitoring_filesystem_radio", - level=logging.INFO) + logger, _ = set_file_logger(f"{run_dir}/monitoring_filesystem_radio.log", + name="monitoring_filesystem_radio", + level=logging.INFO) logger.info("Starting filesystem radio receiver") setproctitle("parsl: monitoring filesystem receiver") diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py index 0926712c36..789739144c 100644 --- a/parsl/monitoring/router.py +++ b/parsl/monitoring/router.py @@ -60,9 +60,9 @@ def __init__(self, An event that the main Parsl process will set to signal that the monitoring router should shut down. """ os.makedirs(run_dir, exist_ok=True) - self.logger = set_file_logger(f"{run_dir}/monitoring_router.log", - name="monitoring_router", - level=logging_level) + self.logger, _ = set_file_logger(f"{run_dir}/monitoring_router.log", + name="monitoring_router", + level=logging_level) self.logger.debug("Monitoring router starting") self.hub_address = hub_address diff --git a/parsl/tests/conftest.py b/parsl/tests/conftest.py index 4bcdde0b7a..9e8838be9c 100644 --- a/parsl/tests/conftest.py +++ b/parsl/tests/conftest.py @@ -20,6 +20,7 @@ from itertools import chain import _pytest.runner as runner +import psutil import pytest import parsl @@ -179,6 +180,12 @@ def load_dfk_session(request, pytestconfig, tmpd_cwd_session): config = pytestconfig.getoption('config')[0] if config != 'local': + this_process = psutil.Process() + start_fds = this_process.num_fds() + logger.error(f"BENC: start open fds: {start_fds}") + + assert threading.active_count() == 1, "precondition: only one thread can be running before this test: " + repr(threading.enumerate()) + spec = importlib.util.spec_from_file_location('', config) module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) @@ -206,6 +213,15 @@ def load_dfk_session(request, pytestconfig, tmpd_cwd_session): raise RuntimeError("DFK changed unexpectedly during test") dfk.cleanup() assert DataFlowKernelLoader._dfk is None + end_fds = this_process.num_fds() + logger.error(f"BENC: end open fds: {end_fds}") + + end_fds = this_process.num_fds() + logger.error(f"BENC: end open fds: {end_fds} (vs {start_fds} at start)") + assert start_fds == end_fds, "number of open fds changed across test run" + + assert threading.active_count() == 1, "test left threads running: " + repr(threading.enumerate()) + else: yield @@ -227,6 +243,12 @@ def load_dfk_local_module(request, pytestconfig, tmpd_cwd_session): config = pytestconfig.getoption('config')[0] if config == 'local': + this_process = psutil.Process() + start_fds = this_process.num_fds() + logger.error(f"BENC: start open fds: {start_fds}") + logger.error(f"BENC: start threads: {threading.active_count()}") + + assert threading.active_count() == 1, "precondition: only one thread can be running before this test" local_setup = getattr(request.module, "local_setup", None) local_teardown = getattr(request.module, "local_teardown", None) local_config = getattr(request.module, "local_config", None) @@ -258,6 +280,18 @@ def load_dfk_local_module(request, pytestconfig, tmpd_cwd_session): raise RuntimeError("DFK changed unexpectedly during test") dfk.cleanup() assert DataFlowKernelLoader._dfk is None + end_fds = this_process.num_fds() + logger.error(f"BENC: end open fds: {end_fds} (vs start {start_fds}") + logger.error(f"BENC: end threads: {threading.active_count()}") + + end_fds = this_process.num_fds() + logger.error(f"BENC: open fds END: {end_fds}") + if end_fds > start_fds: + logger.error(f"Open files (not all fds, though?): {this_process.open_files()!r}") + os.system(f"ls -l /proc/{os.getpid()}/fd") + pytest.fail("BENC: number of open fds increased across test") + + assert threading.active_count() == 1, "test left threads running: " + repr(threading.enumerate()) else: yield diff --git a/parsl/tests/test_error_handling/test_resource_spec.py b/parsl/tests/test_error_handling/test_resource_spec.py index 4616219be2..df7c5a4bad 100644 --- a/parsl/tests/test_error_handling/test_resource_spec.py +++ b/parsl/tests/test_error_handling/test_resource_spec.py @@ -32,7 +32,7 @@ def test_resource(n=2): # Specify resources with wrong types # 'cpus' is incorrect, should be 'cores' - spec = {'cpus': 2, 'memory': 1000, 'disk': 1000} + spec = {'cpus': 2, 'memory': 1000, 'disk': 10} fut = double(n, parsl_resource_specification=spec) try: fut.result()