From bcf7714bec09caf3a3ffec9c6c2e72bf43e1c7cf Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 14 Sep 2023 11:22:51 +0100 Subject: [PATCH] Fix DbtRunOperationLocalOperator missing flags (#529) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit @jensenity reported an issue in Slack related to Cosmos 1.1: https://apache-airflow.slack.com/archives/C059CC42E9W/p1694488079896779 "for some reason, the the dbtrunoperation operator doesn’t use the cmd_flags….. it didnt have the --args flags when i look into my logs." ``` DbtRunOperationOperator( task_id="run_operation", macro_name="stage_external_sources", args=dict(select='source_a'), profile_config=profile_config, project_dir=DBT_PROJECT_PATH, install_deps=True, env={ … }, dbt_executable_path=DBT_EXECUTION_PATH, queue="kubernetes", ) ``` This PR adds tests to all local operators to confirm if they are being executed with flags or not to avoid regressions. It increases the project test coverage from 91.80% to 92.33%. As a bonus, this PR also enables the `--full-refresh` flag when running dbt models, as requested by Monideep De in the Slack channel: https://apache-airflow.slack.com/archives/C059CC42E9W/p1694523446939829 This change makes sense since this flag is supported both in Seeds and Models (https://docs.getdbt.com/reference/resource-configs/full_refresh), but we only exposed it in Seeds. --- cosmos/operators/local.py | 17 +++++++++- tests/operators/test_local.py | 59 ++++++++++++++++++++++++++++++++++- 2 files changed, 74 insertions(+), 2 deletions(-) diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 69bc1a78c..adf608643 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -405,10 +405,21 @@ class DbtRunLocalOperator(DbtLocalBaseOperator): ui_color = "#7352BA" ui_fgcolor = "#F4F2FC" - def __init__(self, **kwargs: Any) -> None: + def __init__(self, full_refresh: bool = False, **kwargs: Any) -> None: + self.full_refresh = full_refresh super().__init__(**kwargs) self.base_cmd = ["run"] + def add_cmd_flags(self) -> list[str]: + flags = [] + if self.full_refresh is True: + flags.append("--full-refresh") + return flags + + def execute(self, context: Context) -> None: + cmd_flags = self.add_cmd_flags() + self.build_and_run_cmd(context=context, cmd_flags=cmd_flags) + class DbtTestLocalOperator(DbtLocalBaseOperator): """ @@ -485,6 +496,10 @@ def add_cmd_flags(self) -> list[str]: flags.append(yaml.dump(self.args)) return flags + def execute(self, context: Context) -> None: + cmd_flags = self.add_cmd_flags() + self.build_and_run_cmd(context=context, cmd_flags=cmd_flags) + class DbtDocsLocalOperator(DbtLocalBaseOperator): """ diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 7d00d88c8..f926816f6 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -11,7 +11,18 @@ from pendulum import datetime from cosmos.config import ProfileConfig -from cosmos.operators.local import DbtLocalBaseOperator, DbtRunLocalOperator, DbtTestLocalOperator +from cosmos.operators.local import ( + DbtLocalBaseOperator, + DbtLSLocalOperator, + DbtSnapshotLocalOperator, + DbtRunLocalOperator, + DbtTestLocalOperator, + DbtDocsLocalOperator, + DbtDocsS3LocalOperator, + DbtDocsAzureStorageLocalOperator, + DbtSeedLocalOperator, + DbtRunOperationLocalOperator, +) from cosmos.profiles import PostgresUserPasswordProfileMapping from tests.utils import test_dag as run_test_dag @@ -228,3 +239,49 @@ def test_store_compiled_sql() -> None: tmp_project_dir="my/dir", context=Context(execution_date=datetime(2023, 2, 15, 12, 30)), ) + + +@pytest.mark.parametrize( + "operator_class,kwargs,expected_call_kwargs", + [ + (DbtSeedLocalOperator, {"full_refresh": True}, {"context": {}, "cmd_flags": ["--full-refresh"]}), + (DbtRunLocalOperator, {"full_refresh": True}, {"context": {}, "cmd_flags": ["--full-refresh"]}), + ( + DbtRunOperationLocalOperator, + {"args": {"days": 7, "dry_run": True}, "macro_name": "bla"}, + {"context": {}, "cmd_flags": ["--args", "days: 7\ndry_run: true\n"]}, + ), + ], +) +@patch("cosmos.operators.local.DbtLocalBaseOperator.build_and_run_cmd") +def test_operator_execute_with_flags(mock_build_and_run_cmd, operator_class, kwargs, expected_call_kwargs): + task = operator_class(profile_config=profile_config, task_id="my-task", project_dir="my/dir", **kwargs) + task.execute(context={}) + mock_build_and_run_cmd.assert_called_once_with(**expected_call_kwargs) + + +@pytest.mark.parametrize( + "operator_class", + ( + DbtLSLocalOperator, + DbtSnapshotLocalOperator, + DbtTestLocalOperator, + DbtDocsLocalOperator, + DbtDocsS3LocalOperator, + DbtDocsAzureStorageLocalOperator, + ), +) +@patch("cosmos.operators.local.DbtLocalBaseOperator.build_and_run_cmd") +def test_operator_execute_without_flags(mock_build_and_run_cmd, operator_class): + operator_class_kwargs = { + DbtDocsS3LocalOperator: {"aws_conn_id": "fake-conn", "bucket_name": "fake-bucket"}, + DbtDocsAzureStorageLocalOperator: {"azure_conn_id": "fake-conn", "container_name": "fake-container"}, + } + task = operator_class( + profile_config=profile_config, + task_id="my-task", + project_dir="my/dir", + **operator_class_kwargs.get(operator_class, {}), + ) + task.execute(context={}) + mock_build_and_run_cmd.assert_called_once_with(context={})