From 6e41471712d9352524e70a59a1380e8e52c5ede2 Mon Sep 17 00:00:00 2001 From: Joppe Vos <44348300+joppevos@users.noreply.github.com> Date: Fri, 10 Nov 2023 16:40:15 +0100 Subject: [PATCH] Fix installing deps when using `profile_mapping` & `ExecutionMode.LOCAL` (#659) Extends the local operator when running `dbt deps` with the provides profile flags. This makes the logic consistent between DAG parsing and task running as referenced below https://github.com/astronomer/astronomer-cosmos/blob/8e2d5908ce89aa98813af6dfd112239e124bd69a/cosmos/dbt/graph.py#L247-L266 Closes: #658 --- cosmos/operators/local.py | 26 ++++++++++++++++---------- tests/operators/test_local.py | 30 ++++++++++++++++++++++++++++++ tests/operators/test_virtualenv.py | 2 +- 3 files changed, 47 insertions(+), 11 deletions(-) diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 145741096..121ed66d0 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -206,17 +206,11 @@ def run_command( tmp_project_dir, ) - # if we need to install deps, do so - if self.install_deps: - self.run_subprocess( - command=[self.dbt_executable_path, "deps"], - env=env, - output_encoding=self.output_encoding, - cwd=tmp_project_dir, - ) - with self.profile_config.ensure_profile() as (profile_path, env_vars): + with self.profile_config.ensure_profile() as profile_values: + (profile_path, env_vars) = profile_values env.update(env_vars) - full_cmd = cmd + [ + + flags = [ "--profiles-dir", str(profile_path.parent), "--profile", @@ -225,6 +219,18 @@ def run_command( self.profile_config.target_name, ] + if self.install_deps: + deps_command = [self.dbt_executable_path, "deps"] + deps_command.extend(flags) + self.run_subprocess( + command=deps_command, + env=env, + output_encoding=self.output_encoding, + cwd=tmp_project_dir, + ) + + full_cmd = cmd + flags + logger.info("Trying to run the command:\n %s\nFrom %s", full_cmd, tmp_project_dir) logger.info("Using environment variables keys: %s", env.keys()) result = self.run_subprocess( diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 14213b335..38863620f 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -458,3 +458,33 @@ def test_dbt_docs_gcs_local_operator(): call(filename="fake-dir/target/file2", bucket_name="fake-bucket", object_name="fake-folder/file2"), ] mock_hook.upload.assert_has_calls(expected_upload_calls) + + +@patch("cosmos.operators.local.DbtLocalBaseOperator.store_compiled_sql") +@patch("cosmos.operators.local.DbtLocalBaseOperator.exception_handling") +@patch("cosmos.config.ProfileConfig.ensure_profile") +@patch("cosmos.operators.local.DbtLocalBaseOperator.run_subprocess") +def test_operator_execute_deps_parameters( + mock_build_and_run_cmd, mock_ensure_profile, mock_exception_handling, mock_store_compiled_sql +): + expected_call_kwargs = [ + "/usr/local/bin/dbt", + "deps", + "--profiles-dir", + "/path/to", + "--profile", + "default", + "--target", + "dev", + ] + task = DbtRunLocalOperator( + profile_config=real_profile_config, + task_id="my-task", + project_dir=DBT_PROJ_DIR, + install_deps=True, + emit_datasets=False, + dbt_executable_path="/usr/local/bin/dbt", + ) + mock_ensure_profile.return_value.__enter__.return_value = (Path("/path/to/profile"), {"ENV_VAR": "value"}) + task.execute(context={"task_instance": MagicMock()}) + assert mock_build_and_run_cmd.call_args_list[0].kwargs["command"] == expected_call_kwargs diff --git a/tests/operators/test_virtualenv.py b/tests/operators/test_virtualenv.py index 142a251a7..13dba8f94 100644 --- a/tests/operators/test_virtualenv.py +++ b/tests/operators/test_virtualenv.py @@ -60,7 +60,7 @@ def test_run_command( dbt_cmd = run_command_args[2] assert python_cmd[0][0][0].endswith("/bin/python") assert python_cmd[0][-1][-1] == "from importlib.metadata import version; print(version('dbt-core'))" - assert dbt_deps[0][0][-1] == "deps" + assert dbt_deps[0][0][1] == "deps" assert dbt_deps[0][0][0].endswith("/bin/dbt") assert dbt_deps[0][0][0] == dbt_cmd[0][0][0] assert dbt_cmd[0][0][1] == "do-something"