Skip to content

Commit

Permalink
Use returncode instead of stderr to determine dbt graph loading e…
Browse files Browse the repository at this point in the history
…rrors (#547)

Before, `DbtGraph.load_via_dbt_ls` raises `CosmosLoadDbtException` whenever `stderr` is not empty.

This behavior does not seem consistent, as non-blocking warnings can still make `cosmos` refuse to continue.

A specific case that I encountered is an OpenBLAS warning that is
produced by `podman` and [`composer-local-dev`](https://github.com/GoogleCloudPlatform/composer-local-dev):

```
OpenBLAS WARNING - could not determine the L2 cache size on this system, assuming 256k
```

This warning does not affect how `dbt` works, but `cosmos` still throws out an exception. This PR solves this.
  • Loading branch information
cliff-lau-cloverhealth authored Sep 26, 2023
1 parent 6a3ca35 commit d5ba070
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 11 deletions.
20 changes: 11 additions & 9 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,10 @@ def load_via_dbt_ls(self) -> None:
env=env,
)
stdout, stderr = process.communicate()
returncode = process.returncode
logger.debug("dbt deps output: %s", stdout)

if stderr or "Error" in stdout:
if returncode or "Error" in stdout:
details = stderr or stdout
raise CosmosLoadDbtException(f"Unable to run dbt deps command due to the error:\n{details}")

Expand All @@ -223,6 +224,7 @@ def load_via_dbt_ls(self) -> None:
)

stdout, stderr = process.communicate()
returncode = process.returncode

logger.debug("dbt output: %s", stdout)
log_filepath = log_dir / DBT_LOG_FILENAME
Expand All @@ -232,14 +234,14 @@ def load_via_dbt_ls(self) -> None:
for line in logfile:
logger.debug(line.strip())

if stderr or "Error" in stdout:
if 'Run "dbt deps" to install package dependencies' in stdout:
raise CosmosLoadDbtException(
"Unable to run dbt ls command due to missing dbt_packages. Set render_config.dbt_deps=True."
)
else:
details = stderr or stdout
raise CosmosLoadDbtException(f"Unable to run dbt ls command due to the error:\n{details}")
if 'Run "dbt deps" to install package dependencies' in stdout:
raise CosmosLoadDbtException(
"Unable to run dbt ls command due to missing dbt_packages. Set render_config.dbt_deps=True."
)

if returncode or "Error" in stdout:
details = stderr or stdout
raise CosmosLoadDbtException(f"Unable to run dbt ls command due to the error:\n{details}")

nodes = {}
for line in stdout.split("\n"):
Expand Down
53 changes: 51 additions & 2 deletions tests/dbt/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
import pytest

from cosmos.config import ProfileConfig
from cosmos.constants import ExecutionMode, DbtResourceType
from cosmos.dbt.graph import DbtGraph, LoadMode, CosmosLoadDbtException
from cosmos.constants import DbtResourceType, ExecutionMode
from cosmos.dbt.graph import CosmosLoadDbtException, DbtGraph, LoadMode
from cosmos.dbt.project import DbtProject
from cosmos.profiles import PostgresUserPasswordProfileMapping

Expand Down Expand Up @@ -132,6 +132,7 @@ def test_load(
@patch("cosmos.dbt.graph.Popen")
def test_load_via_dbt_ls_does_not_create_target_logs_in_original_folder(mock_popen, tmp_dbt_project_dir):
mock_popen().communicate.return_value = ("", "")
mock_popen().returncode = 0
assert not (tmp_dbt_project_dir / "target").exists()
assert not (tmp_dbt_project_dir / "logs").exists()

Expand Down Expand Up @@ -276,6 +277,53 @@ def test_load_via_dbt_ls_without_dbt_deps():
assert err_info.value.args[0] == expected


@pytest.mark.integration
@patch("cosmos.dbt.graph.Popen")
def test_load_via_dbt_ls_with_zero_returncode_and_non_empty_stderr(mock_popen, tmp_dbt_project_dir):
mock_popen().communicate.return_value = ("", "Some stderr warnings")
mock_popen().returncode = 0

dbt_project = DbtProject(name=DBT_PIPELINE_NAME, root_dir=tmp_dbt_project_dir)
dbt_graph = DbtGraph(
project=dbt_project,
profile_config=ProfileConfig(
profile_name="default",
target_name="default",
profile_mapping=PostgresUserPasswordProfileMapping(
conn_id="airflow_db",
profile_args={"schema": "public"},
),
),
)

dbt_graph.load_via_dbt_ls() # does not raise exception


@pytest.mark.integration
@patch("cosmos.dbt.graph.Popen")
def test_load_via_dbt_ls_with_non_zero_returncode(mock_popen):
mock_popen().communicate.return_value = ("", "Some stderr message")
mock_popen().returncode = 1

dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR)
dbt_graph = DbtGraph(
project=dbt_project,
profile_config=ProfileConfig(
profile_name="default",
target_name="default",
profile_mapping=PostgresUserPasswordProfileMapping(
conn_id="airflow_db",
profile_args={"schema": "public"},
),
),
)
with pytest.raises(CosmosLoadDbtException) as err_info:
dbt_graph.load_via_dbt_ls()

expected = "Unable to run dbt deps command due to the error:\nSome stderr message"
assert err_info.value.args[0] == expected


@pytest.mark.integration
@patch("cosmos.dbt.graph.Popen.communicate", return_value=("Some Runtime Error", ""))
def test_load_via_dbt_ls_with_runtime_error_in_stdout(mock_popen_communicate):
Expand All @@ -294,6 +342,7 @@ def test_load_via_dbt_ls_with_runtime_error_in_stdout(mock_popen_communicate):
)
with pytest.raises(CosmosLoadDbtException) as err_info:
dbt_graph.load_via_dbt_ls()

expected = "Unable to run dbt deps command due to the error:\nSome Runtime Error"
assert err_info.value.args[0] == expected
mock_popen_communicate.assert_called_once()
Expand Down

0 comments on commit d5ba070

Please sign in to comment.