Skip to content

Commit

Permalink
Address CR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
tatiana committed Sep 14, 2023
1 parent d0f4d08 commit dd7df64
Showing 1 changed file with 12 additions and 13 deletions.
25 changes: 12 additions & 13 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@
from openlineage.airflow.extractors.base import OperatorLineage
except (ImportError, ModuleNotFoundError) as error:
logger.warning(
"To enable emitting Openlineage events. In order to use openlineage, upgrade to Airflow 2.7 or "
"install astronomer-cosmos[openlineage]."
"To enable emitting Openlineage events, upgrade to Airflow 2.7 or install astronomer-cosmos[openlineage]."
)
logger.exception(error)
is_openlineage_available = False
Expand All @@ -68,6 +67,12 @@ class OperatorLineage: # type: ignore
job_facets: dict[str, str] = dict()


try:
LINEAGE_NAMESPACE = conf.get("openlineage", "namespace")
except airflow.exceptions.AirflowConfigException:
LINEAGE_NAMESPACE = os.getenv("OPENLINEAGE_NAMESPACE", DEFAULT_OPENLINEAGE_NAMESPACE)


class DbtLocalBaseOperator(DbtBaseOperator):
"""
Executes a dbt core cli command locally.
Expand Down Expand Up @@ -259,15 +264,9 @@ def calculate_openlineage_events_completes(
for key, value in env.items():
os.environ[key] = str(value)

lineage_namespace = os.getenv("OPENLINEAGE_NAMESPACE", DEFAULT_OPENLINEAGE_NAMESPACE)
try:
lineage_namespace = conf.get("openlineage", "namespace")
except airflow.exceptions.AirflowConfigException:
pass

openlineage_processor = DbtLocalArtifactProcessor(
producer=OPENLINEAGE_PRODUCER,
job_namespace=lineage_namespace,
job_namespace=LINEAGE_NAMESPACE,
project_dir=project_dir,
profile_name=self.profile_config.profile_name,
target=self.profile_config.target_name,
Expand All @@ -278,8 +277,8 @@ def calculate_openlineage_events_completes(
try:
events = openlineage_processor.parse()
self.openlineage_events_completes = events.completes
except (FileNotFoundError, NotImplementedError) as error:
logger.exception(error)
except (FileNotFoundError, NotImplementedError):
logger.debug("Unable to parse OpenLineage events", stack_info=True)

def get_datasets(self, source: Literal["inputs", "outputs"]) -> list[Dataset]:
"""
Expand Down Expand Up @@ -333,7 +332,7 @@ def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> Ope
elif hasattr(task_instance, "openlineage_events_completes"):
openlineage_events_completes = task_instance.openlineage_events_completes
else:
logger.warning("Unable to emit OpenLineage events due to lack of data.")
logger.info("Unable to emit OpenLineage events due to lack of data.")

if openlineage_events_completes is not None:
for completed in openlineage_events_completes:
Expand All @@ -342,7 +341,7 @@ def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> Ope
run_facets = {**run_facets, **completed.run.facets}
job_facets = {**job_facets, **completed.job.facets}
else:
logger.warning("Unable to emit OpenLineage events due to lack of dependencies or data.")
logger.info("Unable to emit OpenLineage events due to lack of dependencies or data.")

return OperatorLineage(
inputs=inputs,
Expand Down

0 comments on commit dd7df64

Please sign in to comment.