Commit

minor changes
nepslor committed Oct 27, 2023
1 parent 1889154 commit 35760df
Showing 2 changed files with 61 additions and 47 deletions.
76 changes: 40 additions & 36 deletions pyforecaster/formatter.py
@@ -146,55 +146,27 @@ def transform(self, x, time_features=True, holidays=False, return_target=True, g
:return x, target: the transformed dataset and the target DataFrame with correct dimensions
"""
if global_form:
assert np.unique([tr.names for tr in self.target_transformers]) == 'target', 'When using global_form option,' \
' the only admissible target is' \
' "target"'
transformed_columns = [tr.names for tr in self.transformers]
transformed_columns = [item for sublist in transformed_columns for item in sublist]
transformed_columns = list(set(np.unique(transformed_columns)) - {'target'})
# if x is multiindex pd.DataFrame do something
if isinstance(x.columns, pd.MultiIndex):
# find columns names at level 0 that contains the targets
c_l_0 = x.columns.get_level_values(0).unique()
private_cols_l0 = [c for c in c_l_0 if not np.all([str(t) in transformed_columns for t in x[c].columns])]
shared_cols_l0 = list(set(c_l_0) - set(private_cols_l0))
x_shared = x[shared_cols_l0].droplevel(0, 1)
dfs = []
for p in private_cols_l0:
x_p = x[p]
target_name_l1 = [c for c in x_p.columns if c not in transformed_columns]
assert len(target_name_l1) == 1, 'something went wrong, there should be only one target column. You must add a transform for all the non-target columns'
target_name_l1 = target_name_l1[0]
x_p = x_p.rename({target_name_l1:'target'}, axis=1)
dfs.append(pd.concat([x_p, x_shared, pd.DataFrame(p, columns=['name'], index=x.index)], axis=1))
else:

independent_targets = [c for c in x.columns if c not in transformed_columns]
dfs = []
for c in independent_targets:
dfs.append(pd.concat(
[pd.DataFrame(x[c].rename(), columns=['target']), x[transformed_columns],
pd.DataFrame(c, columns=['name'], index=x.index)],
axis=1))
dfs = self.global_form_preprocess(x)

n_cpu = cpu_count()
n_folds = np.ceil(len(dfs) / n_cpu).astype(int)
xs, ys = [], []
if parallel:
n_cpu = cpu_count()
n_folds = np.ceil(len(dfs) / n_cpu).astype(int)
# simulate transform on one fold single core to retrieve metadata (ray won't persist class attributes)
self._simulate_transform(dfs[0])
for i in tqdm(range(n_folds)):
x, y = fdf_parallel(f=partial(self._transform, time_features=time_features, holidays=holidays,
return_target=return_target, **holidays_kwargs), df=dfs[n_cpu * i:n_cpu * (i + 1)])
return_target=return_target, **holidays_kwargs),
df=dfs[n_cpu * i:n_cpu * (i + 1)])
if reduce_memory:
x = reduce_mem_usage(x, use_ray=True)
y = reduce_mem_usage(y, use_ray=True)
xs.append(x)
ys.append(y)
else:
for df_i in dfs:
x, y = self._transform(df_i,time_features=time_features, holidays=holidays,
return_target=return_target, **holidays_kwargs)
x, y = self._transform(df_i, time_features=time_features, holidays=holidays,
return_target=return_target, **holidays_kwargs)
if reduce_memory:
x = reduce_mem_usage(x, use_ray=False, parallel=False)
y = reduce_mem_usage(y, use_ray=False, parallel=False)
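
As an aside, the fold splitting above is independent of the forecasting logic: the list of per-series DataFrames is simply cut into chunks of cpu_count() elements so each chunk can be handed to the worker pool in one call. A minimal standalone sketch of the pattern, with toy_transform standing in for Formatter._transform (hypothetical, for illustration only):

# Standalone sketch of the fold splitting used in transform() above: cut the
# list of per-series DataFrames into chunks of cpu_count() frames and process
# each chunk (serially here; transform() maps a whole chunk through fdf_parallel).
from multiprocessing import cpu_count

import numpy as np
import pandas as pd

def toy_transform(df):
    # stand-in for Formatter._transform: returns features and targets
    return df, df[['target']]

dfs = [pd.DataFrame({'target': np.random.randn(8)}) for _ in range(10)]
n_cpu = cpu_count()
n_folds = int(np.ceil(len(dfs) / n_cpu))

xs, ys = [], []
for i in range(n_folds):
    fold = dfs[n_cpu * i:n_cpu * (i + 1)]  # same slicing as in transform()
    for df_i in fold:
        x_i, y_i = toy_transform(df_i)
        xs.append(x_i)
        ys.append(y_i)
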
@@ -293,7 +265,6 @@ def normalize(self, x, y, normalizing_expr=None):
df_n = df_n[[c for c in y.columns]]
return df_n
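
For context on how normalize() and normalizing_expr are meant to be used, here is the round trip from the previous version of test_normalizers_impossible (reproduced from the removed test lines below as a usage sketch; the import path is assumed):

import numpy as np
import pandas as pd
from pyforecaster.formatter import Formatter  # import path assumed

df = pd.DataFrame(np.random.randn(100, 5) + 20,
                  index=pd.date_range('01-01-2020', freq='20min', periods=100, tz='Europe/Zurich'),
                  columns=['a', 'b', 'c', 'd', 'e'])

formatter = Formatter().add_transform(['a', 'b'], lags=np.arange(1, 5), agg_freq='20min')
formatter.add_target_transform(['a'], lags=-np.arange(1, 5), agg_freq='20min')
# target normalizers (mean and std of column 'a'), registered under the names 'a' and 'b'
formatter.add_target_normalizer(['a'], 'mean', agg_freq='10H', name='a')
formatter.add_target_normalizer(['a'], 'std', agg_freq='5H', name='b')

# reference targets, before any normalization expression is attached
x, y = formatter.transform(df, time_features=True, holidays=True, prov='ZH')

# forward normalization expression, then its explicit inverse passed to normalize()
formatter.add_normalization_expr('(target+a)**2 + b')
x, y_norm = formatter.transform(df, time_features=True, holidays=True, prov='ZH')
y_unnorm = formatter.normalize(x, y_norm, normalizing_expr='sqrt(target - b) - a')

# the back-transform should recover the reference targets
assert (y_unnorm - y).sum().sum() < 1e-6
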


def _simulate_transform(self, x=None):
"""
This won't actually modify the dataframe; it just populates the metadata property of each transformer
@@ -509,6 +480,39 @@ def get_time_lims(self, include_target=False, extremes=True):
time_lims = pd.DataFrame([time_lims['start_time'].min(), time_lims['end_time'].max()], index=['start_time', 'end_time']).T
return time_lims

def global_form_preprocess(self, x):
assert np.unique([tr.names for tr in self.target_transformers]) == 'target', 'When using global_form option,' \
' the only admissible target is' \
' "target"'
transformed_columns = [tr.names for tr in self.transformers]
transformed_columns = [item for sublist in transformed_columns for item in sublist]
transformed_columns = list(set(np.unique(transformed_columns)) - {'target'})
# if x is multiindex pd.DataFrame do something
if isinstance(x.columns, pd.MultiIndex):
# find columns names at level 0 that contains the targets
c_l_0 = x.columns.get_level_values(0).unique()
private_cols_l0 = [c for c in c_l_0 if not np.all([str(t) in transformed_columns for t in x[c].columns])]
shared_cols_l0 = list(set(c_l_0) - set(private_cols_l0))
x_shared = x[shared_cols_l0].droplevel(0, 1)
dfs = []
for p in private_cols_l0:
x_p = x[p]
target_name_l1 = [c for c in x_p.columns if c not in transformed_columns]
assert len(
target_name_l1) == 1, 'something went wrong, there should be only one target column. You must add a transform for all the non-target columns'
target_name_l1 = target_name_l1[0]
x_p = x_p.rename({target_name_l1: 'target'}, axis=1)
dfs.append(pd.concat([x_p, x_shared, pd.DataFrame(p, columns=['name'], index=x.index)], axis=1))
else:

independent_targets = [c for c in x.columns if c not in transformed_columns]
dfs = []
for c in independent_targets:
dfs.append(pd.concat(
[pd.DataFrame(x[c].rename(), columns=['target']), x[transformed_columns],
pd.DataFrame(c, columns=['name'], index=x.index)],
axis=1))
return dfs
class Transformer:
"""
Defines and applies transformations through rolling time windows and lags
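
To illustrate the new Formatter.global_form_preprocess helper, here is a sketch modelled on the updated test_normalizers_impossible below; the import path is assumed, while the column layout and transforms mirror the test:

import numpy as np
import pandas as pd
from pyforecaster.formatter import Formatter  # import path assumed

idx = pd.date_range('01-01-2020', '01-05-2020', 500, tz='Europe/Zurich')
# three "private" groups b1..b3 with columns a..e, plus a "shared" group 0..4
x_private = pd.DataFrame(np.random.randn(500, 15), index=idx,
                         columns=pd.MultiIndex.from_product([['b1', 'b2', 'b3'],
                                                             ['a', 'b', 'c', 'd', 'e']]))
x_shared = pd.DataFrame(np.random.randn(500, 5), index=idx,
                        columns=pd.MultiIndex.from_product([['shared'], [0, 1, 2, 3, 4]]))
df_mi = pd.concat([x_private, x_shared], axis=1)

formatter = Formatter().add_transform([0, 1, 2, 3, 4], lags=np.arange(10), agg_freq='20min',
                                      relative_lags=True)
formatter.add_transform(['a', 'b', 'c', 'd'], lags=np.arange(10), agg_freq='20min',
                        relative_lags=True)
formatter.add_target_transform(['target'], ['mean'], agg_bins=[-10, -15, -20])

# one DataFrame per private group: its transformed columns a..d, the remaining
# column 'e' renamed to 'target', the shared columns 0..4 and a constant 'name'
# column identifying the group
dfs = formatter.global_form_preprocess(df_mi)
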
32 changes: 21 additions & 11 deletions tests/test_formatter.py
@@ -274,20 +274,30 @@ def test_normalizers_complex(self):


def test_normalizers_impossible(self):
df = pd.DataFrame(np.random.randn(100, 5)+20, index=pd.date_range('01-01-2020', freq='20min', periods=100, tz='Europe/Zurich'), columns=['a', 'b', 'c', 'd', 'e'])
formatter = pyf.Formatter().add_transform(['a', 'b'], lags=np.arange(1, 5), agg_freq='20min')
formatter.add_target_transform(['a'], lags=-np.arange(1, 5), agg_freq='20min')
formatter.add_target_normalizer(['a'], 'mean', agg_freq='10H', name='a')
formatter.add_target_normalizer(['a'], 'std', agg_freq='5H', name='b')
x_private = pd.DataFrame(np.random.randn(500, 15),
index=pd.date_range('01-01-2020', '01-05-2020', 500, tz='Europe/Zurich'),
columns=pd.MultiIndex.from_product([['b1', 'b2', 'b3'], ['a', 'b', 'c', 'd', 'e']]))
x_shared = pd.DataFrame(np.random.randn(500, 5),
index=pd.date_range('01-01-2020', '01-05-2020', 500, tz='Europe/Zurich'),
columns=pd.MultiIndex.from_product([['shared'], [0, 1, 2, 3, 4]]))

x, y = formatter.transform(df, time_features=True, holidays=True, prov='ZH')
df_mi = pd.concat([x_private, x_shared], axis=1)

formatter.add_normalization_expr('(target+a)**2 + b')
x, y_norm = formatter.transform(df, time_features=True, holidays=True, prov='ZH')
y_unnorm = formatter.normalize(x, y_norm , normalizing_expr='sqrt(target - b) - a')
formatter = pyf.Formatter().add_transform([0, 1, 2, 3, 4], lags=np.arange(10), agg_freq='20min',
relative_lags=True)
formatter.add_transform(['a', 'b', 'c', 'd'], lags=np.arange(10),
agg_freq='20min',
relative_lags=True)
formatter.add_target_transform(['target'], ['mean'], agg_bins=[-10, -15, -20])

# check if back-transform works
assert (y_unnorm-y).sum().sum() < 1e-6
formatter.add_target_normalizer(['target'], 'mean', agg_freq='10H', name='mean')
formatter.add_target_normalizer(['target'], 'std', agg_freq='5H', name='std')

x, y = formatter.transform(df_mi, time_features=True, holidays=True, prov='ZH',global_form=True)
formatter.add_normalization_expr('(target-mean)/(std+1)')
x, y_norm = formatter.transform(df_mi, time_features=True, holidays=True, prov='ZH',global_form=True)

xs = formatter.global_form_preprocess(df_mi)

if __name__ == '__main__':
unittest.main()
