Replicate TPC-H SF10 tuning results (#28)
**Summary**: We can now replicate the TPC-H SF10 **tuning** results on dev4.
Note that **embedding** has not yet been replicated.

**Demo**:
After running HPO on dev4 for 18 hours, we reach a runtime of 110s. We
run with `--duration=4`, and 110s matches the performance reported in the
Proto-X paper after 4 hours of tuning.
![Screenshot 2024-04-05 at 13 12 03](https://github.com/cmu-db/dbgym/assets/20631215/16babc69-4b42-434d-b1b3-eede0010f1da)

**Details**:
* Because I was only replicating tuning, I used an already-trained
embedder. To replicate, use the `dbgym_manual/embedding/` folder on
dev4.
* The command I used was `python3 task.py --no-startup-check tune protox agent hpo tpch --scale-factor 10 --max-concurrent 4 --duration 4 --intended-pgdata-hardware ssd --pgdata-parent-dpath /mnt/nvme1n1/phw2/dbgym_tmp/`
wangpatrick57 authored Apr 5, 2024
1 parent da7c5b0 commit 6400216
Showing 23 changed files with 631 additions and 367 deletions.
80 changes: 44 additions & 36 deletions benchmark/tpch/cli.py
@@ -1,8 +1,10 @@
import logging
import os
import shutil

import click

from misc.utils import DBGymConfig
from misc.utils import DBGymConfig, get_scale_factor_string, workload_name_fn
from util.shell import subprocess_run
from util.pg import *

@@ -16,36 +18,41 @@ def tpch_group(dbgym_cfg: DBGymConfig):
dbgym_cfg.append_group("tpch")


@tpch_group.command(name="generate-data")
@tpch_group.command(name="data")
@click.argument("scale-factor", type=float)
@click.pass_obj
# The reason generate-data is separate from create-pgdata is because generate-data is generic
# to all DBMSs while create-pgdata is specific to Postgres.
def tpch_generate_data(dbgym_cfg: DBGymConfig, scale_factor: float):
def tpch_data(dbgym_cfg: DBGymConfig, scale_factor: float):
_clone(dbgym_cfg)
_generate_data(dbgym_cfg, scale_factor)


@tpch_group.command(name="generate-workload")
@click.argument("workload-name", type=str)
@click.argument("seed-start", type=int)
@click.argument("seed-end", type=int)
@tpch_group.command(name="workload")
@click.option("--seed-start", type=int, default=15721, help="A workload consists of queries from multiple seeds. This is the starting seed (inclusive).")
@click.option("--seed-end", type=int, default=15721, help="A workload consists of queries from multiple seeds. This is the ending seed (inclusive).")
@click.option(
"--generate_type",
type=click.Choice(["sequential", "even", "odd"]),
default="sequential",
"--query-subset",
type=click.Choice(["all", "even", "odd"]),
default="all",
)
@click.option("--scale-factor", type=float, default=1)
@click.pass_obj
def tpch_generate_workload(
def tpch_workload(
dbgym_cfg: DBGymConfig,
workload_name: str,
seed_start: int,
seed_end: int,
generate_type: str,
query_subset: str,
scale_factor: float,
):
assert seed_start <= seed_end, f'seed_start ({seed_start}) must be <= seed_end ({seed_end})'
_clone(dbgym_cfg)
_generate_queries(dbgym_cfg, seed_start, seed_end)
_generate_workload(dbgym_cfg, workload_name, seed_start, seed_end, generate_type)
_generate_queries(dbgym_cfg, seed_start, seed_end, scale_factor)
_generate_workload(dbgym_cfg, seed_start, seed_end, query_subset, scale_factor)


def _get_queries_dname(seed: int, scale_factor: float) -> str:
return f"queries_{seed}_sf{get_scale_factor_string(scale_factor)}"


def _clone(dbgym_cfg: DBGymConfig):
@@ -65,7 +72,7 @@ def _clone(dbgym_cfg: DBGymConfig):
benchmark_tpch_logger.info(f"Cloned: {symlink_dir}")


def _generate_queries(dbgym_cfg, seed_start, seed_end):
def _generate_queries(dbgym_cfg: DBGymConfig, seed_start: int, seed_end: int, scale_factor: float):
build_path = dbgym_cfg.cur_symlinks_build_path()
assert build_path.exists()

@@ -74,15 +81,15 @@ def _generate_queries(dbgym_cfg, seed_start, seed_end):
f"Generating queries: {data_path} [{seed_start}, {seed_end}]"
)
for seed in range(seed_start, seed_end + 1):
symlinked_seed = data_path / f"queries_{seed}"
symlinked_seed = data_path / _get_queries_dname(seed, scale_factor)
if symlinked_seed.exists():
continue

real_dir = dbgym_cfg.cur_task_runs_data_path(f"queries_{seed}", mkdir=True)
real_dir = dbgym_cfg.cur_task_runs_data_path(_get_queries_dname(seed, scale_factor), mkdir=True)
for i in range(1, 22 + 1):
target_sql = (real_dir / f"{i}.sql").resolve()
subprocess_run(
f"DSS_QUERY=./queries ./qgen {i} -r {seed} > {target_sql}",
f"DSS_QUERY=./queries ./qgen {i} -r {seed} -s {scale_factor} > {target_sql}",
cwd=build_path / "tpch-kit" / "dbgen",
verbose=False,
)
@@ -97,7 +104,7 @@ def _generate_data(dbgym_cfg: DBGymConfig, scale_factor: float):
assert build_path.exists()

data_path = dbgym_cfg.cur_symlinks_data_path(mkdir=True)
symlink_dir = data_path / f"tables_sf{scale_factor}"
symlink_dir = data_path / f"tables_sf{get_scale_factor_string(scale_factor)}"
if symlink_dir.exists():
benchmark_tpch_logger.info(f"Skipping generation: {symlink_dir}")
return
@@ -106,7 +113,7 @@ def _generate_data(dbgym_cfg: DBGymConfig, scale_factor: float):
subprocess_run(
f"./dbgen -vf -s {scale_factor}", cwd=build_path / "tpch-kit" / "dbgen"
)
real_dir = dbgym_cfg.cur_task_runs_data_path(f"tables_sf{scale_factor}", mkdir=True)
real_dir = dbgym_cfg.cur_task_runs_data_path(f"tables_sf{get_scale_factor_string(scale_factor)}", mkdir=True)
subprocess_run(f"mv ./*.tbl {real_dir}", cwd=build_path / "tpch-kit" / "dbgen")

subprocess_run(f"ln -s {real_dir} {data_path}")
@@ -115,37 +122,38 @@

def _generate_workload(
dbgym_cfg: DBGymConfig,
workload_name: str,
seed_start: int,
seed_end: int,
generate_type: str,
query_subset: str,
scale_factor: float,
):
data_path = dbgym_cfg.cur_symlinks_data_path(mkdir=True)
workload_path = data_path / f"workload_{workload_name}"
if workload_path.exists():
benchmark_tpch_logger.error(f"Workload directory exists: {workload_path}")
raise RuntimeError(f"Workload directory exists: {workload_path}")
symlink_data_dir = dbgym_cfg.cur_symlinks_data_path(mkdir=True)
workload_name = workload_name_fn(scale_factor, seed_start, seed_end, query_subset)
workload_symlink_path = symlink_data_dir / workload_name

benchmark_tpch_logger.info(f"Generating: {workload_path}")
benchmark_tpch_logger.info(f"Generating: {workload_symlink_path}")
real_dir = dbgym_cfg.cur_task_runs_data_path(
f"workload_{workload_name}", mkdir=True
workload_name, mkdir=True
)

queries = None
if generate_type == "sequential":
if query_subset == "all":
queries = [f"{i}" for i in range(1, 22 + 1)]
elif generate_type == "even":
elif query_subset == "even":
queries = [f"{i}" for i in range(1, 22 + 1) if i % 2 == 0]
elif generate_type == "odd":
elif query_subset == "odd":
queries = [f"{i}" for i in range(1, 22 + 1) if i % 2 == 1]

with open(real_dir / "order.txt", "w") as f:
for seed in range(seed_start, seed_end + 1):
for qnum in queries:
sqlfile = data_path / f"queries_{seed}" / f"{qnum}.sql"
sqlfile = symlink_data_dir / _get_queries_dname(seed, scale_factor) / f"{qnum}.sql"
assert sqlfile.exists()
output = ",".join([f"S{seed}-Q{qnum}", str(sqlfile)])
print(output, file=f)
# TODO(WAN): add option to deep-copy the workload.
subprocess_run(f"ln -s {real_dir} {data_path}")
benchmark_tpch_logger.info(f"Generated: {workload_path}")

if workload_symlink_path.exists():
os.remove(workload_symlink_path)
subprocess_run(f"ln -s {real_dir} {workload_symlink_path}")
benchmark_tpch_logger.info(f"Generated: {workload_symlink_path}")
3 changes: 2 additions & 1 deletion benchmark/tpch/load_info.py
@@ -1,4 +1,5 @@
from dbms.load_info_base_class import LoadInfoBaseClass
from misc.utils import get_scale_factor_string


TPCH_SCHEMA_FNAME = "tpch_schema.sql"
@@ -41,7 +42,7 @@ def __init__(self, dbgym_cfg, scale_factor):
data_root_dpath = (
dbgym_cfg.dbgym_symlinks_path / TpchLoadInfo.CODEBASE_DNAME / "data"
)
tables_dpath = data_root_dpath / f"tables_sf{scale_factor}"
tables_dpath = data_root_dpath / f"tables_sf{get_scale_factor_string(scale_factor)}"
assert (
tables_dpath.exists()
), f"tables_dpath ({tables_dpath}) does not exist. Make sure you have generated the TPC-H data"
1 change: 1 addition & 0 deletions dbms/postgres/build_repo.sh
@@ -24,6 +24,7 @@ cd ../
# we need -L to follow links
curl -L https://github.com/ossc-db/pg_hint_plan/archive/refs/tags/REL15_1_5_1.tar.gz -o REL15_1_5_1.tar.gz
tar -xzf REL15_1_5_1.tar.gz
rm REL15_1_5_1.tar.gz
cd ./pg_hint_plan-REL15_1_5_1
PATH="${REPO_REAL_DPATH}/boot/build/postgres/bin:$PATH" make
PATH="${REPO_REAL_DPATH}/boot/build/postgres/bin:$PATH" make install
71 changes: 40 additions & 31 deletions dbms/postgres/cli.py
@@ -13,10 +13,10 @@

from benchmark.tpch.load_info import TpchLoadInfo
from dbms.load_info_base_class import LoadInfoBaseClass
from misc.utils import DBGymConfig, open_and_save, save_file, get_pgdata_tgz_name
from misc.utils import DBGymConfig, conv_inputpath_to_realabspath, open_and_save, save_file, get_pgdata_tgz_name, default_pgbin_path, WORKSPACE_PATH_PLACEHOLDER
from util.shell import subprocess_run
from sqlalchemy import Connection
from util.pg import conn_execute, sql_file_execute, DBGYM_POSTGRES_DBNAME, create_conn, DEFAULT_POSTGRES_PORT, DBGYM_POSTGRES_USER, DBGYM_POSTGRES_PASS
from util.pg import conn_execute, sql_file_execute, DBGYM_POSTGRES_DBNAME, create_conn, DEFAULT_POSTGRES_PORT, DBGYM_POSTGRES_USER, DBGYM_POSTGRES_PASS, DEFAULT_POSTGRES_DBNAME


dbms_postgres_logger = logging.getLogger("dbms/postgres")
@@ -45,8 +45,17 @@ def postgres_build(dbgym_cfg: DBGymConfig):
@click.pass_obj
@click.argument("benchmark_name", type=str)
@click.option("--scale-factor", type=float, default=1)
def postgres_pgdata(dbgym_cfg: DBGymConfig, benchmark_name: str, scale_factor: float):
_create_pgdata(dbgym_cfg, benchmark_name, scale_factor)
@click.option("--pgbin-path", type=Path, default=None, help=f"The path to the bin containing Postgres executables. The default is {default_pgbin_path(WORKSPACE_PATH_PLACEHOLDER)}.")
def postgres_pgdata(dbgym_cfg: DBGymConfig, benchmark_name: str, scale_factor: float, pgbin_path: Path):
# Set args to defaults programmatically (do this before doing anything else in the function)
if pgbin_path == None:
pgbin_path = default_pgbin_path(dbgym_cfg.dbgym_workspace_path)

# Convert all input paths to absolute paths
pgbin_path = conv_inputpath_to_realabspath(dbgym_cfg, pgbin_path)

# Create pgdata
_create_pgdata(dbgym_cfg, benchmark_name, scale_factor, pgbin_path)


def _get_pgbin_symlink_path(dbgym_cfg: DBGymConfig) -> Path:
@@ -85,7 +94,7 @@ def _build_repo(dbgym_cfg: DBGymConfig):
dbms_postgres_logger.info(f"Set up repo in {repo_symlink_dpath}")


def _create_pgdata(dbgym_cfg: DBGymConfig, benchmark_name: str, scale_factor: float):
def _create_pgdata(dbgym_cfg: DBGymConfig, benchmark_name: str, scale_factor: float, pgbin_path: Path) -> None:
"""
I chose *not* for this function to skip by default if pgdata_tgz_symlink_path already exists. This
is because, while the generated data is deterministic given benchmark_name and scale_factor, any
@@ -99,23 +108,20 @@ def _create_pgdata(dbgym_cfg: DBGymConfig, benchmark_name: str, scale_factor: fl
pgdata_dpath = dbgym_cfg.dbgym_tmp_path / "pgdata"

# initdb
pgbin_symlink_dpath = _get_pgbin_symlink_path(dbgym_cfg)
assert pgbin_symlink_dpath.exists()
# save any script we call from pgbin_symlink_dpath because they are dependencies generated from another task run
save_file(dbgym_cfg, pgbin_symlink_dpath / "initdb")
subprocess_run(f'./initdb -D "{pgdata_dpath}"', cwd=pgbin_symlink_dpath)
save_file(dbgym_cfg, pgbin_path / "initdb")
subprocess_run(f'./initdb -D "{pgdata_dpath}"', cwd=pgbin_path)

# start postgres (all other pgdata setup requires postgres to be started)
# note that subprocess_run() never returns when running "pg_ctl start", so I'm using subprocess.run() instead
save_file(dbgym_cfg, pgbin_symlink_dpath / "pg_ctl")
start_postgres(dbgym_cfg, pgbin_symlink_dpath, pgdata_dpath)
start_postgres(dbgym_cfg, pgbin_path, pgdata_dpath)

# setup
_generic_pgdata_setup(dbgym_cfg)
_load_benchmark_into_pgdata(dbgym_cfg, benchmark_name, scale_factor)

# stop postgres so that we don't "leak" processes
stop_postgres(dbgym_cfg, pgbin_symlink_dpath, pgdata_dpath)
stop_postgres(dbgym_cfg, pgbin_path, pgdata_dpath)

# create .tgz file
# you can't pass "[pgdata].tgz" as an arg to cur_task_runs_data_path() because that would create "[pgdata].tgz" as a dir
@@ -146,18 +152,18 @@ def _generic_pgdata_setup(dbgym_cfg: DBGymConfig):
# get necessary vars
pgbin_symlink_dpath = _get_pgbin_symlink_path(dbgym_cfg)
assert pgbin_symlink_dpath.exists()
pguser = DBGYM_POSTGRES_USER
pgpass = DBGYM_POSTGRES_PASS
dbgym_pguser = DBGYM_POSTGRES_USER
dbgym_pgpass = DBGYM_POSTGRES_PASS
pgport = DEFAULT_POSTGRES_PORT

# create user
save_file(dbgym_cfg, pgbin_symlink_dpath / "psql")
subprocess_run(
f"./psql -c \"create user {pguser} with superuser password '{pgpass}'\" postgres -p {pgport} -h localhost",
f"./psql -c \"create user {dbgym_pguser} with superuser password '{dbgym_pgpass}'\" {DEFAULT_POSTGRES_DBNAME} -p {pgport} -h localhost",
cwd=pgbin_symlink_dpath,
)
subprocess_run(
f'./psql -c "grant pg_monitor to {pguser}" postgres -p {pgport} -h localhost',
f'./psql -c "grant pg_monitor to {dbgym_pguser}" {DEFAULT_POSTGRES_DBNAME} -p {pgport} -h localhost',
cwd=pgbin_symlink_dpath,
)

@@ -166,14 +172,14 @@ def _generic_pgdata_setup(dbgym_cfg: DBGymConfig):
dbgym_cfg.cur_source_path() / "shared_preload_libraries.sql"
)
subprocess_run(
f"./psql -f {shared_preload_libraries_fpath} postgres -p {pgport} -h localhost",
f"./psql -f {shared_preload_libraries_fpath} {DEFAULT_POSTGRES_DBNAME} -p {pgport} -h localhost",
cwd=pgbin_symlink_dpath,
)

# create the dbgym database. since one pgdata dir maps to one benchmark, all benchmarks will use the same database
# as opposed to using databases named after the benchmark
subprocess_run(
f"./psql -c \"create database {DBGYM_POSTGRES_DBNAME} with owner = '{pguser}'\" postgres -p {pgport} -h localhost",
f"./psql -c \"create database {DBGYM_POSTGRES_DBNAME} with owner = '{dbgym_pguser}'\" {DEFAULT_POSTGRES_DBNAME} -p {pgport} -h localhost",
cwd=pgbin_symlink_dpath,
)

@@ -193,7 +199,7 @@ def _load_benchmark_into_pgdata(


def _load_into_pgdata(dbgym_cfg: DBGymConfig, conn: Connection, load_info: LoadInfoBaseClass):
sql_file_execute(conn, load_info.get_schema_fpath())
sql_file_execute(dbgym_cfg, conn, load_info.get_schema_fpath())

# truncate all tables first before even loading a single one
for table, _ in load_info.get_tables_and_fpaths():
@@ -208,29 +214,32 @@ def _load_into_pgdata(dbgym_cfg: DBGymConfig, conn: Connection, load_info: LoadI

constraints_fpath = load_info.get_constraints_fpath()
if constraints_fpath != None:
sql_file_execute(conn, constraints_fpath)
sql_file_execute(dbgym_cfg, conn, constraints_fpath)


def start_postgres(dbgym_cfg: DBGymConfig, pgbin_dpath: Path, pgdata_dpath: Path) -> None:
_start_or_stop_postgres(dbgym_cfg, pgbin_dpath, pgdata_dpath, True)
def start_postgres(dbgym_cfg: DBGymConfig, pgbin_path: Path, pgdata_dpath: Path) -> None:
_start_or_stop_postgres(dbgym_cfg, pgbin_path, pgdata_dpath, True)


def stop_postgres(dbgym_cfg: DBGymConfig, pgbin_dpath: Path, pgdata_dpath: Path) -> None:
_start_or_stop_postgres(dbgym_cfg, pgbin_dpath, pgdata_dpath, False)
def stop_postgres(dbgym_cfg: DBGymConfig, pgbin_path: Path, pgdata_dpath: Path) -> None:
_start_or_stop_postgres(dbgym_cfg, pgbin_path, pgdata_dpath, False)


def _start_or_stop_postgres(dbgym_cfg: DBGymConfig, pgbin_dpath: Path, pgdata_dpath: Path, is_start: bool) -> None:
# they should be absolute paths and should exist
assert pgbin_dpath.is_absolute() and pgbin_dpath.exists()
def _start_or_stop_postgres(dbgym_cfg: DBGymConfig, pgbin_path: Path, pgdata_dpath: Path, is_start: bool) -> None:
# They should be absolute paths and should exist
assert pgbin_path.is_absolute() and pgbin_path.exists()
assert pgdata_dpath.is_absolute() and pgdata_dpath.exists()
# the inputs may be symlinks so we need to resolve them first
pgbin_real_dpath = pgbin_dpath.resolve()
# The inputs may be symlinks so we need to resolve them first
pgbin_real_dpath = pgbin_path.resolve()
pgdata_dpath = pgdata_dpath.resolve()
pgport = DEFAULT_POSTGRES_PORT
save_file(dbgym_cfg, pgbin_real_dpath / "pg_ctl")

if is_start:
# note that subprocess_run() never returns when running "pg_ctl start", so I'm using subprocess.run() instead
subprocess.run(f"./pg_ctl -D \"{pgdata_dpath}\" -o '-p {pgport}' start", cwd=pgbin_real_dpath, shell=True)
# We use subprocess.run() because subprocess_run() never returns when running "pg_ctl start".
# The reason subprocess_run() never returns is because pg_ctl spawns a postgres process so .poll() always returns None.
# On the other hand, subprocess.run() does return normally, like calling `./pg_ctl` on the command line would do.
result = subprocess.run(f"./pg_ctl -D \"{pgdata_dpath}\" -o '-p {pgport}' start", cwd=pgbin_real_dpath, shell=True)
result.check_returncode()
else:
subprocess_run(f"./pg_ctl -D \"{pgdata_dpath}\" -o '-p {pgport}' stop", cwd=pgbin_real_dpath)
1 change: 1 addition & 0 deletions dependency/requirements.txt
@@ -122,3 +122,4 @@ virtualenv==20.25.0
Werkzeug==3.0.1
wrapt==1.14.1
zipp==3.17.0
ssd_checker==1.0.3
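The new `ssd_checker` dependency presumably backs the `--intended-pgdata-hardware ssd` flag used in the HPO commands. A guess at how it would be used:

```python
import ssd_checker

# Hypothetical usage: verify that the pgdata parent directory is actually on
# an SSD before running with --intended-pgdata-hardware ssd.
assert ssd_checker.is_ssd("/mnt/nvme1n1/phw2/dbgym_tmp/")
```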
6 changes: 0 additions & 6 deletions experiment/cli.py

This file was deleted.

2 changes: 0 additions & 2 deletions experiment/protox_embedding_job/main.sh

This file was deleted.

21 changes: 21 additions & 0 deletions experiments/protox_tpch_sf10/main.sh
@@ -0,0 +1,21 @@
#!/bin/bash

set -euxo pipefail

SCALE_FACTOR=10

# benchmark
python3 task.py --no-startup-check benchmark tpch data $SCALE_FACTOR
python3 task.py --no-startup-check benchmark tpch workload --scale-factor $SCALE_FACTOR

# postgres
python3 task.py --no-startup-check dbms postgres build
python3 task.py --no-startup-check dbms postgres pgdata tpch --scale-factor $SCALE_FACTOR

# embedding
python3 task.py --no-startup-check tune protox embedding datagen tpch --scale-factor $SCALE_FACTOR --override-sample-limits "lineitem,32768" --intended-pgdata-hardware ssd --pgdata-parent-dpath /mnt/nvme1n1/phw2/dbgym_tmp/
python3 task.py --no-startup-check tune protox embedding train tpch --scale-factor $SCALE_FACTOR --train-max-concurrent 10

# agent
python3 task.py --no-startup-check tune protox agent hpo tpch --scale-factor $SCALE_FACTOR --max-concurrent 4 --duration 4 --intended-pgdata-hardware ssd --pgdata-parent-dpath /mnt/nvme1n1/phw2/dbgym_tmp/
python3 task.py --no-startup-check tune protox agent tune tpch --scale-factor $SCALE_FACTOR
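Assuming the dev4 NVMe paths above exist, the whole pipeline can then be reproduced with:

```bash
# From the repo root; the script is set -euxo pipefail, so it stops on the
# first failing step and echoes each command as it runs.
bash experiments/protox_tpch_sf10/main.sh
```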