Reconfiguration demo #47

Merged (17 commits) on Nov 14, 2024

2 changes: 1 addition & 1 deletion .github/workflows/tests.yaml
@@ -51,7 +51,7 @@ jobs:
# Integration tests do require external systems to be running (most commonly a database instance).
# Unlike end-to-end tests though, they test a specific module in a detailed manner, much like a unit test does.
env:
# We set `INTENDED_DBDATA_HARDWARE` so that it's seen when `integtest_pg_conn.py` executes `./tune/env/set_up_env_integtests.sh`.
# We set `INTENDED_DBDATA_HARDWARE` so that it's seen when `integtest_pg_conn.py` executes `./env/set_up_env_integtests.sh`.
INTENDED_DBDATA_HARDWARE: ssd
run: |
. "$HOME/.cargo/env"
Empty file added env/__init__.py
Empty file.
74 changes: 66 additions & 8 deletions tune/env/integtest_pg_conn.py → env/integtest_pg_conn.py
@@ -1,10 +1,11 @@
import copy
import subprocess
import unittest
from pathlib import Path

import yaml

from tune.env.pg_conn import PostgresConn
from env.pg_conn import PostgresConn
from util.pg import (
DEFAULT_POSTGRES_PORT,
get_is_postgres_running,
@@ -18,7 +19,7 @@
default_pristine_dbdata_snapshot_path,
)

ENV_INTEGTESTS_DBGYM_CONFIG_FPATH = Path("tune/env/env_integtests_dbgym_config.yaml")
ENV_INTEGTESTS_DBGYM_CONFIG_FPATH = Path("env/env_integtests_dbgym_config.yaml")
BENCHMARK = "tpch"
SCALE_FACTOR = 0.01

@@ -36,14 +37,14 @@ class PostgresConnTests(unittest.TestCase):
def setUpClass() -> None:
# If you're running the test locally, this check makes runs past the first one much faster.
if not get_unittest_workspace_path().exists():
subprocess.run(["./tune/env/set_up_env_integtests.sh"], check=True)
subprocess.run(["./env/set_up_env_integtests.sh"], check=True)

PostgresConnTests.dbgym_cfg = DBGymConfig(ENV_INTEGTESTS_DBGYM_CONFIG_FPATH)

def setUp(self) -> None:
self.assertFalse(
get_is_postgres_running(),
"Make sure Postgres isn't running before starting the integration test. `pkill postgres` is one way"
"Make sure Postgres isn't running before starting the integration test. `pkill postgres` is one way "
+ "to ensure this. Be careful about accidentally taking down other people's Postgres instances though.",
)
self.pristine_dbdata_snapshot_path = default_pristine_dbdata_snapshot_path(
@@ -74,18 +75,18 @@ def test_init(self) -> None:
def test_start_and_stop(self) -> None:
pg_conn = self.create_pg_conn()
pg_conn.restore_pristine_snapshot()
pg_conn.start_with_changes()
pg_conn.restart_postgres()
self.assertTrue(get_is_postgres_running())
pg_conn.shutdown_postgres()

def test_start_on_multiple_ports(self) -> None:
pg_conn0 = self.create_pg_conn()
pg_conn0.restore_pristine_snapshot()
pg_conn0.start_with_changes()
pg_conn0.restart_postgres()
self.assertEqual(set(get_running_postgres_ports()), {DEFAULT_POSTGRES_PORT})
pg_conn1 = self.create_pg_conn(DEFAULT_POSTGRES_PORT + 1)
pg_conn1.restore_pristine_snapshot()
pg_conn1.start_with_changes()
pg_conn1.restart_postgres()
self.assertEqual(
set(get_running_postgres_ports()),
{DEFAULT_POSTGRES_PORT, DEFAULT_POSTGRES_PORT + 1},
@@ -99,7 +100,7 @@ def test_connect_and_disconnect(self) -> None:
# Setup
pg_conn = self.create_pg_conn()
pg_conn.restore_pristine_snapshot()
pg_conn.start_with_changes()
pg_conn.restart_postgres()

# Test
self.assertIsNone(pg_conn._conn)
@@ -115,6 +116,63 @@ def test_connect_and_disconnect(self) -> None:
# Cleanup
pg_conn.shutdown_postgres()

def test_start_with_changes(self) -> None:
# Setup
pg_conn = self.create_pg_conn()
pg_conn.restore_pristine_snapshot()
pg_conn.restart_postgres()

# Test
initial_sysknobs = pg_conn.get_system_knobs()
self.assertEqual(initial_sysknobs["wal_buffers"], "4MB")
pg_conn.restart_with_changes({"wal_buffers": "8MB"})
new_sysknobs = pg_conn.get_system_knobs()
self.assertEqual(new_sysknobs["wal_buffers"], "8MB")

# Cleanup
pg_conn.shutdown_postgres()

def test_multiple_start_with_changes(self) -> None:
# Setup
pg_conn = self.create_pg_conn()
pg_conn.restore_pristine_snapshot()
pg_conn.restart_postgres()

# Test
initial_sysknobs = pg_conn.get_system_knobs()

# First call
self.assertEqual(initial_sysknobs["wal_buffers"], "4MB")
pg_conn.restart_with_changes({"wal_buffers": "8MB"})
new_sysknobs = pg_conn.get_system_knobs()
self.assertEqual(new_sysknobs["wal_buffers"], "8MB")

# Second call
self.assertEqual(initial_sysknobs["enable_nestloop"], "on")
pg_conn.restart_with_changes({"enable_nestloop": "off"})
new_sysknobs = pg_conn.get_system_knobs()
self.assertEqual(new_sysknobs["enable_nestloop"], "off")
# The changes should not be additive: "wal_buffers" should have reset to its default of 4MB.
self.assertEqual(new_sysknobs["wal_buffers"], "4MB")

# Cleanup
pg_conn.shutdown_postgres()

def test_start_with_changes_doesnt_modify_input(self) -> None:
# Setup
pg_conn = self.create_pg_conn()
pg_conn.restore_pristine_snapshot()
pg_conn.restart_postgres()

# Test
conf_changes = {"wal_buffers": "8MB"}
orig_conf_changes = copy.deepcopy(conf_changes)
pg_conn.restart_with_changes(conf_changes)
self.assertEqual(conf_changes, orig_conf_changes)

# Cleanup
pg_conn.shutdown_postgres()


if __name__ == "__main__":
unittest.main()
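
One way to run this suite locally is with unittest's loader. The following is a minimal sketch, assuming the repository root as the working directory and the same prerequisites the CI workflow sets up:

import unittest

# A sketch: the first run is slow because setUpClass() invokes
# ./env/set_up_env_integtests.sh; later runs reuse the existing unittest workspace.
# The CLI equivalent is `python -m unittest env.integtest_pg_conn`.
suite = unittest.defaultTestLoader.loadTestsFromName("env.integtest_pg_conn")
unittest.TextTestRunner(verbosity=2).run(suite)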
56 changes: 45 additions & 11 deletions tune/env/pg_conn.py → env/pg_conn.py
@@ -129,27 +129,40 @@ def shutdown_postgres(self) -> None:
if not exists and retcode != 0:
break

def start_with_changes(
def restart_postgres(self) -> bool:
# TODO: check if we still get the shared preload libraries correctly if we do None
return self.restart_with_changes(conf_changes=None)

def restart_with_changes(
self,
conf_changes: Optional[list[str]] = None,
conf_changes: Optional[dict[str, str]],
dump_page_cache: bool = False,
save_checkpoint: bool = False,
) -> bool:
"""
This function is called "(re)start" because it also shuts down Postgres before starting it.
This function assumes that some snapshot has already been untarred into self.dbdata_dpath.
You can do this by calling one of the wrappers around _restore_snapshot().

Note that multiple calls are not additive: each call rewrites postgresql.auto.conf from scratch,
so the configuration resets to that of the latest saved snapshot before `conf_changes` is applied.
If you want the effect of several changes without the overhead of saving a snapshot, pass them
all in a single `conf_changes` dict.
"""
# Install the new configuration changes.
if conf_changes is not None:
if SHARED_PRELOAD_LIBRARIES:
# This way of doing it works for both single or multiple libraries. An example of a way
# that *doesn't* work is `f"shared_preload_libraries='"{SHARED_PRELOAD_LIBRARIES}"'"`
conf_changes.append(
f"shared_preload_libraries='{SHARED_PRELOAD_LIBRARIES}'"
)
dbdata_auto_conf_path = self.dbdata_dpath / "postgresql.auto.conf"
with open(dbdata_auto_conf_path, "w") as f:
f.write("\n".join(conf_changes))
f.write(
"\n".join([f"{knob} = {val}" for knob, val in conf_changes.items()])
+ "\n"
)

assert (
"shared_preload_libraries" not in conf_changes
), f"You should not set shared_preload_libraries manually."

# Using single quotes around SHARED_PRELOAD_LIBRARIES works for both single or multiple libraries.
f.write(f"shared_preload_libraries = '{SHARED_PRELOAD_LIBRARIES}'")

# Start postgres instance.
self.shutdown_postgres()
@@ -295,7 +308,16 @@ def _set_up_boot(
logging.getLogger(DBGYM_LOGGER_NAME).debug("Set up boot")

def psql(self, sql: str) -> tuple[int, Optional[str]]:
low_sql = sql.lower()
"""
Execute a SQL command (equivalent to `psql -c "[cmd]"`) and return a status code and its stderr.

This is meant for commands that modify the database, not for ones that query it, which is why it
doesn't return a Cursor with the result. I designed it this way because it's difficult to provide a
general-purpose API that returns results for arbitrary SQL queries, as those results could be very large.

A return code of 0 means success while a non-zero return code means failure. The stderr will be None
on success and a string on failure.
"""

def cancel_fn(conn_str: str) -> None:
with psycopg.connect(
@@ -350,6 +372,18 @@ def cancel_fn(conn_str: str) -> None:
self.disconnect()
return 0, None

def get_system_knobs(self) -> dict[str, str]:
"""
System knobs are those applied across the entire system. They do not include table-specific
knobs, query-specific knobs (aka query hints), or indexes.
"""
conn = self.conn()
result = conn.execute("SHOW ALL").fetchall()
knobs = {}
for row in result:
knobs[row[0]] = row[1]
return knobs

def restore_pristine_snapshot(self) -> bool:
return self._restore_snapshot(self.pristine_dbdata_snapshot_fpath)

@@ -381,4 +415,4 @@ def _restore_snapshot(
>> f"{self.dbdata_dpath}/postgresql.conf"
)()

return self.start_with_changes(conf_changes=None)
return self.restart_postgres()
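
To make the reworked API concrete, here is a minimal usage sketch. It is not runnable on its own: it assumes `pg_conn` is a `PostgresConn` constructed as in `integtest_pg_conn.py` (constructor arguments elided), and the table name is purely illustrative:

# Assumes `pg_conn` was built as in integtest_pg_conn.py's create_pg_conn().
pg_conn.restore_pristine_snapshot()  # untars the snapshot and restarts Postgres

# psql() returns (retcode, stderr): (0, None) on success, (nonzero, str) on failure.
retcode, stderr = pg_conn.psql("CREATE TABLE demo_table (x int)")
assert retcode == 0 and stderr is None

# restart_with_changes() rewrites postgresql.auto.conf from scratch, so calls are
# not additive: after the second call below, wal_buffers is back to its default.
pg_conn.restart_with_changes({"wal_buffers": "8MB"})
pg_conn.restart_with_changes({"enable_nestloop": "off"})
assert pg_conn.get_system_knobs()["wal_buffers"] == "4MB"

# To apply several knobs at once, pass them together in a single dict.
pg_conn.restart_with_changes({"wal_buffers": "8MB", "enable_nestloop": "off"})

pg_conn.shutdown_postgres()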
tune/env/set_up_env_integtests.sh → env/set_up_env_integtests.sh
@@ -12,7 +12,7 @@ set -euxo pipefail
INTENDED_DBDATA_HARDWARE="${INTENDED_DBDATA_HARDWARE:-hdd}"
BENCHMARK=tpch
SCALE_FACTOR=0.01
export DBGYM_CONFIG_PATH=tune/env/env_integtests_dbgym_config.yaml # Note that this envvar needs to be exported.
export DBGYM_CONFIG_PATH=env/env_integtests_dbgym_config.yaml # Note that this envvar needs to be exported.
WORKSPACE_PATH=$(grep 'dbgym_workspace_path:' $DBGYM_CONFIG_PATH | sed 's/dbgym_workspace_path: //')

python3 task.py benchmark $BENCHMARK data $SCALE_FACTOR
2 changes: 2 additions & 0 deletions scripts/run_demo.sh
@@ -1,2 +1,4 @@
#!/bin/bash
# You may need to do `pkill python` to fully restart the streamlit server. If you do not, objects cached
# with @st.cache_resource may still persist even after you Ctrl-C and rerun ./scripts/run_demo.sh.
python -m streamlit run tune/demo/main.py
65 changes: 58 additions & 7 deletions tune/demo/main.py
@@ -1,6 +1,6 @@
import streamlit as st

from tune.env.pg_conn import PostgresConn
from env.pg_conn import PostgresConn
from util.pg import DEFAULT_POSTGRES_PORT, get_is_postgres_running
from util.workspace import (
DEFAULT_BOOT_CONFIG_FPATH,
@@ -12,9 +12,28 @@
)


# This ensures that DBGymConfig is only created once. Check DBGymConfig.__init__() for why we must do this.
# The rationale behind this code is very subtle. I'll first go over streamlit concepts before describing why this function exists.
#
# First, in streamlit, there are three kinds of "script reruns". These are ordered from least to most "disruptive":
# 1. st.rerun(). Will reset any local variables but will not reset st.session_state.
# 2. Reloading the browser page (perhaps if you changed some code). Will reset local vars and st.session_state but not things
# cached with @st.cache_resource.
# 3. Restarting the streamlit server. If you're running the server locally, you can restart it by doing Ctrl-C, `pkill python`,
# and then `streamlit run ...` (or `./scripts/run_demo.sh`). Will reset local vars, st.session_state, and things cached with
# @st.cache_resource, but will not reset things persisted to disk (though we currently don't persist anything to disk). Doing
# `pkill python` is critical here to actually reset the things cached with @st.cache_resource.
#
# Next, DBGymConfig has a safeguard where it can only be created once per instance of the Python interpreter. If you just put it
# in st.session_state, it would get re-created when you reloaded the browser page, causing it to trip the assertion that checks
# DBGymConfig.num_times_created_this_run == 1. Thus, we use @st.cache_resource to avoid this.
#
# I considered modifying num_times_created_this_run to instead be num_active_instances and doing `num_active_instances -= 1` in
# DBGymConfig.__del__(). However, streamlit doesn't actually destroy objects when you reload the browser page; it only destroys
# objects when you restart the streamlit server.
#
# If you modify the code of DBGymConfig, you will need to fully restart the streamlit server for those changes to be propagated.
@st.cache_resource
def make_dbgym_cfg() -> DBGymConfig:
def make_dbgym_cfg_cached() -> DBGymConfig:
return make_standard_dbgym_cfg()


@@ -23,7 +42,7 @@ class Demo:
SCALE_FACTOR = 0.01

def __init__(self) -> None:
self.dbgym_cfg = make_dbgym_cfg()
self.dbgym_cfg = make_dbgym_cfg_cached()
self.pristine_dbdata_snapshot_path = default_pristine_dbdata_snapshot_path(
self.dbgym_cfg.dbgym_workspace_path, Demo.BENCHMARK, Demo.SCALE_FACTOR
)
@@ -41,21 +60,53 @@ def __init__(self) -> None:
DEFAULT_BOOT_CONFIG_FPATH,
)

def _get_categorized_system_knobs(self) -> tuple[dict[str, str], dict[str, str]]:
IMPORTANT_KNOBS = {"shared_buffers", "enable_nestloop"}
all_knobs = self.pg_conn.get_system_knobs()
important_knobs = {
knob: val for knob, val in all_knobs.items() if knob in IMPORTANT_KNOBS
}
unimportant_knobs = {
knob: val for knob, val in all_knobs.items() if knob not in IMPORTANT_KNOBS
}
return important_knobs, unimportant_knobs

def main(self) -> None:
is_postgres_running = get_is_postgres_running()

if is_postgres_running:
st.write("Postgres is running")
st.write("Postgres is RUNNING")

if st.button("Stop Postgres"):
self.pg_conn.shutdown_postgres()
st.rerun()

with st.form("reconfig", clear_on_submit=True, enter_to_submit=False):
knob = st.text_input("Knob", placeholder="Enter text here...")
val = st.text_input("Value", placeholder="Enter text here...")
submit_button = st.form_submit_button("Reconfigure")
if submit_button:
if knob != "" and val != "":
if "conf_changes" not in st.session_state:
st.session_state.conf_changes = dict()

# By using st.session_state, we persist changes across st.rerun() (though not across reloading the browser).
st.session_state.conf_changes[knob] = val
self.pg_conn.restart_with_changes(st.session_state.conf_changes)
st.rerun()

important_knobs, unimportant_knobs = self._get_categorized_system_knobs()
with st.expander("Important knobs", expanded=True):
st.write(important_knobs)

with st.expander("Other knobs", expanded=False):
st.write(unimportant_knobs)
else:
st.write("Postgres is not running")
st.write("Postgres is STOPPED")

if st.button("Start Postgres"):
self.pg_conn.restore_pristine_snapshot()
self.pg_conn.start_with_changes()
self.pg_conn.restart_postgres()
st.rerun()


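
To illustrate the caching behavior the long comment in main.py describes, here is a self-contained sketch; `OncePerInterpreter` is a hypothetical stand-in for DBGymConfig's once-per-interpreter safeguard:

import streamlit as st


class OncePerInterpreter:
    # Hypothetical stand-in for DBGymConfig: assert construction happens at most
    # once per Python interpreter, mirroring the num_times_created_this_run check.
    num_times_created = 0

    def __init__(self) -> None:
        OncePerInterpreter.num_times_created += 1
        assert OncePerInterpreter.num_times_created == 1


@st.cache_resource
def get_instance() -> OncePerInterpreter:
    # Survives st.rerun() and browser reloads; only restarting the streamlit
    # server (Ctrl-C plus `pkill python`) clears this cache.
    return OncePerInterpreter()


instance = get_instance()

# st.session_state survives st.rerun() but not a browser reload, which is why the
# demo keeps its conf_changes dict there instead of in a local variable.
if "rerun_count" not in st.session_state:
    st.session_state.rerun_count = 0

if st.button("Rerun script"):
    st.session_state.rerun_count += 1
    st.rerun()

st.write(f"Reruns this session: {st.session_state.rerun_count}")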
4 changes: 0 additions & 4 deletions tune/env/__init__.py

This file was deleted.

4 changes: 2 additions & 2 deletions tune/protox/agent/build_trial.py
@@ -16,7 +16,7 @@
from torch import nn
from torch.optim import Adam # type: ignore[attr-defined]

from tune.env.pg_conn import PostgresConn
from env.pg_conn import PostgresConn
from tune.protox.agent.agent_env import AgentEnv
from tune.protox.agent.buffers import ReplayBuffer
from tune.protox.agent.noise import ClampNoise
@@ -163,7 +163,7 @@ def _build_utilities(
artifact_manager=artifact_manager,
)

# If we're using Boot, PostgresConn.start_with_changes() assumes that Redis is running. Thus,
# If we're using Boot, PostgresConn.restart_postgres() assumes that Redis is running. Thus,
# we start Redis here if necessary.
enable_boot = hpo_params["enable_boot"][str(tuning_mode)]
if enable_boot: