Fix: Migrate Iowa Liquor Sales DAG to new environment. (#847)
nlarge-google authored Oct 29, 2024
1 parent 4a64e72 commit a595326
Showing 2 changed files with 80 additions and 19 deletions.
46 changes: 37 additions & 9 deletions datasets/iowa_liquor_sales/pipelines/sales/pipeline.yaml
@@ -36,22 +36,40 @@ dag:
description: "Download data"
args:
task_id: "bash_download"
bash_command: "wget -O /home/airflow/gcs/data/iowa_liquor_sales/raw_files/data.csv https://data.iowa.gov/api/views/m3tr-qhgy/rows.csv"
bash_command: |
curl https://data.iowa.gov/api/views/m3tr-qhgy/rows.csv | gsutil cp - gs://{{ var.value.composer_bucket }}/data/iowa_liquor_sales/raw_files/data.csv
- operator: "BashOperator"
description: "Split data file into smaller chunks"
args:
task_id: "bash_split"
bash_command: tail -n +2 /home/airflow/gcs/data/iowa_liquor_sales/raw_files/data.csv | split -d -l 4000000 - --filter='sh -c "{ head -n1 /home/airflow/gcs/data/iowa_liquor_sales/raw_files/data.csv; cat; } > $FILE"' /home/airflow/gcs/data/iowa_liquor_sales/raw_files/split_data_ ;

- operator: "KubernetesPodOperator"
- operator: "GKECreateClusterOperator"
args:
task_id: "create_cluster"
project_id: "{{ var.value.gcp_project }}"
location: "us-central1-c"
body:
name: pdp-iowa-liquor-sales
initial_node_count: 1
network: "{{ var.value.vpc_network }}"
node_config:
machine_type: e2-standard-16
oauth_scopes:
- https://www.googleapis.com/auth/devstorage.read_write
- https://www.googleapis.com/auth/cloud-platform

- operator: "GKEStartPodOperator"
description: "Run CSV transform within kubernetes pod"
args:
task_id: "kub_transform_csv"
startup_timeout_seconds: 1000
name: "Sales"
namespace: "composer"
service_account_name: "datasets"
namespace: "default"
project_id: "{{ var.value.gcp_project }}"
location: "us-central1-c"
cluster_name: pdp-iowa-liquor-sales
image_pull_policy: "Always"
image: "{{ var.json.iowa_liquor_sales.container_registry.run_csv_transform_kub }}"
env_vars:
@@ -116,10 +134,20 @@ dag:
"Volume Sold (Liters)":"volume_sold_liters",
"Volume Sold (Gallons)":"volume_sold_gallons"
}
resources:
request_memory: "4G"
request_cpu: "1"
request_ephemeral_storage: "10G"
container_resources:
memory:
request: "16Gi"
cpu:
request: "1"
ephemeral-storage:
request: "10Gi"

- operator: "GKEDeleteClusterOperator"
args:
task_id: "delete_cluster"
project_id: "{{ var.value.gcp_project }}"
location: "us-central1-c"
name: pdp-iowa-liquor-sales

- operator: "BashOperator"
description: "Combine the split files into one"
@@ -237,4 +265,4 @@ dag:


graph_paths:
- "bash_download >> bash_split >> kub_transform_csv >> bash_concatenate >> load_to_bq"
- "bash_download >> bash_split >> create_cluster >> kub_transform_csv >> delete_cluster >> bash_concatenate >> load_to_bq"
53 changes: 43 additions & 10 deletions datasets/iowa_liquor_sales/pipelines/sales/sales_dag.py
@@ -15,7 +15,7 @@

from airflow import DAG
from airflow.operators import bash
from airflow.providers.cncf.kubernetes.operators import kubernetes_pod
from airflow.providers.google.cloud.operators import kubernetes_engine
from airflow.providers.google.cloud.transfers import gcs_to_bigquery

default_args = {
@@ -37,22 +37,41 @@
# Download data
bash_download = bash.BashOperator(
task_id="bash_download",
bash_command="wget -O /home/airflow/gcs/data/iowa_liquor_sales/raw_files/data.csv https://data.iowa.gov/api/views/m3tr-qhgy/rows.csv",
bash_command="curl https://data.iowa.gov/api/views/m3tr-qhgy/rows.csv | gsutil cp - gs://{{ var.value.composer_bucket }}/data/iowa_liquor_sales/raw_files/data.csv\n",
)

# Split data file into smaller chunks
bash_split = bash.BashOperator(
task_id="bash_split",
bash_command='tail -n +2 /home/airflow/gcs/data/iowa_liquor_sales/raw_files/data.csv | split -d -l 4000000 - --filter=\u0027sh -c "{ head -n1 /home/airflow/gcs/data/iowa_liquor_sales/raw_files/data.csv; cat; } \u003e $FILE"\u0027 /home/airflow/gcs/data/iowa_liquor_sales/raw_files/split_data_ ;',
)
create_cluster = kubernetes_engine.GKECreateClusterOperator(
task_id="create_cluster",
project_id="{{ var.value.gcp_project }}",
location="us-central1-c",
body={
"name": "pdp-iowa-liquor-sales",
"initial_node_count": 1,
"network": "{{ var.value.vpc_network }}",
"node_config": {
"machine_type": "e2-standard-16",
"oauth_scopes": [
"https://www.googleapis.com/auth/devstorage.read_write",
"https://www.googleapis.com/auth/cloud-platform",
],
},
},
)

# Run CSV transform within kubernetes pod
kub_transform_csv = kubernetes_pod.KubernetesPodOperator(
kub_transform_csv = kubernetes_engine.GKEStartPodOperator(
task_id="kub_transform_csv",
startup_timeout_seconds=1000,
name="Sales",
namespace="composer",
service_account_name="datasets",
namespace="default",
project_id="{{ var.value.gcp_project }}",
location="us-central1-c",
cluster_name="pdp-iowa-liquor-sales",
image_pull_policy="Always",
image="{{ var.json.iowa_liquor_sales.container_registry.run_csv_transform_kub }}",
env_vars={
@@ -66,12 +85,18 @@
"HEADERS": '[\n "invoice_and_item_number",\n "date",\n "store_number",\n "store_name",\n "address",\n "city",\n "zip_code",\n "store_location",\n "county_number",\n "county",\n "category",\n "category_name",\n "vendor_number",\n "vendor_name",\n "item_number",\n "item_description",\n "pack",\n "bottle_volume_ml",\n "state_bottle_cost",\n "state_bottle_retail",\n "bottles_sold",\n "sale_dollars",\n "volume_sold_liters",\n "volume_sold_gallons"\n]',
"RENAME_MAPPINGS": '{\n "Invoice/Item Number":"invoice_and_item_number",\n "Date":"date",\n "Store Number":"store_number",\n "Store Name":"store_name",\n "Address":"address",\n "City":"city",\n "Zip Code":"zip_code",\n "Store Location":"store_location",\n "County Number":"county_number",\n "County":"county",\n "Category":"category",\n "Category Name":"category_name",\n "Vendor Number":"vendor_number",\n "Vendor Name":"vendor_name",\n "Item Number":"item_number",\n "Item Description":"item_description",\n "Pack":"pack",\n "Bottle Volume (ml)":"bottle_volume_ml",\n "State Bottle Cost":"state_bottle_cost",\n "State Bottle Retail":"state_bottle_retail",\n "Bottles Sold":"bottles_sold",\n "Sale (Dollars)":"sale_dollars",\n "Volume Sold (Liters)":"volume_sold_liters",\n "Volume Sold (Gallons)":"volume_sold_gallons"\n}',
},
resources={
"request_memory": "4G",
"request_cpu": "1",
"request_ephemeral_storage": "10G",
container_resources={
"memory": {"request": "16Gi"},
"cpu": {"request": "1"},
"ephemeral-storage": {"request": "10Gi"},
},
)
delete_cluster = kubernetes_engine.GKEDeleteClusterOperator(
task_id="delete_cluster",
project_id="{{ var.value.gcp_project }}",
location="us-central1-c",
name="pdp-imdb-reviews",
)

# Combine the split files into one
bash_concatenate = bash.BashOperator(
@@ -236,4 +261,12 @@
],
)

bash_download >> bash_split >> kub_transform_csv >> bash_concatenate >> load_to_bq
(
bash_download
>> bash_split
>> create_cluster
>> kub_transform_csv
>> delete_cluster
>> bash_concatenate
>> load_to_bq
)
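
Taken together, the DAG now provisions its own GKE cluster, runs the transform pod there, and tears the cluster down afterwards, instead of launching the pod in the Composer environment's "composer" namespace as before. Below is a minimal, self-contained sketch of that ephemeral-cluster pattern under stated assumptions: the project, zone, cluster, and image names are placeholders, and the trigger_rule on the delete task is an illustrative addition (not part of this commit) so the cluster is still removed if the pod task fails.

# Sketch of the create-cluster -> run-pod -> delete-cluster pattern adopted here.
import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators import kubernetes_engine
from airflow.utils.trigger_rule import TriggerRule

with DAG(
    dag_id="ephemeral_gke_example",
    start_date=datetime.datetime(2024, 10, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    create_cluster = kubernetes_engine.GKECreateClusterOperator(
        task_id="create_cluster",
        project_id="my-project",  # placeholder
        location="us-central1-c",
        body={"name": "example-cluster", "initial_node_count": 1},
    )

    run_pod = kubernetes_engine.GKEStartPodOperator(
        task_id="run_pod",
        project_id="my-project",
        location="us-central1-c",
        cluster_name="example-cluster",
        name="example-pod",
        namespace="default",
        image="busybox",  # placeholder image
        cmds=["sh", "-c", "echo hello"],
    )

    delete_cluster = kubernetes_engine.GKEDeleteClusterOperator(
        task_id="delete_cluster",
        project_id="my-project",
        location="us-central1-c",
        name="example-cluster",
        trigger_rule=TriggerRule.ALL_DONE,  # clean up even if run_pod fails
    )

    create_cluster >> run_pod >> delete_cluster

The e2-standard-16 node and the 16Gi memory request in the actual DAG give the transform pod far more headroom than the previous 4G request inside the Composer cluster, which appears to be the motivation for running it on a dedicated cluster.
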
