From 47a8e76b0b05d25490530ea2e279bb85a63a7161 Mon Sep 17 00:00:00 2001 From: Nicholas Large Date: Mon, 11 Nov 2024 13:00:53 -0600 Subject: [PATCH 1/3] Fix: Migrate The Look Ecommerce DAGs (#861) * Fix: Migrate The Look Ecommerce DAGs * Fix: Change to GKEStartPodOperator. * Fix: Resolved configuration issue in pipeline.yaml. --- .../pipelines/thelook_ecommerce/pipeline.yaml | 195 ++++-------------- .../thelook_ecommerce_dag.py | 51 +++-- 2 files changed, 75 insertions(+), 171 deletions(-) diff --git a/datasets/thelook_ecommerce/pipelines/thelook_ecommerce/pipeline.yaml b/datasets/thelook_ecommerce/pipelines/thelook_ecommerce/pipeline.yaml index 1a06963f4..082452847 100644 --- a/datasets/thelook_ecommerce/pipelines/thelook_ecommerce/pipeline.yaml +++ b/datasets/thelook_ecommerce/pipelines/thelook_ecommerce/pipeline.yaml @@ -17,27 +17,21 @@ resources: - type: bigquery_table table_id: products description: "The Look fictitious e-commerce dataset - products table" - - type: bigquery_table table_id: events description: "Programatically generated web events for The Look fictitious e-commerce store" - - type: bigquery_table table_id: users description: "Programatically generated users for The Look fictitious e-commerce store" - - type: bigquery_table table_id: orders description: "Programatically generated orders for The Look fictitious e-commerce store" - - type: bigquery_table table_id: order_items description: "Programatically generated order items for The Look fictitious e-commerce store" - - type: bigquery_table table_id: inventory_items description: "Programatically generated inventory for The Look fictitious e-commerce store" - - type: bigquery_table table_id: distribution_centers description: "The Look fictitious e-commerce dataset: distribution_centers table" @@ -48,8 +42,6 @@ dag: dag_id: thelook_ecommerce default_args: owner: "Google" - - # When set to True, keeps a task from getting triggered if the previous schedule for the task hasn’t succeeded depends_on_past: False start_date: "2021-02-09" max_active_runs: 1 @@ -58,25 +50,32 @@ dag: default_view: graph tasks: - - operator: "KubernetesPodOperator" - - # Task description + - operator: "GKECreateClusterOperator" + args: + task_id: "create_cluster" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + body: + name: pdp-thelook-ecommerce + initial_node_count: 1 + network: "{{ var.value.vpc_network }}" + node_config: + machine_type: e2-highmem-16 + oauth_scopes: + - https://www.googleapis.com/auth/devstorage.read_write + - https://www.googleapis.com/auth/cloud-platform + - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" - args: task_id: "generate_thelook" is_delete_operator_pod: False - # The name of the pod in which the task will run. This will be used (plus a random suffix) to generate a pod id name: "generate_thelook" - namespace: "composer" - service_account_name: "datasets" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-thelook-ecommerce + namespace: "default" image_pull_policy: "Always" - - # Docker images will be built and pushed to GCR by default whenever the `scripts/generate_dag.py` is run. To skip building and pushing images, use the optional `--skip-builds` flag. image: "{{ var.json.thelook_ecommerce.docker_image }}" - # image: !IMAGE run_thelook_kub - - # Set the environment variables you need initialized in the container. Use these as input variables for the script your container is expected to perform. 
env_vars: NUM_OF_USERS: "100000" NUM_OF_GHOST_EVENTS: "5" @@ -84,37 +83,22 @@ dag: TARGET_GCS_PREFIX: "data/thelook_ecommerce" SOURCE_DIR: "data" EXTRANEOUS_HEADERS: '["event_type", "ip_address", "browser", "traffic_source", "session_id", "sequence_number", "uri", "is_sold"]' - - resources: - request_memory: "8G" - request_cpu: "2" - request_ephemeral_storage: "10G" - + - operator: "GKEDeleteClusterOperator" + args: + task_id: "delete_cluster" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + name: pdp-thelook-ecommerce - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load Products data to a BigQuery table" - args: task_id: "load_products_to_bq" - - # The GCS bucket where the CSV file is located in. bucket: "{{ var.value.composer_bucket }}" - - # The GCS object path for the CSV file source_objects: ["data/thelook_ecommerce/products.csv"] source_format: "CSV" destination_project_dataset_table: "thelook_ecommerce.products" - - # Use this if your CSV file contains a header row skip_leading_rows: 1 - - # How to write data to the table: overwrite, append, or write if empty - # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition write_disposition: "WRITE_TRUNCATE" - - # The BigQuery table schema based on the CSV file. For more info, see - # https://cloud.google.com/bigquery/docs/schemas. - # Always use snake_case and lowercase for column names, and be explicit, - # i.e. specify modes for all columns. schema_fields: - name: "id" type: "INTEGER" @@ -143,32 +127,16 @@ dag: - name: "distribution_center_id" type: "INTEGER" mode: "NULLABLE" - - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load Events data to a BigQuery table" - args: task_id: "load_events_to_bq" - - # The GCS bucket where the CSV file is located in. bucket: "{{ var.value.composer_bucket }}" - - # The GCS object path for the CSV file source_objects: ["data/thelook_ecommerce/events.csv"] source_format: "CSV" destination_project_dataset_table: "thelook_ecommerce.events" - - # Use this if your CSV file contains a header row skip_leading_rows: 1 - - # How to write data to the table: overwrite, append, or write if empty - # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition write_disposition: "WRITE_TRUNCATE" - - # The BigQuery table schema based on the CSV file. For more info, see - # https://cloud.google.com/bigquery/docs/schemas. - # Always use snake_case and lowercase for column names, and be explicit, - # i.e. specify modes for all columns. schema_fields: - name: "id" type: "INTEGER" @@ -209,32 +177,16 @@ dag: - name: "event_type" type: "STRING" mode: "NULLABLE" - - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load Iventory Items data to a BigQuery table" - args: task_id: "load_inventory_items_to_bq" - - # The GCS bucket where the CSV file is located in. bucket: "{{ var.value.composer_bucket }}" - - # The GCS object path for the CSV file source_objects: ["data/thelook_ecommerce/inventory_items.csv"] source_format: "CSV" destination_project_dataset_table: "thelook_ecommerce.inventory_items" - - # Use this if your CSV file contains a header row skip_leading_rows: 1 - - # How to write data to the table: overwrite, append, or write if empty - # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition write_disposition: "WRITE_TRUNCATE" - - # The BigQuery table schema based on the CSV file. 
For more info, see - # https://cloud.google.com/bigquery/docs/schemas. - # Always use snake_case and lowercase for column names, and be explicit, - # i.e. specify modes for all columns. schema_fields: - name: "id" type: "INTEGER" @@ -272,32 +224,16 @@ dag: - name: "product_distribution_center_id" type: "INTEGER" mode: "NULLABLE" - - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load Order Items data to a BigQuery table" - args: task_id: "load_order_items_to_bq" - - # The GCS bucket where the CSV file is located in. bucket: "{{ var.value.composer_bucket }}" - - # The GCS object path for the CSV file source_objects: ["data/thelook_ecommerce/order_items.csv"] source_format: "CSV" destination_project_dataset_table: "thelook_ecommerce.order_items" - - # Use this if your CSV file contains a header row skip_leading_rows: 1 - - # How to write data to the table: overwrite, append, or write if empty - # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition write_disposition: "WRITE_TRUNCATE" - - # The BigQuery table schema based on the CSV file. For more info, see - # https://cloud.google.com/bigquery/docs/schemas. - # Always use snake_case and lowercase for column names, and be explicit, - # i.e. specify modes for all columns. schema_fields: - name: "id" type: "INTEGER" @@ -332,32 +268,16 @@ dag: - name: "sale_price" type: "FLOAT" mode: "NULLABLE" - - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load Orders data to a BigQuery table" - args: task_id: "load_orders_to_bq" - - # The GCS bucket where the CSV file is located in. bucket: "{{ var.value.composer_bucket }}" - - # The GCS object path for the CSV file source_objects: ["data/thelook_ecommerce/orders.csv"] source_format: "CSV" destination_project_dataset_table: "thelook_ecommerce.orders" - - # Use this if your CSV file contains a header row skip_leading_rows: 1 - - # How to write data to the table: overwrite, append, or write if empty - # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition write_disposition: "WRITE_TRUNCATE" - - # The BigQuery table schema based on the CSV file. For more info, see - # https://cloud.google.com/bigquery/docs/schemas. - # Always use snake_case and lowercase for column names, and be explicit, - # i.e. specify modes for all columns. schema_fields: - name: "order_id" type: "INTEGER" @@ -386,32 +306,16 @@ dag: - name: "num_of_item" type: "INTEGER" mode: "NULLABLE" - - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load Users data to a BigQuery table" - args: task_id: "load_users_to_bq" - - # The GCS bucket where the CSV file is located in. bucket: "{{ var.value.composer_bucket }}" - - # The GCS object path for the CSV file source_objects: ["data/thelook_ecommerce/users.csv"] source_format: "CSV" destination_project_dataset_table: "thelook_ecommerce.users" - - # Use this if your CSV file contains a header row skip_leading_rows: 1 - - # How to write data to the table: overwrite, append, or write if empty - # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition write_disposition: "WRITE_TRUNCATE" - - # The BigQuery table schema based on the CSV file. For more info, see - # https://cloud.google.com/bigquery/docs/schemas. - # Always use snake_case and lowercase for column names, and be explicit, - # i.e. specify modes for all columns. 
schema_fields: - name: "id" type: "INTEGER" @@ -458,32 +362,16 @@ dag: - name: "created_at" type: "TIMESTAMP" mode: "NULLABLE" - - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load Distribution Centers data to a BigQuery table" - args: task_id: "load_distribution_centers_to_bq" - - # The GCS bucket where the CSV file is located in. bucket: "{{ var.value.composer_bucket }}" - - # The GCS object path for the CSV file source_objects: ["data/thelook_ecommerce/distribution_centers.csv"] source_format: "CSV" destination_project_dataset_table: "thelook_ecommerce.distribution_centers" - - # Use this if your CSV file contains a header row skip_leading_rows: 1 - - # How to write data to the table: overwrite, append, or write if empty - # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition write_disposition: "WRITE_TRUNCATE" - - # The BigQuery table schema based on the CSV file. For more info, see - # https://cloud.google.com/bigquery/docs/schemas. - # Always use snake_case and lowercase for column names, and be explicit, - # i.e. specify modes for all columns. schema_fields: - name: "id" type: "INTEGER" @@ -497,40 +385,33 @@ dag: - name: "longitude" type: "FLOAT" mode: "NULLABLE" - - operator: "BigQueryInsertJobOperator" description: "Task to create the user geom column from the latitude and longitude columns" - args: task_id: "create_user_geom_column" - - # Query that creates the column as a GEOGRAPHY type, then populates it with a geographic point based on the longitude and latitude values configuration: query: query: |- - ALTER TABLE `bigquery-public-data.thelook_ecommerce.users` ADD COLUMN IF NOT EXISTS user_geom GEOGRAPHY; - UPDATE `bigquery-public-data.thelook_ecommerce.users` + ALTER TABLE `{{ var.value.gcp_project }}.thelook_ecommerce.users` ADD COLUMN IF NOT EXISTS user_geom GEOGRAPHY; + UPDATE `{{ var.value.gcp_project }}.thelook_ecommerce.users` SET user_geom = SAFE.ST_GeogFromText(CONCAT('POINT(',CAST(longitude AS STRING), ' ', CAST(latitude as STRING), ')')) WHERE longitude IS NOT NULL AND latitude IS NOT NULL; useLegacySql: False - - operator: "BigQueryInsertJobOperator" description: "Task to create the distribution center geom column from the latitude and longitude columns" - args: task_id: "create_distribution_center_geom_column" - - # Query that creates the column as a GEOGRAPHY type, then populates it with a geographic point based on the longitude and latitude values configuration: - query: |- - ALTER TABLE `bigquery-public-data.thelook_ecommerce.distribution_centers` - ADD COLUMN IF NOT EXISTS distribution_center_geom GEOGRAPHY; - UPDATE `bigquery-public-data.thelook_ecommerce.distribution_centers` - SET distribution_center_geom = SAFE.ST_GeogFromText(CONCAT('POINT(',CAST(longitude AS STRING), ' ', CAST(latitude as STRING), ')')) - WHERE longitude IS NOT NULL - AND latitude IS NOT NULL; - # Use Legacy SQL should be false for any query that uses a DML statement - useLegacySql: False + query: + query: |- + ALTER TABLE `{{ var.value.gcp_project }}.thelook_ecommerce.distribution_centers` + ADD COLUMN IF NOT EXISTS distribution_center_geom GEOGRAPHY; + UPDATE `{{ var.value.gcp_project }}.thelook_ecommerce.distribution_centers` + SET distribution_center_geom = SAFE.ST_GeogFromText(CONCAT('POINT(',CAST(longitude AS STRING), ' ', CAST(latitude as STRING), ')')) + WHERE longitude IS NOT NULL + AND latitude IS NOT NULL; + # Use Legacy SQL should be false for any query that uses a DML statement + useLegacySql: False 
graph_paths: - - "generate_thelook >> [load_products_to_bq, load_events_to_bq, load_inventory_items_to_bq, load_order_items_to_bq, load_orders_to_bq, load_users_to_bq, load_distribution_centers_to_bq] >> create_user_geom_column >> create_distribution_center_geom_column" + - "create_cluster >> generate_thelook >> delete_cluster >> [load_products_to_bq, load_events_to_bq, load_inventory_items_to_bq, load_order_items_to_bq, load_orders_to_bq, load_users_to_bq, load_distribution_centers_to_bq] >> create_user_geom_column >> create_distribution_center_geom_column" diff --git a/datasets/thelook_ecommerce/pipelines/thelook_ecommerce/thelook_ecommerce_dag.py b/datasets/thelook_ecommerce/pipelines/thelook_ecommerce/thelook_ecommerce_dag.py index da95bec80..1c47b471a 100644 --- a/datasets/thelook_ecommerce/pipelines/thelook_ecommerce/thelook_ecommerce_dag.py +++ b/datasets/thelook_ecommerce/pipelines/thelook_ecommerce/thelook_ecommerce_dag.py @@ -14,8 +14,7 @@ from airflow import DAG -from airflow.providers.cncf.kubernetes.operators import kubernetes_pod -from airflow.providers.google.cloud.operators import bigquery +from airflow.providers.google.cloud.operators import bigquery, kubernetes_engine from airflow.providers.google.cloud.transfers import gcs_to_bigquery default_args = { @@ -33,14 +32,33 @@ catchup=False, default_view="graph", ) as dag: + create_cluster = kubernetes_engine.GKECreateClusterOperator( + task_id="create_cluster", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + body={ + "name": "pdp-thelook-ecommerce", + "initial_node_count": 1, + "network": "{{ var.value.vpc_network }}", + "node_config": { + "machine_type": "e2-highmem-16", + "oauth_scopes": [ + "https://www.googleapis.com/auth/devstorage.read_write", + "https://www.googleapis.com/auth/cloud-platform", + ], + }, + }, + ) # Run CSV transform within kubernetes pod - generate_thelook = kubernetes_pod.KubernetesPodOperator( + generate_thelook = kubernetes_engine.GKEStartPodOperator( task_id="generate_thelook", is_delete_operator_pod=False, name="generate_thelook", - namespace="composer", - service_account_name="datasets", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-thelook-ecommerce", + namespace="default", image_pull_policy="Always", image="{{ var.json.thelook_ecommerce.docker_image }}", env_vars={ @@ -51,11 +69,12 @@ "SOURCE_DIR": "data", "EXTRANEOUS_HEADERS": '["event_type", "ip_address", "browser", "traffic_source", "session_id", "sequence_number", "uri", "is_sold"]', }, - resources={ - "request_memory": "8G", - "request_cpu": "2", - "request_ephemeral_storage": "10G", - }, + ) + delete_cluster = kubernetes_engine.GKEDeleteClusterOperator( + task_id="delete_cluster", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + name="pdp-thelook-ecommerce", ) # Task to load Products data to a BigQuery table @@ -231,7 +250,7 @@ task_id="create_user_geom_column", configuration={ "query": { - "query": "ALTER TABLE `bigquery-public-data.thelook_ecommerce.users` ADD COLUMN IF NOT EXISTS user_geom GEOGRAPHY;\nUPDATE `bigquery-public-data.thelook_ecommerce.users`\n SET user_geom = SAFE.ST_GeogFromText(CONCAT('POINT(',CAST(longitude AS STRING), ' ', CAST(latitude as STRING), ')'))\n WHERE longitude IS NOT NULL AND latitude IS NOT NULL;", + "query": "ALTER TABLE `{{ var.value.gcp_project }}.thelook_ecommerce.users` ADD COLUMN IF NOT EXISTS user_geom GEOGRAPHY;\nUPDATE `{{ var.value.gcp_project }}.thelook_ecommerce.users`\n SET user_geom = 
SAFE.ST_GeogFromText(CONCAT('POINT(',CAST(longitude AS STRING), ' ', CAST(latitude as STRING), ')'))\n WHERE longitude IS NOT NULL AND latitude IS NOT NULL;", "useLegacySql": False, } }, @@ -241,13 +260,17 @@ create_distribution_center_geom_column = bigquery.BigQueryInsertJobOperator( task_id="create_distribution_center_geom_column", configuration={ - "query": "ALTER TABLE `bigquery-public-data.thelook_ecommerce.distribution_centers`\n ADD COLUMN IF NOT EXISTS distribution_center_geom GEOGRAPHY;\nUPDATE `bigquery-public-data.thelook_ecommerce.distribution_centers`\n SET distribution_center_geom = SAFE.ST_GeogFromText(CONCAT('POINT(',CAST(longitude AS STRING), ' ', CAST(latitude as STRING), ')'))\n WHERE longitude IS NOT NULL\n AND latitude IS NOT NULL;\n# Use Legacy SQL should be false for any query that uses a DML statement", - "useLegacySql": False, + "query": { + "query": "ALTER TABLE `{{ var.value.gcp_project }}.thelook_ecommerce.distribution_centers`\n ADD COLUMN IF NOT EXISTS distribution_center_geom GEOGRAPHY;\nUPDATE `{{ var.value.gcp_project }}.thelook_ecommerce.distribution_centers`\n SET distribution_center_geom = SAFE.ST_GeogFromText(CONCAT('POINT(',CAST(longitude AS STRING), ' ', CAST(latitude as STRING), ')'))\nWHERE longitude IS NOT NULL\n AND latitude IS NOT NULL;\n# Use Legacy SQL should be false for any query that uses a DML statement", + "useLegacySql": False, + } }, ) ( - generate_thelook + create_cluster + >> generate_thelook + >> delete_cluster >> [ load_products_to_bq, load_events_to_bq, From f8a0632c02fe5d5af2ad65c0add66e1d1b7d37d1 Mon Sep 17 00:00:00 2001 From: Nicholas Large Date: Mon, 11 Nov 2024 13:01:53 -0600 Subject: [PATCH 2/3] Fix: Migrate Multilingual Spoken Words Corpus to the new environment. (#863) --- .../multilingual_spoken_words_corpus_dag.py | 43 +++++++++++++++---- .../pipeline.yaml | 43 +++++++++++++++---- 2 files changed, 70 insertions(+), 16 deletions(-) diff --git a/datasets/multilingual_spoken_words_corpus/pipelines/multilingual_spoken_words_corpus/multilingual_spoken_words_corpus_dag.py b/datasets/multilingual_spoken_words_corpus/pipelines/multilingual_spoken_words_corpus/multilingual_spoken_words_corpus_dag.py index 04abe6909..90c5aefd7 100644 --- a/datasets/multilingual_spoken_words_corpus/pipelines/multilingual_spoken_words_corpus/multilingual_spoken_words_corpus_dag.py +++ b/datasets/multilingual_spoken_words_corpus/pipelines/multilingual_spoken_words_corpus/multilingual_spoken_words_corpus_dag.py @@ -15,7 +15,7 @@ from airflow import DAG from airflow.operators import bash -from airflow.providers.cncf.kubernetes.operators import kubernetes_pod +from airflow.providers.google.cloud.operators import kubernetes_engine from airflow.providers.google.cloud.transfers import gcs_to_bigquery, gcs_to_gcs default_args = { @@ -53,14 +53,33 @@ "source_file": "metadata.json.gz", }, ) + create_cluster = kubernetes_engine.GKECreateClusterOperator( + task_id="create_cluster", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + body={ + "name": "pdp-multilingual-spoken-words-corpus", + "initial_node_count": 1, + "network": "{{ var.value.vpc_network }}", + "node_config": { + "machine_type": "e2-standard-16", + "oauth_scopes": [ + "https://www.googleapis.com/auth/devstorage.read_write", + "https://www.googleapis.com/auth/cloud-platform", + ], + }, + }, + ) # Run CSV transform within kubernetes pod - metadata_csv_transform = kubernetes_pod.KubernetesPodOperator( + metadata_csv_transform = kubernetes_engine.GKEStartPodOperator( 
task_id="metadata_csv_transform", startup_timeout_seconds=600, name="metadata_csv_transform", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-multilingual-spoken-words-corpus", image_pull_policy="Always", image="{{ var.json.multilingual_spoken_words_corpus.container_registry.run_csv_transform_kub }}", env_vars={ @@ -72,12 +91,18 @@ "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", "TARGET_GCS_PATH": "data/multilingual_spoken_words_corpus/metadata_data_output.csv", }, - resources={ - "request_memory": "4G", - "request_cpu": "1", - "request_ephemeral_storage": "4G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) + delete_cluster = kubernetes_engine.GKEDeleteClusterOperator( + task_id="delete_cluster", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + name="pdp-multilingual-spoken-words-corpus", + ) # Task to load CSV data to a BigQuery table load_metadata_to_bq = gcs_to_bigquery.GCSToBigQueryOperator( @@ -133,6 +158,8 @@ ( copy_metadata_file_to_gcs >> unzip_metadata_gz + >> create_cluster >> metadata_csv_transform + >> delete_cluster >> load_metadata_to_bq ) diff --git a/datasets/multilingual_spoken_words_corpus/pipelines/multilingual_spoken_words_corpus/pipeline.yaml b/datasets/multilingual_spoken_words_corpus/pipelines/multilingual_spoken_words_corpus/pipeline.yaml index 30f9bbb6c..f80cbcb82 100644 --- a/datasets/multilingual_spoken_words_corpus/pipelines/multilingual_spoken_words_corpus/pipeline.yaml +++ b/datasets/multilingual_spoken_words_corpus/pipelines/multilingual_spoken_words_corpus/pipeline.yaml @@ -56,14 +56,31 @@ dag: data_dir: "/home/airflow/gcs/data/multilingual_spoken_words_corpus" source_file: "metadata.json.gz" - - operator: "KubernetesPodOperator" + - operator: "GKECreateClusterOperator" + args: + task_id: "create_cluster" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + body: + name: pdp-multilingual-spoken-words-corpus + initial_node_count: 1 + network: "{{ var.value.vpc_network }}" + node_config: + machine_type: e2-standard-16 + oauth_scopes: + - https://www.googleapis.com/auth/devstorage.read_write + - https://www.googleapis.com/auth/cloud-platform + + - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "metadata_csv_transform" startup_timeout_seconds: 600 name: "metadata_csv_transform" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-multilingual-spoken-words-corpus image_pull_policy: "Always" image: "{{ var.json.multilingual_spoken_words_corpus.container_registry.run_csv_transform_kub }}" env_vars: @@ -75,10 +92,20 @@ dag: ["lang_abbr", "language", "number_of_words","word", "word_count", "filename"] TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" TARGET_GCS_PATH: "data/multilingual_spoken_words_corpus/metadata_data_output.csv" - resources: - request_memory: "4G" - request_cpu: "1" - request_ephemeral_storage: "4G" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" + + - operator: "GKEDeleteClusterOperator" + args: + task_id: "delete_cluster" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + name: pdp-multilingual-spoken-words-corpus - 
operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" @@ -117,4 +144,4 @@ dag: mode: "nullable" graph_paths: - - "copy_metadata_file_to_gcs >> unzip_metadata_gz >> metadata_csv_transform >> load_metadata_to_bq" + - "copy_metadata_file_to_gcs >> unzip_metadata_gz >> create_cluster >> metadata_csv_transform >> delete_cluster >> load_metadata_to_bq" From 95bfb7263e8f4ddc10170902b969c014db7b1224 Mon Sep 17 00:00:00 2001 From: Nicholas Large Date: Mon, 11 Nov 2024 13:02:50 -0600 Subject: [PATCH 3/3] Fix: Migrate mimic_iii dag (#864) --- datasets/mimic_iii/pipelines/mimic_iii/mimic_iii_dag.py | 6 +++--- datasets/mimic_iii/pipelines/mimic_iii/pipeline.yaml | 8 +++----- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/datasets/mimic_iii/pipelines/mimic_iii/mimic_iii_dag.py b/datasets/mimic_iii/pipelines/mimic_iii/mimic_iii_dag.py index 56e170b09..b72f9bce6 100644 --- a/datasets/mimic_iii/pipelines/mimic_iii/mimic_iii_dag.py +++ b/datasets/mimic_iii/pipelines/mimic_iii/mimic_iii_dag.py @@ -36,8 +36,9 @@ copy_bq_dataset = kubernetes_pod.KubernetesPodOperator( task_id="copy_bq_dataset", name="copy_bq_dataset", - namespace="composer", - service_account_name="datasets", + namespace="composer-user-workloads", + service_account_name="default", + config_file="/home/airflow/composer_kube_config", image_pull_policy="Always", image="{{ var.json.mimic_iii.container_registry.run_script_kub }}", env_vars={ @@ -46,7 +47,6 @@ "TARGET_PROJECT_ID": "{{ var.json.mimic_iii.target_project_id }}", "TARGET_BQ_DATASET": "{{ var.json.mimic_iii.target_bq_dataset }}", }, - resources={"request_memory": "128M", "request_cpu": "200m"}, ) copy_bq_dataset diff --git a/datasets/mimic_iii/pipelines/mimic_iii/pipeline.yaml b/datasets/mimic_iii/pipelines/mimic_iii/pipeline.yaml index 8d2f3a87b..fd384ce28 100644 --- a/datasets/mimic_iii/pipelines/mimic_iii/pipeline.yaml +++ b/datasets/mimic_iii/pipelines/mimic_iii/pipeline.yaml @@ -34,8 +34,9 @@ dag: args: task_id: "copy_bq_dataset" name: "copy_bq_dataset" - namespace: "composer" - service_account_name: "datasets" + namespace: "composer-user-workloads" + service_account_name: "default" + config_file: "/home/airflow/composer_kube_config" image_pull_policy: "Always" image: "{{ var.json.mimic_iii.container_registry.run_script_kub }}" env_vars: @@ -43,9 +44,6 @@ dag: SOURCE_BQ_DATASET: "{{ var.json.mimic_iii.source_bq_dataset }}" TARGET_PROJECT_ID: "{{ var.json.mimic_iii.target_project_id }}" TARGET_BQ_DATASET: "{{ var.json.mimic_iii.target_bq_dataset }}" - resources: - request_memory: "128M" - request_cpu: "200m" graph_paths: - "copy_bq_dataset"