Fix: Migrate Libraries IO DAGs to new environment. (#852)
* Fix: Migrate Libraries IO DAGs to new environment.

* Fix: Resolve cluster reference.
nlarge-google authored Nov 4, 2024
1 parent d1dc743 commit e5c5161
Showing 12 changed files with 785 additions and 348 deletions.
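The pattern adopted across these files: instead of launching pods inside the shared Composer cluster with KubernetesPodOperator, each pipeline now provisions its own GKE cluster, runs its transform pods there, and tears the cluster down afterwards. A minimal sketch of that pattern, using the same kubernetes_engine provider module imported in the diff below (the DAG id, cluster name, image, and command here are illustrative placeholders, not values from this commit):

    from datetime import datetime

    from airflow import DAG
    from airflow.providers.google.cloud.operators import kubernetes_engine

    with DAG(
        dag_id="example_gke_pipeline",
        start_date=datetime(2024, 1, 1),
        schedule_interval=None,
    ) as dag:
        # Provision a dedicated GKE cluster for this pipeline's pods.
        create_cluster = kubernetes_engine.GKECreateClusterOperator(
            task_id="create_cluster",
            project_id="{{ var.value.gcp_project }}",
            location="us-central1-c",
            body={"name": "example-cluster", "initial_node_count": 1},
        )

        # Run the workload as a pod on that cluster.
        transform = kubernetes_engine.GKEStartPodOperator(
            task_id="transform",
            name="transform",
            namespace="default",
            project_id="{{ var.value.gcp_project }}",
            location="us-central1-c",
            cluster_name="example-cluster",
            image="busybox",
            cmds=["sh", "-c", "echo done"],
        )

        # Tear the cluster down once the workload finishes.
        delete_cluster = kubernetes_engine.GKEDeleteClusterOperator(
            task_id="delete_cluster",
            project_id="{{ var.value.gcp_project }}",
            location="us-central1-c",
            name="example-cluster",
        )

        create_cluster >> transform >> delete_cluster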
75 changes: 53 additions & 22 deletions datasets/libraries_io/pipelines/dependencies/dependencies_dag.py
@@ -15,7 +15,7 @@

from airflow import DAG
from airflow.operators import bash
-from airflow.providers.cncf.kubernetes.operators import kubernetes_pod
+from airflow.providers.google.cloud.operators import kubernetes_engine
from airflow.providers.google.cloud.transfers import gcs_to_bigquery

default_args = {
@@ -33,6 +33,23 @@
    catchup=False,
    default_view="graph",
) as dag:
+    create_cluster = kubernetes_engine.GKECreateClusterOperator(
+        task_id="create_cluster",
+        project_id="{{ var.value.gcp_project }}",
+        location="us-central1-c",
+        body={
+            "name": "pdp-libraries-io-dependencies",
+            "initial_node_count": 1,
+            "network": "{{ var.value.vpc_network }}",
+            "node_config": {
+                "machine_type": "e2-standard-16",
+                "oauth_scopes": [
+                    "https://www.googleapis.com/auth/devstorage.read_write",
+                    "https://www.googleapis.com/auth/cloud-platform",
+                ],
+            },
+        },
+    )

    # Fetch data gcs - gcs
    bash_gcs_to_gcs = bash.BashOperator(
@@ -41,12 +58,14 @@
    )

    # Run CSV transform within kubernetes pod
-    transform_dependencies = kubernetes_pod.KubernetesPodOperator(
+    transform_dependencies = kubernetes_engine.GKEStartPodOperator(
        task_id="transform_dependencies",
        startup_timeout_seconds=600,
        name="dependencies",
-        namespace="composer",
-        service_account_name="datasets",
+        namespace="default",
+        project_id="{{ var.value.gcp_project }}",
+        location="us-central1-c",
+        cluster_name="pdp-libraries-io-dependencies",
        image_pull_policy="Always",
        image="{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}",
        env_vars={
@@ -61,10 +80,10 @@
            "RENAME_MAPPINGS": '{"ID":"id","Platform":"platform","Project Name":"project_name","Project ID":"project_id","Version Number":"version_number", "Version ID":"version_id","Dependency Name":"dependency_name","Dependency Platform":"dependency_platform", "Dependency Kind":"dependency_kind","Optional Dependency":"optional_dependency", "Dependency Requirements":"dependency_requirements","Dependency Project ID":"dependency_project_id"}',
            "CSV_HEADERS": '["id","platform","project_name","project_id","version_number","version_id","dependency_name","dependency_platform", "dependency_kind","optional_dependency","dependency_requirements","dependency_project_id"]',
        },
-        resources={
-            "request_memory": "4G",
-            "request_cpu": "1",
-            "request_ephemeral_storage": "10G",
+        container_resources={
+            "memory": {"request": "16Gi"},
+            "cpu": {"request": "1"},
+            "ephemeral-storage": {"request": "10Gi"},
        },
    )

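Two things change in the resource stanza above: the flat KubernetesPodOperator-style request keys (request_memory, request_cpu, request_ephemeral_storage) become Kubernetes-style per-resource request maps under container_resources, and the memory request grows from 4G to 16Gi to use the larger e2-standard-16 nodes. For reference, recent cncf-kubernetes provider releases model this same argument as a V1ResourceRequirements object; a sketch under that assumption:

    from kubernetes.client import models as k8s

    # The same requests as in the diff, expressed as a V1ResourceRequirements object.
    transform_resources = k8s.V1ResourceRequirements(
        requests={
            "memory": "16Gi",
            "cpu": "1",
            "ephemeral-storage": "10Gi",
        }
    )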
@@ -155,12 +174,14 @@
    )

    # Run CSV transform within kubernetes pod
-    transform_dependencies_2 = kubernetes_pod.KubernetesPodOperator(
+    transform_dependencies_2 = kubernetes_engine.GKEStartPodOperator(
        task_id="transform_dependencies_2",
        startup_timeout_seconds=600,
        name="dependencies",
-        namespace="composer",
-        service_account_name="datasets",
+        namespace="default",
+        project_id="{{ var.value.gcp_project }}",
+        location="us-central1-c",
+        cluster_name="pdp-libraries-io-dependencies",
        image_pull_policy="Always",
        image="{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}",
        env_vars={
@@ -175,10 +196,10 @@
            "RENAME_MAPPINGS": '{"ID":"id","Platform":"platform","Project Name":"project_name","Project ID":"project_id","Version Number":"version_number", "Version ID":"version_id","Dependency Name":"dependency_name","Dependency Platform":"dependency_platform", "Dependency Kind":"dependency_kind","Optional Dependency":"optional_dependency", "Dependency Requirements":"dependency_requirements","Dependency Project ID":"dependency_project_id"}',
            "CSV_HEADERS": '["id","platform","project_name","project_id","version_number","version_id","dependency_name","dependency_platform", "dependency_kind","optional_dependency","dependency_requirements","dependency_project_id"]',
        },
-        resources={
-            "request_memory": "4G",
-            "request_cpu": "1",
-            "request_ephemeral_storage": "10G",
+        container_resources={
+            "memory": {"request": "16Gi"},
+            "cpu": {"request": "1"},
+            "ephemeral-storage": {"request": "10Gi"},
        },
    )

@@ -269,12 +290,14 @@
    )

    # Run CSV transform within kubernetes pod
-    transform_dependencies_3 = kubernetes_pod.KubernetesPodOperator(
+    transform_dependencies_3 = kubernetes_engine.GKEStartPodOperator(
        task_id="transform_dependencies_3",
        startup_timeout_seconds=600,
        name="dependencies",
-        namespace="composer",
-        service_account_name="datasets",
+        namespace="default",
+        project_id="{{ var.value.gcp_project }}",
+        location="us-central1-c",
+        cluster_name="pdp-libraries-io-dependencies",
        image_pull_policy="Always",
        image="{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}",
        env_vars={
@@ -289,12 +312,18 @@
            "RENAME_MAPPINGS": '{"ID":"id","Platform":"platform","Project Name":"project_name","Project ID":"project_id","Version Number":"version_number", "Version ID":"version_id","Dependency Name":"dependency_name","Dependency Platform":"dependency_platform", "Dependency Kind":"dependency_kind","Optional Dependency":"optional_dependency", "Dependency Requirements":"dependency_requirements","Dependency Project ID":"dependency_project_id"}',
            "CSV_HEADERS": '["id","platform","project_name","project_id","version_number","version_id","dependency_name","dependency_platform", "dependency_kind","optional_dependency","dependency_requirements","dependency_project_id"]',
        },
-        resources={
-            "request_memory": "4G",
-            "request_cpu": "1",
-            "request_ephemeral_storage": "10G",
+        container_resources={
+            "memory": {"request": "16Gi"},
+            "cpu": {"request": "1"},
+            "ephemeral-storage": {"request": "10Gi"},
        },
    )
+    delete_cluster = kubernetes_engine.GKEDeleteClusterOperator(
+        task_id="delete_cluster",
+        project_id="{{ var.value.gcp_project }}",
+        location="us-central1-c",
+        name="pdp-libraries-io-dependencies",
+    )

    # Task to load CSV data to a BigQuery table
    load_dependencies_to_bq_3 = gcs_to_bigquery.GCSToBigQueryOperator(
@@ -384,7 +413,9 @@

    (
        bash_gcs_to_gcs
+        >> create_cluster
        >> [transform_dependencies, transform_dependencies_2, transform_dependencies_3]
+        >> delete_cluster
        >> load_dependencies_to_bq
        >> load_dependencies_to_bq_2
        >> load_dependencies_to_bq_3
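The list in the middle of the chain fans out: the three transforms run in parallel on the new cluster, and delete_cluster starts only after all three succeed; the load tasks then run after teardown, since they only read from GCS. The same wiring can be expressed with chain(), equivalent to the >> expression above and shown purely for illustration:

    from airflow.models.baseoperator import chain

    # Fan out to the three transforms, then fan back in to the teardown.
    chain(
        bash_gcs_to_gcs,
        create_cluster,
        [transform_dependencies, transform_dependencies_2, transform_dependencies_3],
        delete_cluster,
        load_dependencies_to_bq,
        load_dependencies_to_bq_2,
        load_dependencies_to_bq_3,
    )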
81 changes: 59 additions & 22 deletions datasets/libraries_io/pipelines/dependencies/pipeline.yaml
@@ -32,6 +32,21 @@ dag:
  default_view: graph

  tasks:
+    - operator: "GKECreateClusterOperator"
+      args:
+        task_id: "create_cluster"
+        project_id: "{{ var.value.gcp_project }}"
+        location: "us-central1-c"
+        body:
+          name: pdp-libraries-io-dependencies
+          initial_node_count: 1
+          network: "{{ var.value.vpc_network }}"
+          node_config:
+            machine_type: e2-standard-16
+            oauth_scopes:
+              - https://www.googleapis.com/auth/devstorage.read_write
+              - https://www.googleapis.com/auth/cloud-platform
+
    - operator: "BashOperator"
      description: "Fetch data gcs - gcs"
      args:
@@ -53,14 +68,16 @@
            rm /home/airflow/gcs/data/libraries_io/dependencies/dependencies.c
          fi

-    - operator: "KubernetesPodOperator"
+    - operator: "GKEStartPodOperator"
      description: "Run CSV transform within kubernetes pod"
      args:
        task_id: "transform_dependencies"
        startup_timeout_seconds: 600
        name: "dependencies"
-        namespace: "composer"
-        service_account_name: "datasets"
+        namespace: "default"
+        project_id: "{{ var.value.gcp_project }}"
+        location: "us-central1-c"
+        cluster_name: pdp-libraries-io-dependencies
        image_pull_policy: "Always"
        image: "{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}"
        env_vars:
@@ -80,10 +97,13 @@
          CSV_HEADERS: >-
            ["id","platform","project_name","project_id","version_number","version_id","dependency_name","dependency_platform",
            "dependency_kind","optional_dependency","dependency_requirements","dependency_project_id"]
-        resources:
-          request_memory: "4G"
-          request_cpu: "1"
-          request_ephemeral_storage: "10G"
+        container_resources:
+          memory:
+            request: "16Gi"
+          cpu:
+            request: "1"
+          ephemeral-storage:
+            request: "10Gi"

    - operator: "GoogleCloudStorageToBigQueryOperator"
      description: "Task to load CSV data to a BigQuery table"
@@ -146,14 +166,16 @@
            description: "The unique primary key of the project for this dependency in the Libraries.io database."
            mode: "nullable"

-    - operator: "KubernetesPodOperator"
+    - operator: "GKEStartPodOperator"
      description: "Run CSV transform within kubernetes pod"
      args:
        task_id: "transform_dependencies_2"
        startup_timeout_seconds: 600
        name: "dependencies"
-        namespace: "composer"
-        service_account_name: "datasets"
+        namespace: "default"
+        project_id: "{{ var.value.gcp_project }}"
+        location: "us-central1-c"
+        cluster_name: pdp-libraries-io-dependencies
        image_pull_policy: "Always"
        image: "{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}"
        env_vars:
@@ -173,10 +195,13 @@
          CSV_HEADERS: >-
            ["id","platform","project_name","project_id","version_number","version_id","dependency_name","dependency_platform",
            "dependency_kind","optional_dependency","dependency_requirements","dependency_project_id"]
-        resources:
-          request_memory: "4G"
-          request_cpu: "1"
-          request_ephemeral_storage: "10G"
+        container_resources:
+          memory:
+            request: "16Gi"
+          cpu:
+            request: "1"
+          ephemeral-storage:
+            request: "10Gi"

    - operator: "GoogleCloudStorageToBigQueryOperator"
      description: "Task to load CSV data to a BigQuery table"
@@ -239,14 +264,16 @@
            description: "The unique primary key of the project for this dependency in the Libraries.io database."
            mode: "nullable"

-    - operator: "KubernetesPodOperator"
+    - operator: "GKEStartPodOperator"
      description: "Run CSV transform within kubernetes pod"
      args:
        task_id: "transform_dependencies_3"
        startup_timeout_seconds: 600
        name: "dependencies"
-        namespace: "composer"
-        service_account_name: "datasets"
+        namespace: "default"
+        project_id: "{{ var.value.gcp_project }}"
+        location: "us-central1-c"
+        cluster_name: pdp-libraries-io-dependencies
        image_pull_policy: "Always"
        image: "{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}"
        env_vars:
@@ -266,10 +293,20 @@
          CSV_HEADERS: >-
            ["id","platform","project_name","project_id","version_number","version_id","dependency_name","dependency_platform",
            "dependency_kind","optional_dependency","dependency_requirements","dependency_project_id"]
-        resources:
-          request_memory: "4G"
-          request_cpu: "1"
-          request_ephemeral_storage: "10G"
+        container_resources:
+          memory:
+            request: "16Gi"
+          cpu:
+            request: "1"
+          ephemeral-storage:
+            request: "10Gi"

+    - operator: "GKEDeleteClusterOperator"
+      args:
+        task_id: "delete_cluster"
+        project_id: "{{ var.value.gcp_project }}"
+        location: "us-central1-c"
+        name: pdp-libraries-io-dependencies
+
    - operator: "GoogleCloudStorageToBigQueryOperator"
      description: "Task to load CSV data to a BigQuery table"
@@ -333,4 +370,4 @@
            mode: "nullable"

  graph_paths:
-    - "bash_gcs_to_gcs >> [transform_dependencies,transform_dependencies_2,transform_dependencies_3] >> load_dependencies_to_bq >> load_dependencies_to_bq_2 >> load_dependencies_to_bq_3"
+    - "bash_gcs_to_gcs >> create_cluster >> [transform_dependencies,transform_dependencies_2,transform_dependencies_3] >> delete_cluster >> load_dependencies_to_bq >> load_dependencies_to_bq_2 >> load_dependencies_to_bq_3"
43 changes: 35 additions & 8 deletions datasets/libraries_io/pipelines/projects/pipeline.yaml
@@ -49,14 +49,31 @@ dag:
            cp /home/airflow/gcs/data/libraries_io/libraries-1.4.0-2018-12-22/projects-1.4.0-2018-12-22.csv /home/airflow/gcs/data/libraries_io/projects/projects.csv
          fi

-    - operator: "KubernetesPodOperator"
+    - operator: "GKECreateClusterOperator"
+      args:
+        task_id: "create_cluster"
+        project_id: "{{ var.value.gcp_project }}"
+        location: "us-central1-c"
+        body:
+          name: pdp-libraries-io-projects
+          initial_node_count: 1
+          network: "{{ var.value.vpc_network }}"
+          node_config:
+            machine_type: e2-standard-16
+            oauth_scopes:
+              - https://www.googleapis.com/auth/devstorage.read_write
+              - https://www.googleapis.com/auth/cloud-platform
+
+    - operator: "GKEStartPodOperator"
      description: "Run CSV transform within kubernetes pod"
      args:
        task_id: "transform_projects"
        startup_timeout_seconds: 600
        name: "projects"
-        namespace: "composer"
-        service_account_name: "datasets"
+        namespace: "default"
+        project_id: "{{ var.value.gcp_project }}"
+        location: "us-central1-c"
+        cluster_name: pdp-libraries-io-projects
        image_pull_policy: "Always"
        image: "{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}"
        env_vars:
@@ -80,10 +97,20 @@
            "Latest Release Number":"latest_release_number","Package Manager ID":"package_manager_id","Dependent Projects Count":"dependent_projects_count",
            "Language":"language","Status":"status","Last synced Timestamp":"last_synced_timestamp","Dependent Repositories Count":"dependent_repositories_count",
            "Repository ID":"repository_id"}
-        resources:
-          request_memory: "4G"
-          request_cpu: "1"
-          request_ephemeral_storage: "10G"
+        container_resources:
+          memory:
+            request: "16Gi"
+          cpu:
+            request: "1"
+          ephemeral-storage:
+            request: "10Gi"

+    - operator: "GKEDeleteClusterOperator"
+      args:
+        task_id: "delete_cluster"
+        project_id: "{{ var.value.gcp_project }}"
+        location: "us-central1-c"
+        name: pdp-libraries-io-projects
+
    - operator: "GoogleCloudStorageToBigQueryOperator"
      description: "Task to load CSV data to a BigQuery table"
@@ -183,4 +210,4 @@
            mode: "nullable"

  graph_paths:
-    - "bash_gcs_to_gcs >> transform_projects >> load_projects_to_bq"
+    - "create_cluster >> bash_gcs_to_gcs >> transform_projects >> delete_cluster >> load_projects_to_bq"