Skip to content

Commit

Permalink
feat: CQDG-719 refactor variants normalize
Browse files Browse the repository at this point in the history
  • Loading branch information
adipaul1981 committed Aug 23, 2024
1 parent bcad8fc commit 4ad1008
Showing 1 changed file with 95 additions and 0 deletions.
95 changes: 95 additions & 0 deletions dags/etl_normalize_variants_TEST.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
from __future__ import annotations

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.models import Param
from airflow.operators.python import get_current_context

from lib.config import default_config_file, study_code, etl_variant_config
from lib.operators.spark import SparkOperator


# DAG definition for the (test) variant-normalization ETL.
# Runtime parameters are supplied via Airflow `params` when the DAG is
# triggered; the `dateset_batches` param carries a JSON-schema-validated
# list of datasets, each with the VCF batches to normalize.
with DAG(
    dag_id='etl-normalize-variants-TEST',
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,  # manual trigger only
    params={
        'study_code': Param('CAG', type='string'),
        'owner': Param('jmichaud', type='string'),
        'project': Param('cqdg', type='string'),
        # NOTE(review): key is spelled 'dateset_batches' (sic) — likely a typo
        # for 'dataset_batches'. Kept as-is because extract_params() below and
        # any existing trigger payloads reference this exact key.
        'dateset_batches': Param(
            [
                {'dataset': 'dataset_dataset1', 'batches': ['annotated_vcf1','annotated_vcf2']},
                {'dataset': 'dataset_dataset2', 'batches': ['annotated_vcf']}
            ],
            # JSON schema: a non-empty array of {dataset, batches[]} objects;
            # both fields are required on every item.
            schema = {
                "type": "array",
                "minItems": 1,
                "items": {
                    "type": "object",
                    "default": {'dataset': 'dataset_default', 'batches': ['annotated_vcf']},
                    "properties": {
                        "dataset": {"type": "string"},
                        "batches": {"type": "array", "items": {"type": "string"}},
                    },
                    "required": ["dataset", "batches"]
                },
            }
        ),
    },
) as dag:


@task
def extract_params() -> list[(str, list[str])]:
"""Extract input arguments at runtime.
Returns: List of datasets with their batches
"""
context = get_current_context()
items = context["params"]["dateset_batches"]
r_list = []

for item in items:
bs = item['batches']
d = item['dataset']
for b in bs:
r_list.append((d, b))
return r_list

class NormalizeVariants(SparkOperator):
template_fields = [*SparkOperator.template_fields, 'arguments', 'dataset_batch']

def __init__(self,
dataset_batch,
**kwargs):
super().__init__(**kwargs)
self.dataset_batch = dataset_batch

def execute(self, **kwargs):
# Append dataset and batch to arguments at runtime.
self.arguments.append('--dataset')
self.arguments.append(self.dataset_batch[0])
self.arguments.append('--batch')
self.arguments.append(self.dataset_batch[1])
super().execute(**kwargs)

etl_variant_config \
.with_spark_class('bio.ferlab.etl.normalized.RunNormalizedGenomic') \
.args(
'--config', default_config_file,
'--steps', 'default',
'--app-name', 'variant_task_consequences',
'--owner', '{{ params.owner }}',
'--study-code', study_code
) \
.partial(
class_to_instantiate=NormalizeVariants,
task_id='normalize_variants_snv',
name='normalize-variants-snv'
) \
.expand(dataset_batch=extract_params())



0 comments on commit 4ad1008

Please sign in to comment.