diff --git a/datasets/mnist/pipelines/mnist/mnist_dag.py b/datasets/mnist/pipelines/mnist/mnist_dag.py index 9904271ae..40e0cba83 100644 --- a/datasets/mnist/pipelines/mnist/mnist_dag.py +++ b/datasets/mnist/pipelines/mnist/mnist_dag.py @@ -1,4 +1,4 @@ -# Copyright 2021 Google LLC +# Copyright 2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ from airflow import DAG -from airflow.providers.cncf.kubernetes.operators import kubernetes_pod +from airflow.providers.google.cloud.operators import kubernetes_engine default_args = { "owner": "Google", @@ -31,13 +31,32 @@ catchup=False, default_view="graph", ) as dag: + create_cluster = kubernetes_engine.GKECreateClusterOperator( + task_id="create_cluster", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + body={ + "name": "pdp-mnist", + "initial_node_count": 1, + "network": "{{ var.value.vpc_network }}", + "node_config": { + "machine_type": "e2-standard-16", + "oauth_scopes": [ + "https://www.googleapis.com/auth/devstorage.read_write", + "https://www.googleapis.com/auth/cloud-platform", + ], + }, + }, + ) # Task to copy `t10k-images-idx3-ubyte.gz` from MNIST Database to GCS - download_and_process_source_zip_file = kubernetes_pod.KubernetesPodOperator( + download_and_process_source_zip_file = kubernetes_engine.GKEStartPodOperator( task_id="download_and_process_source_zip_file", name="mnist", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-mnist", image_pull_policy="Always", image="{{ var.json.mnist.container_registry.run_csv_transform_kub }}", env_vars={ @@ -48,19 +67,21 @@ "TARGET_GCS_PATH": "data/mnist/mnist/t10k-images-idx3-ubyte.gz", "PIPELINE_NAME": "mnist", }, - resources={ - "request_memory": "2G", - "request_cpu": "200m", - "request_ephemeral_storage": "8G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) # Task to copy `train-images-idx3-ubyte.gz` from MNIST Database to GCS - download_and_process_source_zip_file_2 = kubernetes_pod.KubernetesPodOperator( + download_and_process_source_zip_file_2 = kubernetes_engine.GKEStartPodOperator( task_id="download_and_process_source_zip_file_2", name="mnist", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-mnist", image_pull_policy="Always", image="{{ var.json.mnist.container_registry.run_csv_transform_kub }}", env_vars={ @@ -71,19 +92,21 @@ "TARGET_GCS_PATH": "data/mnist/mnist/train-images-idx3-ubyte.gz", "PIPELINE_NAME": "mnist", }, - resources={ - "request_memory": "2G", - "request_cpu": "200m", - "request_ephemeral_storage": "8G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) # Task to copy `train-labels-idx1-ubyte.gz` from MNIST Database to GCS - download_and_process_source_zip_file_3 = kubernetes_pod.KubernetesPodOperator( + download_and_process_source_zip_file_3 = kubernetes_engine.GKEStartPodOperator( task_id="download_and_process_source_zip_file_3", name="mnist", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-mnist", image_pull_policy="Always", image="{{ var.json.mnist.container_registry.run_csv_transform_kub }}", env_vars={ @@ -94,19 +117,21 @@ "TARGET_GCS_PATH": "data/mnist/mnist/train-labels-idx1-ubyte.gz", "PIPELINE_NAME": "mnist", }, - resources={ - "request_memory": "2G", - "request_cpu": "200m", - "request_ephemeral_storage": "8G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) # Task to copy `t10k-labels-idx1-ubyte.gz` from MNIST Database to GCS - download_and_process_source_zip_file_4 = kubernetes_pod.KubernetesPodOperator( + download_and_process_source_zip_file_4 = kubernetes_engine.GKEStartPodOperator( task_id="download_and_process_source_zip_file_4", name="mnist", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-mnist", image_pull_policy="Always", image="{{ var.json.mnist.container_registry.run_csv_transform_kub }}", env_vars={ @@ -117,16 +142,24 @@ "TARGET_GCS_PATH": "data/mnist/mnist/t10k-labels-idx1-ubyte.gz", "PIPELINE_NAME": "mnist", }, - resources={ - "request_memory": "2G", - "request_cpu": "200m", - "request_ephemeral_storage": "8G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) + delete_cluster = kubernetes_engine.GKEDeleteClusterOperator( + task_id="delete_cluster", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + name="pdp-mnist", + ) ( - download_and_process_source_zip_file + create_cluster + >> download_and_process_source_zip_file >> download_and_process_source_zip_file_2 >> download_and_process_source_zip_file_3 >> download_and_process_source_zip_file_4 + >> delete_cluster ) diff --git a/datasets/mnist/pipelines/mnist/pipeline.yaml b/datasets/mnist/pipelines/mnist/pipeline.yaml index e5a0a3c5c..5411959bd 100644 --- a/datasets/mnist/pipelines/mnist/pipeline.yaml +++ b/datasets/mnist/pipelines/mnist/pipeline.yaml @@ -30,13 +30,30 @@ dag: default_view: graph tasks: - - operator: "KubernetesPodOperator" + - operator: "GKECreateClusterOperator" + args: + task_id: "create_cluster" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + body: + name: pdp-mnist + initial_node_count: 1 + network: "{{ var.value.vpc_network }}" + node_config: + machine_type: e2-standard-16 + oauth_scopes: + - https://www.googleapis.com/auth/devstorage.read_write + - https://www.googleapis.com/auth/cloud-platform + + - operator: "GKEStartPodOperator" description: "Task to copy `t10k-images-idx3-ubyte.gz` from MNIST Database to GCS" args: task_id: "download_and_process_source_zip_file" name: "mnist" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-mnist image_pull_policy: "Always" image: "{{ var.json.mnist.container_registry.run_csv_transform_kub }}" env_vars: @@ -46,18 +63,23 @@ dag: TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" TARGET_GCS_PATH: "data/mnist/mnist/t10k-images-idx3-ubyte.gz" PIPELINE_NAME: "mnist" - resources: - request_memory: "2G" - request_cpu: "200m" - request_ephemeral_storage: "8G" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" - - operator: "KubernetesPodOperator" + - operator: "GKEStartPodOperator" description: "Task to copy `train-images-idx3-ubyte.gz` from MNIST Database to GCS" args: task_id: "download_and_process_source_zip_file_2" name: "mnist" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-mnist image_pull_policy: "Always" image: "{{ var.json.mnist.container_registry.run_csv_transform_kub }}" env_vars: @@ -67,18 +89,23 @@ dag: TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" TARGET_GCS_PATH: "data/mnist/mnist/train-images-idx3-ubyte.gz" PIPELINE_NAME: "mnist" - resources: - request_memory: "2G" - request_cpu: "200m" - request_ephemeral_storage: "8G" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" - - operator: "KubernetesPodOperator" + - operator: "GKEStartPodOperator" description: "Task to copy `train-labels-idx1-ubyte.gz` from MNIST Database to GCS" args: task_id: "download_and_process_source_zip_file_3" name: "mnist" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-mnist image_pull_policy: "Always" image: "{{ var.json.mnist.container_registry.run_csv_transform_kub }}" env_vars: @@ -88,18 +115,23 @@ dag: TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" TARGET_GCS_PATH: "data/mnist/mnist/train-labels-idx1-ubyte.gz" PIPELINE_NAME: "mnist" - resources: - request_memory: "2G" - request_cpu: "200m" - request_ephemeral_storage: "8G" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" - - operator: "KubernetesPodOperator" + - operator: "GKEStartPodOperator" description: "Task to copy `t10k-labels-idx1-ubyte.gz` from MNIST Database to GCS" args: task_id: "download_and_process_source_zip_file_4" name: "mnist" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-mnist image_pull_policy: "Always" image: "{{ var.json.mnist.container_registry.run_csv_transform_kub }}" env_vars: @@ -109,10 +141,21 @@ dag: TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" TARGET_GCS_PATH: "data/mnist/mnist/t10k-labels-idx1-ubyte.gz" PIPELINE_NAME: "mnist" - resources: - request_memory: "2G" - request_cpu: "200m" - request_ephemeral_storage: "8G" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" + + - operator: "GKEDeleteClusterOperator" + args: + task_id: "delete_cluster" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + name: pdp-mnist + graph_paths: - - "download_and_process_source_zip_file >> download_and_process_source_zip_file_2 >> download_and_process_source_zip_file_3 >> download_and_process_source_zip_file_4" + - "create_cluster >> download_and_process_source_zip_file >> download_and_process_source_zip_file_2 >> download_and_process_source_zip_file_3 >> download_and_process_source_zip_file_4 >> delete_cluster" diff --git a/datasets/open_buildings/pipelines/open_buildings/open_buildings_dag.py b/datasets/open_buildings/pipelines/open_buildings/open_buildings_dag.py index 717099a64..16ed45b91 100644 --- a/datasets/open_buildings/pipelines/open_buildings/open_buildings_dag.py +++ b/datasets/open_buildings/pipelines/open_buildings/open_buildings_dag.py @@ -74,8 +74,9 @@ task_id="py_gcs_to_bq", startup_timeout_seconds=1000, name="load_data", - namespace="composer", - service_account_name="datasets", + namespace="composer-user-workloads", + service_account_name="default", + config_file="/home/airflow/composer_kube_config", image_pull_policy="Always", image="{{ var.json.open_buildings.container_registry.run_script_kub }}", env_vars={ @@ -85,11 +86,6 @@ "GCS_BUCKET": "{{ var.value.composer_bucket }}", "SCHEMA_FILEPATH": "schema.json", }, - resources={ - "request_memory": "2G", - "request_cpu": "1", - "request_ephemeral_storage": "10G", - }, ) ( diff --git a/datasets/open_buildings/pipelines/open_buildings/pipeline.yaml b/datasets/open_buildings/pipelines/open_buildings/pipeline.yaml index 615bad8db..98f8e584f 100644 --- a/datasets/open_buildings/pipelines/open_buildings/pipeline.yaml +++ b/datasets/open_buildings/pipelines/open_buildings/pipeline.yaml @@ -201,8 +201,9 @@ dag: task_id: "py_gcs_to_bq" startup_timeout_seconds: 1000 name: "load_data" - namespace: "composer" - service_account_name: "datasets" + namespace: "composer-user-workloads" + service_account_name: "default" + config_file: "/home/airflow/composer_kube_config" image_pull_policy: "Always" image: "{{ var.json.open_buildings.container_registry.run_script_kub }}" env_vars: @@ -211,10 +212,6 @@ dag: DATASET_ID: "{{ var.json.open_buildings.dataset_id }}" GCS_BUCKET: "{{ var.value.composer_bucket }}" SCHEMA_FILEPATH: "schema.json" - resources: - request_memory: "2G" - request_cpu: "1" - request_ephemeral_storage: "10G" graph_paths: - "bash_gcs_to_gcs >> batch1_bash_gunzip >> batch2_bash_gunzip >> batch3_bash_gunzip >> batch4_bash_gunzip >> batch5_bash_gunzip >> py_gcs_to_bq"