Skip to content

Commit

Permalink
fix: CQDG-759 WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
adipaul1981 committed Jul 12, 2024
1 parent 4f739d7 commit f66bda2
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 21 deletions.
22 changes: 6 additions & 16 deletions dags/etl.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
from datetime import datetime

from airflow import DAG
from airflow.models.param import Param
from airflow.utils.task_group import TaskGroup
from datetime import datetime
from lib.config import env, study_codes, release_id, es_port, es_url, etl_publish_config, project

from es_templates_update import es_templates_update
from etl_import import etl_import
from arranger import arranger_task
from etl_fhavro_export import fhavro_export
from etl_import import etl_import
from etl_index import index_operator
from etl_prepare_index import prepare_index
from etl_publish import publish_task

with DAG(
dag_id='etl',
Expand All @@ -24,16 +25,5 @@

with TaskGroup(group_id='index') as index:
index_operator('study') >> index_operator('participant') >> index_operator('file') >> index_operator('biospecimen')

with TaskGroup(group_id='publish') as publish:
def publish_operator(name:str):
return etl_publish_config \
.args(es_url, es_port, env, project, release_id, study_codes, f'{name}_centric') \
.operator(
task_id=f'{name}_centric',
name=f'etl-publish-{name}-centric'
)

publish_operator('study') >> publish_operator('participant') >> publish_operator('file') >> publish_operator('biospecimen')

fhavro_export() >> etl_import() >> prepare_index() >> es_templates_update() >> index >> publish >> arranger_task()
fhavro_export() >> etl_import() >> prepare_index() >> es_templates_update() >> index >> publish_task('study')
43 changes: 43 additions & 0 deletions dags/etl_publish.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from airflow import DAG
from airflow.models import Param, Variable
from datetime import datetime
from lib.config import kube_config, es_url, es_port, es_credentials_secret_name, \
es_credentials_secret_key_password, es_credentials_secret_key_username, release_id, study_ids
from lib.operators.publish import PublishConfig

etl_publish_config = PublishConfig(
es_url = es_url,
kube_config = kube_config,
image = Variable.get('publish_image'),
es_port = es_port,
es_cert_secret_name = 'opensearch-ca-certificate',
es_credentials_secret_name = es_credentials_secret_name,
es_credentials_secret_key_username = es_credentials_secret_key_username,
es_credentials_secret_key_password = es_credentials_secret_key_password,
)

def publish_task():
return etl_publish_config.args(
es_url,
es_port,
release_id,
study_ids,
'{{ params.job_types }}') \
.operator(
task_id='etl_publish',
name='etl-publish',
)
with DAG(
dag_id='etl-publish',
start_date=datetime(2022, 1, 1),
schedule_interval=None,
params={
'es_host': Param('https://search-workers.qa.juno.cqdg.ferlab.bio', type='string'),
'es_port': Param('9200', type='string'),
'release_id': Param('0', type='string'),
'study_ids': Param('study1', type='string'),
'job_types': Param('study_centric,participant_centric,file_centric,biospecimen_centric', type='string'),
},
) as dag:
publish_task()

5 changes: 0 additions & 5 deletions dags/lib/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,6 @@ class Env:
.add_spark_conf(spark_small_conf, spark_index_conf) \
.with_spark_jar(index_jar)

etl_publish_config = etl_base_config \
.add_spark_conf(spark_small_conf, spark_index_conf) \
.with_spark_jar(publish_jar) \
.with_spark_class('bio.ferlab.fhir.etl.PublishTask')

etl_variant_config = etl_base_config \
.add_spark_conf(spark_large_conf) \
.with_spark_jar(variant_jar)
99 changes: 99 additions & 0 deletions dags/lib/operators/publish.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
from kubernetes.client import models as k8s
from typing import Optional, Type
from dataclasses import dataclass
from lib.operators.base_kubernetes import BaseKubernetesOperator, BaseConfig, required


class PublishOperator(BaseKubernetesOperator):
def __init__(
self,
es_url: str,
es_port: Optional[str] = '9200',
es_cert_secret_name: Optional[str] = None,
es_cert_file: Optional[str] = 'ca.crt',
es_credentials_secret_name: Optional[str] = None,
es_credentials_secret_key_username: Optional[str] = 'username',
es_credentials_secret_key_password: Optional[str] = 'password',
**kwargs,
) -> None:
super().__init__(
**kwargs
)
self.es_url = es_url
self.es_port = es_port
self.es_cert_secret_name = es_cert_secret_name
self.es_cert_file = es_cert_file
self.es_credentials_secret_name = es_credentials_secret_name
self.es_credentials_secret_key_username = es_credentials_secret_key_username
self.es_credentials_secret_key_password = es_credentials_secret_key_password

def execute(self, **kwargs):

self.env_vars.append(
k8s.V1EnvVar(
name='ES_HOST',
value=f"{self.es_url}:{self.es_port}",
),
)

if self.es_credentials_secret_name:
self.env_vars.append(
k8s.V1EnvVar(
name='ES_USERNAME',
value_from=k8s.V1EnvVarSource(
secret_key_ref=k8s.V1SecretKeySelector(
name=self.es_credentials_secret_name,
key=self.es_credentials_secret_key_username)
)
)
)
self.env_vars.append(
k8s.V1EnvVar(
name='ES_PASSWORD',
value_from=k8s.V1EnvVarSource(
secret_key_ref=k8s.V1SecretKeySelector(
name=self.es_credentials_secret_name,
key=self.es_credentials_secret_key_password)
)
)
)

if self.es_cert_secret_name:
self.volumes.append(
k8s.V1Volume(
name=self.es_cert_secret_name,
secret=k8s.V1SecretVolumeSource(
secret_name='opensearch-ca-certificate',
default_mode=0o555
),
),
)
self.volume_mounts.append(
k8s.V1VolumeMount(
name=self.es_cert_secret_name,
mount_path='/opt/opensearch-ca',
read_only=True,
),
)

self.cmds = ['java',
'-cp',
'publish-task.jar',
'bio.ferlab.fhir.etl.PublishTask'
]

super().execute(**kwargs)


@dataclass
class PublishConfig(BaseConfig):
es_url: Optional[str] = required()
es_port: Optional[str] = '9200'
es_cert_secret_name: Optional[str] = None
es_cert_file: Optional[str] = 'ca.crt'
es_credentials_secret_name: Optional[str] = None
es_credentials_secret_key_username: Optional[str] = 'username'
es_credentials_secret_key_password: Optional[str] = 'password'

def operator(self, class_to_instantiate: Type[PublishOperator] = PublishOperator, **kwargs) -> PublishOperator:
return super().build_operator(class_to_instantiate=class_to_instantiate, **kwargs)
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ x-airflow-common:
AIRFLOW_VAR_OBO_PARSER_IMAGE: ${OBO_PARSER_IMAGE}
AIRFLOW_VAR_ETL_IMAGE: ${ETL_IMAGE}
AIRFLOW_VAR_FHAVRO_EXPORT_IMAGE: ${FHAVRO_EXPORT_IMAGE}
AIRFLOW_VAR_PUBLISH_IMAGE: ${PUBLISH_IMAGE}
AIRFLOW_VAR_ARRANGER_IMAGE: ${ARRANGER_IMAGE}
AIRFLOW_VAR_FHIR_IMPORT_IMAGE: ${FHIR_IMPORT_IMAGE}
AIRFLOW_VAR_FERLOAD_DRS_IMPORT_IMAGE: ${FERLOAD_DRS_IMPORT_IMAGE}
Expand Down

0 comments on commit f66bda2

Please sign in to comment.