Gymlib consolidation #68

Merged · 16 commits · Dec 31, 2024
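The change is mechanical across the touched files: imports of `DBGymWorkspace`, `fully_resolve_path`, and friends move from `util.workspace` to `gymlib.workspace`, `gymlib.symlinks_paths` becomes `gymlib.infra_paths`, the Postgres helpers move from `util.pg` to `gymlib.pg`, and the `DBGYM_LOGGER_NAME` logger indirection is replaced by plain stdlib logging. A minimal before/after sketch of the import pattern, using only names that appear in the diff below (this is an illustration, not code from the PR):

```python
# Before this PR (the removed lines):
#   from gymlib.symlinks_paths import get_tables_symlink_path, name_to_linkname
#   from util.log import DBGYM_LOGGER_NAME
#   from util.workspace import DBGymWorkspace, fully_resolve_path
#   logging.getLogger(DBGYM_LOGGER_NAME).info("...")

# After this PR (the added lines):
import logging

from gymlib.infra_paths import get_tables_symlink_path
from gymlib.workspace import DBGymWorkspace, fully_resolve_path, name_to_linkname

logging.info("...")  # call sites now log through the root logger
```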
2 changes: 1 addition & 1 deletion benchmark/cli.py
@@ -1,8 +1,8 @@
import click
from gymlib.workspace import DBGymWorkspace

from benchmark.job.cli import job_group
from benchmark.tpch.cli import tpch_group
from util.workspace import DBGymWorkspace


@click.group(name="benchmark")
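This file only changes where `DBGymWorkspace` is imported from. For context, a sketch of what such a click entry point looks like after the consolidation; the group body is cut off in the diff, so the `pass_obj` wiring and `add_command` calls below are assumptions rather than the file's actual contents:

```python
import click
from gymlib.workspace import DBGymWorkspace

from benchmark.job.cli import job_group
from benchmark.tpch.cli import tpch_group


@click.group(name="benchmark")
@click.pass_obj  # assumed: the workspace is carried on the click context object
def benchmark_group(dbgym_workspace: DBGymWorkspace) -> None:
    pass


benchmark_group.add_command(job_group)
benchmark_group.add_command(tpch_group)
```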
26 changes: 8 additions & 18 deletions benchmark/job/cli.py
@@ -2,17 +2,15 @@
from typing import Optional

import click
from gymlib.symlinks_paths import (
from gymlib.infra_paths import (
get_tables_dirname,
get_workload_dirname,
get_workload_suffix,
name_to_linkname,
)
from gymlib.workspace import DBGymWorkspace, fully_resolve_path, name_to_linkname

from benchmark.constants import DEFAULT_SCALE_FACTOR
from util.log import DBGYM_LOGGER_NAME
from util.shell import subprocess_run
from util.workspace import DBGymWorkspace, fully_resolve_path

JOB_TABLES_URL = "https://event.cwi.nl/da/job/imdb.tgz"
JOB_QUERIES_URL = "https://event.cwi.nl/da/job/job.tgz"
@@ -213,12 +211,10 @@ def _download_and_untar_dir(
dbgym_workspace.dbgym_cur_symlinks_path / f"{untarred_dname}.link"
)
if expected_symlink_path.exists():
logging.getLogger(DBGYM_LOGGER_NAME).info(
f"Skipping download: {expected_symlink_path}"
)
logging.info(f"Skipping download: {expected_symlink_path}")
return

logging.getLogger(DBGYM_LOGGER_NAME).info(f"Downloading: {expected_symlink_path}")
logging.info(f"Downloading: {expected_symlink_path}")
subprocess_run(f"curl -O {download_url}", cwd=dbgym_workspace.dbgym_this_run_path)
untarred_data_path = dbgym_workspace.dbgym_this_run_path / untarred_dname

@@ -243,7 +239,7 @@ def _download_and_untar_dir(
)
symlink_path = dbgym_workspace.link_result(untarred_data_path)
assert expected_symlink_path.samefile(symlink_path)
logging.getLogger(DBGYM_LOGGER_NAME).info(f"Downloaded: {expected_symlink_path}")
logging.info(f"Downloaded: {expected_symlink_path}")


def _generate_job_workload(
@@ -259,14 +255,10 @@ def _generate_job_workload(
name_to_linkname(workload_name)
)
if expected_workload_symlink_path.exists():
logging.getLogger(DBGYM_LOGGER_NAME).info(
f"Skipping generation: {expected_workload_symlink_path}"
)
logging.info(f"Skipping generation: {expected_workload_symlink_path}")
return

logging.getLogger(DBGYM_LOGGER_NAME).info(
f"Generating: {expected_workload_symlink_path}"
)
logging.info(f"Generating: {expected_workload_symlink_path}")
workload_path = dbgym_workspace.dbgym_this_run_path / workload_name
workload_path.mkdir(parents=False, exist_ok=False)

@@ -291,6 +283,4 @@

workload_symlink_path = dbgym_workspace.link_result(workload_path)
assert workload_symlink_path == expected_workload_symlink_path
logging.getLogger(DBGYM_LOGGER_NAME).info(
f"Generated: {expected_workload_symlink_path}"
)
logging.info(f"Generated: {expected_workload_symlink_path}")
4 changes: 2 additions & 2 deletions benchmark/job/load_info.py
@@ -1,11 +1,11 @@
from pathlib import Path
from typing import Optional

from gymlib.symlinks_paths import get_tables_symlink_path
from gymlib.infra_paths import get_tables_symlink_path
from gymlib.workspace import DBGymWorkspace, fully_resolve_path

from benchmark.constants import DEFAULT_SCALE_FACTOR
from dbms.load_info_base_class import LoadInfoBaseClass
from util.workspace import DBGymWorkspace, fully_resolve_path

JOB_SCHEMA_FNAME = "job_schema.sql"

12 changes: 6 additions & 6 deletions benchmark/tests/integtest_benchmark.py
@@ -2,22 +2,22 @@
import unittest
from pathlib import Path

from gymlib.symlinks_paths import (
from gymlib.infra_paths import (
get_tables_symlink_path,
get_workload_suffix,
get_workload_symlink_path,
)
from gymlib.workspace import (
DBGymWorkspace,
fully_resolve_path,
get_workspace_path_from_config,
)

# It's ok to import private functions from the benchmark module because this is an integration test.
from benchmark.constants import DEFAULT_SCALE_FACTOR
from benchmark.job.cli import _job_tables, _job_workload
from benchmark.tpch.cli import _tpch_tables, _tpch_workload
from benchmark.tpch.constants import DEFAULT_TPCH_SEED
from util.workspace import (
DBGymWorkspace,
fully_resolve_path,
get_workspace_path_from_config,
)


class BenchmarkTests(unittest.TestCase):
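The integration test now pulls all of its workspace plumbing from `gymlib.workspace` and drives the private CLI helpers directly. A rough sketch of how such a test might be wired; the signatures of `get_workspace_path_from_config`, `DBGymWorkspace`, and `_tpch_tables` are assumed here (only their names come from the diff), so treat this as an outline rather than the real test:

```python
import unittest
from pathlib import Path

from gymlib.workspace import DBGymWorkspace, get_workspace_path_from_config

from benchmark.constants import DEFAULT_SCALE_FACTOR
from benchmark.tpch.cli import _tpch_tables


class TpchSmokeTest(unittest.TestCase):
    def test_tables(self) -> None:
        # Assumed signatures: get_workspace_path_from_config(config_path) and
        # _tpch_tables(dbgym_workspace, scale_factor).
        workspace_path = get_workspace_path_from_config(Path("env.yaml"))
        workspace = DBGymWorkspace(workspace_path)
        _tpch_tables(workspace, DEFAULT_SCALE_FACTOR)


if __name__ == "__main__":
    unittest.main()
```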
49 changes: 17 additions & 32 deletions benchmark/tpch/cli.py
@@ -1,21 +1,24 @@
import logging

import click
from gymlib.symlinks_paths import (
from gymlib.infra_paths import (
get_scale_factor_string,
get_tables_dirname,
get_tables_symlink_path,
get_workload_suffix,
get_workload_symlink_path,
)
from gymlib.workspace import (
DBGymWorkspace,
fully_resolve_path,
is_fully_resolved,
linkname_to_name,
name_to_linkname,
)

from benchmark.constants import DEFAULT_SCALE_FACTOR
from benchmark.tpch.constants import DEFAULT_TPCH_SEED, NUM_TPCH_QUERIES
from util.log import DBGYM_LOGGER_NAME
from util.shell import subprocess_run
from util.workspace import DBGymWorkspace, fully_resolve_path, is_fully_resolved

TPCH_KIT_DIRNAME = "tpch-kit"

@@ -102,12 +105,10 @@ def _clone_tpch_kit(dbgym_workspace: DBGymWorkspace) -> None:
name_to_linkname(TPCH_KIT_DIRNAME)
)
if expected_symlink_path.exists():
logging.getLogger(DBGYM_LOGGER_NAME).info(
f"Skipping clone: {expected_symlink_path}"
)
logging.info(f"Skipping clone: {expected_symlink_path}")
return

logging.getLogger(DBGYM_LOGGER_NAME).info(f"Cloning: {expected_symlink_path}")
logging.info(f"Cloning: {expected_symlink_path}")
subprocess_run(
f"./clone_tpch_kit.sh {dbgym_workspace.dbgym_this_run_path}",
cwd=dbgym_workspace.base_dbgym_repo_path / "benchmark" / "tpch",
@@ -116,7 +117,7 @@ def _clone_tpch_kit(dbgym_workspace: DBGymWorkspace) -> None:
dbgym_workspace.dbgym_this_run_path / TPCH_KIT_DIRNAME
)
assert expected_symlink_path.samefile(symlink_path)
logging.getLogger(DBGYM_LOGGER_NAME).info(f"Cloned: {expected_symlink_path}")
logging.info(f"Cloned: {expected_symlink_path}")


def _generate_tpch_queries(
@@ -125,9 +126,7 @@ def _generate_tpch_queries(
tpch_kit_path = dbgym_workspace.dbgym_cur_symlinks_path / (
name_to_linkname(TPCH_KIT_DIRNAME)
)
logging.getLogger(DBGYM_LOGGER_NAME).info(
f"Generating queries: [{seed_start}, {seed_end}]"
)
logging.info(f"Generating queries: [{seed_start}, {seed_end}]")
for seed in range(seed_start, seed_end + 1):
expected_queries_symlink_path = dbgym_workspace.dbgym_cur_symlinks_path / (
name_to_linkname(_get_queries_dirname(seed, scale_factor))
@@ -149,9 +148,7 @@ def _generate_tpch_queries(
)
queries_symlink_path = dbgym_workspace.link_result(queries_parent_path)
assert queries_symlink_path.samefile(expected_queries_symlink_path)
logging.getLogger(DBGYM_LOGGER_NAME).info(
f"Generated queries: [{seed_start}, {seed_end}]"
)
logging.info(f"Generated queries: [{seed_start}, {seed_end}]")


def _generate_tpch_tables(dbgym_workspace: DBGymWorkspace, scale_factor: float) -> None:
@@ -162,14 +159,10 @@ def _generate_tpch_tables(dbgym_workspace: DBGymWorkspace, scale_factor: float)
dbgym_workspace.dbgym_workspace_path, "tpch", scale_factor
)
if expected_tables_symlink_path.exists():
logging.getLogger(DBGYM_LOGGER_NAME).info(
f"Skipping generation: {expected_tables_symlink_path}"
)
logging.info(f"Skipping generation: {expected_tables_symlink_path}")
return

logging.getLogger(DBGYM_LOGGER_NAME).info(
f"Generating: {expected_tables_symlink_path}"
)
logging.info(f"Generating: {expected_tables_symlink_path}")
subprocess_run(f"./dbgen -vf -s {scale_factor}", cwd=tpch_kit_path / "dbgen")
tables_parent_path = dbgym_workspace.dbgym_this_run_path / get_tables_dirname(
"tpch", scale_factor
@@ -179,9 +172,7 @@ def _generate_tpch_tables(dbgym_workspace: DBGymWorkspace, scale_factor: float)

tables_symlink_path = dbgym_workspace.link_result(tables_parent_path)
assert tables_symlink_path.samefile(expected_tables_symlink_path)
logging.getLogger(DBGYM_LOGGER_NAME).info(
f"Generated: {expected_tables_symlink_path}"
)
logging.info(f"Generated: {expected_tables_symlink_path}")


def _generate_tpch_workload(
@@ -200,14 +191,10 @@ def _generate_tpch_workload(
),
)
if expected_workload_symlink_path.exists():
logging.getLogger(DBGYM_LOGGER_NAME).info(
f"Skipping generation: {expected_workload_symlink_path}"
)
logging.info(f"Skipping generation: {expected_workload_symlink_path}")
return

logging.getLogger(DBGYM_LOGGER_NAME).info(
f"Generating: {expected_workload_symlink_path}"
)
logging.info(f"Generating: {expected_workload_symlink_path}")
workload_path = dbgym_workspace.dbgym_this_run_path / linkname_to_name(
expected_workload_symlink_path.name
)
@@ -238,6 +225,4 @@ def _generate_tpch_workload(

workload_symlink_path = dbgym_workspace.link_result(workload_path)
assert workload_symlink_path == expected_workload_symlink_path
logging.getLogger(DBGYM_LOGGER_NAME).info(
f"Generated: {expected_workload_symlink_path}"
)
logging.info(f"Generated: {expected_workload_symlink_path}")
4 changes: 2 additions & 2 deletions benchmark/tpch/load_info.py
@@ -1,10 +1,10 @@
from pathlib import Path
from typing import Optional

from gymlib.symlinks_paths import get_tables_symlink_path
from gymlib.infra_paths import get_tables_symlink_path
from gymlib.workspace import DBGymWorkspace, fully_resolve_path

from dbms.load_info_base_class import LoadInfoBaseClass
from util.workspace import DBGymWorkspace, fully_resolve_path

TPCH_SCHEMA_FNAME = "tpch_schema.sql"
TPCH_CONSTRAINTS_FNAME = "tpch_constraints.sql"
2 changes: 1 addition & 1 deletion dbms/cli.py
@@ -1,7 +1,7 @@
import click
from gymlib.workspace import DBGymWorkspace

from dbms.postgres.cli import postgres_group
from util.workspace import DBGymWorkspace


@click.group(name="dbms")
67 changes: 30 additions & 37 deletions dbms/postgres/cli.py
@@ -6,42 +6,39 @@
import shutil
import subprocess
from pathlib import Path
from typing import Optional
from typing import Any, Optional

import click
import sqlalchemy
from gymlib.symlinks_paths import (
from gymlib.infra_paths import (
get_dbdata_tgz_symlink_path,
get_pgbin_symlink_path,
get_repo_symlink_path,
linkname_to_name,
)

from benchmark.constants import DEFAULT_SCALE_FACTOR
from benchmark.job.load_info import JobLoadInfo
from benchmark.tpch.load_info import TpchLoadInfo
from dbms.load_info_base_class import LoadInfoBaseClass
from util.log import DBGYM_LOGGER_NAME
from util.pg import (
DBGYM_POSTGRES_DBNAME,
DBGYM_POSTGRES_PASS,
DBGYM_POSTGRES_USER,
DEFAULT_POSTGRES_DBNAME,
DEFAULT_POSTGRES_PORT,
SHARED_PRELOAD_LIBRARIES,
create_sqlalchemy_conn,
sql_file_execute,
sqlalchemy_conn_execute,
)
from util.shell import subprocess_run
from util.workspace import (
from gymlib.pg import create_sqlalchemy_conn, sql_file_execute
from gymlib.workspace import (
WORKSPACE_PATH_PLACEHOLDER,
DBGymWorkspace,
fully_resolve_path,
get_tmp_path_from_workspace_path,
is_fully_resolved,
is_ssd,
linkname_to_name,
)
from sqlalchemy import text

from benchmark.constants import DEFAULT_SCALE_FACTOR
from benchmark.job.load_info import JobLoadInfo
from benchmark.tpch.load_info import TpchLoadInfo
from dbms.load_info_base_class import LoadInfoBaseClass
from util.shell import subprocess_run

DBGYM_POSTGRES_USER = "dbgym_user"
DBGYM_POSTGRES_PASS = "dbgym_pass"
DBGYM_POSTGRES_DBNAME = "dbgym"
DEFAULT_POSTGRES_DBNAME = "postgres"
DEFAULT_POSTGRES_PORT = 5432
SHARED_PRELOAD_LIBRARIES = "boot,pg_hint_plan,pg_prewarm"


@click.group(name="postgres")
@@ -72,14 +69,10 @@ def _postgres_build(dbgym_workspace: DBGymWorkspace, rebuild: bool) -> None:
dbgym_workspace.dbgym_workspace_path
)
if not rebuild and expected_repo_symlink_path.exists():
logging.getLogger(DBGYM_LOGGER_NAME).info(
f"Skipping _postgres_build: {expected_repo_symlink_path}"
)
logging.info(f"Skipping _postgres_build: {expected_repo_symlink_path}")
return

logging.getLogger(DBGYM_LOGGER_NAME).info(
f"Setting up repo in {expected_repo_symlink_path}"
)
logging.info(f"Setting up repo in {expected_repo_symlink_path}")
repo_real_path = dbgym_workspace.dbgym_this_run_path / "repo"
repo_real_path.mkdir(parents=False, exist_ok=False)
subprocess_run(
@@ -90,9 +83,7 @@ def _postgres_build(dbgym_workspace: DBGymWorkspace, rebuild: bool) -> None:
# only link at the end so that the link only ever points to a complete repo
repo_symlink_path = dbgym_workspace.link_result(repo_real_path)
assert expected_repo_symlink_path.samefile(repo_symlink_path)
logging.getLogger(DBGYM_LOGGER_NAME).info(
f"Set up repo in {expected_repo_symlink_path}"
)
logging.info(f"Set up repo in {expected_repo_symlink_path}")


@postgres_group.command(
@@ -198,9 +189,7 @@ def _create_dbdata(
scale_factor,
)
if expected_dbdata_tgz_symlink_path.exists():
logging.getLogger(DBGYM_LOGGER_NAME).info(
f"Skipping _create_dbdata: {expected_dbdata_tgz_symlink_path}"
)
logging.info(f"Skipping _create_dbdata: {expected_dbdata_tgz_symlink_path}")
return

# It's ok for the dbdata/ directory to be temporary. It just matters that the .tgz is saved in a safe place.
@@ -236,9 +225,7 @@ def _create_dbdata(
# Only link at the end so that the link only ever points to a complete dbdata.
dbdata_tgz_symlink_path = dbgym_workspace.link_result(dbdata_tgz_real_path)
assert expected_dbdata_tgz_symlink_path.samefile(dbdata_tgz_symlink_path)
logging.getLogger(DBGYM_LOGGER_NAME).info(
f"Created dbdata in {dbdata_tgz_symlink_path}"
)
logging.info(f"Created dbdata in {dbdata_tgz_symlink_path}")


def _generic_dbdata_setup(dbgym_workspace: DBGymWorkspace) -> None:
Expand Down Expand Up @@ -370,3 +357,9 @@ def _start_or_stop_postgres(
f"./pg_ctl -D \"{dbdata_path}\" -o '-p {pgport}' stop",
cwd=pgbin_path,
)


def sqlalchemy_conn_execute(
conn: sqlalchemy.Connection, sql: str
) -> sqlalchemy.engine.CursorResult[Any]:
return conn.execute(text(sql))
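The new `sqlalchemy_conn_execute` helper is a thin typed wrapper around `sqlalchemy.text`. A hedged usage sketch follows; the connection URL is built from the constants this PR moves into this file, while the `localhost` host and the use of `create_engine` (rather than `gymlib.pg`'s `create_sqlalchemy_conn`, whose signature is not shown in this diff) are assumptions:

```python
from typing import Any

import sqlalchemy
from sqlalchemy import text


def sqlalchemy_conn_execute(
    conn: sqlalchemy.Connection, sql: str
) -> sqlalchemy.engine.CursorResult[Any]:
    return conn.execute(text(sql))


# Credentials, dbname, and port taken from the constants defined above; host is assumed.
engine = sqlalchemy.create_engine(
    "postgresql://dbgym_user:dbgym_pass@localhost:5432/dbgym"
)
with engine.connect() as conn:
    print(sqlalchemy_conn_execute(conn, "SELECT version()").scalar())
```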