From dd7df64ef43cdbbbb060d8d149e27b35b141f821 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 14 Sep 2023 11:19:59 +0100 Subject: [PATCH] Address CR feedback --- cosmos/operators/local.py | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 60aeb71e8..aae04ee2a 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -54,8 +54,7 @@ from openlineage.airflow.extractors.base import OperatorLineage except (ImportError, ModuleNotFoundError) as error: logger.warning( - "To enable emitting Openlineage events. In order to use openlineage, upgrade to Airflow 2.7 or " - "install astronomer-cosmos[openlineage]." + "To enable emitting Openlineage events, upgrade to Airflow 2.7 or install astronomer-cosmos[openlineage]." ) logger.exception(error) is_openlineage_available = False @@ -68,6 +67,12 @@ class OperatorLineage: # type: ignore job_facets: dict[str, str] = dict() +try: + LINEAGE_NAMESPACE = conf.get("openlineage", "namespace") +except airflow.exceptions.AirflowConfigException: + LINEAGE_NAMESPACE = os.getenv("OPENLINEAGE_NAMESPACE", DEFAULT_OPENLINEAGE_NAMESPACE) + + class DbtLocalBaseOperator(DbtBaseOperator): """ Executes a dbt core cli command locally. @@ -259,15 +264,9 @@ def calculate_openlineage_events_completes( for key, value in env.items(): os.environ[key] = str(value) - lineage_namespace = os.getenv("OPENLINEAGE_NAMESPACE", DEFAULT_OPENLINEAGE_NAMESPACE) - try: - lineage_namespace = conf.get("openlineage", "namespace") - except airflow.exceptions.AirflowConfigException: - pass - openlineage_processor = DbtLocalArtifactProcessor( producer=OPENLINEAGE_PRODUCER, - job_namespace=lineage_namespace, + job_namespace=LINEAGE_NAMESPACE, project_dir=project_dir, profile_name=self.profile_config.profile_name, target=self.profile_config.target_name, @@ -278,8 +277,8 @@ def calculate_openlineage_events_completes( try: events = openlineage_processor.parse() self.openlineage_events_completes = events.completes - except (FileNotFoundError, NotImplementedError) as error: - logger.exception(error) + except (FileNotFoundError, NotImplementedError): + logger.debug("Unable to parse OpenLineage events", stack_info=True) def get_datasets(self, source: Literal["inputs", "outputs"]) -> list[Dataset]: """ @@ -333,7 +332,7 @@ def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> Ope elif hasattr(task_instance, "openlineage_events_completes"): openlineage_events_completes = task_instance.openlineage_events_completes else: - logger.warning("Unable to emit OpenLineage events due to lack of data.") + logger.info("Unable to emit OpenLineage events due to lack of data.") if openlineage_events_completes is not None: for completed in openlineage_events_completes: @@ -342,7 +341,7 @@ def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> Ope run_facets = {**run_facets, **completed.run.facets} job_facets = {**job_facets, **completed.job.facets} else: - logger.warning("Unable to emit OpenLineage events due to lack of dependencies or data.") + logger.info("Unable to emit OpenLineage events due to lack of dependencies or data.") return OperatorLineage( inputs=inputs,