diff --git a/data_transformers/default_transformers.py b/data_transformers/default_transformers.py new file mode 100644 index 0000000..28c3067 --- /dev/null +++ b/data_transformers/default_transformers.py @@ -0,0 +1,27 @@ +from data_transformers import transformer +from pandas import DataFrame + +@transformer.convert +def drop_col(df: DataFrame, col, axis=1): + return df.drop(col, axis=axis) + +@transformer.convert +def wide_to_long(df: DataFrame, primary_keys, value_name='valor', var_name='indicador'): + return df.melt(id_vars=primary_keys, value_name=value_name, var_name=var_name) + +@transformer.convert +def replace_value(df: DataFrame, col: str, curr_value: str, new_value: str): + df = df.replace({col: curr_value}, new_value) + return df + +@transformer.convert +def sort_values(df: DataFrame, how: str, by: list): + if how not in ['ascending', 'descending']: + raise ValueError('how must be either "ascending" or "descending"') + + return df.sort_values(by=by, ascending=how=='ascending').reset_index(drop=True) + +@transformer.convert +def cast_col(df: DataFrame, col: str, cast_to: type): + df[col] = df[col].astype(cast_to) + return df \ No newline at end of file diff --git a/data_transformers/dtransformers.py b/data_transformers/dtransformers.py index f99898c..1014564 100644 --- a/data_transformers/dtransformers.py +++ b/data_transformers/dtransformers.py @@ -1,10 +1,7 @@ - -from functools import reduce as foldl - -# ================================================================================================== - -from typing import Callable, Any, Tuple, Self +from typing import Callable, Any, Tuple from pandas import DataFrame +from inspect import Parameter +import warnings from copy import copy class staticproperty(property): @@ -19,11 +16,18 @@ class transformer_r(tuple): import inspect class transformer: - def __init__(self, f: transformer_t, name=None, external_sourcelines: list = None): + def __init__(self, + f: transformer_t, + name = None, + external_sourcelines: list = None, + partially_applied_args: dict = None): + if name: self.name = name else: self.name = '' + + self.partially_applied_args = partially_applied_args if not external_sourcelines: self.sourcelines = inspect.getsourcelines(f) @@ -32,8 +36,15 @@ def __init__(self, f: transformer_t, name=None, external_sourcelines: list = Non self.f = f - def __call__(self, *args, **kwargs) -> Tuple[tuple, DataFrame]: - return transformer_r(((args, kwargs), self.f(*args, **kwargs))) + def __call__(self, *args, **kwargs) -> Tuple[dict, DataFrame]: + if kwargs or len(args) > 1: + warnings.warn("Warning, transformer being applied with more than one argument.") + + params = inspect.signature(self.f).parameters + applied_args = {k: v for k, v in zip(params.keys(), args)} + final_args = {**self.partially_applied_args, **applied_args} + + return transformer_r((final_args, self.f(df=applied_args['df']))) @staticmethod def convert(f): @@ -42,34 +53,85 @@ def convert(f): 'name': f.__name__ } - def + params = inspect.signature(f).parameters + + if len(params) == 0: + raise TypeError(f"Function {f.__name__} has to at least take a DataFrame as parameter") + + has_df = params.get('df', None) + + if has_df is None: + raise TypeError(f"Function {f.__name__} doesn't have a 'df' parameter") + + if has_df.default is not inspect.Parameter.empty: + raise TypeError(f"Function {f.__name__} cannot have 'df' as a default parameter") + + def new_f(*ags, **kw): + _params = {k: Parameter.empty for k in params.keys() if k != 'df'} + positional_args = {k: v for k, v in zip(_params.keys(), ags)} + keyword_args = {k: v for k, v in kw.items()} + default_args = {k: v.default for k, v in params.items() if v.default is not Parameter.empty} + intersection = set(positional_args.keys()).intersection(set(keyword_args.keys())) + + if 0 != len(intersection): + raise TypeError(f"Can't merge positional and keyword arguments") + + merge = ( _params + | default_args + | positional_args + | keyword_args) + + merge = {k: v for k, v in merge.items() if v is not Parameter.empty} + + def currified_df(df): + full_args = {**merge, 'df': df} + return f(**full_args) + + return transformer(currified_df, **kwargs, partially_applied_args=merge) + + if len(params) > 1: + return new_f + else: + return new_f() + + def __str__(self): + return f"" + + def __repr__(self): + return f"" + class chain: def __init__(self, *fs: transformer): if not all(isinstance(f, transformer) for f in fs): - raise TypeError + cases = [f for f in fs if not isinstance(f, transformer)] + types = [type(f).__name__ for f in cases] + cases_with_types_str = zip(cases, types) + cases_with_types_str = ', '.join(f"'{x[0]}': '{x[1]}'" for x in cases_with_types_str) + raise TypeError(f"Expected all arguments to be of type 'transformer', got {cases_with_types_str}") self.fs = fs + + def transformers_source(self): + sources = [] + for f in self.fs: + src, count = f.sourcelines + if src[0].startswith('@'): + src = src[1:] + sources.append(''.join(src)) + return sources def __call__(self, df) -> Tuple[list, DataFrame]: + if not self.fs: + return [], df iterator = iter(self.fs) - thunks = [(transformer(lambda: None, name='start'), (None, None), copy(df))] + thunks = [(transformer(lambda: None, name='start'), {}, copy(df))] acc = df while (f := next(iterator, None)): - thunk, result = f(acc) + params, result = f(acc) acc = result - thunks.append((f, thunk, result)) + thunks.append((f, params, result)) return thunks, result - - - -drop: transformer -to_long: transformer -renombrar_columnas: transformer -sort_values: transformer -replace_values: transformer -exportar: transformer - diff --git a/data_transformers/utils.py b/data_transformers/utils.py new file mode 100644 index 0000000..49074f3 --- /dev/null +++ b/data_transformers/utils.py @@ -0,0 +1,31 @@ +import io +from pandas import DataFrame + +def get_dataframe_info(df): + buf = io.StringIO() + df.info(buf=buf) + return buf.getvalue() + +def dict_to_str(d): + return ','.join([f'{k}={v}' for k,v in d.items()]) + +def callstack_to_str(callstack): + frames = [] + for f, params, presult in callstack: + frame = [] + params.pop('df', None) + params_str = dict_to_str(params) + params_str = f'{f.name}({params_str})' + frame.append(params_str) + if isinstance(presult, DataFrame): + info = '\n'.join(get_dataframe_info(presult).split('\n')[1:-3]) + frame.append(info) + frame.append('') + frame.append(presult.head(1).to_markdown()) + + frames.append('\n'.join(frame)) + frames.append('') + frames.append('-'*30) + frames.append('') + + return '\n'.join(frames) \ No newline at end of file