Skip to content

Commit

Permalink
Merge download/upload features
Browse files Browse the repository at this point in the history
  • Loading branch information
joangq committed Jun 13, 2024
1 parent d57d1fb commit 28fd39c
Showing 1 changed file with 43 additions and 2 deletions.
45 changes: 43 additions & 2 deletions data_transformers/dtransformers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from functools import wraps
import warnings
from copy import copy
from random import randint
from .utils import callstack_to_program, callstack_to_str

class staticproperty(property):
def __get__(self, cls, owner):
Expand Down Expand Up @@ -105,7 +105,7 @@ def currified_df(df):

return transformer(currified_df, **kwargs, partially_applied_args=merge)

if len(params) > 1:
if (len(params) - len(default_args)) > 1:
return new_f
else:
return new_f()
Expand All @@ -119,6 +119,10 @@ def __repr__(self):


class chain:
fs: tuple[transformer]
thunks: list[tuple[transformer, dict, DataFrame]]
result: DataFrame

def __init__(self, *fs: transformer):
if not all(isinstance(f, transformer) for f in fs):
cases = [f for f in fs if not isinstance(f, transformer)]
Expand Down Expand Up @@ -151,4 +155,41 @@ def __call__(self, df) -> Tuple[list, DataFrame]:
acc = result
thunks.append((f, params, result))

self.thunks = thunks
self.result = result
return thunks, result

def _export(self, buffer):
callstack = self.thunks
imports = 'from pandas import DataFrame\nfrom data_transformers import chain, transformer\n\n'
definitions = '\n'.join(self.transformers_source())
chain_prog = callstack_to_program(callstack)
chain_log = '\n'.join(f'# {line}' for line in callstack_to_str(callstack).split('\n'))

buffer.write(imports)
buffer.write('\n')
buffer.write('# DEFINITIONS_START\n')
buffer.write(definitions)
buffer.write('# DEFINITIONS_END\n')
buffer.write('\n\n')
buffer.write('# PIPELINE_START\n')
buffer.write(chain_prog)
buffer.write('\n')
buffer.write('# PIPELINE_END\n')
buffer.write('\n\n')
buffer.write(chain_log)

return buffer

def export(self, filename, force=False):
# filename = f'{grafico_id}_transformer.py'
if not force and os.path.exists(filename):
raise FileExistsError(f"File {filename} already exists. Use force=True to overwrite it.")

dir = os.path.dirname(filename)
if dir:
os.makedirs(dir, exist_ok=True)
with open(filename, 'w') as f:
self._export(f)

return filename

0 comments on commit 28fd39c

Please sign in to comment.