Skip to content

Commit

Permalink
corrected parallelism issue in LGBMHybrid (executor.map retains the o…
Browse files Browse the repository at this point in the history
…rder of submission), changed from H to h in formatter
  • Loading branch information
nepslor committed Jun 2, 2024
1 parent 917027c commit b309d23
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 24 deletions.
46 changes: 28 additions & 18 deletions pyforecaster/forecasting_models/gradientboosters.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@
import numpy as np
import concurrent.futures
from time import time

from multiprocessing import Pool, cpu_count
from functools import partial

class LGBMHybrid(ScenarioGenerator):
def __init__(self, device_type='cpu', max_depth=20, n_estimators=100, num_leaves=100, learning_rate=0.1, min_child_samples=20,
n_jobs=8, objective='regression', tol_period='1h', colsample_bytree=1,
n_jobs=1, objective='regression', tol_period='1h', colsample_bytree=1,
colsample_bynode=1, verbose=-1, metric='l2', n_single=1,
red_frac_multistep=1, q_vect=None, val_ratio=None, nodes_at_step=None,
formatter=None, metadata_features=None, keep_last_n_lags=0, keep_last_seconds=0, **scengen_kwgs):
formatter=None, metadata_features=None, keep_last_n_lags=0, keep_last_seconds=0, parallel=False,
**scengen_kwgs):
"""
:param n_single: number of single models, should be less than number of step ahead predictions. The rest of the
steps ahead are forecasted by a global model
Expand All @@ -28,6 +30,7 @@ def __init__(self, device_type='cpu', max_depth=20, n_estimators=100, num_leaves
"""

super().__init__(q_vect, val_ratio=val_ratio, nodes_at_step=nodes_at_step, **scengen_kwgs)
self.parallel = parallel
self.device_type = device_type
self.n_single = n_single
self.red_frac_multistep = red_frac_multistep
Expand Down Expand Up @@ -75,12 +78,16 @@ def get_lgb_pars(self):
def fit(self, x, y):
lgb_pars = self.get_lgb_pars()
x, y, x_val, y_val = self.train_val_split(x, y)
for i in tqdm(range(self.n_single)):
x_i = self.dataset_at_stepahead(x, i, self.metadata_features, formatter=self.formatter,
logger=self.logger, method='periodic', keep_last_n_lags=self.keep_last_n_lags,
keep_last_seconds=self.keep_last_seconds,
tol_period=self.tol_period)
self.models.append(LGBMRegressor(**lgb_pars).fit(x_i, y.iloc[:, i]))
if self.parallel:
with concurrent.futures.ProcessPoolExecutor(max_workers=cpu_count()-2) as executor:
self.models = list(executor.map(partial(self.fit_single, x=x, y=y, lgb_pars=lgb_pars), np.arange(self.n_single)))
else:
for i in tqdm(range(self.n_single)):
x_i = self.dataset_at_stepahead(x, i, self.metadata_features, formatter=self.formatter,
logger=self.logger, method='periodic', keep_last_n_lags=self.keep_last_n_lags,
keep_last_seconds=self.keep_last_seconds,
tol_period=self.tol_period)
self.models.append(LGBMRegressor(**lgb_pars).fit(x_i, y.iloc[:, i]))

n_sa = y.shape[1]
self.n_multistep = n_sa - self.n_single
Expand Down Expand Up @@ -110,9 +117,16 @@ def fit(self, x, y):
super().fit(x_val, y_val)
return self

def fit_single(self, i, x, y, lgb_pars):
    """Train one LGBMRegressor for step ahead ``i``.

    :param i: index of the forecast step ahead; selects column ``i`` of ``y``
    :param x: full feature DataFrame
    :param y: target DataFrame, one column per step ahead
    :param lgb_pars: keyword parameters forwarded to ``LGBMRegressor``
    :return: the fitted ``LGBMRegressor`` instance
    """
    # Prune x down to the features relevant for step i before fitting.
    features_i = self.dataset_at_stepahead(x, i, self.metadata_features,
                                           formatter=self.formatter, logger=self.logger,
                                           method='periodic',
                                           keep_last_n_lags=self.keep_last_n_lags,
                                           keep_last_seconds=self.keep_last_seconds,
                                           tol_period=self.tol_period)
    return LGBMRegressor(**lgb_pars).fit(features_i, y.iloc[:, i])
def predict(self, x, **kwargs):
preds = []
period = kwargs['period'] if 'period' in kwargs else '24H'
period = kwargs['period'] if 'period' in kwargs else '24h'
for i in range(self.n_single):
x_i = self.dataset_at_stepahead(x, i, self.metadata_features, formatter=self.formatter,
logger=self.logger, method='periodic', keep_last_n_lags=self.keep_last_n_lags,
Expand All @@ -125,21 +139,17 @@ def predict(self, x, **kwargs):
preds = pd.DataFrame(np.hstack(preds), index=x.index)
return preds

def predict_single(self, x, i):
def predict_single(self, i, x):
x = pd.concat([x.reset_index(drop=True), pd.Series(np.ones(len(x)) * i)], axis=1)
return self.multi_step_model.predict(x,num_threads=1).reshape(-1, 1)

def predict_parallel(self, x):
    """Predict all multistep horizons in parallel with the global model.

    Uses ``executor.map`` (not ``as_completed``), which yields results in
    submission order, so column ``i`` of the output is horizon ``i``.

    :param x: feature DataFrame
    :return: (n_samples, n_multistep) array of stacked per-horizon predictions
    """
    # NOTE(review): ProcessPoolExecutor pickles self (and the fitted model)
    # for each task — confirm the model is picklable and x is not huge.
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        y_hat = list(executor.map(partial(self.predict_single, x=x), np.arange(self.n_multistep)))
    return np.hstack(y_hat)

@staticmethod
def dataset_at_stepahead(df, target_col_num, metadata_features, formatter, logger, method='periodic', keep_last_n_lags=1, period="24H",
def dataset_at_stepahead(df, target_col_num, metadata_features, formatter, logger, method='periodic', keep_last_n_lags=1, period="24h",
tol_period='1h', keep_last_seconds=0):
if formatter is None:
logger.warning('dataset_at_stepahead returned the unmodified dataset since there is no self.formatter')
Expand Down
4 changes: 2 additions & 2 deletions pyforecaster/forecasting_models/randomforests.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def fit(self, x, y):

def predict(self, x, **kwargs):
preds = []
period = kwargs['period'] if 'period' in kwargs else '24H'
period = kwargs['period'] if 'period' in kwargs else '24h'
if self.parallel:
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_parallel_workers) as executor:
preds = [i for i in tqdm(executor.map(partial(self._predict, x=x, period=period, **kwargs), range(self.n_single)),total=self.n_single)]
Expand Down Expand Up @@ -183,7 +183,7 @@ def predict_parallel(self, x, quantiles='mean', add_step=True):
return np.dstack(y_hat)

@staticmethod
def dataset_at_stepahead(df, target_col_num, metadata_features, formatter, logger, method='periodic', keep_last_n_lags=1, period="24H",
def dataset_at_stepahead(df, target_col_num, metadata_features, formatter, logger, method='periodic', keep_last_n_lags=1, period="24h",
tol_period='1h', keep_last_seconds=0):
if formatter is None:
logger.warning('dataset_at_stepahead returned the unmodified dataset since there is no self.formatter')
Expand Down
2 changes: 1 addition & 1 deletion pyforecaster/formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ def print_fold(tr_idxs, te_idxs):
fold += ' -: train, |=test, x:skip'
return fold

def prune_dataset_at_stepahead(self, df, target_col_num, metadata_features, method='periodic', period='24H', tol_period='1H', keep_last_n_lags=0, keep_last_seconds=0):
def prune_dataset_at_stepahead(self, df, target_col_num, metadata_features, method='periodic', period='24h', tol_period='1h', keep_last_n_lags=0, keep_last_seconds=0):

features = []
# retrieve referring_time of the given sa for the target from target_transformers
Expand Down
4 changes: 2 additions & 2 deletions tests/test_boosters.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ def test_linear_val_split(self):
formatter.add_target_transform(['all'], lags=-np.arange(24)-1)

x, y = formatter.transform(self.data.iloc[:1000])
n_tr = int(len(x) * 0.1)
n_tr = int(len(x) * 0.7)
x_tr, x_te, y_tr, y_te = [x.iloc[:n_tr, :].copy(), x.iloc[n_tr:, :].copy(), y.iloc[:n_tr].copy(),
y.iloc[n_tr:].copy()]

m_lin = LinearForecaster(q_vect=np.linspace(0.1, 0.9, 11), cov_est_method='vanilla').fit(x_tr, y_tr)
y_hat_lin = m_lin.predict(x_te)
q = m_lin.predict_quantiles(x_te)

m_lgbhybrid = LGBMHybrid(red_frac_multistep=0.1, val_ratio=0.3, lgb_pars={'num_leaves': 100, 'n_estimators': 100, 'learning_rate':0.05}, n_single=1).fit(x_tr, y_tr)
m_lgbhybrid = LGBMHybrid(red_frac_multistep=0.1, val_ratio=0.3, lgb_pars={'num_leaves': 100, 'n_estimators': 100, 'learning_rate':0.05}, n_single=20, parallel=True, formatter=formatter, metadata_features=['minuteofday', 'utc_offset', 'dayofweek', 'hour']).fit(x_tr, y_tr)
y_hat_lgbh = m_lgbhybrid.predict(x_te)
q = m_lgbhybrid.predict_quantiles(x_te)

Expand Down
2 changes: 1 addition & 1 deletion tests/test_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def test_prune_at_stepahead(self):
x_transformed, y_transformed = formatter.transform(self.x2)
crosspattern = pd.DataFrame()
for i in range(10):
x_i = formatter.prune_dataset_at_stepahead(x_transformed, i, metadata_features=[], method='periodic', period='24H', tol_period='10m')
x_i = formatter.prune_dataset_at_stepahead(x_transformed, i, metadata_features=[], method='periodic', period='24h', tol_period='10m')
crosspattern = crosspattern.combine_first(pd.DataFrame(1, index=x_i.columns, columns=[i]))
sb.heatmap(crosspattern)

Expand Down

0 comments on commit b309d23

Please sign in to comment.