diff --git a/dags/etl-variant.py b/dags/etl-variant.py
index fef70f7..cecf38c 100644
--- a/dags/etl-variant.py
+++ b/dags/etl-variant.py
@@ -5,11 +5,10 @@
 from airflow.utils.task_group import TaskGroup
 
 from es_templates_update import es_templates_update
-from etl_enrich_specimens import etl_enrich_specimens 
+from etl_enrich_specimens import etl_enrich_specimens
 from etl_enrich_variants import variant_task_enrich_variants, variant_task_enrich_consequences
-from etl_index import index_operator
 from etl_index_variants import index_variants
-from etl_normalize_variants import normalize_variant_operator
+from etl_normalize_variants import extract_params, normalized_etl
 from etl_prepare_index_variants import etl_variant_prepared
 from etl_publish import publish_task
@@ -20,17 +19,35 @@
     params={
         'study_code': Param('CAG', type='string'),  # FIXME study Codes vs study code !!!
         'owner': Param('jmichaud', type='string'),
-        'dataset': Param('dataset_default', type='string'),
-        'batch': Param('annotated_vcf', type='string'),
+        'dataset_batches': Param(
+            [
+                {'dataset': 'dataset_dataset1', 'batches': ['annotated_vcf1', 'annotated_vcf2']},
+                {'dataset': 'dataset_dataset2', 'batches': ['annotated_vcf']}
+            ],
+            schema={
+                "type": "array",
+                "minItems": 1,
+                "items": {
+                    "type": "object",
+                    "default": {'dataset': 'dataset_default', 'batches': ['annotated_vcf']},
+                    "properties": {
+                        "dataset": {"type": "string"},
+                        "batches": {"type": "array", "items": {"type": "string"}},
+                    },
+                    "required": ["dataset", "batches"]
+                },
+            }
+        ),
         'release_id': Param('7', type='string'),
         'study_codes': Param('CAG', type='string'),
         'project': Param('cqdg', type='string'),
-        'es_port': Param('9200', type='string')
+        'es_port': Param('9200', type='string'),
     },
 ) as dag:
 
+    params = extract_params()
     with TaskGroup(group_id='normalize') as normalize:
-        normalize_variant_operator('snv') >> normalize_variant_operator('consequences')
+        normalized_etl(run_time_params=params, name='snv') >> normalized_etl(run_time_params=params, name='consequences')
 
     with TaskGroup(group_id='enrich') as enrich:
         variant_task_enrich_variants() >> variant_task_enrich_consequences()
diff --git a/dags/etl_normalize_variants.py b/dags/etl_normalize_variants.py
index 3cde37b..b515635 100644
--- a/dags/etl_normalize_variants.py
+++ b/dags/etl_normalize_variants.py
@@ -1,36 +1,69 @@
+from __future__ import annotations
+
 from datetime import datetime
 
 from airflow import DAG
+from airflow.decorators import task
 from airflow.models import Param
+from airflow.operators.python import get_current_context
+
+from lib.config import default_config_file, study_code, etl_variant_config
+from lib.operators.spark import SparkOperator
+
+class NormalizeVariants(SparkOperator):
+    template_fields = [*SparkOperator.template_fields, 'arguments', 'dataset_batch']
+
+    def __init__(self,
+                 dataset_batch,
+                 **kwargs):
+        super().__init__(**kwargs)
+        self.dataset_batch = dataset_batch
 
-from lib.config import batch, default_config_file, study_code, spark_large_conf, \
-    etl_variant_config, spark_small_conf
+    def execute(self, **kwargs):
+        # Append the mapped dataset/batch pair to the Spark arguments at runtime.
+        self.arguments.append('--dataset')
+        self.arguments.append(self.dataset_batch[0])
+        self.arguments.append('--batch')
+        self.arguments.append(self.dataset_batch[1])
+        super().execute(**kwargs)
 
-normalized_etl = etl_variant_config \
-    .with_spark_class('bio.ferlab.etl.normalized.RunNormalizedGenomic') \
-    .args(
+@task
+def extract_params() -> list[tuple[str, str]]:
+    """Extract input arguments at runtime.
+
+    Returns: a list of (dataset, batch) pairs, one entry per batch.
+    """
+    context = get_current_context()
+    items = context["params"]["dataset_batches"]
+    r_list = []
+
+    for item in items:
+        bs = item['batches']
+        d = item['dataset']
+        for b in bs:
+            r_list.append((d, b))
+    return r_list
+
+def normalized_etl(run_time_params, name):
+    return (etl_variant_config
+            .with_spark_class('bio.ferlab.etl.normalized.RunNormalizedGenomic')
+            .args(
         '--config', default_config_file,
         '--steps', 'default',
         '--app-name', 'variant_task_consequences',
         '--owner', '{{ params.owner }}',
-        '--dataset', '{{ params.dataset }}',
-        '--batch', batch,
-        '--study-code', study_code
-    ) \
-    .add_package('io.projectglow:glow-spark3_2.12:2.0.0') \
-    .add_spark_conf({'spark.jars.excludes': 'org.apache.hadoop:hadoop-client,'
-                                            'io.netty:netty-all,'
-                                            'io.netty:netty-handler,'
-                                            'io.netty:netty-transport-native-epoll',
-                     'spark.hadoop.io.compression.codecs': 'io.projectglow.sql.util.BGZFCodec',
-                     }) \
-
-
-def normalize_variant_operator(name):
-    etl = normalized_etl if name == 'snv' else normalized_etl
-    return etl.prepend_args(name).operator(
+        '--study-code', study_code)
+            .add_package('io.projectglow:glow-spark3_2.12:2.0.0')
+            .add_spark_conf({'spark.jars.excludes': 'org.apache.hadoop:hadoop-client,'
+                                                    'io.netty:netty-all,'
+                                                    'io.netty:netty-handler,'
+                                                    'io.netty:netty-transport-native-epoll',
+                             'spark.hadoop.io.compression.codecs': 'io.projectglow.sql.util.BGZFCodec',
+                             })
+            .partial(
+                class_to_instantiate=NormalizeVariants, task_id=f'normalize-{name}', name=f'normalize-{name}')
+            .expand(dataset_batch=run_time_params))
 
 
 with DAG(
@@ -40,9 +73,28 @@ def normalize_variant_operator(name):
     params={
         'study_code': Param('CAG', type='string'),
         'owner': Param('jmichaud', type='string'),
-        'dataset': Param('dataset_default', type='string'),
-        'batch': Param('annotated_vcf', type='string'),
         'project': Param('cqdg', type='string'),
+        'dataset_batches': Param(
+            [
+                {'dataset': 'dataset_dataset1', 'batches': ['annotated_vcf1', 'annotated_vcf2']},
+                {'dataset': 'dataset_dataset2', 'batches': ['annotated_vcf']}
+            ],
+            schema={
+                "type": "array",
+                "minItems": 1,
+                "items": {
+                    "type": "object",
+                    "default": {'dataset': 'dataset_default', 'batches': ['annotated_vcf']},
+                    "properties": {
+                        "dataset": {"type": "string"},
+                        "batches": {"type": "array", "items": {"type": "string"}},
+                    },
+                    "required": ["dataset", "batches"]
+                },
+            }
+        ),
     },
 ) as dag:
-    normalize_variant_operator('snv') >> normalize_variant_operator('consequences')
+    params = extract_params()
+
+    normalized_etl(run_time_params=params, name='snv') >> normalized_etl(run_time_params=params, name='consequences')
\ No newline at end of file
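
Reviewer note: a minimal standalone sketch (plain Python, no Airflow required) of the run-time behaviour introduced here, using the default value of the `dataset_batches` param from this diff. The list comprehension mirrors the flattening done by `extract_params()`, and the printed pairs are what each mapped `NormalizeVariants.execute()` appends to the Spark arguments before delegating to `SparkOperator.execute()`.

```python
# Default value of the 'dataset_batches' Param, copied from this diff.
dataset_batches = [
    {'dataset': 'dataset_dataset1', 'batches': ['annotated_vcf1', 'annotated_vcf2']},
    {'dataset': 'dataset_dataset2', 'batches': ['annotated_vcf']},
]

# Same flattening as extract_params(): one (dataset, batch) pair per batch.
pairs = [(item['dataset'], b) for item in dataset_batches for b in item['batches']]
assert pairs == [
    ('dataset_dataset1', 'annotated_vcf1'),
    ('dataset_dataset1', 'annotated_vcf2'),
    ('dataset_dataset2', 'annotated_vcf'),
]

# .expand(dataset_batch=pairs) creates one mapped normalize task per pair;
# each task appends its own pair to the spark-submit arguments at execute() time.
for dataset, batch in pairs:
    print(['--dataset', dataset, '--batch', batch])
```

With the defaults above, `.expand(dataset_batch=...)` therefore fans out three mapped task instances under each of `normalize-snv` and `normalize-consequences`, instead of the single fixed `dataset`/`batch` pair the old `normalize_variant_operator` read from params.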
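For local verification, something like the following should exercise the new param shape end to end. This is a sketch, assuming Airflow 2.5+ (where `dag.test()` accepts `run_conf`), an initialized Airflow metadata DB, and `dags/` on `PYTHONPATH`; it is not part of this change.

```python
# Hypothetical smoke test: run the normalize DAG once with a custom conf.
from etl_normalize_variants import dag

dag.test(run_conf={
    'dataset_batches': [
        {'dataset': 'dataset_dataset1', 'batches': ['annotated_vcf1']},
    ],
})
```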