added job to protox e2e test
wangpatrick57 committed Nov 6, 2024
1 parent 9cef1f5 commit d10dc27
Showing 3 changed files with 50 additions and 28 deletions.
2 changes: 1 addition & 1 deletion benchmark/constants.py
@@ -1 +1 @@
-DEFAULT_SCALE_FACTOR = 1
+DEFAULT_SCALE_FACTOR = 1
3 changes: 1 addition & 2 deletions benchmark/job/load_info.py
@@ -52,8 +52,7 @@ def __init__(self, dbgym_cfg: DBGymConfig):
             dbgym_cfg.dbgym_symlinks_path / JobLoadInfo.CODEBASE_DNAME / "data"
         )
         tables_symlink_dpath = (
-            data_root_dpath
-            / f"{default_tables_dname(DEFAULT_SCALE_FACTOR)}.link"
+            data_root_dpath / f"{default_tables_dname(DEFAULT_SCALE_FACTOR)}.link"
        )
         tables_dpath = tables_symlink_dpath.resolve()
         assert (
73 changes: 48 additions & 25 deletions scripts/run_protox_e2e_test.py
@@ -24,8 +24,6 @@
 # Be careful when changing these constants. In some places, the E2E test is hardcoded to work for these specific constants.
 DBMS = "postgres"
 AGENT = "protox"
-BENCHMARK = "tpch"
-SCALE_FACTOR = 0.01
 E2ETEST_DBGYM_CONFIG_FPATH = Path("scripts/e2e_test_dbgym_config.yaml")


@@ -69,11 +67,21 @@ class Stage(Enum):
 STAGES_TO_RUN = ALL_STAGES


-if __name__ == "__main__":
-    intended_dbdata_hardware = sys.argv[1] if len(sys.argv) > 1 else "hdd"
-
-    # Set the config file so that we use resources that don't conflict with normal usage (e.g. a different workspace, different ports, etc.).
-    os.environ["DBGYM_CONFIG_PATH"] = str(E2ETEST_DBGYM_CONFIG_FPATH)
+def run_e2e_for_benchmark(benchmark_name: str, intended_dbdata_hardware: str) -> None:
+    if benchmark_name == "tpch":
+        scale_factor = 0.01
+        workload_name_suffix = "15721_15721_all"
+        embedding_datagen_args = "--override-sample-limits lineitem,32768"
+        embedding_train_args = "--iterations-per-epoch 1 --num-points-to-sample 1 --num-batches 1 --batch-size 64 --start-epoch 15 --num-samples 4 --train-max-concurrent 4 --num-curate 2"
+        tune_hpo_args = "--num-samples 2 --max-concurrent 2 --workload-timeout 15 --query-timeout 1 --tune-duration-during-hpo 0.01"
+    elif benchmark_name == "job":
+        scale_factor = 1
+        workload_name_suffix = "all"
+        embedding_datagen_args = ""
+        embedding_train_args = "--iterations-per-epoch 1 --num-points-to-sample 2 --num-batches 1 --batch-size 64 --start-epoch 15 --num-samples 4 --train-max-concurrent 4 --num-curate 2"
+        tune_hpo_args = "--num-samples 2 --max-concurrent 2 --workload-timeout 15 --query-timeout 2 --tune-duration-during-hpo 0.03"
+    else:
+        assert False

     # Clear the E2E testing workspace so we always run the test with a clean slate.
     workspace_dpath = get_workspace_dpath(E2ETEST_DBGYM_CONFIG_FPATH)
@@ -84,21 +92,23 @@ class Stage(Enum):

     # Run the full Proto-X training pipeline, asserting things along the way
     # Setup (workload and database)
-    tables_dpath = default_tables_path(workspace_dpath, BENCHMARK, SCALE_FACTOR)
+    tables_dpath = default_tables_path(workspace_dpath, benchmark_name, scale_factor)
     if Stage.Tables in STAGES_TO_RUN:
         assert not tables_dpath.exists()
         subprocess.run(
-            f"python task.py benchmark {BENCHMARK} data {SCALE_FACTOR}".split(),
+            f"python task.py benchmark {benchmark_name} data {scale_factor}".split(),
             check=True,
         )
         assert tables_dpath.exists()

-    workload_name = get_workload_name(SCALE_FACTOR, "15721_15721_all")
-    workload_dpath = default_workload_path(workspace_dpath, BENCHMARK, workload_name)
+    workload_name = get_workload_name(scale_factor, workload_name_suffix)
+    workload_dpath = default_workload_path(
+        workspace_dpath, benchmark_name, workload_name
+    )
     if Stage.Workload in STAGES_TO_RUN:
         assert not workload_dpath.exists()
         subprocess.run(
-            f"python task.py benchmark {BENCHMARK} workload --scale-factor {SCALE_FACTOR}".split(),
+            f"python task.py benchmark {benchmark_name} workload --scale-factor {scale_factor}".split(),
             check=True,
         )
         assert workload_dpath.exists()
Expand All @@ -110,68 +120,81 @@ class Stage(Enum):
assert repo_dpath.exists()

pristine_dbdata_snapshot_fpath = default_pristine_dbdata_snapshot_path(
workspace_dpath, BENCHMARK, SCALE_FACTOR
workspace_dpath, benchmark_name, scale_factor
)
if Stage.DBData in STAGES_TO_RUN:
assert not pristine_dbdata_snapshot_fpath.exists()
subprocess.run(
f"python task.py dbms {DBMS} dbdata {BENCHMARK} --scale-factor {SCALE_FACTOR} --intended-dbdata-hardware {intended_dbdata_hardware}".split(),
f"python task.py dbms {DBMS} dbdata {benchmark_name} --scale-factor {scale_factor} --intended-dbdata-hardware {intended_dbdata_hardware}".split(),
check=True,
)
assert pristine_dbdata_snapshot_fpath.exists()

# Tuning (embedding, HPO, and actual tuning)
traindata_dpath = default_traindata_path(workspace_dpath, BENCHMARK, workload_name)
traindata_dpath = default_traindata_path(
workspace_dpath, benchmark_name, workload_name
)
if Stage.EmbeddingData in STAGES_TO_RUN:
assert not traindata_dpath.exists()
subprocess.run(
f"python task.py tune {AGENT} embedding datagen {BENCHMARK} --scale-factor {SCALE_FACTOR} --override-sample-limits lineitem,32768 --intended-dbdata-hardware {intended_dbdata_hardware}".split(),
f"python task.py tune {AGENT} embedding datagen {benchmark_name} --scale-factor {scale_factor} {embedding_datagen_args} --intended-dbdata-hardware {intended_dbdata_hardware}".split(),
check=True,
)
assert traindata_dpath.exists()

embedder_dpath = default_embedder_path(workspace_dpath, BENCHMARK, workload_name)
embedder_dpath = default_embedder_path(
workspace_dpath, benchmark_name, workload_name
)
if Stage.EmbeddingModel in STAGES_TO_RUN:
assert not embedder_dpath.exists()
subprocess.run(
f"python task.py tune {AGENT} embedding train {BENCHMARK} --scale-factor {SCALE_FACTOR} --iterations-per-epoch 1 --num-points-to-sample 1 --num-batches 1 --batch-size 64 --start-epoch 15 --num-samples 4 --train-max-concurrent 4 --num-curate 2".split(),
f"python task.py tune {AGENT} embedding train {benchmark_name} --scale-factor {scale_factor} {embedding_train_args}".split(),
check=True,
)
assert embedder_dpath.exists()

hpoed_agent_params_fpath = default_hpoed_agent_params_path(
workspace_dpath, BENCHMARK, workload_name
workspace_dpath, benchmark_name, workload_name
)
if Stage.TuneHPO in STAGES_TO_RUN:
assert not hpoed_agent_params_fpath.exists()
subprocess.run(
f"python task.py tune {AGENT} agent hpo {BENCHMARK} --scale-factor {SCALE_FACTOR} --num-samples 2 --max-concurrent 2 --workload-timeout 15 --query-timeout 1 --tune-duration-during-hpo 0.01 --intended-dbdata-hardware {intended_dbdata_hardware}".split(),
f"python task.py tune {AGENT} agent hpo {benchmark_name} --scale-factor {scale_factor} {tune_hpo_args} --intended-dbdata-hardware {intended_dbdata_hardware}".split(),
check=True,
)
assert hpoed_agent_params_fpath.exists()

tuning_steps_dpath = default_tuning_steps_dpath(
workspace_dpath, BENCHMARK, workload_name, False
workspace_dpath, benchmark_name, workload_name, False
)
if Stage.TuneTune in STAGES_TO_RUN:
assert not tuning_steps_dpath.exists()
subprocess.run(
f"python task.py tune {AGENT} agent tune {BENCHMARK} --scale-factor {SCALE_FACTOR}".split(),
f"python task.py tune {AGENT} agent tune {benchmark_name} --scale-factor {scale_factor}".split(),
check=True,
)
assert tuning_steps_dpath.exists()

# Post-training (replay and analysis)
# Post-training (replay)
replay_data_fpath = default_replay_data_fpath(
workspace_dpath, BENCHMARK, workload_name, False
workspace_dpath, benchmark_name, workload_name, False
)
if Stage.Replay in STAGES_TO_RUN:
assert not replay_data_fpath.exists()
subprocess.run(
f"python3 task.py tune {AGENT} agent replay {BENCHMARK} --scale-factor {SCALE_FACTOR}".split(),
f"python3 task.py tune {AGENT} agent replay {benchmark_name} --scale-factor {scale_factor}".split(),
check=True,
)
assert replay_data_fpath.exists()

# Clear it at the end as well to avoid leaving artifacts.
clear_workspace(workspace_dpath)


if __name__ == "__main__":
intended_dbdata_hardware = sys.argv[1] if len(sys.argv) > 1 else "hdd"

# Set the config file so that we use resources that don't conflict with normal usage (e.g. a different workspace, different ports, etc.).
os.environ["DBGYM_CONFIG_PATH"] = str(E2ETEST_DBGYM_CONFIG_FPATH)

run_e2e_for_benchmark("tpch", intended_dbdata_hardware)
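
Usage note (not part of this commit's diff): the test is still launched as python scripts/run_protox_e2e_test.py [hdd|ssd], with the hardware argument defaulting to "hdd". The new run_e2e_for_benchmark function carries a "job" configuration, but the __main__ block above only exercises "tpch". A minimal sketch of a driver that runs both benchmarks, assuming it replaces the __main__ block in scripts/run_protox_e2e_test.py so the module's existing imports and helpers are in scope:

    # Hypothetical driver (illustration only, not in this commit).
    if __name__ == "__main__":
        intended_dbdata_hardware = sys.argv[1] if len(sys.argv) > 1 else "hdd"

        # Point the gym at the E2E-test config so we don't clobber a normal workspace.
        os.environ["DBGYM_CONFIG_PATH"] = str(E2ETEST_DBGYM_CONFIG_FPATH)

        # Run the full pipeline once per benchmark configuration defined above.
        for benchmark_name in ["tpch", "job"]:
            run_e2e_for_benchmark(benchmark_name, intended_dbdata_hardware)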
