Commit c7753b2

refactored fast_adaptive_models.py, removed too much parallelism

nepslor committed May 16, 2024
1 parent 7768e84 · commit c7753b2

Showing 5 changed files with 55 additions and 61 deletions.
64 changes: 29 additions & 35 deletions pyforecaster/forecasting_models/fast_adaptive_models.py
@@ -114,7 +114,7 @@ def generate_recent_history(y, w, start, i):

def tune_hyperpars(self, x_pd):
pars_opt = tune_hyperpars(x_pd, self.__class__, hyperpars=self.hyperpar_lims, n_trials=self.optimization_budget,
return_model=False, **self.init_pars)
return_model=False, model_init_kwargs=self.init_pars)
self.set_params(**pars_opt)
self.reinit_pars()
return self
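Note: the refactor replaces the splatted **self.init_pars with an explicit model_init_kwargs dict, and verbose is now read from that dict inside tune_hyperpars. A minimal sketch of the new calling convention, assuming the tune_hyperpars signature shown in holtwinters.py below; the data frame and hyperparameter bounds are illustrative:

import numpy as np
import pandas as pd
from pyforecaster.forecasting_models.holtwinters import tune_hyperpars
from pyforecaster.forecasting_models.fast_adaptive_models import Fourier_es

x = pd.DataFrame({'target': np.sin(np.arange(500) / 24) + 0.1 * np.random.randn(500)},
                 index=pd.date_range('2020-01-01', periods=500, freq='1h'))

# init kwargs travel in one explicit dict instead of being splatted, so they
# cannot collide with tune_hyperpars' own keyword arguments
init_kwargs = {'target_name': 'target', 'n_sa': 24, 'm': 168,
               'optimize_hyperpars': False, 'verbose': False}
pars_opt = tune_hyperpars(x, Fourier_es,
                          hyperpars={'alpha': [0, 1], 'omega': [0, 1]},
                          n_trials=10, return_model=False, parallel=False,
                          model_init_kwargs=init_kwargs)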
@@ -144,7 +144,7 @@ def run(self, x, y, return_coeffs=False, start_from=0, fit=True):
class Fourier_es(StatefulForecaster):

def __init__(self, target_name='target', targets_names=None, n_sa=1, m=24, val_ratio=0.8, optimize_hyperpars=True,
optimization_budget=100, verbose=True, nodes_at_step=None,
optimization_budget=100, verbose=False, nodes_at_step=None,
q_vect=None, alpha=0.8, omega=0.99, n_harmonics=3, periodicity=None, **scengen_kwgs):
"""
:param y:
@@ -408,15 +408,15 @@ def __setstate__(self, states):
self.states.update(states)


class FK_multi(ScenarioGenerator):
class FK_multi(StatefulForecaster):
"""
Multistep ahead forecasting with multiple Fourier-Kalman regressors
"""

def __init__(self, target_name='target', targets_names=None, n_sa=1, n_predictors=4, alphas=None, m=24, omegas=None,
ns_harmonics=None, val_ratio=0.8, nodes_at_step=None, q_vect=None, periodicity=None,
base_predictor=Fourier_es, optimize_hyperpars=False, optimize_submodels_hyperpars=True,
optimization_budget=100, r=0.1, q=0.1, verbose=True, **scengen_kwgs):
optimization_budget=100, r=0.1, q=0.1, verbose=True, submodels_pars=None, **scengen_kwgs):
"""
:param y:
:param h: this is the number of steps ahead to be predicted.
@@ -426,50 +426,51 @@ def __init__(self, target_name='target', targets_names=None, n_sa=1, n_predicto
"""
n_predictors = int(n_predictors)
self.base_predictor = base_predictor
self.targets_names = targets_names
self.init_pars = {'target_name': target_name, 'n_sa': n_sa, 'n_predictors': n_predictors, 'alpha': alphas, 'm': m, 'omegas': omegas,
'ns_harmonics': ns_harmonics, 'val_ratio': val_ratio, 'nodes_at_step': nodes_at_step,
'q_vect': q_vect, 'periodicity': periodicity, 'optimize_hyperpars': optimize_hyperpars,
'optimization_budget': optimization_budget, 'r': r, 'q': q,'optimize_submodels_hyperpars':optimize_submodels_hyperpars,
'verbose':verbose, 'base_predictor': base_predictor,'targets_names':targets_names}

super().__init__(target_name=target_name, targets_names=targets_names, n_sa=n_sa, m=m, val_ratio=val_ratio,
optimize_hyperpars=optimize_hyperpars, optimization_budget=optimization_budget,
verbose=verbose, nodes_at_step=nodes_at_step, q_vect=q_vect, **scengen_kwgs)
# submodels' init pars are set to the parent class init pars
self.submodels_init_pars = deepcopy(self.init_pars)
self.submodels_pars = submodels_pars

self.init_pars.update({'n_predictors': n_predictors, 'alphas': alphas, 'omegas': omegas,
'ns_harmonics': ns_harmonics, 'periodicity': periodicity, 'r': r, 'q': q,
'optimize_submodels_hyperpars': optimize_submodels_hyperpars,
'base_predictor': base_predictor, 'submodels_pars':submodels_pars})
self.init_pars.update(scengen_kwgs)
self.verbose=verbose
self.optimize_hyperpars = optimize_hyperpars
self.optimization_budget = optimization_budget
self.optimize_submodels_hyperpars = optimize_submodels_hyperpars

self.optimize_submodels_hyperpars = optimize_submodels_hyperpars
self.periodicity = periodicity if periodicity is not None else n_sa
assert self.periodicity < m, 'Periodicity must be smaller than history m'
if self.periodicity < n_sa:
print('WARNING: periodicity is smaller than n_sa, this may lead to suboptimal results.')
self.alphas = np.ones(n_predictors) * 0.01 if alphas is None else alphas
self.omegas = np.ones(n_predictors) * 0.01 if omegas is None else omegas
self.n_sa = n_sa
self.m = m
self.target_name = target_name
self.ns_harmonics = (np.ones(n_predictors) * 10).astype(int) if ns_harmonics is None else ns_harmonics
self.n_predictors = n_predictors
self.r = r
self.q = q
self.coeffs_t_history = []
self.H = None
self.F = None
self.models = None

self.states = {'x':None, 'P':None, 'R':None, 'Q':None}
self.reinit_pars()

super().__init__(q_vect, nodes_at_step=nodes_at_step, val_ratio=val_ratio, **scengen_kwgs)

def reinit_pars(self):
self.submodels_init_pars.update({'optimize_hyperpars': self.optimize_submodels_hyperpars,
'verbose':True})
# create models with increasing history length
ms = np.linspace(1, self.m, self.n_predictors + 1).astype(int)[1:]
ms = np.maximum(ms, self.n_sa)

submodels_pars = [self.submodels_init_pars.copy() for i in range(self.n_predictors)]
if np.any([m<self.periodicity for m in ms]):
print('The histories of the predictors are: {}'.format(ms))
print('But periodicity is {}'.format(self.periodicity))
print('Setting the history of predictors with m < periodicity to the periodicity')
ms = np.maximum(ms, self.periodicity)
for i in range(self.n_predictors):
submodels_pars[i].update({'m':ms[i]})

# precompute basis over all possible periods
self.F = np.eye(self.n_predictors)
@@ -480,18 +481,16 @@ def reinit_pars(self):
self.states['R'] = np.eye(self.n_predictors) * self.r
self.states['Q'] = np.eye(self.n_predictors) * self.q

self.models = [self.base_predictor(n_sa=self.n_sa, alpha=self.alphas[i], omega=self.omegas[i], n_harmonics=self.ns_harmonics[i], m=ms[i],
target_name=self.target_name, periodicity=self.periodicity,
targets_names=self.targets_names,
optimization_budget=self.optimization_budget, verbose=False, optimize_hyperpars=self.optimize_submodels_hyperpars) for i in
range(self.n_predictors)]
#self.states = [self.models[j].__getstate__() for j in range(self.n_predictors)]

if self.submodels_pars is not None:
self.models = [self.base_predictor(**self.submodels_pars[i]) for i in range(self.n_predictors)]
else:
self.models = [self.base_predictor(**submodels_pars[i]) for i in range(self.n_predictors)]
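Note: a worked sketch of how reinit_pars spaces the submodel histories, with illustrative values; each expert gets a linearly increasing history length, clamped from below by n_sa and by the periodicity:

import numpy as np

m, n_predictors, n_sa, periodicity = 168, 4, 24, 48
ms = np.linspace(1, m, n_predictors + 1).astype(int)[1:]  # array([ 42,  84, 126, 168])
ms = np.maximum(ms, n_sa)         # each expert sees at least n_sa samples
ms = np.maximum(ms, periodicity)  # ...and at least one full period
print(ms)                         # [ 48  84 126 168]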

def fit(self, x_pd, y_pd=None, **kwargs):
if self.optimize_hyperpars:
pars_opt = tune_hyperpars(x_pd, FK_multi, hyperpars={'fake':[0, 1]},
n_trials=1, return_model=False, parallel=False,
**self.init_pars)
n_trials=1, return_model=False, parallel=False, model_init_kwargs=self.init_pars)
self.set_params(**pars_opt)
self.reinit_pars()

@@ -526,16 +525,11 @@ def run(self, x_pd, return_coeffs=True, fit=True):

preds_experts = np.dstack([ self.models[j].predict(x_pd) for j in range(self.n_predictors)])

#from pyforecaster.plot_utils import ts_animation
#ts_animation(np.rollaxis(preds_experts, -1),
# names=[str(i) for i in range(self.n_predictors)], target=x_pd['all'].values, frames=1000,
# interval=0.1, step=3)

for i, idx in enumerate(x_pd.index):
if i >= self.n_sa:
# average last point error over different prediction times for all the models
last_obs = x_pd[self.target_name].iloc[i]
prev_obs = x_pd[self.target_name].iloc[i-1]
# sometimes the persistence error is too small, sometimes the signal could be small; we normalize by the average
norm_avg_err = [np.mean([np.abs(preds_experts[i-sa, sa, predictor]-last_obs) for sa in range(self.n_sa)]) for predictor in range(self.n_predictors)]
norm_avg_err = np.array(norm_avg_err) / np.mean(norm_avg_err)
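Note: a self-contained sketch of the error normalization above, with the layout assumed from the surrounding code: preds_experts has shape (n_obs, n_sa, n_predictors), so preds_experts[i - sa, sa, p] is expert p's sa-steps-ahead forecast issued at time i - sa, i.e. a forecast targeting time i:

import numpy as np

n_obs, n_sa, n_predictors = 10, 3, 2
preds_experts = np.random.rand(n_obs, n_sa, n_predictors)
i, last_obs = 5, 0.7

# mean absolute error of each expert over the forecasts that targeted time i
norm_avg_err = [np.mean([np.abs(preds_experts[i - sa, sa, p] - last_obs)
                         for sa in range(n_sa)]) for p in range(n_predictors)]
# divide by the cross-expert mean so the weighting is independent of scale
norm_avg_err = np.array(norm_avg_err) / np.mean(norm_avg_err)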
31 changes: 14 additions & 17 deletions pyforecaster/forecasting_models/holtwinters.py
@@ -40,7 +40,7 @@ def fit_sample(pars_dict,model_class, model_init_kwargs, x, return_model=False):
else:
return score

def tune_hyperpars(x, model_class, hyperpars, n_trials=100, verbose=True, return_model=True, parallel=True, **model_init_kwargs):
def tune_hyperpars(x, model_class, hyperpars, n_trials=100, return_model=True, parallel=True, model_init_kwargs=None):
"""
:param x: pd.DataFrame (n, n_cov)
:param y: pd.Series (n)
@@ -51,7 +51,7 @@ def tune_hyperpars(x, model_class, hyperpars, n_trials=100, verbose=True, return
to train on, in a global model fashion
:return:
"""

verbose = model_init_kwargs['verbose'] if 'verbose' in model_init_kwargs.keys() else False
pars_cartridge = []
for i in range(n_trials):
trial = {}
@@ -68,37 +68,33 @@
model_init_kwargs_0 = deepcopy(model_init_kwargs)


if len(pars_cartridge)>1:
if model_class.__name__ in ["HoltWintersMulti", "FK_multi"]:
fitted_model = fit_sample(pars_cartridge[0], model_class, model_init_kwargs, x, return_model=True)
else:
if parallel:
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import cpu_count
with ProcessPoolExecutor(max_workers=np.minimum(cpu_count(), 10)) as executor:
scores = list(tqdm(executor.map(partial(fit_sample, model_class=model_class,
model_init_kwargs=model_init_kwargs, x=x),
pars_cartridge), total=n_trials, desc='Tuning hyperpars for {}'.format(model_class.__name__)))
if verbose:
plt.figure()
t = np.linspace(0.01, 0.99, 30)
plt.plot(np.quantile(scores, t), t)
else:
scores = []
for i in tqdm(range(n_trials), desc='Tuning hyperpars for {}'.format(model_class.__name__)):
scores.append(fit_sample(pars_cartridge[i], model_class, model_init_kwargs, x))


else:
fitted_model = fit_sample(pars_cartridge[0], model_class, model_init_kwargs, x, return_model=True)
if verbose:
plt.figure()
t = np.linspace(0.01, 0.99, 30)
plt.plot(np.quantile(scores, t), t)
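Note: the parallel branch above is a standard executor-map pattern; a minimal self-contained sketch of it, independent of pyforecaster (the objective here is a stand-in):

import numpy as np
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import cpu_count
from functools import partial
from tqdm import tqdm

def fit_sample(pars, x=None):
    # stand-in objective: fit a model on x with pars and return its score
    return (pars['alpha'] - 0.3) ** 2

if __name__ == '__main__':
    pars_cartridge = [{'alpha': a} for a in np.linspace(0, 1, 20)]
    with ProcessPoolExecutor(max_workers=min(cpu_count(), 10)) as executor:
        scores = list(tqdm(executor.map(partial(fit_sample, x=None), pars_cartridge),
                           total=len(pars_cartridge), desc='Tuning hyperpars'))
    best_pars = pars_cartridge[int(np.argmin(scores))]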

if model_class.__name__ in ["HoltWintersMulti"]:
alphas = np.array([m.alpha for m in fitted_model.models])
gammas_1 = ([m.gamma_1 for m in fitted_model.models])
gammas_2 = ([m.gamma_2 for m in fitted_model.models])
best_pars = {'alphas': alphas, 'gammas_1': gammas_1, 'gammas_2': gammas_2, 'optimize_submodels_hyperpars':False}
elif model_class.__name__ in ["FK_multi"]:
alphas = np.array([m.alpha for m in fitted_model.models])
omegas = ([m.omega for m in fitted_model.models])
ns_harmonics = ([m.n_harmonics for m in fitted_model.models])
best_pars = {'alphas': alphas, 'omegas': omegas, 'ns_harmonics': ns_harmonics, 'optimize_submodels_hyperpars':False}
submodels_pars = [m.get_params() for m in fitted_model.models]
best_pars = {'submodels_pars':submodels_pars, 'optimize_submodels_hyperpars':False}
else:
best_idx = np.argmin(scores)
best_pars = pars_cartridge[best_idx]
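Note: for the multi-models the random search is skipped: the model is fitted once, each tuned expert reports its parameters via get_params(), and submodel re-tuning is disabled on the next fit. A usage sketch of the round trip for FK_multi, assuming the interfaces shown in this diff; the data frame is illustrative:

import numpy as np
import pandas as pd
from pyforecaster.forecasting_models.fast_adaptive_models import FK_multi

df = pd.DataFrame({'all': np.sin(np.arange(600) / 24) + 0.1 * np.random.randn(600)},
                  index=pd.date_range('2020-01-01', periods=600, freq='1h'))

# fit() calls tune_hyperpars(..., n_trials=1, model_init_kwargs=self.init_pars);
# the tuned experts are harvested into submodels_pars, and reinit_pars()
# rebuilds them via base_predictor(**submodels_pars[i]) on later refits
fk = FK_multi(target_name='all', n_sa=24, m=168, n_predictors=4, periodicity=48,
              optimize_hyperpars=True, optimization_budget=5).fit(df)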
@@ -142,6 +138,7 @@ def __init__(self, periods, target_name, targets_names=None, q_vect=None, val_ra
'verbose':verbose, 'alpha': alpha, 'beta': beta, 'gamma_1': gamma_1, 'gamma_2': gamma_2}
super().__init__(q_vect, nodes_at_step=nodes_at_step, val_ratio=val_ratio, **scengen_kwgs)
self.targets_names = [target_name] if targets_names is None else targets_names
self.init_pars.update({'targets_names':self.targets_names})
self.periods = periods
self.optimization_budget = optimization_budget
self.n_sa = n_sa
@@ -176,7 +173,7 @@ def __init__(self, periods, target_name, targets_names=None, q_vect=None, val_ra
def fit(self, x_pd, y_pd=None, **kwargs):
if self.optimize_hyperpars:
pars_opt = tune_hyperpars(x_pd, HoltWinters, hyperpars={'alpha':[0, 1], 'gamma_1':[0, 1], 'gamma_2':[0, 1]},
n_trials=self.optimization_budget, targets_names=self.targets_names, return_model=False, **self.init_pars)
n_trials=self.optimization_budget, return_model=False, model_init_kwargs=self.init_pars)
self.set_params(**pars_opt)

y = x_pd[self.target_name].values
@@ -418,7 +415,7 @@ def reinit_pars(self):

def fit(self, x_pd, y_pd=None):
if self.optimize_hyperpars:
pars_opt = tune_hyperpars(x_pd, HoltWintersMulti, hyperpars={'fake':[0, 1]}, n_trials=1, return_model=False, parallel=False, **self.init_pars)
pars_opt = tune_hyperpars(x_pd, HoltWintersMulti, hyperpars={'fake':[0, 1]}, n_trials=1, return_model=False, parallel=False, model_init_kwargs=self.init_pars)
self.set_params(**pars_opt)
self.reinit_pars()

11 changes: 6 additions & 5 deletions pyforecaster/formatter.py
@@ -180,7 +180,7 @@ def transform(self, x, time_features=True, holidays=False, return_target=True, g
@staticmethod
def _transform_(tr, x):
return tr.transform(x, augment=False)
def _transform(self, x, time_features=True, holidays=False, return_target=True, **holidays_kwargs):
def _transform(self, x, time_features=True, holidays=False, return_target=True, parallel=False, **holidays_kwargs):
"""
Takes the DataFrame x and applies the specified transformations stored in the transformers in order to obtain
the pre-fold-transformed dataset: this dataset has the correct final dimensions, but fold-specific
@@ -206,7 +206,7 @@ def _transform(self, x, time_features=True, holidays=False, return_target=True,
target = self.normalize(x, target)

if len(self.transformers)>0:
if self.n_parallel>1:
if parallel:
with concurrent.futures.ThreadPoolExecutor(max_workers=self.n_parallel) as executor:
x_tr = pd.concat([i for i in executor.map(partial(self._transform_, x=x), self.transformers)], axis=1)
x = pd.concat([x, x_tr], axis=1)
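Note: thread-parallel transformation is now opt-in through an explicit parallel flag rather than inferred from n_parallel > 1. A usage sketch, assuming a formatter configured as in tests/test_formatter.py below (construction not shown in this diff):

# serial by default; opt in to a ThreadPoolExecutor with n_parallel workers
x_tr = formatter.transform(x, time_features=True, holidays=False, parallel=False)
x_tr = formatter.transform(x, time_features=True, holidays=False, parallel=True)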
@@ -513,11 +513,12 @@ def global_form_preprocess(self, x):
axis=1))
return dfs

def global_form_postprocess(self, x, y, xs, ys, reduce_memory=False, corr_reorder=False):
def global_form_postprocess(self, x, y, xs, ys, reduce_memory=False, corr_reorder=False, use_ray=False,
parallel=False):

if reduce_memory:
x = reduce_mem_usage(x, use_ray=True)
y = reduce_mem_usage(y, use_ray=True)
x = reduce_mem_usage(x, use_ray=use_ray, parallel=parallel)
y = reduce_mem_usage(y, use_ray=use_ray, parallel=parallel)

# for all transformations
for tr in self.transformers:
2 changes: 1 addition & 1 deletion tests/test_formatter.py
@@ -236,7 +236,7 @@ def test_global_multiindex(self):
agg_freq='20min',
relative_lags=True)
formatter.add_target_transform(['target'], ['mean'], agg_bins=[-10, -15, -20])
df = formatter.transform(df_mi, time_features=True, holidays=True, prov='ZH',global_form=True)
df = formatter.transform(df_mi, time_features=True, holidays=True, prov='ZH',global_form=True, parallel=False)

def test_global_multiindex_with_col_reordering(self):
x_private = pd.DataFrame(np.random.randn(500, 15), index=pd.date_range('01-01-2020', '01-05-2020', 500, tz='Europe/Zurich'), columns=pd.MultiIndex.from_product([['b1', 'b2', 'b3'], ['a', 'b', 'c', 'd', 'e']]))
8 changes: 5 additions & 3 deletions tests/test_models.py
@@ -101,14 +101,16 @@ def test_hw_multi(self):
self.data = self.data.resample('1h').mean()
df_tr, df_te = self.data.iloc[:1200], self.data.iloc[1200:1500]
steps_day = 24
fes = Fourier_es(n_sa=steps_day, m=steps_day*7, target_name='all', optimization_budget=5).fit(df_tr, df_tr['all'])
fks_multi = FK_multi(n_predictors=4, n_sa=steps_day, m=steps_day*7,
target_name='all', periodicity=steps_day*2,
optimize_hyperpars=True, optimization_budget=20, targets_names=df_tr.columns[:2]).fit(df_tr)
hw_multi = HoltWintersMulti(periods=[steps_day, steps_day * 7], n_sa=steps_day, optimization_budget=3, q_vect=np.arange(11) / 10,
optimize_hyperpars=True, optimize_submodels_hyperpars=True, optimization_budget=5, targets_names=df_tr.columns[:2], verbose=False, diagnostic_plots=False).fit(df_tr)
hw_multi = HoltWintersMulti(periods=[steps_day, steps_day * 7], n_sa=steps_day, optimization_budget=5, q_vect=np.arange(11) / 10,
target_name='all', models_periods=np.array([1,2,steps_day]), targets_names=df_tr.columns[:6]).fit(df_tr)


fes = Fourier_es(n_sa=steps_day, m=steps_day*7, target_name='all', optimization_budget=5).fit(df_tr, df_tr['all'])


fks = FK(n_sa=steps_day, m=steps_day, target_name='all',
optimization_budget=2).fit(df_tr, df_tr['all'])

