From 9d111d303d6174d917e750adcbb0379fbdb41f50 Mon Sep 17 00:00:00 2001 From: Patrick Wang Date: Sun, 29 Dec 2024 20:37:33 -0500 Subject: [PATCH] added and passed test_postgres_dbdata --- benchmark/tpch/cli.py | 14 ++-- dbms/postgres/cli.py | 100 +++++++++++++++--------- dbms/tests/integtest_dbms.py | 25 +++++- env/tests/gymlib_integtest_util.py | 4 +- gymlib_package/gymlib/symlinks_paths.py | 26 ++++++ util/workspace.py | 48 ++++++++---- 6 files changed, 156 insertions(+), 61 deletions(-) diff --git a/benchmark/tpch/cli.py b/benchmark/tpch/cli.py index 27452da1..f096c2fa 100644 --- a/benchmark/tpch/cli.py +++ b/benchmark/tpch/cli.py @@ -4,8 +4,9 @@ from gymlib.symlinks_paths import ( get_tables_dirname, get_tables_symlink_path, - get_workload_dirname, get_workload_suffix, + get_workload_symlink_path, + linkname_to_name, ) from benchmark.constants import DEFAULT_SCALE_FACTOR @@ -17,7 +18,6 @@ fully_resolve_path, get_scale_factor_string, is_fully_resolved, - link_result, ) TPCH_KIT_DIRNAME = "tpch-kit" @@ -194,16 +194,14 @@ def _generate_tpch_workload( query_subset: str, scale_factor: float, ) -> None: - workload_name = get_workload_dirname( + expected_workload_symlink_path = get_workload_symlink_path( + dbgym_workspace.dbgym_workspace_path, "tpch", scale_factor, get_workload_suffix( "tpch", seed_start=seed_start, seed_end=seed_end, query_subset=query_subset ), ) - expected_workload_symlink_path = dbgym_workspace.dbgym_cur_symlinks_path / ( - workload_name + ".link" - ) if expected_workload_symlink_path.exists(): logging.getLogger(DBGYM_LOGGER_NAME).info( f"Skipping generation: {expected_workload_symlink_path}" @@ -213,7 +211,9 @@ def _generate_tpch_workload( logging.getLogger(DBGYM_LOGGER_NAME).info( f"Generating: {expected_workload_symlink_path}" ) - workload_path = dbgym_workspace.dbgym_this_run_path / workload_name + workload_path = dbgym_workspace.dbgym_this_run_path / linkname_to_name( + expected_workload_symlink_path.name + ) workload_path.mkdir(parents=False, 
exist_ok=False) query_names = None diff --git a/dbms/postgres/cli.py b/dbms/postgres/cli.py index 44cfbfac..262f82a9 100644 --- a/dbms/postgres/cli.py +++ b/dbms/postgres/cli.py @@ -10,7 +10,12 @@ import click import sqlalchemy -from gymlib.symlinks_paths import get_pgbin_symlink_path, get_repo_symlink_path +from gymlib.symlinks_paths import ( + get_dbdata_tgz_symlink_path, + get_pgbin_symlink_path, + get_repo_symlink_path, + linkname_to_name, +) from benchmark.constants import DEFAULT_SCALE_FACTOR from benchmark.job.load_info import JobLoadInfo @@ -33,13 +38,9 @@ WORKSPACE_PATH_PLACEHOLDER, DBGymWorkspace, fully_resolve_path, - get_dbdata_tgz_filename, get_default_dbdata_parent_dpath, is_fully_resolved, is_ssd, - link_result, - open_and_save, - save_file, ) @@ -127,6 +128,27 @@ def postgres_dbdata( intended_dbdata_hardware: str, dbdata_parent_dpath: Optional[Path], ) -> None: + _postgres_dbdata( + dbgym_workspace, + benchmark_name, + scale_factor, + pgbin_path, + intended_dbdata_hardware, + dbdata_parent_dpath, + ) + + +def _postgres_dbdata( + dbgym_workspace: DBGymWorkspace, + benchmark_name: str, + scale_factor: float, + pgbin_path: Optional[Path], + intended_dbdata_hardware: str, + dbdata_parent_dpath: Optional[Path], +) -> None: + """ + This function exists as a hook for integration tests. + """ # Set args to defaults programmatically (do this before doing anything else in the function) if pgbin_path is None: pgbin_path = get_pgbin_symlink_path(dbgym_workspace.dbgym_workspace_path) @@ -165,46 +187,54 @@ def _create_dbdata( dbdata_parent_dpath: Path, ) -> None: """ - I chose *not* for this function to skip by default if dbdata_tgz_symlink_path already exists. This - is because, while the generated data is deterministic given benchmark_name and scale_factor, any - change in the _create_dbdata() function would result in a different dbdata. 
Since _create_dbdata() - may change somewhat frequently, I decided to get rid of the footgun of having changes to - _create_dbdata() not propagate to [dbdata].tgz by default. + If you change the code of _create_dbdata(), you should also delete the symlink so that the next time you run + `dbms postgres dbdata` it will re-create the dbdata. """ + expected_dbdata_tgz_symlink_path = get_dbdata_tgz_symlink_path( + dbgym_workspace.dbgym_workspace_path, + benchmark_name, + scale_factor, + ) + if expected_dbdata_tgz_symlink_path.exists(): + logging.getLogger(DBGYM_LOGGER_NAME).info( + f"Skipping _create_dbdata: {expected_dbdata_tgz_symlink_path}" + ) + return # It's ok for the dbdata/ directory to be temporary. It just matters that the .tgz is saved in a safe place. - dbdata_dpath = dbdata_parent_dpath / "dbdata_being_created" - # We might be reusing the same dbdata_parent_dpath, so delete dbdata_dpath if it already exists - if dbdata_dpath.exists(): - shutil.rmtree(dbdata_dpath) + dbdata_path = dbdata_parent_dpath / "dbdata_being_created" + # We might be reusing the same dbdata_parent_dpath, so delete dbdata_path if it already exists + if dbdata_path.exists(): + shutil.rmtree(dbdata_path) # Call initdb. # Save any script we call from pgbin_symlink_dpath because they are dependencies generated from another task run. - save_file(dbgym_workspace, pgbin_path / "initdb") - subprocess_run(f'./initdb -D "{dbdata_dpath}"', cwd=pgbin_path) + dbgym_workspace.save_file(pgbin_path / "initdb") + subprocess_run(f'./initdb -D "{dbdata_path}"', cwd=pgbin_path) # Start Postgres (all other dbdata setup requires postgres to be started). # Note that subprocess_run() never returns when running "pg_ctl start", so I'm using subprocess.run() instead. - start_postgres(dbgym_workspace, pgbin_path, dbdata_dpath) + start_postgres(dbgym_workspace, pgbin_path, dbdata_path) # Set up Postgres. 
_generic_dbdata_setup(dbgym_workspace) _load_benchmark_into_dbdata(dbgym_workspace, benchmark_name, scale_factor) # Stop Postgres so that we don't "leak" processes. - stop_postgres(dbgym_workspace, pgbin_path, dbdata_dpath) + stop_postgres(dbgym_workspace, pgbin_path, dbdata_path) # Create .tgz file. # Note that you can't pass "[dbdata].tgz" as an arg to cur_task_runs_data_path() because that would create "[dbdata].tgz" as a dir. - dbdata_tgz_real_fpath = dbgym_workspace.cur_task_runs_data_path( - mkdir=True - ) / get_dbdata_tgz_filename(benchmark_name, scale_factor) - # We need to cd into dbdata_dpath so that the tar file does not contain folders for the whole path of dbdata_dpath. - subprocess_run(f"tar -czf {dbdata_tgz_real_fpath} .", cwd=dbdata_dpath) + dbdata_tgz_real_path = dbgym_workspace.dbgym_this_run_path / linkname_to_name( + expected_dbdata_tgz_symlink_path.name + ) + # We need to cd into dbdata_path so that the tar file does not contain folders for the whole path of dbdata_path. + subprocess_run(f"tar -czf {dbdata_tgz_real_path} .", cwd=dbdata_path) # Create symlink. # Only link at the end so that the link only ever points to a complete dbdata. 
- dbdata_tgz_symlink_path = link_result(dbgym_workspace, dbdata_tgz_real_fpath) + dbdata_tgz_symlink_path = dbgym_workspace.link_result(dbdata_tgz_real_path) + assert expected_dbdata_tgz_symlink_path.samefile(dbdata_tgz_symlink_path) logging.getLogger(DBGYM_LOGGER_NAME).info( f"Created dbdata in {dbdata_tgz_symlink_path}" ) @@ -221,7 +251,7 @@ def _generic_dbdata_setup(dbgym_workspace: DBGymWorkspace) -> None: pgport = DEFAULT_POSTGRES_PORT # Create user - save_file(dbgym_workspace, pgbin_real_dpath / "psql") + dbgym_workspace.save_file(pgbin_real_dpath / "psql") subprocess_run( f"./psql -c \"create user {dbgym_pguser} with superuser password '{dbgym_pgpass}'\" {DEFAULT_POSTGRES_DBNAME} -p {pgport} -h localhost", cwd=pgbin_real_dpath, @@ -278,7 +308,7 @@ def _load_into_dbdata( sqlalchemy_conn_execute(conn, f"TRUNCATE {table} CASCADE") # Then, load the tables. for table, table_fpath in load_info.get_tables_and_paths(): - with open_and_save(dbgym_workspace, table_fpath, "r") as table_csv: + with dbgym_workspace.open_and_save(table_fpath, "r") as table_csv: assert conn.connection.dbapi_connection is not None cur = conn.connection.dbapi_connection.cursor() try: @@ -301,41 +331,41 @@ def _load_into_dbdata( # even though they are a little redundant. It seems better than making `dbms` depend on the behavior of the # tuning environment. 
def start_postgres( - dbgym_workspace: DBGymWorkspace, pgbin_path: Path, dbdata_dpath: Path + dbgym_workspace: DBGymWorkspace, pgbin_path: Path, dbdata_path: Path ) -> None: - _start_or_stop_postgres(dbgym_workspace, pgbin_path, dbdata_dpath, True) + _start_or_stop_postgres(dbgym_workspace, pgbin_path, dbdata_path, True) def stop_postgres( - dbgym_workspace: DBGymWorkspace, pgbin_path: Path, dbdata_dpath: Path + dbgym_workspace: DBGymWorkspace, pgbin_path: Path, dbdata_path: Path ) -> None: - _start_or_stop_postgres(dbgym_workspace, pgbin_path, dbdata_dpath, False) + _start_or_stop_postgres(dbgym_workspace, pgbin_path, dbdata_path, False) def _start_or_stop_postgres( dbgym_workspace: DBGymWorkspace, pgbin_path: Path, - dbdata_dpath: Path, + dbdata_path: Path, is_start: bool, ) -> None: # They should be absolute paths and should exist assert is_fully_resolved(pgbin_path) - assert is_fully_resolved(dbdata_dpath) + assert is_fully_resolved(dbdata_path) pgport = DEFAULT_POSTGRES_PORT - save_file(dbgym_workspace, pgbin_path / "pg_ctl") + dbgym_workspace.save_file(pgbin_path / "pg_ctl") if is_start: # We use subprocess.run() because subprocess_run() never returns when running "pg_ctl start". # The reason subprocess_run() never returns is because pg_ctl spawns a postgres process so .poll() always returns None. # On the other hand, subprocess.run() does return normally, like calling `./pg_ctl` on the command line would do. 
result = subprocess.run( - f"./pg_ctl -D \"{dbdata_dpath}\" -o '-p {pgport}' start", + f"./pg_ctl -D \"{dbdata_path}\" -o '-p {pgport}' start", cwd=pgbin_path, shell=True, ) result.check_returncode() else: subprocess_run( - f"./pg_ctl -D \"{dbdata_dpath}\" -o '-p {pgport}' stop", + f"./pg_ctl -D \"{dbdata_path}\" -o '-p {pgport}' stop", cwd=pgbin_path, ) diff --git a/dbms/tests/integtest_dbms.py b/dbms/tests/integtest_dbms.py index f5961397..ed1a646d 100644 --- a/dbms/tests/integtest_dbms.py +++ b/dbms/tests/integtest_dbms.py @@ -2,9 +2,10 @@ import unittest from pathlib import Path -from gymlib.symlinks_paths import get_repo_symlink_path +from gymlib.symlinks_paths import get_dbdata_tgz_symlink_path, get_repo_symlink_path -from dbms.postgres.cli import _postgres_build +from benchmark.tpch.cli import _tpch_tables +from dbms.postgres.cli import _postgres_build, _postgres_dbdata from util.workspace import ( DBGymWorkspace, fully_resolve_path, @@ -37,6 +38,26 @@ def test_postgres_build(self) -> None: self.assertTrue(repo_path.exists()) self.assertTrue(fully_resolve_path(repo_path).exists()) + def test_postgres_dbdata(self) -> None: + # Setup + # Make sure to recreate self.workspace so that each function call counts as its own run. 
+ scale_factor = 0.01 + _postgres_build(self.workspace, False) + DBGymWorkspace.num_times_created_this_run = 0 + self.workspace = DBGymWorkspace(self.workspace.dbgym_workspace_path) + _tpch_tables(self.workspace, scale_factor) + DBGymWorkspace.num_times_created_this_run = 0 + self.workspace = DBGymWorkspace(self.workspace.dbgym_workspace_path) + + # Test + dbdata_tgz_path = get_dbdata_tgz_symlink_path( + self.workspace.dbgym_workspace_path, "tpch", scale_factor + ) + self.assertFalse(dbdata_tgz_path.exists()) + _postgres_dbdata(self.workspace, "tpch", scale_factor, None, "hdd", None) + self.assertTrue(dbdata_tgz_path.exists()) + self.assertTrue(fully_resolve_path(dbdata_tgz_path).exists()) + if __name__ == "__main__": unittest.main() diff --git a/env/tests/gymlib_integtest_util.py b/env/tests/gymlib_integtest_util.py index 68a6f1b8..959b68a5 100644 --- a/env/tests/gymlib_integtest_util.py +++ b/env/tests/gymlib_integtest_util.py @@ -5,6 +5,7 @@ # TODO: remove symlinks_paths from the import from gymlib.symlinks_paths import ( + get_dbdata_tgz_symlink_path, get_pgbin_symlink_path, get_workload_suffix, get_workload_symlink_path, @@ -16,7 +17,6 @@ DBGymWorkspace, fully_resolve_path, get_default_dbdata_parent_dpath, - get_default_pristine_dbdata_snapshot_path, get_workspace_path_from_config, ) @@ -98,7 +98,7 @@ def get_default_metadata() -> TuningMetadata: ), ), pristine_dbdata_snapshot_path=fully_resolve_path( - get_default_pristine_dbdata_snapshot_path( + get_dbdata_tgz_symlink_path( dbgym_workspace.dbgym_workspace_path, GymlibIntegtestManager.BENCHMARK, GymlibIntegtestManager.SCALE_FACTOR, diff --git a/gymlib_package/gymlib/symlinks_paths.py b/gymlib_package/gymlib/symlinks_paths.py index e1c4b880..9c83ef3d 100644 --- a/gymlib_package/gymlib/symlinks_paths.py +++ b/gymlib_package/gymlib/symlinks_paths.py @@ -39,6 +39,10 @@ def get_workload_dirname(benchmark: str, scale_factor: float | str, suffix: str) return 
f"workload_{benchmark}_sf{get_scale_factor_string(scale_factor)}_{suffix}" +def get_dbdata_tgz_filename(benchmark_name: str, scale_factor: float | str) -> str: + return f"{benchmark_name}_sf{get_scale_factor_string(scale_factor)}_pristine_dbdata.tgz" + + def get_tables_symlink_path( workspace_path: Path, benchmark: str, scale_factor: float | str ) -> Path: @@ -67,3 +71,25 @@ def get_repo_symlink_path(workspace_path: Path) -> Path: def get_pgbin_symlink_path(workspace_path: Path) -> Path: return get_repo_symlink_path(workspace_path) / "boot" / "build" / "postgres" / "bin" + + +def get_dbdata_tgz_symlink_path( + workspace_path: Path, benchmark_name: str, scale_factor: float | str +) -> Path: + return ( + workspace_path + / SYMLINKS_DNAME + / DBGYM_APP_NAME + / (get_dbdata_tgz_filename(benchmark_name, scale_factor) + ".link") + ) + + +# TODO: refactor stuff to use this +def name_to_linkname(name: str) -> str: + assert not name.endswith(".link") + return f"{name}.link" + + +def linkname_to_name(linkname: str) -> str: + assert linkname.endswith(".link") + return linkname[: -len(".link")] diff --git a/util/workspace.py b/util/workspace.py index 132b31ec..29075847 100644 --- a/util/workspace.py +++ b/util/workspace.py @@ -87,21 +87,6 @@ def get_scale_factor_string(scale_factor: float | str) -> str: return str(scale_factor).replace(".", "point") -def get_dbdata_tgz_filename(benchmark_name: str, scale_factor: float | str) -> str: - return f"{benchmark_name}_sf{get_scale_factor_string(scale_factor)}_pristine_dbdata.tgz" - - -def get_default_pristine_dbdata_snapshot_path( - workspace_path: Path, benchmark_name: str, scale_factor: float | str -) -> Path: - return ( - get_symlinks_path_from_workspace_path(workspace_path) - / "dbgym_dbms_postgres" - / "data" - / (get_dbdata_tgz_filename(benchmark_name, scale_factor) + ".link") - ) - - def get_default_dbdata_parent_dpath(workspace_path: Path) -> Path: return get_tmp_path_from_workspace_path(workspace_path) @@ -302,6 +287,38 @@ 
def save_file(self, fpath: Path) -> None: copy_fpath = self.dbgym_this_run_path / fname shutil.copy(fpath, copy_fpath) + def open_and_save(self, open_fpath: Path, mode: str = "r") -> IO[Any]: + """ + Open a file and "save" it to [workspace]/task_runs/run_*/. + It takes in a str | Path to match the interface of open(). + This function does not work if open_fpath is a symlink, to make its interface identical to that of open(). + Make sure to resolve all symlinks with fully_resolve_path(). + To avoid confusion, I'm enforcing this function to only work with absolute paths. + # TODO: maybe make it work on non-fully-resolved paths to better match open() + See the comment of save_file() for what "saving" means + If you are generating a "result" for the run, _do not_ use this. Just use the normal open(). + This shouldn't be too hard to remember because this function crashes if open_fpath doesn't exist, + and when you write results you're usually opening open_fpaths which do not exist. + """ + # Validate open_fpath + assert isinstance(open_fpath, Path) + assert is_fully_resolved( + open_fpath + ), f"open_and_save(): open_fpath ({open_fpath}) should be a fully resolved path" + assert not os.path.islink( + open_fpath + ), f"open_fpath ({open_fpath}) should not be a symlink" + assert os.path.exists(open_fpath), f"open_fpath ({open_fpath}) does not exist" + # `open_and_save` *must* be called on files because it doesn't make sense to open a directory. note that this doesn't mean we'll always save + # a file though. we sometimes save a directory (see save_file() for details) + assert os.path.isfile(open_fpath), f"open_fpath ({open_fpath}) is not a file" + + # Save + self.save_file(open_fpath) + + # Open + return open(open_fpath, mode=mode) + # `append_group()` is used to mark the "codebase path" of an invocation of the CLI. The "codebase path" is explained further in the documentation. 
def append_group(self, name: str) -> None: self.cur_path_list.append(name) @@ -507,6 +524,7 @@ def is_child_path(child_path: os.PathLike[str], parent_dpath: os.PathLike[str]) ) +# TODO(phw2): deprecate this once I'm done with unittest_workspace.py def open_and_save( dbgym_workspace: DBGymWorkspace, open_fpath: Path, mode: str = "r" ) -> IO[Any]: