
feat: CQDG-719 refactor variants normalize
adipaul1981 committed Aug 23, 2024
1 parent bcad8fc commit 25b3e01
Showing 2 changed files with 172 additions and 25 deletions.
102 changes: 77 additions & 25 deletions dags/etl_normalize_variants.py
@@ -1,48 +1,100 @@
 from __future__ import annotations
 
 from datetime import datetime
 
 from airflow import DAG
+from airflow.decorators import task
 from airflow.models import Param
+from airflow.operators.python import get_current_context
 
-from lib.config import batch, default_config_file, study_code, spark_large_conf, \
-    etl_variant_config, spark_small_conf
+from lib.config import default_config_file, study_code, etl_variant_config
 from lib.operators.spark import SparkOperator
 
+
+class NormalizeVariants(SparkOperator):
+    template_fields = [*SparkOperator.template_fields, 'arguments', 'dataset_batch']
+
+    def __init__(self,
+                 dataset_batch,
+                 **kwargs):
+        super().__init__(**kwargs)
+        self.dataset_batch = dataset_batch
+
+    def execute(self, **kwargs):
+        # Append dataset and batch to arguments at runtime.
+        self.arguments.append('--dataset')
+        self.arguments.append(self.dataset_batch[0])
+        self.arguments.append('--batch')
+        self.arguments.append(self.dataset_batch[1])
+        super().execute(**kwargs)
+
+
+@task
+def extract_params() -> list[(str, list[str])]:
+    """Extract input arguments at runtime.
+    Returns: List of datasets with their batches
+    """
+    context = get_current_context()
+    items = context["params"]["dateset_batches"]
+    r_list = []
+
+    for item in items:
+        bs = item['batches']
+        d = item['dataset']
+        for b in bs:
+            r_list.append((d, b))
+    return r_list
+
+
-normalized_etl = etl_variant_config \
-    .with_spark_class('bio.ferlab.etl.normalized.RunNormalizedGenomic') \
-    .args(
+def normalized_etl(run_time_params, name):
+    return (etl_variant_config
+            .with_spark_class('bio.ferlab.etl.normalized.RunNormalizedGenomic')
+            .args(
                 '--config', default_config_file,
                 '--steps', 'default',
                 '--app-name', 'variant_task_consequences',
                 '--owner', '{{ params.owner }}',
-        '--dataset', '{{ params.dataset }}',
-        '--batch', batch,
-        '--study-code', study_code
-    ) \
-    .add_package('io.projectglow:glow-spark3_2.12:2.0.0') \
-    .add_spark_conf({'spark.jars.excludes': 'org.apache.hadoop:hadoop-client,'
-                                            'io.netty:netty-all,'
-                                            'io.netty:netty-handler,'
-                                            'io.netty:netty-transport-native-epoll',
-                     'spark.hadoop.io.compression.codecs': 'io.projectglow.sql.util.BGZFCodec',
-                     }) \
-
-
-def normalize_variant_operator(name):
-    etl = normalized_etl if name == 'snv' else normalized_etl
-    return etl.prepend_args(name).operator(
+                '--study-code', study_code)
+            .add_package('io.projectglow:glow-spark3_2.12:2.0.0')
+            .add_spark_conf({'spark.jars.excludes': 'org.apache.hadoop:hadoop-client,'
+                                                    'io.netty:netty-all,'
+                                                    'io.netty:netty-handler,'
+                                                    'io.netty:netty-transport-native-epoll',
+                             'spark.hadoop.io.compression.codecs': 'io.projectglow.sql.util.BGZFCodec',
+                             })
+            .partial(
+                class_to_instantiate=NormalizeVariants,
                 task_id=f'normalize-{name}',
                 name=f'normalize-{name}')
+            .expand(dataset_batch=run_time_params))
 
 
 with DAG(
-    dag_id='etl-normalize-variants',
+    dag_id='etl-normalize-variants-TEST',
     start_date=datetime(2022, 1, 1),
     schedule_interval=None,
     params={
         'study_code': Param('CAG', type='string'),
         'owner': Param('jmichaud', type='string'),
-        'dataset': Param('dataset_default', type='string'),
-        'batch': Param('annotated_vcf', type='string'),
         'project': Param('cqdg', type='string'),
+        'dateset_batches': Param(
+            [
+                {'dataset': 'dataset_dataset1', 'batches': ['annotated_vcf1','annotated_vcf2']},
+                {'dataset': 'dataset_dataset2', 'batches': ['annotated_vcf']}
+            ],
+            schema = {
+                "type": "array",
+                "minItems": 1,
+                "items": {
+                    "type": "object",
+                    "default": {'dataset': 'dataset_default', 'batches': ['annotated_vcf']},
+                    "properties": {
+                        "dataset": {"type": "string"},
+                        "batches": {"type": "array", "items": {"type": "string"}},
+                    },
+                    "required": ["dataset", "batches"]
+                },
+            }
+        ),
     },
 ) as dag:
-    normalize_variant_operator('snv') >> normalize_variant_operator('consequences')
+    params = extract_params()
+
+    normalized_etl(run_time_params = params, name='snv') >> normalized_etl(run_time_params = params, name='consequences')
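
The heart of the refactor: rather than reading one --dataset/--batch pair from DAG params, extract_params flattens the 'dateset_batches' param (the spelling comes from the DAG code itself) into (dataset, batch) tuples, and .partial(...).expand(dataset_batch=...) creates one mapped NormalizeVariants task per tuple. A minimal sketch of that flattening, runnable outside Airflow against the DAG's default param value:

# The same flattening extract_params performs, applied to the default
# 'dateset_batches' value defined in the DAG above.
items = [
    {'dataset': 'dataset_dataset1', 'batches': ['annotated_vcf1', 'annotated_vcf2']},
    {'dataset': 'dataset_dataset2', 'batches': ['annotated_vcf']},
]
pairs = [(item['dataset'], b) for item in items for b in item['batches']]
print(pairs)
# [('dataset_dataset1', 'annotated_vcf1'),
#  ('dataset_dataset1', 'annotated_vcf2'),
#  ('dataset_dataset2', 'annotated_vcf')]

With these defaults, each of the two steps ('snv' and 'consequences') expands into three mapped task instances, one per pair.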
95 changes: 95 additions & 0 deletions dags/etl_normalize_variants_TEST.py
@@ -0,0 +1,95 @@
+from __future__ import annotations
+
+from datetime import datetime
+
+from airflow import DAG
+from airflow.decorators import task
+from airflow.models import Param
+from airflow.operators.python import get_current_context
+
+from lib.config import default_config_file, study_code, etl_variant_config
+from lib.operators.spark import SparkOperator
+
+
+with DAG(
+        dag_id='etl-normalize-variants-TEST',
+        start_date=datetime(2022, 1, 1),
+        schedule_interval=None,
+        params={
+            'study_code': Param('CAG', type='string'),
+            'owner': Param('jmichaud', type='string'),
+            'project': Param('cqdg', type='string'),
+            'dateset_batches': Param(
+                [
+                    {'dataset': 'dataset_dataset1', 'batches': ['annotated_vcf1','annotated_vcf2']},
+                    {'dataset': 'dataset_dataset2', 'batches': ['annotated_vcf']}
+                ],
+                schema = {
+                    "type": "array",
+                    "minItems": 1,
+                    "items": {
+                        "type": "object",
+                        "default": {'dataset': 'dataset_default', 'batches': ['annotated_vcf']},
+                        "properties": {
+                            "dataset": {"type": "string"},
+                            "batches": {"type": "array", "items": {"type": "string"}},
+                        },
+                        "required": ["dataset", "batches"]
+                    },
+                }
+            ),
+        },
+) as dag:
+
+    @task
+    def extract_params() -> list[(str, list[str])]:
+        """Extract input arguments at runtime.
+        Returns: List of datasets with their batches
+        """
+        context = get_current_context()
+        items = context["params"]["dateset_batches"]
+        r_list = []
+
+        for item in items:
+            bs = item['batches']
+            d = item['dataset']
+            for b in bs:
+                r_list.append((d, b))
+        return r_list
+
+    class NormalizeVariants(SparkOperator):
+        template_fields = [*SparkOperator.template_fields, 'arguments', 'dataset_batch']
+
+        def __init__(self,
+                     dataset_batch,
+                     **kwargs):
+            super().__init__(**kwargs)
+            self.dataset_batch = dataset_batch
+
+        def execute(self, **kwargs):
+            # Append dataset and batch to arguments at runtime.
+            self.arguments.append('--dataset')
+            self.arguments.append(self.dataset_batch[0])
+            self.arguments.append('--batch')
+            self.arguments.append(self.dataset_batch[1])
+            super().execute(**kwargs)
+
+    etl_variant_config \
+        .with_spark_class('bio.ferlab.etl.normalized.RunNormalizedGenomic') \
+        .args(
+            '--config', default_config_file,
+            '--steps', 'default',
+            '--app-name', 'variant_task_consequences',
+            '--owner', '{{ params.owner }}',
+            '--study-code', study_code
+        ) \
+        .partial(
+            class_to_instantiate=NormalizeVariants,
+            task_id='normalize_variants_snv',
+            name='normalize-variants-snv'
+        ) \
+        .expand(dataset_batch=extract_params())
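
At runtime each mapped NormalizeVariants instance appends its pair to the Spark arguments before delegating to SparkOperator.execute. A hypothetical sketch of the resulting argument list for the pair ('dataset_dataset1', 'annotated_vcf1'); the <...> placeholders stand for values resolved from lib.config and rendered Jinja templates, which this commit does not show:

# Illustration only: the argument list one mapped task would pass to the Spark job.
arguments = [
    '--config', '<default_config_file>',   # resolved from lib.config
    '--steps', 'default',
    '--app-name', 'variant_task_consequences',
    '--owner', '<params.owner>',           # rendered from the DAG param
    '--study-code', '<study_code>',        # resolved from lib.config
    # appended by NormalizeVariants.execute at runtime:
    '--dataset', 'dataset_dataset1',
    '--batch', 'annotated_vcf1',
]

Note that the old normalize_variant_operator prepended the step name to the arguments via prepend_args(name); in the refactored chain, as shown, the step name appears only in task_id and name.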


