Skip to content

Commit

Permalink
feat: CDDG-719 etl variant and variant publish
Browse files Browse the repository at this point in the history
  • Loading branch information
adipaul1981 committed Aug 21, 2024
1 parent 40a93c7 commit cebf189
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 78 deletions.
44 changes: 44 additions & 0 deletions dags/etl-variant.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from datetime import datetime

from airflow import DAG
from airflow.models.param import Param
from airflow.utils.task_group import TaskGroup

from es_templates_update import es_templates_update
from etl_enrich_specimens import etl_enrich_specimens
from etl_enrich_variants import variant_task_enrich_variants, variant_task_enrich_consequences
from etl_index import index_operator
from etl_index_variants import index_variants
from etl_normalize_variants import normalize_variant_operator
from etl_prepare_index_variants import etl_variant_prepared
from etl_publish import publish_task

with DAG(
dag_id='etl-variant',
start_date=datetime(2022, 1, 1),
schedule_interval=None,
params={
'study_code': Param('CAG', type='string'), #FIXME study Codes vs study code !!!
'owner': Param('jmichaud', type='string'),
'dataset': Param('dataset_default', type='string'),
'batch': Param('annotated_vcf', type='string'),
'release_id': Param('7', type='string'),
'study_codes': Param('CAG', type='string'),
'project': Param('cqdg', type='string'),
'es_port': Param('9200', type='string')
},
) as dag:

with TaskGroup(group_id='normalize') as normalize:
normalize_variant_operator('snv') >> normalize_variant_operator('consequences')

with TaskGroup(group_id='enrich') as enrich:
variant_task_enrich_variants() >> variant_task_enrich_consequences()

with TaskGroup(group_id='prepared') as prepared:
etl_variant_prepared('variant_centric') >> etl_variant_prepared('gene_centric') >> etl_variant_prepared('variant_suggestions') >> etl_variant_prepared('gene_suggestions')

with TaskGroup(group_id='index') as index:
index_variants()

etl_enrich_specimens() >> normalize >> enrich >> prepared >> es_templates_update() >> index >> publish_task('study_centric,participant_centric,file_centric,biospecimen_centric')
33 changes: 17 additions & 16 deletions dags/etl_enrich_specimens.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,24 @@
from airflow import DAG
from airflow.models.param import Param
from datetime import datetime
from lib.config import default_config_file, study_codes
from lib.config import default_config_file, study_code, study_codes
from etl_prepare_index import etl_base_config, spark_small_conf, prepare_index_jar

def etl_enrich_specimens():
return etl_base_config \
.add_spark_conf(spark_small_conf) \
.with_spark_jar(prepare_index_jar) \
.with_spark_class('bio.ferlab.fhir.etl.Enrich') \
.args(
'--config', default_config_file,
'--steps', 'default',
'--app-name', 'enrich_specimen',
'--study-id', study_code or study_codes
).operator(
task_id='enrich-specimen',
name='etl-enrich-specimen'
)

with DAG(
dag_id='etl-enrich-specimen',
start_date=datetime(2022, 1, 1),
Expand All @@ -13,19 +28,5 @@
'project': Param('cqdg', type='string'),
},
) as dag:

etl_base_config \
.add_spark_conf(spark_small_conf) \
.with_spark_jar(prepare_index_jar) \
.with_spark_class('bio.ferlab.fhir.etl.Enrich') \
.args(
'--config', default_config_file,
'--steps', 'default',
'--app-name', 'enrich_specimen',
'--study-id', study_codes
).operator(
task_id='enrich-specimen',
name='etl-enrich-specimen'
)

etl_enrich_specimens()

25 changes: 13 additions & 12 deletions dags/etl_enrich_variants.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,25 @@
.args('--config', default_config_file,
'--steps', 'default'
)
with DAG(
dag_id='etl-enrich-variants',
start_date=datetime(2022, 1, 1),
schedule_interval=None,
params={
'project': Param('cqdg', type='string'),
},
) as dag:

variant_task_enrich_variants = etl_variant_enrich_config.prepend_args('variants').operator(
def variant_task_enrich_variants():
return etl_variant_enrich_config.prepend_args('variants').operator(
task_id='variant_task_variant_enrich_variants',
name='etl-variant_task_variant_enrich_variants'
)

variant_task_enrich_consequences = etl_variant_enrich_config.prepend_args('consequences').operator(
def variant_task_enrich_consequences():
return etl_variant_enrich_config.prepend_args('consequences').operator(
task_id='variant_task_variant_enrich_consequences',
name='etl-variant_task_variant_enrich_consequences'
)


variant_task_enrich_variants >> variant_task_enrich_consequences
with DAG(
dag_id='etl-enrich-variants',
start_date=datetime(2022, 1, 1),
schedule_interval=None,
params={
'project': Param('cqdg', type='string'),
},
) as dag:
variant_task_enrich_variants() >> variant_task_enrich_consequences()
65 changes: 36 additions & 29 deletions dags/etl_index_variants.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,46 +2,53 @@
from airflow.models.param import Param
from airflow.utils.task_group import TaskGroup
from datetime import datetime
from lib.config import es_port, es_url, release_id, default_config_file, etl_index_config, datalake_bucket
from lib.config import es_port, es_url, release_id, default_config_file, etl_index_config, datalake_bucket, env, Env

etl_index_variant_config = etl_index_config \
.with_spark_class('bio.ferlab.fhir.etl.VariantIndexTask')

with DAG(
dag_id='etl-index-variants',
start_date=datetime(2022, 1, 1),
schedule_interval=None,
params={
'release_id': Param('7', type='string'),
'project': Param('cqdg', type='string')
},
) as dag:
def operator(indexName:str, chromosome:str='all'):
indexNameU = indexName.replace('_', '-')
return etl_index_variant_config \
.args(es_port, release_id, indexName, default_config_file, f's3a://{datalake_bucket}/es_index/{indexName}/', chromosome, es_url) \
.operator(
task_id=f'{indexName}_index',
name=f'etl-index-{indexNameU}-index'
)

chromosomes = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '18',
'19', '20', '21', '22', 'X', 'Y', 'M']
def genes():
return operator('gene_centric') >> operator('gene_suggestions')

def operator(indexName:str, chromosome:str='all'):
indexNameU = indexName.replace('_', '-')
return etl_index_variant_config \
.args(es_port, release_id, indexName, default_config_file, f's3a://{datalake_bucket}/es_index/{indexName}/', chromosome, es_url) \
.operator(
task_id=f'{indexName}_index',
name=f'etl-index-{indexNameU}-index'
)

def genes():
return operator('gene_centric') >> operator('gene_suggestions')
def variants(chr):
with TaskGroup(group_id=f'variant_index-{chr}') as variant_index:
operator('variant_centric', chr) >> operator('variant_suggestions', chr)
return variant_index


def variants(chr):
with TaskGroup(group_id=f'variant_index-{chr}') as variant_index:
operator('variant_centric', chr) >> operator('variant_suggestions', chr)
return variant_index

def index_variants():
if env == Env.QA:
chromosomes = ['1']
else:
chromosomes = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '18',
'19', '20', '21', '22', 'X', 'Y', 'M']

task_arr = [genes()]

for chr in chromosomes:
task = variants(chr)
for chromosome in chromosomes:
task = variants(chromosome)
task_arr[-1] >> task
task_arr.append(task)
return task_arr


with DAG(
dag_id='etl-index-variants',
start_date=datetime(2022, 1, 1),
schedule_interval=None,
params={
'release_id': Param('7', type='string'),
'project': Param('cqdg', type='string')
},
) as dag:
index_variants()
29 changes: 8 additions & 21 deletions dags/etl_prepare_index_variants.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@
.args('--config', default_config_file,
'--steps', 'default'
)

def etl_variant_prepared(name):
return etl_variant_prepared_config.prepend_args(name).operator(
task_id=f'variant_task_{name}',
name=f'etl-variant_task_{name}'
)

with DAG(
dag_id='etl-prepare-index-variant',
start_date=datetime(2022, 1, 1),
Expand All @@ -18,24 +25,4 @@
},
) as dag:

variant_task_variant_centric = etl_variant_prepared_config.prepend_args('variant_centric').operator(
task_id='variant_task_variant_centric',
name='etl-variant_task_variant_centric'
)

variant_task_gene_centric = etl_variant_prepared_config.prepend_args('gene_centric').operator(
task_id='variant_task_gene_centric',
name='etl-variant_task_gene_centric'
)

variant_task_variant_suggestions = etl_variant_prepared_config.prepend_args('variant_suggestions').operator(
task_id='variant_task_variant_suggestions',
name='etl-variant_variant_suggestions'
)

variant_task_gene_suggestions = etl_variant_prepared_config.prepend_args('gene_suggestions').operator(
task_id='variant_task_gene_suggestions',
name='etl-variant_gene_suggestions'
)

variant_task_variant_centric >> variant_task_gene_centric >> variant_task_variant_suggestions >> variant_task_gene_suggestions
etl_variant_prepared('variant_centric') >> etl_variant_prepared('gene_centric') >> etl_variant_prepared('variant_suggestions') >> etl_variant_prepared('gene_suggestions')
40 changes: 40 additions & 0 deletions dags/etl_publish_variants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from airflow import DAG
from airflow.models import Param, Variable
from datetime import datetime
from lib.config import kube_config, es_url, es_port, es_credentials_secret_name, \
es_credentials_secret_key_password, es_credentials_secret_key_username, release_id, study_codes
from lib.operators.publish import PublishConfig

etl_publish_config = PublishConfig(
es_url = es_url,
kube_config = kube_config,
image = Variable.get('publish_image'),
es_port = es_port,
es_cert_secret_name = 'opensearch-ca-certificate',
es_credentials_secret_name = es_credentials_secret_name,
es_credentials_secret_key_username = es_credentials_secret_key_username,
es_credentials_secret_key_password = es_credentials_secret_key_password,
)

def publish_task(job_types: str):
return etl_publish_config.args(
'-n', 'https://search-workers.qa.juno.cqdg.ferlab.bio',
'-p', es_port,
'-r', release_id,
'-j', job_types) \
.operator(
task_id='etl_publish_variant',
name='etl-publish_variant',
)
with DAG(
dag_id='etl-publish-variant',
start_date=datetime(2022, 1, 1),
schedule_interval=None,
params={
'es_port': Param('9200', type='string'),
'release_id': Param('0', type='string'),
'job_types': Param('variant_centric,variant_suggestions,gene_centric,gene_suggestions', type='string'),
},
) as dag:
publish_task('{{ params.job_types }}')

0 comments on commit cebf189

Please sign in to comment.