-
Notifications
You must be signed in to change notification settings - Fork 0
/
etl_wrapper.py
28 lines (22 loc) · 894 Bytes
/
etl_wrapper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from etl.extract import extract_covid_data
from etl.load import load_to_dwh
from etl.transform import transform
import os
import sys
import yaml
import utils.db_utils as db_utils
import psycopg2
from utils.sns_util import notify_etl_status
# Location of the YAML configuration file, resolved against the first entry
# of sys.path (normally the directory of the script that started the
# interpreter -- verify this holds in the deployed runtime).
_BASE_DIR = sys.path[0]
CONFIG_FILE = os.path.join(_BASE_DIR, 'resources/config.yml')
def load_data(event, context):
    """Run the COVID ETL pipeline: read config, extract, transform, load.

    Reads the two source URLs from the ``data`` section of the YAML file at
    ``CONFIG_FILE``, obtains the warehouse connection, then passes the
    extracted frames through ``transform`` and hands the result to
    ``load_to_dwh``. Any exception raised along the way is reported via
    ``notify_etl_status(False, <message>)`` and is not re-raised.

    Args:
        event: Unused by this function.
        context: Unused by this function.
            NOTE(review): the (event, context) signature looks like an AWS
            Lambda handler -- confirm against the deployment configuration.

    Returns:
        None, whether the run succeeded or failed.
    """
    try:
        # Use a context manager so the config file handle is always closed,
        # even when YAML parsing or the 'data' lookup fails.
        with open(CONFIG_FILE) as config_stream:
            # BaseLoader only builds plain str/list/dict objects, so every
            # scalar in the config is read as a string.
            data = yaml.load(config_stream, Loader=yaml.BaseLoader)['data']

        url_covid_nyt_data = data['url_covid_nyt_data']
        url_covid_jh_data = data['url_covid_jh_data']

        # The connection is opened before extraction, so an unreachable
        # warehouse fails the run before any data is downloaded.
        # NOTE(review): the connection is not closed here -- confirm that
        # load_to_dwh (or db_utils) owns its lifecycle.
        dwh_conn = db_utils.get_dwh_conn('dwh')

        df_nyt_data, df_jh_data = extract_covid_data(url_covid_nyt_data, url_covid_jh_data)
        df_transformed = transform(df_nyt_data, df_jh_data)
        load_to_dwh(df_transformed, dwh_conn)
    except Exception as error:
        # psycopg2.Error derives from Exception, so this single clause
        # catches everything the former (Exception, psycopg2.Error) tuple did.
        notify_etl_status(False, str(error))
# Module-level invocation: the pipeline runs once whenever this file is
# executed or imported. Empty strings stand in for the unused arguments.
load_data(event="", context="")