Skip to content

Commit

Permalink
Merge branch 'main' into migrate_open_buildings_20241108
Browse files Browse the repository at this point in the history
  • Loading branch information
nlarge-google authored Nov 11, 2024
2 parents 5490dd1 + 95bfb72 commit 1da5ada
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 195 deletions.
6 changes: 3 additions & 3 deletions datasets/mimic_iii/pipelines/mimic_iii/mimic_iii_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@
copy_bq_dataset = kubernetes_pod.KubernetesPodOperator(
task_id="copy_bq_dataset",
name="copy_bq_dataset",
namespace="composer",
service_account_name="datasets",
namespace="composer-user-workloads",
service_account_name="default",
config_file="/home/airflow/composer_kube_config",
image_pull_policy="Always",
image="{{ var.json.mimic_iii.container_registry.run_script_kub }}",
env_vars={
Expand All @@ -46,7 +47,6 @@
"TARGET_PROJECT_ID": "{{ var.json.mimic_iii.target_project_id }}",
"TARGET_BQ_DATASET": "{{ var.json.mimic_iii.target_bq_dataset }}",
},
resources={"request_memory": "128M", "request_cpu": "200m"},
)

copy_bq_dataset
8 changes: 3 additions & 5 deletions datasets/mimic_iii/pipelines/mimic_iii/pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,16 @@ dag:
args:
task_id: "copy_bq_dataset"
name: "copy_bq_dataset"
namespace: "composer"
service_account_name: "datasets"
namespace: "composer-user-workloads"
service_account_name: "default"
config_file: "/home/airflow/composer_kube_config"
image_pull_policy: "Always"
image: "{{ var.json.mimic_iii.container_registry.run_script_kub }}"
env_vars:
SOURCE_PROJECT_ID: "{{ var.json.mimic_iii.source_project_id }}"
SOURCE_BQ_DATASET: "{{ var.json.mimic_iii.source_bq_dataset }}"
TARGET_PROJECT_ID: "{{ var.json.mimic_iii.target_project_id }}"
TARGET_BQ_DATASET: "{{ var.json.mimic_iii.target_bq_dataset }}"
resources:
request_memory: "128M"
request_cpu: "200m"

graph_paths:
- "copy_bq_dataset"
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from airflow import DAG
from airflow.operators import bash
from airflow.providers.cncf.kubernetes.operators import kubernetes_pod
from airflow.providers.google.cloud.operators import kubernetes_engine
from airflow.providers.google.cloud.transfers import gcs_to_bigquery, gcs_to_gcs

default_args = {
Expand Down Expand Up @@ -53,14 +53,33 @@
"source_file": "metadata.json.gz",
},
)
create_cluster = kubernetes_engine.GKECreateClusterOperator(
task_id="create_cluster",
project_id="{{ var.value.gcp_project }}",
location="us-central1-c",
body={
"name": "pdp-multilingual-spoken-words-corpus",
"initial_node_count": 1,
"network": "{{ var.value.vpc_network }}",
"node_config": {
"machine_type": "e2-standard-16",
"oauth_scopes": [
"https://www.googleapis.com/auth/devstorage.read_write",
"https://www.googleapis.com/auth/cloud-platform",
],
},
},
)

# Run CSV transform within kubernetes pod
metadata_csv_transform = kubernetes_pod.KubernetesPodOperator(
metadata_csv_transform = kubernetes_engine.GKEStartPodOperator(
task_id="metadata_csv_transform",
startup_timeout_seconds=600,
name="metadata_csv_transform",
namespace="composer",
service_account_name="datasets",
namespace="default",
project_id="{{ var.value.gcp_project }}",
location="us-central1-c",
cluster_name="pdp-multilingual-spoken-words-corpus",
image_pull_policy="Always",
image="{{ var.json.multilingual_spoken_words_corpus.container_registry.run_csv_transform_kub }}",
env_vars={
Expand All @@ -72,12 +91,18 @@
"TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
"TARGET_GCS_PATH": "data/multilingual_spoken_words_corpus/metadata_data_output.csv",
},
resources={
"request_memory": "4G",
"request_cpu": "1",
"request_ephemeral_storage": "4G",
container_resources={
"memory": {"request": "16Gi"},
"cpu": {"request": "1"},
"ephemeral-storage": {"request": "10Gi"},
},
)
delete_cluster = kubernetes_engine.GKEDeleteClusterOperator(
task_id="delete_cluster",
project_id="{{ var.value.gcp_project }}",
location="us-central1-c",
name="pdp-multilingual-spoken-words-corpus",
)

# Task to load CSV data to a BigQuery table
load_metadata_to_bq = gcs_to_bigquery.GCSToBigQueryOperator(
Expand Down Expand Up @@ -133,6 +158,8 @@
(
copy_metadata_file_to_gcs
>> unzip_metadata_gz
>> create_cluster
>> metadata_csv_transform
>> delete_cluster
>> load_metadata_to_bq
)
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,31 @@ dag:
data_dir: "/home/airflow/gcs/data/multilingual_spoken_words_corpus"
source_file: "metadata.json.gz"

- operator: "KubernetesPodOperator"
- operator: "GKECreateClusterOperator"
args:
task_id: "create_cluster"
project_id: "{{ var.value.gcp_project }}"
location: "us-central1-c"
body:
name: pdp-multilingual-spoken-words-corpus
initial_node_count: 1
network: "{{ var.value.vpc_network }}"
node_config:
machine_type: e2-standard-16
oauth_scopes:
- https://www.googleapis.com/auth/devstorage.read_write
- https://www.googleapis.com/auth/cloud-platform

- operator: "GKEStartPodOperator"
description: "Run CSV transform within kubernetes pod"
args:
task_id: "metadata_csv_transform"
startup_timeout_seconds: 600
name: "metadata_csv_transform"
namespace: "composer"
service_account_name: "datasets"
namespace: "default"
project_id: "{{ var.value.gcp_project }}"
location: "us-central1-c"
cluster_name: pdp-multilingual-spoken-words-corpus
image_pull_policy: "Always"
image: "{{ var.json.multilingual_spoken_words_corpus.container_registry.run_csv_transform_kub }}"
env_vars:
Expand All @@ -75,10 +92,20 @@ dag:
["lang_abbr", "language", "number_of_words","word", "word_count", "filename"]
TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}"
TARGET_GCS_PATH: "data/multilingual_spoken_words_corpus/metadata_data_output.csv"
resources:
request_memory: "4G"
request_cpu: "1"
request_ephemeral_storage: "4G"
container_resources:
memory:
request: "16Gi"
cpu:
request: "1"
ephemeral-storage:
request: "10Gi"

- operator: "GKEDeleteClusterOperator"
args:
task_id: "delete_cluster"
project_id: "{{ var.value.gcp_project }}"
location: "us-central1-c"
name: pdp-multilingual-spoken-words-corpus

- operator: "GoogleCloudStorageToBigQueryOperator"
description: "Task to load CSV data to a BigQuery table"
Expand Down Expand Up @@ -117,4 +144,4 @@ dag:
mode: "nullable"

graph_paths:
- "copy_metadata_file_to_gcs >> unzip_metadata_gz >> metadata_csv_transform >> load_metadata_to_bq"
- "copy_metadata_file_to_gcs >> unzip_metadata_gz >> create_cluster >> metadata_csv_transform >> delete_cluster >> load_metadata_to_bq"
Loading

0 comments on commit 1da5ada

Please sign in to comment.