diff --git a/datasets/san_francisco/pipelines/san_francisco/pipeline.yaml b/datasets/san_francisco/pipelines/san_francisco/pipeline.yaml index a5e072566..aeb438eee 100644 --- a/datasets/san_francisco/pipelines/san_francisco/pipeline.yaml +++ b/datasets/san_francisco/pipelines/san_francisco/pipeline.yaml @@ -30,14 +30,31 @@ dag: catchup: False default_view: graph tasks: - - operator: "KubernetesPodOperator" + - operator: "GKECreateClusterOperator" + args: + task_id: "create_cluster" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + body: + name: pdp-san-francisco + initial_node_count: 2 + network: "{{ var.value.vpc_network }}" + node_config: + machine_type: e2-standard-16 + oauth_scopes: + - https://www.googleapis.com/auth/devstorage.read_write + - https://www.googleapis.com/auth/cloud-platform + + - operator: "GKEStartPodOperator" description: "Run New York 311 Service Requests Pipeline" args: task_id: "sf_311_service_requests" name: "sf_311_service_requests" startup_timeout_seconds: 600 - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-san-francisco image_pull_policy: "Always" image: "{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}" env_vars: @@ -148,17 +165,22 @@ dag: "longitude", "police_district" ] - resources: - limit_memory: "8G" - limit_cpu: "3" - request_ephemeral_storage: "10G" - - operator: "KubernetesPodOperator" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" + - operator: "GKEStartPodOperator" description: "Run San Francisco Municipal Calendar Pipeline" args: task_id: "sf_calendar" name: "calendar" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-san-francisco image_pull_policy: "Always" image: "{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}" env_vars: @@ -215,17 +237,22 @@ dag: "date": "exceptions", "exception_type_str": "exception_type" } - resources: - limit_memory: "8G" - limit_cpu: "3" - request_ephemeral_storage: "10G" - - operator: "KubernetesPodOperator" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" + - operator: "GKEStartPodOperator" description: "Run San Francisco Municipal Routes Pipeline" args: task_id: "sf_muni_routes" name: "muni_routes" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-san-francisco image_pull_policy: "Always" image: "{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}" env_vars: @@ -251,17 +278,22 @@ dag: "route_long_name", "route_type" ] - resources: - limit_memory: "8G" - limit_cpu: "3" - request_ephemeral_storage: "10G" - - operator: "KubernetesPodOperator" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" + - operator: "GKEStartPodOperator" description: "Run San Francisco Municipal Shapes Pipeline" args: task_id: "sf_muni_shapes" name: "muni_shapes" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-san-francisco image_pull_policy: "Always" image: "{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}" env_vars: @@ -296,17 +328,22 @@ dag: "shape_point_geom", "shape_distance_traveled" ] - resources: - limit_memory: "8G" - limit_cpu: "3" - request_ephemeral_storage: "10G" - - operator: "KubernetesPodOperator" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" + - operator: "GKEStartPodOperator" description: "Run San Francisco Municipal Stops Pipeline" args: task_id: "sf_muni_stops" name: "muni_stops" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-san-francisco image_pull_policy: "Always" image: "{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}" env_vars: @@ -333,17 +370,22 @@ dag: "stop_lon", "stop_geom" ] - resources: - limit_memory: "8G" - limit_cpu: "3" - request_ephemeral_storage: "10G" - - operator: "KubernetesPodOperator" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" + - operator: "GKEStartPodOperator" description: "Run San Francisco Municipal Stop Times Pipeline" args: task_id: "sf_muni_stop_times" name: "muni_stop_times" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-san-francisco image_pull_policy: "Always" image: "{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}" env_vars: @@ -423,17 +465,22 @@ dag: "dropoff_type", "exact_timepoint" ] - resources: - limit_memory: "8G" - limit_cpu: "3" - request_ephemeral_storage: "10G" - - operator: "KubernetesPodOperator" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" + - operator: "GKEStartPodOperator" description: "Run San Francisco Municipal Fares Pipeline" args: task_id: "sf_muni_fares" name: "muni_fares" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-san-francisco image_pull_policy: "Always" image: "{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}" env_vars: @@ -476,17 +523,22 @@ dag: "transfers_permitted", "transfer_duration" ] - resources: - limit_memory: "8G" - limit_cpu: "3" - request_ephemeral_storage: "10G" - - operator: "KubernetesPodOperator" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" + - operator: "GKEStartPodOperator" description: "Run San Francisco Municipal Trips Pipeline" args: task_id: "sf_muni_trips" name: "muni_trips" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-san-francisco image_pull_policy: "Always" image: "{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}" env_vars: @@ -530,17 +582,22 @@ dag: "shape_id", "trip_shape" ] - resources: - limit_memory: "8G" - limit_cpu: "3" - request_ephemeral_storage: "10G" - - operator: "KubernetesPodOperator" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" + - operator: "GKEStartPodOperator" description: "Run San Francisco Police Department Incidents Pipeline" args: task_id: "sfpd_incidents" name: "sfpd_incidents" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-san-francisco image_pull_policy: "Always" image: "{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}" env_vars: @@ -599,17 +656,22 @@ dag: "pdid", "timestamp" ] - resources: - limit_memory: "8G" - limit_cpu: "3" - request_ephemeral_storage: "10G" - - operator: "KubernetesPodOperator" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" + - operator: "GKEStartPodOperator" description: "Run San Francisco Bikeshare Stations Pipeline" args: task_id: "sf_bikeshare_stations" name: "bikeshare_stations" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-san-francisco image_pull_policy: "Always" image: "{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}" env_vars: @@ -684,17 +746,22 @@ dag: "has_kiosk", "station_geom" ] - resources: - limit_memory: "8G" - limit_cpu: "3" - request_ephemeral_storage: "10G" - - operator: "KubernetesPodOperator" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" + - operator: "GKEStartPodOperator" description: "Run San Francisco Bikeshare Status Pipeline" args: task_id: "sf_bikeshare_status" name: "bikeshare_status" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-san-francisco image_pull_policy: "Always" image: "{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}" env_vars: @@ -749,17 +816,22 @@ dag: "num_ebikes_available", "eightd_has_available_keys" ] - resources: - limit_memory: "8G" - limit_cpu: "3" - request_ephemeral_storage: "10G" - - operator: "KubernetesPodOperator" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" + - operator: "GKEStartPodOperator" description: "Run San Francisco Bikeshare Trips Pipeline" args: task_id: "sf_bikeshare_trips" name: "sf_bikeshare_trips" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-san-francisco image_pull_policy: "Always" image: "{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}" env_vars: @@ -939,17 +1011,22 @@ dag: "start_station_geom", "end_station_geom" ] - resources: - limit_memory: "8G" - limit_cpu: "3" - request_ephemeral_storage: "10G" - - operator: "KubernetesPodOperator" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" + - operator: "GKEStartPodOperator" description: "Run San Francisco Film Locations Pipeline" args: task_id: "sf_film_locations" name: "sf_film_locations" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-san-francisco image_pull_policy: "Always" image: "{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}" env_vars: @@ -1005,17 +1082,22 @@ dag: "actor_2", "actor_3" ] - resources: - limit_memory: "8G" - limit_cpu: "3" - request_ephemeral_storage: "10G" - - operator: "KubernetesPodOperator" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" + - operator: "GKEStartPodOperator" description: "Run San Francisco Fire Department Service Calls Pipeline" args: task_id: "sffd_service_calls" name: "sffd_service_calls" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-san-francisco image_pull_policy: "Always" image: "{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}" env_vars: @@ -1195,17 +1277,22 @@ dag: "neighborhood_name", "location_geom" ] - resources: - limit_memory: "8G" - limit_cpu: "3" - request_ephemeral_storage: "10G" - - operator: "KubernetesPodOperator" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" + - operator: "GKEStartPodOperator" description: "Run San Francisco Street Trees Pipeline" args: task_id: "sf_street_trees" name: "sf_street_trees" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-san-francisco image_pull_policy: "Always" image: "{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}" env_vars: @@ -1314,10 +1401,20 @@ dag: "longitude", "location" ] - resources: - limit_memory: "8G" - limit_cpu: "3" - request_ephemeral_storage: "10G" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" + + - operator: "GKEDeleteClusterOperator" + args: + task_id: "delete_cluster" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + name: pdp-san-francisco graph_paths: - - "[ sf_bikeshare_stations, sf_bikeshare_status, sf_film_locations, sf_street_trees ] >> sf_bikeshare_trips >> [ sf_calendar, sf_muni_routes, sf_muni_shapes, sf_muni_stops, sf_muni_stop_times, sf_muni_fares, sf_muni_trips ] >> sffd_service_calls >> sfpd_incidents >> sf_311_service_requests" + - "create_cluster >> [ sf_bikeshare_stations, sf_bikeshare_status, sf_film_locations, sf_street_trees ] >> sf_bikeshare_trips >> [ sf_calendar, sf_muni_routes, sf_muni_shapes, sf_muni_stops, sf_muni_stop_times, sf_muni_fares, sf_muni_trips ] >> sffd_service_calls >> sfpd_incidents >> sf_311_service_requests >> delete_cluster" diff --git a/datasets/san_francisco/pipelines/san_francisco/san_francisco_dag.py b/datasets/san_francisco/pipelines/san_francisco/san_francisco_dag.py index b644a1921..90390029f 100644 --- a/datasets/san_francisco/pipelines/san_francisco/san_francisco_dag.py +++ b/datasets/san_francisco/pipelines/san_francisco/san_francisco_dag.py @@ -14,7 +14,7 @@ from airflow import DAG -from airflow.providers.cncf.kubernetes.operators import kubernetes_pod +from airflow.providers.google.cloud.operators import kubernetes_engine default_args = { "owner": "Google", @@ -31,14 +31,33 @@ catchup=False, default_view="graph", ) as dag: + create_cluster = kubernetes_engine.GKECreateClusterOperator( + task_id="create_cluster", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + body={ + "name": "pdp-san-francisco", + "initial_node_count": 2, + "network": "{{ var.value.vpc_network }}", + "node_config": { + "machine_type": "e2-standard-16", + "oauth_scopes": [ + "https://www.googleapis.com/auth/devstorage.read_write", + "https://www.googleapis.com/auth/cloud-platform", + ], + }, + }, + ) # Run New York 311 Service Requests Pipeline - sf_311_service_requests = kubernetes_pod.KubernetesPodOperator( + sf_311_service_requests = kubernetes_engine.GKEStartPodOperator( task_id="sf_311_service_requests", name="sf_311_service_requests", startup_timeout_seconds=600, - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-san-francisco", image_pull_policy="Always", image="{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}", env_vars={ @@ -65,19 +84,21 @@ "DATE_FORMAT_LIST": '{\n "created_date": "%m/%d/%Y %H:%M:%S %p",\n "closed_date": "%m/%d/%Y %H:%M:%S %p",\n "resolution_action_updated_date": "%m/%d/%Y %H:%M:%S %p"\n}', "REORDER_HEADERS_LIST": '[\n "unique_key",\n "created_date",\n "closed_date",\n "resolution_action_updated_date",\n "status",\n "status_notes",\n "agency_name",\n "category",\n "complaint_type",\n "descriptor",\n "incident_address",\n "supervisor_district",\n "neighborhood",\n "location",\n "source",\n "media_url",\n "latitude",\n "longitude",\n "police_district"\n]', }, - resources={ - "limit_memory": "8G", - "limit_cpu": "3", - "request_ephemeral_storage": "10G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) # Run San Francisco Municipal Calendar Pipeline - sf_calendar = kubernetes_pod.KubernetesPodOperator( + sf_calendar = kubernetes_engine.GKEStartPodOperator( task_id="sf_calendar", name="calendar", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-san-francisco", image_pull_policy="Always", image="{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}", env_vars={ @@ -97,19 +118,21 @@ "REORDER_HEADERS_LIST": '[\n "service_id", "service_desc",\n "monday", "tuesday", "wednesday",\n "thursday", "friday", "saturday", "sunday",\n "exceptions", "exception_type"\n]', "RENAME_HEADERS_LIST": '{\n "monday_str": "monday",\n "tuesday_str": "tuesday",\n "wednesday_str": "wednesday",\n "thursday_str": "thursday",\n "friday_str": "friday",\n "saturday_str": "saturday",\n "sunday_str": "sunday",\n "service_description": "service_desc",\n "date": "exceptions",\n "exception_type_str": "exception_type"\n}', }, - resources={ - "limit_memory": "8G", - "limit_cpu": "3", - "request_ephemeral_storage": "10G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) # Run San Francisco Municipal Routes Pipeline - sf_muni_routes = kubernetes_pod.KubernetesPodOperator( + sf_muni_routes = kubernetes_engine.GKEStartPodOperator( task_id="sf_muni_routes", name="muni_routes", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-san-francisco", image_pull_policy="Always", image="{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}", env_vars={ @@ -127,19 +150,21 @@ "SCHEMA_PATH": "data/san_francisco/schema/sf_muni_routes_schema.json", "REORDER_HEADERS_LIST": '[\n "route_id",\n "route_short_name",\n "route_long_name",\n "route_type"\n]', }, - resources={ - "limit_memory": "8G", - "limit_cpu": "3", - "request_ephemeral_storage": "10G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) # Run San Francisco Municipal Shapes Pipeline - sf_muni_shapes = kubernetes_pod.KubernetesPodOperator( + sf_muni_shapes = kubernetes_engine.GKEStartPodOperator( task_id="sf_muni_shapes", name="muni_shapes", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-san-francisco", image_pull_policy="Always", image="{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}", env_vars={ @@ -158,19 +183,21 @@ "RENAME_HEADERS_LIST": '{\n "shape_pt_lon": "shape_point_lon",\n "shape_pt_lat": "shape_point_lat",\n "shape_pt_sequence": "shape_point_sequence",\n "shape_dist_traveled": "shape_distance_traveled"\n}', "REORDER_HEADERS_LIST": '[\n "shape_id",\n "shape_point_sequence",\n "shape_point_lat",\n "shape_point_lon",\n "shape_point_geom",\n "shape_distance_traveled"\n]', }, - resources={ - "limit_memory": "8G", - "limit_cpu": "3", - "request_ephemeral_storage": "10G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) # Run San Francisco Municipal Stops Pipeline - sf_muni_stops = kubernetes_pod.KubernetesPodOperator( + sf_muni_stops = kubernetes_engine.GKEStartPodOperator( task_id="sf_muni_stops", name="muni_stops", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-san-francisco", image_pull_policy="Always", image="{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}", env_vars={ @@ -188,19 +215,21 @@ "SCHEMA_PATH": "data/san_francisco/schema/sf_muni_stops_schema.json", "REORDER_HEADERS_LIST": '[\n "stop_id",\n "stop_name",\n "stop_lat",\n "stop_lon",\n "stop_geom"\n]', }, - resources={ - "limit_memory": "8G", - "limit_cpu": "3", - "request_ephemeral_storage": "10G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) # Run San Francisco Municipal Stop Times Pipeline - sf_muni_stop_times = kubernetes_pod.KubernetesPodOperator( + sf_muni_stop_times = kubernetes_engine.GKEStartPodOperator( task_id="sf_muni_stop_times", name="muni_stop_times", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-san-francisco", image_pull_policy="Always", image="{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}", env_vars={ @@ -220,19 +249,21 @@ "STARTS_WITH_PATTERN_LIST": '[\n [ ["dropoff_type", "dropoff_type"], [ "0", "regular" ] ],\n [ ["dropoff_type", "dropoff_type"], [ "1", "none" ] ],\n [ ["dropoff_type", "dropoff_type"], [ "2", "phone" ] ],\n [ ["dropoff_type", "dropoff_type"], [ "3", "driver" ] ],\n [ ["pickup_type", "pickup_type"], [ "0", "regular" ] ],\n [ ["pickup_type", "pickup_type"], [ "1", "none" ] ],\n [ ["pickup_type", "pickup_type"], [ "2", "phone" ] ],\n [ ["pickup_type", "pickup_type"], [ "3", "driver" ] ],\n [ ["exact_timepoint", "timepoint"], [ "0", "FALSE" ] ],\n [ ["exact_timepoint", "timepoint"], [ "1", "TRUE" ] ],\n [ ["arrives_next_day", "arrival_time"], ["24", "TRUE"] ],\n [ ["arrives_next_day", "arrival_time"], ["25", "TRUE"] ],\n [ ["arrives_next_day", "arrival_time"], ["26", "TRUE"] ],\n [ ["arrives_next_day", "arrival_time"], ["27", "TRUE"] ],\n [ ["arrives_next_day", "arrival_time"], ["28", "TRUE"] ],\n [ ["arrives_next_day", "arrival_time"], ["29", "TRUE"] ],\n [ ["arrives_next_day", "arrival_time"], ["30", "TRUE"] ],\n [ ["arrives_next_day", "arrival_time"], ["31", "TRUE"] ],\n [ ["arrival_time", "arrival_time"], ["^24(.*)", "00$2"] ],\n [ ["arrival_time", "arrival_time"], ["^25(.*)", "01$2"] ],\n [ ["arrival_time", "arrival_time"], ["^26(.*)", "02$2"] ],\n [ ["arrival_time", "arrival_time"], ["^27(.*)", "03$2"] ],\n [ ["arrival_time", "arrival_time"], ["^28(.*)", "04$2"] ],\n [ ["arrival_time", "arrival_time"], ["^29(.*)", "05$2"] ],\n [ ["arrival_time", "arrival_time"], ["^30(.*)", "06$2"] ],\n [ ["arrival_time", "arrival_time"], ["^31(.*)", "07$2"] ],\n [ ["departs_next_day", "departure_time"], ["24", "TRUE"] ],\n [ ["departs_next_day", "departure_time"], ["25", "TRUE"] ],\n [ ["departs_next_day", "departure_time"], ["26", "TRUE"] ],\n [ ["departs_next_day", "departure_time"], ["27", "TRUE"] ],\n [ ["departs_next_day", "departure_time"], ["28", "TRUE"] ],\n [ ["departs_next_day", "departure_time"], ["29", "TRUE"] ],\n [ ["departs_next_day", "departure_time"], ["30", "TRUE"] ],\n [ ["departs_next_day", "departure_time"], ["31", "TRUE"] ],\n [ ["departure_time", "departure_time"], ["^24(.*)", "00$2"] ],\n [ ["departure_time", "departure_time"], ["^25(.*)", "01$2"] ],\n [ ["departure_time", "departure_time"], ["^26(.*)", "02$2"] ],\n [ ["departure_time", "departure_time"], ["^27(.*)", "03$2"] ],\n [ ["departure_time", "departure_time"], ["^28(.*)", "04$2"] ],\n [ ["departure_time", "departure_time"], ["^29(.*)", "05$2"] ],\n [ ["departure_time", "departure_time"], ["^30(.*)", "06$2"] ],\n [ ["departure_time", "departure_time"], ["^31(.*)", "07$2"] ]\n]', "REORDER_HEADERS_LIST": '[\n "stop_id",\n "trip_id",\n "stop_sequence",\n "arrival_time",\n "arrives_next_day",\n "departure_time",\n "departs_next_day",\n "dropoff_type",\n "exact_timepoint"\n]', }, - resources={ - "limit_memory": "8G", - "limit_cpu": "3", - "request_ephemeral_storage": "10G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) # Run San Francisco Municipal Fares Pipeline - sf_muni_fares = kubernetes_pod.KubernetesPodOperator( + sf_muni_fares = kubernetes_engine.GKEStartPodOperator( task_id="sf_muni_fares", name="muni_fares", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-san-francisco", image_pull_policy="Always", image="{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}", env_vars={ @@ -252,19 +283,21 @@ "STARTS_WITH_PATTERN_LIST": '[\n [ ["payment_type", "payment_type"], [ "0", "during" ] ],\n [ ["payment_type", "payment_type"], [ "1", "after" ] ]\n]', "REORDER_HEADERS_LIST": '[\n "fare_id",\n "rider_id",\n "rider_desc",\n "price",\n "payment_method",\n "transfers_permitted",\n "transfer_duration"\n]', }, - resources={ - "limit_memory": "8G", - "limit_cpu": "3", - "request_ephemeral_storage": "10G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) # Run San Francisco Municipal Trips Pipeline - sf_muni_trips = kubernetes_pod.KubernetesPodOperator( + sf_muni_trips = kubernetes_engine.GKEStartPodOperator( task_id="sf_muni_trips", name="muni_trips", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-san-francisco", image_pull_policy="Always", image="{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}", env_vars={ @@ -284,19 +317,21 @@ "STARTS_WITH_PATTERN_LIST": '[\n [ ["direction_id", "direction_id"], [ "0", "O" ] ],\n [ ["direction_id", "direction_id"], [ "1", "I" ] ],\n [ ["SERVICE_CA", "SERVICE_CA"], [ "nan", "" ] ]\n]', "REORDER_HEADERS_LIST": '[\n "trip_id",\n "route_id",\n "direction",\n "block_id",\n "service_category",\n "trip_headsign",\n "shape_id",\n "trip_shape"\n]', }, - resources={ - "limit_memory": "8G", - "limit_cpu": "3", - "request_ephemeral_storage": "10G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) # Run San Francisco Police Department Incidents Pipeline - sfpd_incidents = kubernetes_pod.KubernetesPodOperator( + sfpd_incidents = kubernetes_engine.GKEStartPodOperator( task_id="sfpd_incidents", name="sfpd_incidents", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-san-francisco", image_pull_policy="Always", image="{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}", env_vars={ @@ -318,19 +353,21 @@ "RENAME_HEADERS_LIST": '{\n "IncidntNum": "unique_key",\n "Category": "category",\n "Descript": "descript",\n "DayOfWeek": "dayofweek",\n "PdDistrict": "pddistrict",\n "Resolution": "resolution",\n "Address": "address",\n "X": "longitude",\n "Y": "latitude",\n "Location": "location",\n "PdId": "pdid",\n "Date": "Date",\n "Time": "Time"\n}', "REORDER_HEADERS_LIST": '[\n "unique_key",\n "category",\n "descript",\n "dayofweek",\n "pddistrict",\n "resolution",\n "address",\n "longitude",\n "latitude",\n "location",\n "pdid",\n "timestamp"\n]', }, - resources={ - "limit_memory": "8G", - "limit_cpu": "3", - "request_ephemeral_storage": "10G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) # Run San Francisco Bikeshare Stations Pipeline - sf_bikeshare_stations = kubernetes_pod.KubernetesPodOperator( + sf_bikeshare_stations = kubernetes_engine.GKEStartPodOperator( task_id="sf_bikeshare_stations", name="bikeshare_stations", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-san-francisco", image_pull_policy="Always", image="{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}", env_vars={ @@ -353,19 +390,21 @@ "EMPTY_KEY_LIST": '[\n "station_id",\n "name",\n "lat",\n "lon"\n]', "REORDER_HEADERS_LIST": '[\n "station_id",\n "name",\n "short_name",\n "lat",\n "lon",\n "region_id",\n "rental_methods",\n "capacity",\n "external_id",\n "eightd_has_key_dispenser",\n "has_kiosk",\n "station_geom"\n]', }, - resources={ - "limit_memory": "8G", - "limit_cpu": "3", - "request_ephemeral_storage": "10G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) # Run San Francisco Bikeshare Status Pipeline - sf_bikeshare_status = kubernetes_pod.KubernetesPodOperator( + sf_bikeshare_status = kubernetes_engine.GKEStartPodOperator( task_id="sf_bikeshare_status", name="bikeshare_status", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-san-francisco", image_pull_policy="Always", image="{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}", env_vars={ @@ -386,19 +425,21 @@ "EMPTY_KEY_LIST": '[\n "station_id",\n "num_bikes_available",\n "num_docks_available",\n "is_installed",\n "is_renting",\n "is_returning",\n "last_reported"\n]', "REORDER_HEADERS_LIST": '[\n "station_id",\n "num_bikes_available",\n "num_bikes_disabled",\n "num_docks_available",\n "num_docks_disabled",\n "is_installed",\n "is_renting",\n "is_returning",\n "last_reported",\n "num_ebikes_available",\n "eightd_has_available_keys"\n]', }, - resources={ - "limit_memory": "8G", - "limit_cpu": "3", - "request_ephemeral_storage": "10G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) # Run San Francisco Bikeshare Trips Pipeline - sf_bikeshare_trips = kubernetes_pod.KubernetesPodOperator( + sf_bikeshare_trips = kubernetes_engine.GKEStartPodOperator( task_id="sf_bikeshare_trips", name="sf_bikeshare_trips", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-san-francisco", image_pull_policy="Always", image="{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}", env_vars={ @@ -426,19 +467,21 @@ "GEN_LOCATION_LIST": '{\n "start_station_geom": [ "start_station_longitude", "start_station_latitude" ],\n "end_station_geom": [ "end_station_longitude", "end_station_latitude" ]\n}', "REORDER_HEADERS_LIST": '[\n "trip_id",\n "duration_sec",\n "start_date",\n "start_station_name",\n "start_station_id",\n "end_date",\n "end_station_name",\n "end_station_id",\n "bike_number",\n "zip_code",\n "subscriber_type",\n "subscription_type",\n "start_station_latitude",\n "start_station_longitude",\n "end_station_latitude",\n "end_station_longitude",\n "member_birth_year",\n "member_gender",\n "bike_share_for_all_trip",\n "start_station_geom",\n "end_station_geom"\n]', }, - resources={ - "limit_memory": "8G", - "limit_cpu": "3", - "request_ephemeral_storage": "10G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) # Run San Francisco Film Locations Pipeline - sf_film_locations = kubernetes_pod.KubernetesPodOperator( + sf_film_locations = kubernetes_engine.GKEStartPodOperator( task_id="sf_film_locations", name="sf_film_locations", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-san-francisco", image_pull_policy="Always", image="{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}", env_vars={ @@ -460,19 +503,21 @@ "STRIP_NEWLINES_LIST": '[\n "production_company",\n "fun_facts"\n]', "REORDER_HEADERS_LIST": '[\n "title",\n "release_year",\n "locations",\n "fun_facts",\n "production_company",\n "distributor",\n "director",\n "writer",\n "actor_1",\n "actor_2",\n "actor_3"\n]', }, - resources={ - "limit_memory": "8G", - "limit_cpu": "3", - "request_ephemeral_storage": "10G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) # Run San Francisco Fire Department Service Calls Pipeline - sffd_service_calls = kubernetes_pod.KubernetesPodOperator( + sffd_service_calls = kubernetes_engine.GKEStartPodOperator( task_id="sffd_service_calls", name="sffd_service_calls", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-san-francisco", image_pull_policy="Always", image="{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}", env_vars={ @@ -495,19 +540,21 @@ "DATE_FORMAT_LIST": '{\n "call_date": "%m/%d/%Y",\n "watch_date": "%m/%d/%Y",\n "available_timestamp": "%m/%d/%Y %H:%M:%S %p",\n "dispatch_timestamp": "%m/%d/%Y %H:%M:%S %p",\n "entry_timestamp": "%m/%d/%Y %H:%M:%S %p",\n "on_scene_timestamp": "%m/%d/%Y %H:%M:%S %p",\n "received_timestamp": "%m/%d/%Y %H:%M:%S %p",\n "response_timestamp": "%m/%d/%Y %H:%M:%S %p",\n "transport_timestamp": "%m/%d/%Y %H:%M:%S %p",\n "hospital_timestamp": "%m/%d/%Y %H:%M:%S %p"\n}', "REORDER_HEADERS_LIST": '[\n "call_number",\n "unit_id",\n "incident_number",\n "call_type",\n "call_date",\n "watch_date",\n "received_timestamp",\n "entry_timestamp",\n "dispatch_timestamp",\n "response_timestamp",\n "on_scene_timestamp",\n "transport_timestamp",\n "hospital_timestamp",\n "call_final_disposition",\n "available_timestamp",\n "address",\n "city",\n "zipcode_of_incident",\n "battalion",\n "station_area",\n "box",\n "original_priority",\n "priority",\n "final_priority",\n "als_unit",\n "call_type_group",\n "number_of_alarms",\n "unit_type",\n "unit_sequence_in_call_dispatch",\n "fire_prevention_district",\n "supervisor_district",\n "row_id",\n "latitude",\n "longitude",\n "neighborhood_name",\n "location_geom"\n]', }, - resources={ - "limit_memory": "8G", - "limit_cpu": "3", - "request_ephemeral_storage": "10G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) # Run San Francisco Street Trees Pipeline - sf_street_trees = kubernetes_pod.KubernetesPodOperator( + sf_street_trees = kubernetes_engine.GKEStartPodOperator( task_id="sf_street_trees", name="sf_street_trees", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-san-francisco", image_pull_policy="Always", image="{{ var.json.san_francisco.container_registry.run_csv_transform_kub }}", env_vars={ @@ -531,15 +578,27 @@ "EMPTY_KEY_LIST": '[\n "tree_id"\n]', "REORDER_HEADERS_LIST": '[\n "tree_id",\n "legal_status",\n "species",\n "address",\n "site_order",\n "site_info",\n "plant_type",\n "care_taker",\n "care_assistant",\n "plant_date",\n "dbh",\n "plot_size",\n "permit_notes",\n "x_coordinate",\n "y_coordinate",\n "latitude",\n "longitude",\n "location"\n]', }, - resources={ - "limit_memory": "8G", - "limit_cpu": "3", - "request_ephemeral_storage": "10G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) + delete_cluster = kubernetes_engine.GKEDeleteClusterOperator( + task_id="delete_cluster", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + name="pdp-san-francisco", + ) ( - [sf_bikeshare_stations, sf_bikeshare_status, sf_film_locations, sf_street_trees] + create_cluster + >> [ + sf_bikeshare_stations, + sf_bikeshare_status, + sf_film_locations, + sf_street_trees, + ] >> sf_bikeshare_trips >> [ sf_calendar, @@ -553,4 +612,5 @@ >> sffd_service_calls >> sfpd_incidents >> sf_311_service_requests + >> delete_cluster )