Skip to content

Commit

Permalink
Remove DagBag from listener
Browse files Browse the repository at this point in the history
  • Loading branch information
tatiana committed Dec 20, 2024
1 parent 47fb46c commit 6dac09a
Showing 1 changed file with 6 additions and 9 deletions.
15 changes: 6 additions & 9 deletions cosmos/listeners/dag_run_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,9 @@ def uses_cosmos(dag: DAG) -> bool:
return bool(is_cosmos_dag(dag) or total_cosmos_task_groups(dag) or total_cosmos_tasks(dag))


from airflow.utils.session import provide_session


@provide_session
# @provide_session
@hookimpl
def on_dag_run_success(dag_run: DagRun, msg: str, session=None) -> None:
def on_dag_run_success(dag_run: DagRun, msg: str) -> None:

logger.info(f"dir: {dir(dag_run.dag)}")
logger.info("The on_dag_run_success was called")
Expand All @@ -66,11 +63,11 @@ def on_dag_run_success(dag_run: DagRun, msg: str, session=None) -> None:
logger.info(f"3: {serialized_dag.task_dict}")
logger.info(f"4: {serialized_dag.task_group_dict}")

from airflow.models import DagBag
# from airflow.models import DagBag

dag_bag = DagBag(dag_folder=serialized_dag.fileloc, include_examples=False)
dag = dag_bag.get_dag(dag_run.dag_id)
logger.info(f"dag: {dag}")
# dag_bag = DagBag(dag_folder=serialized_dag.fileloc, include_examples=False)
# dag = dag_bag.get_dag(dag_run.dag_id)
# logger.info(f"dag: {dag}")

if not uses_cosmos(serialized_dag):
logger.info("The DAG does not use Cosmos")
Expand Down

0 comments on commit 6dac09a

Please sign in to comment.