-
Notifications
You must be signed in to change notification settings - Fork 2
/
run.py
76 lines (63 loc) · 2.42 KB
/
run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import random
import time
import sys
import os
import pandas
import inspect
from colorama import Fore, Back, Style
# NOTE: load_pipelines_from_project was previously listed twice; deduplicated.
from utils.core import (
    load_pipelines_from_project, get_params, get_transformation,
    validate_transformation_params
)
# Resolve CLI parameters: the pipeline slug to run, optional source overrides,
# and an optional path to a stream CSV.
pipeline, sources, path_to_stream = get_params()

if pipeline is None:
    # No pipeline given: list the available slugs so the user can pick one.
    available = load_pipelines_from_project()
    raise Exception('Please specify pipeline to load from the following options: ' +
                    ",".join([p['slug'] for p in available]))
else:
    # Load the full pipeline definition for the requested slug.
    pipeline = load_pipelines_from_project(pipeline)[0]

if "sources" not in pipeline:
    raise Exception(
        f"Pipeline {pipeline['slug']} is missing sources on the YML")
else:
    sources = pipeline['sources']

if sources is None or len(sources) == 0:
    raise Exception(
        f"Missing CSV files to source from when running pipeline {pipeline['slug']}.\n Hint: please specify the name of the CSV files on the project.yml")
else:
    # Normalize source names: entries may be given with or without the
    # .csv extension (endswith, not substring, so "x.csv.bak" still gets one).
    sources = [s if s.endswith(".csv") else s + ".csv" for s in sources]
# Build the list of streams. With no stream path the pipeline runs exactly
# once, signalled by a single None entry.
if path_to_stream is None:
    streams = [None]
else:
    streams = pandas.read_csv(
        "sources/"+path_to_stream).to_dict('records')
    print(Fore.BLUE +
          f"Stream param detected and {len(streams)} streams fetched. Pipeline {pipeline['slug']} will run once for each stream ...")

# Load every declared source CSV into a dataframe, preserving order.
dfs = [pandas.read_csv("sources/"+csv_name) for csv_name in sources]
# Run the pipeline once per stream. Transformations are chained: the output
# dataframe of one becomes the first input of the next.
df_out = None
for stream_index in range(len(streams)):
    kwargs = {}
    if streams[stream_index] is not None:
        # A real stream record is forwarded to each transformation as the
        # 'stream' keyword argument; a None entry means "no stream mode".
        print(Fore.WHITE + f"[] Running pipeline with stream #{stream_index}")
        kwargs['stream'] = streams[stream_index]
    count = 0
    for t in pipeline['transformations']:
        count += 1
        print(Fore.WHITE +
              f"[] Applying {count} transformation {t} with {len(dfs)} sources...")
        # get_transformation returns the callable plus metadata slots
        # (in/out/stream) that this runner does not use directly.
        run, _in, _out, _stream = get_transformation(pipeline['slug'], t)
        if df_out is not None:
            # Chain: the previous transformation's output replaces the first
            # source dataframe for the next step (and for the next stream run
            # — df_out deliberately persists across the outer loop).
            dfs[0] = df_out
        # validate_transformation_params inspects run's signature; arguments
        # is used below to decide how many positional dataframes to pass.
        arguments = validate_transformation_params(
            run, kwargs['stream'] if 'stream' in kwargs else None)
        # Pass only as many dataframes as run accepts positionally, after
        # reserving slots for the keyword arguments (the optional stream).
        df_out = run(*dfs[:len(arguments) - len(kwargs.keys())], **kwargs)
# Persist the final dataframe under output/, named by pipeline slug plus a
# timestamp so consecutive runs never overwrite each other.
# (Removed an unused `file_name` local that read the stale loop variable
# `source`; makedirs with exist_ok avoids the exists/mkdir race.)
os.makedirs("output/", exist_ok=True)
df_out.to_csv(
    f"output/{pipeline['slug']}{str(round(time.time(),3)).replace('.','')}.csv")