diff --git a/datasets/google_cloud_release_notes/pipelines/release_notes/pipeline.yaml b/datasets/google_cloud_release_notes/pipelines/release_notes/pipeline.yaml index e241c445e..a59e95c02 100644 --- a/datasets/google_cloud_release_notes/pipelines/release_notes/pipeline.yaml +++ b/datasets/google_cloud_release_notes/pipelines/release_notes/pipeline.yaml @@ -32,13 +32,31 @@ dag: default_view: graph tasks: - - operator: "KubernetesPodOperator" + - operator: "GKECreateClusterOperator" + args: + task_id: "create_cluster" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + body: + name: pdp-google-cloud-release-notes + initial_node_count: 2 + network: "{{ var.value.vpc_network }}" + node_config: + machine_type: e2-standard-16 + oauth_scopes: + - https://www.googleapis.com/auth/devstorage.read_write + - https://www.googleapis.com/auth/cloud-platform + + - operator: "GKEStartPodOperator" description: "Copy GCP release notes dataset" args: task_id: "copy_bq_dataset" + startup_timeout_seconds: 1000 name: "copy_bq_dataset" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-google-cloud-release-notes image_pull_policy: "Always" image: "{{ var.json.google_cloud_release_notes.container_registry.copy_bq_dataset }}" env_vars: @@ -47,9 +65,20 @@ dag: TARGET_PROJECT_ID: "{{ var.value.gcp_project }}" TARGET_BQ_DATASET: google_cloud_release_notes SERVICE_ACCOUNT: "{{ var.json.google_cloud_release_notes.service_account }}" - resources: - request_memory: "128M" - request_cpu: "200m" + container_resources: + memory: + request: "32Gi" + cpu: + request: "2" + ephemeral-storage: + request: "10Gi" + + - operator: "GKEDeleteClusterOperator" + args: + task_id: "delete_cluster" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + name: pdp-google-cloud-release-notes graph_paths: - - "copy_bq_dataset" + - "create_cluster >> copy_bq_dataset >> delete_cluster" diff --git a/datasets/google_cloud_release_notes/pipelines/release_notes/release_notes_dag.py b/datasets/google_cloud_release_notes/pipelines/release_notes/release_notes_dag.py index 9f9a37d33..430a18f61 100644 --- a/datasets/google_cloud_release_notes/pipelines/release_notes/release_notes_dag.py +++ b/datasets/google_cloud_release_notes/pipelines/release_notes/release_notes_dag.py @@ -14,7 +14,7 @@ from airflow import DAG -from airflow.providers.cncf.kubernetes.operators import kubernetes_pod +from airflow.providers.google.cloud.operators import kubernetes_engine default_args = { "owner": "Google", @@ -31,13 +31,33 @@ catchup=False, default_view="graph", ) as dag: + create_cluster = kubernetes_engine.GKECreateClusterOperator( + task_id="create_cluster", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + body={ + "name": "pdp-google-cloud-release-notes", + "initial_node_count": 2, + "network": "{{ var.value.vpc_network }}", + "node_config": { + "machine_type": "e2-standard-16", + "oauth_scopes": [ + "https://www.googleapis.com/auth/devstorage.read_write", + "https://www.googleapis.com/auth/cloud-platform", + ], + }, + }, + ) # Copy GCP release notes dataset - copy_bq_dataset = kubernetes_pod.KubernetesPodOperator( + copy_bq_dataset = kubernetes_engine.GKEStartPodOperator( task_id="copy_bq_dataset", + startup_timeout_seconds=1000, name="copy_bq_dataset", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-google-cloud-release-notes", image_pull_policy="Always", image="{{ var.json.google_cloud_release_notes.container_registry.copy_bq_dataset }}", env_vars={ @@ -47,7 +67,17 @@ "TARGET_BQ_DATASET": "google_cloud_release_notes", "SERVICE_ACCOUNT": "{{ var.json.google_cloud_release_notes.service_account }}", }, - resources={"request_memory": "128M", "request_cpu": "200m"}, + container_resources={ + "memory": {"request": "32Gi"}, + "cpu": {"request": "2"}, + "ephemeral-storage": {"request": "10Gi"}, + }, + ) + delete_cluster = kubernetes_engine.GKEDeleteClusterOperator( + task_id="delete_cluster", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + name="pdp-google-cloud-release-notes", ) - copy_bq_dataset + create_cluster >> copy_bq_dataset >> delete_cluster