-
Notifications
You must be signed in to change notification settings - Fork 4
/
prepare_earnings_report.py
118 lines (104 loc) · 3.89 KB
/
prepare_earnings_report.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
"""
## Prepares the earnings report for a given toy
The toy can be selected in manual runs as an Airflow param.
By default the report will be on our flagship product, the Carrot Plushy.
"""
from airflow.datasets import Dataset
from airflow.decorators import dag, task
from airflow.models.param import Param
from airflow.models.baseoperator import chain
from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from pendulum import datetime
from include.helpers import (
set_up_report_platform,
execute_report_mailer,
tear_down_report_platform,
)
SNOWFLAKE_CONN_ID = "snowflake_de_team"
@dag(
    start_date=datetime(2024, 1, 1),
    schedule=DatasetOrTimeSchedule(
        timetable=CronTriggerTimetable("0 * * * *", timezone="UTC"),
        datasets=[Dataset("snowflake://sales_reports_table")],
    ),  # NEW in Airflow 2.9: run on the cron schedule OR when the dataset is updated
    catchup=False,
    tags=["DatasetOrTimeSchedule", "use-case"],
    default_args={"owner": "Cerberus", "retries": 3, "retry_delay": 5},
    # FIX: description previously read "Load data from S3 to Snowflake", which
    # describes a different DAG — keep it in sync with what this DAG does.
    description="Prepare the earnings report for a selected toy",
    doc_md=__doc__,
    params={
        "toy_to_report": Param(
            "Carrot Plushy",
            type="string",
            title="Core toy to report",
            description="Define which toy to generate a report on.",
            # NOTE(review): enum values are user-facing product names and are
            # matched verbatim in the SQL below — do not "fix" spellings here
            # without also updating the data ("Artisinal", "calory" look like
            # intentional product branding; confirm with owners).
            enum=[
                "Carrot Plushy",
                "ChewChew Train Dog Bed",
                "Where is the Ball? - Transparent Edition",
                "Stack of Artisinal Homework",
                "Post breakfast treats - calory free",
            ],
        ),
        "simulate_metric_fetch_failure": Param(
            False,
            type="boolean",
            title="Simulate metric fetch failure",
            description="Simulate a failure in the metric fetch set to True to see Setup/Teardown in Action.",
        ),
        "simulate_metric_mail_failure": Param(
            False,
            type="boolean",
            title="Simulate metric mail failure",
            description="Simulate a failure in the metric mailing set to True to see Setup/Teardown in Action.",
        ),
    },
)
def prepare_earnings_report():
    """Report pipeline: set up the platform, fetch metrics and earnings,
    mail the report, then tear the platform down (setup/teardown demo)."""

    @task
    def set_up_internal_reporting_platform():
        """Setup work-task: bring up the internal reporting platform."""
        return set_up_report_platform()

    @task(retries=0)  # no retries so a simulated failure fails immediately
    def get_key_metrics(**context):
        """Return the toy to report on; optionally simulate a fetch failure."""
        if context["params"]["simulate_metric_fetch_failure"]:
            raise Exception("Metric fetch failed!")
        return context["params"]["toy_to_report"]

    # FIX: task_id was an f-string with no placeholders (f"get_total_earnings",
    # ruff F541) — a plain string literal is what was intended.
    get_total_earnings = SnowflakeOperator(
        task_id="get_total_earnings",
        snowflake_conn_id=SNOWFLAKE_CONN_ID,
        # The toy name chosen in get_key_metrics is pulled from XCom via Jinja.
        sql="""
            SELECT SUM(REVENUE) FROM sales_reports_table
            WHERE PRODUCTNAME = '{{ ti.xcom_pull(task_ids='get_key_metrics') }}';
        """,
    )

    @task(retries=0)  # no retries so a simulated failure fails immediately
    def send_mails_internal_platform(**context):
        """Mail the report via the platform; optionally simulate a failure."""
        if context["params"]["simulate_metric_mail_failure"]:
            raise Exception("Metric mailing failed!")
        return execute_report_mailer()

    @task
    def tear_down_internal_reporting_platform():
        """Teardown work-task: dismantle the internal reporting platform."""
        return tear_down_report_platform()

    setup_task = set_up_internal_reporting_platform()
    teardown_task = tear_down_internal_reporting_platform()
    # Pair teardown with its setup so the platform is torn down even when a
    # task in between fails (Airflow setup/teardown semantics).
    teardown_task.as_teardown(setups=[setup_task])

    chain(
        setup_task,
        get_key_metrics(),
        get_total_earnings,
        send_mails_internal_platform(),
        teardown_task,
    )


prepare_earnings_report()