From e5c516182e305fbdaa5a45d1427029fe7e3efceb Mon Sep 17 00:00:00 2001 From: Nicholas Large Date: Mon, 4 Nov 2024 09:05:45 -0600 Subject: [PATCH] Fix: Migrate Libraries IO DAGs to new environment. (#852) * Fix: Migrate Libraries IO DAGs to new environment. * Fix: Resolve cluster reference. --- .../dependencies/dependencies_dag.py | 75 ++++-- .../pipelines/dependencies/pipeline.yaml | 81 ++++-- .../pipelines/projects/pipeline.yaml | 43 +++- .../pipelines/projects/projects_dag.py | 49 +++- .../pipelines/repositories/pipeline.yaml | 81 ++++-- .../repositories/repositories_dag.py | 83 ++++-- .../repository_dependencies/pipeline.yaml | 242 +++++++++++------- .../repository_dependencies_dag.py | 227 +++++++++------- .../libraries_io/pipelines/tags/pipeline.yaml | 81 ++++-- .../libraries_io/pipelines/tags/tags_dag.py | 79 ++++-- .../pipelines/versions/pipeline.yaml | 43 +++- .../pipelines/versions/versions_dag.py | 49 +++- 12 files changed, 785 insertions(+), 348 deletions(-) diff --git a/datasets/libraries_io/pipelines/dependencies/dependencies_dag.py b/datasets/libraries_io/pipelines/dependencies/dependencies_dag.py index bd1a906a4..8f3ba820b 100644 --- a/datasets/libraries_io/pipelines/dependencies/dependencies_dag.py +++ b/datasets/libraries_io/pipelines/dependencies/dependencies_dag.py @@ -15,7 +15,7 @@ from airflow import DAG from airflow.operators import bash -from airflow.providers.cncf.kubernetes.operators import kubernetes_pod +from airflow.providers.google.cloud.operators import kubernetes_engine from airflow.providers.google.cloud.transfers import gcs_to_bigquery default_args = { @@ -33,6 +33,23 @@ catchup=False, default_view="graph", ) as dag: + create_cluster = kubernetes_engine.GKECreateClusterOperator( + task_id="create_cluster", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + body={ + "name": "pdp-libraries-io-dependencies", + "initial_node_count": 1, + "network": "{{ var.value.vpc_network }}", + "node_config": { + "machine_type": "e2-standard-16", + "oauth_scopes": [ + "https://www.googleapis.com/auth/devstorage.read_write", + "https://www.googleapis.com/auth/cloud-platform", + ], + }, + }, + ) # Fetch data gcs - gcs bash_gcs_to_gcs = bash.BashOperator( @@ -41,12 +58,14 @@ ) # Run CSV transform within kubernetes pod - transform_dependencies = kubernetes_pod.KubernetesPodOperator( + transform_dependencies = kubernetes_engine.GKEStartPodOperator( task_id="transform_dependencies", startup_timeout_seconds=600, name="dependencies", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-libraries-io-dependencies", image_pull_policy="Always", image="{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}", env_vars={ @@ -61,10 +80,10 @@ "RENAME_MAPPINGS": '{"ID":"id","Platform":"platform","Project Name":"project_name","Project ID":"project_id","Version Number":"version_number", "Version ID":"version_id","Dependency Name":"dependency_name","Dependency Platform":"dependency_platform", "Dependency Kind":"dependency_kind","Optional Dependency":"optional_dependency", "Dependency Requirements":"dependency_requirements","Dependency Project ID":"dependency_project_id"}', "CSV_HEADERS": '["id","platform","project_name","project_id","version_number","version_id","dependency_name","dependency_platform", "dependency_kind","optional_dependency","dependency_requirements","dependency_project_id"]', }, - resources={ - "request_memory": "4G", - "request_cpu": "1", - "request_ephemeral_storage": "10G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) @@ -155,12 +174,14 @@ ) # Run CSV transform within kubernetes pod - transform_dependencies_2 = kubernetes_pod.KubernetesPodOperator( + transform_dependencies_2 = kubernetes_engine.GKEStartPodOperator( task_id="transform_dependencies_2", startup_timeout_seconds=600, name="dependencies", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-libraries-io-dependencies", image_pull_policy="Always", image="{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}", env_vars={ @@ -175,10 +196,10 @@ "RENAME_MAPPINGS": '{"ID":"id","Platform":"platform","Project Name":"project_name","Project ID":"project_id","Version Number":"version_number", "Version ID":"version_id","Dependency Name":"dependency_name","Dependency Platform":"dependency_platform", "Dependency Kind":"dependency_kind","Optional Dependency":"optional_dependency", "Dependency Requirements":"dependency_requirements","Dependency Project ID":"dependency_project_id"}', "CSV_HEADERS": '["id","platform","project_name","project_id","version_number","version_id","dependency_name","dependency_platform", "dependency_kind","optional_dependency","dependency_requirements","dependency_project_id"]', }, - resources={ - "request_memory": "4G", - "request_cpu": "1", - "request_ephemeral_storage": "10G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) @@ -269,12 +290,14 @@ ) # Run CSV transform within kubernetes pod - transform_dependencies_3 = kubernetes_pod.KubernetesPodOperator( + transform_dependencies_3 = kubernetes_engine.GKEStartPodOperator( task_id="transform_dependencies_3", startup_timeout_seconds=600, name="dependencies", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-libraries-io-dependencies", image_pull_policy="Always", image="{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}", env_vars={ @@ -289,12 +312,18 @@ "RENAME_MAPPINGS": '{"ID":"id","Platform":"platform","Project Name":"project_name","Project ID":"project_id","Version Number":"version_number", "Version ID":"version_id","Dependency Name":"dependency_name","Dependency Platform":"dependency_platform", "Dependency Kind":"dependency_kind","Optional Dependency":"optional_dependency", "Dependency Requirements":"dependency_requirements","Dependency Project ID":"dependency_project_id"}', "CSV_HEADERS": '["id","platform","project_name","project_id","version_number","version_id","dependency_name","dependency_platform", "dependency_kind","optional_dependency","dependency_requirements","dependency_project_id"]', }, - resources={ - "request_memory": "4G", - "request_cpu": "1", - "request_ephemeral_storage": "10G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) + delete_cluster = kubernetes_engine.GKEDeleteClusterOperator( + task_id="delete_cluster", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + name="pdp-libraries-io-dependencies", + ) # Task to load CSV data to a BigQuery table load_dependencies_to_bq_3 = gcs_to_bigquery.GCSToBigQueryOperator( @@ -384,7 +413,9 @@ ( bash_gcs_to_gcs + >> create_cluster >> [transform_dependencies, transform_dependencies_2, transform_dependencies_3] + >> delete_cluster >> load_dependencies_to_bq >> load_dependencies_to_bq_2 >> load_dependencies_to_bq_3 diff --git a/datasets/libraries_io/pipelines/dependencies/pipeline.yaml b/datasets/libraries_io/pipelines/dependencies/pipeline.yaml index 926ee039a..6730a7229 100644 --- a/datasets/libraries_io/pipelines/dependencies/pipeline.yaml +++ b/datasets/libraries_io/pipelines/dependencies/pipeline.yaml @@ -32,6 +32,21 @@ dag: default_view: graph tasks: + - operator: "GKECreateClusterOperator" + args: + task_id: "create_cluster" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + body: + name: pdp-libraries-io-dependencies + initial_node_count: 1 + network: "{{ var.value.vpc_network }}" + node_config: + machine_type: e2-standard-16 + oauth_scopes: + - https://www.googleapis.com/auth/devstorage.read_write + - https://www.googleapis.com/auth/cloud-platform + - operator: "BashOperator" description: "Fetch data gcs - gcs" args: @@ -53,14 +68,16 @@ dag: rm /home/airflow/gcs/data/libraries_io/dependencies/dependencies.c fi - - operator: "KubernetesPodOperator" + - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "transform_dependencies" startup_timeout_seconds: 600 name: "dependencies" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-libraries-io-dependencies image_pull_policy: "Always" image: "{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}" env_vars: @@ -80,10 +97,13 @@ dag: CSV_HEADERS: >- ["id","platform","project_name","project_id","version_number","version_id","dependency_name","dependency_platform", "dependency_kind","optional_dependency","dependency_requirements","dependency_project_id"] - resources: - request_memory: "4G" - request_cpu: "1" - request_ephemeral_storage: "10G" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" @@ -146,14 +166,16 @@ dag: description: "The unique primary key of the project for this dependency in the Libraries.io database." mode: "nullable" - - operator: "KubernetesPodOperator" + - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "transform_dependencies_2" startup_timeout_seconds: 600 name: "dependencies" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-libraries-io-dependencies image_pull_policy: "Always" image: "{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}" env_vars: @@ -173,10 +195,13 @@ dag: CSV_HEADERS: >- ["id","platform","project_name","project_id","version_number","version_id","dependency_name","dependency_platform", "dependency_kind","optional_dependency","dependency_requirements","dependency_project_id"] - resources: - request_memory: "4G" - request_cpu: "1" - request_ephemeral_storage: "10G" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" @@ -239,14 +264,16 @@ dag: description: "The unique primary key of the project for this dependency in the Libraries.io database." mode: "nullable" - - operator: "KubernetesPodOperator" + - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "transform_dependencies_3" startup_timeout_seconds: 600 name: "dependencies" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-libraries-io-dependencies image_pull_policy: "Always" image: "{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}" env_vars: @@ -266,10 +293,20 @@ dag: CSV_HEADERS: >- ["id","platform","project_name","project_id","version_number","version_id","dependency_name","dependency_platform", "dependency_kind","optional_dependency","dependency_requirements","dependency_project_id"] - resources: - request_memory: "4G" - request_cpu: "1" - request_ephemeral_storage: "10G" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" + + - operator: "GKEDeleteClusterOperator" + args: + task_id: "delete_cluster" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + name: pdp-libraries-io-dependencies - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" @@ -333,4 +370,4 @@ dag: mode: "nullable" graph_paths: - - "bash_gcs_to_gcs >> [transform_dependencies,transform_dependencies_2,transform_dependencies_3] >> load_dependencies_to_bq >> load_dependencies_to_bq_2 >> load_dependencies_to_bq_3" + - "bash_gcs_to_gcs >> create_cluster >> [transform_dependencies,transform_dependencies_2,transform_dependencies_3] >> delete_cluster >> load_dependencies_to_bq >> load_dependencies_to_bq_2 >> load_dependencies_to_bq_3" diff --git a/datasets/libraries_io/pipelines/projects/pipeline.yaml b/datasets/libraries_io/pipelines/projects/pipeline.yaml index d11960e4c..654ad1e4a 100644 --- a/datasets/libraries_io/pipelines/projects/pipeline.yaml +++ b/datasets/libraries_io/pipelines/projects/pipeline.yaml @@ -49,14 +49,31 @@ dag: cp /home/airflow/gcs/data/libraries_io/libraries-1.4.0-2018-12-22/projects-1.4.0-2018-12-22.csv /home/airflow/gcs/data/libraries_io/projects/projects.csv fi - - operator: "KubernetesPodOperator" + - operator: "GKECreateClusterOperator" + args: + task_id: "create_cluster" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + body: + name: pdp-libraries-io-projects + initial_node_count: 1 + network: "{{ var.value.vpc_network }}" + node_config: + machine_type: e2-standard-16 + oauth_scopes: + - https://www.googleapis.com/auth/devstorage.read_write + - https://www.googleapis.com/auth/cloud-platform + + - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "transform_projects" startup_timeout_seconds: 600 name: "projects" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-libraries-io-projects image_pull_policy: "Always" image: "{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}" env_vars: @@ -80,10 +97,20 @@ dag: "Latest Release Number":"latest_release_number","Package Manager ID":"package_manager_id","Dependent Projects Count":"dependent_projects_count", "Language":"language","Status":"status","Last synced Timestamp":"last_synced_timestamp","Dependent Repositories Count":"dependent_repositories_count", "Repository ID":"repository_id"} - resources: - request_memory: "4G" - request_cpu: "1" - request_ephemeral_storage: "10G" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" + + - operator: "GKEDeleteClusterOperator" + args: + task_id: "delete_cluster" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + name: pdp-libraries-io-projects - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" @@ -183,4 +210,4 @@ dag: mode: "nullable" graph_paths: - - "bash_gcs_to_gcs >> transform_projects >> load_projects_to_bq" + - "create_cluster >> bash_gcs_to_gcs >> transform_projects >> delete_cluster >> load_projects_to_bq" diff --git a/datasets/libraries_io/pipelines/projects/projects_dag.py b/datasets/libraries_io/pipelines/projects/projects_dag.py index 4463d9c6a..9e91966ea 100644 --- a/datasets/libraries_io/pipelines/projects/projects_dag.py +++ b/datasets/libraries_io/pipelines/projects/projects_dag.py @@ -15,7 +15,7 @@ from airflow import DAG from airflow.operators import bash -from airflow.providers.cncf.kubernetes.operators import kubernetes_pod +from airflow.providers.google.cloud.operators import kubernetes_engine from airflow.providers.google.cloud.transfers import gcs_to_bigquery default_args = { @@ -39,14 +39,33 @@ task_id="bash_gcs_to_gcs", bash_command="if test -f /home/airflow/gcs/data/libraries_io/lib-1.6.0.tar.gz;\nthen\n mkdir /home/airflow/gcs/data/libraries_io/projects/\n cp /home/airflow/gcs/data/libraries_io/libraries-1.4.0-2018-12-22/projects-1.4.0-2018-12-22.csv /home/airflow/gcs/data/libraries_io/projects/projects.csv\nelse\n mkdir /home/airflow/gcs/data/libraries_io/\n curl -o /home/airflow/gcs/data/libraries_io/lib-1.6.0.tar.gz -L https://zenodo.org/record/2536573/files/Libraries.io-open-data-1.4.0.tar.gz\n tar -xf /home/airflow/gcs/data/libraries_io/lib-1.6.0.tar.gz -C /home/airflow/gcs/data/libraries_io/\n mkdir /home/airflow/gcs/data/libraries_io/projects/\n cp /home/airflow/gcs/data/libraries_io/libraries-1.4.0-2018-12-22/projects-1.4.0-2018-12-22.csv /home/airflow/gcs/data/libraries_io/projects/projects.csv\nfi\n", ) + create_cluster = kubernetes_engine.GKECreateClusterOperator( + task_id="create_cluster", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + body={ + "name": "pdp-libraries-io-projects", + "initial_node_count": 1, + "network": "{{ var.value.vpc_network }}", + "node_config": { + "machine_type": "e2-standard-16", + "oauth_scopes": [ + "https://www.googleapis.com/auth/devstorage.read_write", + "https://www.googleapis.com/auth/cloud-platform", + ], + }, + }, + ) # Run CSV transform within kubernetes pod - transform_projects = kubernetes_pod.KubernetesPodOperator( + transform_projects = kubernetes_engine.GKEStartPodOperator( task_id="transform_projects", startup_timeout_seconds=600, name="projects", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-libraries-io-projects", image_pull_policy="Always", image="{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}", env_vars={ @@ -61,12 +80,18 @@ "CSV_HEADERS": '["id","platform","name","created_timestamp","updated_timestamp","description","keywords","homepage_url","licenses", "repository_url","versions_count","sourcerank","latest_release_publish_timestamp","latest_release_number", "package_manager_id","dependent_projects_count","language","status","last_synced_timestamp", "dependent_repositories_count","repository_id"]', "RENAME_MAPPINGS": '{"ID":"id","Platform":"platform","Name":"name","Created Timestamp":"created_timestamp","Updated Timestamp":"updated_timestamp", "Description":"description","Keywords":"keywords","Homepage URL":"homepage_url","Licenses":"licenses","Repository URL":"repository_url", "Versions Count":"versions_count","SourceRank":"sourcerank","Latest Release Publish Timestamp":"latest_release_publish_timestamp", "Latest Release Number":"latest_release_number","Package Manager ID":"package_manager_id","Dependent Projects Count":"dependent_projects_count", "Language":"language","Status":"status","Last synced Timestamp":"last_synced_timestamp","Dependent Repositories Count":"dependent_repositories_count", "Repository ID":"repository_id"}', }, - resources={ - "request_memory": "4G", - "request_cpu": "1", - "request_ephemeral_storage": "10G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) + delete_cluster = kubernetes_engine.GKEDeleteClusterOperator( + task_id="delete_cluster", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + name="pdp-libraries-io-projects", + ) # Task to load CSV data to a BigQuery table load_projects_to_bq = gcs_to_bigquery.GCSToBigQueryOperator( @@ -208,4 +233,10 @@ ], ) - bash_gcs_to_gcs >> transform_projects >> load_projects_to_bq + ( + create_cluster + >> bash_gcs_to_gcs + >> transform_projects + >> delete_cluster + >> load_projects_to_bq + ) diff --git a/datasets/libraries_io/pipelines/repositories/pipeline.yaml b/datasets/libraries_io/pipelines/repositories/pipeline.yaml index 3859c00b8..16be9804b 100644 --- a/datasets/libraries_io/pipelines/repositories/pipeline.yaml +++ b/datasets/libraries_io/pipelines/repositories/pipeline.yaml @@ -53,14 +53,31 @@ dag: rm /home/airflow/gcs/data/libraries_io/repositories/repositories.csv fi - - operator: "KubernetesPodOperator" + - operator: "GKECreateClusterOperator" + args: + task_id: "create_cluster" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + body: + name: pdp-libraries-io-repositories + initial_node_count: 1 + network: "{{ var.value.vpc_network }}" + node_config: + machine_type: e2-standard-16 + oauth_scopes: + - https://www.googleapis.com/auth/devstorage.read_write + - https://www.googleapis.com/auth/cloud-platform + + - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "transform_repositories" startup_timeout_seconds: 600 name: "repositories" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-libraries-io-repositories image_pull_policy: "Always" image: "{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}" env_vars: @@ -91,10 +108,13 @@ dag: "readme_filename","changelog_filename","contributing_guidelines_filename","license_filename","code_of_conduct_filename", "security_threat_model_filename","security_audit_filename","status","last_synced_timestamp","sourcerank","display_name", "scm_type","pull_requests_enabled","logo_url","keywords","an"] - resources: - request_memory: "4G" - request_cpu: "1" - request_ephemeral_storage: "10G" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" @@ -269,14 +289,16 @@ dag: description: "" mode: "nullable" - - operator: "KubernetesPodOperator" + - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "transform_repositories_2" startup_timeout_seconds: 600 name: "repositories" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-libraries-io-repositories image_pull_policy: "Always" image: "{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}" env_vars: @@ -307,10 +329,13 @@ dag: "readme_filename","changelog_filename","contributing_guidelines_filename","license_filename","code_of_conduct_filename", "security_threat_model_filename","security_audit_filename","status","last_synced_timestamp","sourcerank","display_name", "scm_type","pull_requests_enabled","logo_url","keywords","an"] - resources: - request_memory: "4G" - request_cpu: "1" - request_ephemeral_storage: "10G" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" @@ -485,14 +510,16 @@ dag: description: "" mode: "nullable" - - operator: "KubernetesPodOperator" + - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "transform_repositories_3" startup_timeout_seconds: 600 name: "repositories" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-libraries-io-repositories image_pull_policy: "Always" image: "{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}" env_vars: @@ -523,10 +550,13 @@ dag: "readme_filename","changelog_filename","contributing_guidelines_filename","license_filename","code_of_conduct_filename", "security_threat_model_filename","security_audit_filename","status","last_synced_timestamp","sourcerank","display_name", "scm_type","pull_requests_enabled","logo_url","keywords","an"] - resources: - request_memory: "4G" - request_cpu: "1" - request_ephemeral_storage: "10G" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" @@ -701,5 +731,12 @@ dag: description: "" mode: "nullable" + - operator: "GKEDeleteClusterOperator" + args: + task_id: "delete_cluster" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + name: pdp-libraries-io-repositories + graph_paths: - - "bash_gcs_to_gcs >> [transform_repositories,transform_repositories_2,transform_repositories_3] >> load_repositories_to_bq >> load_repositories_to_bq_2 >> load_repositories_to_bq_3" + - "bash_gcs_to_gcs >> create_cluster >> [transform_repositories,transform_repositories_2,transform_repositories_3] >> delete_cluster >> [load_repositories_to_bq, load_repositories_to_bq_2, load_repositories_to_bq_3]" diff --git a/datasets/libraries_io/pipelines/repositories/repositories_dag.py b/datasets/libraries_io/pipelines/repositories/repositories_dag.py index e0df7a715..ad845e23f 100644 --- a/datasets/libraries_io/pipelines/repositories/repositories_dag.py +++ b/datasets/libraries_io/pipelines/repositories/repositories_dag.py @@ -15,7 +15,7 @@ from airflow import DAG from airflow.operators import bash -from airflow.providers.cncf.kubernetes.operators import kubernetes_pod +from airflow.providers.google.cloud.operators import kubernetes_engine from airflow.providers.google.cloud.transfers import gcs_to_bigquery default_args = { @@ -39,14 +39,33 @@ task_id="bash_gcs_to_gcs", bash_command="if test -f /home/airflow/gcs/data/libraries_io/lib-1.6.0.tar.gz;\nthen\n mkdir -p /home/airflow/gcs/data/libraries_io/repositories/\n cp /home/airflow/gcs/data/libraries_io/libraries-1.4.0-2018-12-22/repositories-1.4.0-2018-12-22.csv /home/airflow/gcs/data/libraries_io/repositories/repositories.csv\n split -l 13000000 --additional-suffix=.csv /home/airflow/gcs/data/libraries_io/repositories/repositories.csv /home/airflow/gcs/data/libraries_io/repositories/\n rm /home/airflow/gcs/data/libraries_io/repositories/repositories.csv\nelse\n mkdir -p /home/airflow/gcs/data/libraries_io/\n curl -o /home/airflow/gcs/data/libraries_io/lib-1.6.0.tar.gz -L https://zenodo.org/record/2536573/files/Libraries.io-open-data-1.4.0.tar.gz\n tar -xf /home/airflow/gcs/data/libraries_io/lib-1.6.0.tar.gz -C /home/airflow/gcs/data/libraries_io/\n mkdir -p /home/airflow/gcs/data/libraries_io/repositories/\n cp /home/airflow/gcs/data/libraries_io/libraries-1.4.0-2018-12-22/repositories-1.4.0-2018-12-22.csv /home/airflow/gcs/data/libraries_io/repositories/repositories.csv\n split -l 13000000 --additional-suffix=.csv /home/airflow/gcs/data/libraries_io/repositories/repositories.csv /home/airflow/gcs/data/libraries_io/repositories/\n rm /home/airflow/gcs/data/libraries_io/repositories/repositories.csv\nfi\n", ) + create_cluster = kubernetes_engine.GKECreateClusterOperator( + task_id="create_cluster", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + body={ + "name": "pdp-libraries-io-repositories", + "initial_node_count": 1, + "network": "{{ var.value.vpc_network }}", + "node_config": { + "machine_type": "e2-standard-16", + "oauth_scopes": [ + "https://www.googleapis.com/auth/devstorage.read_write", + "https://www.googleapis.com/auth/cloud-platform", + ], + }, + }, + ) # Run CSV transform within kubernetes pod - transform_repositories = kubernetes_pod.KubernetesPodOperator( + transform_repositories = kubernetes_engine.GKEStartPodOperator( task_id="transform_repositories", startup_timeout_seconds=600, name="repositories", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-libraries-io-repositories", image_pull_policy="Always", image="{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}", env_vars={ @@ -61,10 +80,10 @@ "RENAME_MAPPINGS": '{"ID":"id","Host Type":"host_type","Name with Owner":"name_with_owner","Description":"description","Fork":"fork", "Created Timestamp":"created_timestamp","Updated Timestamp":"updated_timestamp","Last pushed Timestamp":"last_pushed_timestamp", "Homepage URL":"homepage_url","Size":"size","Stars Count":"stars_count","Language":"language","Issues enable":"issues_enabled", "Wiki enabled":"wiki_enabled","Pages enabled":"pages_enabled","Forks Count":"forks_count","Mirror URL":"mirror_url", "Open Issues Count":"open_issues_count","Default branch":"default_branch","Watchers Count":"watchers_count","UUID":"uuid", "Fork Source Name with Owner":"fork_source_name_with_owner","License":"license","Contributors Count":"contributors_count", "Readme filename":"readme_filename","Changelog filename":"changelog_filename","Contributing guidelines filename":"contributing_guidelines_filename", "License filename":"license_filename","Code of Conduct filename":"code_of_conduct_filename", "Security Threat Model filename":"security_threat_model_filename","Security Audit filename":"security_audit_filename", "Status":"status","Last Synced Timestamp":"last_synced_timestamp","SourceRank":"sourcerank","Display Name":"display_name", "SCM typ":"scm_type","Pull requests enabled":"pull_requests_enabled","Logo URL":"logo_url","Keywords":"keywords","39":"an"}', "CSV_HEADERS": '["id","host_type","name_with_owner","description","fork","created_timestamp","updated_timestamp","last_pushed_timestamp", "homepage_url","size","stars_count","language","issues_enabled","wiki_enabled","pages_enabled","forks_count","mirror_url", "open_issues_count","default_branch","watchers_count","uuid","fork_source_name_with_owner","license","contributors_count", "readme_filename","changelog_filename","contributing_guidelines_filename","license_filename","code_of_conduct_filename", "security_threat_model_filename","security_audit_filename","status","last_synced_timestamp","sourcerank","display_name", "scm_type","pull_requests_enabled","logo_url","keywords","an"]', }, - resources={ - "request_memory": "4G", - "request_cpu": "1", - "request_ephemeral_storage": "10G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) @@ -318,12 +337,14 @@ ) # Run CSV transform within kubernetes pod - transform_repositories_2 = kubernetes_pod.KubernetesPodOperator( + transform_repositories_2 = kubernetes_engine.GKEStartPodOperator( task_id="transform_repositories_2", startup_timeout_seconds=600, name="repositories", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-libraries-io-repositories", image_pull_policy="Always", image="{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}", env_vars={ @@ -338,10 +359,10 @@ "RENAME_MAPPINGS": '{"ID":"id","Host Type":"host_type","Name with Owner":"name_with_owner","Description":"description","Fork":"fork", "Created Timestamp":"created_timestamp","Updated Timestamp":"updated_timestamp","Last pushed Timestamp":"last_pushed_timestamp", "Homepage URL":"homepage_url","Size":"size","Stars Count":"stars_count","Language":"language","Issues enable":"issues_enabled", "Wiki enabled":"wiki_enabled","Pages enabled":"pages_enabled","Forks Count":"forks_count","Mirror URL":"mirror_url", "Open Issues Count":"open_issues_count","Default branch":"default_branch","Watchers Count":"watchers_count","UUID":"uuid", "Fork Source Name with Owner":"fork_source_name_with_owner","License":"license","Contributors Count":"contributors_count", "Readme filename":"readme_filename","Changelog filename":"changelog_filename","Contributing guidelines filename":"contributing_guidelines_filename", "License filename":"license_filename","Code of Conduct filename":"code_of_conduct_filename", "Security Threat Model filename":"security_threat_model_filename","Security Audit filename":"security_audit_filename", "Status":"status","Last Synced Timestamp":"last_synced_timestamp","SourceRank":"sourcerank","Display Name":"display_name", "SCM typ":"scm_type","Pull requests enabled":"pull_requests_enabled","Logo URL":"logo_url","Keywords":"keywords","39":"an"}', "CSV_HEADERS": '["id","host_type","name_with_owner","description","fork","created_timestamp","updated_timestamp","last_pushed_timestamp", "homepage_url","size","stars_count","language","issues_enabled","wiki_enabled","pages_enabled","forks_count","mirror_url", "open_issues_count","default_branch","watchers_count","uuid","fork_source_name_with_owner","license","contributors_count", "readme_filename","changelog_filename","contributing_guidelines_filename","license_filename","code_of_conduct_filename", "security_threat_model_filename","security_audit_filename","status","last_synced_timestamp","sourcerank","display_name", "scm_type","pull_requests_enabled","logo_url","keywords","an"]', }, - resources={ - "request_memory": "4G", - "request_cpu": "1", - "request_ephemeral_storage": "10G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) @@ -595,12 +616,14 @@ ) # Run CSV transform within kubernetes pod - transform_repositories_3 = kubernetes_pod.KubernetesPodOperator( + transform_repositories_3 = kubernetes_engine.GKEStartPodOperator( task_id="transform_repositories_3", startup_timeout_seconds=600, name="repositories", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-libraries-io-repositories", image_pull_policy="Always", image="{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}", env_vars={ @@ -615,10 +638,10 @@ "RENAME_MAPPINGS": '{"ID":"id","Host Type":"host_type","Name with Owner":"name_with_owner","Description":"description","Fork":"fork", "Created Timestamp":"created_timestamp","Updated Timestamp":"updated_timestamp","Last pushed Timestamp":"last_pushed_timestamp", "Homepage URL":"homepage_url","Size":"size","Stars Count":"stars_count","Language":"language","Issues enable":"issues_enabled", "Wiki enabled":"wiki_enabled","Pages enabled":"pages_enabled","Forks Count":"forks_count","Mirror URL":"mirror_url", "Open Issues Count":"open_issues_count","Default branch":"default_branch","Watchers Count":"watchers_count","UUID":"uuid", "Fork Source Name with Owner":"fork_source_name_with_owner","License":"license","Contributors Count":"contributors_count", "Readme filename":"readme_filename","Changelog filename":"changelog_filename","Contributing guidelines filename":"contributing_guidelines_filename", "License filename":"license_filename","Code of Conduct filename":"code_of_conduct_filename", "Security Threat Model filename":"security_threat_model_filename","Security Audit filename":"security_audit_filename", "Status":"status","Last Synced Timestamp":"last_synced_timestamp","SourceRank":"sourcerank","Display Name":"display_name", "SCM typ":"scm_type","Pull requests enabled":"pull_requests_enabled","Logo URL":"logo_url","Keywords":"keywords","39":"an"}', "CSV_HEADERS": '["id","host_type","name_with_owner","description","fork","created_timestamp","updated_timestamp","last_pushed_timestamp", "homepage_url","size","stars_count","language","issues_enabled","wiki_enabled","pages_enabled","forks_count","mirror_url", "open_issues_count","default_branch","watchers_count","uuid","fork_source_name_with_owner","license","contributors_count", "readme_filename","changelog_filename","contributing_guidelines_filename","license_filename","code_of_conduct_filename", "security_threat_model_filename","security_audit_filename","status","last_synced_timestamp","sourcerank","display_name", "scm_type","pull_requests_enabled","logo_url","keywords","an"]', }, - resources={ - "request_memory": "4G", - "request_cpu": "1", - "request_ephemeral_storage": "10G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) @@ -870,11 +893,21 @@ {"name": "an", "type": "string", "description": "", "mode": "nullable"}, ], ) + delete_cluster = kubernetes_engine.GKEDeleteClusterOperator( + task_id="delete_cluster", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + name="pdp-libraries-io-repositories", + ) ( bash_gcs_to_gcs + >> create_cluster >> [transform_repositories, transform_repositories_2, transform_repositories_3] - >> load_repositories_to_bq - >> load_repositories_to_bq_2 - >> load_repositories_to_bq_3 + >> delete_cluster + >> [ + load_repositories_to_bq, + load_repositories_to_bq_2, + load_repositories_to_bq_3, + ] ) diff --git a/datasets/libraries_io/pipelines/repository_dependencies/pipeline.yaml b/datasets/libraries_io/pipelines/repository_dependencies/pipeline.yaml index 525f5be7c..4ca4ad7fe 100644 --- a/datasets/libraries_io/pipelines/repository_dependencies/pipeline.yaml +++ b/datasets/libraries_io/pipelines/repository_dependencies/pipeline.yaml @@ -53,14 +53,31 @@ dag: rm /home/airflow/gcs/data/libraries_io/repository_dependencies/repository_dependencies.csv fi - - operator: "KubernetesPodOperator" + - operator: "GKECreateClusterOperator" + args: + task_id: "create_cluster" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + body: + name: pdp-libraries-io-repository-dependencies + initial_node_count: 1 + network: "{{ var.value.vpc_network }}" + node_config: + machine_type: e2-standard-16 + oauth_scopes: + - https://www.googleapis.com/auth/devstorage.read_write + - https://www.googleapis.com/auth/cloud-platform + + - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "transform_repository_dependencies" startup_timeout_seconds: 600 name: "repository_dependencies" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-libraries-io-repository-dependencies image_pull_policy: "Always" image: "{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}" env_vars: @@ -77,14 +94,16 @@ dag: "Manifest Platform":"manifest_platform","Manifest Filepath":"manifest_filepath","Git branch":"git_branch", "Manifest kind":"manifest_kind","Optional":"optional","Dependency Project Name":"dependency_project_name", "Dependency Requirements":"dependency_requirements","Dependency Kind":"dependency_kind","Dependency Project ID":"dependency_project_id"} - CSV_HEADERS: >- ["id","host_type","repository_name_with_owner","repository_id","manifest_platform","manifest_filepath","git_branch", "manifest_kind","optional","dependency_project_name","dependency_requirements","dependency_kind","dependency_project_id"] - resources: - request_memory: "4G" - request_cpu: "1" - request_ephemeral_storage: "10G" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" @@ -151,14 +170,16 @@ dag: description: "The unique primary key of the project for this dependency in the Libraries.io database." mode: "nullable" - - operator: "KubernetesPodOperator" + - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "transform_repository_dependencies_2" startup_timeout_seconds: 600 name: "repository_dependencies" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-libraries-io-repository-dependencies image_pull_policy: "Always" image: "{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}" env_vars: @@ -175,14 +196,16 @@ dag: "Manifest Platform":"manifest_platform","Manifest Filepath":"manifest_filepath","Git branch":"git_branch", "Manifest kind":"manifest_kind","Optional":"optional","Dependency Project Name":"dependency_project_name", "Dependency Requirements":"dependency_requirements","Dependency Kind":"dependency_kind","Dependency Project ID":"dependency_project_id"} - CSV_HEADERS: >- ["id","host_type","repository_name_with_owner","repository_id","manifest_platform","manifest_filepath","git_branch", "manifest_kind","optional","dependency_project_name","dependency_requirements","dependency_kind","dependency_project_id"] - resources: - request_memory: "4G" - request_cpu: "1" - request_ephemeral_storage: "10G" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" @@ -249,14 +272,16 @@ dag: description: "The unique primary key of the project for this dependency in the Libraries.io database." mode: "nullable" - - operator: "KubernetesPodOperator" + - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "transform_repository_dependencies_3" startup_timeout_seconds: 600 name: "repository_dependencies" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-libraries-io-repository-dependencies image_pull_policy: "Always" image: "{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}" env_vars: @@ -273,14 +298,16 @@ dag: "Manifest Platform":"manifest_platform","Manifest Filepath":"manifest_filepath","Git branch":"git_branch", "Manifest kind":"manifest_kind","Optional":"optional","Dependency Project Name":"dependency_project_name", "Dependency Requirements":"dependency_requirements","Dependency Kind":"dependency_kind","Dependency Project ID":"dependency_project_id"} - CSV_HEADERS: >- ["id","host_type","repository_name_with_owner","repository_id","manifest_platform","manifest_filepath","git_branch", "manifest_kind","optional","dependency_project_name","dependency_requirements","dependency_kind","dependency_project_id"] - resources: - request_memory: "4G" - request_cpu: "1" - request_ephemeral_storage: "10G" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" @@ -347,14 +374,16 @@ dag: description: "The unique primary key of the project for this dependency in the Libraries.io database." mode: "nullable" - - operator: "KubernetesPodOperator" + - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "transform_repository_dependencies_4" startup_timeout_seconds: 600 name: "repository_dependencies" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-libraries-io-repository-dependencies image_pull_policy: "Always" image: "{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}" env_vars: @@ -370,14 +399,16 @@ dag: "Manifest Platform":"manifest_platform","Manifest Filepath":"manifest_filepath","Git branch":"git_branch", "Manifest kind":"manifest_kind","Optional":"optional","Dependency Project Name":"dependency_project_name", "Dependency Requirements":"dependency_requirements","Dependency Kind":"dependency_kind","Dependency Project ID":"dependency_project_id"} - CSV_HEADERS: >- ["id","host_type","repository_name_with_owner","repository_id","manifest_platform","manifest_filepath","git_branch", "manifest_kind","optional","dependency_project_name","dependency_requirements","dependency_kind","dependency_project_id"] - resources: - request_memory: "4G" - request_cpu: "1" - request_ephemeral_storage: "10G" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" @@ -444,14 +475,16 @@ dag: description: "The unique primary key of the project for this dependency in the Libraries.io database." mode: "nullable" - - operator: "KubernetesPodOperator" + - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "transform_repository_dependencies_5" startup_timeout_seconds: 600 name: "repository_dependencies" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-libraries-io-repository-dependencies image_pull_policy: "Always" image: "{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}" env_vars: @@ -468,14 +501,16 @@ dag: "Manifest Platform":"manifest_platform","Manifest Filepath":"manifest_filepath","Git branch":"git_branch", "Manifest kind":"manifest_kind","Optional":"optional","Dependency Project Name":"dependency_project_name", "Dependency Requirements":"dependency_requirements","Dependency Kind":"dependency_kind","Dependency Project ID":"dependency_project_id"} - CSV_HEADERS: >- ["id","host_type","repository_name_with_owner","repository_id","manifest_platform","manifest_filepath","git_branch", "manifest_kind","optional","dependency_project_name","dependency_requirements","dependency_kind","dependency_project_id"] - resources: - request_memory: "4G" - request_cpu: "1" - request_ephemeral_storage: "10G" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" @@ -542,14 +577,16 @@ dag: description: "The unique primary key of the project for this dependency in the Libraries.io database." mode: "nullable" - - operator: "KubernetesPodOperator" + - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "transform_repository_dependencies_6" startup_timeout_seconds: 600 name: "repository_dependencies" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-libraries-io-repository-dependencies image_pull_policy: "Always" image: "{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}" env_vars: @@ -566,14 +603,16 @@ dag: "Manifest Platform":"manifest_platform","Manifest Filepath":"manifest_filepath","Git branch":"git_branch", "Manifest kind":"manifest_kind","Optional":"optional","Dependency Project Name":"dependency_project_name", "Dependency Requirements":"dependency_requirements","Dependency Kind":"dependency_kind","Dependency Project ID":"dependency_project_id"} - CSV_HEADERS: >- ["id","host_type","repository_name_with_owner","repository_id","manifest_platform","manifest_filepath","git_branch", "manifest_kind","optional","dependency_project_name","dependency_requirements","dependency_kind","dependency_project_id"] - resources: - request_memory: "4G" - request_cpu: "1" - request_ephemeral_storage: "10G" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" @@ -640,14 +679,16 @@ dag: description: "The unique primary key of the project for this dependency in the Libraries.io database." mode: "nullable" - - operator: "KubernetesPodOperator" + - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "transform_repository_dependencies_7" startup_timeout_seconds: 600 name: "repository_dependencies" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-libraries-io-repository-dependencies image_pull_policy: "Always" image: "{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}" env_vars: @@ -664,14 +705,16 @@ dag: "Manifest Platform":"manifest_platform","Manifest Filepath":"manifest_filepath","Git branch":"git_branch", "Manifest kind":"manifest_kind","Optional":"optional","Dependency Project Name":"dependency_project_name", "Dependency Requirements":"dependency_requirements","Dependency Kind":"dependency_kind","Dependency Project ID":"dependency_project_id"} - CSV_HEADERS: >- ["id","host_type","repository_name_with_owner","repository_id","manifest_platform","manifest_filepath","git_branch", "manifest_kind","optional","dependency_project_name","dependency_requirements","dependency_kind","dependency_project_id"] - resources: - request_memory: "4G" - request_cpu: "1" - request_ephemeral_storage: "10G" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" @@ -738,14 +781,16 @@ dag: description: "The unique primary key of the project for this dependency in the Libraries.io database." mode: "nullable" - - operator: "KubernetesPodOperator" + - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "transform_repository_dependencies_8" startup_timeout_seconds: 600 name: "repository_dependencies" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-libraries-io-repository-dependencies image_pull_policy: "Always" image: "{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}" env_vars: @@ -762,14 +807,16 @@ dag: "Manifest Platform":"manifest_platform","Manifest Filepath":"manifest_filepath","Git branch":"git_branch", "Manifest kind":"manifest_kind","Optional":"optional","Dependency Project Name":"dependency_project_name", "Dependency Requirements":"dependency_requirements","Dependency Kind":"dependency_kind","Dependency Project ID":"dependency_project_id"} - CSV_HEADERS: >- ["id","host_type","repository_name_with_owner","repository_id","manifest_platform","manifest_filepath","git_branch", "manifest_kind","optional","dependency_project_name","dependency_requirements","dependency_kind","dependency_project_id"] - resources: - request_memory: "4G" - request_cpu: "1" - request_ephemeral_storage: "10G" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" @@ -836,14 +883,16 @@ dag: description: "The unique primary key of the project for this dependency in the Libraries.io database." mode: "nullable" - - operator: "KubernetesPodOperator" + - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "transform_repository_dependencies_9" startup_timeout_seconds: 600 name: "repository_dependencies" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-libraries-io-repository-dependencies image_pull_policy: "Always" image: "{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}" env_vars: @@ -863,10 +912,13 @@ dag: CSV_HEADERS: >- ["id","host_type","repository_name_with_owner","repository_id","manifest_platform","manifest_filepath","git_branch", "manifest_kind","optional","dependency_project_name","dependency_requirements","dependency_kind","dependency_project_id"] - resources: - request_memory: "4G" - request_cpu: "1" - request_ephemeral_storage: "10G" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" @@ -933,14 +985,16 @@ dag: description: "The unique primary key of the project for this dependency in the Libraries.io database." mode: "nullable" - - operator: "KubernetesPodOperator" + - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "transform_repository_dependencies_11" startup_timeout_seconds: 600 name: "repository_dependencies" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-libraries-io-repository-dependencies image_pull_policy: "Always" image: "{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}" env_vars: @@ -961,10 +1015,13 @@ dag: CSV_HEADERS: >- ["id","host_type","repository_name_with_owner","repository_id","manifest_platform","manifest_filepath","git_branch", "manifest_kind","optional","dependency_project_name","dependency_requirements","dependency_kind","dependency_project_id"] - resources: - request_memory: "4G" - request_cpu: "1" - request_ephemeral_storage: "10G" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" @@ -1031,14 +1088,16 @@ dag: description: "The unique primary key of the project for this dependency in the Libraries.io database." mode: "nullable" - - operator: "KubernetesPodOperator" + - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "transform_repository_dependencies_10" startup_timeout_seconds: 600 name: "repository_dependencies" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-libraries-io-repository-dependencies image_pull_policy: "Always" image: "{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}" env_vars: @@ -1055,14 +1114,16 @@ dag: "Manifest Platform":"manifest_platform","Manifest Filepath":"manifest_filepath","Git branch":"git_branch", "Manifest kind":"manifest_kind","Optional":"optional","Dependency Project Name":"dependency_project_name", "Dependency Requirements":"dependency_requirements","Dependency Kind":"dependency_kind","Dependency Project ID":"dependency_project_id"} - CSV_HEADERS: >- ["id","host_type","repository_name_with_owner","repository_id","manifest_platform","manifest_filepath","git_branch", "manifest_kind","optional","dependency_project_name","dependency_requirements","dependency_kind","dependency_project_id"] - resources: - request_memory: "4G" - request_cpu: "1" - request_ephemeral_storage: "10G" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" @@ -1129,5 +1190,12 @@ dag: description: "The unique primary key of the project for this dependency in the Libraries.io database." mode: "nullable" + - operator: "GKEDeleteClusterOperator" + args: + task_id: "delete_cluster" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + name: pdp-libraries-io-repository-dependencies + graph_paths: - - "bash_gcs_to_gcs >> [transform_repository_dependencies,transform_repository_dependencies_2,transform_repository_dependencies_3,transform_repository_dependencies_4,transform_repository_dependencies_5,transform_repository_dependencies_6,transform_repository_dependencies_7,transform_repository_dependencies_8,transform_repository_dependencies_9,transform_repository_dependencies_10,transform_repository_dependencies_11] >> load_repository_dependencies_to_bq >> load_repository_dependencies_to_bq_2 >> load_repository_dependencies_to_bq_3 >> load_repository_dependencies_to_bq_4 >> load_repository_dependencies_to_bq_5 >> load_repository_dependencies_to_bq_6 >> load_repository_dependencies_to_bq_7 >> load_repository_dependencies_to_bq_8 >> load_repository_dependencies_to_bq_9 >> load_repository_dependencies_to_bq_10 >> load_repository_dependencies_to_bq_11" + - "bash_gcs_to_gcs >> create_cluster >> [transform_repository_dependencies,transform_repository_dependencies_2,transform_repository_dependencies_3,transform_repository_dependencies_4,transform_repository_dependencies_5,transform_repository_dependencies_6,transform_repository_dependencies_7,transform_repository_dependencies_8,transform_repository_dependencies_9,transform_repository_dependencies_10,transform_repository_dependencies_11] >> delete_cluster >> [load_repository_dependencies_to_bq, load_repository_dependencies_to_bq_2, load_repository_dependencies_to_bq_3, load_repository_dependencies_to_bq_4, load_repository_dependencies_to_bq_5, load_repository_dependencies_to_bq_6, load_repository_dependencies_to_bq_7, load_repository_dependencies_to_bq_8, load_repository_dependencies_to_bq_9, load_repository_dependencies_to_bq_10, load_repository_dependencies_to_bq_11]" diff --git a/datasets/libraries_io/pipelines/repository_dependencies/repository_dependencies_dag.py b/datasets/libraries_io/pipelines/repository_dependencies/repository_dependencies_dag.py index 1e49a0178..b5324c1e7 100644 --- a/datasets/libraries_io/pipelines/repository_dependencies/repository_dependencies_dag.py +++ b/datasets/libraries_io/pipelines/repository_dependencies/repository_dependencies_dag.py @@ -15,7 +15,7 @@ from airflow import DAG from airflow.operators import bash -from airflow.providers.cncf.kubernetes.operators import kubernetes_pod +from airflow.providers.google.cloud.operators import kubernetes_engine from airflow.providers.google.cloud.transfers import gcs_to_bigquery default_args = { @@ -39,14 +39,33 @@ task_id="bash_gcs_to_gcs", bash_command="if test -f /home/airflow/gcs/data/libraries_io/lib-1.6.0.tar.gz;\nthen\n mkdir /home/airflow/gcs/data/libraries_io/repository_dependencies/\n cp /home/airflow/gcs/data/libraries_io/libraries-1.4.0-2018-12-22/repository_dependencies-1.4.0-2018-12-22.csv /home/airflow/gcs/data/libraries_io/repository_dependencies/repository_dependencies.csv\n split -l 37000000 --additional-suffix=.csv /home/airflow/gcs/data/libraries_io/repository_dependencies/repository_dependencies.csv /home/airflow/gcs/data/libraries_io/repository_dependencies/\n rm /home/airflow/gcs/data/libraries_io/repository_dependencies/repository_dependencies.csv\nelse\n mkdir /home/airflow/gcs/data/libraries_io/\n curl -o /home/airflow/gcs/data/libraries_io/lib-1.6.0.tar.gz -L https://zenodo.org/record/2536573/files/Libraries.io-open-data-1.4.0.tar.gz\n tar -xf /home/airflow/gcs/data/libraries_io/lib-1.6.0.tar.gz -C /home/airflow/gcs/data/libraries_io/\n mkdir /home/airflow/gcs/data/libraries_io/repository_dependencies/\n cp /home/airflow/gcs/data/libraries_io/libraries-1.4.0-2018-12-22/repository_dependencies-1.4.0-2018-12-22.csv /home/airflow/gcs/data/libraries_io/repository_dependencies/repository_dependencies.csv\n split -l 37000000 --additional-suffix=.csv /home/airflow/gcs/data/libraries_io/repository_dependencies/repository_dependencies.csv /home/airflow/gcs/data/libraries_io/repository_dependencies/\n rm /home/airflow/gcs/data/libraries_io/repository_dependencies/repository_dependencies.csv\nfi\n", ) + create_cluster = kubernetes_engine.GKECreateClusterOperator( + task_id="create_cluster", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + body={ + "name": "pdp-libraries-io-repository-dependencies", + "initial_node_count": 1, + "network": "{{ var.value.vpc_network }}", + "node_config": { + "machine_type": "e2-standard-16", + "oauth_scopes": [ + "https://www.googleapis.com/auth/devstorage.read_write", + "https://www.googleapis.com/auth/cloud-platform", + ], + }, + }, + ) # Run CSV transform within kubernetes pod - transform_repository_dependencies = kubernetes_pod.KubernetesPodOperator( + transform_repository_dependencies = kubernetes_engine.GKEStartPodOperator( task_id="transform_repository_dependencies", startup_timeout_seconds=600, name="repository_dependencies", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-libraries-io-repository-dependencies", image_pull_policy="Always", image="{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}", env_vars={ @@ -61,10 +80,10 @@ "RENAME_MAPPINGS": '{"ID":"id","Host Type":"host_type","Repository Name with Owner":"repository_name_with_owner","Repository ID":"repository_id", "Manifest Platform":"manifest_platform","Manifest Filepath":"manifest_filepath","Git branch":"git_branch", "Manifest kind":"manifest_kind","Optional":"optional","Dependency Project Name":"dependency_project_name", "Dependency Requirements":"dependency_requirements","Dependency Kind":"dependency_kind","Dependency Project ID":"dependency_project_id"}', "CSV_HEADERS": '["id","host_type","repository_name_with_owner","repository_id","manifest_platform","manifest_filepath","git_branch", "manifest_kind","optional","dependency_project_name","dependency_requirements","dependency_kind","dependency_project_id"]', }, - resources={ - "request_memory": "4G", - "request_cpu": "1", - "request_ephemeral_storage": "10G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) @@ -163,12 +182,14 @@ ) # Run CSV transform within kubernetes pod - transform_repository_dependencies_2 = kubernetes_pod.KubernetesPodOperator( + transform_repository_dependencies_2 = kubernetes_engine.GKEStartPodOperator( task_id="transform_repository_dependencies_2", startup_timeout_seconds=600, name="repository_dependencies", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-libraries-io-repository-dependencies", image_pull_policy="Always", image="{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}", env_vars={ @@ -183,10 +204,10 @@ "RENAME_MAPPINGS": '{"ID":"id","Host Type":"host_type","Repository Name with Owner":"repository_name_with_owner","Repository ID":"repository_id", "Manifest Platform":"manifest_platform","Manifest Filepath":"manifest_filepath","Git branch":"git_branch", "Manifest kind":"manifest_kind","Optional":"optional","Dependency Project Name":"dependency_project_name", "Dependency Requirements":"dependency_requirements","Dependency Kind":"dependency_kind","Dependency Project ID":"dependency_project_id"}', "CSV_HEADERS": '["id","host_type","repository_name_with_owner","repository_id","manifest_platform","manifest_filepath","git_branch", "manifest_kind","optional","dependency_project_name","dependency_requirements","dependency_kind","dependency_project_id"]', }, - resources={ - "request_memory": "4G", - "request_cpu": "1", - "request_ephemeral_storage": "10G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) @@ -285,12 +306,14 @@ ) # Run CSV transform within kubernetes pod - transform_repository_dependencies_3 = kubernetes_pod.KubernetesPodOperator( + transform_repository_dependencies_3 = kubernetes_engine.GKEStartPodOperator( task_id="transform_repository_dependencies_3", startup_timeout_seconds=600, name="repository_dependencies", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-libraries-io-repository-dependencies", image_pull_policy="Always", image="{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}", env_vars={ @@ -305,10 +328,10 @@ "RENAME_MAPPINGS": '{"ID":"id","Host Type":"host_type","Repository Name with Owner":"repository_name_with_owner","Repository ID":"repository_id", "Manifest Platform":"manifest_platform","Manifest Filepath":"manifest_filepath","Git branch":"git_branch", "Manifest kind":"manifest_kind","Optional":"optional","Dependency Project Name":"dependency_project_name", "Dependency Requirements":"dependency_requirements","Dependency Kind":"dependency_kind","Dependency Project ID":"dependency_project_id"}', "CSV_HEADERS": '["id","host_type","repository_name_with_owner","repository_id","manifest_platform","manifest_filepath","git_branch", "manifest_kind","optional","dependency_project_name","dependency_requirements","dependency_kind","dependency_project_id"]', }, - resources={ - "request_memory": "4G", - "request_cpu": "1", - "request_ephemeral_storage": "10G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) @@ -407,12 +430,14 @@ ) # Run CSV transform within kubernetes pod - transform_repository_dependencies_4 = kubernetes_pod.KubernetesPodOperator( + transform_repository_dependencies_4 = kubernetes_engine.GKEStartPodOperator( task_id="transform_repository_dependencies_4", startup_timeout_seconds=600, name="repository_dependencies", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-libraries-io-repository-dependencies", image_pull_policy="Always", image="{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}", env_vars={ @@ -426,10 +451,10 @@ "RENAME_MAPPINGS": '{"ID":"id","Host Type":"host_type","Repository Name with Owner":"repository_name_with_owner","Repository ID":"repository_id", "Manifest Platform":"manifest_platform","Manifest Filepath":"manifest_filepath","Git branch":"git_branch", "Manifest kind":"manifest_kind","Optional":"optional","Dependency Project Name":"dependency_project_name", "Dependency Requirements":"dependency_requirements","Dependency Kind":"dependency_kind","Dependency Project ID":"dependency_project_id"}', "CSV_HEADERS": '["id","host_type","repository_name_with_owner","repository_id","manifest_platform","manifest_filepath","git_branch", "manifest_kind","optional","dependency_project_name","dependency_requirements","dependency_kind","dependency_project_id"]', }, - resources={ - "request_memory": "4G", - "request_cpu": "1", - "request_ephemeral_storage": "10G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) @@ -528,12 +553,14 @@ ) # Run CSV transform within kubernetes pod - transform_repository_dependencies_5 = kubernetes_pod.KubernetesPodOperator( + transform_repository_dependencies_5 = kubernetes_engine.GKEStartPodOperator( task_id="transform_repository_dependencies_5", startup_timeout_seconds=600, name="repository_dependencies", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-libraries-io-repository-dependencies", image_pull_policy="Always", image="{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}", env_vars={ @@ -548,10 +575,10 @@ "RENAME_MAPPINGS": '{"ID":"id","Host Type":"host_type","Repository Name with Owner":"repository_name_with_owner","Repository ID":"repository_id", "Manifest Platform":"manifest_platform","Manifest Filepath":"manifest_filepath","Git branch":"git_branch", "Manifest kind":"manifest_kind","Optional":"optional","Dependency Project Name":"dependency_project_name", "Dependency Requirements":"dependency_requirements","Dependency Kind":"dependency_kind","Dependency Project ID":"dependency_project_id"}', "CSV_HEADERS": '["id","host_type","repository_name_with_owner","repository_id","manifest_platform","manifest_filepath","git_branch", "manifest_kind","optional","dependency_project_name","dependency_requirements","dependency_kind","dependency_project_id"]', }, - resources={ - "request_memory": "4G", - "request_cpu": "1", - "request_ephemeral_storage": "10G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) @@ -650,12 +677,14 @@ ) # Run CSV transform within kubernetes pod - transform_repository_dependencies_6 = kubernetes_pod.KubernetesPodOperator( + transform_repository_dependencies_6 = kubernetes_engine.GKEStartPodOperator( task_id="transform_repository_dependencies_6", startup_timeout_seconds=600, name="repository_dependencies", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-libraries-io-repository-dependencies", image_pull_policy="Always", image="{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}", env_vars={ @@ -670,10 +699,10 @@ "RENAME_MAPPINGS": '{"ID":"id","Host Type":"host_type","Repository Name with Owner":"repository_name_with_owner","Repository ID":"repository_id", "Manifest Platform":"manifest_platform","Manifest Filepath":"manifest_filepath","Git branch":"git_branch", "Manifest kind":"manifest_kind","Optional":"optional","Dependency Project Name":"dependency_project_name", "Dependency Requirements":"dependency_requirements","Dependency Kind":"dependency_kind","Dependency Project ID":"dependency_project_id"}', "CSV_HEADERS": '["id","host_type","repository_name_with_owner","repository_id","manifest_platform","manifest_filepath","git_branch", "manifest_kind","optional","dependency_project_name","dependency_requirements","dependency_kind","dependency_project_id"]', }, - resources={ - "request_memory": "4G", - "request_cpu": "1", - "request_ephemeral_storage": "10G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) @@ -772,12 +801,14 @@ ) # Run CSV transform within kubernetes pod - transform_repository_dependencies_7 = kubernetes_pod.KubernetesPodOperator( + transform_repository_dependencies_7 = kubernetes_engine.GKEStartPodOperator( task_id="transform_repository_dependencies_7", startup_timeout_seconds=600, name="repository_dependencies", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-libraries-io-repository-dependencies", image_pull_policy="Always", image="{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}", env_vars={ @@ -792,10 +823,10 @@ "RENAME_MAPPINGS": '{"ID":"id","Host Type":"host_type","Repository Name with Owner":"repository_name_with_owner","Repository ID":"repository_id", "Manifest Platform":"manifest_platform","Manifest Filepath":"manifest_filepath","Git branch":"git_branch", "Manifest kind":"manifest_kind","Optional":"optional","Dependency Project Name":"dependency_project_name", "Dependency Requirements":"dependency_requirements","Dependency Kind":"dependency_kind","Dependency Project ID":"dependency_project_id"}', "CSV_HEADERS": '["id","host_type","repository_name_with_owner","repository_id","manifest_platform","manifest_filepath","git_branch", "manifest_kind","optional","dependency_project_name","dependency_requirements","dependency_kind","dependency_project_id"]', }, - resources={ - "request_memory": "4G", - "request_cpu": "1", - "request_ephemeral_storage": "10G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) @@ -894,12 +925,14 @@ ) # Run CSV transform within kubernetes pod - transform_repository_dependencies_8 = kubernetes_pod.KubernetesPodOperator( + transform_repository_dependencies_8 = kubernetes_engine.GKEStartPodOperator( task_id="transform_repository_dependencies_8", startup_timeout_seconds=600, name="repository_dependencies", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-libraries-io-repository-dependencies", image_pull_policy="Always", image="{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}", env_vars={ @@ -914,10 +947,10 @@ "RENAME_MAPPINGS": '{"ID":"id","Host Type":"host_type","Repository Name with Owner":"repository_name_with_owner","Repository ID":"repository_id", "Manifest Platform":"manifest_platform","Manifest Filepath":"manifest_filepath","Git branch":"git_branch", "Manifest kind":"manifest_kind","Optional":"optional","Dependency Project Name":"dependency_project_name", "Dependency Requirements":"dependency_requirements","Dependency Kind":"dependency_kind","Dependency Project ID":"dependency_project_id"}', "CSV_HEADERS": '["id","host_type","repository_name_with_owner","repository_id","manifest_platform","manifest_filepath","git_branch", "manifest_kind","optional","dependency_project_name","dependency_requirements","dependency_kind","dependency_project_id"]', }, - resources={ - "request_memory": "4G", - "request_cpu": "1", - "request_ephemeral_storage": "10G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) @@ -1016,12 +1049,14 @@ ) # Run CSV transform within kubernetes pod - transform_repository_dependencies_9 = kubernetes_pod.KubernetesPodOperator( + transform_repository_dependencies_9 = kubernetes_engine.GKEStartPodOperator( task_id="transform_repository_dependencies_9", startup_timeout_seconds=600, name="repository_dependencies", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-libraries-io-repository-dependencies", image_pull_policy="Always", image="{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}", env_vars={ @@ -1036,10 +1071,10 @@ "RENAME_MAPPINGS": '{"ID":"id","Host Type":"host_type","Repository Name with Owner":"repository_name_with_owner","Repository ID":"repository_id", "Manifest Platform":"manifest_platform","Manifest Filepath":"manifest_filepath","Git branch":"git_branch", "Manifest kind":"manifest_kind","Optional":"optional","Dependency Project Name":"dependency_project_name", "Dependency Requirements":"dependency_requirements","Dependency Kind":"dependency_kind","Dependency Project ID":"dependency_project_id"}', "CSV_HEADERS": '["id","host_type","repository_name_with_owner","repository_id","manifest_platform","manifest_filepath","git_branch", "manifest_kind","optional","dependency_project_name","dependency_requirements","dependency_kind","dependency_project_id"]', }, - resources={ - "request_memory": "4G", - "request_cpu": "1", - "request_ephemeral_storage": "10G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) @@ -1138,12 +1173,14 @@ ) # Run CSV transform within kubernetes pod - transform_repository_dependencies_11 = kubernetes_pod.KubernetesPodOperator( + transform_repository_dependencies_11 = kubernetes_engine.GKEStartPodOperator( task_id="transform_repository_dependencies_11", startup_timeout_seconds=600, name="repository_dependencies", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-libraries-io-repository-dependencies", image_pull_policy="Always", image="{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}", env_vars={ @@ -1158,10 +1195,10 @@ "RENAME_MAPPINGS": '{"ID":"id","Host Type":"host_type","Repository Name with Owner":"repository_name_with_owner","Repository ID":"repository_id", "Manifest Platform":"manifest_platform","Manifest Filepath":"manifest_filepath","Git branch":"git_branch", "Manifest kind":"manifest_kind","Optional":"optional","Dependency Project Name":"dependency_project_name", "Dependency Requirements":"dependency_requirements","Dependency Kind":"dependency_kind","Dependency Project ID":"dependency_project_id"}', "CSV_HEADERS": '["id","host_type","repository_name_with_owner","repository_id","manifest_platform","manifest_filepath","git_branch", "manifest_kind","optional","dependency_project_name","dependency_requirements","dependency_kind","dependency_project_id"]', }, - resources={ - "request_memory": "4G", - "request_cpu": "1", - "request_ephemeral_storage": "10G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) @@ -1260,12 +1297,14 @@ ) # Run CSV transform within kubernetes pod - transform_repository_dependencies_10 = kubernetes_pod.KubernetesPodOperator( + transform_repository_dependencies_10 = kubernetes_engine.GKEStartPodOperator( task_id="transform_repository_dependencies_10", startup_timeout_seconds=600, name="repository_dependencies", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-libraries-io-repository-dependencies", image_pull_policy="Always", image="{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}", env_vars={ @@ -1280,10 +1319,10 @@ "RENAME_MAPPINGS": '{"ID":"id","Host Type":"host_type","Repository Name with Owner":"repository_name_with_owner","Repository ID":"repository_id", "Manifest Platform":"manifest_platform","Manifest Filepath":"manifest_filepath","Git branch":"git_branch", "Manifest kind":"manifest_kind","Optional":"optional","Dependency Project Name":"dependency_project_name", "Dependency Requirements":"dependency_requirements","Dependency Kind":"dependency_kind","Dependency Project ID":"dependency_project_id"}', "CSV_HEADERS": '["id","host_type","repository_name_with_owner","repository_id","manifest_platform","manifest_filepath","git_branch", "manifest_kind","optional","dependency_project_name","dependency_requirements","dependency_kind","dependency_project_id"]', }, - resources={ - "request_memory": "4G", - "request_cpu": "1", - "request_ephemeral_storage": "10G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) @@ -1380,9 +1419,16 @@ }, ], ) + delete_cluster = kubernetes_engine.GKEDeleteClusterOperator( + task_id="delete_cluster", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + name="pdp-libraries-io-repository-dependencies", + ) ( bash_gcs_to_gcs + >> create_cluster >> [ transform_repository_dependencies, transform_repository_dependencies_2, @@ -1396,15 +1442,18 @@ transform_repository_dependencies_10, transform_repository_dependencies_11, ] - >> load_repository_dependencies_to_bq - >> load_repository_dependencies_to_bq_2 - >> load_repository_dependencies_to_bq_3 - >> load_repository_dependencies_to_bq_4 - >> load_repository_dependencies_to_bq_5 - >> load_repository_dependencies_to_bq_6 - >> load_repository_dependencies_to_bq_7 - >> load_repository_dependencies_to_bq_8 - >> load_repository_dependencies_to_bq_9 - >> load_repository_dependencies_to_bq_10 - >> load_repository_dependencies_to_bq_11 + >> delete_cluster + >> [ + load_repository_dependencies_to_bq, + load_repository_dependencies_to_bq_2, + load_repository_dependencies_to_bq_3, + load_repository_dependencies_to_bq_4, + load_repository_dependencies_to_bq_5, + load_repository_dependencies_to_bq_6, + load_repository_dependencies_to_bq_7, + load_repository_dependencies_to_bq_8, + load_repository_dependencies_to_bq_9, + load_repository_dependencies_to_bq_10, + load_repository_dependencies_to_bq_11, + ] ) diff --git a/datasets/libraries_io/pipelines/tags/pipeline.yaml b/datasets/libraries_io/pipelines/tags/pipeline.yaml index b6e764813..fa0d07201 100644 --- a/datasets/libraries_io/pipelines/tags/pipeline.yaml +++ b/datasets/libraries_io/pipelines/tags/pipeline.yaml @@ -53,14 +53,31 @@ dag: rm /home/airflow/gcs/data/libraries_io/tags/tags.csv fi - - operator: "KubernetesPodOperator" + - operator: "GKECreateClusterOperator" + args: + task_id: "create_cluster" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + body: + name: pdp-libraries-io-tags + initial_node_count: 1 + network: "{{ var.value.vpc_network }}" + node_config: + machine_type: e2-standard-16 + oauth_scopes: + - https://www.googleapis.com/auth/devstorage.read_write + - https://www.googleapis.com/auth/cloud-platform + + - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "transform_tags" startup_timeout_seconds: 600 name: "tags" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-libraries-io-tags image_pull_policy: "Always" image: "{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}" env_vars: @@ -78,10 +95,13 @@ dag: "Tag Created Timestamp":"tag_created_timestamp","Tag Updated Timestamp":"tag_updated_timestamp"} CSV_HEADERS: >- ["id","host_type","repository_name_with_owner","repository_id","tag_name","tag_git_sha","tag_published_timestamp","tag_created_timestamp","tag_updated_timestamp"] - resources: - request_memory: "4G" - request_cpu: "1" - request_ephemeral_storage: "10G" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" @@ -132,14 +152,16 @@ dag: description: "The timestamp of when the tag was last saved by Libraries.io." mode: "nullable" - - operator: "KubernetesPodOperator" + - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "transform_tags_2" startup_timeout_seconds: 600 name: "tags" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-libraries-io-tags image_pull_policy: "Always" image: "{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}" env_vars: @@ -157,10 +179,13 @@ dag: "Tag Created Timestamp":"tag_created_timestamp","Tag Updated Timestamp":"tag_updated_timestamp"} CSV_HEADERS: >- ["id","host_type","repository_name_with_owner","repository_id","tag_name","tag_git_sha","tag_published_timestamp","tag_created_timestamp","tag_updated_timestamp"] - resources: - request_memory: "4G" - request_cpu: "1" - request_ephemeral_storage: "10G" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" @@ -211,14 +236,16 @@ dag: description: "The timestamp of when the tag was last saved by Libraries.io." mode: "nullable" - - operator: "KubernetesPodOperator" + - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "transform_tags_3" startup_timeout_seconds: 600 name: "tags" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-libraries-io-tags image_pull_policy: "Always" image: "{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}" env_vars: @@ -236,10 +263,13 @@ dag: "Tag Created Timestamp":"tag_created_timestamp","Tag Updated Timestamp":"tag_updated_timestamp"} CSV_HEADERS: >- ["id","host_type","repository_name_with_owner","repository_id","tag_name","tag_git_sha","tag_published_timestamp","tag_created_timestamp","tag_updated_timestamp"] - resources: - request_memory: "4G" - request_cpu: "1" - request_ephemeral_storage: "10G" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" @@ -290,5 +320,12 @@ dag: description: "The timestamp of when the tag was last saved by Libraries.io." mode: "nullable" + - operator: "GKEDeleteClusterOperator" + args: + task_id: "delete_cluster" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + name: pdp-libraries-io-tags + graph_paths: - - "bash_gcs_to_gcs >> [transform_tags,transform_tags_2,transform_tags_3] >> load_tags_to_bq >> load_tags_to_bq_2 >> load_tags_to_bq_3" + - "bash_gcs_to_gcs >> create_cluster >> [transform_tags,transform_tags_2,transform_tags_3] >> delete_cluster >> [load_tags_to_bq, load_tags_to_bq_2, load_tags_to_bq_3]" diff --git a/datasets/libraries_io/pipelines/tags/tags_dag.py b/datasets/libraries_io/pipelines/tags/tags_dag.py index 6dc4987df..ecf565cf0 100644 --- a/datasets/libraries_io/pipelines/tags/tags_dag.py +++ b/datasets/libraries_io/pipelines/tags/tags_dag.py @@ -15,7 +15,7 @@ from airflow import DAG from airflow.operators import bash -from airflow.providers.cncf.kubernetes.operators import kubernetes_pod +from airflow.providers.google.cloud.operators import kubernetes_engine from airflow.providers.google.cloud.transfers import gcs_to_bigquery default_args = { @@ -39,14 +39,33 @@ task_id="bash_gcs_to_gcs", bash_command="if test -f /home/airflow/gcs/data/libraries_io/lib-1.6.0.tar.gz;\nthen\n mkdir /home/airflow/gcs/data/libraries_io/tags/\n cp /home/airflow/gcs/data/libraries_io/libraries-1.4.0-2018-12-22/tags-1.4.0-2018-12-22.csv /home/airflow/gcs/data/libraries_io/tags/tags.csv\n split -l 20000000 --additional-suffix=.csv /home/airflow/gcs/data/libraries_io/tags/tags.csv /home/airflow/gcs/data/libraries_io/tags/\n rm /home/airflow/gcs/data/libraries_io/tags/tags.csv\nelse\n mkdir /home/airflow/gcs/data/libraries_io/\n curl -o /home/airflow/gcs/data/libraries_io/lib-1.6.0.tar.gz -L https://zenodo.org/record/2536573/files/Libraries.io-open-data-1.4.0.tar.gz\n tar -xf /home/airflow/gcs/data/libraries_io/lib-1.6.0.tar.gz -C /home/airflow/gcs/data/libraries_io/\n mkdir /home/airflow/gcs/data/libraries_io/tags/\n cp /home/airflow/gcs/data/libraries_io/libraries-1.4.0-2018-12-22/tags-1.4.0-2018-12-22.csv /home/airflow/gcs/data/libraries_io/tags/tags.csv\n split -l 20000000 --additional-suffix=.csv /home/airflow/gcs/data/libraries_io/tags/tags.csv /home/airflow/gcs/data/libraries_io/tags/\n rm /home/airflow/gcs/data/libraries_io/tags/tags.csv\nfi\n", ) + create_cluster = kubernetes_engine.GKECreateClusterOperator( + task_id="create_cluster", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + body={ + "name": "pdp-libraries-io-tags", + "initial_node_count": 1, + "network": "{{ var.value.vpc_network }}", + "node_config": { + "machine_type": "e2-standard-16", + "oauth_scopes": [ + "https://www.googleapis.com/auth/devstorage.read_write", + "https://www.googleapis.com/auth/cloud-platform", + ], + }, + }, + ) # Run CSV transform within kubernetes pod - transform_tags = kubernetes_pod.KubernetesPodOperator( + transform_tags = kubernetes_engine.GKEStartPodOperator( task_id="transform_tags", startup_timeout_seconds=600, name="tags", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-libraries-io-tags", image_pull_policy="Always", image="{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}", env_vars={ @@ -61,10 +80,10 @@ "RENAME_MAPPINGS": '{"ID":"id","Host Type":"host_type","Repository Name with Owner":"repository_name_with_owner","Repository ID":"repository_id", "Tag Name":"tag_name","Tag git sha":"tag_git_sha","Tag Published Timestamp":"tag_published_timestamp", "Tag Created Timestamp":"tag_created_timestamp","Tag Updated Timestamp":"tag_updated_timestamp"}', "CSV_HEADERS": '["id","host_type","repository_name_with_owner","repository_id","tag_name","tag_git_sha","tag_published_timestamp","tag_created_timestamp","tag_updated_timestamp"]', }, - resources={ - "request_memory": "4G", - "request_cpu": "1", - "request_ephemeral_storage": "10G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) @@ -137,12 +156,14 @@ ) # Run CSV transform within kubernetes pod - transform_tags_2 = kubernetes_pod.KubernetesPodOperator( + transform_tags_2 = kubernetes_engine.GKEStartPodOperator( task_id="transform_tags_2", startup_timeout_seconds=600, name="tags", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-libraries-io-tags", image_pull_policy="Always", image="{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}", env_vars={ @@ -157,10 +178,10 @@ "RENAME_MAPPINGS": '{"ID":"id","Host Type":"host_type","Repository Name with Owner":"repository_name_with_owner","Repository ID":"repository_id", "Tag Name":"tag_name","Tag git sha":"tag_git_sha","Tag Published Timestamp":"tag_published_timestamp", "Tag Created Timestamp":"tag_created_timestamp","Tag Updated Timestamp":"tag_updated_timestamp"}', "CSV_HEADERS": '["id","host_type","repository_name_with_owner","repository_id","tag_name","tag_git_sha","tag_published_timestamp","tag_created_timestamp","tag_updated_timestamp"]', }, - resources={ - "request_memory": "4G", - "request_cpu": "1", - "request_ephemeral_storage": "10G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) @@ -233,12 +254,14 @@ ) # Run CSV transform within kubernetes pod - transform_tags_3 = kubernetes_pod.KubernetesPodOperator( + transform_tags_3 = kubernetes_engine.GKEStartPodOperator( task_id="transform_tags_3", startup_timeout_seconds=600, name="tags", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-libraries-io-tags", image_pull_policy="Always", image="{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}", env_vars={ @@ -253,10 +276,10 @@ "RENAME_MAPPINGS": '{"ID":"id","Host Type":"host_type","Repository Name with Owner":"repository_name_with_owner","Repository ID":"repository_id", "Tag Name":"tag_name","Tag git sha":"tag_git_sha","Tag Published Timestamp":"tag_published_timestamp", "Tag Created Timestamp":"tag_created_timestamp","Tag Updated Timestamp":"tag_updated_timestamp"}', "CSV_HEADERS": '["id","host_type","repository_name_with_owner","repository_id","tag_name","tag_git_sha","tag_published_timestamp","tag_created_timestamp","tag_updated_timestamp"]', }, - resources={ - "request_memory": "4G", - "request_cpu": "1", - "request_ephemeral_storage": "10G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) @@ -327,11 +350,17 @@ }, ], ) + delete_cluster = kubernetes_engine.GKEDeleteClusterOperator( + task_id="delete_cluster", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + name="pdp-libraries-io-tags", + ) ( bash_gcs_to_gcs + >> create_cluster >> [transform_tags, transform_tags_2, transform_tags_3] - >> load_tags_to_bq - >> load_tags_to_bq_2 - >> load_tags_to_bq_3 + >> delete_cluster + >> [load_tags_to_bq, load_tags_to_bq_2, load_tags_to_bq_3] ) diff --git a/datasets/libraries_io/pipelines/versions/pipeline.yaml b/datasets/libraries_io/pipelines/versions/pipeline.yaml index ed00ca4f7..e6220b16b 100644 --- a/datasets/libraries_io/pipelines/versions/pipeline.yaml +++ b/datasets/libraries_io/pipelines/versions/pipeline.yaml @@ -49,14 +49,31 @@ dag: cp /home/airflow/gcs/data/libraries_io/libraries-1.4.0-2018-12-22/versions-1.4.0-2018-12-22.csv /home/airflow/gcs/data/libraries_io/versions/versions.csv fi - - operator: "KubernetesPodOperator" + - operator: "GKECreateClusterOperator" + args: + task_id: "create_cluster" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + body: + name: pdp-libraries-io-versions + initial_node_count: 1 + network: "{{ var.value.vpc_network }}" + node_config: + machine_type: e2-standard-16 + oauth_scopes: + - https://www.googleapis.com/auth/devstorage.read_write + - https://www.googleapis.com/auth/cloud-platform + + - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "transform_versions" startup_timeout_seconds: 600 name: "versions" - namespace: "composer" - service_account_name: "datasets" + namespace: "default" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + cluster_name: pdp-libraries-io-versions image_pull_policy: "Always" image: "{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}" env_vars: @@ -73,10 +90,13 @@ dag: "Published Timestamp":"published_timestamp","Created Timestamp":"created_timestamp","Updated Timestamp":"updated_timestamp"} CSV_HEADERS: >- ["id","platform","project_name","project_id","number","published_timestamp","created_timestamp","updated_timestamp"] - resources: - request_memory: "4G" - request_cpu: "1" - request_ephemeral_storage: "10G" + container_resources: + memory: + request: "16Gi" + cpu: + request: "1" + ephemeral-storage: + request: "10Gi" - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" @@ -123,5 +143,12 @@ dag: description: "The timestamp of when the version was last saved by Libraries.io." mode: "nullable" + - operator: "GKEDeleteClusterOperator" + args: + task_id: "delete_cluster" + project_id: "{{ var.value.gcp_project }}" + location: "us-central1-c" + name: pdp-libraries-io-versions + graph_paths: - - "bash_gcs_to_gcs >> transform_versions >> load_versions_to_bq" + - "bash_gcs_to_gcs >> create_cluster >> transform_versions >> delete_cluster >> load_versions_to_bq" diff --git a/datasets/libraries_io/pipelines/versions/versions_dag.py b/datasets/libraries_io/pipelines/versions/versions_dag.py index 010dba8ee..7e7c8fe8c 100644 --- a/datasets/libraries_io/pipelines/versions/versions_dag.py +++ b/datasets/libraries_io/pipelines/versions/versions_dag.py @@ -15,7 +15,7 @@ from airflow import DAG from airflow.operators import bash -from airflow.providers.cncf.kubernetes.operators import kubernetes_pod +from airflow.providers.google.cloud.operators import kubernetes_engine from airflow.providers.google.cloud.transfers import gcs_to_bigquery default_args = { @@ -39,14 +39,33 @@ task_id="bash_gcs_to_gcs", bash_command="if test -f /home/airflow/gcs/data/libraries_io/lib-1.6.0.tar.gz;\nthen\n mkdir /home/airflow/gcs/data/libraries_io/versions/\n cp /home/airflow/gcs/data/libraries_io/libraries-1.4.0-2018-12-22/versions-1.4.0-2018-12-22.csv /home/airflow/gcs/data/libraries_io/versions/versions.csv\nelse\n mkdir /home/airflow/gcs/data/libraries_io/\n curl -o /home/airflow/gcs/data/libraries_io/lib-1.6.0.tar.gz -L https://zenodo.org/record/2536573/files/Libraries.io-open-data-1.4.0.tar.gz\n tar -xf /home/airflow/gcs/data/libraries_io/lib-1.6.0.tar.gz -C /home/airflow/gcs/data/libraries_io/\n mkdir /home/airflow/gcs/data/libraries_io/versions/\n cp /home/airflow/gcs/data/libraries_io/libraries-1.4.0-2018-12-22/versions-1.4.0-2018-12-22.csv /home/airflow/gcs/data/libraries_io/versions/versions.csv\nfi\n", ) + create_cluster = kubernetes_engine.GKECreateClusterOperator( + task_id="create_cluster", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + body={ + "name": "pdp-libraries-io-versions", + "initial_node_count": 1, + "network": "{{ var.value.vpc_network }}", + "node_config": { + "machine_type": "e2-standard-16", + "oauth_scopes": [ + "https://www.googleapis.com/auth/devstorage.read_write", + "https://www.googleapis.com/auth/cloud-platform", + ], + }, + }, + ) # Run CSV transform within kubernetes pod - transform_versions = kubernetes_pod.KubernetesPodOperator( + transform_versions = kubernetes_engine.GKEStartPodOperator( task_id="transform_versions", startup_timeout_seconds=600, name="versions", - namespace="composer", - service_account_name="datasets", + namespace="default", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + cluster_name="pdp-libraries-io-versions", image_pull_policy="Always", image="{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}", env_vars={ @@ -61,10 +80,10 @@ "RENAME_MAPPINGS": '{"ID":"id","Platform":"platform","Project Name":"project_name","Project ID":"project_id","Number":"number", "Published Timestamp":"published_timestamp","Created Timestamp":"created_timestamp","Updated Timestamp":"updated_timestamp"}', "CSV_HEADERS": '["id","platform","project_name","project_id","number","published_timestamp","created_timestamp","updated_timestamp"]', }, - resources={ - "request_memory": "4G", - "request_cpu": "1", - "request_ephemeral_storage": "10G", + container_resources={ + "memory": {"request": "16Gi"}, + "cpu": {"request": "1"}, + "ephemeral-storage": {"request": "10Gi"}, }, ) @@ -129,5 +148,17 @@ }, ], ) + delete_cluster = kubernetes_engine.GKEDeleteClusterOperator( + task_id="delete_cluster", + project_id="{{ var.value.gcp_project }}", + location="us-central1-c", + name="pdp-libraries-io-versions", + ) - bash_gcs_to_gcs >> transform_versions >> load_versions_to_bq + ( + bash_gcs_to_gcs + >> create_cluster + >> transform_versions + >> delete_cluster + >> load_versions_to_bq + )