Skip to content

Commit

Permalink
Fix: Change to GKEStartPodOperator.
Browse files Browse the repository at this point in the history
  • Loading branch information
nlarge-google committed Nov 8, 2024
1 parent 42e2ad8 commit ca1d39e
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,30 @@ dag:
default_view: graph

tasks:
- operator: "KubernetesPodOperator"
- operator: "GKECreateClusterOperator"
args:
task_id: "create_cluster"
project_id: "{{ var.value.gcp_project }}"
location: "us-central1-c"
body:
name: pdp-thelook-ecommerce
initial_node_count: 1
network: "{{ var.value.vpc_network }}"
node_config:
machine_type: e2-highmem-16
oauth_scopes:
- https://www.googleapis.com/auth/devstorage.read_write
- https://www.googleapis.com/auth/cloud-platform
- operator: "GKEStartPodOperator"
description: "Run CSV transform within kubernetes pod"
args:
task_id: "generate_thelook"
is_delete_operator_pod: False
name: "generate_thelook"
namespace: "composer-user-workloads"
service_account_name: "default"
config_file: "/home/airflow/composer_kube_config"
project_id: "{{ var.value.gcp_project }}"
location: "us-central1-c"
cluster_name: pdp-thelook-ecommerce
namespace: "default"
image_pull_policy: "Always"
image: "{{ var.json.thelook_ecommerce.docker_image }}"
env_vars:
Expand All @@ -68,7 +83,12 @@ dag:
TARGET_GCS_PREFIX: "data/thelook_ecommerce"
SOURCE_DIR: "data"
EXTRANEOUS_HEADERS: '["event_type", "ip_address", "browser", "traffic_source", "session_id", "sequence_number", "uri", "is_sold"]'

- operator: "GKEDeleteClusterOperator"
args:
task_id: "delete_cluster"
project_id: "{{ var.value.gcp_project }}"
location: "us-central1-c"
name: pdp-thelook-ecommerce
- operator: "GoogleCloudStorageToBigQueryOperator"
description: "Task to load Products data to a BigQuery table"
args:
Expand Down Expand Up @@ -107,7 +127,6 @@ dag:
- name: "distribution_center_id"
type: "INTEGER"
mode: "NULLABLE"

- operator: "GoogleCloudStorageToBigQueryOperator"
description: "Task to load Events data to a BigQuery table"
args:
Expand Down Expand Up @@ -158,7 +177,6 @@ dag:
- name: "event_type"
type: "STRING"
mode: "NULLABLE"

- operator: "GoogleCloudStorageToBigQueryOperator"
description: "Task to load Iventory Items data to a BigQuery table"
args:
Expand Down Expand Up @@ -206,7 +224,6 @@ dag:
- name: "product_distribution_center_id"
type: "INTEGER"
mode: "NULLABLE"

- operator: "GoogleCloudStorageToBigQueryOperator"
description: "Task to load Order Items data to a BigQuery table"
args:
Expand Down Expand Up @@ -251,7 +268,6 @@ dag:
- name: "sale_price"
type: "FLOAT"
mode: "NULLABLE"

- operator: "GoogleCloudStorageToBigQueryOperator"
description: "Task to load Orders data to a BigQuery table"
args:
Expand Down Expand Up @@ -290,7 +306,6 @@ dag:
- name: "num_of_item"
type: "INTEGER"
mode: "NULLABLE"

- operator: "GoogleCloudStorageToBigQueryOperator"
description: "Task to load Users data to a BigQuery table"
args:
Expand Down Expand Up @@ -347,7 +362,6 @@ dag:
- name: "created_at"
type: "TIMESTAMP"
mode: "NULLABLE"

- operator: "GoogleCloudStorageToBigQueryOperator"
description: "Task to load Distribution Centers data to a BigQuery table"
args:
Expand All @@ -371,7 +385,6 @@ dag:
- name: "longitude"
type: "FLOAT"
mode: "NULLABLE"

- operator: "BigQueryInsertJobOperator"
description: "Task to create the user geom column from the latitude and longitude columns"
args:
Expand All @@ -384,7 +397,6 @@ dag:
SET user_geom = SAFE.ST_GeogFromText(CONCAT('POINT(',CAST(longitude AS STRING), ' ', CAST(latitude as STRING), ')'))
WHERE longitude IS NOT NULL AND latitude IS NOT NULL;
useLegacySql: False

- operator: "BigQueryInsertJobOperator"
description: "Task to create the distribution center geom column from the latitude and longitude columns"
args:
Expand All @@ -401,4 +413,4 @@ dag:
useLegacySql: False

graph_paths:
- "generate_thelook >> [load_products_to_bq, load_events_to_bq, load_inventory_items_to_bq, load_order_items_to_bq, load_orders_to_bq, load_users_to_bq, load_distribution_centers_to_bq] >> create_user_geom_column >> create_distribution_center_geom_column"
- "create_cluster >> generate_thelook >> delete_cluster >> [load_products_to_bq, load_events_to_bq, load_inventory_items_to_bq, load_order_items_to_bq, load_orders_to_bq, load_users_to_bq, load_distribution_centers_to_bq] >> create_user_geom_column >> create_distribution_center_geom_column"
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@


from airflow import DAG
from airflow.providers.cncf.kubernetes.operators import kubernetes_pod
from airflow.providers.google.cloud.operators import bigquery
from airflow.providers.google.cloud.operators import bigquery, kubernetes_engine
from airflow.providers.google.cloud.transfers import gcs_to_bigquery

default_args = {
Expand All @@ -33,15 +32,33 @@
catchup=False,
default_view="graph",
) as dag:
create_cluster = kubernetes_engine.GKECreateClusterOperator(
task_id="create_cluster",
project_id="{{ var.value.gcp_project }}",
location="us-central1-c",
body={
"name": "pdp-thelook-ecommerce",
"initial_node_count": 1,
"network": "{{ var.value.vpc_network }}",
"node_config": {
"machine_type": "e2-highmem-16",
"oauth_scopes": [
"https://www.googleapis.com/auth/devstorage.read_write",
"https://www.googleapis.com/auth/cloud-platform",
],
},
},
)

# Run CSV transform within kubernetes pod
generate_thelook = kubernetes_pod.KubernetesPodOperator(
generate_thelook = kubernetes_engine.GKEStartPodOperator(
task_id="generate_thelook",
is_delete_operator_pod=False,
name="generate_thelook",
namespace="composer-user-workloads",
service_account_name="default",
config_file="/home/airflow/composer_kube_config",
project_id="{{ var.value.gcp_project }}",
location="us-central1-c",
cluster_name="pdp-thelook-ecommerce",
namespace="default",
image_pull_policy="Always",
image="{{ var.json.thelook_ecommerce.docker_image }}",
env_vars={
Expand All @@ -53,6 +70,12 @@
"EXTRANEOUS_HEADERS": '["event_type", "ip_address", "browser", "traffic_source", "session_id", "sequence_number", "uri", "is_sold"]',
},
)
delete_cluster = kubernetes_engine.GKEDeleteClusterOperator(
task_id="delete_cluster",
project_id="{{ var.value.gcp_project }}",
location="us-central1-c",
name="pdp-thelook-ecommerce",
)

# Task to load Products data to a BigQuery table
load_products_to_bq = gcs_to_bigquery.GCSToBigQueryOperator(
Expand Down Expand Up @@ -243,7 +266,9 @@
)

(
generate_thelook
create_cluster
>> generate_thelook
>> delete_cluster
>> [
load_products_to_bq,
load_events_to_bq,
Expand Down

0 comments on commit ca1d39e

Please sign in to comment.