Commit
Merge branch 'main' into migrate_irs_20241029
nlarge-google authored Oct 30, 2024
2 parents 1cfb71d + 1b3f9ec commit 5f2cdb7
Showing 6 changed files with 106 additions and 37 deletions.
datasets/fda_food/pipelines/food_enforcement/food_enforcement_dag.py
@@ -37,8 +37,9 @@
transform_csv = kubernetes_pod.KubernetesPodOperator(
task_id="transform_csv",
name="food_enforcement",
namespace="composer",
service_account_name="datasets",
namespace="composer-user-workloads",
service_account_name="default",
config_file="/home/airflow/composer_kube_config",
image_pull_policy="Always",
image="{{ var.json.fda_food.container_registry.run_csv_transform_kub }}",
env_vars={
@@ -56,10 +57,10 @@
"RECORD_PATH": "",
"META": '[ "status", "city", "state", "country", "classification",\n "openfda", "product_type", "event_id", "recalling_firm", "address_1",\n "address_2", "postal_code", "voluntary_mandated", "initial_firm_notification", "distribution_pattern",\n "recall_number", "product_description", "product_quantity", "reason_for_recall", "recall_initiation_date",\n "center_classification_date", "report_date", "code_info", "more_code_info", "termination_date" ]',
},
-resources={
-    "request_memory": "4G",
-    "request_cpu": "1",
-    "request_ephemeral_storage": "5G",
+container_resources={
+    "memory": {"request": "32Gi"},
+    "cpu": {"request": "2"},
+    "ephemeral-storage": {"request": "10Gi"},
},
)

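The change above moves the transform pod from the composer namespace and datasets service account to Composer 2's composer-user-workloads namespace with the default service account, points it at the environment's kube config, and replaces the older resources argument with container_resources at larger requests. For reference, a minimal sketch of the migrated task once the new fields are combined — values mirror this diff, and whether container_resources accepts a plain dict or needs a kubernetes.client V1ResourceRequirements object depends on the cncf.kubernetes provider version:

from airflow.providers.cncf.kubernetes.operators import kubernetes_pod

# Sketch only: mirrors the fields introduced by this commit for the
# food_enforcement transform step (assumes it sits inside the DAG's
# "with DAG(...)" block as in the generated file).
transform_csv = kubernetes_pod.KubernetesPodOperator(
    task_id="transform_csv",
    name="food_enforcement",
    # Composer 2: run user pods in the user-workloads namespace with the
    # default service account, using the environment's kube config.
    namespace="composer-user-workloads",
    service_account_name="default",
    config_file="/home/airflow/composer_kube_config",
    image_pull_policy="Always",
    image="{{ var.json.fda_food.container_registry.run_csv_transform_kub }}",
    # Dict form as committed here; some provider versions expect a
    # kubernetes.client.V1ResourceRequirements object instead.
    container_resources={
        "memory": {"request": "32Gi"},
        "cpu": {"request": "2"},
        "ephemeral-storage": {"request": "10Gi"},
    },
)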
16 changes: 10 additions & 6 deletions datasets/fda_food/pipelines/food_enforcement/pipeline.yaml
@@ -41,8 +41,9 @@ dag:

task_id: "transform_csv"
name: "food_enforcement"
namespace: "composer"
service_account_name: "datasets"
namespace: "composer-user-workloads"
service_account_name: "default"
config_file: "/home/airflow/composer_kube_config"

image_pull_policy: "Always"
image: "{{ var.json.fda_food.container_registry.run_csv_transform_kub }}"
@@ -81,10 +82,13 @@ dag:
"address_2", "postal_code", "voluntary_mandated", "initial_firm_notification", "distribution_pattern",
"recall_number", "product_description", "product_quantity", "reason_for_recall", "recall_initiation_date",
"center_classification_date", "report_date", "code_info", "more_code_info", "termination_date" ]
-resources:
-  request_memory: "4G"
-  request_cpu: "1"
-  request_ephemeral_storage: "5G"
+container_resources:
+  memory:
+    request: "32Gi"
+  cpu:
+    request: "2"
+  ephemeral-storage:
+    request: "10Gi"

- operator: "GoogleCloudStorageToBigQueryOperator"
description: "Task to load CSV data to a BigQuery table"
13 changes: 7 additions & 6 deletions datasets/fda_food/pipelines/food_events/food_events_dag.py
@@ -37,8 +37,9 @@
transform_csv = kubernetes_pod.KubernetesPodOperator(
task_id="transform_csv",
name="food_events",
namespace="composer",
service_account_name="datasets",
namespace="composer-user-workloads",
service_account_name="default",
config_file="/home/airflow/composer_kube_config",
image_pull_policy="Always",
image="{{ var.json.fda_food.container_registry.run_csv_transform_kub }}",
env_vars={
@@ -56,10 +57,10 @@
"RECORD_PATH": "products",
"META": '[\n "report_number", "outcomes", "date_created", "reactions", "date_started",\n ["consumer", "age"], ["consumer", "age_unit"], ["consumer", "gender"]\n]',
},
-resources={
-    "request_memory": "4G",
-    "request_cpu": "1",
-    "request_ephemeral_storage": "5G",
+container_resources={
+    "memory": {"request": "32Gi"},
+    "cpu": {"request": "2"},
+    "ephemeral-storage": {"request": "10Gi"},
},
)

16 changes: 10 additions & 6 deletions datasets/fda_food/pipelines/food_events/pipeline.yaml
@@ -41,8 +41,9 @@ dag:

task_id: "transform_csv"
name: "food_events"
namespace: "composer"
service_account_name: "datasets"
namespace: "composer-user-workloads"
service_account_name: "default"
config_file: "/home/airflow/composer_kube_config"

image_pull_policy: "Always"
image: "{{ var.json.fda_food.container_registry.run_csv_transform_kub }}"
@@ -76,10 +77,13 @@ dag:
"report_number", "outcomes", "date_created", "reactions", "date_started",
["consumer", "age"], ["consumer", "age_unit"], ["consumer", "gender"]
]
-resources:
-  request_memory: "4G"
-  request_cpu: "1"
-  request_ephemeral_storage: "5G"
+container_resources:
+  memory:
+    request: "32Gi"
+  cpu:
+    request: "2"
+  ephemeral-storage:
+    request: "10Gi"

- operator: "GoogleCloudStorageToBigQueryOperator"
description: "Task to load CSV data to a BigQuery table"
datasets/google_cloud_release_notes/pipelines/google_cloud_release_notes/pipeline.yaml
@@ -32,13 +32,31 @@ dag:
default_view: graph

tasks:
- operator: "KubernetesPodOperator"
- operator: "GKECreateClusterOperator"
args:
task_id: "create_cluster"
project_id: "{{ var.value.gcp_project }}"
location: "us-central1-c"
body:
name: pdp-google-cloud-release-notes
initial_node_count: 2
network: "{{ var.value.vpc_network }}"
node_config:
machine_type: e2-standard-16
oauth_scopes:
- https://www.googleapis.com/auth/devstorage.read_write
- https://www.googleapis.com/auth/cloud-platform

- operator: "GKEStartPodOperator"
description: "Copy GCP release notes dataset"
args:
task_id: "copy_bq_dataset"
startup_timeout_seconds: 1000
name: "copy_bq_dataset"
namespace: "composer"
service_account_name: "datasets"
namespace: "default"
project_id: "{{ var.value.gcp_project }}"
location: "us-central1-c"
cluster_name: pdp-google-cloud-release-notes
image_pull_policy: "Always"
image: "{{ var.json.google_cloud_release_notes.container_registry.copy_bq_dataset }}"
env_vars:
@@ -47,9 +65,20 @@
TARGET_PROJECT_ID: "{{ var.value.gcp_project }}"
TARGET_BQ_DATASET: google_cloud_release_notes
SERVICE_ACCOUNT: "{{ var.json.google_cloud_release_notes.service_account }}"
-resources:
-  request_memory: "128M"
-  request_cpu: "200m"
+container_resources:
+  memory:
+    request: "32Gi"
+  cpu:
+    request: "2"
+  ephemeral-storage:
+    request: "10Gi"

- operator: "GKEDeleteClusterOperator"
args:
task_id: "delete_cluster"
project_id: "{{ var.value.gcp_project }}"
location: "us-central1-c"
name: pdp-google-cloud-release-notes

graph_paths:
- "copy_bq_dataset"
- "create_cluster >> copy_bq_dataset >> delete_cluster"
datasets/google_cloud_release_notes/pipelines/google_cloud_release_notes/google_cloud_release_notes_dag.py
@@ -14,7 +14,7 @@


from airflow import DAG
-from airflow.providers.cncf.kubernetes.operators import kubernetes_pod
+from airflow.providers.google.cloud.operators import kubernetes_engine

default_args = {
"owner": "Google",
@@ -31,13 +31,33 @@
catchup=False,
default_view="graph",
) as dag:
+create_cluster = kubernetes_engine.GKECreateClusterOperator(
+    task_id="create_cluster",
+    project_id="{{ var.value.gcp_project }}",
+    location="us-central1-c",
+    body={
+        "name": "pdp-google-cloud-release-notes",
+        "initial_node_count": 2,
+        "network": "{{ var.value.vpc_network }}",
+        "node_config": {
+            "machine_type": "e2-standard-16",
+            "oauth_scopes": [
+                "https://www.googleapis.com/auth/devstorage.read_write",
+                "https://www.googleapis.com/auth/cloud-platform",
+            ],
+        },
+    },
+)
+
# Copy GCP release notes dataset
-copy_bq_dataset = kubernetes_pod.KubernetesPodOperator(
+copy_bq_dataset = kubernetes_engine.GKEStartPodOperator(
task_id="copy_bq_dataset",
startup_timeout_seconds=1000,
name="copy_bq_dataset",
namespace="composer",
service_account_name="datasets",
namespace="default",
project_id="{{ var.value.gcp_project }}",
location="us-central1-c",
cluster_name="pdp-google-cloud-release-notes",
image_pull_policy="Always",
image="{{ var.json.google_cloud_release_notes.container_registry.copy_bq_dataset }}",
env_vars={
@@ -47,7 +67,17 @@
"TARGET_BQ_DATASET": "google_cloud_release_notes",
"SERVICE_ACCOUNT": "{{ var.json.google_cloud_release_notes.service_account }}",
},
resources={"request_memory": "128M", "request_cpu": "200m"},
container_resources={
"memory": {"request": "32Gi"},
"cpu": {"request": "2"},
"ephemeral-storage": {"request": "10Gi"},
},
)
+delete_cluster = kubernetes_engine.GKEDeleteClusterOperator(
+    task_id="delete_cluster",
+    project_id="{{ var.value.gcp_project }}",
+    location="us-central1-c",
+    name="pdp-google-cloud-release-notes",
+)

-copy_bq_dataset
+create_cluster >> copy_bq_dataset >> delete_cluster
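Taken together, the google_cloud_release_notes changes replace the in-Composer pod with an ephemeral-cluster pattern: create a GKE cluster, run the copy pod on it, then delete the cluster. A condensed sketch of that shape, with arguments abridged from the code above (illustrative, not the full generated DAG):

from airflow.providers.google.cloud.operators import kubernetes_engine

# 1. Stand up a short-lived GKE cluster sized for the copy job.
create_cluster = kubernetes_engine.GKECreateClusterOperator(
    task_id="create_cluster",
    project_id="{{ var.value.gcp_project }}",
    location="us-central1-c",
    body={"name": "pdp-google-cloud-release-notes", "initial_node_count": 2},
)

# 2. Run the copy container as a pod on that cluster; the operator connects
#    via project/location/cluster_name, so no Composer kube config is needed.
copy_bq_dataset = kubernetes_engine.GKEStartPodOperator(
    task_id="copy_bq_dataset",
    name="copy_bq_dataset",
    namespace="default",
    project_id="{{ var.value.gcp_project }}",
    location="us-central1-c",
    cluster_name="pdp-google-cloud-release-notes",
    image="{{ var.json.google_cloud_release_notes.container_registry.copy_bq_dataset }}",
)

# 3. Tear the cluster down once the copy has run.
delete_cluster = kubernetes_engine.GKEDeleteClusterOperator(
    task_id="delete_cluster",
    project_id="{{ var.value.gcp_project }}",
    location="us-central1-c",
    name="pdp-google-cloud-release-notes",
)

create_cluster >> copy_bq_dataset >> delete_cluster

In practice the delete step is often given trigger_rule="all_done" so the cluster is removed even if the copy task fails; this commit keeps the default ordering shown above.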
