From 5c010cc430f1e81d50063c36195fa655e1393ff5 Mon Sep 17 00:00:00 2001
From: Tatiana Al-Chueyr
Date: Thu, 18 Apr 2024 23:37:54 +0100
Subject: [PATCH] Increase test coverage and fix implementation

---
 cosmos/cache.py                     |  10 +-
 cosmos/config.py                    |   3 +-
 cosmos/converter.py                 |  12 --
 cosmos/dbt/graph.py                 |   4 +-
 cosmos/operators/local.py           |   2 +-
 dev/dags/basic_cosmos_task_group.py |  10 +-
 tests/dbt/test_graph.py             |  18 ++-
 tests/plugin/test_plugin.py         | 221 ----------------------------
 tests/test_cache.py                 |  62 ++++++++
 9 files changed, 97 insertions(+), 245 deletions(-)
 delete mode 100644 tests/plugin/test_plugin.py
 create mode 100644 tests/test_cache.py

diff --git a/cosmos/cache.py b/cosmos/cache.py
index 01db22c0f..7d24b5ab0 100644
--- a/cosmos/cache.py
+++ b/cosmos/cache.py
@@ -11,6 +11,11 @@
 from cosmos.dbt.project import get_partial_parse_path
 
 
+# It was considered to create a cache identifier based on the dbt project path, as opposed
+# to where it is used in Airflow. However, we could have concurrency issues if the same
+# dbt cached directory was being used by different dbt task groups or DAGs within the same
+# node. For this reason, as a starting point, the cache is identified by where it is used.
+# This can be reviewed in the future.
 def create_cache_identifier(dag: DAG, task_group: TaskGroup | None) -> str:
     """
     Given a DAG name and a (optional) task_group_name, create the identifier for caching.
@@ -22,11 +27,8 @@ def create_cache_identifier(dag: DAG, task_group: TaskGroup | None) -> str:
     if task_group:
         if task_group.dag_id is not None:
             cache_identifiers_list = [task_group.dag_id]
-        if task_group.upstream_group_ids is not None:
-            group_ids: list[str] = [tg for tg in task_group.upstream_group_ids or [] if tg is not None]
-            cache_identifiers_list.extend(group_ids)
         if task_group.group_id is not None:
-            cache_identifiers_list.extend(task_group.group_id)
+            cache_identifiers_list.extend([task_group.group_id.replace(".", "_")])
         cache_identifier = "_".join(cache_identifiers_list)
     else:
         cache_identifier = dag.dag_id
diff --git a/cosmos/config.py b/cosmos/config.py
index 18311057c..b5346377b 100644
--- a/cosmos/config.py
+++ b/cosmos/config.py
@@ -288,7 +288,8 @@ def ensure_profile(
         with tempfile.TemporaryDirectory() as temp_dir:
             temp_file = Path(temp_dir) / DEFAULT_PROFILES_FILE_NAME
             logger.info(
-                "Creating temporary profiles.yml at %s with the following contents:\n%s",
+                "Creating temporary profiles.yml with use_mock_values=%s at %s with the following contents:\n%s",
+                use_mock_values,
                 temp_file,
                 profile_contents,
             )
diff --git a/cosmos/converter.py b/cosmos/converter.py
index 273519be4..9b33efa20 100644
--- a/cosmos/converter.py
+++ b/cosmos/converter.py
@@ -215,8 +215,6 @@ def __init__(
 
         validate_initial_user_config(execution_config, profile_config, project_config, render_config, operator_args)
 
-        # If we are using the old interface, we should migrate it to the new interface
-        # This is safe to do now since we have validated which config interface we're using
         if project_config.dbt_project_path:
             execution_config, render_config = migrate_to_new_interface(execution_config, project_config, render_config)
 
@@ -227,16 +225,6 @@ def __init__(
 
         cache_dir = cache.obtain_cache_dir_path(cache_identifier=cache.create_cache_identifier(dag, task_group))
 
-        # Previously, we were creating a cosmos.dbt.project.DbtProject
-        # DbtProject has now been replaced with ProjectConfig directly
-        # since the interface of the two classes were effectively the same
-        # Under this previous implementation, we were passing:
-        # - name, root dir, models dir, snapshots dir and manifest path
-        # Internally in the dbtProject class, we were defaulting the profile_path
-        # To be root dir/profiles.yml
-        # To keep this logic working, if converter is given no ProfileConfig,
-        # we can create a default retaining this value to preserve this functionality.
-        # We may want to consider defaulting this value in our actual ProjectConfig class?
         self.dbt_graph = DbtGraph(
             project=project_config,
             render_config=render_config,
diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py
index 597150ffe..80276af9f 100644
--- a/cosmos/dbt/graph.py
+++ b/cosmos/dbt/graph.py
@@ -74,7 +74,7 @@ def name(self) -> str:
 def run_command(command: list[str], tmp_dir: Path, env_vars: dict[str, str]) -> str:
     """Run a command in a subprocess, returning the stdout."""
     logger.info("Running command: `%s`", " ".join(command))
-    logger.info("Environment variable keys: %s", env_vars.keys())
+    logger.debug("Environment variable keys: %s", env_vars.keys())
     process = Popen(
         command,
         stdout=PIPE,
@@ -215,7 +215,7 @@ def run_dbt_ls(
 
         stdout = run_command(ls_command, tmp_dir, env_vars)
 
-        logger.debug("dbt ls output: %s", stdout)
+        logger.info("dbt ls output: %s", stdout)
         log_filepath = self.log_dir / DBT_LOG_FILENAME
         logger.debug("dbt logs available in: %s", log_filepath)
         if log_filepath.exists():
diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py
index e28045ba2..5079fbaf4 100644
--- a/cosmos/operators/local.py
+++ b/cosmos/operators/local.py
@@ -325,7 +325,7 @@ def run_command(
 
         full_cmd = cmd + flags
 
-        logger.info("Using environment variables keys: %s", env.keys())
+        logger.debug("Using environment variables keys: %s", env.keys())
         result = self.invoke_dbt(
             command=full_cmd,
diff --git a/dev/dags/basic_cosmos_task_group.py b/dev/dags/basic_cosmos_task_group.py
index 14d209d13..a06cb51be 100644
--- a/dev/dags/basic_cosmos_task_group.py
+++ b/dev/dags/basic_cosmos_task_group.py
@@ -25,7 +25,10 @@
     ),
 )
 
-shared_execution_config = ExecutionConfig(invocation_mode=InvocationMode.DBT_RUNNER)
+shared_execution_config = ExecutionConfig(
+    invocation_mode=InvocationMode.SUBPROCESS,
+    # invocation_mode=InvocationMode.DBT_RUNNER
+)
 
 
 @dag(
@@ -56,7 +59,10 @@ def basic_cosmos_task_group() -> None:
         project_config=ProjectConfig(
             (DBT_ROOT_PATH / "jaffle_shop").as_posix(),
         ),
-        render_config=RenderConfig(select=["path:seeds/raw_orders.csv"]),
+        render_config=RenderConfig(
+            select=["path:seeds/raw_orders.csv"],
+            enable_mock_profile=False,  # This is necessary to benefit from partial parsing when using ProfileMapping
+        ),
         execution_config=shared_execution_config,
         operator_args={"install_deps": True},
         profile_config=profile_config,
diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py
index f72bbb146..faddf4484 100644
--- a/tests/dbt/test_graph.py
+++ b/tests/dbt/test_graph.py
@@ -482,7 +482,9 @@ def test_load_via_dbt_ls_without_dbt_deps(postgres_profile_config):
 
 
 @pytest.mark.integration
-def test_load_via_dbt_ls_without_dbt_deps_and_preinstalled_dbt_packages(tmp_dbt_project_dir, postgres_profile_config):
+def test_load_via_dbt_ls_without_dbt_deps_and_preinstalled_dbt_packages(
+    tmp_dbt_project_dir, postgres_profile_config, caplog, tmp_path
+):
     local_flags = [
         "--project-dir",
         tmp_dbt_project_dir / DBT_PROJECT_NAME,
@@ -506,17 +508,29 @@ def test_load_via_dbt_ls_without_dbt_deps_and_preinstalled_
     stdout, stderr = process.communicate()
 
     project_config = ProjectConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME)
-    render_config = RenderConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, dbt_deps=False)
+    render_config = RenderConfig(
+        dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, dbt_deps=False, enable_mock_profile=False
+    )
     execution_config = ExecutionConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME)
     dbt_graph = DbtGraph(
         project=project_config,
         render_config=render_config,
         execution_config=execution_config,
         profile_config=postgres_profile_config,
+        cache_dir=tmp_path,
     )
+    from cosmos.constants import DBT_TARGET_DIR_NAME
+
+    (tmp_path / DBT_TARGET_DIR_NAME).mkdir(parents=True, exist_ok=True)
+
     dbt_graph.load_via_dbt_ls()  # does not raise exception
 
+    assert "Unable to do partial parsing" in caplog.text
+
+    # TODO: split the caching test into a separate test, and make the following assertion work
+    # dbt_graph.load_via_dbt_ls()  # does not raise exception
+    # assert not "Unable to do partial parsing" in caplog.text
+
 
 @pytest.mark.integration
 @patch("cosmos.dbt.graph.Popen")
diff --git a/tests/plugin/test_plugin.py b/tests/plugin/test_plugin.py
deleted file mode 100644
index 8d0e3742f..000000000
--- a/tests/plugin/test_plugin.py
+++ /dev/null
@@ -1,221 +0,0 @@
-# dbt-core relies on Jinja2>3, whereas Flask<2 relies on an incompatible version of Jinja2.
-#
-# This discrepancy causes the automated integration tests to fail, as dbt-core is installed in the same
-# environment as apache-airflow.
-#
-# We can get around this by patching the jinja2 namespace to include the deprecated objects:
-try:
-    import flask  # noqa: F401
-except ImportError:
-    import jinja2
-    import markupsafe
-
-    jinja2.Markup = markupsafe.Markup
-    jinja2.escape = markupsafe.escape
-
-import sys
-from unittest.mock import MagicMock, PropertyMock, mock_open, patch
-
-import pytest
-from airflow.configuration import conf
-from airflow.exceptions import AirflowConfigException
-from airflow.utils.db import initdb, resetdb
-from airflow.www.app import cached_app
-from airflow.www.extensions.init_appbuilder import AirflowAppBuilder
-from flask.testing import FlaskClient
-
-import cosmos.plugin
-from cosmos.plugin import (
-    dbt_docs_view,
-    iframe_script,
-    open_azure_file,
-    open_file,
-    open_gcs_file,
-    open_http_file,
-    open_s3_file,
-)
-
-original_conf_get = conf.get
-
-
-def _get_text_from_response(response) -> str:
-    # Airflow < 2.4 uses an old version of Werkzeug that does not have Response.text.
-    if not hasattr(response, "text"):
-        return response.get_data(as_text=True)
-    else:
-        return response.text
-
-
-@pytest.fixture(scope="module")
-def app() -> FlaskClient:
-    initdb()
-
-    app = cached_app(testing=True)
-    appbuilder: AirflowAppBuilder = app.extensions["appbuilder"]
-
-    appbuilder.sm.check_authorization = lambda *args, **kwargs: True
-
-    if dbt_docs_view not in appbuilder.baseviews:
-        appbuilder._check_and_init(dbt_docs_view)
-        appbuilder.register_blueprint(dbt_docs_view)
-
-    yield app.test_client()
-
-    resetdb(skip_init=True)
-
-
-def test_dbt_docs(monkeypatch, app):
-    def conf_get(section, key, *args, **kwargs):
-        if section == "cosmos" and key == "dbt_docs_dir":
-            return "path/to/docs/dir"
-        else:
-            return original_conf_get(section, key, *args, **kwargs)
-
-    monkeypatch.setattr(conf, "get", conf_get)
-
-    response = app.get("/cosmos/dbt_docs")
-
-    assert response.status_code == 200
-    assert "
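
Note on the create_cache_identifier change in cosmos/cache.py above: in Airflow, a nested
TaskGroup's group_id is already a dot-separated path that includes its parent groups
(e.g. "outer.inner"), and the patch replaces those dots with underscores. It also fixes the
previous `.extend(task_group.group_id)` call, which extended the list with the group ID string
one character at a time. A minimal, self-contained sketch of the resulting naming scheme
(illustrative only; the real function takes Airflow DAG and TaskGroup objects, not plain strings):

    from __future__ import annotations


    def sketch_create_cache_identifier(dag_id: str, group_id: str | None) -> str:
        # Mirrors the patched logic: the DAG ID, then the sanitized task group ID.
        cache_identifiers_list = [dag_id]
        if group_id is not None:
            # Dots from nested task groups are replaced so the identifier can be
            # used directly as a cache directory name.
            cache_identifiers_list.append(group_id.replace(".", "_"))
        return "_".join(cache_identifiers_list)


    assert sketch_create_cache_identifier("my_dag", None) == "my_dag"
    assert sketch_create_cache_identifier("my_dag", "tg") == "my_dag_tg"
    assert sketch_create_cache_identifier("my_dag", "outer.inner") == "my_dag_outer_inner"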
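
Note on the enable_mock_profile=False additions (dev DAG and test above): dbt keeps partial
parsing state in target/partial_parse.msgpack and discards it when the profile it was built
against changes, logging "Unable to do partial parsing" and re-parsing the project from
scratch; that message is exactly what the new test asserts on. Because Cosmos renders with
mock profile values by default, a ProfileMapping user only benefits from partial parsing when
rendering uses the real profile values. A rough sketch of the invalidation rule (simplified;
the fingerprint scheme and the profile values below are hypothetical, not dbt's actual
implementation):

    import hashlib
    import json


    def profile_fingerprint(profile: dict) -> str:
        # Stable hash over the rendered profile values.
        return hashlib.sha256(json.dumps(profile, sort_keys=True).encode()).hexdigest()


    def can_partially_parse(saved_fingerprint: str, current_profile: dict) -> bool:
        # Simplified dbt-like rule: reuse saved parse state only when the profile
        # that produced it is unchanged.
        return saved_fingerprint == profile_fingerprint(current_profile)


    real_profile = {"host": "db.internal", "user": "analytics"}  # stand-in real values
    mock_profile = {"host": "mocked", "user": "mocked"}  # stand-in mock values

    saved = profile_fingerprint(real_profile)
    assert can_partially_parse(saved, real_profile)
    assert not can_partially_parse(saved, mock_profile)  # mock values force a full re-parse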