Fix: Migrate Libraries IO DAGs to new environment. #852

Merged
75 changes: 53 additions & 22 deletions datasets/libraries_io/pipelines/dependencies/dependencies_dag.py
@@ -15,7 +15,7 @@

from airflow import DAG
from airflow.operators import bash
-from airflow.providers.cncf.kubernetes.operators import kubernetes_pod
+from airflow.providers.google.cloud.operators import kubernetes_engine
from airflow.providers.google.cloud.transfers import gcs_to_bigquery

default_args = {
@@ -33,6 +33,23 @@
catchup=False,
default_view="graph",
) as dag:
+    create_cluster = kubernetes_engine.GKECreateClusterOperator(
+        task_id="create_cluster",
+        project_id="{{ var.value.gcp_project }}",
+        location="us-central1-c",
+        body={
+            "name": "pdp-libraries-io-dependencies",
+            "initial_node_count": 1,
+            "network": "{{ var.value.vpc_network }}",
+            "node_config": {
+                "machine_type": "e2-standard-16",
+                "oauth_scopes": [
+                    "https://www.googleapis.com/auth/devstorage.read_write",
+                    "https://www.googleapis.com/auth/cloud-platform",
+                ],
+            },
+        },
+    )

# Fetch data gcs - gcs
bash_gcs_to_gcs = bash.BashOperator(
@@ -41,12 +58,14 @@
)

# Run CSV transform within kubernetes pod
-    transform_dependencies = kubernetes_pod.KubernetesPodOperator(
+    transform_dependencies = kubernetes_engine.GKEStartPodOperator(
        task_id="transform_dependencies",
        startup_timeout_seconds=600,
        name="dependencies",
-        namespace="composer",
-        service_account_name="datasets",
+        namespace="default",
+        project_id="{{ var.value.gcp_project }}",
+        location="us-central1-c",
+        cluster_name="pdp-libraries-io-dependencies",
image_pull_policy="Always",
image="{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}",
env_vars={
@@ -61,10 +80,10 @@
"RENAME_MAPPINGS": '{"ID":"id","Platform":"platform","Project Name":"project_name","Project ID":"project_id","Version Number":"version_number", "Version ID":"version_id","Dependency Name":"dependency_name","Dependency Platform":"dependency_platform", "Dependency Kind":"dependency_kind","Optional Dependency":"optional_dependency", "Dependency Requirements":"dependency_requirements","Dependency Project ID":"dependency_project_id"}',
"CSV_HEADERS": '["id","platform","project_name","project_id","version_number","version_id","dependency_name","dependency_platform", "dependency_kind","optional_dependency","dependency_requirements","dependency_project_id"]',
},
-        resources={
-            "request_memory": "4G",
-            "request_cpu": "1",
-            "request_ephemeral_storage": "10G",
+        container_resources={
+            "memory": {"request": "16Gi"},
+            "cpu": {"request": "1"},
+            "ephemeral-storage": {"request": "10Gi"},
},
)

@@ -155,12 +174,14 @@
)

# Run CSV transform within kubernetes pod
-    transform_dependencies_2 = kubernetes_pod.KubernetesPodOperator(
+    transform_dependencies_2 = kubernetes_engine.GKEStartPodOperator(
        task_id="transform_dependencies_2",
        startup_timeout_seconds=600,
        name="dependencies",
-        namespace="composer",
-        service_account_name="datasets",
+        namespace="default",
+        project_id="{{ var.value.gcp_project }}",
+        location="us-central1-c",
+        cluster_name="pdp-libraries-io-dependencies",
image_pull_policy="Always",
image="{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}",
env_vars={
@@ -175,10 +196,10 @@
"RENAME_MAPPINGS": '{"ID":"id","Platform":"platform","Project Name":"project_name","Project ID":"project_id","Version Number":"version_number", "Version ID":"version_id","Dependency Name":"dependency_name","Dependency Platform":"dependency_platform", "Dependency Kind":"dependency_kind","Optional Dependency":"optional_dependency", "Dependency Requirements":"dependency_requirements","Dependency Project ID":"dependency_project_id"}',
"CSV_HEADERS": '["id","platform","project_name","project_id","version_number","version_id","dependency_name","dependency_platform", "dependency_kind","optional_dependency","dependency_requirements","dependency_project_id"]',
},
-        resources={
-            "request_memory": "4G",
-            "request_cpu": "1",
-            "request_ephemeral_storage": "10G",
+        container_resources={
+            "memory": {"request": "16Gi"},
+            "cpu": {"request": "1"},
+            "ephemeral-storage": {"request": "10Gi"},
},
)

@@ -269,12 +290,14 @@
)

# Run CSV transform within kubernetes pod
-    transform_dependencies_3 = kubernetes_pod.KubernetesPodOperator(
+    transform_dependencies_3 = kubernetes_engine.GKEStartPodOperator(
        task_id="transform_dependencies_3",
        startup_timeout_seconds=600,
        name="dependencies",
-        namespace="composer",
-        service_account_name="datasets",
+        namespace="default",
+        project_id="{{ var.value.gcp_project }}",
+        location="us-central1-c",
+        cluster_name="pdp-libraries-io-dependencies",
image_pull_policy="Always",
image="{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}",
env_vars={
@@ -289,12 +312,18 @@
"RENAME_MAPPINGS": '{"ID":"id","Platform":"platform","Project Name":"project_name","Project ID":"project_id","Version Number":"version_number", "Version ID":"version_id","Dependency Name":"dependency_name","Dependency Platform":"dependency_platform", "Dependency Kind":"dependency_kind","Optional Dependency":"optional_dependency", "Dependency Requirements":"dependency_requirements","Dependency Project ID":"dependency_project_id"}',
"CSV_HEADERS": '["id","platform","project_name","project_id","version_number","version_id","dependency_name","dependency_platform", "dependency_kind","optional_dependency","dependency_requirements","dependency_project_id"]',
},
-        resources={
-            "request_memory": "4G",
-            "request_cpu": "1",
-            "request_ephemeral_storage": "10G",
+        container_resources={
+            "memory": {"request": "16Gi"},
+            "cpu": {"request": "1"},
+            "ephemeral-storage": {"request": "10Gi"},
        },
    )
+    delete_cluster = kubernetes_engine.GKEDeleteClusterOperator(
+        task_id="delete_cluster",
+        project_id="{{ var.value.gcp_project }}",
+        location="us-central1-c",
+        name="pdp-libraries-io-dependencies",
+    )

# Task to load CSV data to a BigQuery table
load_dependencies_to_bq_3 = gcs_to_bigquery.GCSToBigQueryOperator(
@@ -384,7 +413,9 @@

    (
        bash_gcs_to_gcs
+        >> create_cluster
        >> [transform_dependencies, transform_dependencies_2, transform_dependencies_3]
+        >> delete_cluster
        >> load_dependencies_to_bq
        >> load_dependencies_to_bq_2
        >> load_dependencies_to_bq_3
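For context, the migrated DAG follows an ephemeral-cluster pattern: create a dedicated GKE cluster, run the transform pods on it, then delete the cluster before the BigQuery loads run. A minimal standalone sketch of that pattern, using the operator names and settings from the diff above; the DAG id, schedule, and the trigger_rule on teardown are illustrative assumptions, not code from this PR:

import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators import kubernetes_engine

with DAG(
    dag_id="libraries_io.dependencies_sketch",  # illustrative id
    start_date=datetime.datetime(2021, 3, 1),
    schedule_interval="@once",
    catchup=False,
) as dag:
    # Spin up a single-node cluster dedicated to this pipeline run.
    create_cluster = kubernetes_engine.GKECreateClusterOperator(
        task_id="create_cluster",
        project_id="{{ var.value.gcp_project }}",
        location="us-central1-c",
        body={"name": "pdp-libraries-io-dependencies", "initial_node_count": 1},
    )

    # Run the transform container on the cluster created above.
    transform_dependencies = kubernetes_engine.GKEStartPodOperator(
        task_id="transform_dependencies",
        name="dependencies",
        namespace="default",
        project_id="{{ var.value.gcp_project }}",
        location="us-central1-c",
        cluster_name="pdp-libraries-io-dependencies",
        image="{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}",
    )

    # Tear the cluster down. "all_done" is an assumption, not in this PR:
    # it runs teardown even if the transform fails, so a failed run does
    # not leak a cluster.
    delete_cluster = kubernetes_engine.GKEDeleteClusterOperator(
        task_id="delete_cluster",
        project_id="{{ var.value.gcp_project }}",
        location="us-central1-c",
        name="pdp-libraries-io-dependencies",
        trigger_rule="all_done",
    )

    create_cluster >> transform_dependencies >> delete_cluster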
81 changes: 59 additions & 22 deletions datasets/libraries_io/pipelines/dependencies/pipeline.yaml
@@ -32,6 +32,21 @@ dag:
default_view: graph

tasks:
- operator: "GKECreateClusterOperator"
args:
task_id: "create_cluster"
project_id: "{{ var.value.gcp_project }}"
location: "us-central1-c"
body:
name: pdp-libraries-io-dependencies
initial_node_count: 1
network: "{{ var.value.vpc_network }}"
node_config:
machine_type: e2-standard-16
oauth_scopes:
- https://www.googleapis.com/auth/devstorage.read_write
- https://www.googleapis.com/auth/cloud-platform

- operator: "BashOperator"
description: "Fetch data gcs - gcs"
args:
@@ -53,14 +68,16 @@
rm /home/airflow/gcs/data/libraries_io/dependencies/dependencies.c
fi

- operator: "KubernetesPodOperator"
- operator: "GKEStartPodOperator"
description: "Run CSV transform within kubernetes pod"
args:
task_id: "transform_dependencies"
startup_timeout_seconds: 600
name: "dependencies"
namespace: "composer"
service_account_name: "datasets"
namespace: "default"
project_id: "{{ var.value.gcp_project }}"
location: "us-central1-c"
cluster_name: pdp-libraries-io-dependencies
image_pull_policy: "Always"
image: "{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}"
env_vars:
@@ -80,10 +97,13 @@
CSV_HEADERS: >-
["id","platform","project_name","project_id","version_number","version_id","dependency_name","dependency_platform",
"dependency_kind","optional_dependency","dependency_requirements","dependency_project_id"]
-        resources:
-          request_memory: "4G"
-          request_cpu: "1"
-          request_ephemeral_storage: "10G"
+        container_resources:
+          memory:
+            request: "16Gi"
+          cpu:
+            request: "1"
+          ephemeral-storage:
+            request: "10Gi"

- operator: "GoogleCloudStorageToBigQueryOperator"
description: "Task to load CSV data to a BigQuery table"
@@ -146,14 +166,16 @@
description: "The unique primary key of the project for this dependency in the Libraries.io database."
mode: "nullable"

- operator: "KubernetesPodOperator"
- operator: "GKEStartPodOperator"
description: "Run CSV transform within kubernetes pod"
args:
task_id: "transform_dependencies_2"
startup_timeout_seconds: 600
name: "dependencies"
namespace: "composer"
service_account_name: "datasets"
namespace: "default"
project_id: "{{ var.value.gcp_project }}"
location: "us-central1-c"
cluster_name: pdp-libraries-io-dependencies
image_pull_policy: "Always"
image: "{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}"
env_vars:
@@ -173,10 +195,13 @@
CSV_HEADERS: >-
["id","platform","project_name","project_id","version_number","version_id","dependency_name","dependency_platform",
"dependency_kind","optional_dependency","dependency_requirements","dependency_project_id"]
-        resources:
-          request_memory: "4G"
-          request_cpu: "1"
-          request_ephemeral_storage: "10G"
+        container_resources:
+          memory:
+            request: "16Gi"
+          cpu:
+            request: "1"
+          ephemeral-storage:
+            request: "10Gi"

- operator: "GoogleCloudStorageToBigQueryOperator"
description: "Task to load CSV data to a BigQuery table"
@@ -239,14 +264,16 @@
description: "The unique primary key of the project for this dependency in the Libraries.io database."
mode: "nullable"

- operator: "KubernetesPodOperator"
- operator: "GKEStartPodOperator"
description: "Run CSV transform within kubernetes pod"
args:
task_id: "transform_dependencies_3"
startup_timeout_seconds: 600
name: "dependencies"
namespace: "composer"
service_account_name: "datasets"
namespace: "default"
project_id: "{{ var.value.gcp_project }}"
location: "us-central1-c"
cluster_name: pdp-libraries-io-dependencies
image_pull_policy: "Always"
image: "{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}"
env_vars:
@@ -266,10 +293,20 @@
CSV_HEADERS: >-
["id","platform","project_name","project_id","version_number","version_id","dependency_name","dependency_platform",
"dependency_kind","optional_dependency","dependency_requirements","dependency_project_id"]
-        resources:
-          request_memory: "4G"
-          request_cpu: "1"
-          request_ephemeral_storage: "10G"
+        container_resources:
+          memory:
+            request: "16Gi"
+          cpu:
+            request: "1"
+          ephemeral-storage:
+            request: "10Gi"
+
+    - operator: "GKEDeleteClusterOperator"
+      args:
+        task_id: "delete_cluster"
+        project_id: "{{ var.value.gcp_project }}"
+        location: "us-central1-c"
+        name: pdp-libraries-io-dependencies

- operator: "GoogleCloudStorageToBigQueryOperator"
description: "Task to load CSV data to a BigQuery table"
@@ -333,4 +370,4 @@
mode: "nullable"

graph_paths:
- "bash_gcs_to_gcs >> [transform_dependencies,transform_dependencies_2,transform_dependencies_3] >> load_dependencies_to_bq >> load_dependencies_to_bq_2 >> load_dependencies_to_bq_3"
- "bash_gcs_to_gcs >> create_cluster >> [transform_dependencies,transform_dependencies_2,transform_dependencies_3] >> delete_cluster >> load_dependencies_to_bq >> load_dependencies_to_bq_2 >> load_dependencies_to_bq_3"
43 changes: 35 additions & 8 deletions datasets/libraries_io/pipelines/projects/pipeline.yaml
@@ -49,14 +49,31 @@ dag:
cp /home/airflow/gcs/data/libraries_io/libraries-1.4.0-2018-12-22/projects-1.4.0-2018-12-22.csv /home/airflow/gcs/data/libraries_io/projects/projects.csv
fi

- operator: "KubernetesPodOperator"
- operator: "GKECreateClusterOperator"
args:
task_id: "create_cluster"
project_id: "{{ var.value.gcp_project }}"
location: "us-central1-c"
body:
name: pdp-libraries-io-projects
initial_node_count: 1
network: "{{ var.value.vpc_network }}"
node_config:
machine_type: e2-standard-16
oauth_scopes:
- https://www.googleapis.com/auth/devstorage.read_write
- https://www.googleapis.com/auth/cloud-platform

- operator: "GKEStartPodOperator"
description: "Run CSV transform within kubernetes pod"
args:
task_id: "transform_projects"
startup_timeout_seconds: 600
name: "projects"
namespace: "composer"
service_account_name: "datasets"
namespace: "default"
project_id: "{{ var.value.gcp_project }}"
location: "us-central1-c"
cluster_name: pdp-libraries-io-projects
image_pull_policy: "Always"
image: "{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}"
env_vars:
@@ -80,10 +97,20 @@
"Latest Release Number":"latest_release_number","Package Manager ID":"package_manager_id","Dependent Projects Count":"dependent_projects_count",
"Language":"language","Status":"status","Last synced Timestamp":"last_synced_timestamp","Dependent Repositories Count":"dependent_repositories_count",
"Repository ID":"repository_id"}
-        resources:
-          request_memory: "4G"
-          request_cpu: "1"
-          request_ephemeral_storage: "10G"
+        container_resources:
+          memory:
+            request: "16Gi"
+          cpu:
+            request: "1"
+          ephemeral-storage:
+            request: "10Gi"
+
+    - operator: "GKEDeleteClusterOperator"
+      args:
+        task_id: "delete_cluster"
+        project_id: "{{ var.value.gcp_project }}"
+        location: "us-central1-c"
+        name: pdp-libraries-io-projects

- operator: "GoogleCloudStorageToBigQueryOperator"
description: "Task to load CSV data to a BigQuery table"
@@ -183,4 +210,4 @@
mode: "nullable"

graph_paths:
- "bash_gcs_to_gcs >> transform_projects >> load_projects_to_bq"
- "create_cluster >> bash_gcs_to_gcs >> transform_projects >> delete_cluster >> load_projects_to_bq"