feat: CQDG-719 refactor variants normalize
adipaul1981 committed Aug 23, 2024
1 parent bcad8fc commit 777f2d3
Showing 2 changed files with 100 additions and 31 deletions.
31 changes: 24 additions & 7 deletions dags/etl-variant.py
@@ -5,11 +5,10 @@
 from airflow.utils.task_group import TaskGroup

 from es_templates_update import es_templates_update
 from etl_enrich_specimens import etl_enrich_specimens
-from etl_enrich_specimens import etl_enrich_specimens
 from etl_enrich_variants import variant_task_enrich_variants, variant_task_enrich_consequences
 from etl_index import index_operator
 from etl_index_variants import index_variants
-from etl_normalize_variants import normalize_variant_operator
+from etl_normalize_variants import extract_params, normalized_etl
 from etl_prepare_index_variants import etl_variant_prepared
 from etl_publish import publish_task
@@ -20,17 +19,35 @@
     params={
         'study_code': Param('CAG', type='string'),  # FIXME study Codes vs study code !!!
         'owner': Param('jmichaud', type='string'),
-        'dataset': Param('dataset_default', type='string'),
-        'batch': Param('annotated_vcf', type='string'),
+        'dateset_batches': Param(
+            [
+                {'dataset': 'dataset_dataset1', 'batches': ['annotated_vcf1', 'annotated_vcf2']},
+                {'dataset': 'dataset_dataset2', 'batches': ['annotated_vcf']}
+            ],
+            schema={
+                "type": "array",
+                "minItems": 1,
+                "items": {
+                    "type": "object",
+                    "default": {'dataset': 'dataset_default', 'batches': ['annotated_vcf']},
+                    "properties": {
+                        "dataset": {"type": "string"},
+                        "batches": {"type": "array", "items": {"type": "string"}},
+                    },
+                    "required": ["dataset", "batches"]
+                },
+            }
+        ),
         'release_id': Param('7', type='string'),
         'study_codes': Param('CAG', type='string'),
         'project': Param('cqdg', type='string'),
-        'es_port': Param('9200', type='string')
+        'es_port': Param('9200', type='string'),
     },
 ) as dag:
+    params = extract_params()

     with TaskGroup(group_id='normalize') as normalize:
-        normalize_variant_operator('snv') >> normalize_variant_operator('consequences')
+        normalized_etl(run_time_params=params, name='snv') >> normalized_etl(run_time_params=params, name='consequences')

     with TaskGroup(group_id='enrich') as enrich:
         variant_task_enrich_variants() >> variant_task_enrich_consequences()
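For context, the new dateset_batches parameter replaces the single dataset/batch pair and is flattened by extract_params() into one (dataset, batch) tuple per batch before task expansion. A minimal standalone sketch of that flattening, applied to the default value shown above (plain Python, no Airflow required):

    # Flattening equivalent to extract_params(), using the default
    # 'dateset_batches' value from the diff above.
    dateset_batches = [
        {'dataset': 'dataset_dataset1', 'batches': ['annotated_vcf1', 'annotated_vcf2']},
        {'dataset': 'dataset_dataset2', 'batches': ['annotated_vcf']},
    ]

    # One (dataset, batch) pair per batch of each dataset.
    pairs = [(item['dataset'], batch)
             for item in dateset_batches
             for batch in item['batches']]
    print(pairs)
    # [('dataset_dataset1', 'annotated_vcf1'),
    #  ('dataset_dataset1', 'annotated_vcf2'),
    #  ('dataset_dataset2', 'annotated_vcf')]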
100 changes: 76 additions & 24 deletions dags/etl_normalize_variants.py
@@ -1,36 +1,69 @@
+from __future__ import annotations
+
 from datetime import datetime

 from airflow import DAG
+from airflow.decorators import task
 from airflow.models import Param
+from airflow.operators.python import get_current_context

-from lib.config import batch, default_config_file, study_code, spark_large_conf, \
-    etl_variant_config, spark_small_conf
+from lib.config import default_config_file, study_code, etl_variant_config
 from lib.operators.spark import SparkOperator

-normalized_etl = etl_variant_config \
-    .with_spark_class('bio.ferlab.etl.normalized.RunNormalizedGenomic') \
-    .args(
-        '--config', default_config_file,
-        '--steps', 'default',
-        '--app-name', 'variant_task_consequences',
-        '--owner', '{{ params.owner }}',
-        '--dataset', '{{ params.dataset }}',
-        '--batch', batch,
-        '--study-code', study_code
-    ) \
-    .add_package('io.projectglow:glow-spark3_2.12:2.0.0') \
-    .add_spark_conf({'spark.jars.excludes': 'org.apache.hadoop:hadoop-client,'
-                                            'io.netty:netty-all,'
-                                            'io.netty:netty-handler,'
-                                            'io.netty:netty-transport-native-epoll',
-                     'spark.hadoop.io.compression.codecs': 'io.projectglow.sql.util.BGZFCodec',
-                     }) \
-
-
-def normalize_variant_operator(name):
-    etl = normalized_etl if name == 'snv' else normalized_etl
-    return etl.prepend_args(name).operator(
+
+class NormalizeVariants(SparkOperator):
+    template_fields = [*SparkOperator.template_fields, 'arguments', 'dataset_batch']
+
+    def __init__(self,
+                 dataset_batch,
+                 **kwargs):
+        super().__init__(**kwargs)
+        self.dataset_batch = dataset_batch
+
+    def execute(self, **kwargs):
+        # Append the mapped dataset and batch to the Spark arguments at runtime.
+        self.arguments.append('--dataset')
+        self.arguments.append(self.dataset_batch[0])
+        self.arguments.append('--batch')
+        self.arguments.append(self.dataset_batch[1])
+        super().execute(**kwargs)
+
+
+@task
+def extract_params() -> list[tuple[str, str]]:
+    """Extract input arguments at runtime.
+    Returns: one (dataset, batch) tuple per batch of each dataset.
+    """
+    context = get_current_context()
+    items = context["params"]["dateset_batches"]
+    r_list = []
+
+    for item in items:
+        bs = item['batches']
+        d = item['dataset']
+        for b in bs:
+            r_list.append((d, b))
+    return r_list
+
+
+def normalized_etl(run_time_params, name):
+    return (etl_variant_config
+            .with_spark_class('bio.ferlab.etl.normalized.RunNormalizedGenomic')
+            .args(
+                '--config', default_config_file,
+                '--steps', 'default',
+                '--app-name', 'variant_task_consequences',
+                '--owner', '{{ params.owner }}',
+                '--study-code', study_code)
+            .add_package('io.projectglow:glow-spark3_2.12:2.0.0')
+            .add_spark_conf({'spark.jars.excludes': 'org.apache.hadoop:hadoop-client,'
+                                                    'io.netty:netty-all,'
+                                                    'io.netty:netty-handler,'
+                                                    'io.netty:netty-transport-native-epoll',
+                             'spark.hadoop.io.compression.codecs': 'io.projectglow.sql.util.BGZFCodec',
+                             })
+            .partial(
+                class_to_instantiate=NormalizeVariants,
+                task_id=f'normalize-{name}',
+                name=f'normalize-{name}')
+            .expand(dataset_batch=run_time_params))


 with DAG(
@@ -40,9 +73,28 @@ def normalize_variant_operator(name):
     params={
         'study_code': Param('CAG', type='string'),
         'owner': Param('jmichaud', type='string'),
-        'dataset': Param('dataset_default', type='string'),
-        'batch': Param('annotated_vcf', type='string'),
         'project': Param('cqdg', type='string'),
+        'dateset_batches': Param(
+            [
+                {'dataset': 'dataset_dataset1', 'batches': ['annotated_vcf1', 'annotated_vcf2']},
+                {'dataset': 'dataset_dataset2', 'batches': ['annotated_vcf']}
+            ],
+            schema={
+                "type": "array",
+                "minItems": 1,
+                "items": {
+                    "type": "object",
+                    "default": {'dataset': 'dataset_default', 'batches': ['annotated_vcf']},
+                    "properties": {
+                        "dataset": {"type": "string"},
+                        "batches": {"type": "array", "items": {"type": "string"}},
+                    },
+                    "required": ["dataset", "batches"]
+                },
+            }
+        ),
     },
 ) as dag:
-    normalize_variant_operator('snv') >> normalize_variant_operator('consequences')
+    params = extract_params()
+
+    normalized_etl(run_time_params=params, name='snv') >> normalized_etl(run_time_params=params, name='consequences')
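For readers unfamiliar with the pattern: .partial(...).expand(...) is Airflow's dynamic task mapping (introduced in Airflow 2.3), which creates one mapped task instance per element of the expanded argument at runtime; here, each (dataset, batch) tuple produced by extract_params() yields one normalize task. A minimal self-contained sketch of the same mechanism, using a stock BashOperator rather than this repo's SparkOperator subclass (the DAG id and echo commands are hypothetical, for illustration only; the schedule argument assumes Airflow 2.4+):

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.bash import BashOperator

    with DAG(dag_id='mapping-demo', start_date=datetime(2024, 1, 1), schedule=None) as dag:
        # One mapped task instance is created per list element at runtime,
        # mirroring how normalized_etl fans out over (dataset, batch) tuples.
        BashOperator.partial(task_id='normalize').expand(
            bash_command=['echo dataset_dataset1/annotated_vcf1',
                          'echo dataset_dataset1/annotated_vcf2',
                          'echo dataset_dataset2/annotated_vcf'])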
