Skip to content

Commit

Permalink
Fix: Migrate The Look Ecommerce DAGs
Browse files Browse the repository at this point in the history
  • Loading branch information
nlarge-google committed Nov 8, 2024
1 parent c0526ac commit 42e2ad8
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 148 deletions.
146 changes: 7 additions & 139 deletions datasets/thelook_ecommerce/pipelines/thelook_ecommerce/pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,21 @@ resources:
- type: bigquery_table
table_id: products
description: "The Look fictitious e-commerce dataset - products table"

- type: bigquery_table
table_id: events
description: "Programatically generated web events for The Look fictitious e-commerce store"

- type: bigquery_table
table_id: users
description: "Programatically generated users for The Look fictitious e-commerce store"

- type: bigquery_table
table_id: orders
description: "Programatically generated orders for The Look fictitious e-commerce store"

- type: bigquery_table
table_id: order_items
description: "Programatically generated order items for The Look fictitious e-commerce store"

- type: bigquery_table
table_id: inventory_items
description: "Programatically generated inventory for The Look fictitious e-commerce store"

- type: bigquery_table
table_id: distribution_centers
description: "The Look fictitious e-commerce dataset: distribution_centers table"
Expand All @@ -48,8 +42,6 @@ dag:
dag_id: thelook_ecommerce
default_args:
owner: "Google"

# When set to True, keeps a task from getting triggered if the previous schedule for the task hasn’t succeeded
depends_on_past: False
start_date: "2021-02-09"
max_active_runs: 1
Expand All @@ -59,24 +51,16 @@ dag:

tasks:
- operator: "KubernetesPodOperator"

# Task description
description: "Run CSV transform within kubernetes pod"

args:
task_id: "generate_thelook"
is_delete_operator_pod: False
# The name of the pod in which the task will run. This will be used (plus a random suffix) to generate a pod id
name: "generate_thelook"
namespace: "composer"
service_account_name: "datasets"
namespace: "composer-user-workloads"
service_account_name: "default"
config_file: "/home/airflow/composer_kube_config"
image_pull_policy: "Always"

# Docker images will be built and pushed to GCR by default whenever the `scripts/generate_dag.py` is run. To skip building and pushing images, use the optional `--skip-builds` flag.
image: "{{ var.json.thelook_ecommerce.docker_image }}"
# image: !IMAGE run_thelook_kub

# Set the environment variables you need initialized in the container. Use these as input variables for the script your container is expected to perform.
env_vars:
NUM_OF_USERS: "100000"
NUM_OF_GHOST_EVENTS: "5"
Expand All @@ -85,36 +69,16 @@ dag:
SOURCE_DIR: "data"
EXTRANEOUS_HEADERS: '["event_type", "ip_address", "browser", "traffic_source", "session_id", "sequence_number", "uri", "is_sold"]'

resources:
request_memory: "8G"
request_cpu: "2"
request_ephemeral_storage: "10G"

- operator: "GoogleCloudStorageToBigQueryOperator"
description: "Task to load Products data to a BigQuery table"

args:
task_id: "load_products_to_bq"

# The GCS bucket where the CSV file is located in.
bucket: "{{ var.value.composer_bucket }}"

# The GCS object path for the CSV file
source_objects: ["data/thelook_ecommerce/products.csv"]
source_format: "CSV"
destination_project_dataset_table: "thelook_ecommerce.products"

# Use this if your CSV file contains a header row
skip_leading_rows: 1

# How to write data to the table: overwrite, append, or write if empty
# See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition
write_disposition: "WRITE_TRUNCATE"

# The BigQuery table schema based on the CSV file. For more info, see
# https://cloud.google.com/bigquery/docs/schemas.
# Always use snake_case and lowercase for column names, and be explicit,
# i.e. specify modes for all columns.
schema_fields:
- name: "id"
type: "INTEGER"
Expand Down Expand Up @@ -146,29 +110,14 @@ dag:

- operator: "GoogleCloudStorageToBigQueryOperator"
description: "Task to load Events data to a BigQuery table"

args:
task_id: "load_events_to_bq"

# The GCS bucket where the CSV file is located in.
bucket: "{{ var.value.composer_bucket }}"

# The GCS object path for the CSV file
source_objects: ["data/thelook_ecommerce/events.csv"]
source_format: "CSV"
destination_project_dataset_table: "thelook_ecommerce.events"

# Use this if your CSV file contains a header row
skip_leading_rows: 1

# How to write data to the table: overwrite, append, or write if empty
# See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition
write_disposition: "WRITE_TRUNCATE"

# The BigQuery table schema based on the CSV file. For more info, see
# https://cloud.google.com/bigquery/docs/schemas.
# Always use snake_case and lowercase for column names, and be explicit,
# i.e. specify modes for all columns.
schema_fields:
- name: "id"
type: "INTEGER"
Expand Down Expand Up @@ -212,29 +161,14 @@ dag:

- operator: "GoogleCloudStorageToBigQueryOperator"
description: "Task to load Iventory Items data to a BigQuery table"

args:
task_id: "load_inventory_items_to_bq"

# The GCS bucket where the CSV file is located in.
bucket: "{{ var.value.composer_bucket }}"

# The GCS object path for the CSV file
source_objects: ["data/thelook_ecommerce/inventory_items.csv"]
source_format: "CSV"
destination_project_dataset_table: "thelook_ecommerce.inventory_items"

# Use this if your CSV file contains a header row
skip_leading_rows: 1

# How to write data to the table: overwrite, append, or write if empty
# See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition
write_disposition: "WRITE_TRUNCATE"

# The BigQuery table schema based on the CSV file. For more info, see
# https://cloud.google.com/bigquery/docs/schemas.
# Always use snake_case and lowercase for column names, and be explicit,
# i.e. specify modes for all columns.
schema_fields:
- name: "id"
type: "INTEGER"
Expand Down Expand Up @@ -275,29 +209,14 @@ dag:

- operator: "GoogleCloudStorageToBigQueryOperator"
description: "Task to load Order Items data to a BigQuery table"

args:
task_id: "load_order_items_to_bq"

# The GCS bucket where the CSV file is located in.
bucket: "{{ var.value.composer_bucket }}"

# The GCS object path for the CSV file
source_objects: ["data/thelook_ecommerce/order_items.csv"]
source_format: "CSV"
destination_project_dataset_table: "thelook_ecommerce.order_items"

# Use this if your CSV file contains a header row
skip_leading_rows: 1

# How to write data to the table: overwrite, append, or write if empty
# See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition
write_disposition: "WRITE_TRUNCATE"

# The BigQuery table schema based on the CSV file. For more info, see
# https://cloud.google.com/bigquery/docs/schemas.
# Always use snake_case and lowercase for column names, and be explicit,
# i.e. specify modes for all columns.
schema_fields:
- name: "id"
type: "INTEGER"
Expand Down Expand Up @@ -335,29 +254,14 @@ dag:

- operator: "GoogleCloudStorageToBigQueryOperator"
description: "Task to load Orders data to a BigQuery table"

args:
task_id: "load_orders_to_bq"

# The GCS bucket where the CSV file is located in.
bucket: "{{ var.value.composer_bucket }}"

# The GCS object path for the CSV file
source_objects: ["data/thelook_ecommerce/orders.csv"]
source_format: "CSV"
destination_project_dataset_table: "thelook_ecommerce.orders"

# Use this if your CSV file contains a header row
skip_leading_rows: 1

# How to write data to the table: overwrite, append, or write if empty
# See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition
write_disposition: "WRITE_TRUNCATE"

# The BigQuery table schema based on the CSV file. For more info, see
# https://cloud.google.com/bigquery/docs/schemas.
# Always use snake_case and lowercase for column names, and be explicit,
# i.e. specify modes for all columns.
schema_fields:
- name: "order_id"
type: "INTEGER"
Expand Down Expand Up @@ -389,29 +293,14 @@ dag:

- operator: "GoogleCloudStorageToBigQueryOperator"
description: "Task to load Users data to a BigQuery table"

args:
task_id: "load_users_to_bq"

# The GCS bucket where the CSV file is located in.
bucket: "{{ var.value.composer_bucket }}"

# The GCS object path for the CSV file
source_objects: ["data/thelook_ecommerce/users.csv"]
source_format: "CSV"
destination_project_dataset_table: "thelook_ecommerce.users"

# Use this if your CSV file contains a header row
skip_leading_rows: 1

# How to write data to the table: overwrite, append, or write if empty
# See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition
write_disposition: "WRITE_TRUNCATE"

# The BigQuery table schema based on the CSV file. For more info, see
# https://cloud.google.com/bigquery/docs/schemas.
# Always use snake_case and lowercase for column names, and be explicit,
# i.e. specify modes for all columns.
schema_fields:
- name: "id"
type: "INTEGER"
Expand Down Expand Up @@ -461,29 +350,14 @@ dag:

- operator: "GoogleCloudStorageToBigQueryOperator"
description: "Task to load Distribution Centers data to a BigQuery table"

args:
task_id: "load_distribution_centers_to_bq"

# The GCS bucket where the CSV file is located in.
bucket: "{{ var.value.composer_bucket }}"

# The GCS object path for the CSV file
source_objects: ["data/thelook_ecommerce/distribution_centers.csv"]
source_format: "CSV"
destination_project_dataset_table: "thelook_ecommerce.distribution_centers"

# Use this if your CSV file contains a header row
skip_leading_rows: 1

# How to write data to the table: overwrite, append, or write if empty
# See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition
write_disposition: "WRITE_TRUNCATE"

# The BigQuery table schema based on the CSV file. For more info, see
# https://cloud.google.com/bigquery/docs/schemas.
# Always use snake_case and lowercase for column names, and be explicit,
# i.e. specify modes for all columns.
schema_fields:
- name: "id"
type: "INTEGER"
Expand All @@ -500,32 +374,26 @@ dag:

- operator: "BigQueryInsertJobOperator"
description: "Task to create the user geom column from the latitude and longitude columns"

args:
task_id: "create_user_geom_column"

# Query that creates the column as a GEOGRAPHY type, then populates it with a geographic point based on the longitude and latitude values
configuration:
query:
query: |-
ALTER TABLE `bigquery-public-data.thelook_ecommerce.users` ADD COLUMN IF NOT EXISTS user_geom GEOGRAPHY;
UPDATE `bigquery-public-data.thelook_ecommerce.users`
ALTER TABLE `{{ var.value.gcp_project }}.thelook_ecommerce.users` ADD COLUMN IF NOT EXISTS user_geom GEOGRAPHY;
UPDATE `{{ var.value.gcp_project }}.thelook_ecommerce.users`
SET user_geom = SAFE.ST_GeogFromText(CONCAT('POINT(',CAST(longitude AS STRING), ' ', CAST(latitude as STRING), ')'))
WHERE longitude IS NOT NULL AND latitude IS NOT NULL;
useLegacySql: False

- operator: "BigQueryInsertJobOperator"
description: "Task to create the distribution center geom column from the latitude and longitude columns"

args:
task_id: "create_distribution_center_geom_column"

# Query that creates the column as a GEOGRAPHY type, then populates it with a geographic point based on the longitude and latitude values
configuration:
query: |-
ALTER TABLE `bigquery-public-data.thelook_ecommerce.distribution_centers`
ALTER TABLE `{{ var.value.gcp_project }}.thelook_ecommerce.distribution_centers`
ADD COLUMN IF NOT EXISTS distribution_center_geom GEOGRAPHY;
UPDATE `bigquery-public-data.thelook_ecommerce.distribution_centers`
UPDATE `{{ var.value.gcp_project }}.thelook_ecommerce.distribution_centers`
SET distribution_center_geom = SAFE.ST_GeogFromText(CONCAT('POINT(',CAST(longitude AS STRING), ' ', CAST(latitude as STRING), ')'))
WHERE longitude IS NOT NULL
AND latitude IS NOT NULL;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@
task_id="generate_thelook",
is_delete_operator_pod=False,
name="generate_thelook",
namespace="composer",
service_account_name="datasets",
namespace="composer-user-workloads",
service_account_name="default",
config_file="/home/airflow/composer_kube_config",
image_pull_policy="Always",
image="{{ var.json.thelook_ecommerce.docker_image }}",
env_vars={
Expand All @@ -51,11 +52,6 @@
"SOURCE_DIR": "data",
"EXTRANEOUS_HEADERS": '["event_type", "ip_address", "browser", "traffic_source", "session_id", "sequence_number", "uri", "is_sold"]',
},
resources={
"request_memory": "8G",
"request_cpu": "2",
"request_ephemeral_storage": "10G",
},
)

# Task to load Products data to a BigQuery table
Expand Down Expand Up @@ -231,7 +227,7 @@
task_id="create_user_geom_column",
configuration={
"query": {
"query": "ALTER TABLE `bigquery-public-data.thelook_ecommerce.users` ADD COLUMN IF NOT EXISTS user_geom GEOGRAPHY;\nUPDATE `bigquery-public-data.thelook_ecommerce.users`\n SET user_geom = SAFE.ST_GeogFromText(CONCAT('POINT(',CAST(longitude AS STRING), ' ', CAST(latitude as STRING), ')'))\n WHERE longitude IS NOT NULL AND latitude IS NOT NULL;",
"query": "ALTER TABLE `{{ var.value.gcp_project }}.thelook_ecommerce.users` ADD COLUMN IF NOT EXISTS user_geom GEOGRAPHY;\nUPDATE `{{ var.value.gcp_project }}.thelook_ecommerce.users`\n SET user_geom = SAFE.ST_GeogFromText(CONCAT('POINT(',CAST(longitude AS STRING), ' ', CAST(latitude as STRING), ')'))\n WHERE longitude IS NOT NULL AND latitude IS NOT NULL;",
"useLegacySql": False,
}
},
Expand All @@ -241,7 +237,7 @@
create_distribution_center_geom_column = bigquery.BigQueryInsertJobOperator(
task_id="create_distribution_center_geom_column",
configuration={
"query": "ALTER TABLE `bigquery-public-data.thelook_ecommerce.distribution_centers`\n ADD COLUMN IF NOT EXISTS distribution_center_geom GEOGRAPHY;\nUPDATE `bigquery-public-data.thelook_ecommerce.distribution_centers`\n SET distribution_center_geom = SAFE.ST_GeogFromText(CONCAT('POINT(',CAST(longitude AS STRING), ' ', CAST(latitude as STRING), ')'))\n WHERE longitude IS NOT NULL\n AND latitude IS NOT NULL;\n# Use Legacy SQL should be false for any query that uses a DML statement",
"query": "ALTER TABLE `{{ var.value.gcp_project }}.thelook_ecommerce.distribution_centers`\n ADD COLUMN IF NOT EXISTS distribution_center_geom GEOGRAPHY;\nUPDATE `{{ var.value.gcp_project }}.thelook_ecommerce.distribution_centers`\n SET distribution_center_geom = SAFE.ST_GeogFromText(CONCAT('POINT(',CAST(longitude AS STRING), ' ', CAST(latitude as STRING), ')'))\n WHERE longitude IS NOT NULL\n AND latitude IS NOT NULL;\n# Use Legacy SQL should be false for any query that uses a DML statement",
"useLegacySql": False,
},
)
Expand Down

0 comments on commit 42e2ad8

Please sign in to comment.