Skip to content

Commit

Permalink
Fix: Migrate MNIST Dags To New Environment. (#869)
Browse files Browse the repository at this point in the history
  • Loading branch information
nlarge-google authored Nov 13, 2024
1 parent 36bd638 commit 61edf39
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 60 deletions.
95 changes: 64 additions & 31 deletions datasets/mnist/pipelines/mnist/mnist_dag.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2021 Google LLC
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -14,7 +14,7 @@


from airflow import DAG
from airflow.providers.cncf.kubernetes.operators import kubernetes_pod
from airflow.providers.google.cloud.operators import kubernetes_engine

default_args = {
"owner": "Google",
Expand All @@ -31,13 +31,32 @@
catchup=False,
default_view="graph",
) as dag:
create_cluster = kubernetes_engine.GKECreateClusterOperator(
task_id="create_cluster",
project_id="{{ var.value.gcp_project }}",
location="us-central1-c",
body={
"name": "pdp-mnist",
"initial_node_count": 1,
"network": "{{ var.value.vpc_network }}",
"node_config": {
"machine_type": "e2-standard-16",
"oauth_scopes": [
"https://www.googleapis.com/auth/devstorage.read_write",
"https://www.googleapis.com/auth/cloud-platform",
],
},
},
)

# Task to copy `t10k-images-idx3-ubyte.gz` from MNIST Database to GCS
download_and_process_source_zip_file = kubernetes_pod.KubernetesPodOperator(
download_and_process_source_zip_file = kubernetes_engine.GKEStartPodOperator(
task_id="download_and_process_source_zip_file",
name="mnist",
namespace="composer",
service_account_name="datasets",
namespace="default",
project_id="{{ var.value.gcp_project }}",
location="us-central1-c",
cluster_name="pdp-mnist",
image_pull_policy="Always",
image="{{ var.json.mnist.container_registry.run_csv_transform_kub }}",
env_vars={
Expand All @@ -48,19 +67,21 @@
"TARGET_GCS_PATH": "data/mnist/mnist/t10k-images-idx3-ubyte.gz",
"PIPELINE_NAME": "mnist",
},
resources={
"request_memory": "2G",
"request_cpu": "200m",
"request_ephemeral_storage": "8G",
container_resources={
"memory": {"request": "16Gi"},
"cpu": {"request": "1"},
"ephemeral-storage": {"request": "10Gi"},
},
)

# Task to copy `train-images-idx3-ubyte.gz` from MNIST Database to GCS
download_and_process_source_zip_file_2 = kubernetes_pod.KubernetesPodOperator(
download_and_process_source_zip_file_2 = kubernetes_engine.GKEStartPodOperator(
task_id="download_and_process_source_zip_file_2",
name="mnist",
namespace="composer",
service_account_name="datasets",
namespace="default",
project_id="{{ var.value.gcp_project }}",
location="us-central1-c",
cluster_name="pdp-mnist",
image_pull_policy="Always",
image="{{ var.json.mnist.container_registry.run_csv_transform_kub }}",
env_vars={
Expand All @@ -71,19 +92,21 @@
"TARGET_GCS_PATH": "data/mnist/mnist/train-images-idx3-ubyte.gz",
"PIPELINE_NAME": "mnist",
},
resources={
"request_memory": "2G",
"request_cpu": "200m",
"request_ephemeral_storage": "8G",
container_resources={
"memory": {"request": "16Gi"},
"cpu": {"request": "1"},
"ephemeral-storage": {"request": "10Gi"},
},
)

# Task to copy `train-labels-idx1-ubyte.gz` from MNIST Database to GCS
download_and_process_source_zip_file_3 = kubernetes_pod.KubernetesPodOperator(
download_and_process_source_zip_file_3 = kubernetes_engine.GKEStartPodOperator(
task_id="download_and_process_source_zip_file_3",
name="mnist",
namespace="composer",
service_account_name="datasets",
namespace="default",
project_id="{{ var.value.gcp_project }}",
location="us-central1-c",
cluster_name="pdp-mnist",
image_pull_policy="Always",
image="{{ var.json.mnist.container_registry.run_csv_transform_kub }}",
env_vars={
Expand All @@ -94,19 +117,21 @@
"TARGET_GCS_PATH": "data/mnist/mnist/train-labels-idx1-ubyte.gz",
"PIPELINE_NAME": "mnist",
},
resources={
"request_memory": "2G",
"request_cpu": "200m",
"request_ephemeral_storage": "8G",
container_resources={
"memory": {"request": "16Gi"},
"cpu": {"request": "1"},
"ephemeral-storage": {"request": "10Gi"},
},
)

# Task to copy `t10k-labels-idx1-ubyte.gz` from MNIST Database to GCS
download_and_process_source_zip_file_4 = kubernetes_pod.KubernetesPodOperator(
download_and_process_source_zip_file_4 = kubernetes_engine.GKEStartPodOperator(
task_id="download_and_process_source_zip_file_4",
name="mnist",
namespace="composer",
service_account_name="datasets",
namespace="default",
project_id="{{ var.value.gcp_project }}",
location="us-central1-c",
cluster_name="pdp-mnist",
image_pull_policy="Always",
image="{{ var.json.mnist.container_registry.run_csv_transform_kub }}",
env_vars={
Expand All @@ -117,16 +142,24 @@
"TARGET_GCS_PATH": "data/mnist/mnist/t10k-labels-idx1-ubyte.gz",
"PIPELINE_NAME": "mnist",
},
resources={
"request_memory": "2G",
"request_cpu": "200m",
"request_ephemeral_storage": "8G",
container_resources={
"memory": {"request": "16Gi"},
"cpu": {"request": "1"},
"ephemeral-storage": {"request": "10Gi"},
},
)
delete_cluster = kubernetes_engine.GKEDeleteClusterOperator(
task_id="delete_cluster",
project_id="{{ var.value.gcp_project }}",
location="us-central1-c",
name="pdp-mnist",
)

(
download_and_process_source_zip_file
create_cluster
>> download_and_process_source_zip_file
>> download_and_process_source_zip_file_2
>> download_and_process_source_zip_file_3
>> download_and_process_source_zip_file_4
>> delete_cluster
)
101 changes: 72 additions & 29 deletions datasets/mnist/pipelines/mnist/pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,30 @@ dag:
default_view: graph

tasks:
- operator: "KubernetesPodOperator"
- operator: "GKECreateClusterOperator"
args:
task_id: "create_cluster"
project_id: "{{ var.value.gcp_project }}"
location: "us-central1-c"
body:
name: pdp-mnist
initial_node_count: 1
network: "{{ var.value.vpc_network }}"
node_config:
machine_type: e2-standard-16
oauth_scopes:
- https://www.googleapis.com/auth/devstorage.read_write
- https://www.googleapis.com/auth/cloud-platform

- operator: "GKEStartPodOperator"
description: "Task to copy `t10k-images-idx3-ubyte.gz` from MNIST Database to GCS"
args:
task_id: "download_and_process_source_zip_file"
name: "mnist"
namespace: "composer"
service_account_name: "datasets"
namespace: "default"
project_id: "{{ var.value.gcp_project }}"
location: "us-central1-c"
cluster_name: pdp-mnist
image_pull_policy: "Always"
image: "{{ var.json.mnist.container_registry.run_csv_transform_kub }}"
env_vars:
Expand All @@ -46,18 +63,23 @@ dag:
TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}"
TARGET_GCS_PATH: "data/mnist/mnist/t10k-images-idx3-ubyte.gz"
PIPELINE_NAME: "mnist"
resources:
request_memory: "2G"
request_cpu: "200m"
request_ephemeral_storage: "8G"
container_resources:
memory:
request: "16Gi"
cpu:
request: "1"
ephemeral-storage:
request: "10Gi"

- operator: "KubernetesPodOperator"
- operator: "GKEStartPodOperator"
description: "Task to copy `train-images-idx3-ubyte.gz` from MNIST Database to GCS"
args:
task_id: "download_and_process_source_zip_file_2"
name: "mnist"
namespace: "composer"
service_account_name: "datasets"
namespace: "default"
project_id: "{{ var.value.gcp_project }}"
location: "us-central1-c"
cluster_name: pdp-mnist
image_pull_policy: "Always"
image: "{{ var.json.mnist.container_registry.run_csv_transform_kub }}"
env_vars:
Expand All @@ -67,18 +89,23 @@ dag:
TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}"
TARGET_GCS_PATH: "data/mnist/mnist/train-images-idx3-ubyte.gz"
PIPELINE_NAME: "mnist"
resources:
request_memory: "2G"
request_cpu: "200m"
request_ephemeral_storage: "8G"
container_resources:
memory:
request: "16Gi"
cpu:
request: "1"
ephemeral-storage:
request: "10Gi"

- operator: "KubernetesPodOperator"
- operator: "GKEStartPodOperator"
description: "Task to copy `train-labels-idx1-ubyte.gz` from MNIST Database to GCS"
args:
task_id: "download_and_process_source_zip_file_3"
name: "mnist"
namespace: "composer"
service_account_name: "datasets"
namespace: "default"
project_id: "{{ var.value.gcp_project }}"
location: "us-central1-c"
cluster_name: pdp-mnist
image_pull_policy: "Always"
image: "{{ var.json.mnist.container_registry.run_csv_transform_kub }}"
env_vars:
Expand All @@ -88,18 +115,23 @@ dag:
TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}"
TARGET_GCS_PATH: "data/mnist/mnist/train-labels-idx1-ubyte.gz"
PIPELINE_NAME: "mnist"
resources:
request_memory: "2G"
request_cpu: "200m"
request_ephemeral_storage: "8G"
container_resources:
memory:
request: "16Gi"
cpu:
request: "1"
ephemeral-storage:
request: "10Gi"

- operator: "KubernetesPodOperator"
- operator: "GKEStartPodOperator"
description: "Task to copy `t10k-labels-idx1-ubyte.gz` from MNIST Database to GCS"
args:
task_id: "download_and_process_source_zip_file_4"
name: "mnist"
namespace: "composer"
service_account_name: "datasets"
namespace: "default"
project_id: "{{ var.value.gcp_project }}"
location: "us-central1-c"
cluster_name: pdp-mnist
image_pull_policy: "Always"
image: "{{ var.json.mnist.container_registry.run_csv_transform_kub }}"
env_vars:
Expand All @@ -109,10 +141,21 @@ dag:
TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}"
TARGET_GCS_PATH: "data/mnist/mnist/t10k-labels-idx1-ubyte.gz"
PIPELINE_NAME: "mnist"
resources:
request_memory: "2G"
request_cpu: "200m"
request_ephemeral_storage: "8G"
container_resources:
memory:
request: "16Gi"
cpu:
request: "1"
ephemeral-storage:
request: "10Gi"

- operator: "GKEDeleteClusterOperator"
args:
task_id: "delete_cluster"
project_id: "{{ var.value.gcp_project }}"
location: "us-central1-c"
name: pdp-mnist


graph_paths:
- "download_and_process_source_zip_file >> download_and_process_source_zip_file_2 >> download_and_process_source_zip_file_3 >> download_and_process_source_zip_file_4"
- "create_cluster >> download_and_process_source_zip_file >> download_and_process_source_zip_file_2 >> download_and_process_source_zip_file_3 >> download_and_process_source_zip_file_4 >> delete_cluster"

0 comments on commit 61edf39

Please sign in to comment.