Merge pull request #34 from Ferlab-Ste-Justine/main

Update dags prod

adipaul1981 authored Oct 1, 2024
2 parents 6f1466e + c257764 commit ccdabd6
Showing 22 changed files with 499 additions and 230 deletions.
1 change: 0 additions & 1 deletion .env.sample
@@ -23,7 +23,6 @@ ID_SERVICE_URL=http://id-service:5000
 OBO_PARSER_IMAGE=index.docker.io/ferlabcrsj/obo-parser
 ETL_IMAGE=index.docker.io/ferlabcrsj/etl-cqdg-portal
 FHAVRO_EXPORT_IMAGE=index.docker.io/ferlabcrsj/fhavro-export
-ARRANGER_IMAGE=index.docker.io/ferlabcrsj/cqdg-api-arranger
 FHIR_IMPORT_IMAGE=index.docker.io/ferlabcrsj/cqdg-fhir-import
 FERLOAD_DRS_IMPORT_IMAGE=index.docker.io/ferlabcrsj/cqdg-ferload-drs-import
 ENVIRONMENT=qa
36 changes: 0 additions & 36 deletions dags/arranger.py

This file was deleted.

4 changes: 3 additions & 1 deletion dags/es_templates_update.py
@@ -2,6 +2,7 @@
 from kubernetes.client import models as k8s
 
 from lib.config import datalake_bucket, kube_config, aws_endpoint, aws_secret_name, aws_secret_access_key, aws_secret_secret_key
+from lib.slack import Slack
 
 script = f"""
 #!/bin/bash
@@ -74,5 +75,6 @@ def es_templates_update():
                key=aws_secret_secret_key,
            ),
        ),
-    ), ]
+    ), ],
+    on_failure_callback=Slack.notify_task_failure
 )
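Note: the recurring change across the files in this commit is the addition of on_failure_callback=Slack.notify_task_failure (plus notify_dag_start / notify_dag_completion on the boundary tasks). lib/slack.py itself is not shown in this diff; as a rough sketch only, a webhook-based notifier compatible with Airflow's callback contract could look like this (the class layout, message text, and SLACK_HOOK_URL variable are all assumptions):

import os

import requests  # assumption: a simple incoming-webhook notifier


class Slack:
    webhook_url = os.environ.get('SLACK_HOOK_URL')  # hypothetical variable name

    @classmethod
    def _post(cls, text):
        # Silently skip if no webhook is configured (e.g. local dev)
        if cls.webhook_url:
            requests.post(cls.webhook_url, json={'text': text})

    @classmethod
    def notify_task_failure(cls, context):
        # Airflow passes the task context dict to failure callbacks
        ti = context['task_instance']
        cls._post(f':red_circle: Task failed: {ti.dag_id}.{ti.task_id} (try {ti.try_number})')

    @classmethod
    def notify_dag_start(cls, context):
        cls._post(f':arrow_forward: DAG started: {context["dag"].dag_id}')

    @classmethod
    def notify_dag_completion(cls, context):
        cls._post(f':white_check_mark: DAG completed: {context["dag"].dag_id}')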
75 changes: 75 additions & 0 deletions dags/etl-variant.py
@@ -0,0 +1,75 @@
from datetime import datetime

from airflow import DAG
from airflow.models.param import Param
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup

from es_templates_update import es_templates_update
from etl_enrich_specimens import etl_enrich_specimens
from etl_enrich_variants import variant_task_enrich_variants, variant_task_enrich_consequences
from etl_index_variants import index_variants
from etl_normalize_variants import extract_params, normalized_etl
from etl_prepare_index_variants import etl_variant_prepared
from etl_publish_variants import publish_task
from lib.slack import Slack

with DAG(
    dag_id='etl-variant',
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    # concurrency set to 1: only one task runs at a time, to avoid conflicts in the Delta table
    concurrency=1,
    params={
        'study_code': Param('CAG', type='string'),
        'owner': Param('jmichaud', type='string'),
        'dateset_batches': Param(
            [
                {'dataset': 'dataset_dataset1', 'batches': ['annotated_vcf1', 'annotated_vcf2']},
                {'dataset': 'dataset_dataset2', 'batches': ['annotated_vcf']}
            ],
            schema={
                "type": "array",
                "minItems": 1,
                "items": {
                    "type": "object",
                    "default": {'dataset': 'dataset_default', 'batches': ['annotated_vcf']},
                    "properties": {
                        "dataset": {"type": "string"},
                        "batches": {"type": "array", "items": {"type": "string"}},
                    },
                    "required": ["dataset", "batches"]
                },
            }
        ),
        'release_id': Param('7', type='string'),
        'project': Param('cqdg', type='string'),
        'es_port': Param('9200', type='string'),
    },
) as dag:
    params = extract_params()

    with TaskGroup(group_id='normalize') as normalize:
        normalized_etl(run_time_params=params, name='snv') >> normalized_etl(run_time_params=params, name='consequences')

    with TaskGroup(group_id='enrich') as enrich:
        variant_task_enrich_variants() >> variant_task_enrich_consequences()

    with TaskGroup(group_id='prepared') as prepared:
        etl_variant_prepared('variant_centric') >> etl_variant_prepared('gene_centric') >> etl_variant_prepared('variant_suggestions') >> etl_variant_prepared('gene_suggestions')

    with TaskGroup(group_id='index') as index:
        index_variants()

    start = EmptyOperator(
        task_id="start",
        on_success_callback=Slack.notify_dag_start
    )

    slack = EmptyOperator(
        task_id="slack",
        on_success_callback=Slack.notify_dag_completion
    )

    start >> etl_enrich_specimens() >> normalize >> enrich >> prepared >> es_templates_update() >> index >> publish_task('variant_centric,variant_suggestions,gene_centric,gene_suggestions') >> slack
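extract_params and normalized_etl come from etl_normalize_variants, which is not among the files shown. Purely as a hypothetical illustration, the nested dateset_batches param (the key is spelled that way in the DAG) could be flattened into per-batch work items like so; the helper name and shape are assumptions:

def flatten_dataset_batches(dataset_batches):
    """Flatten [{'dataset': ..., 'batches': [...]}, ...] into (dataset, batch) pairs."""
    return [
        (entry['dataset'], batch)
        for entry in dataset_batches
        for batch in entry['batches']
    ]

# With the first default entry above:
# flatten_dataset_batches([{'dataset': 'dataset_dataset1', 'batches': ['annotated_vcf1', 'annotated_vcf2']}])
# returns [('dataset_dataset1', 'annotated_vcf1'), ('dataset_dataset1', 'annotated_vcf2')]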
34 changes: 18 additions & 16 deletions dags/etl.py
@@ -1,14 +1,17 @@
+from datetime import datetime
+
 from airflow import DAG
 from airflow.models.param import Param
+from airflow.operators.empty import EmptyOperator
 from airflow.utils.task_group import TaskGroup
-from datetime import datetime
-from lib.config import env, study_codes, release_id, es_port, es_url, etl_publish_config, project
 
 from es_templates_update import es_templates_update
-from etl_import import etl_import
-from arranger import arranger_task
 from etl_fhavro_export import fhavro_export
+from etl_import import etl_import
 from etl_index import index_operator
 from etl_prepare_index import prepare_index
+from etl_publish import publish_task
+from lib.slack import Slack
+
 with DAG(
     dag_id='etl',
@@ -22,18 +25,17 @@
     },
 ) as dag:
 
+    start = EmptyOperator(
+        task_id="start",
+        on_success_callback=Slack.notify_dag_start
+    )
+
+    slack = EmptyOperator(
+        task_id="slack",
+        on_success_callback=Slack.notify_dag_completion
+    )
+
     with TaskGroup(group_id='index') as index:
         index_operator('study') >> index_operator('participant') >> index_operator('file') >> index_operator('biospecimen')
 
-    with TaskGroup(group_id='publish') as publish:
-        def publish_operator(name:str):
-            return etl_publish_config \
-                .args(es_url, es_port, env, project, release_id, study_codes, f'{name}_centric') \
-                .operator(
-                    task_id=f'{name}_centric',
-                    name=f'etl-publish-{name}-centric'
-                )
-
-        publish_operator('study') >> publish_operator('participant') >> publish_operator('file') >> publish_operator('biospecimen')
-
-    fhavro_export() >> etl_import() >> prepare_index() >> es_templates_update() >> index >> publish >> arranger_task()
+    start >> fhavro_export() >> etl_import() >> prepare_index() >> es_templates_update() >> index >> publish_task('study_centric,participant_centric,file_centric,biospecimen_centric') >> slack
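Because this refactor rewires the DAG's task chain (start and slack now bracket the flow, and the per-entity publish group collapses into a single publish_task), a DagBag smoke test is a cheap way to confirm the module still imports and the new tasks exist. A minimal sketch, assuming the DAG files live in a dags/ folder:

from airflow.models import DagBag


def test_etl_dag_loads():
    dagbag = DagBag(dag_folder='dags', include_examples=False)
    # Any syntax or import error in a DAG file shows up here
    assert not dagbag.import_errors, dagbag.import_errors
    dag = dagbag.get_dag('etl')
    assert dag is not None
    assert 'start' in dag.task_ids and 'slack' in dag.task_ids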
40 changes: 23 additions & 17 deletions dags/etl_enrich_specimens.py
@@ -1,8 +1,28 @@
+from datetime import datetime
+
 from airflow import DAG
 from airflow.models.param import Param
-from datetime import datetime
-from lib.config import default_config_file, study_codes
 
 from etl_prepare_index import etl_base_config, spark_small_conf, prepare_index_jar
+from lib.config import default_config_file, study_code
+from lib.slack import Slack
+
+
+def etl_enrich_specimens():
+    return etl_base_config \
+        .add_spark_conf(spark_small_conf) \
+        .with_spark_jar(prepare_index_jar) \
+        .with_spark_class('bio.ferlab.fhir.etl.Enrich') \
+        .args(
+            '--config', default_config_file,
+            '--steps', 'default',
+            '--app-name', 'enrich_specimen',
+            '--study-id', study_code,
+        ).operator(
+            task_id='enrich-specimen',
+            name='etl-enrich-specimen',
+            on_failure_callback=Slack.notify_task_failure
+        )
+
 with DAG(
     dag_id='etl-enrich-specimen',
@@ -13,19 +33,5 @@
         'project': Param('cqdg', type='string'),
     },
 ) as dag:
-
-    etl_base_config \
-        .add_spark_conf(spark_small_conf) \
-        .with_spark_jar(prepare_index_jar) \
-        .with_spark_class('bio.ferlab.fhir.etl.Enrich') \
-        .args(
-            '--config', default_config_file,
-            '--steps', 'default',
-            '--app-name', 'enrich_specimen',
-            '--study-id', study_codes
-        ).operator(
-            task_id='enrich-specimen',
-            name='etl-enrich-specimen'
-        )
-
+    etl_enrich_specimens()
 
32 changes: 18 additions & 14 deletions dags/etl_enrich_variants.py
@@ -1,32 +1,36 @@
+from datetime import datetime
+
 from airflow import DAG
 from airflow.models.param import Param
-from datetime import datetime
 
 from lib.config import etl_variant_config, default_config_file
 from lib.operators.spark import SparkOperator
+from lib.slack import Slack
 
 etl_variant_enrich_config = etl_variant_config \
     .with_spark_class('bio.ferlab.etl.enriched.RunEnrichGenomic') \
     .args('--config', default_config_file,
           '--steps', 'default'
           )
-with DAG(
-    dag_id='etl-enrich-variants',
-    start_date=datetime(2022, 1, 1),
-    schedule_interval=None,
-    params={
-        'project': Param('cqdg', type='string'),
-    },
-) as dag:
 
-    variant_task_enrich_variants = etl_variant_enrich_config.prepend_args('variants').operator(
+def variant_task_enrich_variants():
+    return etl_variant_enrich_config.prepend_args('variants').operator(
         task_id='variant_task_variant_enrich_variants',
         name='etl-variant_task_variant_enrich_variants'
     )
 
-    variant_task_enrich_consequences = etl_variant_enrich_config.prepend_args('consequences').operator(
+def variant_task_enrich_consequences():
+    return etl_variant_enrich_config.prepend_args('consequences').operator(
         task_id='variant_task_variant_enrich_consequences',
         name='etl-variant_task_variant_enrich_consequences'
     )
 
-
-variant_task_enrich_variants >> variant_task_enrich_consequences
+with DAG(
+    dag_id='etl-enrich-variants',
+    start_date=datetime(2022, 1, 1),
+    schedule_interval=None,
+    params={
+        'project': Param('cqdg', type='string'),
+    },
+    on_failure_callback=Slack.notify_task_failure
+) as dag:
+    variant_task_enrich_variants() >> variant_task_enrich_consequences()
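etl_variant_enrich_config is built by chaining calls (with_spark_class, args, prepend_args) on a config object from lib.config, whose implementation is outside this diff. A rough sketch of the pattern, assuming an immutable builder so a shared base config is never mutated by individual DAGs (the class and field names here are invented for illustration):

from dataclasses import dataclass, replace
from typing import Tuple


@dataclass(frozen=True)
class SparkJobConfig:
    spark_class: str = ''
    arguments: Tuple[str, ...] = ()

    def with_spark_class(self, spark_class: str) -> 'SparkJobConfig':
        # Returns a new config; the original stays untouched
        return replace(self, spark_class=spark_class)

    def args(self, *new_args: str) -> 'SparkJobConfig':
        return replace(self, arguments=self.arguments + new_args)

    def prepend_args(self, *new_args: str) -> 'SparkJobConfig':
        # Puts the subcommand ('variants' / 'consequences') before the shared args
        return replace(self, arguments=new_args + self.arguments)

# base = SparkJobConfig().args('--config', 'conf.json', '--steps', 'default')
# base.prepend_args('variants').arguments
# -> ('variants', '--config', 'conf.json', '--steps', 'default')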
4 changes: 3 additions & 1 deletion dags/etl_fhavro_export.py
@@ -4,6 +4,7 @@
 from lib.config import env, fhir_url, datalake_bucket, keycloak_client_secret_name, keycloak_url, \
     aws_secret_name, aws_secret_access_key, aws_secret_secret_key, kube_config, aws_endpoint, study_codes
 from lib.operators.fhavro import FhavroConfig
+from lib.slack import Slack
 
 fhavro_config = FhavroConfig(
     fhir_url=fhir_url,
@@ -22,7 +23,8 @@
 def fhavro_export():
     return fhavro_config.args(study_codes, env).operator(
         task_id='fhavro_export',
-        name='etl-fhavro_export'
+        name='etl-fhavro_export',
+        on_failure_callback=Slack.notify_task_failure
     )
 
 with DAG(
14 changes: 8 additions & 6 deletions dags/etl_import.py
@@ -2,18 +2,20 @@
 from airflow.models.param import Param
 from datetime import datetime
 from lib.config import etl_base_config, spark_small_conf, import_jar, default_config_file, study_codes
+from lib.slack import Slack
 
 
 def etl_import():
     return etl_base_config \
         .with_spark_class('bio.ferlab.fhir.etl.ImportTask') \
         .with_spark_jar(import_jar) \
-        .add_spark_conf(spark_small_conf) \
-        .args(default_config_file, 'default', study_codes) \
-        .operator(
-            task_id='import_task',
-            name='etl-import-task'
-        )
+        .add_spark_conf(spark_small_conf) \
+        .args(default_config_file, 'default', study_codes) \
+        .operator(
+            task_id='import_task',
+            name='etl-import-task',
+            on_failure_callback=Slack.notify_task_failure
+        )
 
 with DAG(
     dag_id='etl-import',
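spark_small_conf is likewise defined in lib.config, outside this excerpt. Purely as an illustration of what a "small" profile might contain, using standard Spark property names (the specific values are assumptions, not the project's real settings):

# Hypothetical example of a small-footprint Spark configuration
spark_small_conf = {
    'spark.executor.instances': '1',
    'spark.executor.cores': '2',
    'spark.executor.memory': '4g',
    'spark.driver.memory': '2g',
}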