diff --git a/.github/workflows/tests_ci.yml b/.github/workflows/tests_ci.yaml similarity index 65% rename from .github/workflows/tests_ci.yml rename to .github/workflows/tests_ci.yaml index f0108e24..60f46015 100644 --- a/.github/workflows/tests_ci.yml +++ b/.github/workflows/tests_ci.yaml @@ -42,14 +42,27 @@ jobs: ./scripts/mypy.sh - name: Run unit tests + # Unit tests are defined as tests which don't require any external systems to be running. run: | . "$HOME/.cargo/env" - python scripts/run_unit_tests.py + ./scripts/run_unit_tests.sh - name: Run integration tests - # Delete the workspace. Run once with a clean workspace. Run again from the existing workspace. + # Integration tests do require external systems to be running (most commonly a database instance). + # Unlike end-to-end tests though, they test a specific module in a detailed manner, much like a unit test does. + # + # We set `INTENDED_DBDATA_HARDWARE` so that it's seen when `integtest_pg_conn.py` executes `./tune/env/set_up_env_integtests.sh`. + run: | + . "$HOME/.cargo/env" + export INTENDED_DBDATA_HARDWARE=ssd + ./scripts/run_integration_tests.sh + + - name: Run end-to-end tests + # End-to-end tests are like integration tests in that they require external systems to be running. + # Unlike integration tests though, they don't perform detailed checks for any individual module. + # # Note that we need to run with a non-root user in order to start Postgres. This is configured in the .yaml # file for our self-hosted GHA runners. run: | . "$HOME/.cargo/env" - python -m scripts.run_protox_integration_test ssd + python -m scripts.run_protox_e2e_test ssd diff --git a/analyze/tests/test_analyze.py b/analyze/tests/unittest_analyze.py similarity index 100% rename from analyze/tests/test_analyze.py rename to analyze/tests/unittest_analyze.py diff --git a/dbms/__init__.py b/dbms/__init__.py index e69de29b..57ce106a 100644 --- a/dbms/__init__.py +++ b/dbms/__init__.py @@ -0,0 +1,2 @@ +# This folder contains code for building DBMSs. +# It should not be confused with code that uses DBMSs (e.g. those in tune/env/). diff --git a/dbms/postgres/cli.py b/dbms/postgres/cli.py index dc2a6f5b..41d7e2e4 100644 --- a/dbms/postgres/cli.py +++ b/dbms/postgres/cli.py @@ -289,6 +289,11 @@ def _load_into_dbdata( sql_file_execute(dbgym_cfg, conn, constraints_fpath) +# The start and stop functions slightly duplicate functionality from pg_conn.py. However, I chose to do it this way +# because what the `dbms` CLI needs in terms of starting and stopping Postgres is much simpler than what an agent +# that is tuning the database needs. Because these functions are so simple, I think it's okay to leave them here +# even though they are a little redundant. It seems better than making `dbms` depend on the behavior of the +# tuning environment. 
def start_postgres( dbgym_cfg: DBGymConfig, pgbin_path: Path, dbdata_dpath: Path ) -> None: diff --git a/manage/tests/test_clean.py b/manage/tests/unittest_clean.py similarity index 100% rename from manage/tests/test_clean.py rename to manage/tests/unittest_clean.py diff --git a/scripts/e2e_test_dbgym_config.yaml b/scripts/e2e_test_dbgym_config.yaml new file mode 100644 index 00000000..b9d5bc00 --- /dev/null +++ b/scripts/e2e_test_dbgym_config.yaml @@ -0,0 +1,3 @@ +dbgym_workspace_path: ../dbgym_e2etest_workspace +boot_redis_port: 7379 +ray_gcs_port: 7380 \ No newline at end of file diff --git a/scripts/integtest_dbgym_config.yaml b/scripts/integtest_dbgym_config.yaml deleted file mode 100644 index 14a1063a..00000000 --- a/scripts/integtest_dbgym_config.yaml +++ /dev/null @@ -1,3 +0,0 @@ -dbgym_workspace_path: ../dbgym_integtest_workspace -boot_redis_port: 7379 -ray_gcs_port: 7380 \ No newline at end of file diff --git a/scripts/run_integration_tests.sh b/scripts/run_integration_tests.sh new file mode 100755 index 00000000..99ccfe8e --- /dev/null +++ b/scripts/run_integration_tests.sh @@ -0,0 +1,2 @@ +#!/bin/bash +python -m scripts.run_tests "integtest_*.py" \ No newline at end of file diff --git a/scripts/run_protox_e2e_test.py b/scripts/run_protox_e2e_test.py new file mode 100644 index 00000000..bee9f597 --- /dev/null +++ b/scripts/run_protox_e2e_test.py @@ -0,0 +1,177 @@ +import os +import shutil +import subprocess +import sys +from enum import Enum, auto +from pathlib import Path + +import yaml + +from util.pg import get_is_postgres_running +from util.workspace import ( + default_embedder_path, + default_hpoed_agent_params_path, + default_pristine_dbdata_snapshot_path, + default_replay_data_fpath, + default_repo_path, + default_tables_path, + default_traindata_path, + default_tuning_steps_dpath, + default_workload_path, + workload_name_fn, +) + +# Be careful when changing these constants. In some places, the E2E test is hardcoded to work for these specific constants. +DBMS = "postgres" +AGENT = "protox" +BENCHMARK = "tpch" +SCALE_FACTOR = 0.01 +E2ETEST_DBGYM_CONFIG_FPATH = Path("scripts/e2e_test_dbgym_config.yaml") + + +def get_workspace_dpath(config_fpath: Path) -> Path: + with open(config_fpath, "r") as file: + config = yaml.safe_load(file) + return Path(config.get("dbgym_workspace_path")) + + +def clear_workspace(workspace_dpath: Path) -> None: + actual_workspace_dpath = Path("../dbgym_workspace") + if workspace_dpath.exists(): + if actual_workspace_dpath.exists(): + assert not workspace_dpath.samefile( + actual_workspace_dpath + ), "YOU MAY BE ABOUT TO DELETE YOUR ACTUAL WORKSPACE" + shutil.rmtree(workspace_dpath) + + +class Stage(Enum): + Tables = auto() + Workload = auto() + DBRepo = auto() + DBData = auto() + EmbeddingData = auto() + EmbeddingModel = auto() + TuneHPO = auto() + TuneTune = auto() + Replay = auto() + + +# When debugging the E2E test, this gives you an easy way of turning off certain stages to speed up your iteration cycle. +# +# I made this slightly convoluted system because you can't just naively comment out a big chunk of code with all the stages +# you don't want to run. Many stages define variables that are used by future stages, which can't be commented out. +# +# One useful debugging workflow is to run all stages up until a point, make a copy of that workspace, and then rerun the +# E2E test as many times as you want starting from that copy.
+ALL_STAGES = {stage for stage in Stage} +# This is a set and not a list because the order of stages is already pre-defined. This just defines what not to skip. +STAGES_TO_RUN = ALL_STAGES + + +if __name__ == "__main__": + intended_dbdata_hardware = sys.argv[1] if len(sys.argv) > 1 else "hdd" + + # Set the config file so that we use resources that don't conflict with normal usage (e.g. a different workspace, different ports, etc.). + os.environ["DBGYM_CONFIG_PATH"] = str(E2ETEST_DBGYM_CONFIG_FPATH) + + # Clear the E2E testing workspace so we always run the test with a clean slate. + workspace_dpath = get_workspace_dpath(E2ETEST_DBGYM_CONFIG_FPATH) + clear_workspace(workspace_dpath) + + # Make other checks that we have a clean slate for testing. + assert not get_is_postgres_running() + + # Run the full Proto-X training pipeline, asserting things along the way + # Setup (workload and database) + tables_dpath = default_tables_path(workspace_dpath, BENCHMARK, SCALE_FACTOR) + if Stage.Tables in STAGES_TO_RUN: + assert not tables_dpath.exists() + subprocess.run( + f"python task.py benchmark {BENCHMARK} data {SCALE_FACTOR}".split(), + check=True, + ) + assert tables_dpath.exists() + + workload_name = workload_name_fn(SCALE_FACTOR, 15721, 15721, "all") + workload_dpath = default_workload_path(workspace_dpath, BENCHMARK, workload_name) + if Stage.Workload in STAGES_TO_RUN: + assert not workload_dpath.exists() + subprocess.run( + f"python task.py benchmark {BENCHMARK} workload --scale-factor {SCALE_FACTOR}".split(), + check=True, + ) + assert workload_dpath.exists() + + repo_dpath = default_repo_path(workspace_dpath) + if Stage.DBRepo in STAGES_TO_RUN: + assert not repo_dpath.exists() + subprocess.run(f"python task.py dbms {DBMS} build".split(), check=True) + assert repo_dpath.exists() + + pristine_dbdata_snapshot_fpath = default_pristine_dbdata_snapshot_path( + workspace_dpath, BENCHMARK, SCALE_FACTOR + ) + if Stage.DBData in STAGES_TO_RUN: + assert not pristine_dbdata_snapshot_fpath.exists() + subprocess.run( + f"python task.py dbms {DBMS} dbdata {BENCHMARK} --scale-factor {SCALE_FACTOR} --intended-dbdata-hardware {intended_dbdata_hardware}".split(), + check=True, + ) + assert pristine_dbdata_snapshot_fpath.exists() + + # Tuning (embedding, HPO, and actual tuning) + traindata_dpath = default_traindata_path(workspace_dpath, BENCHMARK, workload_name) + if Stage.EmbeddingData in STAGES_TO_RUN: + assert not traindata_dpath.exists() + subprocess.run( + f"python task.py tune {AGENT} embedding datagen {BENCHMARK} --scale-factor {SCALE_FACTOR} --override-sample-limits lineitem,32768 --intended-dbdata-hardware {intended_dbdata_hardware}".split(), + check=True, + ) + assert traindata_dpath.exists() + + embedder_dpath = default_embedder_path(workspace_dpath, BENCHMARK, workload_name) + if Stage.EmbeddingModel in STAGES_TO_RUN: + assert not embedder_dpath.exists() + subprocess.run( + f"python task.py tune {AGENT} embedding train {BENCHMARK} --scale-factor {SCALE_FACTOR} --iterations-per-epoch 1 --num-points-to-sample 1 --num-batches 1 --batch-size 64 --start-epoch 15 --num-samples 4 --train-max-concurrent 4 --num-curate 2".split(), + check=True, + ) + assert embedder_dpath.exists() + + hpoed_agent_params_fpath = default_hpoed_agent_params_path( + workspace_dpath, BENCHMARK, workload_name + ) + if Stage.TuneHPO in STAGES_TO_RUN: + assert not hpoed_agent_params_fpath.exists() + subprocess.run( + f"python task.py tune {AGENT} agent hpo {BENCHMARK} --scale-factor {SCALE_FACTOR} --num-samples 2 --max-concurrent 2 
--workload-timeout 15 --query-timeout 1 --tune-duration-during-hpo 0.01 --intended-dbdata-hardware {intended_dbdata_hardware}".split(), + check=True, + ) + assert hpoed_agent_params_fpath.exists() + + tuning_steps_dpath = default_tuning_steps_dpath( + workspace_dpath, BENCHMARK, workload_name, False + ) + if Stage.TuneTune in STAGES_TO_RUN: + assert not tuning_steps_dpath.exists() + subprocess.run( + f"python task.py tune {AGENT} agent tune {BENCHMARK} --scale-factor {SCALE_FACTOR}".split(), + check=True, + ) + assert tuning_steps_dpath.exists() + + # Post-training (replay and analysis) + replay_data_fpath = default_replay_data_fpath( + workspace_dpath, BENCHMARK, workload_name, False + ) + if Stage.Replay in STAGES_TO_RUN: + assert not replay_data_fpath.exists() + subprocess.run( + f"python3 task.py tune {AGENT} agent replay {BENCHMARK} --scale-factor {SCALE_FACTOR}".split(), + check=True, + ) + assert replay_data_fpath.exists() + + # Clear it at the end as well to avoid leaving artifacts. + clear_workspace(workspace_dpath) diff --git a/scripts/run_protox_integration_test.py b/scripts/run_protox_integration_test.py deleted file mode 100644 index 09f7d9c3..00000000 --- a/scripts/run_protox_integration_test.py +++ /dev/null @@ -1,169 +0,0 @@ -import os -import re -import shutil -import subprocess -import sys -from pathlib import Path - -import yaml - -from util.workspace import ( - default_embedder_path, - default_hpoed_agent_params_path, - default_pristine_dbdata_snapshot_path, - default_replay_data_fpath, - default_repo_path, - default_tables_path, - default_traindata_path, - default_tuning_steps_dpath, - default_workload_path, - get_latest_run_path_from_workspace_path, - workload_name_fn, -) - -# TODO: add check for the tfevents file and run tfevents analyze in integtest - - -# Be careful when changing these constants. The integration test is hardcoded to work for these specific constants. -DBMS = "postgres" -AGENT = "protox" -BENCHMARK = "tpch" -SCALE_FACTOR = 0.01 -INTEGTEST_DBGYM_CONFIG_FPATH = Path("scripts/integtest_dbgym_config.yaml") - - -def get_workspace_dpath(config_fpath: Path) -> Path: - with open(config_fpath, "r") as file: - config = yaml.safe_load(file) - return Path(config.get("dbgym_workspace_path")) - - -def clear_workspace(workspace_dpath: Path) -> None: - actual_workspace_dpath = Path("../dbgym_workspace") - if workspace_dpath.exists(): - if actual_workspace_dpath.exists(): - assert not workspace_dpath.samefile( - actual_workspace_dpath - ), "YOU MAY BE ABOUT TO DELETE YOUR ACTUAL WORKSPACE" - shutil.rmtree(workspace_dpath) - - -if __name__ == "__main__": - intended_dbdata_hardware = sys.argv[1] if len(sys.argv) > 1 else "hdd" - - # Set the config file so that we use resources that don't conflict with normal usage (e.g. a different workspace, different ports, etc.). - os.environ["DBGYM_CONFIG_PATH"] = str(INTEGTEST_DBGYM_CONFIG_FPATH) - - # Clear the integration testing workspace so we always run the test with a clean slate. 
- workspace_dpath = get_workspace_dpath(INTEGTEST_DBGYM_CONFIG_FPATH) - clear_workspace(workspace_dpath) - - # Run the full Proto-X training pipeline, asserting things along the way - # Setup (workload and database) - tables_dpath = default_tables_path(workspace_dpath, BENCHMARK, SCALE_FACTOR) - assert not tables_dpath.exists() - subprocess.run( - f"python task.py benchmark {BENCHMARK} data {SCALE_FACTOR}".split(), check=True - ) - assert tables_dpath.exists() - - workload_name = workload_name_fn(SCALE_FACTOR, 15721, 15721, "all") - workload_dpath = default_workload_path(workspace_dpath, BENCHMARK, workload_name) - assert not workload_dpath.exists() - subprocess.run( - f"python task.py benchmark {BENCHMARK} workload --scale-factor {SCALE_FACTOR}".split(), - check=True, - ) - assert workload_dpath.exists() - - repo_dpath = default_repo_path(workspace_dpath) - assert not repo_dpath.exists() - subprocess.run(f"python task.py dbms {DBMS} build".split(), check=True) - assert repo_dpath.exists() - - pristine_dbdata_snapshot_fpath = default_pristine_dbdata_snapshot_path( - workspace_dpath, BENCHMARK, SCALE_FACTOR - ) - assert not pristine_dbdata_snapshot_fpath.exists() - subprocess.run( - f"python task.py dbms {DBMS} dbdata {BENCHMARK} --scale-factor {SCALE_FACTOR} --intended-dbdata-hardware {intended_dbdata_hardware}".split(), - check=True, - ) - assert pristine_dbdata_snapshot_fpath.exists() - - # Training (embedding and tuning) - traindata_dpath = default_traindata_path(workspace_dpath, BENCHMARK, workload_name) - assert not traindata_dpath.exists() - subprocess.run( - f"python task.py tune {AGENT} embedding datagen {BENCHMARK} --scale-factor {SCALE_FACTOR} --override-sample-limits lineitem,32768 --intended-dbdata-hardware {intended_dbdata_hardware}".split(), - check=True, - ) - assert traindata_dpath.exists() - - embedder_dpath = default_embedder_path(workspace_dpath, BENCHMARK, workload_name) - assert not embedder_dpath.exists() - subprocess.run( - f"python task.py tune {AGENT} embedding train {BENCHMARK} --scale-factor {SCALE_FACTOR} --iterations-per-epoch 1 --num-points-to-sample 1 --num-batches 1 --batch-size 64 --start-epoch 15 --num-samples 4 --train-max-concurrent 4 --num-curate 2".split(), - check=True, - ) - assert embedder_dpath.exists() - - hpoed_agent_params_fpath = default_hpoed_agent_params_path( - workspace_dpath, BENCHMARK, workload_name - ) - assert not hpoed_agent_params_fpath.exists() - subprocess.run( - f"python task.py tune {AGENT} agent hpo {BENCHMARK} --scale-factor {SCALE_FACTOR} --num-samples 2 --max-concurrent 2 --workload-timeout 15 --query-timeout 1 --tune-duration-during-hpo 0.01 --intended-dbdata-hardware {intended_dbdata_hardware}".split(), - check=True, - ) - assert hpoed_agent_params_fpath.exists() - - tuning_steps_dpath = default_tuning_steps_dpath( - workspace_dpath, BENCHMARK, workload_name, False - ) - assert not tuning_steps_dpath.exists() - subprocess.run( - f"python task.py tune {AGENT} agent tune {BENCHMARK} --scale-factor {SCALE_FACTOR}".split(), - check=True, - ) - tboard_dpath = ( - get_latest_run_path_from_workspace_path(workspace_dpath) - / "dbgym_tune_protox_agent" - / "artifacts" - / "tboard" - ) - assert tuning_steps_dpath.exists() - tboard_dpath = ( - tboard_dpath.resolve() - ) # Resolve it since latest_run.link will be updated by future runs. 
- assert tboard_dpath.exists() - pattern = re.compile(r"events\.out\.tfevents.*") - tfevents_fpaths = [ - file - for file in tboard_dpath.iterdir() - if file.is_file() and pattern.search(file.name) - ] - assert ( - len(tfevents_fpaths) == 1 - ), "There should only be one .tfevents file because this is a tuning run, not an HPO run." - tfevents_fpath = tfevents_fpaths[0] - assert tfevents_fpath.exists() - - # Post-training (replay and analysis) - replay_data_fpath = default_replay_data_fpath( - workspace_dpath, BENCHMARK, workload_name, False - ) - assert not replay_data_fpath.exists() - subprocess.run( - f"python3 task.py tune {AGENT} agent replay {BENCHMARK} --scale-factor {SCALE_FACTOR}".split(), - check=True, - ) - assert replay_data_fpath.exists() - - subprocess.run( - f"python3 task.py analyze tfevents {tfevents_fpath}".split(), - check=True, - ) - - # Clear it at the end as well to avoid leaving artifacts. - clear_workspace(workspace_dpath) diff --git a/scripts/run_unit_tests.py b/scripts/run_tests.py similarity index 89% rename from scripts/run_unit_tests.py rename to scripts/run_tests.py index 988681d6..5763f7e1 100644 --- a/scripts/run_unit_tests.py +++ b/scripts/run_tests.py @@ -10,7 +10,7 @@ if __name__ == "__main__": loader = unittest.TestLoader() - suite = loader.discover(".") + suite = loader.discover(".", pattern=sys.argv[1]) runner = unittest.TextTestRunner() result = runner.run(suite) if not result.wasSuccessful(): diff --git a/scripts/run_unit_tests.sh b/scripts/run_unit_tests.sh new file mode 100755 index 00000000..72714776 --- /dev/null +++ b/scripts/run_unit_tests.sh @@ -0,0 +1,2 @@ +#!/bin/bash +python -m scripts.run_tests "unittest_*.py" \ No newline at end of file diff --git a/tune/env/__init__.py b/tune/env/__init__.py new file mode 100644 index 00000000..d0f3ccad --- /dev/null +++ b/tune/env/__init__.py @@ -0,0 +1,4 @@ +# This folder contains code for managing the environment (aka the DBMS) that is shared across all tuning agents. +# Even though it is a folder in tune/, it in itself is not a tuning agent. +# The difference between this and dbms/ is that dbms/ is the CLI to build the database while this is code to use the database. +# The reason this is not a top-level directory is because the environment in itself is not a CLI command. 
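For context on the shared runner renamed above, here is a minimal sketch of what scripts/run_tests.py plausibly looks like after this change. The diff only shows the changed discover() hunk, so the imports and the exit-code handling at the end are assumptions based on the truncated context.

import sys
import unittest

if __name__ == "__main__":
    # run_unit_tests.sh passes "unittest_*.py" and run_integration_tests.sh passes "integtest_*.py",
    # so a single runner discovers whichever family of test files the wrapper script asks for.
    loader = unittest.TestLoader()
    suite = loader.discover(".", pattern=sys.argv[1])
    runner = unittest.TextTestRunner()
    result = runner.run(suite)
    if not result.wasSuccessful():
        sys.exit(1)  # Assumption: a non-zero exit so CI steps fail when tests fail.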
diff --git a/tune/env/env_integtests_dbgym_config.yaml b/tune/env/env_integtests_dbgym_config.yaml new file mode 100644 index 00000000..0de54f97 --- /dev/null +++ b/tune/env/env_integtests_dbgym_config.yaml @@ -0,0 +1 @@ +dbgym_workspace_path: ../dbgym_env_integtest_workspace/ diff --git a/tune/env/integtest_pg_conn.py b/tune/env/integtest_pg_conn.py new file mode 100644 index 00000000..e4f356af --- /dev/null +++ b/tune/env/integtest_pg_conn.py @@ -0,0 +1,116 @@ +import subprocess +import unittest +from pathlib import Path + +import yaml + +from tune.env.pg_conn import PostgresConn +from util.pg import get_is_postgres_running, get_running_postgres_ports +from util.workspace import ( + DEFAULT_BOOT_CONFIG_FPATH, + DBGymConfig, + default_dbdata_parent_dpath, + default_pgbin_path, + default_pristine_dbdata_snapshot_path, +) + +ENV_INTEGTESTS_DBGYM_CONFIG_FPATH = Path("tune/env/env_integtests_dbgym_config.yaml") +BENCHMARK = "tpch" +SCALE_FACTOR = 0.01 +BASE_PGPORT = 5432 + + +def get_unittest_workspace_path() -> Path: + with open(ENV_INTEGTESTS_DBGYM_CONFIG_FPATH) as f: + return Path(yaml.safe_load(f)["dbgym_workspace_path"]) + assert False + + +class PostgresConnTests(unittest.TestCase): + dbgym_cfg: DBGymConfig + + @staticmethod + def setUpClass() -> None: + # If you're running the test locally, this check makes runs past the first one much faster. + if not get_unittest_workspace_path().exists(): + subprocess.run(["./tune/env/set_up_env_integtests.sh"], check=True) + + PostgresConnTests.dbgym_cfg = DBGymConfig(ENV_INTEGTESTS_DBGYM_CONFIG_FPATH) + + def setUp(self) -> None: + self.assertFalse( + get_is_postgres_running(), + "Make sure Postgres isn't running before starting the integration test. `pkill postgres` is one way " + + "to ensure this. Be careful about accidentally taking down other people's Postgres instances though.", + ) + self.pristine_dbdata_snapshot_path = default_pristine_dbdata_snapshot_path( + self.dbgym_cfg.dbgym_workspace_path, BENCHMARK, SCALE_FACTOR + ) + self.dbdata_parent_dpath = default_dbdata_parent_dpath( + self.dbgym_cfg.dbgym_workspace_path + ) + self.pgbin_dpath = default_pgbin_path(self.dbgym_cfg.dbgym_workspace_path) + + def tearDown(self) -> None: + self.assertFalse(get_is_postgres_running()) + + def create_pg_conn(self, pgport: int = BASE_PGPORT) -> PostgresConn: + return PostgresConn( + PostgresConnTests.dbgym_cfg, + pgport, + self.pristine_dbdata_snapshot_path, + self.dbdata_parent_dpath, + self.pgbin_dpath, + False, + DEFAULT_BOOT_CONFIG_FPATH, + ) + + def test_init(self) -> None: + _ = self.create_pg_conn() + + def test_start_and_stop(self) -> None: + pg_conn = self.create_pg_conn() + pg_conn.restore_pristine_snapshot() + pg_conn.start_with_changes() + self.assertTrue(get_is_postgres_running()) + pg_conn.shutdown_postgres() + + def test_start_on_multiple_ports(self) -> None: + pg_conn0 = self.create_pg_conn() + pg_conn0.restore_pristine_snapshot() + pg_conn0.start_with_changes() + self.assertEqual(set(get_running_postgres_ports()), {BASE_PGPORT}) + pg_conn1 = self.create_pg_conn(BASE_PGPORT + 1) + pg_conn1.restore_pristine_snapshot() + pg_conn1.start_with_changes() + self.assertEqual( + set(get_running_postgres_ports()), {BASE_PGPORT, BASE_PGPORT + 1} + ) + + # Clean up + pg_conn0.shutdown_postgres() + pg_conn1.shutdown_postgres() + + def test_connect_and_disconnect(self) -> None: + # Setup + pg_conn = self.create_pg_conn() + pg_conn.restore_pristine_snapshot() + pg_conn.start_with_changes() + + # Test + self.assertIsNone(pg_conn._conn) + conn =
pg_conn.conn() + self.assertIsNotNone(conn) + self.assertIs( + conn, pg_conn._conn + ) # The conn should be cached so these objects should be the same + self.assertIs(conn, pg_conn.conn()) # Same thing here + pg_conn.disconnect() + self.assertIsNone(pg_conn._conn) + + # Cleanup + pg_conn.shutdown_postgres() + + +if __name__ == "__main__": + unittest.main() diff --git a/tune/protox/env/util/pg_conn.py b/tune/env/pg_conn.py similarity index 94% rename from tune/protox/env/util/pg_conn.py rename to tune/env/pg_conn.py index d91e46d4..cd573af8 100644 --- a/tune/protox/env/util/pg_conn.py +++ b/tune/env/pg_conn.py @@ -22,18 +22,16 @@ from plumbum import local from psycopg.errors import ProgramLimitExceeded, QueryCanceled -from tune.protox.env.artifact_manager import ArtifactManager, time_record from util.log import DBGYM_LOGGER_NAME -from util.pg import ( - DBGYM_POSTGRES_DBNAME, - DBGYM_POSTGRES_PASS, - DBGYM_POSTGRES_USER, - SHARED_PRELOAD_LIBRARIES, -) -from util.workspace import DBGymConfig, link_result, open_and_save, parent_dpath_of_path +from util.pg import DBGYM_POSTGRES_DBNAME, SHARED_PRELOAD_LIBRARIES, get_kv_connstr +from util.workspace import DBGymConfig, open_and_save, parent_dpath_of_path + +DEFAULT_CONNECT_TIMEOUT = 300 class PostgresConn: + # The reason that PostgresConn takes in all these paths (e.g. `pgbin_path`) is so that + # it's fully decoupled from how the files are organized in the workspace. def __init__( self, dbgym_cfg: DBGymConfig, @@ -41,10 +39,9 @@ def __init__( pristine_dbdata_snapshot_fpath: Path, dbdata_parent_dpath: Path, pgbin_path: Union[str, Path], - connect_timeout: int, enable_boot: bool, boot_config_fpath: Path, - artifact_manager: ArtifactManager, + connect_timeout: int = DEFAULT_CONNECT_TIMEOUT, ) -> None: self.dbgym_cfg = dbgym_cfg @@ -54,7 +51,6 @@ def __init__( self.enable_boot = enable_boot self.boot_config_fpath = boot_config_fpath self.log_step = 0 - self.artifact_manager = artifact_manager # All the paths related to dbdata # pristine_dbdata_snapshot_fpath is the .tgz snapshot that represents the starting state @@ -75,13 +71,13 @@ def __init__( self._conn: Optional[psycopg.Connection[Any]] = None - def get_connstr(self) -> str: - return f"host=localhost port={self.pgport} user={DBGYM_POSTGRES_USER} password={DBGYM_POSTGRES_PASS} dbname={DBGYM_POSTGRES_DBNAME}" + def get_kv_connstr(self) -> str: + return get_kv_connstr(self.pgport) def conn(self) -> psycopg.Connection[Any]: if self._conn is None: self._conn = psycopg.connect( - self.get_connstr(), autocommit=True, prepare_threshold=None + self.get_kv_connstr(), autocommit=True, prepare_threshold=None ) return self._conn @@ -103,7 +99,6 @@ def move_log(self) -> None: shutil.move(pglog_fpath, pglog_this_step_fpath) self.log_step += 1 - @time_record("shutdown") def shutdown_postgres(self) -> None: """Shuts down postgres.""" self.disconnect() @@ -134,7 +129,6 @@ def shutdown_postgres(self) -> None: if not exists and retcode != 0: break - @time_record("start") def start_with_changes( self, conf_changes: Optional[list[str]] = None, @@ -142,7 +136,8 @@ def start_with_changes( save_checkpoint: bool = False, ) -> bool: """ - This function assumes that some snapshot has already been untarred into self.dbdata_dpath + This function assumes that some snapshot has already been untarred into self.dbdata_dpath. + You can do this by calling one of the wrappers around _restore_snapshot(). """ # Install the new configuration changes. 
if conf_changes is not None: @@ -299,7 +294,6 @@ def _set_up_boot( self.conn().execute(f"SET boot.mu_hyp_stdev={mu_hyp_stdev}") logging.getLogger(DBGYM_LOGGER_NAME).debug("Set up boot") - @time_record("psql") def psql(self, sql: str) -> tuple[int, Optional[str]]: low_sql = sql.lower() @@ -329,7 +323,7 @@ def cancel_fn(conn_str: str) -> None: conn.execute("SET statement_timeout = 300000") try: - timer = threading.Timer(300.0, cancel_fn, args=(self.get_connstr(),)) + timer = threading.Timer(300.0, cancel_fn, args=(self.get_kv_connstr(),)) timer.start() conn.execute(sql) @@ -362,7 +356,6 @@ def restore_pristine_snapshot(self) -> bool: def restore_checkpointed_snapshot(self) -> bool: return self._restore_snapshot(self.checkpoint_dbdata_snapshot_fpath) - @time_record("restore") def _restore_snapshot( self, dbdata_snapshot_path: Path, diff --git a/tune/env/set_up_env_integtests.sh b/tune/env/set_up_env_integtests.sh new file mode 100755 index 00000000..c4ffe017 --- /dev/null +++ b/tune/env/set_up_env_integtests.sh @@ -0,0 +1,20 @@ +#!/bin/bash + +# Environment tests rely on Postgres being built and workloads/dbdata being generated. This script does that setup. +# Generating these things is not considered a part of the test, which is why it's in its own shell script. +# The reason a shell script generates them instead of them simply being in the repo is that (a) +# the Postgres repo is very large and (b) the built binary will be different for different machines. +# This script should be run from the base dbgym/ directory. + +set -euxo pipefail + +# INTENDED_DBDATA_HARDWARE can be set elsewhere (e.g. by tests_ci.yaml) but we use hdd by default. +INTENDED_DBDATA_HARDWARE="${INTENDED_DBDATA_HARDWARE:-hdd}" +BENCHMARK=tpch +SCALE_FACTOR=0.01 +export DBGYM_CONFIG_PATH=tune/env/env_integtests_dbgym_config.yaml # Note that this envvar needs to be exported. +WORKSPACE_PATH=$(grep 'dbgym_workspace_path:' $DBGYM_CONFIG_PATH | sed 's/dbgym_workspace_path: //') + +python3 task.py benchmark $BENCHMARK data $SCALE_FACTOR +python3 task.py dbms postgres build +python3 task.py dbms postgres dbdata $BENCHMARK --scale-factor $SCALE_FACTOR --intended-dbdata-hardware $INTENDED_DBDATA_HARDWARE diff --git a/tune/protox/agent/build_trial.py b/tune/protox/agent/build_trial.py index 01445733..2083607f 100644 --- a/tune/protox/agent/build_trial.py +++ b/tune/protox/agent/build_trial.py @@ -16,6 +16,7 @@ from torch import nn from torch.optim import Adam # type: ignore[attr-defined] +from tune.env.pg_conn import PostgresConn from tune.protox.agent.agent_env import AgentEnv from tune.protox.agent.buffers import ReplayBuffer from tune.protox.agent.noise import ClampNoise @@ -39,7 +40,6 @@ from tune.protox.env.space.state.space import StateSpace from tune.protox.env.target_reset.target_reset_wrapper import TargetResetWrapper from tune.protox.env.types import ProtoAction, TableAttrAccessSetsMap -from tune.protox.env.util.pg_conn import PostgresConn from tune.protox.env.util.reward import RewardUtility from tune.protox.env.workload import Workload from util.workspace import ( @@ -179,9 +179,8 @@ def _build_utilities( pgbin_path=Path(hpo_params["pgconn_info"]["pgbin_path"]), enable_boot=enable_boot, boot_config_fpath=hpo_params["boot_config_fpath"][str(tuning_mode)], - connect_timeout=300, - artifact_manager=artifact_manager, ) + # TODO(phw2): I removed artifact_manager here. Fix this later.
workload = Workload( dbgym_cfg=dbgym_cfg, diff --git a/tune/protox/env/pg_env.py b/tune/protox/env/pg_env.py index cd97a00f..d1a2cb68 100644 --- a/tune/protox/env/pg_env.py +++ b/tune/protox/env/pg_env.py @@ -1,14 +1,13 @@ import copy -import json import logging import time -from pathlib import Path -from typing import Any, Optional, Tuple, Union +from typing import Any, Optional import gymnasium as gym import psycopg from plumbum import local +from tune.env.pg_conn import PostgresConn from tune.protox.env.artifact_manager import ArtifactManager, time_record from tune.protox.env.space.holon_space import HolonSpace from tune.protox.env.space.state.space import StateSpace @@ -20,7 +19,6 @@ HolonStateContainer, TargetResetConfig, ) -from tune.protox.env.util.pg_conn import PostgresConn from tune.protox.env.util.reward import RewardUtility from tune.protox.env.workload import Workload from util.log import DBGYM_LOGGER_NAME @@ -430,7 +428,7 @@ def attempt_checkpoint(conn_str: str) -> None: # We've killed the index operation. or "operational" in stderr ) - attempt_checkpoint(self.pg_conn.get_connstr()) + attempt_checkpoint(self.pg_conn.get_kv_connstr()) return False assert ret == 0, stderr diff --git a/tune/protox/env/workload.py b/tune/protox/env/workload.py index f5d8e189..91670fbf 100644 --- a/tune/protox/env/workload.py +++ b/tune/protox/env/workload.py @@ -12,6 +12,7 @@ import pglast from plumbum import local +from tune.env.pg_conn import PostgresConn from tune.protox.env.artifact_manager import ArtifactManager, time_record from tune.protox.env.space.holon_space import HolonSpace from tune.protox.env.space.latent_space import LatentKnobSpace, LatentQuerySpace @@ -36,7 +37,6 @@ _acquire_metrics_around_query, execute_variations, ) -from tune.protox.env.util.pg_conn import PostgresConn from tune.protox.env.util.reward import RewardUtility from tune.protox.env.util.workload_analysis import ( extract_aliases, diff --git a/tune/protox/tests/test_index_space.py b/tune/protox/tests/unittest_index_space.py similarity index 100% rename from tune/protox/tests/test_index_space.py rename to tune/protox/tests/unittest_index_space.py diff --git a/tune/protox/tests/test_primitive.py b/tune/protox/tests/unittest_primitive.py similarity index 100% rename from tune/protox/tests/test_primitive.py rename to tune/protox/tests/unittest_primitive.py diff --git a/tune/protox/tests/test_workload.py b/tune/protox/tests/unittest_workload.py similarity index 100% rename from tune/protox/tests/test_workload.py rename to tune/protox/tests/unittest_workload.py diff --git a/tune/protox/tests/test_workload_utils.py b/tune/protox/tests/unittest_workload_utils.py similarity index 100% rename from tune/protox/tests/test_workload_utils.py rename to tune/protox/tests/unittest_workload_utils.py diff --git a/util/pg.py b/util/pg.py index 09272865..43252727 100644 --- a/util/pg.py +++ b/util/pg.py @@ -1,7 +1,12 @@ +""" +There are multiple parts of the codebase which interact with Postgres. This file contains helpers common to all those parts. 
+""" + from pathlib import Path -from typing import Any, List, NewType, Union +from typing import Any import pglast +import psutil import psycopg import sqlalchemy from sqlalchemy import create_engine, text @@ -54,6 +59,10 @@ def get_connstr(pgport: int = DEFAULT_POSTGRES_PORT, use_psycopg: bool = True) - return connstr_prefix + "://" + connstr_suffix +def get_kv_connstr(pgport: int = DEFAULT_POSTGRES_PORT) -> str: + return f"host=localhost port={pgport} user={DBGYM_POSTGRES_USER} password={DBGYM_POSTGRES_PASS} dbname={DBGYM_POSTGRES_DBNAME}" + + def create_psycopg_conn(pgport: int = DEFAULT_POSTGRES_PORT) -> psycopg.Connection[Any]: connstr = get_connstr(use_psycopg=True, pgport=pgport) psycopg_conn = psycopg.connect(connstr, autocommit=True, prepare_threshold=None) @@ -69,3 +78,38 @@ def create_sqlalchemy_conn( execution_options={"isolation_level": "AUTOCOMMIT"}, ) return engine.connect() + + +def get_is_postgres_running() -> bool: + """ + This is often used in assertions to ensure that Postgres isn't running before we + execute some code. + + I intentionally do not have a function that forcefully *stops* all Postgres instances. + This is risky because it could accidentally stop instances it wasn't supposed to (e.g. + Postgres instances run by other users on the same machine). + + Stopping Postgres instances is thus left up to the human to take care of. + """ + return len(get_running_postgres_ports()) > 0 + + +def get_running_postgres_ports() -> list[int]: + """ + Returns a list of all ports on which Postgres is currently running. + + There are ways to check with psycopg/sqlalchemy. However, I chose to check using + psutil to keep it as simple as possible and orthogonal to how connections work. + """ + running_ports = [] + + for conn in psutil.net_connections(kind="inet"): + if conn.status == "LISTEN": + try: + proc = psutil.Process(conn.pid) + if proc.name() == "postgres": + running_ports.append(conn.laddr.port) + except (psutil.NoSuchProcess, psutil.AccessDenied): + continue + + return running_ports
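To illustrate how the new util/pg.py helpers compose, here is a short usage sketch that is not part of this diff; connect_if_running is a hypothetical function written only for illustration, and it assumes util.pg is importable from the repo root as in the code above.

from typing import Any

import psycopg

from util.pg import get_is_postgres_running, get_kv_connstr, get_running_postgres_ports


def connect_if_running(pgport: int) -> psycopg.Connection[Any]:
    # Mirror the guard used by the integration tests: require some Postgres instance to be up.
    assert get_is_postgres_running(), "expected a running Postgres instance"
    # get_running_postgres_ports() reports every port with a listening postgres process.
    assert pgport in get_running_postgres_ports(), f"no postgres listening on port {pgport}"
    # get_kv_connstr() builds the key-value conninfo ("host=... port=...") that
    # PostgresConn.conn() also passes to psycopg.
    return psycopg.connect(get_kv_connstr(pgport), autocommit=True, prepare_threshold=None)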