diff --git a/datasets/thelook_ecommerce/pipelines/thelook_ecommerce/pipeline.yaml b/datasets/thelook_ecommerce/pipelines/thelook_ecommerce/pipeline.yaml index 1a06963f4..082452847 100644 --- a/datasets/thelook_ecommerce/pipelines/thelook_ecommerce/pipeline.yaml +++ b/datasets/thelook_ecommerce/pipelines/thelook_ecommerce/pipeline.yaml @@ -17,27 +17,21 @@ resources: - type: bigquery_table table_id: products description: "The Look fictitious e-commerce dataset - products table" - - type: bigquery_table table_id: events description: "Programatically generated web events for The Look fictitious e-commerce store" - - type: bigquery_table table_id: users description: "Programatically generated users for The Look fictitious e-commerce store" - - type: bigquery_table table_id: orders description: "Programatically generated orders for The Look fictitious e-commerce store" - - type: bigquery_table table_id: order_items description: "Programatically generated order items for The Look fictitious e-commerce store" - - type: bigquery_table table_id: inventory_items description: "Programatically generated inventory for The Look fictitious e-commerce store" - - type: bigquery_table table_id: distribution_centers description: "The Look fictitious e-commerce dataset: distribution_centers table" @@ -48,8 +42,6 @@ dag: dag_id: thelook_ecommerce default_args: owner: "Google" - - # When set to True, keeps a task from getting triggered if the previous schedule for the task hasn’t succeeded depends_on_past: False start_date: "2021-02-09" max_active_runs: 1 @@ -58,25 +50,32 @@ dag: default_view: graph tasks: - - operator: "KubernetesPodOperator" - - # Task description + - operator: "GKECreateClusterOperator" + args: + task_id: "create_cluster" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + body: + name: pdp-thelook-ecommerce + initial_node_count: 1 + network: "{{ var.value.vpc_network }}" + node_config: + machine_type: e2-highmem-16 + oauth_scopes: + - https://www.googleapis.com/auth/devstorage.read_write + - https://www.googleapis.com/auth/cloud-platform + - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" - args: task_id: "generate_thelook" is_delete_operator_pod: False - # The name of the pod in which the task will run. This will be used (plus a random suffix) to generate a pod id name: "generate_thelook" - namespace: "composer" - service_account_name: "datasets" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-thelook-ecommerce + namespace: "default" image_pull_policy: "Always" - - # Docker images will be built and pushed to GCR by default whenever the `scripts/generate_dag.py` is run. To skip building and pushing images, use the optional `--skip-builds` flag. image: "{{ var.json.thelook_ecommerce.docker_image }}" - # image: !IMAGE run_thelook_kub - - # Set the environment variables you need initialized in the container. Use these as input variables for the script your container is expected to perform. env_vars: NUM_OF_USERS: "100000" NUM_OF_GHOST_EVENTS: "5" @@ -84,37 +83,22 @@ dag: TARGET_GCS_PREFIX: "data/thelook_ecommerce" SOURCE_DIR: "data" EXTRANEOUS_HEADERS: '["event_type", "ip_address", "browser", "traffic_source", "session_id", "sequence_number", "uri", "is_sold"]' - - resources: - request_memory: "8G" - request_cpu: "2" - request_ephemeral_storage: "10G" - + - operator: "GKEDeleteClusterOperator" + args: + task_id: "delete_cluster" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + name: pdp-thelook-ecommerce - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load Products data to a BigQuery table" - args: task_id: "load_products_to_bq" - - # The GCS bucket where the CSV file is located in. bucket: "{{ var.value.composer_bucket }}" - - # The GCS object path for the CSV file source_objects: ["data/thelook_ecommerce/products.csv"] source_format: "CSV" destination_project_dataset_table: "thelook_ecommerce.products" - - # Use this if your CSV file contains a header row skip_leading_rows: 1 - - # How to write data to the table: overwrite, append, or write if empty - # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition write_disposition: "WRITE_TRUNCATE" - - # The BigQuery table schema based on the CSV file. For more info, see - # https://cloud.google.com/bigquery/docs/schemas. - # Always use snake_case and lowercase for column names, and be explicit, - # i.e. specify modes for all columns. schema_fields: - name: "id" type: "INTEGER" @@ -143,32 +127,16 @@ dag: - name: "distribution_center_id" type: "INTEGER" mode: "NULLABLE" - - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load Events data to a BigQuery table" - args: task_id: "load_events_to_bq" - - # The GCS bucket where the CSV file is located in. bucket: "{{ var.value.composer_bucket }}" - - # The GCS object path for the CSV file source_objects: ["data/thelook_ecommerce/events.csv"] source_format: "CSV" destination_project_dataset_table: "thelook_ecommerce.events" - - # Use this if your CSV file contains a header row skip_leading_rows: 1 - - # How to write data to the table: overwrite, append, or write if empty - # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition write_disposition: "WRITE_TRUNCATE" - - # The BigQuery table schema based on the CSV file. For more info, see - # https://cloud.google.com/bigquery/docs/schemas. - # Always use snake_case and lowercase for column names, and be explicit, - # i.e. specify modes for all columns. schema_fields: - name: "id" type: "INTEGER" @@ -209,32 +177,16 @@ dag: - name: "event_type" type: "STRING" mode: "NULLABLE" - - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load Iventory Items data to a BigQuery table" - args: task_id: "load_inventory_items_to_bq" - - # The GCS bucket where the CSV file is located in. bucket: "{{ var.value.composer_bucket }}" - - # The GCS object path for the CSV file source_objects: ["data/thelook_ecommerce/inventory_items.csv"] source_format: "CSV" destination_project_dataset_table: "thelook_ecommerce.inventory_items" - - # Use this if your CSV file contains a header row skip_leading_rows: 1 - - # How to write data to the table: overwrite, append, or write if empty - # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition write_disposition: "WRITE_TRUNCATE" - - # The BigQuery table schema based on the CSV file. For more info, see - # https://cloud.google.com/bigquery/docs/schemas. - # Always use snake_case and lowercase for column names, and be explicit, - # i.e. specify modes for all columns. schema_fields: - name: "id" type: "INTEGER" @@ -272,32 +224,16 @@ dag: - name: "product_distribution_center_id" type: "INTEGER" mode: "NULLABLE" - - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load Order Items data to a BigQuery table" - args: task_id: "load_order_items_to_bq" - - # The GCS bucket where the CSV file is located in. bucket: "{{ var.value.composer_bucket }}" - - # The GCS object path for the CSV file source_objects: ["data/thelook_ecommerce/order_items.csv"] source_format: "CSV" destination_project_dataset_table: "thelook_ecommerce.order_items" - - # Use this if your CSV file contains a header row skip_leading_rows: 1 - - # How to write data to the table: overwrite, append, or write if empty - # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition write_disposition: "WRITE_TRUNCATE" - - # The BigQuery table schema based on the CSV file. For more info, see - # https://cloud.google.com/bigquery/docs/schemas. - # Always use snake_case and lowercase for column names, and be explicit, - # i.e. specify modes for all columns. schema_fields: - name: "id" type: "INTEGER" @@ -332,32 +268,16 @@ dag: - name: "sale_price" type: "FLOAT" mode: "NULLABLE" - - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load Orders data to a BigQuery table" - args: task_id: "load_orders_to_bq" - - # The GCS bucket where the CSV file is located in. bucket: "{{ var.value.composer_bucket }}" - - # The GCS object path for the CSV file source_objects: ["data/thelook_ecommerce/orders.csv"] source_format: "CSV" destination_project_dataset_table: "thelook_ecommerce.orders" - - # Use this if your CSV file contains a header row skip_leading_rows: 1 - - # How to write data to the table: overwrite, append, or write if empty - # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition write_disposition: "WRITE_TRUNCATE" - - # The BigQuery table schema based on the CSV file. For more info, see - # https://cloud.google.com/bigquery/docs/schemas. - # Always use snake_case and lowercase for column names, and be explicit, - # i.e. specify modes for all columns. schema_fields: - name: "order_id" type: "INTEGER" @@ -386,32 +306,16 @@ dag: - name: "num_of_item" type: "INTEGER" mode: "NULLABLE" - - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load Users data to a BigQuery table" - args: task_id: "load_users_to_bq" - - # The GCS bucket where the CSV file is located in. bucket: "{{ var.value.composer_bucket }}" - - # The GCS object path for the CSV file source_objects: ["data/thelook_ecommerce/users.csv"] source_format: "CSV" destination_project_dataset_table: "thelook_ecommerce.users" - - # Use this if your CSV file contains a header row skip_leading_rows: 1 - - # How to write data to the table: overwrite, append, or write if empty - # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition write_disposition: "WRITE_TRUNCATE" - - # The BigQuery table schema based on the CSV file. For more info, see - # https://cloud.google.com/bigquery/docs/schemas. - # Always use snake_case and lowercase for column names, and be explicit, - # i.e. specify modes for all columns. schema_fields: - name: "id" type: "INTEGER" @@ -458,32 +362,16 @@ dag: - name: "created_at" type: "TIMESTAMP" mode: "NULLABLE" - - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load Distribution Centers data to a BigQuery table" - args: task_id: "load_distribution_centers_to_bq" - - # The GCS bucket where the CSV file is located in. bucket: "{{ var.value.composer_bucket }}" - - # The GCS object path for the CSV file source_objects: ["data/thelook_ecommerce/distribution_centers.csv"] source_format: "CSV" destination_project_dataset_table: "thelook_ecommerce.distribution_centers" - - # Use this if your CSV file contains a header row skip_leading_rows: 1 - - # How to write data to the table: overwrite, append, or write if empty - # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition write_disposition: "WRITE_TRUNCATE" - - # The BigQuery table schema based on the CSV file. For more info, see - # https://cloud.google.com/bigquery/docs/schemas. - # Always use snake_case and lowercase for column names, and be explicit, - # i.e. specify modes for all columns. schema_fields: - name: "id" type: "INTEGER" @@ -497,40 +385,33 @@ dag: - name: "longitude" type: "FLOAT" mode: "NULLABLE" - - operator: "BigQueryInsertJobOperator" description: "Task to create the user geom column from the latitude and longitude columns" - args: task_id: "create_user_geom_column" - - # Query that creates the column as a GEOGRAPHY type, then populates it with a geographic point based on the longitude and latitude values configuration: query: query: |- - ALTER TABLE `bigquery-public-data.thelook_ecommerce.users` ADD COLUMN IF NOT EXISTS user_geom GEOGRAPHY; - UPDATE `bigquery-public-data.thelook_ecommerce.users` + ALTER TABLE `{{ var.value.gcp_project }}.thelook_ecommerce.users` ADD COLUMN IF NOT EXISTS user_geom GEOGRAPHY; + UPDATE `{{ var.value.gcp_project }}.thelook_ecommerce.users` SET user_geom = SAFE.ST_GeogFromText(CONCAT('POINT(',CAST(longitude AS STRING), ' ', CAST(latitude as STRING), ')')) WHERE longitude IS NOT NULL AND latitude IS NOT NULL; useLegacySql: False - - operator: "BigQueryInsertJobOperator" description: "Task to create the distribution center geom column from the latitude and longitude columns" - args: task_id: "create_distribution_center_geom_column" - - # Query that creates the column as a GEOGRAPHY type, then populates it with a geographic point based on the longitude and latitude values configuration: - query: |- - ALTER TABLE `bigquery-public-data.thelook_ecommerce.distribution_centers` - ADD COLUMN IF NOT EXISTS distribution_center_geom GEOGRAPHY; - UPDATE `bigquery-public-data.thelook_ecommerce.distribution_centers` - SET distribution_center_geom = SAFE.ST_GeogFromText(CONCAT('POINT(',CAST(longitude AS STRING), ' ', CAST(latitude as STRING), ')')) - WHERE longitude IS NOT NULL - AND latitude IS NOT NULL; - # Use Legacy SQL should be false for any query that uses a DML statement - useLegacySql: False + query: + query: |- + ALTER TABLE `{{ var.value.gcp_project }}.thelook_ecommerce.distribution_centers` + ADD COLUMN IF NOT EXISTS distribution_center_geom GEOGRAPHY; + UPDATE `{{ var.value.gcp_project }}.thelook_ecommerce.distribution_centers` + SET distribution_center_geom = SAFE.ST_GeogFromText(CONCAT('POINT(',CAST(longitude AS STRING), ' ', CAST(latitude as STRING), ')')) + WHERE longitude IS NOT NULL + AND latitude IS NOT NULL; + # Use Legacy SQL should be false for any query that uses a DML statement + useLegacySql: False graph_paths: - - "generate_thelook >> [load_products_to_bq, load_events_to_bq, load_inventory_items_to_bq, load_order_items_to_bq, load_orders_to_bq, load_users_to_bq, load_distribution_centers_to_bq] >> create_user_geom_column >> create_distribution_center_geom_column" + - "create_cluster >> generate_thelook >> delete_cluster >> [load_products_to_bq, load_events_to_bq, load_inventory_items_to_bq, load_order_items_to_bq, load_orders_to_bq, load_users_to_bq, load_distribution_centers_to_bq] >> create_user_geom_column >> create_distribution_center_geom_column" diff --git a/datasets/thelook_ecommerce/pipelines/thelook_ecommerce/thelook_ecommerce_dag.py b/datasets/thelook_ecommerce/pipelines/thelook_ecommerce/thelook_ecommerce_dag.py index da95bec80..1c47b471a 100644 --- a/datasets/thelook_ecommerce/pipelines/thelook_ecommerce/thelook_ecommerce_dag.py +++ b/datasets/thelook_ecommerce/pipelines/thelook_ecommerce/thelook_ecommerce_dag.py @@ -14,8 +14,7 @@ from airflow import DAG -from airflow.providers.cncf.kubernetes.operators import kubernetes_pod -from airflow.providers.google.cloud.operators import bigquery +from airflow.providers.google.cloud.operators import bigquery, kubernetes_engine from airflow.providers.google.cloud.transfers import gcs_to_bigquery default_args = { @@ -33,14 +32,33 @@ catchup=False, default_view="graph", ) as dag: + create_cluster = kubernetes_engine.GKECreateClusterOperator( + task_id="create_cluster", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + body={ + "name": "pdp-thelook-ecommerce", + "initial_node_count": 1, + "network": "{{ var.value.vpc_network }}", + "node_config": { + "machine_type": "e2-highmem-16", + "oauth_scopes": [ + "https://www.googleapis.com/auth/devstorage.read_write", + "https://www.googleapis.com/auth/cloud-platform", + ], + }, + }, + ) # Run CSV transform within kubernetes pod - generate_thelook = kubernetes_pod.KubernetesPodOperator( + generate_thelook = kubernetes_engine.GKEStartPodOperator( task_id="generate_thelook", is_delete_operator_pod=False, name="generate_thelook", - namespace="composer", - service_account_name="datasets", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-thelook-ecommerce", + namespace="default", image_pull_policy="Always", image="{{ var.json.thelook_ecommerce.docker_image }}", env_vars={ @@ -51,11 +69,12 @@ "SOURCE_DIR": "data", "EXTRANEOUS_HEADERS": '["event_type", "ip_address", "browser", "traffic_source", "session_id", "sequence_number", "uri", "is_sold"]', }, - resources={ - "request_memory": "8G", - "request_cpu": "2", - "request_ephemeral_storage": "10G", - }, + ) + delete_cluster = kubernetes_engine.GKEDeleteClusterOperator( + task_id="delete_cluster", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + name="pdp-thelook-ecommerce", ) # Task to load Products data to a BigQuery table @@ -231,7 +250,7 @@ task_id="create_user_geom_column", configuration={ "query": { - "query": "ALTER TABLE `bigquery-public-data.thelook_ecommerce.users` ADD COLUMN IF NOT EXISTS user_geom GEOGRAPHY;\nUPDATE `bigquery-public-data.thelook_ecommerce.users`\n SET user_geom = SAFE.ST_GeogFromText(CONCAT('POINT(',CAST(longitude AS STRING), ' ', CAST(latitude as STRING), ')'))\n WHERE longitude IS NOT NULL AND latitude IS NOT NULL;", + "query": "ALTER TABLE `{{ var.value.gcp_project }}.thelook_ecommerce.users` ADD COLUMN IF NOT EXISTS user_geom GEOGRAPHY;\nUPDATE `{{ var.value.gcp_project }}.thelook_ecommerce.users`\n SET user_geom = SAFE.ST_GeogFromText(CONCAT('POINT(',CAST(longitude AS STRING), ' ', CAST(latitude as STRING), ')'))\n WHERE longitude IS NOT NULL AND latitude IS NOT NULL;", "useLegacySql": False, } }, @@ -241,13 +260,17 @@ create_distribution_center_geom_column = bigquery.BigQueryInsertJobOperator( task_id="create_distribution_center_geom_column", configuration={ - "query": "ALTER TABLE `bigquery-public-data.thelook_ecommerce.distribution_centers`\n ADD COLUMN IF NOT EXISTS distribution_center_geom GEOGRAPHY;\nUPDATE `bigquery-public-data.thelook_ecommerce.distribution_centers`\n SET distribution_center_geom = SAFE.ST_GeogFromText(CONCAT('POINT(',CAST(longitude AS STRING), ' ', CAST(latitude as STRING), ')'))\n WHERE longitude IS NOT NULL\n AND latitude IS NOT NULL;\n# Use Legacy SQL should be false for any query that uses a DML statement", - "useLegacySql": False, + "query": { + "query": "ALTER TABLE `{{ var.value.gcp_project }}.thelook_ecommerce.distribution_centers`\n ADD COLUMN IF NOT EXISTS distribution_center_geom GEOGRAPHY;\nUPDATE `{{ var.value.gcp_project }}.thelook_ecommerce.distribution_centers`\n SET distribution_center_geom = SAFE.ST_GeogFromText(CONCAT('POINT(',CAST(longitude AS STRING), ' ', CAST(latitude as STRING), ')'))\nWHERE longitude IS NOT NULL\n AND latitude IS NOT NULL;\n# Use Legacy SQL should be false for any query that uses a DML statement", + "useLegacySql": False, + } }, ) ( - generate_thelook + create_cluster + >> generate_thelook + >> delete_cluster >> [ load_products_to_bq, load_events_to_bq,