Skip to content

Commit

Permalink
Fix: Migrate The Look Ecommerce DAGs (#861)
Browse files Browse the repository at this point in the history
* Fix: Migrate The Look Ecommerce DAGs

* Fix: Change to GKEStartPodOperator.

* Fix: Resolved configuration issue in pipeline.yaml.
  • Loading branch information
nlarge-google authored Nov 11, 2024
1 parent 4df1a02 commit 47a8e76
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 171 deletions.
195 changes: 38 additions & 157 deletions datasets/thelook_ecommerce/pipelines/thelook_ecommerce/pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,21 @@ resources:
- type: bigquery_table
table_id: products
description: "The Look fictitious e-commerce dataset - products table"

- type: bigquery_table
table_id: events
description: "Programatically generated web events for The Look fictitious e-commerce store"

- type: bigquery_table
table_id: users
description: "Programatically generated users for The Look fictitious e-commerce store"

- type: bigquery_table
table_id: orders
description: "Programatically generated orders for The Look fictitious e-commerce store"

- type: bigquery_table
table_id: order_items
description: "Programatically generated order items for The Look fictitious e-commerce store"

- type: bigquery_table
table_id: inventory_items
description: "Programatically generated inventory for The Look fictitious e-commerce store"

- type: bigquery_table
table_id: distribution_centers
description: "The Look fictitious e-commerce dataset: distribution_centers table"
Expand All @@ -48,8 +42,6 @@ dag:
dag_id: thelook_ecommerce
default_args:
owner: "Google"

# When set to True, keeps a task from getting triggered if the previous schedule for the task hasn’t succeeded
depends_on_past: False
start_date: "2021-02-09"
max_active_runs: 1
Expand All @@ -58,63 +50,55 @@ dag:
default_view: graph

tasks:
- operator: "KubernetesPodOperator"

# Task description
- operator: "GKECreateClusterOperator"
args:
task_id: "create_cluster"
project_id: "{{ var.value.gcp_project }}"
location: "us-central1-c"
body:
name: pdp-thelook-ecommerce
initial_node_count: 1
network: "{{ var.value.vpc_network }}"
node_config:
machine_type: e2-highmem-16
oauth_scopes:
- https://www.googleapis.com/auth/devstorage.read_write
- https://www.googleapis.com/auth/cloud-platform
- operator: "GKEStartPodOperator"
description: "Run CSV transform within kubernetes pod"

args:
task_id: "generate_thelook"
is_delete_operator_pod: False
# The name of the pod in which the task will run. This will be used (plus a random suffix) to generate a pod id
name: "generate_thelook"
namespace: "composer"
service_account_name: "datasets"
project_id: "{{ var.value.gcp_project }}"
location: "us-central1-c"
cluster_name: pdp-thelook-ecommerce
namespace: "default"
image_pull_policy: "Always"

# Docker images will be built and pushed to GCR by default whenever the `scripts/generate_dag.py` is run. To skip building and pushing images, use the optional `--skip-builds` flag.
image: "{{ var.json.thelook_ecommerce.docker_image }}"
# image: !IMAGE run_thelook_kub

# Set the environment variables you need initialized in the container. Use these as input variables for the script your container is expected to perform.
env_vars:
NUM_OF_USERS: "100000"
NUM_OF_GHOST_EVENTS: "5"
TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}"
TARGET_GCS_PREFIX: "data/thelook_ecommerce"
SOURCE_DIR: "data"
EXTRANEOUS_HEADERS: '["event_type", "ip_address", "browser", "traffic_source", "session_id", "sequence_number", "uri", "is_sold"]'

resources:
request_memory: "8G"
request_cpu: "2"
request_ephemeral_storage: "10G"

- operator: "GKEDeleteClusterOperator"
args:
task_id: "delete_cluster"
project_id: "{{ var.value.gcp_project }}"
location: "us-central1-c"
name: pdp-thelook-ecommerce
- operator: "GoogleCloudStorageToBigQueryOperator"
description: "Task to load Products data to a BigQuery table"

args:
task_id: "load_products_to_bq"

# The GCS bucket where the CSV file is located in.
bucket: "{{ var.value.composer_bucket }}"

# The GCS object path for the CSV file
source_objects: ["data/thelook_ecommerce/products.csv"]
source_format: "CSV"
destination_project_dataset_table: "thelook_ecommerce.products"

# Use this if your CSV file contains a header row
skip_leading_rows: 1

# How to write data to the table: overwrite, append, or write if empty
# See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition
write_disposition: "WRITE_TRUNCATE"

# The BigQuery table schema based on the CSV file. For more info, see
# https://cloud.google.com/bigquery/docs/schemas.
# Always use snake_case and lowercase for column names, and be explicit,
# i.e. specify modes for all columns.
schema_fields:
- name: "id"
type: "INTEGER"
Expand Down Expand Up @@ -143,32 +127,16 @@ dag:
- name: "distribution_center_id"
type: "INTEGER"
mode: "NULLABLE"

- operator: "GoogleCloudStorageToBigQueryOperator"
description: "Task to load Events data to a BigQuery table"

args:
task_id: "load_events_to_bq"

# The GCS bucket where the CSV file is located in.
bucket: "{{ var.value.composer_bucket }}"

# The GCS object path for the CSV file
source_objects: ["data/thelook_ecommerce/events.csv"]
source_format: "CSV"
destination_project_dataset_table: "thelook_ecommerce.events"

# Use this if your CSV file contains a header row
skip_leading_rows: 1

# How to write data to the table: overwrite, append, or write if empty
# See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition
write_disposition: "WRITE_TRUNCATE"

# The BigQuery table schema based on the CSV file. For more info, see
# https://cloud.google.com/bigquery/docs/schemas.
# Always use snake_case and lowercase for column names, and be explicit,
# i.e. specify modes for all columns.
schema_fields:
- name: "id"
type: "INTEGER"
Expand Down Expand Up @@ -209,32 +177,16 @@ dag:
- name: "event_type"
type: "STRING"
mode: "NULLABLE"

- operator: "GoogleCloudStorageToBigQueryOperator"
description: "Task to load Iventory Items data to a BigQuery table"

args:
task_id: "load_inventory_items_to_bq"

# The GCS bucket where the CSV file is located in.
bucket: "{{ var.value.composer_bucket }}"

# The GCS object path for the CSV file
source_objects: ["data/thelook_ecommerce/inventory_items.csv"]
source_format: "CSV"
destination_project_dataset_table: "thelook_ecommerce.inventory_items"

# Use this if your CSV file contains a header row
skip_leading_rows: 1

# How to write data to the table: overwrite, append, or write if empty
# See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition
write_disposition: "WRITE_TRUNCATE"

# The BigQuery table schema based on the CSV file. For more info, see
# https://cloud.google.com/bigquery/docs/schemas.
# Always use snake_case and lowercase for column names, and be explicit,
# i.e. specify modes for all columns.
schema_fields:
- name: "id"
type: "INTEGER"
Expand Down Expand Up @@ -272,32 +224,16 @@ dag:
- name: "product_distribution_center_id"
type: "INTEGER"
mode: "NULLABLE"

- operator: "GoogleCloudStorageToBigQueryOperator"
description: "Task to load Order Items data to a BigQuery table"

args:
task_id: "load_order_items_to_bq"

# The GCS bucket where the CSV file is located in.
bucket: "{{ var.value.composer_bucket }}"

# The GCS object path for the CSV file
source_objects: ["data/thelook_ecommerce/order_items.csv"]
source_format: "CSV"
destination_project_dataset_table: "thelook_ecommerce.order_items"

# Use this if your CSV file contains a header row
skip_leading_rows: 1

# How to write data to the table: overwrite, append, or write if empty
# See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition
write_disposition: "WRITE_TRUNCATE"

# The BigQuery table schema based on the CSV file. For more info, see
# https://cloud.google.com/bigquery/docs/schemas.
# Always use snake_case and lowercase for column names, and be explicit,
# i.e. specify modes for all columns.
schema_fields:
- name: "id"
type: "INTEGER"
Expand Down Expand Up @@ -332,32 +268,16 @@ dag:
- name: "sale_price"
type: "FLOAT"
mode: "NULLABLE"

- operator: "GoogleCloudStorageToBigQueryOperator"
description: "Task to load Orders data to a BigQuery table"

args:
task_id: "load_orders_to_bq"

# The GCS bucket where the CSV file is located in.
bucket: "{{ var.value.composer_bucket }}"

# The GCS object path for the CSV file
source_objects: ["data/thelook_ecommerce/orders.csv"]
source_format: "CSV"
destination_project_dataset_table: "thelook_ecommerce.orders"

# Use this if your CSV file contains a header row
skip_leading_rows: 1

# How to write data to the table: overwrite, append, or write if empty
# See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition
write_disposition: "WRITE_TRUNCATE"

# The BigQuery table schema based on the CSV file. For more info, see
# https://cloud.google.com/bigquery/docs/schemas.
# Always use snake_case and lowercase for column names, and be explicit,
# i.e. specify modes for all columns.
schema_fields:
- name: "order_id"
type: "INTEGER"
Expand Down Expand Up @@ -386,32 +306,16 @@ dag:
- name: "num_of_item"
type: "INTEGER"
mode: "NULLABLE"

- operator: "GoogleCloudStorageToBigQueryOperator"
description: "Task to load Users data to a BigQuery table"

args:
task_id: "load_users_to_bq"

# The GCS bucket where the CSV file is located in.
bucket: "{{ var.value.composer_bucket }}"

# The GCS object path for the CSV file
source_objects: ["data/thelook_ecommerce/users.csv"]
source_format: "CSV"
destination_project_dataset_table: "thelook_ecommerce.users"

# Use this if your CSV file contains a header row
skip_leading_rows: 1

# How to write data to the table: overwrite, append, or write if empty
# See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition
write_disposition: "WRITE_TRUNCATE"

# The BigQuery table schema based on the CSV file. For more info, see
# https://cloud.google.com/bigquery/docs/schemas.
# Always use snake_case and lowercase for column names, and be explicit,
# i.e. specify modes for all columns.
schema_fields:
- name: "id"
type: "INTEGER"
Expand Down Expand Up @@ -458,32 +362,16 @@ dag:
- name: "created_at"
type: "TIMESTAMP"
mode: "NULLABLE"

- operator: "GoogleCloudStorageToBigQueryOperator"
description: "Task to load Distribution Centers data to a BigQuery table"

args:
task_id: "load_distribution_centers_to_bq"

# The GCS bucket where the CSV file is located in.
bucket: "{{ var.value.composer_bucket }}"

# The GCS object path for the CSV file
source_objects: ["data/thelook_ecommerce/distribution_centers.csv"]
source_format: "CSV"
destination_project_dataset_table: "thelook_ecommerce.distribution_centers"

# Use this if your CSV file contains a header row
skip_leading_rows: 1

# How to write data to the table: overwrite, append, or write if empty
# See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition
write_disposition: "WRITE_TRUNCATE"

# The BigQuery table schema based on the CSV file. For more info, see
# https://cloud.google.com/bigquery/docs/schemas.
# Always use snake_case and lowercase for column names, and be explicit,
# i.e. specify modes for all columns.
schema_fields:
- name: "id"
type: "INTEGER"
Expand All @@ -497,40 +385,33 @@ dag:
- name: "longitude"
type: "FLOAT"
mode: "NULLABLE"

- operator: "BigQueryInsertJobOperator"
description: "Task to create the user geom column from the latitude and longitude columns"

args:
task_id: "create_user_geom_column"

# Query that creates the column as a GEOGRAPHY type, then populates it with a geographic point based on the longitude and latitude values
configuration:
query:
query: |-
ALTER TABLE `bigquery-public-data.thelook_ecommerce.users` ADD COLUMN IF NOT EXISTS user_geom GEOGRAPHY;
UPDATE `bigquery-public-data.thelook_ecommerce.users`
ALTER TABLE `{{ var.value.gcp_project }}.thelook_ecommerce.users` ADD COLUMN IF NOT EXISTS user_geom GEOGRAPHY;
UPDATE `{{ var.value.gcp_project }}.thelook_ecommerce.users`
SET user_geom = SAFE.ST_GeogFromText(CONCAT('POINT(',CAST(longitude AS STRING), ' ', CAST(latitude as STRING), ')'))
WHERE longitude IS NOT NULL AND latitude IS NOT NULL;
useLegacySql: False

- operator: "BigQueryInsertJobOperator"
description: "Task to create the distribution center geom column from the latitude and longitude columns"

args:
task_id: "create_distribution_center_geom_column"

# Query that creates the column as a GEOGRAPHY type, then populates it with a geographic point based on the longitude and latitude values
configuration:
query: |-
ALTER TABLE `bigquery-public-data.thelook_ecommerce.distribution_centers`
ADD COLUMN IF NOT EXISTS distribution_center_geom GEOGRAPHY;
UPDATE `bigquery-public-data.thelook_ecommerce.distribution_centers`
SET distribution_center_geom = SAFE.ST_GeogFromText(CONCAT('POINT(',CAST(longitude AS STRING), ' ', CAST(latitude as STRING), ')'))
WHERE longitude IS NOT NULL
AND latitude IS NOT NULL;
# Use Legacy SQL should be false for any query that uses a DML statement
useLegacySql: False
query:
query: |-
ALTER TABLE `{{ var.value.gcp_project }}.thelook_ecommerce.distribution_centers`
ADD COLUMN IF NOT EXISTS distribution_center_geom GEOGRAPHY;
UPDATE `{{ var.value.gcp_project }}.thelook_ecommerce.distribution_centers`
SET distribution_center_geom = SAFE.ST_GeogFromText(CONCAT('POINT(',CAST(longitude AS STRING), ' ', CAST(latitude as STRING), ')'))
WHERE longitude IS NOT NULL
AND latitude IS NOT NULL;
# Use Legacy SQL should be false for any query that uses a DML statement
useLegacySql: False

graph_paths:
- "generate_thelook >> [load_products_to_bq, load_events_to_bq, load_inventory_items_to_bq, load_order_items_to_bq, load_orders_to_bq, load_users_to_bq, load_distribution_centers_to_bq] >> create_user_geom_column >> create_distribution_center_geom_column"
- "create_cluster >> generate_thelook >> delete_cluster >> [load_products_to_bq, load_events_to_bq, load_inventory_items_to_bq, load_order_items_to_bq, load_orders_to_bq, load_users_to_bq, load_distribution_centers_to_bq] >> create_user_geom_column >> create_distribution_center_geom_column"
Loading

0 comments on commit 47a8e76

Please sign in to comment.