From 28fd39c58a8b5bdb5b06e228bbd04bc45f72574d Mon Sep 17 00:00:00 2001 From: joangq Date: Thu, 13 Jun 2024 15:38:40 -0300 Subject: [PATCH] Merge download/upload features --- data_transformers/dtransformers.py | 45 ++++++++++++++++++++++++++++-- 1 file changed, 43 insertions(+), 2 deletions(-) diff --git a/data_transformers/dtransformers.py b/data_transformers/dtransformers.py index bcc0af7..629a535 100644 --- a/data_transformers/dtransformers.py +++ b/data_transformers/dtransformers.py @@ -4,7 +4,7 @@ from functools import wraps import warnings from copy import copy -from random import randint +from .utils import callstack_to_program, callstack_to_str class staticproperty(property): def __get__(self, cls, owner): @@ -105,7 +105,7 @@ def currified_df(df): return transformer(currified_df, **kwargs, partially_applied_args=merge) - if len(params) > 1: + if (len(params) - len(default_args)) > 1: return new_f else: return new_f() @@ -119,6 +119,10 @@ def __repr__(self): class chain: + fs: tuple[transformer] + thunks: list[tuple[transformer, dict, DataFrame]] + result: DataFrame + def __init__(self, *fs: transformer): if not all(isinstance(f, transformer) for f in fs): cases = [f for f in fs if not isinstance(f, transformer)] @@ -151,4 +155,41 @@ def __call__(self, df) -> Tuple[list, DataFrame]: acc = result thunks.append((f, params, result)) + self.thunks = thunks + self.result = result return thunks, result + + def _export(self, buffer): + callstack = self.thunks + imports = 'from pandas import DataFrame\nfrom data_transformers import chain, transformer\n\n' + definitions = '\n'.join(self.transformers_source()) + chain_prog = callstack_to_program(callstack) + chain_log = '\n'.join(f'# {line}' for line in callstack_to_str(callstack).split('\n')) + + buffer.write(imports) + buffer.write('\n') + buffer.write('# DEFINITIONS_START\n') + buffer.write(definitions) + buffer.write('# DEFINITIONS_END\n') + buffer.write('\n\n') + buffer.write('# PIPELINE_START\n') + buffer.write(chain_prog) + buffer.write('\n') + buffer.write('# PIPELINE_END\n') + buffer.write('\n\n') + buffer.write(chain_log) + + return buffer + + def export(self, filename, force=False): + # filename = f'{grafico_id}_transformer.py' + if not force and os.path.exists(filename): + raise FileExistsError(f"File {filename} already exists. Use force=True to overwrite it.") + + dir = os.path.dirname(filename) + if dir: + os.makedirs(dir, exist_ok=True) + with open(filename, 'w') as f: + self._export(f) + + return filename