Skip to content

Commit

Permalink
Support both virtualenv use-cases in two task groups in the example DAG
Browse files Browse the repository at this point in the history
  • Loading branch information
LennartKloppenburg committed Oct 23, 2023
1 parent 89913c2 commit f33bfc2
Showing 1 changed file with 54 additions and 22 deletions.
76 changes: 54 additions & 22 deletions dev/dags/example_virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@
from datetime import datetime
from pathlib import Path

from cosmos import DbtDag, ExecutionMode, ExecutionConfig, ProjectConfig, ProfileConfig
from airflow.decorators import dag
from airflow.configuration import get_airflow_home
from airflow.operators.empty import EmptyOperator

from cosmos import DbtTaskGroup, ExecutionMode, ExecutionConfig, ProjectConfig, ProfileConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
Expand All @@ -21,29 +25,57 @@
),
)

# [START virtualenv_example]
example_virtualenv = DbtDag(
# dbt/cosmos-specific parameters
project_config=ProjectConfig(
DBT_ROOT_PATH / "jaffle_shop",
),
profile_config=profile_config,
execution_config=ExecutionConfig(
execution_mode=ExecutionMode.VIRTUALENV,
# We can enable this flag if we want Airflow to create one virtualenv
# and reuse that within the whole DAG.
# virtualenv_dir=f"{get_airflow_home()}/persistent-venv",
),
operator_args={
"py_system_site_packages": False,
"py_requirements": ["dbt-postgres==1.6.0b1"],
"install_deps": True,
},
# normal dag parameters
@dag(
schedule_interval="@daily",
start_date=datetime(2023, 1, 1),
catchup=False,
dag_id="example_virtualenv",
default_args={"retries": 2},
)
def example_virtualenv() -> None:
start_task = EmptyOperator(task_id='start-venv-examples')
end_task = EmptyOperator(task_id='end-venv-examples')

tmp_venv_task_group = DbtTaskGroup(
group_id='tmp-venv-group',
# dbt/cosmos-specific parameters
project_config=ProjectConfig(
DBT_ROOT_PATH / "jaffle_shop",
),
profile_config=profile_config,
execution_config=ExecutionConfig(
execution_mode=ExecutionMode.VIRTUALENV,
# We can enable this flag if we want Airflow to create one virtualenv
# and reuse that within the whole DAG.
# virtualenv_dir=f"{get_airflow_home()}/persistent-venv",
),
operator_args={
"py_system_site_packages": False,
"py_requirements": ["dbt-postgres==1.6.0b1"],
"install_deps": True,
},
)

cached_venv_task_group = DbtTaskGroup(
group_id='cached-venv-group',
# dbt/cosmos-specific parameters
project_config=ProjectConfig(
DBT_ROOT_PATH / "jaffle_shop",
),
profile_config=profile_config,
execution_config=ExecutionConfig(
execution_mode=ExecutionMode.VIRTUALENV,
# We can enable this flag if we want Airflow to create one virtualenv
# and reuse that within the whole DAG.
virtualenv_dir=Path(f"{get_airflow_home()}/persistent-venv"),
),
operator_args={
"py_system_site_packages": False,
"py_requirements": ["dbt-postgres==1.6.0b1"],
"install_deps": True,
},
)

start_task >> [tmp_venv_task_group, cached_venv_task_group] >> end_task

example_virtualenv()
# [END virtualenv_example]

0 comments on commit f33bfc2

Please sign in to comment.