Skip to content

Commit

Permalink
fix: CQDG-00 update obo-parser dag
Browse files Browse the repository at this point in the history
  • Loading branch information
adipaul1981 committed May 30, 2024
1 parent e9c45b6 commit 4f739d7
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 14 deletions.
1 change: 1 addition & 0 deletions dags/lib/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ class Env:
import_jar = 'local:///app/import-task.jar'
index_jar = 'local:///app/index-task.jar'
publish_jar = 'local:///app/publish-task.jar'
obo_parser_jar = 'local:///app/obo-parser.jar'

etl_base_config = SparkOperatorConfig(
spark_configs=[spark_default_conf],
Expand Down
35 changes: 21 additions & 14 deletions dags/obo_parser.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,30 @@
from datetime import datetime

from airflow import DAG
from airflow.models import Variable, Param
from datetime import datetime
from lib.config import datalake_bucket, etl_base_config, spark_small_conf
from lib.operators.spark import SparkOperator

from lib.config import datalake_bucket, etl_base_config, spark_small_conf, obo_parser_jar


def obo_parser():
return etl_base_config \
.with_spark_class('bio.ferlab.HPOMain') \
.with_image(Variable.get('obo_parser_image')) \
.with_spark_jar(obo_parser_jar) \
.add_spark_conf(spark_small_conf) \
.args(obo_url(), datalake_bucket, ontology(), is_icd(), required_top_node()) \
.operator(
task_id='obo_parser_task',
name='obo_parser-task'
)

with DAG(
dag_id='obo-parser',
start_date=datetime(2022, 1, 1),
schedule_interval=None,
params={
'obo_url': Param('https://raw.githubusercontent.com/obophenotype/human-phenotype-ontology/master/hp.obo', type='string'),
'obo_url': Param('https://raw.githubusercontent.com/obophenotype/human-phenotype-ontology/master/hp.obo',
type='string'),
'ontology': Param('hpo_terms', type='string'),
'is_icd': Param(False, type='boolean'),
'required_top_node': Param("", type='string'),
Expand All @@ -20,19 +35,11 @@ def obo_url() -> str:

def ontology() -> str:
return '{{ params.ontology }}'

def is_icd() -> str:
return '{{ params.is_icd }}'

def required_top_node() -> str:
return '{{ params.required_top_node }}'

import_task = etl_base_config \
.with_image(Variable.get('obo_parser_image')) \
.add_spark_conf(spark_small_conf) \
.with_spark_jar('local:///app/obo-parser.jar') \
.with_spark_class('bio.ferlab.HPOMain') \
.args(obo_url(), datalake_bucket, ontology(), is_icd(), required_top_node()) \
.operator(
task_id='obo_parser_task',
name='obo_parser-task'
)
obo_parser()

0 comments on commit 4f739d7

Please sign in to comment.