Skip to content

Commit

Permalink
Merge pull request optuna#4947 from not522/split_trials
Browse files Browse the repository at this point in the history
Add `_split_trials` instead of `_get_observation_pairs` and `_split_observation_pairs`
  • Loading branch information
HideakiImamura authored Oct 4, 2023
2 parents 2020a1a + 58c0ad2 commit c59aaf2
Show file tree
Hide file tree
Showing 3 changed files with 286 additions and 445 deletions.
296 changes: 142 additions & 154 deletions optuna/samplers/_tpe/sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@
import math
from typing import Any
from typing import Callable
from typing import cast
from typing import Dict
from typing import List
from typing import Optional
from typing import Sequence
from typing import Tuple
from typing import Union
import warnings

Expand Down Expand Up @@ -445,19 +444,15 @@ def _sample(
use_cache = not self._constant_liar
trials = study._get_trials(deepcopy=False, states=states, use_cache=use_cache)

scores, violations = _get_observation_pairs(
# We divide data into below and above.
n = sum(trial.state != TrialState.RUNNING for trial in trials) # Ignore running trials.
below_trials, above_trials = _split_trials(
study,
trials,
self._gamma(n),
self._constraints_func is not None,
)

n = sum(s < float("inf") for s, v in scores) # Ignore running trials.

# We divide data into below and above.
indices_below, indices_above = _split_observation_pairs(scores, self._gamma(n), violations)

below_trials = np.asarray(trials, dtype=object)[indices_below].tolist()
above_trials = np.asarray(trials, dtype=object)[indices_above].tolist()
below = self._get_internal_repr(below_trials, search_space)
above = self._get_internal_repr(above_trials, search_space)

Expand Down Expand Up @@ -585,165 +580,158 @@ def _calculate_nondomination_rank(loss_vals: np.ndarray) -> np.ndarray:
return ranks


def _get_observation_pairs(
def _split_trials(
study: Study,
trials: list[FrozenTrial],
constraints_enabled: bool = False,
) -> tuple[list[tuple[float, list[float]]], list[float] | None]:
"""Get observation pairs from the study.
This function collects observation pairs from the complete or pruned trials of the study.
In addition, if ``constant_liar`` is :obj:`True`, the running trials are considered.
The values for trials that don't contain the parameter in the ``param_names`` are skipped.
An observation pair fundamentally consists of a parameter value and an objective value.
However, due to the pruning mechanism of Optuna, final objective values are not always
available. Therefore, this function uses intermediate values in addition to the final
ones, and reports the value with its step count as ``(-step, value)``.
Consequently, the structure of the observation pair is as follows:
``(param_value, (-step, value))``.
The second element of an observation pair is used to rank observations in
``_split_observation_pairs`` method (i.e., observations are sorted lexicographically by
``(-step, value)``).
When ``constraints_enabled`` is :obj:`True`, 1-dimensional violation values are returned
as the third element (:obj:`None` otherwise). Each value is a float of 0 or greater and a
trial is feasible if and only if its violation score is 0.
"""

signs = []
for d in study.directions:
if d == StudyDirection.MINIMIZE:
signs.append(1)
else:
signs.append(-1)
n_below: int,
constraints_enabled: bool,
) -> tuple[list[FrozenTrial], list[FrozenTrial]]:
complete_trials = []
pruned_trials = []
running_trials = []
infeasible_trials = []

scores = []
violations: Optional[List[float]] = [] if constraints_enabled else None
for trial in trials:
# We extract score from the trial.
if trial.state is TrialState.COMPLETE:
assert trial.values is not None
score = (-float("inf"), [sign * v for sign, v in zip(signs, trial.values)])
elif trial.state is TrialState.PRUNED:
assert not study._is_multi_objective()

if len(trial.intermediate_values) > 0:
step, intermediate_value = max(trial.intermediate_values.items())
if math.isnan(intermediate_value):
score = (-step, [float("inf")])
else:
score = (-step, [signs[0] * intermediate_value])
else:
score = (1, [0.0])
elif trial.state is TrialState.RUNNING:
assert not study._is_multi_objective()
score = (float("inf"), [signs[0] * float("inf")])
if constraints_enabled and _get_infeasible_trial_score(trial) > 0:
infeasible_trials.append(trial)
elif trial.state == TrialState.COMPLETE:
complete_trials.append(trial)
elif trial.state == TrialState.PRUNED:
pruned_trials.append(trial)
elif trial.state == TrialState.RUNNING:
running_trials.append(trial)
else:
assert False
scores.append(score)

if constraints_enabled:
assert violations is not None
if trial.state != TrialState.RUNNING:
constraint = trial.system_attrs.get(_CONSTRAINTS_KEY)
if constraint is None:
warnings.warn(
f"Trial {trial.number} does not have constraint values."
" It will be treated as a lower priority than other trials."
)
violation = float("inf")
else:
# Violation values of infeasible dimensions are summed up.
violation = sum(v for v in constraint if v > 0)
violations.append(violation)
else:
violations.append(float("inf"))

return scores, violations
# We divide data into below and above.
below_complete, above_complete = _split_complete_trials(complete_trials, study, n_below)
n_below -= len(below_complete)
below_pruned, above_pruned = _split_pruned_trials(pruned_trials, study, n_below)
n_below -= len(below_pruned)
below_infeasible, above_infeasible = _split_infeasible_trials(infeasible_trials, n_below)

below_trials = below_complete + below_pruned + below_infeasible
above_trials = above_complete + above_pruned + above_infeasible + running_trials
below_trials.sort(key=lambda trial: trial.number)
above_trials.sort(key=lambda trial: trial.number)

return below_trials, above_trials


def _split_observation_pairs(
loss_vals: List[Tuple[float, List[float]]],
def _split_complete_trials(
trials: Sequence[FrozenTrial], study: Study, n_below: int
) -> tuple[list[FrozenTrial], list[FrozenTrial]]:
n_below = min(n_below, len(trials))
if len(study.directions) <= 1:
return _split_complete_trials_single_objective(trials, study, n_below)
else:
return _split_complete_trials_multi_objective(trials, study, n_below)


def _split_complete_trials_single_objective(
trials: Sequence[FrozenTrial],
study: Study,
n_below: int,
violations: Optional[List[float]],
) -> Tuple[np.ndarray, np.ndarray]:
# When constrains is not None, trials are split into below and above
# according to the following rules.
# 1. Feasible trials are better than infeasible trials.
# 2. Infeasible trials are sorted by sum of how much they violate each constraint.
# 3. Feasible trials are sorted by loss_vals.
if violations is not None:
violation_1d = np.array(violations, dtype=float)
idx = violation_1d.argsort(kind="stable")
if n_below >= len(idx) or violation_1d[idx[n_below]] > 0:
# Below is filled by all feasible trials and trials with smaller violation values.
indices_below = idx[:n_below]
indices_above = idx[n_below:]
) -> tuple[list[FrozenTrial], list[FrozenTrial]]:
if study.direction == StudyDirection.MINIMIZE:
sorted_trials = sorted(trials, key=lambda trial: cast(float, trial.value))
else:
sorted_trials = sorted(trials, key=lambda trial: cast(float, trial.value), reverse=True)
return sorted_trials[:n_below], sorted_trials[n_below:]


def _split_complete_trials_multi_objective(
trials: Sequence[FrozenTrial],
study: Study,
n_below: int,
) -> tuple[list[FrozenTrial], list[FrozenTrial]]:
if n_below == 0:
return [], []

lvals = np.asarray([trial.values for trial in trials])
for i, direction in enumerate(study.directions):
if direction == StudyDirection.MAXIMIZE:
lvals[:, i] *= -1

# Solving HSSP for variables number of times is a waste of time.
nondomination_ranks = _calculate_nondomination_rank(lvals)
assert 0 <= n_below <= len(lvals)

indices = np.array(range(len(lvals)))
indices_below = np.empty(n_below, dtype=int)

# Nondomination rank-based selection
i = 0
last_idx = 0
while last_idx < n_below and last_idx + sum(nondomination_ranks == i) <= n_below:
length = indices[nondomination_ranks == i].shape[0]
indices_below[last_idx : last_idx + length] = indices[nondomination_ranks == i]
last_idx += length
i += 1

# Hypervolume subset selection problem (HSSP)-based selection
subset_size = n_below - last_idx
if subset_size > 0:
rank_i_lvals = lvals[nondomination_ranks == i]
rank_i_indices = indices[nondomination_ranks == i]
worst_point = np.max(rank_i_lvals, axis=0)
reference_point = np.maximum(1.1 * worst_point, 0.9 * worst_point)
reference_point[reference_point == 0] = EPS
selected_indices = _solve_hssp(rank_i_lvals, rank_i_indices, subset_size, reference_point)
indices_below[last_idx:] = selected_indices

below_trials = []
above_trials = []
for index in range(len(trials)):
if index in indices_below:
below_trials.append(trials[index])
else:
# All trials in below are feasible.
# Feasible trials with smaller loss_vals are selected.
(feasible_idx,) = (violation_1d == 0).nonzero()
(infeasible_idx,) = (violation_1d > 0).nonzero()
assert len(feasible_idx) >= n_below
feasible_below, feasible_above = _split_observation_pairs(
[loss_vals[i] for i in feasible_idx], n_below, None
)
indices_below = feasible_idx[feasible_below]
indices_above = np.concatenate([feasible_idx[feasible_above], infeasible_idx])
# `np.sort` is used to keep chronological order.
return np.sort(indices_below), np.sort(indices_above)

n_objectives = 1
if len(loss_vals) > 0:
n_objectives = len(loss_vals[0][1])

if n_objectives <= 1:
loss_values = np.asarray(
[(s, v[0]) for s, v in loss_vals], dtype=[("step", float), ("score", float)]
)
above_trials.append(trials[index])
return below_trials, above_trials


index_loss_ascending = np.argsort(loss_values, kind="stable")
# `np.sort` is used to keep chronological order.
indices_below = np.sort(index_loss_ascending[:n_below])
indices_above = np.sort(index_loss_ascending[n_below:])
def _get_pruned_trial_score(trial: FrozenTrial, study: Study) -> tuple[float, float]:
if len(trial.intermediate_values) > 0:
step, intermediate_value = max(trial.intermediate_values.items())
if math.isnan(intermediate_value):
return -step, float("inf")
elif study.direction == StudyDirection.MINIMIZE:
return -step, intermediate_value
else:
return -step, -intermediate_value
else:
# Multi-objective TPE does not support pruning, so it ignores the ``step``.
lvals = np.asarray([v for _, v in loss_vals])

# Solving HSSP for variables number of times is a waste of time.
nondomination_ranks = _calculate_nondomination_rank(lvals)
assert 0 <= n_below <= len(lvals)

indices = np.array(range(len(lvals)))
indices_below = np.empty(n_below, dtype=int)

# Nondomination rank-based selection
i = 0
last_idx = 0
while last_idx < n_below and last_idx + sum(nondomination_ranks == i) <= n_below:
length = indices[nondomination_ranks == i].shape[0]
indices_below[last_idx : last_idx + length] = indices[nondomination_ranks == i]
last_idx += length
i += 1

# Hypervolume subset selection problem (HSSP)-based selection
subset_size = n_below - last_idx
if subset_size > 0:
rank_i_lvals = lvals[nondomination_ranks == i]
rank_i_indices = indices[nondomination_ranks == i]
worst_point = np.max(rank_i_lvals, axis=0)
reference_point = np.maximum(1.1 * worst_point, 0.9 * worst_point)
reference_point[reference_point == 0] = EPS
selected_indices = _solve_hssp(
rank_i_lvals, rank_i_indices, subset_size, reference_point
)
indices_below[last_idx:] = selected_indices
return 1, 0.0


def _split_pruned_trials(
trials: Sequence[FrozenTrial],
study: Study,
n_below: int,
) -> tuple[list[FrozenTrial], list[FrozenTrial]]:
n_below = min(n_below, len(trials))
sorted_trials = sorted(trials, key=lambda trial: _get_pruned_trial_score(trial, study))
return sorted_trials[:n_below], sorted_trials[n_below:]


def _get_infeasible_trial_score(trial: FrozenTrial) -> float:
constraint = trial.system_attrs.get(_CONSTRAINTS_KEY)
if constraint is None:
warnings.warn(
f"Trial {trial.number} does not have constraint values."
" It will be treated as a lower priority than other trials."
)
return float("inf")
else:
# Violation values of infeasible dimensions are summed up.
return sum(v for v in constraint if v > 0)

indices_above = np.setdiff1d(indices, indices_below)

return indices_below, indices_above
def _split_infeasible_trials(
trials: Sequence[FrozenTrial], n_below: int
) -> tuple[list[FrozenTrial], list[FrozenTrial]]:
n_below = min(n_below, len(trials))
sorted_trials = sorted(trials, key=_get_infeasible_trial_score)
return sorted_trials[:n_below], sorted_trials[n_below:]


def _calculate_weights_below_for_multi_objective(
Expand Down
Loading

0 comments on commit c59aaf2

Please sign in to comment.