diff --git a/benchmark/constants.py b/benchmark/constants.py index 2533501d..363a62ec 100644 --- a/benchmark/constants.py +++ b/benchmark/constants.py @@ -1 +1 @@ -DEFAULT_SCALE_FACTOR = 1 \ No newline at end of file +DEFAULT_SCALE_FACTOR = 1 diff --git a/benchmark/job/load_info.py b/benchmark/job/load_info.py index e24d9a51..bb94bd6c 100644 --- a/benchmark/job/load_info.py +++ b/benchmark/job/load_info.py @@ -52,8 +52,7 @@ def __init__(self, dbgym_cfg: DBGymConfig): dbgym_cfg.dbgym_symlinks_path / JobLoadInfo.CODEBASE_DNAME / "data" ) tables_symlink_dpath = ( - data_root_dpath - / f"{default_tables_dname(DEFAULT_SCALE_FACTOR)}.link" + data_root_dpath / f"{default_tables_dname(DEFAULT_SCALE_FACTOR)}.link" ) tables_dpath = tables_symlink_dpath.resolve() assert ( diff --git a/scripts/run_protox_e2e_test.py b/scripts/run_protox_e2e_test.py index 61a1ac6e..aec9ed38 100644 --- a/scripts/run_protox_e2e_test.py +++ b/scripts/run_protox_e2e_test.py @@ -24,8 +24,6 @@ # Be careful when changing these constants. In some places, the E2E test is hardcoded to work for these specific constants. DBMS = "postgres" AGENT = "protox" -BENCHMARK = "tpch" -SCALE_FACTOR = 0.01 E2ETEST_DBGYM_CONFIG_FPATH = Path("scripts/e2e_test_dbgym_config.yaml") @@ -69,11 +67,21 @@ class Stage(Enum): STAGES_TO_RUN = ALL_STAGES -if __name__ == "__main__": - intended_dbdata_hardware = sys.argv[1] if len(sys.argv) > 1 else "hdd" - - # Set the config file so that we use resources that don't conflict with normal usage (e.g. a different workspace, different ports, etc.). - os.environ["DBGYM_CONFIG_PATH"] = str(E2ETEST_DBGYM_CONFIG_FPATH) +def run_e2e_for_benchmark(benchmark_name: str, intended_dbdata_hardware: str) -> None: + if benchmark_name == "tpch": + scale_factor = 0.01 + workload_name_suffix = "15721_15721_all" + embedding_datagen_args = "--override-sample-limits lineitem,32768" + embedding_train_args = "--iterations-per-epoch 1 --num-points-to-sample 1 --num-batches 1 --batch-size 64 --start-epoch 15 --num-samples 4 --train-max-concurrent 4 --num-curate 2" + tune_hpo_args = "--num-samples 2 --max-concurrent 2 --workload-timeout 15 --query-timeout 1 --tune-duration-during-hpo 0.01" + elif benchmark_name == "job": + scale_factor = 1 + workload_name_suffix = "all" + embedding_datagen_args = "" + embedding_train_args = "--iterations-per-epoch 1 --num-points-to-sample 2 --num-batches 1 --batch-size 64 --start-epoch 15 --num-samples 4 --train-max-concurrent 4 --num-curate 2" + tune_hpo_args = "--num-samples 2 --max-concurrent 2 --workload-timeout 15 --query-timeout 2 --tune-duration-during-hpo 0.03" + else: + assert False # Clear the E2E testing workspace so we always run the test with a clean slate. workspace_dpath = get_workspace_dpath(E2ETEST_DBGYM_CONFIG_FPATH) @@ -84,21 +92,23 @@ class Stage(Enum): # Run the full Proto-X training pipeline, asserting things along the way # Setup (workload and database) - tables_dpath = default_tables_path(workspace_dpath, BENCHMARK, SCALE_FACTOR) + tables_dpath = default_tables_path(workspace_dpath, benchmark_name, scale_factor) if Stage.Tables in STAGES_TO_RUN: assert not tables_dpath.exists() subprocess.run( - f"python task.py benchmark {BENCHMARK} data {SCALE_FACTOR}".split(), + f"python task.py benchmark {benchmark_name} data {scale_factor}".split(), check=True, ) assert tables_dpath.exists() - workload_name = get_workload_name(SCALE_FACTOR, "15721_15721_all") - workload_dpath = default_workload_path(workspace_dpath, BENCHMARK, workload_name) + workload_name = get_workload_name(scale_factor, workload_name_suffix) + workload_dpath = default_workload_path( + workspace_dpath, benchmark_name, workload_name + ) if Stage.Workload in STAGES_TO_RUN: assert not workload_dpath.exists() subprocess.run( - f"python task.py benchmark {BENCHMARK} workload --scale-factor {SCALE_FACTOR}".split(), + f"python task.py benchmark {benchmark_name} workload --scale-factor {scale_factor}".split(), check=True, ) assert workload_dpath.exists() @@ -110,68 +120,81 @@ class Stage(Enum): assert repo_dpath.exists() pristine_dbdata_snapshot_fpath = default_pristine_dbdata_snapshot_path( - workspace_dpath, BENCHMARK, SCALE_FACTOR + workspace_dpath, benchmark_name, scale_factor ) if Stage.DBData in STAGES_TO_RUN: assert not pristine_dbdata_snapshot_fpath.exists() subprocess.run( - f"python task.py dbms {DBMS} dbdata {BENCHMARK} --scale-factor {SCALE_FACTOR} --intended-dbdata-hardware {intended_dbdata_hardware}".split(), + f"python task.py dbms {DBMS} dbdata {benchmark_name} --scale-factor {scale_factor} --intended-dbdata-hardware {intended_dbdata_hardware}".split(), check=True, ) assert pristine_dbdata_snapshot_fpath.exists() # Tuning (embedding, HPO, and actual tuning) - traindata_dpath = default_traindata_path(workspace_dpath, BENCHMARK, workload_name) + traindata_dpath = default_traindata_path( + workspace_dpath, benchmark_name, workload_name + ) if Stage.EmbeddingData in STAGES_TO_RUN: assert not traindata_dpath.exists() subprocess.run( - f"python task.py tune {AGENT} embedding datagen {BENCHMARK} --scale-factor {SCALE_FACTOR} --override-sample-limits lineitem,32768 --intended-dbdata-hardware {intended_dbdata_hardware}".split(), + f"python task.py tune {AGENT} embedding datagen {benchmark_name} --scale-factor {scale_factor} {embedding_datagen_args} --intended-dbdata-hardware {intended_dbdata_hardware}".split(), check=True, ) assert traindata_dpath.exists() - embedder_dpath = default_embedder_path(workspace_dpath, BENCHMARK, workload_name) + embedder_dpath = default_embedder_path( + workspace_dpath, benchmark_name, workload_name + ) if Stage.EmbeddingModel in STAGES_TO_RUN: assert not embedder_dpath.exists() subprocess.run( - f"python task.py tune {AGENT} embedding train {BENCHMARK} --scale-factor {SCALE_FACTOR} --iterations-per-epoch 1 --num-points-to-sample 1 --num-batches 1 --batch-size 64 --start-epoch 15 --num-samples 4 --train-max-concurrent 4 --num-curate 2".split(), + f"python task.py tune {AGENT} embedding train {benchmark_name} --scale-factor {scale_factor} {embedding_train_args}".split(), check=True, ) assert embedder_dpath.exists() hpoed_agent_params_fpath = default_hpoed_agent_params_path( - workspace_dpath, BENCHMARK, workload_name + workspace_dpath, benchmark_name, workload_name ) if Stage.TuneHPO in STAGES_TO_RUN: assert not hpoed_agent_params_fpath.exists() subprocess.run( - f"python task.py tune {AGENT} agent hpo {BENCHMARK} --scale-factor {SCALE_FACTOR} --num-samples 2 --max-concurrent 2 --workload-timeout 15 --query-timeout 1 --tune-duration-during-hpo 0.01 --intended-dbdata-hardware {intended_dbdata_hardware}".split(), + f"python task.py tune {AGENT} agent hpo {benchmark_name} --scale-factor {scale_factor} {tune_hpo_args} --intended-dbdata-hardware {intended_dbdata_hardware}".split(), check=True, ) assert hpoed_agent_params_fpath.exists() tuning_steps_dpath = default_tuning_steps_dpath( - workspace_dpath, BENCHMARK, workload_name, False + workspace_dpath, benchmark_name, workload_name, False ) if Stage.TuneTune in STAGES_TO_RUN: assert not tuning_steps_dpath.exists() subprocess.run( - f"python task.py tune {AGENT} agent tune {BENCHMARK} --scale-factor {SCALE_FACTOR}".split(), + f"python task.py tune {AGENT} agent tune {benchmark_name} --scale-factor {scale_factor}".split(), check=True, ) assert tuning_steps_dpath.exists() - # Post-training (replay and analysis) + # Post-training (replay) replay_data_fpath = default_replay_data_fpath( - workspace_dpath, BENCHMARK, workload_name, False + workspace_dpath, benchmark_name, workload_name, False ) if Stage.Replay in STAGES_TO_RUN: assert not replay_data_fpath.exists() subprocess.run( - f"python3 task.py tune {AGENT} agent replay {BENCHMARK} --scale-factor {SCALE_FACTOR}".split(), + f"python3 task.py tune {AGENT} agent replay {benchmark_name} --scale-factor {scale_factor}".split(), check=True, ) assert replay_data_fpath.exists() # Clear it at the end as well to avoid leaving artifacts. clear_workspace(workspace_dpath) + + +if __name__ == "__main__": + intended_dbdata_hardware = sys.argv[1] if len(sys.argv) > 1 else "hdd" + + # Set the config file so that we use resources that don't conflict with normal usage (e.g. a different workspace, different ports, etc.). + os.environ["DBGYM_CONFIG_PATH"] = str(E2ETEST_DBGYM_CONFIG_FPATH) + + run_e2e_for_benchmark("tpch", intended_dbdata_hardware)