Commit

added and passed test_postgres_dbdata
wangpatrick57 committed Dec 30, 2024
1 parent e0cde22 · commit 9d111d3
Showing 6 changed files with 156 additions and 61 deletions.
14 changes: 7 additions & 7 deletions benchmark/tpch/cli.py
@@ -4,8 +4,9 @@
from gymlib.symlinks_paths import (
get_tables_dirname,
get_tables_symlink_path,
get_workload_dirname,
get_workload_suffix,
get_workload_symlink_path,
linkname_to_name,
)

from benchmark.constants import DEFAULT_SCALE_FACTOR
@@ -17,7 +18,6 @@
fully_resolve_path,
get_scale_factor_string,
is_fully_resolved,
link_result,
)

TPCH_KIT_DIRNAME = "tpch-kit"
@@ -194,16 +194,14 @@ def _generate_tpch_workload(
query_subset: str,
scale_factor: float,
) -> None:
workload_name = get_workload_dirname(
expected_workload_symlink_path = get_workload_symlink_path(
dbgym_workspace.dbgym_workspace_path,
"tpch",
scale_factor,
get_workload_suffix(
"tpch", seed_start=seed_start, seed_end=seed_end, query_subset=query_subset
),
)
expected_workload_symlink_path = dbgym_workspace.dbgym_cur_symlinks_path / (
workload_name + ".link"
)
if expected_workload_symlink_path.exists():
logging.getLogger(DBGYM_LOGGER_NAME).info(
f"Skipping generation: {expected_workload_symlink_path}"
@@ -213,7 +211,9 @@
logging.getLogger(DBGYM_LOGGER_NAME).info(
f"Generating: {expected_workload_symlink_path}"
)
workload_path = dbgym_workspace.dbgym_this_run_path / workload_name
workload_path = dbgym_workspace.dbgym_this_run_path / linkname_to_name(
expected_workload_symlink_path.name
)
workload_path.mkdir(parents=False, exist_ok=False)

query_names = None
100 changes: 65 additions & 35 deletions dbms/postgres/cli.py
@@ -10,7 +10,12 @@

import click
import sqlalchemy
from gymlib.symlinks_paths import get_pgbin_symlink_path, get_repo_symlink_path
from gymlib.symlinks_paths import (
get_dbdata_tgz_symlink_path,
get_pgbin_symlink_path,
get_repo_symlink_path,
linkname_to_name,
)

from benchmark.constants import DEFAULT_SCALE_FACTOR
from benchmark.job.load_info import JobLoadInfo
@@ -33,13 +38,9 @@
WORKSPACE_PATH_PLACEHOLDER,
DBGymWorkspace,
fully_resolve_path,
get_dbdata_tgz_filename,
get_default_dbdata_parent_dpath,
is_fully_resolved,
is_ssd,
link_result,
open_and_save,
save_file,
)


@@ -127,6 +128,27 @@ def postgres_dbdata(
intended_dbdata_hardware: str,
dbdata_parent_dpath: Optional[Path],
) -> None:
_postgres_dbdata(
dbgym_workspace,
benchmark_name,
scale_factor,
pgbin_path,
intended_dbdata_hardware,
dbdata_parent_dpath,
)


def _postgres_dbdata(
dbgym_workspace: DBGymWorkspace,
benchmark_name: str,
scale_factor: float,
pgbin_path: Optional[Path],
intended_dbdata_hardware: str,
dbdata_parent_dpath: Optional[Path],
) -> None:
"""
This function exists as a hook for integration tests.
"""
# Set args to defaults programmatically (do this before doing anything else in the function)
if pgbin_path is None:
pgbin_path = get_pgbin_symlink_path(dbgym_workspace.dbgym_workspace_path)
@@ -165,46 +187,54 @@ def _create_dbdata(
dbdata_parent_dpath: Path,
) -> None:
"""
I chose *not* for this function to skip by default if dbdata_tgz_symlink_path already exists. This
is because, while the generated data is deterministic given benchmark_name and scale_factor, any
change in the _create_dbdata() function would result in a different dbdata. Since _create_dbdata()
may change somewhat frequently, I decided to get rid of the footgun of having changes to
_create_dbdata() not propagate to [dbdata].tgz by default.
If you change the code of _create_dbdata(), you should also delete the symlink so that the next time you run
`dbms postgres dbdata` it will re-create the dbdata.
"""
expected_dbdata_tgz_symlink_path = get_dbdata_tgz_symlink_path(
dbgym_workspace.dbgym_workspace_path,
benchmark_name,
scale_factor,
)
if expected_dbdata_tgz_symlink_path.exists():
logging.getLogger(DBGYM_LOGGER_NAME).info(
f"Skipping _create_dbdata: {expected_dbdata_tgz_symlink_path}"
)
return

# It's ok for the dbdata/ directory to be temporary. It just matters that the .tgz is saved in a safe place.
dbdata_dpath = dbdata_parent_dpath / "dbdata_being_created"
# We might be reusing the same dbdata_parent_dpath, so delete dbdata_dpath if it already exists
if dbdata_dpath.exists():
shutil.rmtree(dbdata_dpath)
dbdata_path = dbdata_parent_dpath / "dbdata_being_created"
# We might be reusing the same dbdata_parent_dpath, so delete dbdata_path if it already exists
if dbdata_path.exists():
shutil.rmtree(dbdata_path)

# Call initdb.
# Save any script we call from pgbin_symlink_dpath because they are dependencies generated from another task run.
save_file(dbgym_workspace, pgbin_path / "initdb")
subprocess_run(f'./initdb -D "{dbdata_dpath}"', cwd=pgbin_path)
dbgym_workspace.save_file(pgbin_path / "initdb")
subprocess_run(f'./initdb -D "{dbdata_path}"', cwd=pgbin_path)

# Start Postgres (all other dbdata setup requires postgres to be started).
# Note that subprocess_run() never returns when running "pg_ctl start", so I'm using subprocess.run() instead.
start_postgres(dbgym_workspace, pgbin_path, dbdata_dpath)
start_postgres(dbgym_workspace, pgbin_path, dbdata_path)

# Set up Postgres.
_generic_dbdata_setup(dbgym_workspace)
_load_benchmark_into_dbdata(dbgym_workspace, benchmark_name, scale_factor)

# Stop Postgres so that we don't "leak" processes.
stop_postgres(dbgym_workspace, pgbin_path, dbdata_dpath)
stop_postgres(dbgym_workspace, pgbin_path, dbdata_path)

# Create .tgz file.
# Note that you can't pass "[dbdata].tgz" as an arg to cur_task_runs_data_path() because that would create "[dbdata].tgz" as a dir.
dbdata_tgz_real_fpath = dbgym_workspace.cur_task_runs_data_path(
mkdir=True
) / get_dbdata_tgz_filename(benchmark_name, scale_factor)
# We need to cd into dbdata_dpath so that the tar file does not contain folders for the whole path of dbdata_dpath.
subprocess_run(f"tar -czf {dbdata_tgz_real_fpath} .", cwd=dbdata_dpath)
dbdata_tgz_real_path = dbgym_workspace.dbgym_this_run_path / linkname_to_name(
expected_dbdata_tgz_symlink_path.name
)
# We need to cd into dbdata_path so that the tar file does not contain folders for the whole path of dbdata_path.
subprocess_run(f"tar -czf {dbdata_tgz_real_path} .", cwd=dbdata_path)

# Create symlink.
# Only link at the end so that the link only ever points to a complete dbdata.
dbdata_tgz_symlink_path = link_result(dbgym_workspace, dbdata_tgz_real_fpath)
dbdata_tgz_symlink_path = dbgym_workspace.link_result(dbdata_tgz_real_path)
assert expected_dbdata_tgz_symlink_path.samefile(dbdata_tgz_symlink_path)
logging.getLogger(DBGYM_LOGGER_NAME).info(
f"Created dbdata in {dbdata_tgz_symlink_path}"
)
@@ -221,7 +251,7 @@ def _generic_dbdata_setup(dbgym_workspace: DBGymWorkspace) -> None:
pgport = DEFAULT_POSTGRES_PORT

# Create user
save_file(dbgym_workspace, pgbin_real_dpath / "psql")
dbgym_workspace.save_file(pgbin_real_dpath / "psql")
subprocess_run(
f"./psql -c \"create user {dbgym_pguser} with superuser password '{dbgym_pgpass}'\" {DEFAULT_POSTGRES_DBNAME} -p {pgport} -h localhost",
cwd=pgbin_real_dpath,
@@ -278,7 +308,7 @@ def _load_into_dbdata(
sqlalchemy_conn_execute(conn, f"TRUNCATE {table} CASCADE")
# Then, load the tables.
for table, table_fpath in load_info.get_tables_and_paths():
with open_and_save(dbgym_workspace, table_fpath, "r") as table_csv:
with dbgym_workspace.open_and_save(table_fpath, "r") as table_csv:
assert conn.connection.dbapi_connection is not None
cur = conn.connection.dbapi_connection.cursor()
try:
@@ -301,41 +331,41 @@
# even though they are a little redundant. It seems better than making `dbms` depend on the behavior of the
# tuning environment.
def start_postgres(
dbgym_workspace: DBGymWorkspace, pgbin_path: Path, dbdata_dpath: Path
dbgym_workspace: DBGymWorkspace, pgbin_path: Path, dbdata_path: Path
) -> None:
_start_or_stop_postgres(dbgym_workspace, pgbin_path, dbdata_dpath, True)
_start_or_stop_postgres(dbgym_workspace, pgbin_path, dbdata_path, True)


def stop_postgres(
dbgym_workspace: DBGymWorkspace, pgbin_path: Path, dbdata_dpath: Path
dbgym_workspace: DBGymWorkspace, pgbin_path: Path, dbdata_path: Path
) -> None:
_start_or_stop_postgres(dbgym_workspace, pgbin_path, dbdata_dpath, False)
_start_or_stop_postgres(dbgym_workspace, pgbin_path, dbdata_path, False)


def _start_or_stop_postgres(
dbgym_workspace: DBGymWorkspace,
pgbin_path: Path,
dbdata_dpath: Path,
dbdata_path: Path,
is_start: bool,
) -> None:
# They should be absolute paths and should exist
assert is_fully_resolved(pgbin_path)
assert is_fully_resolved(dbdata_dpath)
assert is_fully_resolved(dbdata_path)
pgport = DEFAULT_POSTGRES_PORT
save_file(dbgym_workspace, pgbin_path / "pg_ctl")
dbgym_workspace.save_file(pgbin_path / "pg_ctl")

if is_start:
# We use subprocess.run() because subprocess_run() never returns when running "pg_ctl start".
# The reason subprocess_run() never returns is because pg_ctl spawns a postgres process so .poll() always returns None.
# On the other hand, subprocess.run() does return normally, like calling `./pg_ctl` on the command line would do.
result = subprocess.run(
f"./pg_ctl -D \"{dbdata_dpath}\" -o '-p {pgport}' start",
f"./pg_ctl -D \"{dbdata_path}\" -o '-p {pgport}' start",
cwd=pgbin_path,
shell=True,
)
result.check_returncode()
else:
subprocess_run(
f"./pg_ctl -D \"{dbdata_dpath}\" -o '-p {pgport}' stop",
f"./pg_ctl -D \"{dbdata_path}\" -o '-p {pgport}' stop",
cwd=pgbin_path,
)
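The new docstring in _create_dbdata() above says that if you change its code, you should delete the existing symlink so the skip-guard does not short-circuit regeneration. A minimal sketch of that manual step, not part of the commit; the workspace path and scale factor are hypothetical, and only the helper shown later in this diff is used:

# Hypothetical: force `dbms postgres dbdata` to rebuild the snapshot by removing
# the symlink that the skip-guard in _create_dbdata() checks for.
from pathlib import Path

from gymlib.symlinks_paths import get_dbdata_tgz_symlink_path

workspace_path = Path("/path/to/dbgym_workspace")  # hypothetical
dbdata_tgz_symlink_path = get_dbdata_tgz_symlink_path(workspace_path, "tpch", 0.01)
if dbdata_tgz_symlink_path.exists():
    dbdata_tgz_symlink_path.unlink()  # removes the link itself, not its target
# Re-running `dbms postgres dbdata` (or calling _postgres_dbdata() directly, as the
# integration test in the next file does) now regenerates [dbdata].tgz and re-links it.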
25 changes: 23 additions & 2 deletions dbms/tests/integtest_dbms.py
@@ -2,9 +2,10 @@
import unittest
from pathlib import Path

from gymlib.symlinks_paths import get_repo_symlink_path
from gymlib.symlinks_paths import get_dbdata_tgz_symlink_path, get_repo_symlink_path

from dbms.postgres.cli import _postgres_build
from benchmark.tpch.cli import _tpch_tables
from dbms.postgres.cli import _postgres_build, _postgres_dbdata
from util.workspace import (
DBGymWorkspace,
fully_resolve_path,
@@ -37,6 +38,26 @@ def test_postgres_build(self) -> None:
self.assertTrue(repo_path.exists())
self.assertTrue(fully_resolve_path(repo_path).exists())

def test_postgres_dbdata(self) -> None:
# Setup
# Make sure to recreate self.workspace so that each function call counts as its own run.
scale_factor = 0.01
_postgres_build(self.workspace, False)
DBGymWorkspace.num_times_created_this_run = 0
self.workspace = DBGymWorkspace(self.workspace.dbgym_workspace_path)
_tpch_tables(self.workspace, scale_factor)
DBGymWorkspace.num_times_created_this_run = 0
self.workspace = DBGymWorkspace(self.workspace.dbgym_workspace_path)

# Test
dbdata_tgz_path = get_dbdata_tgz_symlink_path(
self.workspace.dbgym_workspace_path, "tpch", scale_factor
)
self.assertFalse(dbdata_tgz_path.exists())
_postgres_dbdata(self.workspace, "tpch", scale_factor, None, "hdd", None)
self.assertTrue(dbdata_tgz_path.exists())
self.assertTrue(fully_resolve_path(dbdata_tgz_path).exists())


if __name__ == "__main__":
unittest.main()
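A hedged usage note, not part of the commit: since dbms/tests/integtest_dbms.py is a standard unittest module, the new test can presumably be run on its own with a loader like the following (the dotted module path is assumed from the file location shown above):

# Hypothetical standalone runner for the new integration test; assumes the repo
# root is the current working directory and is importable.
import unittest

suite = unittest.defaultTestLoader.loadTestsFromName("dbms.tests.integtest_dbms")
unittest.TextTestRunner(verbosity=2).run(suite)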
4 changes: 2 additions & 2 deletions env/tests/gymlib_integtest_util.py
@@ -5,6 +5,7 @@

# TODO: remove symlinks_paths from the import
from gymlib.symlinks_paths import (
get_dbdata_tgz_symlink_path,
get_pgbin_symlink_path,
get_workload_suffix,
get_workload_symlink_path,
@@ -16,7 +17,6 @@
DBGymWorkspace,
fully_resolve_path,
get_default_dbdata_parent_dpath,
get_default_pristine_dbdata_snapshot_path,
get_workspace_path_from_config,
)

@@ -98,7 +98,7 @@ def get_default_metadata() -> TuningMetadata:
),
),
pristine_dbdata_snapshot_path=fully_resolve_path(
get_default_pristine_dbdata_snapshot_path(
get_dbdata_tgz_symlink_path(
dbgym_workspace.dbgym_workspace_path,
GymlibIntegtestManager.BENCHMARK,
GymlibIntegtestManager.SCALE_FACTOR,
26 changes: 26 additions & 0 deletions gymlib_package/gymlib/symlinks_paths.py
@@ -39,6 +39,10 @@ def get_workload_dirname(benchmark: str, scale_factor: float | str, suffix: str)
return f"workload_{benchmark}_sf{get_scale_factor_string(scale_factor)}_{suffix}"


def get_dbdata_tgz_filename(benchmark_name: str, scale_factor: float | str) -> str:
return f"{benchmark_name}_sf{get_scale_factor_string(scale_factor)}_pristine_dbdata.tgz"


def get_tables_symlink_path(
workspace_path: Path, benchmark: str, scale_factor: float | str
) -> Path:
@@ -67,3 +71,25 @@ def get_repo_symlink_path(workspace_path: Path) -> Path:

def get_pgbin_symlink_path(workspace_path: Path) -> Path:
return get_repo_symlink_path(workspace_path) / "boot" / "build" / "postgres" / "bin"


def get_dbdata_tgz_symlink_path(
workspace_path: Path, benchmark_name: str, scale_factor: float | str
) -> Path:
return (
workspace_path
/ SYMLINKS_DNAME
/ DBGYM_APP_NAME
/ (get_dbdata_tgz_filename(benchmark_name, scale_factor) + ".link")
)


# TODO: refactor stuff to use this
def name_to_linkname(name: str) -> str:
assert not name.endswith(".link")
return f"{name}.link"


def linkname_to_name(linkname: str) -> str:
assert linkname.endswith(".link")
return linkname[: -len(".link")]
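As a quick illustration, not part of the commit: the two new name helpers round-trip, and the path returned by get_dbdata_tgz_symlink_path() ends in the dbdata tgz filename plus ".link", which is exactly what _create_dbdata() relies on when it derives the real filename via linkname_to_name(). The workspace path below is hypothetical:

# Hypothetical round-trip check for the helpers defined above.
from pathlib import Path

from gymlib.symlinks_paths import (
    get_dbdata_tgz_filename,
    get_dbdata_tgz_symlink_path,
    linkname_to_name,
    name_to_linkname,
)

workspace_path = Path("/tmp/dbgym_workspace")  # hypothetical
link_path = get_dbdata_tgz_symlink_path(workspace_path, "tpch", 0.01)

# The symlink's name is the real filename plus ".link" ...
assert link_path.name == name_to_linkname(get_dbdata_tgz_filename("tpch", 0.01))
# ... so stripping ".link" recovers the filename used for the real tgz in the run dir.
assert linkname_to_name(link_path.name) == get_dbdata_tgz_filename("tpch", 0.01)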