From 78138fea2cab3c1c074833d6bb27400279cbd4cc Mon Sep 17 00:00:00 2001 From: sahilsekhri258 Date: Tue, 12 Nov 2024 17:08:30 +0000 Subject: [PATCH] Fix: Migration Changes for Google Political Ads dag with single cluster (#867) --- .../process_csvs_and_load_to_bq/pipeline.yaml | 215 +---------- .../process_csvs_and_load_to_bq_dag.py | 360 +++--------------- 2 files changed, 71 insertions(+), 504 deletions(-) diff --git a/datasets/google_political_ads/pipelines/process_csvs_and_load_to_bq/pipeline.yaml b/datasets/google_political_ads/pipelines/process_csvs_and_load_to_bq/pipeline.yaml index 18f1f7a7c..1af9a8253 100644 --- a/datasets/google_political_ads/pipelines/process_csvs_and_load_to_bq/pipeline.yaml +++ b/datasets/google_political_ads/pipelines/process_csvs_and_load_to_bq/pipeline.yaml @@ -77,11 +77,11 @@ dag: - operator: "GKECreateClusterOperator" args: - task_id: "create_cluster_transform_advertiser_declared_stats" + task_id: "create_cluster" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" body: - name: pdp-google-political-ads-declared-stats + name: pdp-google-political-ads initial_node_count: 2 network: "{{ var.value.vpc_network }}" node_config: @@ -100,7 +100,7 @@ dag: namespace: "default" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" - cluster_name: pdp-google-political-ads-declared-stats + cluster_name: pdp-google-political-ads image_pull_policy: "Always" image: "{{ var.json.google_political_ads.container_registry.run_csv_transform_kub }}" env_vars: @@ -138,13 +138,6 @@ dag: cpu: request: "200m" - - operator: "GKEDeleteClusterOperator" - args: - task_id: "delete_cluster_transform_advertiser_declared_stats" - project_id: "{{ var.value.gcp_project }}" - location: "us-central1-c" - name: pdp-google-political-ads-declared-stats - - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" args: @@ -188,21 +181,6 @@ dag: description: "The New Zealand advertiser's declared Promoter Statement address." mode: "nullable" - - operator: "GKECreateClusterOperator" - args: - task_id: "create_cluster_transform_advertiser_geo_spend" - project_id: "{{ var.value.gcp_project }}" - location: "us-central1-c" - body: - name: pdp-google-political-ads-advertiser-geo - initial_node_count: 2 - network: "{{ var.value.vpc_network }}" - node_config: - machine_type: e2-standard-16 - oauth_scopes: - - https://www.googleapis.com/auth/devstorage.read_write - - https://www.googleapis.com/auth/cloud-platform - # advertiser_geo_spend - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" @@ -213,7 +191,7 @@ dag: namespace: "default" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" - cluster_name: pdp-google-political-ads-advertiser-geo + cluster_name: pdp-google-political-ads image_pull_policy: "Always" image: "{{ var.json.google_political_ads.container_registry.run_csv_transform_kub }}" env_vars: @@ -285,13 +263,6 @@ dag: cpu: request: "200m" - - operator: "GKEDeleteClusterOperator" - args: - task_id: "delete_cluster_transform_advertiser_geo_spend" - project_id: "{{ var.value.gcp_project }}" - location: "us-central1-c" - name: pdp-google-political-ads-advertiser-geo - - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" args: @@ -401,21 +372,6 @@ dag: description: "Total amount in CLP spent on election ads in this region." mode: "nullable" - - operator: "GKECreateClusterOperator" - args: - task_id: "create_cluster_transform_advertiser_stats" - project_id: "{{ var.value.gcp_project }}" - location: "us-central1-c" - body: - name: pdp-google-political-ads-advertiser-stat - initial_node_count: 2 - network: "{{ var.value.vpc_network }}" - node_config: - machine_type: e2-standard-16 - oauth_scopes: - - https://www.googleapis.com/auth/devstorage.read_write - - https://www.googleapis.com/auth/cloud-platform - # advertiser_stats - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" @@ -426,7 +382,7 @@ dag: namespace: "default" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" - cluster_name: pdp-google-political-ads-advertiser-stat + cluster_name: pdp-google-political-ads image_pull_policy: "Always" image: "{{ var.json.google_political_ads.container_registry.run_csv_transform_kub }}" env_vars: @@ -502,13 +458,6 @@ dag: cpu: request: "200m" - - operator: "GKEDeleteClusterOperator" - args: - task_id: "delete_cluster_transform_advertiser_stats" - project_id: "{{ var.value.gcp_project }}" - location: "us-central1-c" - name: pdp-google-political-ads-advertiser-stat - - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" args: @@ -626,21 +575,6 @@ dag: description: "Total amount in CLP spent on election ads by the advertiser." mode: "nullable" - - operator: "GKECreateClusterOperator" - args: - task_id: "create_cluster_transform_advertiser_weekly_spend" - project_id: "{{ var.value.gcp_project }}" - location: "us-central1-c" - body: - name: pdp-google-political-ads-weekly-spend - initial_node_count: 2 - network: "{{ var.value.vpc_network }}" - node_config: - machine_type: e2-standard-16 - oauth_scopes: - - https://www.googleapis.com/auth/devstorage.read_write - - https://www.googleapis.com/auth/cloud-platform - # advertiser_weekly_spend - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" @@ -651,7 +585,7 @@ dag: namespace: "default" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" - cluster_name: pdp-google-political-ads-weekly-spend + cluster_name: pdp-google-political-ads image_pull_policy: "Always" image: "{{ var.json.google_political_ads.container_registry.run_csv_transform_kub }}" env_vars: @@ -723,13 +657,6 @@ dag: cpu: request: "200m" - - operator: "GKEDeleteClusterOperator" - args: - task_id: "delete_cluster_transform_advertiser_weekly_spend" - project_id: "{{ var.value.gcp_project }}" - location: "us-central1-c" - name: pdp-google-political-ads-weekly-spend - - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" args: @@ -839,21 +766,6 @@ dag: description: "The amount in CLP spent on election ads during the given week by the advertiser." mode: "nullable" - - operator: "GKECreateClusterOperator" - args: - task_id: "create_cluster_transform_campaign_targeting" - project_id: "{{ var.value.gcp_project }}" - location: "us-central1-c" - body: - name: pdp-google-political-ads-campaign-target - initial_node_count: 2 - network: "{{ var.value.vpc_network }}" - node_config: - machine_type: e2-standard-16 - oauth_scopes: - - https://www.googleapis.com/auth/devstorage.read_write - - https://www.googleapis.com/auth/cloud-platform - - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: @@ -863,7 +775,7 @@ dag: namespace: "default" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" - cluster_name: pdp-google-political-ads-campaign-target + cluster_name: pdp-google-political-ads image_pull_policy: "Always" image: "{{ var.json.google_political_ads.container_registry.run_csv_transform_kub }}" env_vars: @@ -907,13 +819,6 @@ dag: cpu: request: "200m" - - operator: "GKEDeleteClusterOperator" - args: - task_id: "delete_cluster_transform_campaign_targeting" - project_id: "{{ var.value.gcp_project }}" - location: "us-central1-c" - name: pdp-google-political-ads-campaign-target - - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" args: @@ -967,21 +872,6 @@ dag: description: "[DEPRECATED] Name of advertiser." mode: "nullable" - - operator: "GKECreateClusterOperator" - args: - task_id: "create_cluster_transform_creative_stats" - project_id: "{{ var.value.gcp_project }}" - location: "us-central1-c" - body: - name: pdp-google-political-ads-creative-stats - initial_node_count: 2 - network: "{{ var.value.vpc_network }}" - node_config: - machine_type: e2-standard-16 - oauth_scopes: - - https://www.googleapis.com/auth/devstorage.read_write - - https://www.googleapis.com/auth/cloud-platform - - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: @@ -991,7 +881,7 @@ dag: namespace: "default" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" - cluster_name: pdp-google-political-ads-creative-stats + cluster_name: pdp-google-political-ads image_pull_policy: "Always" image: "{{ var.json.google_political_ads.container_registry.run_csv_transform_kub }}" env_vars: @@ -1133,13 +1023,6 @@ dag: ephemeral-storage: request: "10G" - - operator: "GKEDeleteClusterOperator" - args: - task_id: "delete_cluster_transform_creative_stats" - project_id: "{{ var.value.gcp_project }}" - location: "us-central1-c" - name: pdp-google-political-ads-creative-stats - - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" args: @@ -1377,21 +1260,6 @@ dag: description: "Upper bound of the amount in CLP spent by the advertiser on the election ad." mode: "nullable" - - operator: "GKECreateClusterOperator" - args: - task_id: "create_cluster_transform_geo_spend" - project_id: "{{ var.value.gcp_project }}" - location: "us-central1-c" - body: - name: pdp-google-political-ads-geo-spend - initial_node_count: 2 - network: "{{ var.value.vpc_network }}" - node_config: - machine_type: e2-standard-16 - oauth_scopes: - - https://www.googleapis.com/auth/devstorage.read_write - - https://www.googleapis.com/auth/cloud-platform - - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: @@ -1401,7 +1269,7 @@ dag: namespace: "default" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" - cluster_name: pdp-google-political-ads-geo-spend + cluster_name: pdp-google-political-ads image_pull_policy: "Always" image: "{{ var.json.google_political_ads.container_registry.run_csv_transform_kub }}" env_vars: @@ -1471,13 +1339,6 @@ dag: cpu: request: "200m" - - operator: "GKEDeleteClusterOperator" - args: - task_id: "delete_cluster_transform_geo_spend" - project_id: "{{ var.value.gcp_project }}" - location: "us-central1-c" - name: pdp-google-political-ads-geo-spend - - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" args: @@ -1582,21 +1443,6 @@ dag: description: "Total amount in CLP spent on election ads in this region." mode: "nullable" - - operator: "GKECreateClusterOperator" - args: - task_id: "create_cluster_transform_last_updated" - project_id: "{{ var.value.gcp_project }}" - location: "us-central1-c" - body: - name: pdp-google-political-ads-last-updated - initial_node_count: 2 - network: "{{ var.value.vpc_network }}" - node_config: - machine_type: e2-standard-16 - oauth_scopes: - - https://www.googleapis.com/auth/devstorage.read_write - - https://www.googleapis.com/auth/cloud-platform - - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: @@ -1606,7 +1452,7 @@ dag: namespace: "default" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" - cluster_name: pdp-google-political-ads-last-updated + cluster_name: pdp-google-political-ads image_pull_policy: "Always" image: "{{ var.json.google_political_ads.container_registry.run_csv_transform_kub }}" env_vars: @@ -1628,13 +1474,6 @@ dag: cpu: request: "200m" - - operator: "GKEDeleteClusterOperator" - args: - task_id: "delete_cluster_transform_last_updated" - project_id: "{{ var.value.gcp_project }}" - location: "us-central1-c" - name: pdp-google-political-ads-last-updated - - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" args: @@ -1652,21 +1491,6 @@ dag: description: "The time the report data was most recently updated" mode: "nullable" - - operator: "GKECreateClusterOperator" - args: - task_id: "create_cluster_transform_top_keywords_history" - project_id: "{{ var.value.gcp_project }}" - location: "us-central1-c" - body: - name: pdp-google-political-ads-top-keywords - initial_node_count: 2 - network: "{{ var.value.vpc_network }}" - node_config: - machine_type: e2-standard-16 - oauth_scopes: - - https://www.googleapis.com/auth/devstorage.read_write - - https://www.googleapis.com/auth/cloud-platform - - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: @@ -1676,7 +1500,7 @@ dag: namespace: "default" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" - cluster_name: pdp-google-political-ads-top-keywords + cluster_name: pdp-google-political-ads image_pull_policy: "Always" image: "{{ var.json.google_political_ads.container_registry.run_csv_transform_kub }}" env_vars: @@ -1734,10 +1558,10 @@ dag: - operator: "GKEDeleteClusterOperator" args: - task_id: "delete_cluster_transform_top_keywords_history" + task_id: "delete_cluster" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" - name: pdp-google-political-ads-top-keywords + name: pdp-google-political-ads - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" @@ -1817,13 +1641,6 @@ dag: mode: "nullable" graph_paths: - - "download_zip_file_to_composer_bucket >> [create_cluster_transform_advertiser_declared_stats, create_cluster_transform_advertiser_geo_spend, create_cluster_transform_advertiser_stats, create_cluster_transform_advertiser_weekly_spend, create_cluster_transform_campaign_targeting, create_cluster_transform_creative_stats, create_cluster_transform_geo_spend, create_cluster_transform_last_updated, create_cluster_transform_top_keywords_history]" - - "create_cluster_transform_advertiser_declared_stats >> transform_advertiser_declared_stats_csv >> delete_cluster_transform_advertiser_declared_stats >> load_advertiser_declared_stats_to_bq" - - "create_cluster_transform_advertiser_geo_spend >> transform_advertiser_geo_spend_csv >> delete_cluster_transform_advertiser_geo_spend >> load_advertiser_geo_spend_to_bq" - - "create_cluster_transform_advertiser_stats >> transform_advertiser_stats_csv >> delete_cluster_transform_advertiser_stats >> load_advertiser_stats_to_bq" - - "create_cluster_transform_advertiser_weekly_spend >> transform_advertiser_weekly_spend_csv >> delete_cluster_transform_advertiser_weekly_spend >> load_advertiser_weekly_spend_to_bq" - - "create_cluster_transform_campaign_targeting >> transform_campaign_targeting_csv >> delete_cluster_transform_campaign_targeting >> load_campaign_targeting_to_bq" - - "create_cluster_transform_creative_stats >> transform_creative_stats_csv >> delete_cluster_transform_creative_stats >> load_creative_stats_to_bq" - - "create_cluster_transform_geo_spend >> transform_geo_spend_csv >> delete_cluster_transform_geo_spend >> load_geo_spend_to_bq" - - "create_cluster_transform_last_updated >> transform_last_updated_csv >> delete_cluster_transform_last_updated >> load_last_updated_to_bq" - - "create_cluster_transform_top_keywords_history >> transform_top_keywords_history_csv >> delete_cluster_transform_top_keywords_history >> load_top_keywords_history_to_bq" + - "download_zip_file_to_composer_bucket >> create_cluster" + - "create_cluster >> [transform_advertiser_declared_stats_csv,transform_advertiser_geo_spend_csv,transform_advertiser_stats_csv,transform_advertiser_weekly_spend_csv,transform_campaign_targeting_csv,transform_creative_stats_csv,transform_geo_spend_csv,transform_last_updated_csv,transform_top_keywords_history_csv] >> delete_cluster" + - "delete_cluster >> [load_advertiser_declared_stats_to_bq,load_advertiser_geo_spend_to_bq,load_advertiser_stats_to_bq,load_advertiser_weekly_spend_to_bq,load_campaign_targeting_to_bq,load_creative_stats_to_bq,load_geo_spend_to_bq,load_last_updated_to_bq,load_top_keywords_history_to_bq]" diff --git a/datasets/google_political_ads/pipelines/process_csvs_and_load_to_bq/process_csvs_and_load_to_bq_dag.py b/datasets/google_political_ads/pipelines/process_csvs_and_load_to_bq/process_csvs_and_load_to_bq_dag.py index 81d06dae0..17ecc22c7 100644 --- a/datasets/google_political_ads/pipelines/process_csvs_and_load_to_bq/process_csvs_and_load_to_bq_dag.py +++ b/datasets/google_political_ads/pipelines/process_csvs_and_load_to_bq/process_csvs_and_load_to_bq_dag.py @@ -43,24 +43,22 @@ impersonation_chain="{{ var.json.google_political_ads.service_account }}", move_object=False, ) - create_cluster_transform_advertiser_declared_stats = ( - kubernetes_engine.GKECreateClusterOperator( - task_id="create_cluster_transform_advertiser_declared_stats", - project_id="{{ var.value.gcp_project }}", - location="us-central1-c", - body={ - "name": "pdp-google-political-ads-declared-stats", - "initial_node_count": 2, - "network": "{{ var.value.vpc_network }}", - "node_config": { - "machine_type": "e2-standard-16", - "oauth_scopes": [ - "https://www.googleapis.com/auth/devstorage.read_write", - "https://www.googleapis.com/auth/cloud-platform", - ], - }, - }, - ) + create_cluster = kubernetes_engine.GKECreateClusterOperator( + task_id="create_cluster", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + body={ + "name": "pdp-google-political-ads", + "initial_node_count": 2, + "network": "{{ var.value.vpc_network }}", + "node_config": { + "machine_type": "e2-standard-16", + "oauth_scopes": [ + "https://www.googleapis.com/auth/devstorage.read_write", + "https://www.googleapis.com/auth/cloud-platform", + ], + }, + }, ) # Run CSV transform within kubernetes pod @@ -71,7 +69,7 @@ namespace="default", project_id="{{ var.value.gcp_project }}", location="us-central1-c", - cluster_name="pdp-google-political-ads-declared-stats", + cluster_name="pdp-google-political-ads", image_pull_policy="Always", image="{{ var.json.google_political_ads.container_registry.run_csv_transform_kub }}", env_vars={ @@ -88,14 +86,6 @@ }, container_resources={"memory": {"request": "1G"}, "cpu": {"request": "200m"}}, ) - delete_cluster_transform_advertiser_declared_stats = ( - kubernetes_engine.GKEDeleteClusterOperator( - task_id="delete_cluster_transform_advertiser_declared_stats", - project_id="{{ var.value.gcp_project }}", - location="us-central1-c", - name="pdp-google-political-ads-declared-stats", - ) - ) # Task to load CSV data to a BigQuery table load_advertiser_declared_stats_to_bq = gcs_to_bigquery.GCSToBigQueryOperator( @@ -153,25 +143,6 @@ }, ], ) - create_cluster_transform_advertiser_geo_spend = ( - kubernetes_engine.GKECreateClusterOperator( - task_id="create_cluster_transform_advertiser_geo_spend", - project_id="{{ var.value.gcp_project }}", - location="us-central1-c", - body={ - "name": "pdp-google-political-ads-advertiser-geo", - "initial_node_count": 2, - "network": "{{ var.value.vpc_network }}", - "node_config": { - "machine_type": "e2-standard-16", - "oauth_scopes": [ - "https://www.googleapis.com/auth/devstorage.read_write", - "https://www.googleapis.com/auth/cloud-platform", - ], - }, - }, - ) - ) # Run CSV transform within kubernetes pod transform_advertiser_geo_spend_csv = kubernetes_engine.GKEStartPodOperator( @@ -181,7 +152,7 @@ namespace="default", project_id="{{ var.value.gcp_project }}", location="us-central1-c", - cluster_name="pdp-google-political-ads-advertiser-geo", + cluster_name="pdp-google-political-ads", image_pull_policy="Always", image="{{ var.json.google_political_ads.container_registry.run_csv_transform_kub }}", env_vars={ @@ -198,14 +169,6 @@ }, container_resources={"memory": {"request": "1G"}, "cpu": {"request": "200m"}}, ) - delete_cluster_transform_advertiser_geo_spend = ( - kubernetes_engine.GKEDeleteClusterOperator( - task_id="delete_cluster_transform_advertiser_geo_spend", - project_id="{{ var.value.gcp_project }}", - location="us-central1-c", - name="pdp-google-political-ads-advertiser-geo", - ) - ) # Task to load CSV data to a BigQuery table load_advertiser_geo_spend_to_bq = gcs_to_bigquery.GCSToBigQueryOperator( @@ -365,25 +328,6 @@ }, ], ) - create_cluster_transform_advertiser_stats = ( - kubernetes_engine.GKECreateClusterOperator( - task_id="create_cluster_transform_advertiser_stats", - project_id="{{ var.value.gcp_project }}", - location="us-central1-c", - body={ - "name": "pdp-google-political-ads-advertiser-stat", - "initial_node_count": 2, - "network": "{{ var.value.vpc_network }}", - "node_config": { - "machine_type": "e2-standard-16", - "oauth_scopes": [ - "https://www.googleapis.com/auth/devstorage.read_write", - "https://www.googleapis.com/auth/cloud-platform", - ], - }, - }, - ) - ) # Run CSV transform within kubernetes pod transform_advertiser_stats_csv = kubernetes_engine.GKEStartPodOperator( @@ -393,7 +337,7 @@ namespace="default", project_id="{{ var.value.gcp_project }}", location="us-central1-c", - cluster_name="pdp-google-political-ads-advertiser-stat", + cluster_name="pdp-google-political-ads", image_pull_policy="Always", image="{{ var.json.google_political_ads.container_registry.run_csv_transform_kub }}", env_vars={ @@ -410,14 +354,6 @@ }, container_resources={"memory": {"request": "1G"}, "cpu": {"request": "200m"}}, ) - delete_cluster_transform_advertiser_stats = ( - kubernetes_engine.GKEDeleteClusterOperator( - task_id="delete_cluster_transform_advertiser_stats", - project_id="{{ var.value.gcp_project }}", - location="us-central1-c", - name="pdp-google-political-ads-advertiser-stat", - ) - ) # Task to load CSV data to a BigQuery table load_advertiser_stats_to_bq = gcs_to_bigquery.GCSToBigQueryOperator( @@ -587,25 +523,6 @@ }, ], ) - create_cluster_transform_advertiser_weekly_spend = ( - kubernetes_engine.GKECreateClusterOperator( - task_id="create_cluster_transform_advertiser_weekly_spend", - project_id="{{ var.value.gcp_project }}", - location="us-central1-c", - body={ - "name": "pdp-google-political-ads-weekly-spend", - "initial_node_count": 2, - "network": "{{ var.value.vpc_network }}", - "node_config": { - "machine_type": "e2-standard-16", - "oauth_scopes": [ - "https://www.googleapis.com/auth/devstorage.read_write", - "https://www.googleapis.com/auth/cloud-platform", - ], - }, - }, - ) - ) # Run CSV transform within kubernetes pod transform_advertiser_weekly_spend_csv = kubernetes_engine.GKEStartPodOperator( @@ -615,7 +532,7 @@ namespace="default", project_id="{{ var.value.gcp_project }}", location="us-central1-c", - cluster_name="pdp-google-political-ads-weekly-spend", + cluster_name="pdp-google-political-ads", image_pull_policy="Always", image="{{ var.json.google_political_ads.container_registry.run_csv_transform_kub }}", env_vars={ @@ -632,14 +549,6 @@ }, container_resources={"memory": {"request": "1G"}, "cpu": {"request": "200m"}}, ) - delete_cluster_transform_advertiser_weekly_spend = ( - kubernetes_engine.GKEDeleteClusterOperator( - task_id="delete_cluster_transform_advertiser_weekly_spend", - project_id="{{ var.value.gcp_project }}", - location="us-central1-c", - name="pdp-google-political-ads-weekly-spend", - ) - ) # Task to load CSV data to a BigQuery table load_advertiser_weekly_spend_to_bq = gcs_to_bigquery.GCSToBigQueryOperator( @@ -799,25 +708,6 @@ }, ], ) - create_cluster_transform_campaign_targeting = ( - kubernetes_engine.GKECreateClusterOperator( - task_id="create_cluster_transform_campaign_targeting", - project_id="{{ var.value.gcp_project }}", - location="us-central1-c", - body={ - "name": "pdp-google-political-ads-campaign-target", - "initial_node_count": 2, - "network": "{{ var.value.vpc_network }}", - "node_config": { - "machine_type": "e2-standard-16", - "oauth_scopes": [ - "https://www.googleapis.com/auth/devstorage.read_write", - "https://www.googleapis.com/auth/cloud-platform", - ], - }, - }, - ) - ) # Run CSV transform within kubernetes pod transform_campaign_targeting_csv = kubernetes_engine.GKEStartPodOperator( @@ -827,7 +717,7 @@ namespace="default", project_id="{{ var.value.gcp_project }}", location="us-central1-c", - cluster_name="pdp-google-political-ads-campaign-target", + cluster_name="pdp-google-political-ads", image_pull_policy="Always", image="{{ var.json.google_political_ads.container_registry.run_csv_transform_kub }}", env_vars={ @@ -844,14 +734,6 @@ }, container_resources={"memory": {"request": "1G"}, "cpu": {"request": "200m"}}, ) - delete_cluster_transform_campaign_targeting = ( - kubernetes_engine.GKEDeleteClusterOperator( - task_id="delete_cluster_transform_campaign_targeting", - project_id="{{ var.value.gcp_project }}", - location="us-central1-c", - name="pdp-google-political-ads-campaign-target", - ) - ) # Task to load CSV data to a BigQuery table load_campaign_targeting_to_bq = gcs_to_bigquery.GCSToBigQueryOperator( @@ -925,25 +807,6 @@ }, ], ) - create_cluster_transform_creative_stats = ( - kubernetes_engine.GKECreateClusterOperator( - task_id="create_cluster_transform_creative_stats", - project_id="{{ var.value.gcp_project }}", - location="us-central1-c", - body={ - "name": "pdp-google-political-ads-creative-stats", - "initial_node_count": 2, - "network": "{{ var.value.vpc_network }}", - "node_config": { - "machine_type": "e2-standard-16", - "oauth_scopes": [ - "https://www.googleapis.com/auth/devstorage.read_write", - "https://www.googleapis.com/auth/cloud-platform", - ], - }, - }, - ) - ) # Run CSV transform within kubernetes pod transform_creative_stats_csv = kubernetes_engine.GKEStartPodOperator( @@ -953,7 +816,7 @@ namespace="default", project_id="{{ var.value.gcp_project }}", location="us-central1-c", - cluster_name="pdp-google-political-ads-creative-stats", + cluster_name="pdp-google-political-ads", image_pull_policy="Always", image="{{ var.json.google_political_ads.container_registry.run_csv_transform_kub }}", env_vars={ @@ -974,14 +837,6 @@ "ephemeral-storage": {"request": "10G"}, }, ) - delete_cluster_transform_creative_stats = ( - kubernetes_engine.GKEDeleteClusterOperator( - task_id="delete_cluster_transform_creative_stats", - project_id="{{ var.value.gcp_project }}", - location="us-central1-c", - name="pdp-google-political-ads-creative-stats", - ) - ) # Task to load CSV data to a BigQuery table load_creative_stats_to_bq = gcs_to_bigquery.GCSToBigQueryOperator( @@ -1331,23 +1186,6 @@ }, ], ) - create_cluster_transform_geo_spend = kubernetes_engine.GKECreateClusterOperator( - task_id="create_cluster_transform_geo_spend", - project_id="{{ var.value.gcp_project }}", - location="us-central1-c", - body={ - "name": "pdp-google-political-ads-geo-spend", - "initial_node_count": 2, - "network": "{{ var.value.vpc_network }}", - "node_config": { - "machine_type": "e2-standard-16", - "oauth_scopes": [ - "https://www.googleapis.com/auth/devstorage.read_write", - "https://www.googleapis.com/auth/cloud-platform", - ], - }, - }, - ) # Run CSV transform within kubernetes pod transform_geo_spend_csv = kubernetes_engine.GKEStartPodOperator( @@ -1357,7 +1195,7 @@ namespace="default", project_id="{{ var.value.gcp_project }}", location="us-central1-c", - cluster_name="pdp-google-political-ads-geo-spend", + cluster_name="pdp-google-political-ads", image_pull_policy="Always", image="{{ var.json.google_political_ads.container_registry.run_csv_transform_kub }}", env_vars={ @@ -1374,12 +1212,6 @@ }, container_resources={"memory": {"request": "1G"}, "cpu": {"request": "200m"}}, ) - delete_cluster_transform_geo_spend = kubernetes_engine.GKEDeleteClusterOperator( - task_id="delete_cluster_transform_geo_spend", - project_id="{{ var.value.gcp_project }}", - location="us-central1-c", - name="pdp-google-political-ads-geo-spend", - ) # Task to load CSV data to a BigQuery table load_geo_spend_to_bq = gcs_to_bigquery.GCSToBigQueryOperator( @@ -1531,23 +1363,6 @@ }, ], ) - create_cluster_transform_last_updated = kubernetes_engine.GKECreateClusterOperator( - task_id="create_cluster_transform_last_updated", - project_id="{{ var.value.gcp_project }}", - location="us-central1-c", - body={ - "name": "pdp-google-political-ads-last-updated", - "initial_node_count": 2, - "network": "{{ var.value.vpc_network }}", - "node_config": { - "machine_type": "e2-standard-16", - "oauth_scopes": [ - "https://www.googleapis.com/auth/devstorage.read_write", - "https://www.googleapis.com/auth/cloud-platform", - ], - }, - }, - ) # Run CSV transform within kubernetes pod transform_last_updated_csv = kubernetes_engine.GKEStartPodOperator( @@ -1557,7 +1372,7 @@ namespace="default", project_id="{{ var.value.gcp_project }}", location="us-central1-c", - cluster_name="pdp-google-political-ads-last-updated", + cluster_name="pdp-google-political-ads", image_pull_policy="Always", image="{{ var.json.google_political_ads.container_registry.run_csv_transform_kub }}", env_vars={ @@ -1574,12 +1389,6 @@ }, container_resources={"memory": {"request": "128M"}, "cpu": {"request": "200m"}}, ) - delete_cluster_transform_last_updated = kubernetes_engine.GKEDeleteClusterOperator( - task_id="delete_cluster_transform_last_updated", - project_id="{{ var.value.gcp_project }}", - location="us-central1-c", - name="pdp-google-political-ads-last-updated", - ) # Task to load CSV data to a BigQuery table load_last_updated_to_bq = gcs_to_bigquery.GCSToBigQueryOperator( @@ -1599,25 +1408,6 @@ } ], ) - create_cluster_transform_top_keywords_history = ( - kubernetes_engine.GKECreateClusterOperator( - task_id="create_cluster_transform_top_keywords_history", - project_id="{{ var.value.gcp_project }}", - location="us-central1-c", - body={ - "name": "pdp-google-political-ads-top-keywords", - "initial_node_count": 2, - "network": "{{ var.value.vpc_network }}", - "node_config": { - "machine_type": "e2-standard-16", - "oauth_scopes": [ - "https://www.googleapis.com/auth/devstorage.read_write", - "https://www.googleapis.com/auth/cloud-platform", - ], - }, - }, - ) - ) # Run CSV transform within kubernetes pod transform_top_keywords_history_csv = kubernetes_engine.GKEStartPodOperator( @@ -1627,7 +1417,7 @@ namespace="default", project_id="{{ var.value.gcp_project }}", location="us-central1-c", - cluster_name="pdp-google-political-ads-top-keywords", + cluster_name="pdp-google-political-ads", image_pull_policy="Always", image="{{ var.json.google_political_ads.container_registry.run_csv_transform_kub }}", env_vars={ @@ -1644,13 +1434,11 @@ }, container_resources={"memory": {"request": "1G"}, "cpu": {"request": "200m"}}, ) - delete_cluster_transform_top_keywords_history = ( - kubernetes_engine.GKEDeleteClusterOperator( - task_id="delete_cluster_transform_top_keywords_history", - project_id="{{ var.value.gcp_project }}", - location="us-central1-c", - name="pdp-google-political-ads-top-keywords", - ) + delete_cluster = kubernetes_engine.GKEDeleteClusterOperator( + task_id="delete_cluster", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + name="pdp-google-political-ads", ) # Task to load CSV data to a BigQuery table @@ -1764,68 +1552,30 @@ ], ) - download_zip_file_to_composer_bucket >> [ - create_cluster_transform_advertiser_declared_stats, - create_cluster_transform_advertiser_geo_spend, - create_cluster_transform_advertiser_stats, - create_cluster_transform_advertiser_weekly_spend, - create_cluster_transform_campaign_targeting, - create_cluster_transform_creative_stats, - create_cluster_transform_geo_spend, - create_cluster_transform_last_updated, - create_cluster_transform_top_keywords_history, - ] - ( - create_cluster_transform_advertiser_declared_stats - >> transform_advertiser_declared_stats_csv - >> delete_cluster_transform_advertiser_declared_stats - >> load_advertiser_declared_stats_to_bq - ) - ( - create_cluster_transform_advertiser_geo_spend - >> transform_advertiser_geo_spend_csv - >> delete_cluster_transform_advertiser_geo_spend - >> load_advertiser_geo_spend_to_bq - ) - ( - create_cluster_transform_advertiser_stats - >> transform_advertiser_stats_csv - >> delete_cluster_transform_advertiser_stats - >> load_advertiser_stats_to_bq - ) - ( - create_cluster_transform_advertiser_weekly_spend - >> transform_advertiser_weekly_spend_csv - >> delete_cluster_transform_advertiser_weekly_spend - >> load_advertiser_weekly_spend_to_bq - ) + download_zip_file_to_composer_bucket >> create_cluster ( - create_cluster_transform_campaign_targeting - >> transform_campaign_targeting_csv - >> delete_cluster_transform_campaign_targeting - >> load_campaign_targeting_to_bq - ) - ( - create_cluster_transform_creative_stats - >> transform_creative_stats_csv - >> delete_cluster_transform_creative_stats - >> load_creative_stats_to_bq - ) - ( - create_cluster_transform_geo_spend - >> transform_geo_spend_csv - >> delete_cluster_transform_geo_spend - >> load_geo_spend_to_bq - ) - ( - create_cluster_transform_last_updated - >> transform_last_updated_csv - >> delete_cluster_transform_last_updated - >> load_last_updated_to_bq - ) - ( - create_cluster_transform_top_keywords_history - >> transform_top_keywords_history_csv - >> delete_cluster_transform_top_keywords_history - >> load_top_keywords_history_to_bq + create_cluster + >> [ + transform_advertiser_declared_stats_csv, + transform_advertiser_geo_spend_csv, + transform_advertiser_stats_csv, + transform_advertiser_weekly_spend_csv, + transform_campaign_targeting_csv, + transform_creative_stats_csv, + transform_geo_spend_csv, + transform_last_updated_csv, + transform_top_keywords_history_csv, + ] + >> delete_cluster ) + delete_cluster >> [ + load_advertiser_declared_stats_to_bq, + load_advertiser_geo_spend_to_bq, + load_advertiser_stats_to_bq, + load_advertiser_weekly_spend_to_bq, + load_campaign_targeting_to_bq, + load_creative_stats_to_bq, + load_geo_spend_to_bq, + load_last_updated_to_bq, + load_top_keywords_history_to_bq, + ]