diff --git a/datasets/thelook_ecommerce/pipelines/thelook_ecommerce/pipeline.yaml b/datasets/thelook_ecommerce/pipelines/thelook_ecommerce/pipeline.yaml index 7980bf8bd..b0454bd8d 100644 --- a/datasets/thelook_ecommerce/pipelines/thelook_ecommerce/pipeline.yaml +++ b/datasets/thelook_ecommerce/pipelines/thelook_ecommerce/pipeline.yaml @@ -50,15 +50,30 @@ dag: default_view: graph tasks: - - operator: "KubernetesPodOperator" + - operator: "GKECreateClusterOperator" + args: + task_id: "create_cluster" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + body: + name: pdp-thelook-ecommerce + initial_node_count: 1 + network: "{{ var.value.vpc_network }}" + node_config: + machine_type: e2-highmem-16 + oauth_scopes: + - https://www.googleapis.com/auth/devstorage.read_write + - https://www.googleapis.com/auth/cloud-platform + - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "generate_thelook" is_delete_operator_pod: False name: "generate_thelook" - namespace: "composer-user-workloads" - service_account_name: "default" - config_file: "/home/airflow/composer_kube_config" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-thelook-ecommerce + namespace: "default" image_pull_policy: "Always" image: "{{ var.json.thelook_ecommerce.docker_image }}" env_vars: @@ -68,7 +83,12 @@ dag: TARGET_GCS_PREFIX: "data/thelook_ecommerce" SOURCE_DIR: "data" EXTRANEOUS_HEADERS: '["event_type", "ip_address", "browser", "traffic_source", "session_id", "sequence_number", "uri", "is_sold"]' - + - operator: "GKEDeleteClusterOperator" + args: + task_id: "delete_cluster" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + name: pdp-thelook-ecommerce - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load Products data to a BigQuery table" args: @@ -107,7 +127,6 @@ dag: - name: "distribution_center_id" type: "INTEGER" mode: "NULLABLE" - - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load Events data to a BigQuery table" args: @@ -158,7 +177,6 @@ dag: - name: "event_type" type: "STRING" mode: "NULLABLE" - - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load Iventory Items data to a BigQuery table" args: @@ -206,7 +224,6 @@ dag: - name: "product_distribution_center_id" type: "INTEGER" mode: "NULLABLE" - - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load Order Items data to a BigQuery table" args: @@ -251,7 +268,6 @@ dag: - name: "sale_price" type: "FLOAT" mode: "NULLABLE" - - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load Orders data to a BigQuery table" args: @@ -290,7 +306,6 @@ dag: - name: "num_of_item" type: "INTEGER" mode: "NULLABLE" - - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load Users data to a BigQuery table" args: @@ -347,7 +362,6 @@ dag: - name: "created_at" type: "TIMESTAMP" mode: "NULLABLE" - - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load Distribution Centers data to a BigQuery table" args: @@ -371,7 +385,6 @@ dag: - name: "longitude" type: "FLOAT" mode: "NULLABLE" - - operator: "BigQueryInsertJobOperator" description: "Task to create the user geom column from the latitude and longitude columns" args: @@ -384,7 +397,6 @@ dag: SET user_geom = SAFE.ST_GeogFromText(CONCAT('POINT(',CAST(longitude AS STRING), ' ', CAST(latitude as STRING), ')')) WHERE longitude IS NOT NULL AND latitude IS NOT NULL; useLegacySql: False - - operator: "BigQueryInsertJobOperator" description: "Task to create the distribution center geom column from the latitude and longitude columns" args: @@ -401,4 +413,4 @@ dag: useLegacySql: False graph_paths: - - "generate_thelook >> [load_products_to_bq, load_events_to_bq, load_inventory_items_to_bq, load_order_items_to_bq, load_orders_to_bq, load_users_to_bq, load_distribution_centers_to_bq] >> create_user_geom_column >> create_distribution_center_geom_column" + - "create_cluster >> generate_thelook >> delete_cluster >> [load_products_to_bq, load_events_to_bq, load_inventory_items_to_bq, load_order_items_to_bq, load_orders_to_bq, load_users_to_bq, load_distribution_centers_to_bq] >> create_user_geom_column >> create_distribution_center_geom_column" diff --git a/datasets/thelook_ecommerce/pipelines/thelook_ecommerce/thelook_ecommerce_dag.py b/datasets/thelook_ecommerce/pipelines/thelook_ecommerce/thelook_ecommerce_dag.py index 12931d003..adb4b459d 100644 --- a/datasets/thelook_ecommerce/pipelines/thelook_ecommerce/thelook_ecommerce_dag.py +++ b/datasets/thelook_ecommerce/pipelines/thelook_ecommerce/thelook_ecommerce_dag.py @@ -14,8 +14,7 @@ from airflow import DAG -from airflow.providers.cncf.kubernetes.operators import kubernetes_pod -from airflow.providers.google.cloud.operators import bigquery +from airflow.providers.google.cloud.operators import bigquery, kubernetes_engine from airflow.providers.google.cloud.transfers import gcs_to_bigquery default_args = { @@ -33,15 +32,33 @@ catchup=False, default_view="graph", ) as dag: + create_cluster = kubernetes_engine.GKECreateClusterOperator( + task_id="create_cluster", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + body={ + "name": "pdp-thelook-ecommerce", + "initial_node_count": 1, + "network": "{{ var.value.vpc_network }}", + "node_config": { + "machine_type": "e2-highmem-16", + "oauth_scopes": [ + "https://www.googleapis.com/auth/devstorage.read_write", + "https://www.googleapis.com/auth/cloud-platform", + ], + }, + }, + ) # Run CSV transform within kubernetes pod - generate_thelook = kubernetes_pod.KubernetesPodOperator( + generate_thelook = kubernetes_engine.GKEStartPodOperator( task_id="generate_thelook", is_delete_operator_pod=False, name="generate_thelook", - namespace="composer-user-workloads", - service_account_name="default", - config_file="/home/airflow/composer_kube_config", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-thelook-ecommerce", + namespace="default", image_pull_policy="Always", image="{{ var.json.thelook_ecommerce.docker_image }}", env_vars={ @@ -53,6 +70,12 @@ "EXTRANEOUS_HEADERS": '["event_type", "ip_address", "browser", "traffic_source", "session_id", "sequence_number", "uri", "is_sold"]', }, ) + delete_cluster = kubernetes_engine.GKEDeleteClusterOperator( + task_id="delete_cluster", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + name="pdp-thelook-ecommerce", + ) # Task to load Products data to a BigQuery table load_products_to_bq = gcs_to_bigquery.GCSToBigQueryOperator( @@ -243,7 +266,9 @@ ) ( - generate_thelook + create_cluster + >> generate_thelook + >> delete_cluster >> [ load_products_to_bq, load_events_to_bq,