From 42e2ad87d7e1dd8e31a3539c2b75f8559c6b12c0 Mon Sep 17 00:00:00 2001
From: nlarge-google
Date: Fri, 8 Nov 2024 15:58:29 +0000
Subject: [PATCH] Fix: Migrate The Look Ecommerce DAGs

---
 .../pipelines/thelook_ecommerce/pipeline.yaml | 146 +-----------------
 .../thelook_ecommerce_dag.py                  |  14 +-
 2 files changed, 12 insertions(+), 148 deletions(-)

diff --git a/datasets/thelook_ecommerce/pipelines/thelook_ecommerce/pipeline.yaml b/datasets/thelook_ecommerce/pipelines/thelook_ecommerce/pipeline.yaml
index 1a06963f4..7980bf8bd 100644
--- a/datasets/thelook_ecommerce/pipelines/thelook_ecommerce/pipeline.yaml
+++ b/datasets/thelook_ecommerce/pipelines/thelook_ecommerce/pipeline.yaml
@@ -17,27 +17,21 @@ resources:
   - type: bigquery_table
     table_id: products
     description: "The Look fictitious e-commerce dataset - products table"
-
   - type: bigquery_table
     table_id: events
     description: "Programatically generated web events for The Look fictitious e-commerce store"
-
   - type: bigquery_table
     table_id: users
     description: "Programatically generated users for The Look fictitious e-commerce store"
-
   - type: bigquery_table
     table_id: orders
     description: "Programatically generated orders for The Look fictitious e-commerce store"
-
   - type: bigquery_table
     table_id: order_items
     description: "Programatically generated order items for The Look fictitious e-commerce store"
-
   - type: bigquery_table
     table_id: inventory_items
     description: "Programatically generated inventory for The Look fictitious e-commerce store"
-
   - type: bigquery_table
     table_id: distribution_centers
     description: "The Look fictitious e-commerce dataset: distribution_centers table"
@@ -48,8 +42,6 @@ dag:
     dag_id: thelook_ecommerce
     default_args:
       owner: "Google"
-
-      # When set to True, keeps a task from getting triggered if the previous schedule for the task hasn’t succeeded
       depends_on_past: False
       start_date: "2021-02-09"
     max_active_runs: 1
@@ -59,24 +51,16 @@ dag:
   tasks:
     - operator: "KubernetesPodOperator"
-
-      # Task description
       description: "Run CSV transform within kubernetes pod"
-
       args:
         task_id: "generate_thelook"
         is_delete_operator_pod: False
         # The name of the pod in which the task will run. This will be used (plus a random suffix) to generate a pod id
         name: "generate_thelook"
-        namespace: "composer"
-        service_account_name: "datasets"
+        namespace: "composer-user-workloads"
+        service_account_name: "default"
+        config_file: "/home/airflow/composer_kube_config"
         image_pull_policy: "Always"
-
-        # Docker images will be built and pushed to GCR by default whenever the `scripts/generate_dag.py` is run. To skip building and pushing images, use the optional `--skip-builds` flag.
         image: "{{ var.json.thelook_ecommerce.docker_image }}"
-        # image: !IMAGE run_thelook_kub
-
-        # Set the environment variables you need initialized in the container. Use these as input variables for the script your container is expected to perform.
         env_vars:
           NUM_OF_USERS: "100000"
           NUM_OF_GHOST_EVENTS: "5"
           TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}"
           TARGET_GCS_PREFIX: "data/thelook_ecommerce"
@@ -85,36 +69,16 @@ dag:
           SOURCE_DIR: "data"
           EXTRANEOUS_HEADERS: '["event_type", "ip_address", "browser", "traffic_source", "session_id", "sequence_number", "uri", "is_sold"]'
-        resources:
-          request_memory: "8G"
-          request_cpu: "2"
-          request_ephemeral_storage: "10G"
     - operator: "GoogleCloudStorageToBigQueryOperator"
       description: "Task to load Products data to a BigQuery table"
-
       args:
         task_id: "load_products_to_bq"
-
-        # The GCS bucket where the CSV file is located in.
bucket: "{{ var.value.composer_bucket }}" - - # The GCS object path for the CSV file source_objects: ["data/thelook_ecommerce/products.csv"] source_format: "CSV" destination_project_dataset_table: "thelook_ecommerce.products" - - # Use this if your CSV file contains a header row skip_leading_rows: 1 - - # How to write data to the table: overwrite, append, or write if empty - # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition write_disposition: "WRITE_TRUNCATE" - - # The BigQuery table schema based on the CSV file. For more info, see - # https://cloud.google.com/bigquery/docs/schemas. - # Always use snake_case and lowercase for column names, and be explicit, - # i.e. specify modes for all columns. schema_fields: - name: "id" type: "INTEGER" @@ -146,29 +110,14 @@ dag: - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load Events data to a BigQuery table" - args: task_id: "load_events_to_bq" - - # The GCS bucket where the CSV file is located in. bucket: "{{ var.value.composer_bucket }}" - - # The GCS object path for the CSV file source_objects: ["data/thelook_ecommerce/events.csv"] source_format: "CSV" destination_project_dataset_table: "thelook_ecommerce.events" - - # Use this if your CSV file contains a header row skip_leading_rows: 1 - - # How to write data to the table: overwrite, append, or write if empty - # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition write_disposition: "WRITE_TRUNCATE" - - # The BigQuery table schema based on the CSV file. For more info, see - # https://cloud.google.com/bigquery/docs/schemas. - # Always use snake_case and lowercase for column names, and be explicit, - # i.e. specify modes for all columns. schema_fields: - name: "id" type: "INTEGER" @@ -212,29 +161,14 @@ dag: - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load Iventory Items data to a BigQuery table" - args: task_id: "load_inventory_items_to_bq" - - # The GCS bucket where the CSV file is located in. bucket: "{{ var.value.composer_bucket }}" - - # The GCS object path for the CSV file source_objects: ["data/thelook_ecommerce/inventory_items.csv"] source_format: "CSV" destination_project_dataset_table: "thelook_ecommerce.inventory_items" - - # Use this if your CSV file contains a header row skip_leading_rows: 1 - - # How to write data to the table: overwrite, append, or write if empty - # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition write_disposition: "WRITE_TRUNCATE" - - # The BigQuery table schema based on the CSV file. For more info, see - # https://cloud.google.com/bigquery/docs/schemas. - # Always use snake_case and lowercase for column names, and be explicit, - # i.e. specify modes for all columns. schema_fields: - name: "id" type: "INTEGER" @@ -275,29 +209,14 @@ dag: - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load Order Items data to a BigQuery table" - args: task_id: "load_order_items_to_bq" - - # The GCS bucket where the CSV file is located in. 
bucket: "{{ var.value.composer_bucket }}" - - # The GCS object path for the CSV file source_objects: ["data/thelook_ecommerce/order_items.csv"] source_format: "CSV" destination_project_dataset_table: "thelook_ecommerce.order_items" - - # Use this if your CSV file contains a header row skip_leading_rows: 1 - - # How to write data to the table: overwrite, append, or write if empty - # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition write_disposition: "WRITE_TRUNCATE" - - # The BigQuery table schema based on the CSV file. For more info, see - # https://cloud.google.com/bigquery/docs/schemas. - # Always use snake_case and lowercase for column names, and be explicit, - # i.e. specify modes for all columns. schema_fields: - name: "id" type: "INTEGER" @@ -335,29 +254,14 @@ dag: - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load Orders data to a BigQuery table" - args: task_id: "load_orders_to_bq" - - # The GCS bucket where the CSV file is located in. bucket: "{{ var.value.composer_bucket }}" - - # The GCS object path for the CSV file source_objects: ["data/thelook_ecommerce/orders.csv"] source_format: "CSV" destination_project_dataset_table: "thelook_ecommerce.orders" - - # Use this if your CSV file contains a header row skip_leading_rows: 1 - - # How to write data to the table: overwrite, append, or write if empty - # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition write_disposition: "WRITE_TRUNCATE" - - # The BigQuery table schema based on the CSV file. For more info, see - # https://cloud.google.com/bigquery/docs/schemas. - # Always use snake_case and lowercase for column names, and be explicit, - # i.e. specify modes for all columns. schema_fields: - name: "order_id" type: "INTEGER" @@ -389,29 +293,14 @@ dag: - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load Users data to a BigQuery table" - args: task_id: "load_users_to_bq" - - # The GCS bucket where the CSV file is located in. bucket: "{{ var.value.composer_bucket }}" - - # The GCS object path for the CSV file source_objects: ["data/thelook_ecommerce/users.csv"] source_format: "CSV" destination_project_dataset_table: "thelook_ecommerce.users" - - # Use this if your CSV file contains a header row skip_leading_rows: 1 - - # How to write data to the table: overwrite, append, or write if empty - # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition write_disposition: "WRITE_TRUNCATE" - - # The BigQuery table schema based on the CSV file. For more info, see - # https://cloud.google.com/bigquery/docs/schemas. - # Always use snake_case and lowercase for column names, and be explicit, - # i.e. specify modes for all columns. schema_fields: - name: "id" type: "INTEGER" @@ -461,29 +350,14 @@ dag: - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load Distribution Centers data to a BigQuery table" - args: task_id: "load_distribution_centers_to_bq" - - # The GCS bucket where the CSV file is located in. 
bucket: "{{ var.value.composer_bucket }}" - - # The GCS object path for the CSV file source_objects: ["data/thelook_ecommerce/distribution_centers.csv"] source_format: "CSV" destination_project_dataset_table: "thelook_ecommerce.distribution_centers" - - # Use this if your CSV file contains a header row skip_leading_rows: 1 - - # How to write data to the table: overwrite, append, or write if empty - # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition write_disposition: "WRITE_TRUNCATE" - - # The BigQuery table schema based on the CSV file. For more info, see - # https://cloud.google.com/bigquery/docs/schemas. - # Always use snake_case and lowercase for column names, and be explicit, - # i.e. specify modes for all columns. schema_fields: - name: "id" type: "INTEGER" @@ -500,32 +374,26 @@ dag: - operator: "BigQueryInsertJobOperator" description: "Task to create the user geom column from the latitude and longitude columns" - args: task_id: "create_user_geom_column" - - # Query that creates the column as a GEOGRAPHY type, then populates it with a geographic point based on the longitude and latitude values configuration: query: query: |- - ALTER TABLE `bigquery-public-data.thelook_ecommerce.users` ADD COLUMN IF NOT EXISTS user_geom GEOGRAPHY; - UPDATE `bigquery-public-data.thelook_ecommerce.users` + ALTER TABLE `{{ var.value.gcp_project }}.thelook_ecommerce.users` ADD COLUMN IF NOT EXISTS user_geom GEOGRAPHY; + UPDATE `{{ var.value.gcp_project }}.thelook_ecommerce.users` SET user_geom = SAFE.ST_GeogFromText(CONCAT('POINT(',CAST(longitude AS STRING), ' ', CAST(latitude as STRING), ')')) WHERE longitude IS NOT NULL AND latitude IS NOT NULL; useLegacySql: False - operator: "BigQueryInsertJobOperator" description: "Task to create the distribution center geom column from the latitude and longitude columns" - args: task_id: "create_distribution_center_geom_column" - - # Query that creates the column as a GEOGRAPHY type, then populates it with a geographic point based on the longitude and latitude values configuration: query: |- - ALTER TABLE `bigquery-public-data.thelook_ecommerce.distribution_centers` + ALTER TABLE `{{ var.value.gcp_project }}.thelook_ecommerce.distribution_centers` ADD COLUMN IF NOT EXISTS distribution_center_geom GEOGRAPHY; - UPDATE `bigquery-public-data.thelook_ecommerce.distribution_centers` + UPDATE `{{ var.value.gcp_project }}.thelook_ecommerce.distribution_centers` SET distribution_center_geom = SAFE.ST_GeogFromText(CONCAT('POINT(',CAST(longitude AS STRING), ' ', CAST(latitude as STRING), ')')) WHERE longitude IS NOT NULL AND latitude IS NOT NULL; diff --git a/datasets/thelook_ecommerce/pipelines/thelook_ecommerce/thelook_ecommerce_dag.py b/datasets/thelook_ecommerce/pipelines/thelook_ecommerce/thelook_ecommerce_dag.py index da95bec80..12931d003 100644 --- a/datasets/thelook_ecommerce/pipelines/thelook_ecommerce/thelook_ecommerce_dag.py +++ b/datasets/thelook_ecommerce/pipelines/thelook_ecommerce/thelook_ecommerce_dag.py @@ -39,8 +39,9 @@ task_id="generate_thelook", is_delete_operator_pod=False, name="generate_thelook", - namespace="composer", - service_account_name="datasets", + namespace="composer-user-workloads", + service_account_name="default", + config_file="/home/airflow/composer_kube_config", image_pull_policy="Always", image="{{ var.json.thelook_ecommerce.docker_image }}", env_vars={ @@ -51,11 +52,6 @@ "SOURCE_DIR": "data", "EXTRANEOUS_HEADERS": '["event_type", "ip_address", "browser", "traffic_source", "session_id", 
"sequence_number", "uri", "is_sold"]', }, - resources={ - "request_memory": "8G", - "request_cpu": "2", - "request_ephemeral_storage": "10G", - }, ) # Task to load Products data to a BigQuery table @@ -231,7 +227,7 @@ task_id="create_user_geom_column", configuration={ "query": { - "query": "ALTER TABLE `bigquery-public-data.thelook_ecommerce.users` ADD COLUMN IF NOT EXISTS user_geom GEOGRAPHY;\nUPDATE `bigquery-public-data.thelook_ecommerce.users`\n SET user_geom = SAFE.ST_GeogFromText(CONCAT('POINT(',CAST(longitude AS STRING), ' ', CAST(latitude as STRING), ')'))\n WHERE longitude IS NOT NULL AND latitude IS NOT NULL;", + "query": "ALTER TABLE `{{ var.value.gcp_project }}.thelook_ecommerce.users` ADD COLUMN IF NOT EXISTS user_geom GEOGRAPHY;\nUPDATE `{{ var.value.gcp_project }}.thelook_ecommerce.users`\n SET user_geom = SAFE.ST_GeogFromText(CONCAT('POINT(',CAST(longitude AS STRING), ' ', CAST(latitude as STRING), ')'))\n WHERE longitude IS NOT NULL AND latitude IS NOT NULL;", "useLegacySql": False, } }, @@ -241,7 +237,7 @@ create_distribution_center_geom_column = bigquery.BigQueryInsertJobOperator( task_id="create_distribution_center_geom_column", configuration={ - "query": "ALTER TABLE `bigquery-public-data.thelook_ecommerce.distribution_centers`\n ADD COLUMN IF NOT EXISTS distribution_center_geom GEOGRAPHY;\nUPDATE `bigquery-public-data.thelook_ecommerce.distribution_centers`\n SET distribution_center_geom = SAFE.ST_GeogFromText(CONCAT('POINT(',CAST(longitude AS STRING), ' ', CAST(latitude as STRING), ')'))\n WHERE longitude IS NOT NULL\n AND latitude IS NOT NULL;\n# Use Legacy SQL should be false for any query that uses a DML statement", + "query": "ALTER TABLE `{{ var.value.gcp_project }}.thelook_ecommerce.distribution_centers`\n ADD COLUMN IF NOT EXISTS distribution_center_geom GEOGRAPHY;\nUPDATE `{{ var.value.gcp_project }}.thelook_ecommerce.distribution_centers`\n SET distribution_center_geom = SAFE.ST_GeogFromText(CONCAT('POINT(',CAST(longitude AS STRING), ' ', CAST(latitude as STRING), ')'))\n WHERE longitude IS NOT NULL\n AND latitude IS NOT NULL;\n# Use Legacy SQL should be false for any query that uses a DML statement", "useLegacySql": False, }, )