From c509893e67336d9ca62835f8cc40025d6c980385 Mon Sep 17 00:00:00 2001
From: Patrick Wang
Date: Tue, 3 Sep 2024 00:37:05 +0000
Subject: [PATCH] format

---
 tune/protox/embedding/datagen.py   | 71 +++++++++++++++++++++---------
 tune/protox/embedding/select.py    | 17 ++++---
 tune/protox/embedding/train_all.py | 10 +++--
 3 files changed, 69 insertions(+), 29 deletions(-)

diff --git a/tune/protox/embedding/datagen.py b/tune/protox/embedding/datagen.py
index b5ff7897..aa75d280 100644
--- a/tune/protox/embedding/datagen.py
+++ b/tune/protox/embedding/datagen.py
@@ -39,7 +39,12 @@
 )
 from tune.protox.embedding.loss import COST_COLUMNS
 from tune.protox.env.space.primitive_space.index_space import IndexSpace
-from tune.protox.env.types import QuerySpec, QueryType, TableAttrAccessSetsMap, TableAttrListMap
+from tune.protox.env.types import (
+    QuerySpec,
+    QueryType,
+    TableAttrAccessSetsMap,
+    TableAttrListMap,
+)
 from tune.protox.env.workload import Workload
 from util.pg import create_psycopg_conn
 from util.shell import subprocess_run
@@ -52,7 +57,9 @@
 # pass


-QueryBatches = NewType("QueryBatches", list[tuple[str, list[tuple[QueryType, str]], Any]])
+QueryBatches = NewType(
+    "QueryBatches", list[tuple[str, list[tuple[QueryType, str]], Any]]
+)


 # click setup
@@ -254,7 +261,9 @@ def datagen(
         assert False

     # Process the "data structure" args
-    leading_col_tbls_parsed: list[str] = [] if leading_col_tbls is None else leading_col_tbls.split(",")
+    leading_col_tbls_parsed: list[str] = (
+        [] if leading_col_tbls is None else leading_col_tbls.split(",")
+    )
     # I chose to only use the "," delimiter in override_sample_limits_str, so the dictionary is encoded as [key],[value],[key],[value]
     # I felt this was better than introducing a new delimiter which might conflict with the name of a table
     override_sample_limits_parsed: dict[str, int] = dict()
@@ -378,7 +387,9 @@ def __init__(
 class EmbeddingFileGenArgs:
     """Same comment as EmbeddingDatagenGenericArgs"""

-    def __init__(self, table_shape: bool, dual_class: bool, pad_min: int, rebias: float):
+    def __init__(
+        self, table_shape: bool, dual_class: bool, pad_min: int, rebias: float
+    ):
         self.table_shape = table_shape
         self.dual_class = dual_class
         self.pad_min = pad_min
@@ -389,7 +400,11 @@ def get_traindata_dir(dbgym_cfg: DBGymConfig) -> Path:
     return dbgym_cfg.dbgym_this_run_path / "traindata_dir"


-def _gen_traindata_dir(dbgym_cfg: DBGymConfig, generic_args: EmbeddingDatagenGenericArgs, dir_gen_args: EmbeddingDirGenArgs) -> None:
+def _gen_traindata_dir(
+    dbgym_cfg: DBGymConfig,
+    generic_args: EmbeddingDatagenGenericArgs,
+    dir_gen_args: EmbeddingDirGenArgs,
+) -> None:
     with open_and_save(dbgym_cfg, generic_args.benchmark_config_path, "r") as f:
         benchmark_config = yaml.safe_load(f)

@@ -408,7 +423,11 @@
     results = []
     job_id = 0
     for tbl in tables:
-        cols: list[Optional[str]] = [None] if tbl not in dir_gen_args.leading_col_tbls else cast(list[Optional[str]], modified_attrs[tbl])
+        cols: list[Optional[str]] = (
+            [None]
+            if tbl not in dir_gen_args.leading_col_tbls
+            else cast(list[Optional[str]], modified_attrs[tbl])
+        )
         for colidx, col in enumerate(cols):
             if col is None:
                 output = traindata_dir / tbl
@@ -607,7 +626,9 @@ def _augment_query_data(workload: Workload, data: dict[str, float]) -> dict[str,
     return data


-def _execute_explains(cursor: psycopg.Cursor[Any], batches: QueryBatches, models: Optional[dict[Any, Any]]) -> dict[str, float]:
+def _execute_explains(
+    cursor: psycopg.Cursor[Any], batches: QueryBatches, models: Optional[dict[Any, Any]]
+) -> dict[str, float]:
     data: dict[str, float] = {}
     ou_model_data: dict[str, list[Any]] = {}

@@ -697,15 +718,23 @@ def acquire_model_data(q: str, plan: dict[str, Any]) -> None:
     return data


-def _extract_refs(generate_costs: bool, target: Optional[str], cursor: psycopg.Cursor[Any], workload: Workload, models: Optional[dict[Any, Any]]) -> tuple[dict[str, float], dict[str, float]]:
+def _extract_refs(
+    generate_costs: bool,
+    target: Optional[str],
+    cursor: psycopg.Cursor[Any],
+    workload: Workload,
+    models: Optional[dict[Any, Any]],
+) -> tuple[dict[str, float], dict[str, float]]:
     ref_qs = {}
     table_ref_qs = {}
     if generate_costs:
         # Get reference costs.
-        batches = QueryBatches([
-            (q, workload.queries[q], workload.query_aliases[q])
-            for q in workload.queries.keys()
-        ])
+        batches = QueryBatches(
+            [
+                (q, workload.queries[q], workload.query_aliases[q])
+                for q in workload.queries.keys()
+            ]
+        )
         ref_qs = _execute_explains(cursor, batches, models)
         ref_qs = _augment_query_data(workload, ref_qs)

@@ -714,7 +743,9 @@ def _extract_refs(generate_costs: bool, target: Optional[str], cursor: psycopg.C
             table_ref_qs = ref_qs
         else:
             qs = workload.queries_for_table(target)
-            batches = QueryBatches([(q, workload.queries[q], workload.query_aliases[q]) for q in qs])
+            batches = QueryBatches(
+                [(q, workload.queries[q], workload.query_aliases[q]) for q in qs]
+            )
             table_ref_qs = _execute_explains(cursor, batches, models)
             table_ref_qs = _augment_query_data(workload, table_ref_qs)
     return ref_qs, table_ref_qs
@@ -743,9 +774,7 @@ def _produce_index_data(
     # models = load_ou_models(model_dir)

     # Construct workload.
-    workload = Workload(
-        dbgym_cfg, tables, attributes, query_spec, workload_path, pid=p
-    )
+    workload = Workload(dbgym_cfg, tables, attributes, query_spec, workload_path, pid=p)
     modified_attrs = workload.column_usages()
     np.random.seed(seed)

@@ -843,10 +872,12 @@
             else:
                 qs_for_tbl = workload.queries_for_table(ia.tbl_name)

-            batches = QueryBatches([
-                (q, workload.queries[q], workload.query_aliases[q])
-                for q in qs_for_tbl
-            ])
+            batches = QueryBatches(
+                [
+                    (q, workload.queries[q], workload.query_aliases[q])
+                    for q in qs_for_tbl
+                ]
+            )
             data = _execute_explains(cursor, batches, models)
             data = _augment_query_data(workload, data)
             if models is None:
diff --git a/tune/protox/embedding/select.py b/tune/protox/embedding/select.py
index da9c195d..613730b6 100644
--- a/tune/protox/embedding/select.py
+++ b/tune/protox/embedding/select.py
@@ -6,8 +6,8 @@
 import numpy as np
 import pandas as pd
-from pandas import DataFrame
 import tqdm
+from pandas import DataFrame

 from misc.utils import DBGymConfig, default_embedder_dname, link_result
 from tune.protox.embedding.analyze import RANGES_FNAME, STATS_FNAME

@@ -159,7 +159,7 @@ def recurse_set(source: dict[Any, Any], target: dict[Any, Any]) -> None:
     return data


-def _attach(data: DataFrame, raw_data: DataFrame, num_limit: int=0) -> DataFrame:
+def _attach(data: DataFrame, raw_data: DataFrame, num_limit: int = 0) -> DataFrame:
     # As the group index goes up, the perf should go up (i.e., bounds should tighten)
     filtered_data: dict[tuple[float, float], DataFrame] = {}
     new_data = []
@@ -167,7 +167,9 @@
         tup_dict = {k: getattr(tup, k) for k in data.columns}

         if raw_data is not None and Path(tup_dict["ranges_file"]).exists():
-            def compute_dist_score(current_dists: dict[str, float], base: float, upper: float) -> float:
+            def compute_dist_score(
+                current_dists: dict[str, float], base: float, upper: float
+            ) -> float:
                 nonlocal filtered_data
                 key = (base, upper)
                 if key not in filtered_data:
@@ -219,7 +221,10 @@ def compute_dist_score(current_dists: dict[str, float], base: float, upper: floa
                 if drange[0] is None:
                     drange = (1.0 - tup_dict["bias_separation"], 1.01)
                 else:
-                    drange = (drange[0] - tup_dict["bias_separation"], drange[0])
+                    drange = (
+                        drange[0] - tup_dict["bias_separation"],
+                        drange[0],
+                    )
                 current_dists = {}

             else:
@@ -230,7 +235,9 @@ def compute_dist_score(current_dists: dict[str, float], base: float, upper: floa
             if len(current_dists) > 0:
                 # Put the error in.
                 errors.append(
-                    compute_dist_score(current_dists, 0.0, tup_dict["bias_separation"])
+                    compute_dist_score(
+                        current_dists, 0.0, tup_dict["bias_separation"]
+                    )
                 )

             tup_dict["idx_class_errors"] = ",".join(
diff --git a/tune/protox/embedding/train_all.py b/tune/protox/embedding/train_all.py
index 6e0ca432..9f0aed3a 100644
--- a/tune/protox/embedding/train_all.py
+++ b/tune/protox/embedding/train_all.py
@@ -15,7 +15,6 @@
 import ray
 import torch
 import torch.nn as nn
-from torch.optim import Adam  # type: ignore[attr-defined]
 import tqdm
 import yaml
 from pytorch_metric_learning.utils import logging_presets
@@ -26,6 +25,7 @@
 from ray.tune.search import ConcurrencyLimiter
 from ray.tune.search.hyperopt import HyperOptSearch
 from sklearn.model_selection import train_test_split
+from torch.optim import Adam  # type: ignore[attr-defined]
 from torch.utils.data import TensorDataset
 from typing_extensions import ParamSpec

@@ -227,7 +227,9 @@ def train_all_embeddings(
         sync_config=SyncConfig(),
         verbose=2,
         log_to_file=True,
-        storage_path=str(dbgym_cfg.cur_task_runs_path("embedding_ray_results", mkdir=True)),
+        storage_path=str(
+            dbgym_cfg.cur_task_runs_path("embedding_ray_results", mkdir=True)
+        ),
     )

     resources = {"cpu": 1}
@@ -355,8 +357,8 @@ def _build_trainer(
     benchmark_config_path: Path,
     train_size: float,
     workload_path: Path,
-    dataloader_num_workers: int=0,
-    disable_tqdm: bool=False,
+    dataloader_num_workers: int = 0,
+    disable_tqdm: bool = False,
 ) -> tuple[VAETrainer, Callable[..., Optional[dict[str, Any]]]]:
     max_cat_features = 0
     max_attrs = 0
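
Note for reviewers: the `QueryBatches` alias that this patch rewraps is a typing
`NewType` over a nested list shape. A minimal, self-contained sketch of how such
a value is constructed and consumed follows; the `QueryType = str` stand-in and
the sample batch are hypothetical (the real `QueryType` lives in
`tune.protox.env.types`), but the `NewType` mechanics are standard:

    from typing import Any, NewType

    # Stand-in for tune.protox.env.types.QueryType; assumed here to behave
    # like a plain string label for illustration.
    QueryType = str

    # Mirrors the alias in datagen.py: each entry is
    # (query_name, [(query_type, sql_text), ...], query_aliases).
    QueryBatches = NewType(
        "QueryBatches", list[tuple[str, list[tuple[QueryType, str]], Any]]
    )

    # NewType is erased at runtime: QueryBatches(...) returns the wrapped list
    # unchanged, while type checkers track the distinct name.
    batches = QueryBatches(
        [("q1", [("SELECT", "SELECT COUNT(*) FROM lineitem;")], {"lineitem": ["l"]})]
    )

    for name, queries, aliases in batches:
        for qtype, sql in queries:
            print(name, qtype, sql, aliases)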
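
Likewise, the comment in the datagen hunk describes override_sample_limits_str
as a flat "[key],[value],[key],[value]" encoding with "," as the only
delimiter. A hypothetical helper showing one way to decode that scheme (the
function name and sample values are illustrative only; the actual parsing loop
in datagen.py may differ):

    # Decode "[key],[value],[key],[value]" into {key: int(value)}.
    def parse_override_sample_limits(s: str) -> dict[str, int]:
        parts = s.split(",")
        assert len(parts) % 2 == 0, "expected alternating [key],[value] pairs"
        return {parts[i]: int(parts[i + 1]) for i in range(0, len(parts), 2)}

    print(parse_override_sample_limits("lineitem,32768,orders,8192"))
    # -> {'lineitem': 32768, 'orders': 8192}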