Skip to content

Commit

Permalink
TuningAgent interface (#49)
Browse files Browse the repository at this point in the history
**Summary**: Adds a basic `TuningAgent` interface. It provides a common
interface between tuning agents and the replay subsystem (which will be
pushed in a later PR).

**Demo**:
Added test cases that pass.
<img width="802" alt="Screenshot 2024-12-20 at 17 17 59"
src="https://github.com/user-attachments/assets/5c2cf531-d9c8-4326-9e6d-dd519bbec261"
/>

**Details**:
* The core of the interface is the `DBMSConfigDelta` class which
represents the change to the DBMS's config after a single step of
tuning.
* Subclasses only need to override the `TuningAgent._step()` function.
* The main functionality of the base class is automatically saving each
step's delta to a JSON file so that it can be replayed later.
* CWI's JOB dataset link also stopped working so I made my own:
https://drive.google.com/uc?id=19m0zDpphAw0Bu9Irr_ta9EGr5k85hiN1.
  • Loading branch information
wangpatrick57 authored Dec 21, 2024
1 parent c1f162d commit 842227d
Show file tree
Hide file tree
Showing 7 changed files with 191 additions and 24 deletions.
7 changes: 4 additions & 3 deletions benchmark/job/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import click

from benchmark.constants import DEFAULT_SCALE_FACTOR
from benchmark.job.load_info import JobLoadInfo
from util.log import DBGYM_LOGGER_NAME
from util.shell import subprocess_run
from util.workspace import (
Expand All @@ -13,7 +12,8 @@
link_result,
)

JOB_TABLES_URL = "https://homepages.cwi.nl/~boncz/job/imdb.tgz"
# JOB_TABLES_URL = "https://homepages.cwi.nl/~boncz/job/imdb.tgz" # This link stopped working for me
JOB_TABLES_URL = "https://drive.google.com/uc?id=19m0zDpphAw0Bu9Irr_ta9EGr5k85hiN1"
JOB_QUERY_NAMES = [
"1a",
"1b",
Expand Down Expand Up @@ -177,7 +177,8 @@ def _download_job_data(dbgym_cfg: DBGymConfig) -> None:

logging.getLogger(DBGYM_LOGGER_NAME).info(f"Downloading: {expected_symlink_dpath}")
real_data_path = dbgym_cfg.cur_task_runs_data_path(mkdir=True)
subprocess_run(f"curl -O {JOB_TABLES_URL}", cwd=real_data_path)
# subprocess_run(f"curl -O {JOB_TABLES_URL}", cwd=real_data_path) # This is if we're using a non-Google-Drive link
subprocess_run(f"gdown {JOB_TABLES_URL}", cwd=real_data_path)
job_data_dpath = dbgym_cfg.cur_task_runs_data_path(
default_tables_dname(DEFAULT_SCALE_FACTOR), mkdir=True
)
Expand Down
1 change: 1 addition & 0 deletions dependencies/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,4 @@ Werkzeug==3.0.1
wrapt==1.14.1
zipp==3.17.0
streamlit==1.39.0
gdown==5.2.0
30 changes: 10 additions & 20 deletions env/integtest_pg_conn.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import copy
import subprocess
import unittest
from pathlib import Path

import yaml

from env.integtest_util import IntegtestWorkspace
from env.pg_conn import PostgresConn
from util.pg import (
DEFAULT_POSTGRES_PORT,
Expand All @@ -19,27 +16,16 @@
default_pristine_dbdata_snapshot_path,
)

ENV_INTEGTESTS_DBGYM_CONFIG_FPATH = Path("env/env_integtests_dbgym_config.yaml")
BENCHMARK = "tpch"
SCALE_FACTOR = 0.01


def get_unittest_workspace_path() -> Path:
    """
    Return the workspace path configured for the env integration tests.

    The path is read from the "dbgym_workspace_path" key of the YAML file at
    ENV_INTEGTESTS_DBGYM_CONFIG_FPATH.
    """
    with open(ENV_INTEGTESTS_DBGYM_CONFIG_FPATH) as f:
        config = yaml.safe_load(f)
    # Returning after the `with` block (instead of inside it) makes the previous
    # trailing `assert False` unnecessary; that statement could never be reached.
    return Path(config["dbgym_workspace_path"])


class PostgresConnTests(unittest.TestCase):
dbgym_cfg: DBGymConfig

@staticmethod
def setUpClass() -> None:
# If you're running the test locally, this check makes runs past the first one much faster.
if not get_unittest_workspace_path().exists():
subprocess.run(["./env/set_up_env_integtests.sh"], check=True)

PostgresConnTests.dbgym_cfg = DBGymConfig(ENV_INTEGTESTS_DBGYM_CONFIG_FPATH)
IntegtestWorkspace.set_up_workspace()

def setUp(self) -> None:
self.assertFalse(
Expand All @@ -48,19 +34,23 @@ def setUp(self) -> None:
+ "to ensure this. Be careful about accidentally taking down other people's Postgres instances though.",
)
self.pristine_dbdata_snapshot_path = default_pristine_dbdata_snapshot_path(
self.dbgym_cfg.dbgym_workspace_path, BENCHMARK, SCALE_FACTOR
IntegtestWorkspace.get_dbgym_cfg().dbgym_workspace_path,
BENCHMARK,
SCALE_FACTOR,
)
self.dbdata_parent_dpath = default_dbdata_parent_dpath(
self.dbgym_cfg.dbgym_workspace_path
IntegtestWorkspace.get_dbgym_cfg().dbgym_workspace_path
)
self.pgbin_dpath = default_pgbin_path(
IntegtestWorkspace.get_dbgym_cfg().dbgym_workspace_path
)
self.pgbin_dpath = default_pgbin_path(self.dbgym_cfg.dbgym_workspace_path)

def tearDown(self) -> None:
self.assertFalse(get_is_postgres_running())

def create_pg_conn(self, pgport: int = DEFAULT_POSTGRES_PORT) -> PostgresConn:
return PostgresConn(
PostgresConnTests.dbgym_cfg,
IntegtestWorkspace.get_dbgym_cfg(),
pgport,
self.pristine_dbdata_snapshot_path,
self.dbdata_parent_dpath,
Expand Down
66 changes: 66 additions & 0 deletions env/integtest_tuning_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import unittest
from typing import Any, Optional

from env.integtest_util import IntegtestWorkspace
from env.tuning_agent import DBMSConfigDelta, TuningAgent


class MockTuningAgent(TuningAgent):
    """
    A TuningAgent for tests whose next delta is injected through `config_to_return`.

    The test must assign `config_to_return` before every call to `step()`; `_step()`
    hands that value back and clears the attribute.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self.config_to_return: Optional[DBMSConfigDelta] = None

    def _step(self) -> DBMSConfigDelta:
        # Take the pending delta and reset the slot in one go. Resetting it forces the
        # test to provide a fresh delta before each step.
        pending, self.config_to_return = self.config_to_return, None
        assert pending is not None
        return pending


class PostgresConnTests(unittest.TestCase):
    """
    Integration tests for the delta bookkeeping of TuningAgent, driven by MockTuningAgent.

    NOTE(review): the class name matches the test class in env/integtest_pg_conn.py even
    though these tests exercise the tuning agent -- consider renaming it.
    """

    @staticmethod
    def setUpClass() -> None:
        IntegtestWorkspace.set_up_workspace()

    @staticmethod
    def make_config(letter: str) -> DBMSConfigDelta:
        """Build a small delta in which every field is derived from `letter`."""
        return DBMSConfigDelta(
            indexes=[letter],
            sysknobs={letter: letter},
            qknobs={letter: [letter]},
        )

    def test_get_step_delta(self) -> None:
        agent = MockTuningAgent(IntegtestWorkspace.get_dbgym_cfg())

        # Run three steps that produce the "a", "b" and "c" deltas in that order.
        for letter in ("a", "b", "c"):
            agent.config_to_return = PostgresConnTests.make_config(letter)
            agent.step()

        # Read the steps back out of order, and step 1 twice, to check that lookups
        # do not depend on the order in which they are made.
        lookups = [(1, "b"), (0, "a"), (1, "b"), (2, "c")]
        for step_num, letter in lookups:
            self.assertEqual(
                agent.get_step_delta(step_num), PostgresConnTests.make_config(letter)
            )

    def test_get_all_deltas(self) -> None:
        agent = MockTuningAgent(IntegtestWorkspace.get_dbgym_cfg())
        letters = ("a", "b", "c")

        for letter in letters:
            agent.config_to_return = PostgresConnTests.make_config(letter)
            agent.step()

        expected_deltas = [PostgresConnTests.make_config(letter) for letter in letters]
        self.assertEqual(agent.get_all_deltas(), expected_deltas)


# Run the tests in this module when the file is executed directly as a script.
if __name__ == "__main__":
unittest.main()
43 changes: 43 additions & 0 deletions env/integtest_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import subprocess
from pathlib import Path
from typing import Optional

import yaml

from util.workspace import DBGymConfig


class IntegtestWorkspace:
    """
    This is essentially a singleton class: all of its state lives in class attributes and
    all of its methods are static. This avoids multiple integtest_*.py files creating the
    workspace and/or the DBGymConfig redundantly.

    Both the config file path and the setup script path are relative, so they are resolved
    against the current working directory.
    """

    ENV_INTEGTESTS_DBGYM_CONFIG_FPATH = Path("env/env_integtests_dbgym_config.yaml")
    # Populated by set_up_workspace(); stays None until then.
    INTEGTEST_DBGYM_CFG: Optional[DBGymConfig] = None

    @staticmethod
    def set_up_workspace() -> None:
        """
        Ensure the integration test workspace exists and the shared DBGymConfig is created.

        Raises subprocess.CalledProcessError if the setup script exits with a non-zero
        status (because of `check=True`).
        """
        # This if statement prevents us from setting up the workspace twice, which saves
        # time.
        if not IntegtestWorkspace.get_workspace_path().exists():
            subprocess.run(["./env/set_up_env_integtests.sh"], check=True)

        # Once we get here, we have an invariant that the workspace exists. We need this
        # invariant to be true in order to create the DBGymConfig.
        #
        # However, it also can't be created more than once so we need to check `is None`.
        if IntegtestWorkspace.INTEGTEST_DBGYM_CFG is None:
            IntegtestWorkspace.INTEGTEST_DBGYM_CFG = DBGymConfig(
                IntegtestWorkspace.ENV_INTEGTESTS_DBGYM_CONFIG_FPATH
            )

    @staticmethod
    def get_dbgym_cfg() -> DBGymConfig:
        """Return the shared DBGymConfig. set_up_workspace() must have been called."""
        dbgym_cfg = IntegtestWorkspace.INTEGTEST_DBGYM_CFG
        assert (
            dbgym_cfg is not None
        ), "IntegtestWorkspace.set_up_workspace() must be called before get_dbgym_cfg()"
        return dbgym_cfg

    @staticmethod
    def get_workspace_path() -> Path:
        """
        Return the workspace path stored under the "dbgym_workspace_path" key of the
        integration test config file.
        """
        with open(IntegtestWorkspace.ENV_INTEGTESTS_DBGYM_CONFIG_FPATH) as f:
            config = yaml.safe_load(f)
        # Returning after the `with` block (instead of inside it) makes the previous
        # trailing `assert False` unnecessary; that statement could never be reached.
        return Path(config["dbgym_workspace_path"])
66 changes: 66 additions & 0 deletions env/tuning_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import json
from dataclasses import asdict, dataclass
from pathlib import Path

from util.workspace import DBGymConfig


@dataclass
class DBMSConfigDelta:
    """
    The change made to a DBMS config by a single tuning step.

    A "DBMS config" is the set of indexes, system knobs, and query knobs chosen by the
    tuning agent. A "delta" is the difference from the config that was in place before.

    Fields:
        indexes: SQL statements for creating indexes. Because this is a delta, the list
            might also contain "DROP ..." statements.
        sysknobs: a mapping from knob names to their values.
        qknobs: a mapping from query IDs to the list of knobs to prepend to the start of
            that query. The knobs are a list[str] instead of a dict[str, str] because a
            knob can be a setting ("SET (enable_sort on)") or a flag ("IndexOnlyScan(it)").
    """

    indexes: list[str]
    sysknobs: dict[str, str]
    qknobs: dict[str, list[str]]


class TuningAgent:
    """
    Base class for tuning agents that saves the config delta of every step to a file.

    Subclasses only implement _step(). The public step() wraps it and writes the returned
    DBMSConfigDelta as JSON to "step<N>_delta.json" inside the "dbms_cfg_deltas" artifacts
    directory, so that the deltas can be read back with get_step_delta() or
    get_all_deltas().
    """

    def __init__(self, dbgym_cfg: DBGymConfig) -> None:
        self.dbgym_cfg = dbgym_cfg
        self.dbms_cfg_deltas_dpath = self.dbgym_cfg.cur_task_runs_artifacts_path(
            "dbms_cfg_deltas", mkdir=True
        )
        # Steps are numbered from 0. Every step in [0, next_step_num) has a delta file.
        self.next_step_num = 0

    def step(self) -> None:
        """
        This wraps _step() and saves the cfg to a file so that it can be replayed.

        If _step() or the write fails, the step counter is restored before the exception
        propagates, so a failed step never leaves a step number without a delta file.
        """
        curr_step_num = self.next_step_num
        # The counter is advanced before calling _step(), as before, so subclasses
        # observe the same value of next_step_num while stepping.
        self.next_step_num += 1
        try:
            dbms_cfg_delta = self._step()
            with self.get_step_delta_fpath(curr_step_num).open("w") as f:
                json.dump(asdict(dbms_cfg_delta), f)
        except BaseException:
            # Roll back so get_step_delta()/get_all_deltas() keep working on the steps
            # that did complete. A retry will reuse (and overwrite) this step number.
            self.next_step_num = curr_step_num
            raise

    def get_step_delta_fpath(self, step_num: int) -> Path:
        """Return the path of the JSON file holding the delta of step `step_num`."""
        return self.dbms_cfg_deltas_dpath / f"step{step_num}_delta.json"

    # Subclasses should override this function.
    def _step(self) -> DBMSConfigDelta:
        """
        This should be overridden by subclasses.
        This should return the delta in the config caused by this step.
        """
        raise NotImplementedError

    def get_step_delta(self, step_num: int) -> DBMSConfigDelta:
        """
        Load the delta saved by step `step_num`, which must be a step that has already
        been taken (0 <= step_num < next_step_num).
        """
        assert (
            0 <= step_num < self.next_step_num
        ), f"step_num must be in [0, {self.next_step_num}) but was {step_num}"
        with self.get_step_delta_fpath(step_num).open("r") as f:
            delta_fields = json.load(f)
        # Returning after the `with` block makes the previous trailing `assert False`
        # unnecessary; that statement could never be reached.
        return DBMSConfigDelta(**delta_fields)

    def get_all_deltas(self) -> list[DBMSConfigDelta]:
        """Load the deltas of all steps taken so far, ordered by step number."""
        return [self.get_step_delta(step_num) for step_num in range(self.next_step_num)]
2 changes: 1 addition & 1 deletion util/workspace.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import IO, Any, Callable, Optional, Tuple
from typing import IO, Any, Callable, Optional

import redis
import yaml
Expand Down

0 comments on commit 842227d

Please sign in to comment.