diff --git a/pyforecaster/forecasting_models/fast_adaptive_models.py b/pyforecaster/forecasting_models/fast_adaptive_models.py
index 4d6c7e9..ab14feb 100644
--- a/pyforecaster/forecasting_models/fast_adaptive_models.py
+++ b/pyforecaster/forecasting_models/fast_adaptive_models.py
@@ -114,7 +114,7 @@ def generate_recent_history(y, w, start, i):
     def tune_hyperpars(self, x_pd):
         pars_opt = tune_hyperpars(x_pd, self.__class__, hyperpars=self.hyperpar_lims, n_trials=self.optimization_budget,
-                                  return_model=False, **self.init_pars)
+                                  return_model=False, model_init_kwargs=self.init_pars)
         self.set_params(**pars_opt)
         self.reinit_pars()
         return self
 
@@ -144,7 +144,7 @@ def run(self, x, y, return_coeffs=False, start_from=0, fit=True):
 class Fourier_es(StatefulForecaster):
 
     def __init__(self, target_name='target', targets_names=None, n_sa=1, m=24, val_ratio=0.8, optimize_hyperpars=True,
-                 optimization_budget=100, verbose=True, nodes_at_step=None,
+                 optimization_budget=100, verbose=False, nodes_at_step=None,
                  q_vect=None, alpha=0.8, omega=0.99, n_harmonics=3, periodicity=None, **scengen_kwgs):
         """
         :param y:
@@ -408,7 +408,7 @@ def __setstate__(self, states):
         self.states.update(states)
 
 
-class FK_multi(ScenarioGenerator):
+class FK_multi(StatefulForecaster):
     """
     Multistep ahead forecasting with multiple Fourier-Kalman regressors
     """
@@ -416,7 +416,7 @@ class FK_multi(ScenarioGenerator):
     def __init__(self, target_name='target', targets_names=None, n_sa=1, n_predictors=4, alphas=None, m=24, omegas=None,
                  ns_harmonics=None, val_ratio=0.8, nodes_at_step=None, q_vect=None, periodicity=None,
                  base_predictor=Fourier_es, optimize_hyperpars=False, optimize_submodels_hyperpars=True,
-                 optimization_budget=100, r=0.1, q=0.1, verbose=True, **scengen_kwgs):
+                 optimization_budget=100, r=0.1, q=0.1, verbose=True, submodels_pars=None, **scengen_kwgs):
         """
         :param y:
         :param h: this is the number of steps ahead to be predicted.
@@ -426,28 +426,25 @@ def __init__(self, target_name='target', targets_names=None, n_sa=1, n_predicto
         """
         n_predictors = int(n_predictors)
         self.base_predictor = base_predictor
-        self.targets_names = targets_names
-        self.init_pars = {'target_name': target_name, 'n_sa': n_sa, 'n_predictors': n_predictors, 'alpha': alphas, 'm': m, 'omegas': omegas,
-                          'ns_harmonics': ns_harmonics, 'val_ratio': val_ratio, 'nodes_at_step': nodes_at_step,
-                          'q_vect': q_vect, 'periodicity': periodicity, 'optimize_hyperpars': optimize_hyperpars,
-                          'optimization_budget': optimization_budget, 'r': r, 'q': q, 'optimize_submodels_hyperpars': optimize_submodels_hyperpars,
-                          'verbose': verbose, 'base_predictor': base_predictor, 'targets_names': targets_names}
+
+        super().__init__(target_name=target_name, targets_names=targets_names, n_sa=n_sa, m=m, val_ratio=val_ratio,
+                         optimize_hyperpars=optimize_hyperpars, optimization_budget=optimization_budget,
+                         verbose=verbose, nodes_at_step=nodes_at_step, q_vect=q_vect, **scengen_kwgs)
+        # the submodels' init pars are seeded from the parent class's init pars
+        self.submodels_init_pars = deepcopy(self.init_pars)
+        self.submodels_pars = submodels_pars
+
+        self.init_pars.update({'n_predictors': n_predictors, 'alphas': alphas, 'omegas': omegas,
+                               'ns_harmonics': ns_harmonics, 'periodicity': periodicity, 'r': r, 'q': q,
+                               'optimize_submodels_hyperpars': optimize_submodels_hyperpars,
+                               'base_predictor': base_predictor, 'submodels_pars': submodels_pars})
         self.init_pars.update(scengen_kwgs)
-        self.verbose = verbose
-        self.optimize_hyperpars = optimize_hyperpars
-        self.optimization_budget = optimization_budget
-        self.optimize_submodels_hyperpars = optimize_submodels_hyperpars
+        self.optimize_submodels_hyperpars = optimize_submodels_hyperpars
         self.periodicity = periodicity if periodicity is not None else n_sa
         assert self.periodicity < m, 'Periodicity must be smaller than history m'
         if self.periodicity < n_sa:
             print('WARNING: periodicity is smaller than n_sa, this may lead to suboptimal results.')
-        self.alphas = np.ones(n_predictors) * 0.01 if alphas is None else alphas
-        self.omegas = np.ones(n_predictors) * 0.01 if omegas is None else omegas
-        self.n_sa = n_sa
-        self.m = m
-        self.target_name = target_name
-        self.ns_harmonics = (np.ones(n_predictors) * 10).astype(int) if ns_harmonics is None else ns_harmonics
         self.n_predictors = n_predictors
         self.r = r
         self.q = q
@@ -455,21 +452,25 @@ def __init__(self, target_name='target', targets_names=None, n_sa=1, n_predicto
         self.H = None
         self.F = None
         self.models = None
-        self.states = {'x': None, 'P': None, 'R': None, 'Q': None}
         self.reinit_pars()
-        super().__init__(q_vect, nodes_at_step=nodes_at_step, val_ratio=val_ratio, **scengen_kwgs)
 
     def reinit_pars(self):
+        self.submodels_init_pars.update({'optimize_hyperpars': self.optimize_submodels_hyperpars,
+                                         'verbose': True})
+        # create models with increasing history length
         ms = np.linspace(1, self.m, self.n_predictors + 1).astype(int)[1:]
         ms = np.maximum(ms, self.n_sa)
+        submodels_pars = [self.submodels_init_pars.copy() for i in range(self.n_predictors)]
         if np.any([m [...]

@@ [...] @@
             if i >= self.n_sa:
                 # average last point error over different prediction times for all the models
                 last_obs = x_pd[self.target_name].iloc[i]
-                prev_obs = x_pd[self.target_name].iloc[i-1]
                 # sometimes persistence error is too small, sometimes signal could be small. We normalize by the average
                 norm_avg_err = [np.mean([np.abs(preds_experts[i-sa, sa, predictor]-last_obs) for sa in range(self.n_sa)]) for predictor in range(self.n_predictors)]
                 norm_avg_err = np.array(norm_avg_err) / np.mean(norm_avg_err)
diff --git a/pyforecaster/forecasting_models/holtwinters.py b/pyforecaster/forecasting_models/holtwinters.py
index b8219ea..bffc52e 100644
--- a/pyforecaster/forecasting_models/holtwinters.py
+++ b/pyforecaster/forecasting_models/holtwinters.py
@@ -40,7 +40,7 @@ def fit_sample(pars_dict,model_class, model_init_kwargs, x, return_model=False):
     else:
         return score
 
-def tune_hyperpars(x, model_class, hyperpars, n_trials=100, verbose=True, return_model=True, parallel=True, **model_init_kwargs):
+def tune_hyperpars(x, model_class, hyperpars, n_trials=100, return_model=True, parallel=True, model_init_kwargs=None):
     """
     :param x: pd.DataFrame (n, n_cov)
     :param y: pd.Series (n)
@@ -51,7 +51,8 @@ def tune_hyperpars(x, model_class, hyperpars, n_trials=100, verbose=True, return
        to train on, in a global model fashion
     :return:
     """
-
+    model_init_kwargs = {} if model_init_kwargs is None else model_init_kwargs
+    verbose = model_init_kwargs['verbose'] if 'verbose' in model_init_kwargs.keys() else False
     pars_cartridge = []
     for i in range(n_trials):
         trial = {}
@@ -68,7 +68,9 @@ def tune_hyperpars(x, model_class, hyperpars, n_trials=100, verbose=True, return
 
     model_init_kwargs_0 = deepcopy(model_init_kwargs)
 
-    if len(pars_cartridge)>1:
+    if model_class.__name__ in ["HoltWintersMulti", "FK_multi"]:
+        fitted_model = fit_sample(pars_cartridge[0], model_class, model_init_kwargs, x, return_model=True)
+    else:
         if parallel:
             from concurrent.futures import ProcessPoolExecutor
             from multiprocessing import cpu_count
@@ -76,18 +78,14 @@ def tune_hyperpars(x, model_class, hyperpars, n_trials=100, verbose=True, return
                 scores = list(tqdm(executor.map(partial(fit_sample, model_class=model_class,
                                                         model_init_kwargs=model_init_kwargs, x=x), pars_cartridge),
                                    total=n_trials, desc='Tuning hyperpars for {}'.format(model_class.__name__)))
-            if verbose:
-                plt.figure()
-                t = np.linspace(0.01, 0.99, 30)
-                plt.plot(np.quantile(scores, t), t)
         else:
             scores = []
             for i in tqdm(range(n_trials), desc='Tuning hyperpars for {}'.format(model_class.__name__)):
                 scores.append(fit_sample(pars_cartridge[i], model_class, model_init_kwargs, x))
-
-
-    else:
-        fitted_model = fit_sample(pars_cartridge[0], model_class, model_init_kwargs, x, return_model=True)
+        if verbose:
+            plt.figure()
+            t = np.linspace(0.01, 0.99, 30)
+            plt.plot(np.quantile(scores, t), t)
 
     if model_class.__name__ in ["HoltWintersMulti"]:
         alphas = np.array([m.alpha for m in fitted_model.models])
@@ -95,10 +93,8 @@ def tune_hyperpars(x, model_class, hyperpars, n_trials=100, verbose=True, return
         gammas_2 = ([m.gamma_2 for m in fitted_model.models])
         best_pars = {'alphas': alphas, 'gammas_1': gammas_1, 'gammas_2': gammas_2, 'optimize_submodels_hyperpars': False}
     elif model_class.__name__ in ["FK_multi"]:
-        alphas = np.array([m.alpha for m in fitted_model.models])
-        omegas = ([m.omega for m in fitted_model.models])
-        ns_harmonics = ([m.n_harmonics for m in fitted_model.models])
-        best_pars = {'alphas': alphas, 'omegas': omegas, 'ns_harmonics': ns_harmonics, 'optimize_submodels_hyperpars': False}
+        submodels_pars = [m.get_params() for m in fitted_model.models]
+        best_pars = {'submodels_pars': submodels_pars, 'optimize_submodels_hyperpars': False}
     else:
         best_idx = np.argmin(scores)
         best_pars = pars_cartridge[best_idx]
@@ -142,6 +138,7 @@ def __init__(self, periods, target_name, targets_names=None, q_vect=None, val_ra
                           'verbose': verbose, 'alpha': alpha, 'beta': beta, 'gamma_1': gamma_1, 'gamma_2': gamma_2}
         super().__init__(q_vect, nodes_at_step=nodes_at_step, val_ratio=val_ratio, **scengen_kwgs)
         self.targets_names = [target_name] if targets_names is None else targets_names
+        self.init_pars.update({'targets_names': self.targets_names})
         self.periods = periods
         self.optimization_budget = optimization_budget
         self.n_sa = n_sa
@@ -176,7 +173,7 @@ def __init__(self, periods, target_name, targets_names=None, q_vect=None, val_ra
     def fit(self, x_pd, y_pd=None, **kwargs):
         if self.optimize_hyperpars:
             pars_opt = tune_hyperpars(x_pd, HoltWinters, hyperpars={'alpha': [0, 1], 'gamma_1': [0, 1], 'gamma_2': [0, 1]},
-                                      n_trials=self.optimization_budget, targets_names=self.targets_names, return_model=False, **self.init_pars)
+                                      n_trials=self.optimization_budget, return_model=False, model_init_kwargs=self.init_pars)
             self.set_params(**pars_opt)
 
         y = x_pd[self.target_name].values
@@ -418,7 +415,7 @@ def reinit_pars(self):
 
     def fit(self, x_pd, y_pd=None):
         if self.optimize_hyperpars:
-            pars_opt = tune_hyperpars(x_pd, HoltWintersMulti, hyperpars={'fake': [0, 1]}, n_trials=1, return_model=False, parallel=False, **self.init_pars)
+            pars_opt = tune_hyperpars(x_pd, HoltWintersMulti, hyperpars={'fake': [0, 1]}, n_trials=1, return_model=False, parallel=False, model_init_kwargs=self.init_pars)
             self.set_params(**pars_opt)
             self.reinit_pars()
 
diff --git a/pyforecaster/formatter.py b/pyforecaster/formatter.py
index 65ac883..bee9c0f 100644
--- a/pyforecaster/formatter.py
+++ b/pyforecaster/formatter.py
@@ -180,7 +180,7 @@ def transform(self, x, time_features=True, holidays=False, return_target=True, g
     @staticmethod
     def _transform_(tr, x):
         return tr.transform(x, augment=False)
-    def _transform(self, x, time_features=True, holidays=False, return_target=True, **holidays_kwargs):
+    def _transform(self, x, time_features=True, holidays=False, return_target=True, parallel=False, **holidays_kwargs):
         """
         Takes the DataFrame x and applies the specified transformations stored in the transformers in order to obtain
         the pre-fold-transformed dataset: this dataset has the correct final dimensions, but fold-specific
@@ -206,7 +206,7 @@ def _transform(self, x, time_features=True, holidays=False, return_target=True,
             target = self.normalize(x, target)
 
         if len(self.transformers)>0:
-            if self.n_parallel>1:
+            if parallel:
                 with concurrent.futures.ThreadPoolExecutor(max_workers=self.n_parallel) as executor:
                     x_tr = pd.concat([i for i in executor.map(partial(self._transform_, x=x), self.transformers)], axis=1)
                 x = pd.concat([x, x_tr], axis=1)
@@ -513,11 +513,12 @@ def global_form_preprocess(self, x):
                            axis=1))
         return dfs
 
-    def global_form_postprocess(self, x, y, xs, ys, reduce_memory=False, corr_reorder=False):
+    def global_form_postprocess(self, x, y, xs, ys, reduce_memory=False, corr_reorder=False, use_ray=False,
+                                parallel=False):
         if reduce_memory:
-            x = reduce_mem_usage(x, use_ray=True)
-            y = reduce_mem_usage(y, use_ray=True)
+            x = reduce_mem_usage(x, use_ray=use_ray, parallel=parallel)
+            y = reduce_mem_usage(y, use_ray=use_ray, parallel=parallel)
 
         # for all transformations
         for tr in self.transformers:
diff --git a/tests/test_formatter.py b/tests/test_formatter.py
index 6c71668..b9509bd 100644
--- a/tests/test_formatter.py
+++ b/tests/test_formatter.py
@@ -236,7 +236,7 @@ def test_global_multiindex(self):
                                       agg_freq='20min', relative_lags=True)
         formatter.add_target_transform(['target'], ['mean'], agg_bins=[-10, -15, -20])
 
-        df = formatter.transform(df_mi, time_features=True, holidays=True, prov='ZH', global_form=True)
+        df = formatter.transform(df_mi, time_features=True, holidays=True, prov='ZH', global_form=True, parallel=False)
 
     def test_global_multiindex_with_col_reordering(self):
         x_private = pd.DataFrame(np.random.randn(500, 15), index=pd.date_range('01-01-2020', '01-05-2020', 500, tz='Europe/Zurich'), columns=pd.MultiIndex.from_product([['b1', 'b2', 'b3'], ['a', 'b', 'c', 'd', 'e']]))
diff --git a/tests/test_models.py b/tests/test_models.py
index acc0ec3..730b699 100644
--- a/tests/test_models.py
+++ b/tests/test_models.py
@@ -101,14 +101,16 @@ def test_hw_multi(self):
         self.data = self.data.resample('1h').mean()
         df_tr, df_te = self.data.iloc[:1200], self.data.iloc[1200:1500]
         steps_day = 24
-        fes = Fourier_es(n_sa=steps_day, m=steps_day*7, target_name='all', optimization_budget=5).fit(df_tr, df_tr['all'])
         fks_multi = FK_multi(n_predictors=4, n_sa=steps_day, m=steps_day*7, target_name='all', periodicity=steps_day*2,
-                             optimize_hyperpars=True, optimization_budget=20, targets_names=df_tr.columns[:2]).fit(df_tr)
-        hw_multi = HoltWintersMulti(periods=[steps_day, steps_day * 7], n_sa=steps_day, optimization_budget=3, q_vect=np.arange(11) / 10,
+                             optimize_hyperpars=True, optimize_submodels_hyperpars=True, optimization_budget=5, targets_names=df_tr.columns[:2], verbose=False, diagnostic_plots=False).fit(df_tr)
+        hw_multi = HoltWintersMulti(periods=[steps_day, steps_day * 7], n_sa=steps_day, optimization_budget=5, q_vect=np.arange(11) / 10,
                                     target_name='all', models_periods=np.array([1,2,steps_day]), targets_names=df_tr.columns[:6]).fit(df_tr)
+        fes = Fourier_es(n_sa=steps_day, m=steps_day*7, target_name='all', optimization_budget=5).fit(df_tr, df_tr['all'])
+
+        fks = FK(n_sa=steps_day, m=steps_day, target_name='all', optimization_budget=2).fit(df_tr, df_tr['all'])
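
Usage sketch (not part of the diff): a minimal example of the refactored tuning entry point, in which model constructor kwargs now travel through the model_init_kwargs dict instead of **kwargs. The synthetic series and the hyperparameter ranges below are illustrative assumptions, not values taken from the repository.

import numpy as np
import pandas as pd
from pyforecaster.forecasting_models.fast_adaptive_models import Fourier_es
from pyforecaster.forecasting_models.holtwinters import tune_hyperpars

# illustrative hourly series with a daily cycle
t = pd.date_range('2020-01-01', periods=24 * 30, freq='1h')
df = pd.DataFrame({'all': np.sin(np.arange(len(t)) * 2 * np.pi / 24)
                          + 0.1 * np.random.randn(len(t))}, index=t)

# After this change, init kwargs are passed as a single dict; 'verbose' is read
# from that dict inside tune_hyperpars. optimize_hyperpars=False in the init
# kwargs avoids the model re-triggering tuning from its own fit().
best_pars = tune_hyperpars(df, Fourier_es,
                           hyperpars={'alpha': [0, 1], 'omega': [0, 1], 'n_harmonics': [1, 10]},
                           n_trials=3, return_model=False, parallel=False,
                           model_init_kwargs={'target_name': 'all', 'n_sa': 24, 'm': 24 * 7,
                                              'optimize_hyperpars': False, 'verbose': False})

# For composite models, tuning now returns the submodels' parameters whole:
# tune_hyperpars(df, FK_multi, ...) yields
# {'submodels_pars': [...], 'optimize_submodels_hyperpars': False},
# and FK_multi.__init__ accepts them back through its new submodels_pars argument.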