star_schema.py
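"""Airflow DAG that builds the 'Bancos Tarifas' star schema on transient EMR clusters."""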
import os
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import EmrCreateJobFlowOperator
from airflow.providers.amazon.aws.sensors.emr import EmrJobFlowSensor
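

# Build one (create cluster, wait for cluster) task pair for a given entity and
# process. Each pair launches a transient EMR cluster that runs a single
# spark-submit step for src/job_template.py and is torn down when the step ends.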
def get_emr_operators(entity_name, process):
    bucket_name = os.environ['EMR_BUCKET_NAME']

    # A single Spark step, submitted in cluster mode on YARN, that runs
    # job_template.py with the entity name, process and bucket as arguments.
    SPARK_STEPS = [
        {
            'Name': f'{process}_{entity_name}',
            'ActionOnFailure': 'TERMINATE_CLUSTER',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': [
                    '/usr/bin/spark-submit',
                    '--deploy-mode',
                    'cluster',
                    '--master',
                    'yarn',
                    '--conf',
                    'spark.yarn.submit.waitAppCompletion=true',
                    '--conf',
                    'spark.sql.parquet.fs.optimized.committer.optimization-enabled=true',
                    '--conf',
                    f'spark.yarn.appMasterEnv.EMR_BUCKET_NAME={bucket_name}',
                    '--py-files',
                    f's3://{bucket_name}/packages/packages.zip',
                    f's3a://{bucket_name}/src/job_template.py',
                    entity_name,
                    process,
                    bucket_name,
                ],
            },
        }
    ]
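
    # Cluster template: a single m5.xlarge SPOT master node on EMR 6.7.0,
    # bootstrapped with install_packages.sh and terminated automatically once
    # there are no more steps to run.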
    JOB_FLOW_OVERRIDES = {
        'Name': 'airflow-dag-star_schema',
        'ReleaseLabel': 'emr-6.7.0',
        'LogUri': f"s3://{bucket_name}/log/",
        'Instances': {
            'InstanceGroups': [
                {
                    'Name': 'Master node',
                    'Market': 'SPOT',
                    'InstanceRole': 'MASTER',
                    'InstanceType': 'm5.xlarge',
                    'InstanceCount': 1,
                }
            ],
            'KeepJobFlowAliveWhenNoSteps': False,
            'TerminationProtected': False,
        },
        'Steps': SPARK_STEPS,
        'JobFlowRole': 'LabInstanceProfile',
        'ServiceRole': 'EMR_DefaultRole',
        'BootstrapActions': [
            {
                'Name': 'install_packages',
                'ScriptBootstrapAction': {
                    'Path': f's3://{bucket_name}/bootstrap_actions/install_packages.sh',
                }
            },
        ]
    }
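
    # Launch the transient cluster, then block on a sensor until the whole job
    # flow terminates (job_flow_creator.output is the job flow id pulled from XCom).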
    job_flow_creator = EmrCreateJobFlowOperator(
        task_id=f'job_flow-{entity_name}-{process}',
        job_flow_overrides=JOB_FLOW_OVERRIDES,
        aws_conn_id='aws_default',
        emr_conn_id='emr_default',
    )
    job_sensor = EmrJobFlowSensor(
        task_id=f'check_job-{entity_name}-{process}',
        job_flow_id=job_flow_creator.output,
        aws_conn_id='aws_default',
    )
    return job_flow_creator, job_sensor
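

# Extract and transform each source entity on its own cluster, then build the
# star schema once both transforms have finished.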
with DAG(
"bancos_star_schema",
default_args={
"depends_on_past": False,
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
},
description="A dag that generates the 'Bancos Tarifas' star_schema",
schedule_interval=None,
start_date=datetime(2022, 8, 1),
catchup=False,
tags=["project4", "emr"],
) as dag:
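    # One (create cluster, sensor) task pair per entity/process combination.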
    t1, t2 = get_emr_operators('bancos', 'extract')
    t3, t4 = get_emr_operators('lista_tarifas', 'extract')
    t5, t6 = get_emr_operators('bancos', 'transform')
    t7, t8 = get_emr_operators('lista_tarifas', 'transform')
    t9, t10 = get_emr_operators('star_schema', 'transform')
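
    # Run the two extracts sequentially, then fan out into the two transforms,
    # which both feed the final star_schema transform.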
    t1 >> t2 >> t3 >> t4
    t4 >> t5 >> t6 >> t9 >> t10
    t4 >> t7 >> t8 >> t9 >> t10