Skip to content

Commit

Permalink
Support ProjectConfig.dbt_project_path = None & different paths for…
Browse files Browse the repository at this point in the history
… Rendering and Execution (astronomer#634)

This MR finishes the work that was started in astronomer#605 to add full support
for ProjectConfig.dbt_project_path = None, and implements astronomer#568.

Within this PR, several things have been updated:
1 - Added project_path fields to RenderConfig and ExecutionConfig
2 - Simplified the consumption of RenderConfig in the dbtGraph class
3 - added option to configure different dbt executables for Rendering vs
Execution.

Closes: astronomer#568
  • Loading branch information
MrBones757 authored and LennartKloppenburg committed Nov 17, 2023
1 parent 9972e03 commit 4532db2
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 143 deletions.
2 changes: 1 addition & 1 deletion cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,8 @@ class ExecutionConfig:

dbt_project_path: InitVar[str | Path | None] = None
virtualenv_dir: str | Path | None = None

project_path: Path | None = field(init=False)

def __post_init__(self, dbt_project_path: str | Path | None) -> None:
self.project_path = Path(dbt_project_path) if dbt_project_path else None

25 changes: 3 additions & 22 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,6 @@ def __init__(
# If we are using the old interface, we should migrate it to the new interface
# This is safe to do now since we have validated which config interface we're using
if project_config.dbt_project_path:
# We copy the configuration so the change does not affect other DAGs or TaskGroups
# that may reuse the same original configuration
render_config = copy.deepcopy(render_config)
execution_config = copy.deepcopy(execution_config)
render_config.project_path = project_config.dbt_project_path
execution_config.project_path = project_config.dbt_project_path

Expand All @@ -139,23 +135,8 @@ def __init__(
raise CosmosValueError(
"RenderConfig.dbt_project_path is required for rendering an airflow DAG from a DBT Graph if no manifest is provided."
)
emit_datasets = render_config.emit_datasets
dbt_root_path = project_config.dbt_project_path.parent
dbt_project_name = project_config.dbt_project_path.name
dbt_models_dir = project_config.models_relative_path
dbt_seeds_dir = project_config.seeds_relative_path
dbt_snapshots_dir = project_config.snapshots_relative_path
test_behavior = render_config.test_behavior
select = render_config.select
exclude = render_config.exclude
dbt_deps = render_config.dbt_deps
execution_mode = execution_config.execution_mode
load_mode = render_config.load_method
manifest_path = project_config.parsed_manifest_path
dbt_executable_path = execution_config.dbt_executable_path
node_converters = render_config.node_converters

if execution_mode != ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None:

if execution_config.execution_mode != ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None:
logger.warning(
"`ExecutionConfig.virtualenv_dir` is only supported when \
ExecutionConfig.execution_mode is set to ExecutionMode.VIRTUALENV."
Expand Down Expand Up @@ -196,7 +177,7 @@ def __init__(
if execution_config.dbt_executable_path:
task_args["dbt_executable_path"] = execution_config.dbt_executable_path

if execution_mode == ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None:
if execution_config.execution_mode == ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None:
task_args["virtualenv_dir"] = execution_config.virtualenv_dir

validate_arguments(render_config.select, render_config.exclude, profile_args, task_args)
Expand Down
35 changes: 26 additions & 9 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
ExecutionMode,
LoadMode,
)
from cosmos.dbt.executable import get_system_dbt
from cosmos.dbt.parser.project import LegacyDbtProject
from cosmos.dbt.project import create_symlinks
from cosmos.dbt.selector import select_nodes
Expand Down Expand Up @@ -52,6 +53,14 @@ class DbtNode:
has_test: bool = False


def create_symlinks(project_path: Path, tmp_dir: Path) -> None:
"""Helper function to create symlinks to the dbt project files."""
ignore_paths = (DBT_LOG_DIR_NAME, DBT_TARGET_DIR_NAME, "dbt_packages", "profiles.yml")
for child_name in os.listdir(project_path):
if child_name not in ignore_paths:
os.symlink(project_path / child_name, tmp_dir / child_name)


def run_command(command: list[str], tmp_dir: Path, env_vars: dict[str, str]) -> str:
"""Run a command in a subprocess, returning the stdout."""
logger.info("Running command: `%s`", " ".join(command))
Expand Down Expand Up @@ -108,6 +117,15 @@ class DbtGraph:
Supports different ways of loading the `dbt` project into this representation.
Different loading methods can result in different `nodes` and `filtered_nodes`.
Example of how to use:
dbt_graph = DbtGraph(
project=ProjectConfig(dbt_project_path=DBT_PROJECT_PATH),
render_config=RenderConfig(exclude=["*orders*"], select=[]),
dbt_cmd="/usr/local/bin/dbt"
)
dbt_graph.load(method=LoadMode.DBT_LS, execution_mode=ExecutionMode.LOCAL)
"""

nodes: dict[str, DbtNode] = dict()
Expand All @@ -119,13 +137,15 @@ def __init__(
render_config: RenderConfig = RenderConfig(),
execution_config: ExecutionConfig = ExecutionConfig(),
profile_config: ProfileConfig | None = None,
dbt_cmd: str = get_system_dbt(),
operator_args: dict[str, Any] | None = None,
):
self.project = project
self.render_config = render_config
self.profile_config = profile_config
self.execution_config = execution_config
self.operator_args = operator_args or {}
self.dbt_cmd = dbt_cmd

def load(
self,
Expand Down Expand Up @@ -163,9 +183,7 @@ def load(
else:
load_method[method]()

def run_dbt_ls(
self, dbt_cmd: str, project_path: Path, tmp_dir: Path, env_vars: dict[str, str]
) -> dict[str, DbtNode]:
def run_dbt_ls(self, project_path: Path, tmp_dir: Path, env_vars: dict[str, str]) -> dict[str, DbtNode]:
"""Runs dbt ls command and returns the parsed nodes."""
ls_command = [dbt_cmd, "ls", "--output", "json"]

Expand Down Expand Up @@ -202,10 +220,6 @@ def load_via_dbt_ls(self) -> None:
* self.nodes
* self.filtered_nodes
"""
self.render_config.validate_dbt_command(fallback_cmd=self.execution_config.dbt_executable_path)
dbt_cmd = self.render_config.dbt_executable_path
dbt_cmd = dbt_cmd.as_posix() if isinstance(dbt_cmd, Path) else dbt_cmd

logger.info(f"Trying to parse the dbt project in `{self.render_config.project_path}` using dbt ls...")
if not self.render_config.project_path or not self.execution_config.project_path:
raise CosmosLoadDbtException(
Expand All @@ -215,6 +229,9 @@ def load_via_dbt_ls(self) -> None:
if not self.profile_config:
raise CosmosLoadDbtException("Unable to load project via dbt ls without a profile config.")

if not self.profile_config:
raise CosmosLoadDbtException("Unable to load project via dbt ls without a profile config.")

with tempfile.TemporaryDirectory() as tmpdir:
logger.info(
f"Content of the dbt project dir {self.render_config.project_path}: `{os.listdir(self.render_config.project_path)}`"
Expand Down Expand Up @@ -243,12 +260,12 @@ def load_via_dbt_ls(self) -> None:
env[DBT_TARGET_PATH_ENVVAR] = str(self.target_dir)

if self.render_config.dbt_deps:
deps_command = [dbt_cmd, "deps"]
deps_command = [self.dbt_cmd, "deps"]
deps_command.extend(self.local_flags)
stdout = run_command(deps_command, tmpdir_path, env)
logger.debug("dbt deps output: %s", stdout)

nodes = self.run_dbt_ls(dbt_cmd, self.execution_config.project_path, tmpdir_path, env)
nodes = self.run_dbt_ls(self.execution_config.project_path, tmpdir_path, env)

self.nodes = nodes
self.filtered_nodes = nodes
Expand Down
14 changes: 6 additions & 8 deletions tests/dbt/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import pytest

from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig, CosmosConfigException
from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.constants import DbtResourceType, ExecutionMode
from cosmos.dbt.graph import (
CosmosLoadDbtException,
Expand Down Expand Up @@ -311,8 +311,9 @@ def test_load_via_dbt_ls_without_exclude(project_name):
def test_load_via_custom_without_project_path():
project_config = ProjectConfig(manifest_path=SAMPLE_MANIFEST, project_name="test")
execution_config = ExecutionConfig()
render_config = RenderConfig(dbt_executable_path="/inexistent/dbt")
render_config = RenderConfig()
dbt_graph = DbtGraph(
dbt_cmd="/inexistent/dbt",
project=project_config,
execution_config=execution_config,
render_config=render_config,
Expand All @@ -328,10 +329,9 @@ def test_load_via_custom_without_project_path():
def test_load_via_dbt_ls_without_profile(mock_validate_dbt_command):
project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME)
execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME)
render_config = RenderConfig(
dbt_executable_path="existing-dbt-cmd", dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME
)
render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME)
dbt_graph = DbtGraph(
dbt_cmd="/inexistent/dbt",
project=project_config,
execution_config=execution_config,
render_config=render_config,
Expand All @@ -346,9 +346,7 @@ def test_load_via_dbt_ls_without_profile(mock_validate_dbt_command):
def test_load_via_dbt_ls_with_invalid_dbt_path():
project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME)
execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME)
render_config = RenderConfig(
dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, dbt_executable_path="/inexistent/dbt"
)
render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME)
with patch("pathlib.Path.exists", return_value=True):
dbt_graph = DbtGraph(
project=project_config,
Expand Down
35 changes: 0 additions & 35 deletions tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,38 +122,3 @@ def test_profile_config_validate():
profile_config = ProfileConfig(profile_name="test", target_name="test")
assert profile_config.validate_profile() is None
assert err_info.value.args[0] == "Either profiles_yml_filepath or profile_mapping must be set to render a profile"


@patch("cosmos.config.shutil.which", return_value=None)
def test_render_config_without_dbt_cmd(mock_which):
render_config = RenderConfig()
with pytest.raises(CosmosConfigException) as err_info:
render_config.validate_dbt_command("inexistent-dbt")

error_msg = err_info.value.args[0]
assert error_msg.startswith("Unable to find the dbt executable, attempted: <")
assert error_msg.endswith("dbt> and <inexistent-dbt>.")


@patch("cosmos.config.shutil.which", return_value=None)
def test_render_config_with_invalid_dbt_commands(mock_which):
render_config = RenderConfig(dbt_executable_path="invalid-dbt")
with pytest.raises(CosmosConfigException) as err_info:
render_config.validate_dbt_command()

error_msg = err_info.value.args[0]
assert error_msg == "Unable to find the dbt executable, attempted: <invalid-dbt>."


@patch("cosmos.config.shutil.which", side_effect=(None, "fallback-dbt-path"))
def test_render_config_uses_fallback_if_default_not_found(mock_which):
render_config = RenderConfig()
render_config.validate_dbt_command(Path("/tmp/fallback-dbt-path"))
assert render_config.dbt_executable_path == "/tmp/fallback-dbt-path"


@patch("cosmos.config.shutil.which", side_effect=("user-dbt", "fallback-dbt-path"))
def test_render_config_uses_default_if_exists(mock_which):
render_config = RenderConfig(dbt_executable_path="user-dbt")
render_config.validate_dbt_command("fallback-dbt-path")
assert render_config.dbt_executable_path == "user-dbt"
68 changes: 0 additions & 68 deletions tests/test_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,74 +181,6 @@ def test_converter_fails_execution_config_no_project_dir(mock_load_dbt_graph, ex
)


def test_converter_fails_render_config_invalid_dbt_path_with_dbt_ls():
"""
Validate that a dbt project fails to be rendered to Airflow with DBT_LS if
the dbt command is invalid.
"""
project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT.as_posix(), project_name="sample")
execution_config = ExecutionConfig(
execution_mode=ExecutionMode.LOCAL,
dbt_executable_path="invalid-execution-dbt",
)
render_config = RenderConfig(
emit_datasets=True,
dbt_executable_path="invalid-render-dbt",
)
profile_config = ProfileConfig(
profile_name="my_profile_name",
target_name="my_target_name",
profiles_yml_filepath=SAMPLE_PROFILE_YML,
)
with pytest.raises(CosmosConfigException) as err_info:
with DAG("test-id", start_date=datetime(2022, 1, 1)) as dag:
DbtToAirflowConverter(
dag=dag,
nodes=nodes,
project_config=project_config,
profile_config=profile_config,
execution_config=execution_config,
render_config=render_config,
)
assert (
err_info.value.args[0]
== "Unable to find the dbt executable, attempted: <invalid-render-dbt> and <invalid-execution-dbt>."
)


def test_converter_fails_render_config_invalid_dbt_path_with_manifest():
"""
Validate that a dbt project succeeds to be rendered to Airflow with DBT_MANIFEST even when
the dbt command is invalid.
"""
project_config = ProjectConfig(manifest_path=SAMPLE_DBT_MANIFEST.as_posix(), project_name="sample")

execution_config = ExecutionConfig(
execution_mode=ExecutionMode.LOCAL,
dbt_executable_path="invalid-execution-dbt",
dbt_project_path=SAMPLE_DBT_PROJECT.as_posix(),
)
render_config = RenderConfig(
emit_datasets=True,
dbt_executable_path="invalid-render-dbt",
)
profile_config = ProfileConfig(
profile_name="my_profile_name",
target_name="my_target_name",
profiles_yml_filepath=SAMPLE_PROFILE_YML,
)
with DAG("test-id", start_date=datetime(2022, 1, 1)) as dag:
converter = DbtToAirflowConverter(
dag=dag,
nodes=nodes,
project_config=project_config,
profile_config=profile_config,
execution_config=execution_config,
render_config=render_config,
)
assert converter


@pytest.mark.parametrize(
"execution_mode,operator_args",
[
Expand Down

0 comments on commit 4532db2

Please sign in to comment.