Skip to content

Commit

Permalink
Fix DbtRunOperationLocalOperator missing flags (#529)
Browse files Browse the repository at this point in the history
@jensenity reported an issue in Slack related to Cosmos 1.1:
https://apache-airflow.slack.com/archives/C059CC42E9W/p1694488079896779

"for some reason, the the dbtrunoperation operator doesn’t use the
cmd_flags….. it didnt have the --args flags when i look into my logs."
```
DbtRunOperationOperator(
                            task_id="run_operation",
                            macro_name="stage_external_sources",
                            args=dict(select='source_a'),
                            profile_config=profile_config,
                            project_dir=DBT_PROJECT_PATH,
                            install_deps=True,
                            env={
                                …
                            },
                            dbt_executable_path=DBT_EXECUTION_PATH,
                            queue="kubernetes",
                        )
```

This PR adds tests to all local operators to confirm if they are being
executed with flags or not to avoid regressions.
It increases the project test coverage from 91.80% to 92.33%.

As a bonus, this PR also enables the `--full-refresh` flag when running
dbt models, as requested by Monideep De in the Slack channel:
https://apache-airflow.slack.com/archives/C059CC42E9W/p1694523446939829

This change makes sense since this flag is supported both in Seeds and
Models
(https://docs.getdbt.com/reference/resource-configs/full_refresh), but
we only exposed it in Seeds.
  • Loading branch information
tatiana authored Sep 14, 2023
1 parent 7178805 commit bcf7714
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 2 deletions.
17 changes: 16 additions & 1 deletion cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,10 +405,21 @@ class DbtRunLocalOperator(DbtLocalBaseOperator):
ui_color = "#7352BA"
ui_fgcolor = "#F4F2FC"

def __init__(self, **kwargs: Any) -> None:
def __init__(self, full_refresh: bool = False, **kwargs: Any) -> None:
self.full_refresh = full_refresh
super().__init__(**kwargs)
self.base_cmd = ["run"]

def add_cmd_flags(self) -> list[str]:
flags = []
if self.full_refresh is True:
flags.append("--full-refresh")
return flags

def execute(self, context: Context) -> None:
cmd_flags = self.add_cmd_flags()
self.build_and_run_cmd(context=context, cmd_flags=cmd_flags)


class DbtTestLocalOperator(DbtLocalBaseOperator):
"""
Expand Down Expand Up @@ -485,6 +496,10 @@ def add_cmd_flags(self) -> list[str]:
flags.append(yaml.dump(self.args))
return flags

def execute(self, context: Context) -> None:
cmd_flags = self.add_cmd_flags()
self.build_and_run_cmd(context=context, cmd_flags=cmd_flags)


class DbtDocsLocalOperator(DbtLocalBaseOperator):
"""
Expand Down
59 changes: 58 additions & 1 deletion tests/operators/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,18 @@
from pendulum import datetime

from cosmos.config import ProfileConfig
from cosmos.operators.local import DbtLocalBaseOperator, DbtRunLocalOperator, DbtTestLocalOperator
from cosmos.operators.local import (
DbtLocalBaseOperator,
DbtLSLocalOperator,
DbtSnapshotLocalOperator,
DbtRunLocalOperator,
DbtTestLocalOperator,
DbtDocsLocalOperator,
DbtDocsS3LocalOperator,
DbtDocsAzureStorageLocalOperator,
DbtSeedLocalOperator,
DbtRunOperationLocalOperator,
)
from cosmos.profiles import PostgresUserPasswordProfileMapping
from tests.utils import test_dag as run_test_dag

Expand Down Expand Up @@ -228,3 +239,49 @@ def test_store_compiled_sql() -> None:
tmp_project_dir="my/dir",
context=Context(execution_date=datetime(2023, 2, 15, 12, 30)),
)


@pytest.mark.parametrize(
"operator_class,kwargs,expected_call_kwargs",
[
(DbtSeedLocalOperator, {"full_refresh": True}, {"context": {}, "cmd_flags": ["--full-refresh"]}),
(DbtRunLocalOperator, {"full_refresh": True}, {"context": {}, "cmd_flags": ["--full-refresh"]}),
(
DbtRunOperationLocalOperator,
{"args": {"days": 7, "dry_run": True}, "macro_name": "bla"},
{"context": {}, "cmd_flags": ["--args", "days: 7\ndry_run: true\n"]},
),
],
)
@patch("cosmos.operators.local.DbtLocalBaseOperator.build_and_run_cmd")
def test_operator_execute_with_flags(mock_build_and_run_cmd, operator_class, kwargs, expected_call_kwargs):
task = operator_class(profile_config=profile_config, task_id="my-task", project_dir="my/dir", **kwargs)
task.execute(context={})
mock_build_and_run_cmd.assert_called_once_with(**expected_call_kwargs)


@pytest.mark.parametrize(
"operator_class",
(
DbtLSLocalOperator,
DbtSnapshotLocalOperator,
DbtTestLocalOperator,
DbtDocsLocalOperator,
DbtDocsS3LocalOperator,
DbtDocsAzureStorageLocalOperator,
),
)
@patch("cosmos.operators.local.DbtLocalBaseOperator.build_and_run_cmd")
def test_operator_execute_without_flags(mock_build_and_run_cmd, operator_class):
operator_class_kwargs = {
DbtDocsS3LocalOperator: {"aws_conn_id": "fake-conn", "bucket_name": "fake-bucket"},
DbtDocsAzureStorageLocalOperator: {"azure_conn_id": "fake-conn", "container_name": "fake-container"},
}
task = operator_class(
profile_config=profile_config,
task_id="my-task",
project_dir="my/dir",
**operator_class_kwargs.get(operator_class, {}),
)
task.execute(context={})
mock_build_and_run_cmd.assert_called_once_with(context={})

0 comments on commit bcf7714

Please sign in to comment.