diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 953c17405e..3b7643bc21 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -10,7 +10,7 @@ jobs: main-test-suite: strategy: matrix: - python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] + python-version: ["3.8", "3.9", "3.10", "3.11", "3.12", "3.13.0-rc.1"] runs-on: ubuntu-20.04 timeout-minutes: 60 diff --git a/Makefile b/Makefile index 90f20601e9..54684f3e7e 100644 --- a/Makefile +++ b/Makefile @@ -10,6 +10,7 @@ export PATH := $(CCTOOLS_INSTALL)/bin/:$(PATH) export CCTOOLS_VERSION=7.8.0 export HYDRA_LAUNCHER=fork export OMPI_MCA_rmaps_base_oversubscribe=yes +export RADICAL_UTILS_NO_ATFORK=whatever MPI=$(MPICH) .PHONY: help @@ -62,7 +63,7 @@ htex_local_test: ## run all tests with htex_local config .PHONY: htex_local_alternate_test htex_local_alternate_test: ## run all tests with htex_local config - pip3 install ".[monitoring]" + pip3 install "." pytest parsl/tests/ -k "not cleannet" --config parsl/tests/configs/htex_local_alternate.py --random-order --durations 10 $(CCTOOLS_INSTALL): #CCtools contains both taskvine and workqueue so install only once @@ -84,7 +85,7 @@ radical_local_test: .PHONY: config_local_test config_local_test: $(CCTOOLS_INSTALL) - pip3 install ".[monitoring,visualization,proxystore]" + pip3 install ".[proxystore]" PYTHONPATH=/tmp/cctools/lib/python3.8/site-packages pytest parsl/tests/ -k "not cleannet" --config local --random-order --durations 10 .PHONY: site_test @@ -97,7 +98,7 @@ perf_test: parsl-perf --time 5 --config parsl/tests/configs/local_threads.py .PHONY: test ## run all tests with all config types -test: clean_coverage isort lint flake8 mypy local_thread_test htex_local_test htex_local_alternate_test wqex_local_test vineex_local_test radical_local_test config_local_test perf_test ## run all tests +test: clean_coverage isort lint flake8 mypy local_thread_test htex_local_test wqex_local_test vineex_local_test radical_local_test 
config_local_test perf_test ## run all tests .PHONY: tag tag: ## create a tag in git. to run, do a 'make VERSION="version string" tag diff --git a/mypy.ini b/mypy.ini index 604fa4d07a..7c703048f8 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1,5 +1,6 @@ [mypy] -plugins = sqlalchemy.ext.mypy.plugin +# sqlalchemy plugin disabled: no greenlet on python 3.13 +# plugins = sqlalchemy.ext.mypy.plugin enable_error_code = ignore-without-code no_implicit_reexport = True @@ -81,6 +82,7 @@ disallow_any_expr = True disallow_untyped_defs = True [mypy-parsl.monitoring.*] +disable_error_code = misc, valid-type, unused-ignore, attr-defined disallow_untyped_decorators = True check_untyped_defs = True disallow_subclassing_any = True @@ -206,3 +208,7 @@ ignore_missing_imports = True [mypy-proxystore.*] ignore_missing_imports = True + +# for ignoring monitoring in CI due to no greenlet on python 3.13 +[mypy-sqlalchemy.*] +ignore_missing_imports = True diff --git a/parsl/tests/configs/htex_local_alternate.py b/parsl/tests/configs/htex_local_alternate.py index 52124211bc..1a52888e57 100644 --- a/parsl/tests/configs/htex_local_alternate.py +++ b/parsl/tests/configs/htex_local_alternate.py @@ -68,6 +68,3 @@ def fresh_config(): ), usage_tracking=True ) - - -config = fresh_config() diff --git a/parsl/tests/test_monitoring/__init__.py b/parsl/tests/test_monitoring/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/parsl/tests/test_monitoring/test_app_names.py b/parsl/tests/test_monitoring/test_app_names.py deleted file mode 100644 index 1c3b6fef11..0000000000 --- a/parsl/tests/test_monitoring/test_app_names.py +++ /dev/null @@ -1,86 +0,0 @@ -"""Tests monitoring records app name under various decoration patterns. 
-""" - -import os -import time - -import pytest - -import parsl -from parsl.tests.configs.htex_local_alternate import fresh_config - - -@parsl.python_app -def regular_decorated_app(): - return 5 - - -@pytest.mark.local -def get_regular_decorated_app(): - return regular_decorated_app - - -def for_decoration_later(): - return 77 - - -def get_for_decoration_later(): - return parsl.python_app(for_decoration_later) - - -def get_decorated_closure(): - - r = 53 - - @parsl.python_app - def decorated_closure(): - return r - - return decorated_closure - - -@pytest.mark.local -@pytest.mark.parametrize("get_app,expected_name,expected_result", - [(get_regular_decorated_app, "regular_decorated_app", 5), - (get_for_decoration_later, "for_decoration_later", 77), - (get_decorated_closure, "decorated_closure", 53) - ]) -def test_app_name(get_app, expected_name, expected_result, tmpd_cwd): - - # this is imported here rather than at module level because - # it isn't available in a plain parsl install, so this module - # would otherwise fail to import and break even a basic test - # run. - import sqlalchemy - - c = fresh_config() - c.run_dir = tmpd_cwd - c.monitoring.logging_endpoint = f"sqlite:///{tmpd_cwd}/monitoring.db" - parsl.load(c) - - app = get_app() - assert app().result() == expected_result - - parsl.dfk().cleanup() - - engine = sqlalchemy.create_engine(c.monitoring.logging_endpoint) - with engine.begin() as connection: - - def count_rows(table: str): - result = connection.execute(f"SELECT COUNT(*) FROM {table}") - (c, ) = result.first() - return c - - # one workflow... - assert count_rows("workflow") == 1 - - # ... with one task ... - assert count_rows("task") == 1 - - # ... that was tried once ... - assert count_rows("try") == 1 - - # ... and has the expected name. 
- result = connection.execute("SELECT task_func_name FROM task") - (c, ) = result.first() - assert c == expected_name diff --git a/parsl/tests/test_monitoring/test_basic.py b/parsl/tests/test_monitoring/test_basic.py deleted file mode 100644 index 1c792a9d82..0000000000 --- a/parsl/tests/test_monitoring/test_basic.py +++ /dev/null @@ -1,121 +0,0 @@ -import os -import time - -import pytest - -import parsl -from parsl import HighThroughputExecutor -from parsl.config import Config -from parsl.executors.taskvine import TaskVineExecutor, TaskVineManagerConfig -from parsl.monitoring import MonitoringHub - - -@parsl.python_app -def this_app(): - # this delay needs to be several times the resource monitoring - # period configured in the test configuration, so that some - # messages are actually sent - there is no guarantee that any - # (non-first) resource message will be sent at all for a short app. - time.sleep(3) - - return 5 - - -# The below fresh configs are for use in parametrization, and should return -# a configuration that is suitably configured for monitoring. 
- -def htex_config(): - """This config will use htex's default htex-specific monitoring radio mode""" - from parsl.tests.configs.htex_local_alternate import fresh_config - return fresh_config() - - -def htex_udp_config(): - """This config will force UDP""" - from parsl.tests.configs.htex_local_alternate import fresh_config - c = fresh_config() - assert len(c.executors) == 1 - - assert c.executors[0].radio_mode == "htex", "precondition: htex has a radio mode attribute, configured for htex radio" - c.executors[0].radio_mode = "udp" - - return c - - -def workqueue_config(): - from parsl.tests.configs.workqueue_ex import fresh_config - c = fresh_config() - c.monitoring = MonitoringHub( - hub_address="localhost", - resource_monitoring_interval=1) - return c - - -def taskvine_config(): - c = Config(executors=[TaskVineExecutor(manager_config=TaskVineManagerConfig(port=9000), - worker_launch_method='provider')], - - monitoring=MonitoringHub(hub_address="localhost", - resource_monitoring_interval=1)) - return c - - -@pytest.mark.local -@pytest.mark.parametrize("fresh_config", [htex_config, htex_udp_config, workqueue_config, taskvine_config]) -def test_row_counts(tmpd_cwd, fresh_config): - # this is imported here rather than at module level because - # it isn't available in a plain parsl install, so this module - # would otherwise fail to import and break even a basic test - # run. - import sqlalchemy - from sqlalchemy import text - - db_url = f"sqlite:///{tmpd_cwd}/monitoring.db" - - config = fresh_config() - config.run_dir = tmpd_cwd - config.monitoring.logging_endpoint = db_url - - with parsl.load(config): - assert this_app().result() == 5 - - # at this point, we should find one row in the monitoring database. 
- - engine = sqlalchemy.create_engine(db_url) - with engine.begin() as connection: - - result = connection.execute(text("SELECT COUNT(*) FROM workflow")) - (c, ) = result.first() - assert c == 1 - - result = connection.execute(text("SELECT COUNT(*) FROM task")) - (c, ) = result.first() - assert c == 1 - - result = connection.execute(text("SELECT COUNT(*) FROM try")) - (c, ) = result.first() - assert c == 1 - - result = connection.execute(text("SELECT COUNT(*) FROM status, try " - "WHERE status.task_id = try.task_id " - "AND status.task_status_name='exec_done' " - "AND task_try_time_running is NULL")) - (c, ) = result.first() - assert c == 0 - - if isinstance(config.executors[0], HighThroughputExecutor): - # The node table is specific to the HighThroughputExecutor - # Two entries: one showing manager active, one inactive - result = connection.execute(text("SELECT COUNT(*) FROM node")) - (c, ) = result.first() - assert c == 2 - - # There should be one block polling status - # local provider has a status_polling_interval of 5s - result = connection.execute(text("SELECT COUNT(*) FROM block")) - (c, ) = result.first() - assert c >= 2 - - result = connection.execute(text("SELECT COUNT(*) FROM resource")) - (c, ) = result.first() - assert c >= 1 diff --git a/parsl/tests/test_monitoring/test_db_locks.py b/parsl/tests/test_monitoring/test_db_locks.py deleted file mode 100644 index e1d821a40e..0000000000 --- a/parsl/tests/test_monitoring/test_db_locks.py +++ /dev/null @@ -1,88 +0,0 @@ -import logging -import os -import time - -import pytest - -import parsl - -logger = logging.getLogger(__name__) - - -@parsl.python_app -def this_app(): - return 5 - - -@pytest.mark.local -def test_row_counts(): - import sqlalchemy - from sqlalchemy import text - - from parsl.tests.configs.htex_local_alternate import fresh_config - if os.path.exists("runinfo/monitoring.db"): - logger.info("Monitoring database already exists - deleting") - os.remove("runinfo/monitoring.db") - - engine = 
sqlalchemy.create_engine("sqlite:///runinfo/monitoring.db") - - logger.info("loading parsl") - parsl.load(fresh_config()) - - # parsl.load() returns before all initialisation of monitoring - # is complete, which means it isn't safe to take a read lock on - # the database yet. This delay tries to work around that - some - # better async behaviour might be nice, but what? - # - # Taking a read lock before monitoring is initialized will cause - # a failure in the part of monitoring which creates tables, and - # which is not protected against read locks at the time this test - # was written. - time.sleep(10) - - # to get an sqlite3 read lock that is held over a controllable - # long time, create a transaction and perform a SELECT in it. - # The lock will be held until the end of the transaction. - # (see bottom of https://sqlite.org/lockingv3.html) - - logger.info("Getting a read lock on the monitoring database") - with engine.begin() as readlock_connection: - readlock_connection.execute(text("BEGIN TRANSACTION")) - result = readlock_connection.execute(text("SELECT COUNT(*) FROM workflow")) - (c, ) = result.first() - assert c == 1 - # now readlock_connection should have a read lock that will - # stay locked until the transaction is ended, or the with - # block ends. - - logger.info("invoking and waiting for result") - assert this_app().result() == 5 - - # there is going to be some raciness here making sure that - # the database manager actually tries to write while the - # read lock is held. I'm not sure if there is a better way - # to detect this other than a hopefully long-enough sleep. - time.sleep(10) - - logger.info("cleaning up parsl") - parsl.dfk().cleanup() - - # at this point, we should find data consistent with executing one - # task in the database. 
- - logger.info("checking database content") - with engine.begin() as connection: - - result = connection.execute(text("SELECT COUNT(*) FROM workflow")) - (c, ) = result.first() - assert c == 1 - - result = connection.execute(text("SELECT COUNT(*) FROM task")) - (c, ) = result.first() - assert c == 1 - - result = connection.execute(text("SELECT COUNT(*) FROM try")) - (c, ) = result.first() - assert c == 1 - - logger.info("all done") diff --git a/parsl/tests/test_monitoring/test_fuzz_zmq.py b/parsl/tests/test_monitoring/test_fuzz_zmq.py deleted file mode 100644 index 3f50385564..0000000000 --- a/parsl/tests/test_monitoring/test_fuzz_zmq.py +++ /dev/null @@ -1,108 +0,0 @@ -import logging -import os -import socket -import time - -import pytest -import zmq - -import parsl - -logger = logging.getLogger(__name__) - - -@parsl.python_app -def this_app(): - return 5 - - -@pytest.mark.local -def test_row_counts(): - import sqlalchemy - from sqlalchemy import text - - from parsl.tests.configs.htex_local_alternate import fresh_config - - if os.path.exists("runinfo/monitoring.db"): - logger.info("Monitoring database already exists - deleting") - os.remove("runinfo/monitoring.db") - - logger.info("loading parsl") - parsl.load(fresh_config()) - - logger.info("invoking apps and waiting for result") - - assert this_app().result() == 5 - assert this_app().result() == 5 - - # now we've run some apps, send fuzz into the monitoring ZMQ - # socket, before trying to run some more tests. - - # there are different kinds of fuzz: - # could send ZMQ messages that are weird - # could send random bytes to the TCP socket - # the latter is what i'm most suspicious of in my present investigation - - # dig out the interchange port... 
- hub_address = parsl.dfk().monitoring.hub_address - hub_zmq_port = parsl.dfk().monitoring.hub_zmq_port - - # this will send a string to a new socket connection - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.connect((hub_address, hub_zmq_port)) - s.sendall(b'fuzzing\r') - - context = zmq.Context() - channel_timeout = 10000 # in milliseconds - hub_channel = context.socket(zmq.DEALER) - hub_channel.setsockopt(zmq.LINGER, 0) - hub_channel.set_hwm(0) - hub_channel.setsockopt(zmq.SNDTIMEO, channel_timeout) - hub_channel.connect("tcp://{}:{}".format(hub_address, hub_zmq_port)) - - # this will send a non-object down the DFK's existing ZMQ connection - hub_channel.send(b'FuzzyByte\rSTREAM') - - # This following attack is commented out, because monitoring is not resilient - # to this. - # In practice, it works some of the time but in some circumstances, - # it would still abandon writing multiple unrelated records to the database, - # causing ongoing monitoring data loss. - - # This will send an unusual python object down the - # DFK's existing ZMQ connection. this doesn't break the router, - # but breaks the db_manager in a way that isn't reported until - # the very end of the run, and database writing is abandoned - # rather than completing, in this case. - # I'm unclear if this is a case we should be trying to handle. - # parsl.dfk().monitoring._dfk_channel.send_pyobj("FUZZ3") - - # hopefully long enough for any breakage to happen - # before attempting to run more tasks - time.sleep(5) - - assert this_app().result() == 5 - assert this_app().result() == 5 - - logger.info("cleaning up parsl") - parsl.dfk().cleanup() - - # at this point, we should find one row in the monitoring database. 
- - logger.info("checking database content") - engine = sqlalchemy.create_engine("sqlite:///runinfo/monitoring.db") - with engine.begin() as connection: - - result = connection.execute(text("SELECT COUNT(*) FROM workflow")) - (c, ) = result.first() - assert c == 1 - - result = connection.execute(text("SELECT COUNT(*) FROM task")) - (c, ) = result.first() - assert c == 4 - - result = connection.execute(text("SELECT COUNT(*) FROM try")) - (c, ) = result.first() - assert c == 4 - - logger.info("all done") diff --git a/parsl/tests/test_monitoring/test_htex_init_blocks_vs_monitoring.py b/parsl/tests/test_monitoring/test_htex_init_blocks_vs_monitoring.py deleted file mode 100644 index eb7a25003b..0000000000 --- a/parsl/tests/test_monitoring/test_htex_init_blocks_vs_monitoring.py +++ /dev/null @@ -1,83 +0,0 @@ -import logging -import os -import time - -import pytest - -import parsl -from parsl.channels import LocalChannel -from parsl.config import Config -from parsl.executors import HighThroughputExecutor -from parsl.launchers import SimpleLauncher -from parsl.monitoring import MonitoringHub -from parsl.providers import LocalProvider - - -def fresh_config(run_dir, strategy, db_url): - return Config( - run_dir=os.fspath(run_dir), - executors=[ - HighThroughputExecutor( - label="htex_local", - cores_per_worker=1, - encrypted=True, - provider=LocalProvider( - channel=LocalChannel(), - init_blocks=1, - # min and max are set to 0 to ensure that we don't get - # a block from ongoing strategy scaling, only from - # init_blocks - min_blocks=0, - max_blocks=0, - launcher=SimpleLauncher(), - ), - ) - ], - strategy=strategy, - strategy_period=0.1, - monitoring=MonitoringHub( - hub_address="localhost", - hub_port=55055, - logging_endpoint=db_url - ) - ) - - -@parsl.python_app -def this_app(): - pass - - -@pytest.mark.local -@pytest.mark.parametrize("strategy", ('none', 'simple', 'htex_auto_scale')) -def test_row_counts(tmpd_cwd, strategy): - # this is imported here rather than at 
module level because - # it isn't available in a plain parsl install, so this module - # would otherwise fail to import and break even a basic test - # run. - import sqlalchemy - from sqlalchemy import text - - db_url = f"sqlite:///{tmpd_cwd}/monitoring.db" - with parsl.load(fresh_config(tmpd_cwd, strategy, db_url)): - dfk = parsl.dfk() - run_id = dfk.run_id - - this_app().result() - - engine = sqlalchemy.create_engine(db_url) - with engine.begin() as connection: - - binds = {"run_id": run_id} - - result = connection.execute(text("SELECT COUNT(DISTINCT block_id) FROM block WHERE run_id = :run_id"), binds) - (c, ) = result.first() - assert c == 1, "We should see a single block in this database" - - result = connection.execute(text("SELECT COUNT(*) FROM block WHERE block_id = 0 AND status = 'PENDING' AND run_id = :run_id"), binds) - (c, ) = result.first() - assert c == 1, "There should be a single pending status" - - result = connection.execute(text("SELECT COUNT(*) FROM block WHERE block_id = 0 AND status = 'CANCELLED' AND run_id = :run_id"), binds) - (c, ) = result.first() - assert c == 1, "There should be a single cancelled status" diff --git a/parsl/tests/test_monitoring/test_incomplete_futures.py b/parsl/tests/test_monitoring/test_incomplete_futures.py deleted file mode 100644 index c73d973150..0000000000 --- a/parsl/tests/test_monitoring/test_incomplete_futures.py +++ /dev/null @@ -1,66 +0,0 @@ -import logging -import os -import random -from concurrent.futures import Future - -import pytest - -import parsl - - -@parsl.python_app -def this_app(inputs=()): - return inputs[0] - - -@pytest.mark.local -def test_future_representation(tmpd_cwd): - import sqlalchemy - from sqlalchemy import text - - from parsl.tests.configs.htex_local_alternate import fresh_config - - monitoring_db = str(tmpd_cwd / "monitoring.db") - monitoring_url = "sqlite:///" + monitoring_db - - c = fresh_config() - c.monitoring.logging_endpoint = monitoring_url - c.run_dir = tmpd_cwd - - 
parsl.load(c) - - # we're going to pass this TOKEN into an app via a pre-requisite Future, - # and then expect to see it appear in the monitoring database. - TOKEN = random.randint(0, 1000000) - - # make a Future that has no result yet - # invoke a task that depends on it - # inspect and insert something about the monitoring recorded value of that Future - # make the Future complete - # inspect and insert something about the monitoring recorded value of that Future - - f1 = Future() - - f2 = this_app(inputs=[f1]) - - f1.set_result(TOKEN) - - assert f2.result() == TOKEN - - # this cleanup gives a barrier that allows the monitoring code to store - # everything it has in the database - without this, there's a race - # condition that the task will not have arrived in the database yet. - # A different approach for this test might be to poll the DB for a few - # seconds, with the assumption "data will arrive in the DB within - # 30 seconds, but probably much sooner". - parsl.dfk().cleanup() - - engine = sqlalchemy.create_engine(monitoring_url) - with engine.begin() as connection: - result = connection.execute(text("SELECT COUNT(*) FROM task")) - (task_count, ) = result.first() - assert task_count == 1 - - result = connection.execute(text("SELECT task_inputs FROM task")) - (task_inputs, ) = result.first() - assert task_inputs == "[" + repr(TOKEN) + "]" diff --git a/parsl/tests/test_monitoring/test_memoization_representation.py b/parsl/tests/test_monitoring/test_memoization_representation.py deleted file mode 100644 index 08d12bb113..0000000000 --- a/parsl/tests/test_monitoring/test_memoization_representation.py +++ /dev/null @@ -1,83 +0,0 @@ - -import logging -import os - -import pytest - -import parsl - -logger = logging.getLogger(__name__) - - -@parsl.python_app(cache=True) -def this_app(x): - return x + 1 - - -@pytest.mark.local -def test_hashsum(): - import sqlalchemy - from sqlalchemy import text - - from parsl.tests.configs.htex_local_alternate import fresh_config - 
- if os.path.exists("runinfo/monitoring.db"): - logger.info("Monitoring database already exists - deleting") - os.remove("runinfo/monitoring.db") - - logger.info("loading parsl") - parsl.load(fresh_config()) - - logger.info("invoking and waiting for result (1/4)") - f1 = this_app(4) - assert f1.result() == 5 - - logger.info("invoking and waiting for result (2/4)") - f2 = this_app(17) - assert f2.result() == 18 - - logger.info("invoking and waiting for result (3/4)") - f3 = this_app(4) - assert f3.result() == 5 - - logger.info("invoking and waiting for result (4/4)") - f4 = this_app(4) - assert f4.result() == 5 - - assert f1.task_record['hashsum'] == f3.task_record['hashsum'] - assert f1.task_record['hashsum'] == f4.task_record['hashsum'] - assert f1.task_record['hashsum'] != f2.task_record['hashsum'] - - logger.info("cleaning up parsl") - parsl.dfk().cleanup() - - # at this point, we should find one row in the monitoring database. - - logger.info("checking database content") - engine = sqlalchemy.create_engine("sqlite:///runinfo/monitoring.db") - with engine.begin() as connection: - - # we should have three tasks, but with only two tries, because the - # memo try should be missing - result = connection.execute(text("SELECT COUNT(*) FROM task")) - (task_count, ) = result.first() - assert task_count == 4 - - # this will check that the number of task rows for each hashsum matches the above app invocations - result = connection.execute(text(f"SELECT COUNT(task_hashsum) FROM task WHERE task_hashsum='{f1.task_record['hashsum']}'")) - (hashsum_count, ) = result.first() - assert hashsum_count == 3 - - result = connection.execute(text(f"SELECT COUNT(task_hashsum) FROM task WHERE task_hashsum='{f2.task_record['hashsum']}'")) - (hashsum_count, ) = result.first() - assert hashsum_count == 1 - - result = connection.execute(text("SELECT COUNT(*) FROM status WHERE task_status_name='exec_done'")) - (memo_count, ) = result.first() - assert memo_count == 2 - - result = 
connection.execute(text("SELECT COUNT(*) FROM status WHERE task_status_name='memo_done'")) - (memo_count, ) = result.first() - assert memo_count == 2 - - logger.info("all done") diff --git a/parsl/tests/test_monitoring/test_stdouterr.py b/parsl/tests/test_monitoring/test_stdouterr.py deleted file mode 100644 index d1817164c0..0000000000 --- a/parsl/tests/test_monitoring/test_stdouterr.py +++ /dev/null @@ -1,135 +0,0 @@ -"""Tests monitoring records app name under various decoration patterns. -""" - -import logging -import os -import re -import time -from typing import Union - -import pytest - -import parsl -from parsl.config import Config -from parsl.data_provider.data_manager import default_staging -from parsl.data_provider.files import File -from parsl.data_provider.staging import Staging -from parsl.executors import HighThroughputExecutor -from parsl.monitoring import MonitoringHub -from parsl.providers import LocalProvider - - -def fresh_config(run_dir): - return Config( - run_dir=str(run_dir), - executors=[ - HighThroughputExecutor( - address="127.0.0.1", - label="htex_Local", - provider=LocalProvider( - init_blocks=1, - min_blocks=1, - max_blocks=1, - ) - ) - ], - strategy='simple', - strategy_period=0.1, - monitoring=MonitoringHub( - hub_address="localhost", - hub_port=55055, - ) - ) - - -@parsl.python_app -def stdapp(stdout=None, stderr=None): - pass - - -class ArbitraryPathLike(os.PathLike): - def __init__(self, path: Union[str, bytes]) -> None: - self.path = path - - def __fspath__(self) -> Union[str, bytes]: - return self.path - - -class ArbitraryStaging(Staging): - """This staging provider will not actually do any staging, but will - accept arbitrary: scheme URLs. That's enough for this monitoring test - which doesn't need any actual stage out action to happen. 
- """ - def can_stage_out(self, file): - return file.scheme == "arbitrary" - - -@pytest.mark.local -@pytest.mark.parametrize('stdx,expected_stdx', - [('hello.txt', 'hello.txt'), - (None, ''), - (('tuple.txt', 'w'), 'tuple.txt'), - (ArbitraryPathLike('pl.txt'), 'pl.txt'), - (ArbitraryPathLike(b'pl2.txt'), 'pl2.txt'), - ((ArbitraryPathLike('pl3.txt'), 'w'), 'pl3.txt'), - ((ArbitraryPathLike(b'pl4.txt'), 'w'), 'pl4.txt'), - (parsl.AUTO_LOGNAME, - lambda p: - isinstance(p, str) and - os.path.isabs(p) and - re.match("^.*/task_0000_stdapp\\.std...$", p)), - (File("arbitrary:abc123"), "arbitrary:abc123"), - (File("file:///tmp/pl5"), "file:///tmp/pl5"), - ]) -@pytest.mark.parametrize('stream', ['stdout', 'stderr']) -def test_stdstream_to_monitoring(stdx, expected_stdx, stream, tmpd_cwd, caplog): - """This tests that various forms of stdout/err specification are - represented in monitoring correctly. The stderr and stdout codepaths - are generally duplicated, rather than factorised, and so this test - runs the same tests on both stdout and stderr. - """ - - # this is imported here rather than at module level because - # it isn't available in a plain parsl install, so this module - # would otherwise fail to import and break even a basic test - # run. - import sqlalchemy - - c = fresh_config(tmpd_cwd) - c.monitoring.logging_endpoint = f"sqlite:///{tmpd_cwd}/monitoring.db" - c.executors[0].storage_access = default_staging + [ArbitraryStaging()] - - with parsl.load(c): - kwargs = {stream: stdx} - stdapp(**kwargs).result() - - engine = sqlalchemy.create_engine(c.monitoring.logging_endpoint) - with engine.begin() as connection: - - def count_rows(table: str): - result = connection.execute(f"SELECT COUNT(*) FROM {table}") - (c, ) = result.first() - return c - - # one workflow... - assert count_rows("workflow") == 1 - - # ... with one task ... - assert count_rows("task") == 1 - - # ... that was tried once ... - assert count_rows("try") == 1 - - # ... and has the expected name. 
- result = connection.execute(f"SELECT task_{stream} FROM task") - (c, ) = result.first() - - if isinstance(expected_stdx, str): - assert c == expected_stdx - elif callable(expected_stdx): - assert expected_stdx(c) - else: - raise RuntimeError("Bad expected_stdx value") - - for record in caplog.records: - assert record.levelno < logging.ERROR diff --git a/parsl/tests/test_monitoring/test_viz_colouring.py b/parsl/tests/test_monitoring/test_viz_colouring.py deleted file mode 100644 index 5d7d82f9f3..0000000000 --- a/parsl/tests/test_monitoring/test_viz_colouring.py +++ /dev/null @@ -1,18 +0,0 @@ -import pytest - -from parsl.dataflow.states import States - - -@pytest.mark.local -def test_all_states_colored() -> None: - """This checks that the coloring tables in parsl-visualize contain - a color for each state defined in the task state enumeration. - """ - - # imports inside test because viz can't be imported in an environment - # with no monitoring installed - import parsl.monitoring.visualization.plots.default.workflow_plots as workflow_plots - - for s in States: - assert s.name in workflow_plots.gantt_colors - assert s.name in workflow_plots.dag_state_colors diff --git a/parsl/tests/test_shutdown/test_kill_monitoring.py b/parsl/tests/test_shutdown/test_kill_monitoring.py deleted file mode 100644 index 0fb56f08d8..0000000000 --- a/parsl/tests/test_shutdown/test_kill_monitoring.py +++ /dev/null @@ -1,64 +0,0 @@ -import os -import signal -import time - -import pytest - -import parsl -from parsl.tests.configs.htex_local_alternate import fresh_config - -# This is a very generous upper bound on expected shutdown time of target -# process after receiving a signal, measured in seconds. 
-PERMITTED_SHUTDOWN_TIME_S = 60 - - -@parsl.python_app -def simple_app(): - return True - - -@pytest.mark.local -def test_no_kills(): - """This tests that we can create a monitoring-enabled DFK and shut it down.""" - - parsl.load(fresh_config()) - - assert parsl.dfk().monitoring is not None, "This test requires monitoring" - - parsl.dfk().cleanup() - - -@pytest.mark.local -@pytest.mark.parametrize("sig", [signal.SIGINT, signal.SIGTERM, signal.SIGKILL, signal.SIGQUIT]) -@pytest.mark.parametrize("process_attr", ["router_proc", "dbm_proc"]) -def test_kill_monitoring_helper_process(sig, process_attr, try_assert): - """This tests that we can kill a monitoring process and still have successful shutdown. - SIGINT emulates some racy behaviour when ctrl-C is pressed: that - monitoring processes receive a ctrl-C too, and so the other processes - need to be tolerant to monitoring processes arbitrarily exiting. - """ - - parsl.load(fresh_config()) - - dfk = parsl.dfk() - - assert dfk.monitoring is not None, "Monitoring required" - - target_proc = getattr(dfk.monitoring, process_attr) - - assert target_proc is not None, "prereq: target process must exist" - assert target_proc.is_alive(), "prereq: target process must be alive" - - target_pid = target_proc.pid - assert target_pid is not None, "prereq: target process must have a pid" - - os.kill(target_pid, sig) - - try_assert(lambda: not target_proc.is_alive(), timeout_ms=PERMITTED_SHUTDOWN_TIME_S * 1000) - - # now we have broken one piece of the monitoring system, do some app - # execution and then shut down. 
- - simple_app().result() - - parsl.dfk().cleanup() diff --git a/test-requirements.txt b/test-requirements.txt index acd670b5e9..3fc640f23b 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -17,8 +17,9 @@ mpi4py # sqlalchemy is needed for typechecking, so it's here # as well as at runtime for optional monitoring execution # (where it's specified in setup.py) -sqlalchemy>=1.4,<2 -sqlalchemy2-stubs +# disabled because greenlet is not available on python 3.13 +# sqlalchemy>=1.4,<2 +# sqlalchemy2-stubs Sphinx==4.5.0 twine