From fb36be56eb9c4a78fb853d6b0971e9b254f31e1a Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 28 Sep 2023 19:18:30 +0100 Subject: [PATCH] Fix behaviour when openlineage raises KeyError (#565) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Make Cosmos more resilient to Openlineage errors. The Openlineage library raises `KeyError` for some community dbt projects (e.g. https://apache-airflow.slack.com/archives/C059CC42E9W/p1695067128727239): ``` [2023-09-18, 19:54:12 UTC] {taskinstance.py:1935} ERROR - Task failed with exception Traceback (most recent call last): File "/usr/local/lib/python3.11/site-packages/cosmos/operators/local.py", line 360, in execute self.build_and_run_cmd(context=context) File "/usr/local/lib/python3.11/site-packages/cosmos/operators/local.py", line 356, in build_and_run_cmd result = self.run_command(cmd=dbt_cmd, env=env, context=context) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/cosmos/operators/local.py", line 230, in run_command self.calculate_openlineage_events_completes(env, Path(tmp_project_dir)) File "/usr/local/lib/python3.11/site-packages/cosmos/operators/local.py", line 278, in calculate_openlineage_events_completes events = openlineage_processor.parse() ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/openlineage/common/provider/dbt/processor.py", line 211, in parse events += self.parse_test(context, nodes) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/openlineage/common/provider/dbt/processor.py", line 308, in parse_test assertions = self.parse_assertions(context, nodes) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/openlineage/common/provider/dbt/processor.py", line 367, in parse_assertions assertion=test_node["test_metadata"]["name"], ~~~~~~~~~^^^^^^^^^^^^^^^^^ KeyError: 'test_metadata' ``` Closes: #545 Co-authored-by: Javier Hernández Novoa --- cosmos/__init__.py | 4 ++-- cosmos/dbt/graph.py | 1 + cosmos/operators/local.py | 9 +++++---- tests/operators/test_local.py | 19 +++++++++++++++++++ 4 files changed, 27 insertions(+), 6 deletions(-) diff --git a/cosmos/__init__.py b/cosmos/__init__.py index b73414c8c..e613dcd02 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -59,8 +59,8 @@ DbtSnapshotKubernetesOperator, DbtTestKubernetesOperator, ) -except ImportError as error: - logger.exception(error) +except ImportError: + logger.debug("To import Kubernetes modules, install astronomer-cosmos[kubernetes].", stack_info=True) DbtLSKubernetesOperator = MissingPackage( "cosmos.operators.kubernetes.DbtLSKubernetesOperator", "kubernetes", diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index 1ad6c1737..72b3eaa2b 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -155,6 +155,7 @@ def load_via_dbt_ls(self) -> None: env.update(env_vars) with tempfile.TemporaryDirectory() as tmpdir: + logger.info("Content of the dbt project dir <%s>: `%s`", self.project.dir, os.listdir(self.project.dir)) logger.info("Creating symlinks from %s to `%s`", self.project.dir, tmpdir) # We create symbolic links to the original directory files and directories. # This allows us to run the dbt command from within the temporary directory, outputting any necessary diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index b6d0d12c8..d74cfbe44 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -23,6 +23,7 @@ from airflow.datasets import Dataset except ModuleNotFoundError: is_openlineage_available = False + DbtLocalArtifactProcessor = None else: is_openlineage_available = True @@ -52,11 +53,11 @@ except (ImportError, ModuleNotFoundError): try: from openlineage.airflow.extractors.base import OperatorLineage - except (ImportError, ModuleNotFoundError) as error: + except (ImportError, ModuleNotFoundError): logger.warning( - "To enable emitting Openlineage events, upgrade to Airflow 2.7 or install astronomer-cosmos[openlineage]." + "To enable emitting Openlineage events, upgrade to Airflow 2.7 or install astronomer-cosmos[openlineage].", + stack_info=True, ) - logger.exception(error) is_openlineage_available = False @define @@ -276,7 +277,7 @@ def calculate_openlineage_events_completes( try: events = openlineage_processor.parse() self.openlineage_events_completes = events.completes - except (FileNotFoundError, NotImplementedError, ValueError): + except (FileNotFoundError, NotImplementedError, ValueError, KeyError): logger.debug("Unable to parse OpenLineage events", stack_info=True) def get_datasets(self, source: Literal["inputs", "outputs"]) -> list[Dataset]: diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 728eea079..0898a2894 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -1,3 +1,4 @@ +import logging import os import shutil import tempfile @@ -370,3 +371,21 @@ def test_operator_execute_without_flags(mock_build_and_run_cmd, operator_class): ) task.execute(context={}) mock_build_and_run_cmd.assert_called_once_with(context={}) + + +@patch("cosmos.operators.local.DbtLocalArtifactProcessor") +def test_calculate_openlineage_events_completes_openlineage_errors(mock_processor, caplog): + instance = mock_processor.return_value + instance.parse = MagicMock(side_effect=KeyError) + caplog.set_level(logging.DEBUG) + dbt_base_operator = DbtLocalBaseOperator( + profile_config=profile_config, + task_id="my-task", + project_dir=DBT_PROJ_DIR, + should_store_compiled_sql=False, + ) + + dbt_base_operator.calculate_openlineage_events_completes(env={}, project_dir=DBT_PROJ_DIR) + assert instance.parse.called + err_msg = "Unable to parse OpenLineage events" + assert err_msg in caplog.text