Skip to content

Commit

Permalink
Primera version funcional
Browse files Browse the repository at this point in the history
  • Loading branch information
joangq committed May 15, 2024
1 parent 069afbe commit 26471e4
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 24 deletions.
27 changes: 27 additions & 0 deletions data_transformers/default_transformers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from data_transformers import transformer
from pandas import DataFrame

@transformer.convert
def drop_col(df: DataFrame, col, axis=1):
    """Return a copy of ``df`` with the given label(s) removed.

    Args:
        df: Frame to drop from; it is not modified.
        col: Single label or list of labels handed to ``DataFrame.drop``.
        axis: Axis the labels live on; the default ``1`` targets columns.

    Returns:
        The DataFrame produced by ``DataFrame.drop``.
    """
    trimmed = df.drop(labels=col, axis=axis)
    return trimmed

@transformer.convert
def wide_to_long(df: DataFrame, primary_keys, value_name='valor', var_name='indicador'):
    """Unpivot ``df`` from wide to long format with ``DataFrame.melt``.

    Args:
        df: Wide-format frame.
        primary_keys: Column(s) kept as identifiers (``id_vars``).
        value_name: Name of the column that receives the melted values.
        var_name: Name of the column that receives the former column names.

    Returns:
        The melted DataFrame.
    """
    melt_options = {
        'id_vars': primary_keys,
        'var_name': var_name,
        'value_name': value_name,
    }
    return df.melt(**melt_options)

@transformer.convert
def replace_value(df: DataFrame, col: str, curr_value: str, new_value: str):
    """Replace ``curr_value`` with ``new_value`` inside column ``col`` only.

    Args:
        df: Frame to read from; ``DataFrame.replace`` returns a new frame.
        col: Column in which the replacement is performed.
        curr_value: Value to look for.
        new_value: Value written in its place.

    Returns:
        The DataFrame returned by ``DataFrame.replace``.
    """
    # A {column: value} mapping restricts the lookup to that single column.
    target = {col: curr_value}
    return df.replace(target, new_value)

@transformer.convert
def sort_values(df: DataFrame, how: str, by: list):
    """Sort ``df`` by the given columns and renumber the index from zero.

    Args:
        df: Frame to sort.
        how: Either ``'ascending'`` or ``'descending'``.
        by: Column name(s) to sort by.

    Returns:
        The sorted DataFrame with a fresh index (the old one is dropped).

    Raises:
        ValueError: If ``how`` is not one of the two accepted strings.
    """
    if how == 'ascending':
        ascending = True
    elif how == 'descending':
        ascending = False
    else:
        raise ValueError('how must be either "ascending" or "descending"')

    ordered = df.sort_values(by=by, ascending=ascending)
    return ordered.reset_index(drop=True)

@transformer.convert
def cast_col(df: DataFrame, col: str, cast_to: type):
    """Return a copy of ``df`` with column ``col`` cast to ``cast_to``.

    The input frame is left untouched. Assigning the converted column back
    into ``df`` would also alter any earlier reference to the same object
    (for example a result already recorded for a previous pipeline step).

    Args:
        df: Frame to read from; it is not modified.
        col: Name of the column to convert.
        cast_to: Target dtype, passed to ``DataFrame.astype``.

    Returns:
        A new DataFrame in which only ``col`` has a different dtype.

    Raises:
        KeyError: If ``col`` is not a column of ``df``.
    """
    # The {column: dtype} form converts a single column and returns a new frame.
    return df.astype({col: cast_to})
110 changes: 86 additions & 24 deletions data_transformers/dtransformers.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@

from functools import reduce as foldl

# ==================================================================================================

from typing import Callable, Any, Tuple, Self
from typing import Callable, Any, Tuple
from pandas import DataFrame
from inspect import Parameter
import warnings
from copy import copy

class staticproperty(property):
Expand All @@ -19,11 +16,18 @@ class transformer_r(tuple):
import inspect

class transformer:
def __init__(self, f: transformer_t, name=None, external_sourcelines: list = None):
def __init__(self,
f: transformer_t,
name = None,
external_sourcelines: list = None,
partially_applied_args: dict = None):

if name:
self.name = name
else:
self.name = '<lambda_t>'

self.partially_applied_args = partially_applied_args

if not external_sourcelines:
self.sourcelines = inspect.getsourcelines(f)
Expand All @@ -32,8 +36,15 @@ def __init__(self, f: transformer_t, name=None, external_sourcelines: list = Non

self.f = f

def __call__(self, *args, **kwargs) -> Tuple[dict, DataFrame]:
    """Apply the wrapped function to a DataFrame and report the arguments used.

    Positional arguments are matched, in order, against the parameter names
    of ``self.f``. Only the value bound to ``df`` is forwarded to ``self.f``;
    every other argument is expected to have been fixed beforehand and is
    read from ``self.partially_applied_args``.

    Args:
        *args: Normally a single DataFrame. Anything beyond the first
            positional argument triggers a warning.
        **kwargs: Not forwarded to ``self.f``; their presence triggers a
            warning.

    Returns:
        A ``transformer_r`` wrapping ``(final_args, result)``, where
        ``final_args`` merges the pre-bound arguments with those supplied in
        this call, and ``result`` is what ``self.f`` returned.

    Raises:
        KeyError: If no positional argument ends up bound to ``df``.
    """
    if kwargs or len(args) > 1:
        warnings.warn("Warning, transformer being applied with more than one argument.")

    params = inspect.signature(self.f).parameters
    applied_args = dict(zip(params, args))

    # partially_applied_args defaults to None in __init__; unpacking None with
    # ** raises TypeError, so fall back to an empty mapping.
    bound_args = self.partially_applied_args or {}
    final_args = {**bound_args, **applied_args}

    return transformer_r((final_args, self.f(df=applied_args['df'])))

@staticmethod
def convert(f):
Expand All @@ -42,34 +53,85 @@ def convert(f):
'name': f.__name__
}

def
params = inspect.signature(f).parameters

if len(params) == 0:
raise TypeError(f"Function {f.__name__} has to at least take a DataFrame as parameter")

has_df = params.get('df', None)

if has_df is None:
raise TypeError(f"Function {f.__name__} doesn't have a 'df' parameter")

if has_df.default is not inspect.Parameter.empty:
raise TypeError(f"Function {f.__name__} cannot have 'df' as a default parameter")

def new_f(*ags, **kw):
    """Bind every argument except ``df`` and return the resulting transformer.

    Positional arguments are matched, in declaration order, against the
    parameters of ``f`` other than ``df``. Arguments are layered so that
    keyword arguments override positional ones, which override declared
    defaults. Parameters that end up without a value are left out of the
    bound set.

    Returns:
        A ``transformer`` wrapping a one-argument function of ``df``.

    Raises:
        TypeError: If more positional arguments are given than ``f`` has
            non-``df`` parameters, or if a parameter is supplied both
            positionally and by keyword.
    """
    _params = {k: Parameter.empty for k in params.keys() if k != 'df'}

    # zip() below stops at the shorter input, so surplus positional arguments
    # would otherwise be dropped without any error.
    if len(ags) > len(_params):
        raise TypeError(
            f"{f.__name__}() takes at most {len(_params)} positional "
            f"arguments besides 'df' but {len(ags)} were given"
        )

    positional_args = dict(zip(_params, ags))
    keyword_args = dict(kw)
    default_args = {
        k: v.default for k, v in params.items() if v.default is not Parameter.empty
    }

    if positional_args.keys() & keyword_args.keys():
        raise TypeError("Can't merge positional and keyword arguments")

    # Later operands win: defaults < positional < keyword.
    merge = _params | default_args | positional_args | keyword_args
    merge = {k: v for k, v in merge.items() if v is not Parameter.empty}

    def currified_df(df):
        full_args = {**merge, 'df': df}
        return f(**full_args)

    return transformer(currified_df, **kwargs, partially_applied_args=merge)

if len(params) > 1:
return new_f
else:
return new_f()

def __str__(self):
return f"<transformer '{self.name}'>"

def __repr__(self):
return f"<transformer '{self.name}'>"



class chain:
    """Ordered pipeline of ``transformer`` objects applied one after another.

    Calling the chain feeds a DataFrame through every transformer in order
    and records, for each step, the transformer, the arguments it reported
    and the result it produced.
    """

    def __init__(self, *fs: "transformer"):
        """Store the transformers that make up the pipeline.

        Raises:
            TypeError: If any argument is not a ``transformer``; the message
                lists each offending value together with its type name.
        """
        rejected = [f for f in fs if not isinstance(f, transformer)]
        if rejected:
            cases_with_types_str = ', '.join(
                f"'{f}': '{type(f).__name__}'" for f in rejected
            )
            raise TypeError(
                f"Expected all arguments to be of type 'transformer', got {cases_with_types_str}"
            )

        self.fs = fs

    def transformers_source(self):
        """Return the source text of every transformer, one string per step.

        When the first source line is a decorator line it is removed.
        """
        sources = []
        for f in self.fs:
            src, _count = f.sourcelines
            # Source lines keep their original indentation, so strip it before
            # looking for the decorator marker; also tolerate an empty list.
            if src and src[0].lstrip().startswith('@'):
                src = src[1:]
            sources.append(''.join(src))
        return sources

    def __call__(self, df) -> Tuple[list, DataFrame]:
        """Run ``df`` through the pipeline.

        Returns:
            ``(thunks, result)``. ``thunks`` is a list of
            ``(transformer, params, dataframe)`` tuples, starting with a
            synthetic ``'start'`` entry that holds a shallow copy of the
            input, followed by one entry per step. ``result`` is the output
            of the last step. An empty chain returns ``([], df)``.
        """
        if not self.fs:
            return [], df

        thunks = [(transformer(lambda: None, name='start'), {}, copy(df))]
        acc = df

        # Apply every transformer exactly once, feeding each result forward.
        for f in self.fs:
            params, acc = f(acc)
            thunks.append((f, params, acc))

        return thunks, acc



# Annotation-only declarations: each line records that the name is expected to
# hold a `transformer`, but binds no value here.
# NOTE(review): the objects themselves are presumably assigned elsewhere, and
# these lines may be leftovers removed by this commit — confirm before relying on them.
# `renombrar_columnas` is Spanish for "rename columns"; `exportar` for "export".
drop: transformer
to_long: transformer
renombrar_columnas: transformer
sort_values: transformer
replace_values: transformer
exportar: transformer

31 changes: 31 additions & 0 deletions data_transformers/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import io
from pandas import DataFrame

def get_dataframe_info(df):
    """Return the text that ``df.info()`` would print, as a string.

    The summary is written to an in-memory text buffer instead of stdout.
    """
    with io.StringIO() as sink:
        df.info(buf=sink)
        return sink.getvalue()

def dict_to_str(d):
    """Render a mapping as ``key=value`` pairs joined by commas.

    Pairs appear in the mapping's iteration order; an empty mapping yields
    an empty string.
    """
    pairs = (f'{key}={value}' for key, value in d.items())
    return ','.join(pairs)

def callstack_to_str(callstack):
    """Format a recorded call stack as human-readable text.

    Args:
        callstack: Iterable of ``(f, params, presult)`` triples. ``f`` must
            expose a ``name`` attribute, ``params`` is a mapping of argument
            names to values, and ``presult`` is the value produced by the step.

    Returns:
        One section per entry, sections separated by a line of 30 dashes.
        Each section starts with ``name(arg=value,...)`` (the ``df`` argument
        is omitted). When the result is a DataFrame, a trimmed ``df.info()``
        summary and its first row rendered as a Markdown table follow.

    Note:
        ``DataFrame.to_markdown`` relies on the optional ``tabulate`` package.
    """
    separator = '-' * 30
    frames = []

    for f, params, presult in callstack:
        # Filter into a new dict rather than pop(): the caller's recorded
        # parameters must not be altered by merely formatting them.
        shown = {k: v for k, v in params.items() if k != 'df'}
        frame = [f'{f.name}({dict_to_str(shown)})']

        if isinstance(presult, DataFrame):
            # Drops the first line and the last three pieces of the info text
            # (presumably the class header and the dtypes/memory footer).
            info_lines = get_dataframe_info(presult).split('\n')[1:-3]
            frame.append('\n'.join(info_lines))
            frame.append('')
            frame.append(presult.head(1).to_markdown())

        frames.extend(['\n'.join(frame), '', separator, ''])

    return '\n'.join(frames)

0 comments on commit 26471e4

Please sign in to comment.