From 6dac09a63709e83e5d46efe804e7d57963f7bda3 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 20 Dec 2024 08:36:20 +0000 Subject: [PATCH] Remove DagBag from listener --- cosmos/listeners/dag_run_listener.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index 2b0b60698..ddac1a540 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -47,12 +47,9 @@ def uses_cosmos(dag: DAG) -> bool: return bool(is_cosmos_dag(dag) or total_cosmos_task_groups(dag) or total_cosmos_tasks(dag)) -from airflow.utils.session import provide_session - - -@provide_session +# @provide_session @hookimpl -def on_dag_run_success(dag_run: DagRun, msg: str, session=None) -> None: +def on_dag_run_success(dag_run: DagRun, msg: str) -> None: logger.info(f"dir: {dir(dag_run.dag)}") logger.info("The on_dag_run_success was called") @@ -66,11 +63,11 @@ def on_dag_run_success(dag_run: DagRun, msg: str, session=None) -> None: logger.info(f"3: {serialized_dag.task_dict}") logger.info(f"4: {serialized_dag.task_group_dict}") - from airflow.models import DagBag + # from airflow.models import DagBag - dag_bag = DagBag(dag_folder=serialized_dag.fileloc, include_examples=False) - dag = dag_bag.get_dag(dag_run.dag_id) - logger.info(f"dag: {dag}") + # dag_bag = DagBag(dag_folder=serialized_dag.fileloc, include_examples=False) + # dag = dag_bag.get_dag(dag_run.dag_id) + # logger.info(f"dag: {dag}") if not uses_cosmos(serialized_dag): logger.info("The DAG does not use Cosmos")