From 89f5999e7360bc4c1857ac1d904d8cfe1c5d0327 Mon Sep 17 00:00:00 2001
From: Daniel Reeves <31971762+dwreeves@users.noreply.github.com>
Date: Thu, 15 Aug 2024 12:16:55 -0400
Subject: [PATCH] Simplify logging (#1108)

This PR addresses #906 and fixes the issues in Cosmos logging once and for all.\*

> \* Actually, there is another issue with logs being polluted with warnings about "source" nodes in 1.5.0, but that is a separate matter!

I have a long explanation of how the `logging` module in Python works, and the sort of idioms it expects of its end users, here: https://github.com/astronomer/astronomer-cosmos/issues/906#issuecomment-2134150103

The choices I made, explained:

- Although I don't entirely agree with adding `(astronomer-cosmos)` to all the logs, clearly at least one user (and possibly many more) wants it, and I don't believe we should remove it. The objective of this PR was therefore to preserve the feature while future-proofing it.
  - Why I can't say I'm a fan of it: adding `(astronomer-cosmos)` to logs is a symptom of other problems with the Cosmos library, specifically how it impacts performance when users do not set it up effectively, and the prefix was added as a way to help people diagnose those issues. I think ultimately we want to move away from this. Other components of the Airflow ecosystem do not feel compelled to do things like this. Also, the module path is something that can be handled in the `log_format` if users really want it.
  - How I future-proofed: as per the long post linked above, the core issue is that Cosmos should not be creating lots of StreamHandlers. The proper and typical use of the logging module, with few exceptions, is to let logs propagate upwards to the root logger. The reason the Cosmos logs caused problems for so long is that they deviated heavily from this.
  - I think the "least astonishing" default behavior is to make no modifications to the base logging behavior whatsoever. This is also less likely to morph into new issues if any further changes are made to the custom logging.
  - One thing I never mentioned: I found it odd that by default Cosmos did not "work out of the box" and that, despite using Astronomer's own Airflow platform (!), I had to set a config option to keep Cosmos logging from being a nightmare (i.e. set `propagate_logs = False`). The previous docs referenced the Celery executor as having issues, even though it is one of the two most popular ways to run Airflow in production. Something like this should just work out of the box for the majority of users!
- For task execution, Cosmos should make use of the more Airflow-idiomatic `LoggingMixin` class (i.e. `self.log`) whenever appropriate. This could also be used in scheduler/webserver logging contexts, but globally scoped loggers are less out of place there.
  - These logs will not use the `get_logger()` implementation. That is intentional and probably desirable: they do not need to be "enriched" because they are isolated in the task execution logs.

Oh, and I fixed an issue with the `[project.entry-points]` section of `pyproject.toml` while I was at it.

## Breaking Change?

- Removes the `propagate_logs` conf option, although removing it will not break users' builds. There is now a `rich_logging` conf option instead, which is disabled by default.
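For reference, here is a minimal sketch (illustration only, not part of this diff) of the two idioms described above: a module-level logger that attaches no handlers and simply propagates to the root logger, and `LoggingMixin`'s `self.log` for task-execution code. The `parse_project` helper and `ExampleDbtOperator` names are hypothetical.

```python
# Illustration only -- not part of this patch.
from __future__ import annotations

import logging

from airflow.models.baseoperator import BaseOperator

# Scheduler/parsing-time code: use a plain module-level logger and attach no
# handlers; records propagate up to whatever handlers Airflow has configured
# on the root logger.
logger = logging.getLogger(__name__)


def parse_project(path: str) -> None:
    """Hypothetical helper, shown only to demonstrate the module-level logger."""
    logger.info("Parsing dbt project at %s", path)


class ExampleDbtOperator(BaseOperator):
    """Hypothetical operator demonstrating ``self.log`` from ``LoggingMixin``."""

    def execute(self, context):
        # BaseOperator inherits LoggingMixin, so ``self.log`` writes directly
        # into the task's own log without any custom handler setup.
        self.log.info("Running example dbt command")
        return None
```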
--- cosmos/__init__.py | 4 ++ cosmos/hooks/subprocess.py | 18 +++---- cosmos/log.py | 35 ++++++------- cosmos/operators/azure_container_instance.py | 5 +- cosmos/operators/base.py | 7 +-- cosmos/operators/docker.py | 5 +- cosmos/operators/kubernetes.py | 5 +- cosmos/operators/local.py | 44 ++++++++-------- cosmos/operators/virtualenv.py | 2 +- cosmos/settings.py | 2 +- docs/configuration/logging.rst | 11 ++-- pyproject.toml | 2 +- tests/operators/test_local.py | 29 ++++++----- tests/test_log.py | 54 ++++++++++---------- 14 files changed, 110 insertions(+), 113 deletions(-) diff --git a/cosmos/__init__.py b/cosmos/__init__.py index 1a00036f1..c98a14402 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -177,6 +177,10 @@ def get_provider_info(): "propagate_logs": { "description": "Enable log propagation from Cosmos custom logger\n", "version_added": "1.3.0a1", + "version_deprecated": "1.6.0a1", + "deprecation_reason": "`propagate_logs` is no longer necessary as of Cosmos 1.6.0" + " because the issue this option was meant to address is no longer an" + " issue with Cosmos's new logging approach.", "type": "boolean", "example": None, "default": "True", diff --git a/cosmos/hooks/subprocess.py b/cosmos/hooks/subprocess.py index 19c88540b..c19a8c6de 100644 --- a/cosmos/hooks/subprocess.py +++ b/cosmos/hooks/subprocess.py @@ -13,10 +13,6 @@ from airflow.hooks.base import BaseHook -from cosmos.log import get_logger - -logger = get_logger(__name__) - class FullOutputSubprocessResult(NamedTuple): exit_code: int @@ -57,7 +53,7 @@ def run_command( ``output``: the last line from stderr or stdout ``full_output``: all lines from stderr or stdout. """ - logger.info("Tmp dir root location: \n %s", gettempdir()) + self.log.info("Tmp dir root location: \n %s", gettempdir()) log_lines = [] with contextlib.ExitStack() as stack: if cwd is None: @@ -70,7 +66,7 @@ def pre_exec() -> None: signal.signal(getattr(signal, sig), signal.SIG_DFL) os.setsid() - logger.info("Running command: %s", command) + self.log.info("Running command: %s", command) self.sub_process = Popen( command, @@ -81,7 +77,7 @@ def pre_exec() -> None: preexec_fn=pre_exec, ) - logger.info("Command output:") + self.log.info("Command output:") line = "" if self.sub_process is None: @@ -91,23 +87,23 @@ def pre_exec() -> None: line = raw_line.decode(output_encoding, errors="backslashreplace").rstrip() # storing the warn & error lines to be used later log_lines.append(line) - logger.info("%s", line) + self.log.info("%s", line) self.sub_process.wait() - logger.info("Command exited with return code %s", self.sub_process.returncode) + self.log.info("Command exited with return code %s", self.sub_process.returncode) return_code: int = self.sub_process.returncode return FullOutputSubprocessResult(exit_code=return_code, output=line, full_output=log_lines) def send_sigterm(self) -> None: """Sends SIGTERM signal to ``self.sub_process`` if one exists.""" - logger.info("Sending SIGTERM signal to process group") + self.log.info("Sending SIGTERM signal to process group") if self.sub_process and hasattr(self.sub_process, "pid"): os.killpg(os.getpgid(self.sub_process.pid), signal.SIGTERM) def send_sigint(self) -> None: """Sends SIGINT signal to ``self.sub_process`` if one exists.""" - logger.info("Sending SIGINT signal to process group") + self.log.info("Sending SIGINT signal to process group") if self.sub_process and hasattr(self.sub_process, "pid"): os.killpg(os.getpgid(self.sub_process.pid), signal.SIGINT) diff --git a/cosmos/log.py b/cosmos/log.py index 
3522b09dd..15c58cfb2 100644 --- a/cosmos/log.py +++ b/cosmos/log.py @@ -2,19 +2,15 @@ import logging -from airflow.utils.log.colored_log import CustomTTYColoredFormatter +from cosmos.settings import rich_logging -from cosmos.settings import propagate_logs -LOG_FORMAT: str = ( - "[%(blue)s%(asctime)s%(reset)s] " - "{%(blue)s%(filename)s:%(reset)s%(lineno)d} " - "%(log_color)s%(levelname)s%(reset)s - " - "%(purple)s(astronomer-cosmos)%(reset)s - " - "%(log_color)s%(message)s%(reset)s" -) +class CosmosRichLogger(logging.Logger): + """Custom Logger that prepends ``(astronomer-cosmos)`` to each log message in the scheduler.""" -LOGGER_NAME_TEMPLATE = "astronomer-cosmos-{}" + def handle(self, record: logging.LogRecord) -> None: + record.msg = "\x1b[35m(astronomer-cosmos)\x1b[0m " + record.msg + return super().handle(record) def get_logger(name: str) -> logging.Logger: @@ -24,13 +20,16 @@ def get_logger(name: str) -> logging.Logger: Airflow logs usually look like: [2023-08-09T14:20:55.532+0100] {subprocess.py:94} INFO - 13:20:55 Completed successfully - By using this logger, we introduce a (yellow) astronomer-cosmos string into the project's log messages: + This logger introduces a (magenta) astronomer-cosmos string into the project's log messages, + as long as the ``rich_logging`` setting is True: [2023-08-09T14:20:55.532+0100] {subprocess.py:94} INFO - (astronomer-cosmos) - 13:20:55 Completed successfully """ - logger = logging.getLogger(LOGGER_NAME_TEMPLATE.format(name)) - formatter: logging.Formatter = CustomTTYColoredFormatter(fmt=LOG_FORMAT) # type: ignore - handler = logging.StreamHandler() - handler.setFormatter(formatter) - logger.addHandler(handler) - logger.propagate = propagate_logs - return logger + if rich_logging: + cls = logging.getLoggerClass() + try: + logging.setLoggerClass(CosmosRichLogger) + return logging.getLogger(name) + finally: + logging.setLoggerClass(cls) + else: + return logging.getLogger(name) diff --git a/cosmos/operators/azure_container_instance.py b/cosmos/operators/azure_container_instance.py index 993d4315f..3b6ae3b7d 100644 --- a/cosmos/operators/azure_container_instance.py +++ b/cosmos/operators/azure_container_instance.py @@ -5,7 +5,6 @@ from airflow.utils.context import Context from cosmos.config import ProfileConfig -from cosmos.log import get_logger from cosmos.operators.base import ( AbstractDbtBaseOperator, DbtLSMixin, @@ -17,8 +16,6 @@ DbtTestMixin, ) -logger = get_logger(__name__) - # ACI is an optional dependency, so we need to check if it's installed try: from airflow.providers.microsoft.azure.operators.container_instances import AzureContainerInstancesOperator @@ -68,7 +65,7 @@ def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None self.build_command(context, cmd_flags) self.log.info(f"Running command: {self.command}") result = AzureContainerInstancesOperator.execute(self, context) - logger.info(result) + self.log.info(result) def build_command(self, context: Context, cmd_flags: list[str] | None = None) -> None: # For the first round, we're going to assume that the command is dbt diff --git a/cosmos/operators/base.py b/cosmos/operators/base.py index 37aae7a81..9a723383f 100644 --- a/cosmos/operators/base.py +++ b/cosmos/operators/base.py @@ -12,9 +12,6 @@ from airflow.utils.strings import to_boolean from cosmos.dbt.executable import get_system_dbt -from cosmos.log import get_logger - -logger = get_logger(__name__) class AbstractDbtBaseOperator(BaseOperator, metaclass=ABCMeta): @@ -178,14 +175,14 @@ def get_env(self, 
context: Context) -> dict[str, str | bytes | os.PathLike[Any]] filtered_env[key] = val else: if isinstance(key, accepted_types): - logger.warning( + self.log.warning( "Env var %s was ignored because its key is not a valid type. Must be one of %s", key, accepted_types, ) if isinstance(val, accepted_types): - logger.warning( + self.log.warning( "Env var %s was ignored because its value is not a valid type. Must be one of %s", key, accepted_types, diff --git a/cosmos/operators/docker.py b/cosmos/operators/docker.py index 4abf9e994..91d13fe72 100644 --- a/cosmos/operators/docker.py +++ b/cosmos/operators/docker.py @@ -4,7 +4,6 @@ from airflow.utils.context import Context -from cosmos.log import get_logger from cosmos.operators.base import ( AbstractDbtBaseOperator, DbtBuildMixin, @@ -17,8 +16,6 @@ DbtTestMixin, ) -logger = get_logger(__name__) - # docker is an optional dependency, so we need to check if it's installed try: from airflow.providers.docker.operators.docker import DockerOperator @@ -52,7 +49,7 @@ def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None self.build_command(context, cmd_flags) self.log.info(f"Running command: {self.command}") result = DockerOperator.execute(self, context) - logger.info(result) + self.log.info(result) def build_command(self, context: Context, cmd_flags: list[str] | None = None) -> None: # For the first round, we're going to assume that the command is dbt diff --git a/cosmos/operators/kubernetes.py b/cosmos/operators/kubernetes.py index ef69cd561..29a90a3ff 100644 --- a/cosmos/operators/kubernetes.py +++ b/cosmos/operators/kubernetes.py @@ -8,7 +8,6 @@ from cosmos.config import ProfileConfig from cosmos.dbt.parser.output import extract_log_issues -from cosmos.log import get_logger from cosmos.operators.base import ( AbstractDbtBaseOperator, DbtBuildMixin, @@ -24,8 +23,6 @@ DBT_NO_TESTS_MSG = "Nothing to do" DBT_WARN_MSG = "WARN" -logger = get_logger(__name__) - try: # apache-airflow-providers-cncf-kubernetes >= 7.4.0 from airflow.providers.cncf.kubernetes.backcompat.backwards_compat_converters import ( @@ -74,7 +71,7 @@ def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None self.build_kube_args(context, cmd_flags) self.log.info(f"Running command: {self.arguments}") result = KubernetesPodOperator.execute(self, context) - logger.info(result) + self.log.info(result) def build_kube_args(self, context: Context, cmd_flags: list[str] | None = None) -> None: # For the first round, we're going to assume that the command is dbt diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 8baa42716..1a88edb62 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -180,17 +180,17 @@ def _discover_invocation_mode(self) -> None: from dbt.cli.main import dbtRunner # noqa except ImportError: self.invocation_mode = InvocationMode.SUBPROCESS - logger.info("Could not import dbtRunner. Falling back to subprocess for invoking dbt.") + self.log.info("Could not import dbtRunner. Falling back to subprocess for invoking dbt.") else: self.invocation_mode = InvocationMode.DBT_RUNNER - logger.info("dbtRunner is available. Using dbtRunner for invoking dbt.") + self.log.info("dbtRunner is available. 
Using dbtRunner for invoking dbt.") self._set_invocation_methods() def handle_exception_subprocess(self, result: FullOutputSubprocessResult) -> None: if self.skip_exit_code is not None and result.exit_code == self.skip_exit_code: raise AirflowSkipException(f"dbt command returned exit code {self.skip_exit_code}. Skipping.") elif result.exit_code != 0: - logger.error("\n".join(result.full_output)) + self.log.error("\n".join(result.full_output)) raise AirflowException(f"dbt command failed. The command returned a non-zero exit code {result.exit_code}.") def handle_exception_dbt_runner(self, result: dbtRunnerResult) -> None: @@ -250,7 +250,7 @@ def store_compiled_sql(self, tmp_project_dir: str, context: Context, session: Se ).delete() session.add(rtif) else: - logger.info("Warning: ti is of type TaskInstancePydantic. Cannot update template_fields.") + self.log.info("Warning: ti is of type TaskInstancePydantic. Cannot update template_fields.") @provide_session def store_freshness_json(self, tmp_project_dir: str, context: Context, session: Session = NEW_SESSION) -> None: @@ -276,14 +276,14 @@ def store_freshness_json(self, tmp_project_dir: str, context: Context, session: self.freshness = "" def run_subprocess(self, command: list[str], env: dict[str, str], cwd: str) -> FullOutputSubprocessResult: - logger.info("Trying to run the command:\n %s\nFrom %s", command, cwd) + self.log.info("Trying to run the command:\n %s\nFrom %s", command, cwd) subprocess_result: FullOutputSubprocessResult = self.subprocess_hook.run_command( command=command, env=env, cwd=cwd, output_encoding=self.output_encoding, ) - logger.info(subprocess_result.output) + self.log.info(subprocess_result.output) return subprocess_result def run_dbt_runner(self, command: list[str], env: dict[str, str], cwd: str) -> dbtRunnerResult: @@ -300,7 +300,7 @@ def run_dbt_runner(self, command: list[str], env: dict[str, str], cwd: str) -> d # Exclude the dbt executable path from the command cli_args = command[1:] - logger.info("Trying to run dbtRunner with:\n %s\n in %s", cli_args, cwd) + self.log.info("Trying to run dbtRunner with:\n %s\n in %s", cli_args, cwd) with change_working_directory(cwd), environ(env): result = self._dbt_runner.invoke(cli_args) @@ -328,7 +328,7 @@ def run_command( with tempfile.TemporaryDirectory() as tmp_project_dir: - logger.info( + self.log.info( "Cloning project to writable temp directory %s from %s", tmp_project_dir, self.project_dir, @@ -339,7 +339,7 @@ def run_command( if self.partial_parse and self.cache_dir is not None: latest_partial_parse = cache._get_latest_partial_parse(Path(self.project_dir), self.cache_dir) - logger.info("Partial parse is enabled and the latest partial parse file is %s", latest_partial_parse) + self.log.info("Partial parse is enabled and the latest partial parse file is %s", latest_partial_parse) if latest_partial_parse is not None: cache._copy_partial_parse_to_project(latest_partial_parse, tmp_dir_path) @@ -370,7 +370,7 @@ def run_command( full_cmd = cmd + flags - logger.debug("Using environment variables keys: %s", env.keys()) + self.log.debug("Using environment variables keys: %s", env.keys()) result = self.invoke_dbt( command=full_cmd, @@ -386,8 +386,8 @@ def run_command( if self.emit_datasets: inlets = self.get_datasets("inputs") outlets = self.get_datasets("outputs") - logger.info("Inlets: %s", inlets) - logger.info("Outlets: %s", outlets) + self.log.info("Inlets: %s", inlets) + self.log.info("Outlets: %s", outlets) self.register_dataset(inlets, outlets) if self.partial_parse and 
self.cache_dir: @@ -435,7 +435,7 @@ def calculate_openlineage_events_completes( events = openlineage_processor.parse() self.openlineage_events_completes = events.completes except (FileNotFoundError, NotImplementedError, ValueError, KeyError, jinja2.exceptions.UndefinedError): - logger.debug("Unable to parse OpenLineage events", stack_info=True) + self.log.debug("Unable to parse OpenLineage events", stack_info=True) def get_datasets(self, source: Literal["inputs", "outputs"]) -> list[Dataset]: """ @@ -452,7 +452,7 @@ def get_datasets(self, source: Literal["inputs", "outputs"]) -> list[Dataset]: for output in getattr(completed, source): dataset_uri = output.namespace + "/" + output.name uris.append(dataset_uri) - logger.debug("URIs to be converted to Dataset: %s", uris) + self.log.debug("URIs to be converted to Dataset: %s", uris) datasets = [] try: @@ -504,7 +504,7 @@ def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> Ope elif hasattr(task_instance, "openlineage_events_completes"): openlineage_events_completes = task_instance.openlineage_events_completes else: - logger.info("Unable to emit OpenLineage events due to lack of data.") + self.log.info("Unable to emit OpenLineage events due to lack of data.") if openlineage_events_completes is not None: for completed in openlineage_events_completes: @@ -513,7 +513,7 @@ def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> Ope run_facets = {**run_facets, **completed.run.facets} job_facets = {**job_facets, **completed.job.facets} else: - logger.info("Unable to emit OpenLineage events due to lack of dependencies or data.") + self.log.info("Unable to emit OpenLineage events due to lack of dependencies or data.") return OperatorLineage( inputs=inputs, @@ -722,7 +722,7 @@ def __init__( def upload_to_cloud_storage(self, project_dir: str) -> None: """Uploads the generated documentation to S3.""" - logger.info( + self.log.info( 'Attempting to upload generated docs to S3 using S3Hook("%s")', self.connection_id, ) @@ -741,7 +741,7 @@ def upload_to_cloud_storage(self, project_dir: str) -> None: for filename in self.required_files: key = f"{self.folder_dir}/{filename}" if self.folder_dir else filename s3_path = f"s3://{self.bucket_name}/{key}" - logger.info("Uploading %s to %s", filename, s3_path) + self.log.info("Uploading %s to %s", filename, s3_path) hook.load_file( filename=f"{target_dir}/{filename}", @@ -788,7 +788,7 @@ def __init__( def upload_to_cloud_storage(self, project_dir: str) -> None: """Uploads the generated documentation to Azure Blob Storage.""" - logger.info( + self.log.info( 'Attempting to upload generated docs to Azure Blob Storage using WasbHook(conn_id="%s")', self.connection_id, ) @@ -802,7 +802,7 @@ def upload_to_cloud_storage(self, project_dir: str) -> None: ) for filename in self.required_files: - logger.info( + self.log.info( "Uploading %s to %s", filename, f"wasb://{self.bucket_name}/{filename}", @@ -832,7 +832,7 @@ class DbtDocsGCSLocalOperator(DbtDocsCloudLocalOperator): def upload_to_cloud_storage(self, project_dir: str) -> None: """Uploads the generated documentation to Google Cloud Storage""" - logger.info( + self.log.info( 'Attempting to upload generated docs to Storage using GCSHook(conn_id="%s")', self.connection_id, ) @@ -844,7 +844,7 @@ def upload_to_cloud_storage(self, project_dir: str) -> None: for filename in self.required_files: blob_name = f"{self.folder_dir}/{filename}" if self.folder_dir else filename - logger.info("Uploading %s to %s", filename, 
f"gs://{self.bucket_name}/{blob_name}") + self.log.info("Uploading %s to %s", filename, f"gs://{self.bucket_name}/{blob_name}") hook.upload( filename=f"{target_dir}/{filename}", bucket_name=self.bucket_name, diff --git a/cosmos/operators/virtualenv.py b/cosmos/operators/virtualenv.py index 4b1a97e6c..d12e2afad 100644 --- a/cosmos/operators/virtualenv.py +++ b/cosmos/operators/virtualenv.py @@ -106,7 +106,7 @@ def execute(self, context: Context) -> None: output = super().execute(context) if self._venv_tmp_dir: self._venv_tmp_dir.cleanup() - logger.info(output) + self.log.info(output) class DbtBuildVirtualenvOperator(DbtVirtualenvBaseOperator, DbtBuildLocalOperator): # type: ignore[misc] diff --git a/cosmos/settings.py b/cosmos/settings.py index 71387de6e..fc7eca72a 100644 --- a/cosmos/settings.py +++ b/cosmos/settings.py @@ -16,7 +16,7 @@ enable_cache_partial_parse = conf.getboolean("cosmos", "enable_cache_partial_parse", fallback=True) enable_cache_package_lockfile = conf.getboolean("cosmos", "enable_cache_package_lockfile", fallback=True) enable_cache_dbt_ls = conf.getboolean("cosmos", "enable_cache_dbt_ls", fallback=True) -propagate_logs = conf.getboolean("cosmos", "propagate_logs", fallback=True) +rich_logging = conf.getboolean("cosmos", "rich_logging", fallback=False) dbt_docs_dir = conf.get("cosmos", "dbt_docs_dir", fallback=None) dbt_docs_conn_id = conf.get("cosmos", "dbt_docs_conn_id", fallback=None) dbt_docs_index_file_name = conf.get("cosmos", "dbt_docs_index_file_name", fallback="index.html") diff --git a/docs/configuration/logging.rst b/docs/configuration/logging.rst index 9c27a950c..851af9f4e 100644 --- a/docs/configuration/logging.rst +++ b/docs/configuration/logging.rst @@ -3,17 +3,20 @@ Logging ==================== -Cosmos uses a custom logger implementation so that all log messages are clearly tagged with ``(astronomer-cosmos)``. By default this logger has propagation enabled. +Cosmos allows for a custom logger implementation that adds ``(astronomer-cosmos)`` to each log message. -In some environments (for example when running Celery workers) this can cause duplicated log messages to appear in the logs. In this case log propagation can be disabled via airflow configuration using the boolean option ``propagate_logs`` under a ``cosmos`` section. +By default this is not enabled; you can enable it with: .. code-block:: cfg [cosmos] - propagate_logs = False + rich_logging = True or .. code-block:: python - AIRFLOW__COSMOS__PROPAGATE_LOGS = "False" + AIRFLOW__COSMOS__RICH_LOGGING = "True" + +Previous versions of Cosmos had a feature called ``propagate_logs`` to handle issues with Cosmos's previous logging implementation on some systems. +This config option is deprecated.
diff --git a/pyproject.toml b/pyproject.toml index a257fa860..8183d4b64 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -112,7 +112,7 @@ azure-container-instance = [ "apache-airflow-providers-microsoft-azure>=8.4.0", ] -[project.entry-points.cosmos] +[project.entry-points.apache_airflow_provider] provider_info = "cosmos:get_provider_info" [project.entry-points."airflow.plugins"] diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 54d9d01da..effd604fa 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -12,6 +12,7 @@ from airflow import __version__ as airflow_version from airflow.exceptions import AirflowException, AirflowSkipException from airflow.hooks.subprocess import SubprocessResult +from airflow.models.taskinstance import TaskInstance from airflow.utils.context import Context from packaging import version from pendulum import datetime @@ -618,13 +619,14 @@ def test_run_operator_emits_events_without_openlineage_events_completes(caplog): should_store_compiled_sql=False, ) delattr(dbt_base_operator, "openlineage_events_completes") - facets = dbt_base_operator.get_openlineage_facets_on_complete(dbt_base_operator) + with patch.object(dbt_base_operator.log, "info") as mock_log_info: + facets = dbt_base_operator.get_openlineage_facets_on_complete(TaskInstance(dbt_base_operator)) + assert facets.inputs == [] assert facets.outputs == [] assert facets.run_facets == {} assert facets.job_facets == {} - log = "Unable to emit OpenLineage events due to lack of dependencies or data." - assert log in caplog.text + mock_log_info.assert_called_with("Unable to emit OpenLineage events due to lack of dependencies or data.") def test_store_compiled_sql() -> None: @@ -761,10 +763,11 @@ def test_calculate_openlineage_events_completes_openlineage_errors(mock_processo should_store_compiled_sql=False, ) - dbt_base_operator.calculate_openlineage_events_completes(env={}, project_dir=DBT_PROJ_DIR) + with patch.object(dbt_base_operator.log, "debug") as mock_log_debug: + dbt_base_operator.calculate_openlineage_events_completes(env={}, project_dir=DBT_PROJ_DIR) + assert instance.parse.called - err_msg = "Unable to parse OpenLineage events" - assert err_msg in caplog.text + mock_log_debug.assert_called_with("Unable to parse OpenLineage events", stack_info=True) @pytest.mark.parametrize( @@ -930,7 +933,7 @@ def test_dbt_local_operator_on_kill_sigterm(mock_send_sigterm) -> None: mock_send_sigterm.assert_called_once() -def test_handle_exception_subprocess(caplog): +def test_handle_exception_subprocess(): """ Test the handle_exception_subprocess method of the DbtLocalBaseOperator class for non-zero dbt exit code. 
""" @@ -939,14 +942,16 @@ def test_handle_exception_subprocess(caplog): task_id="my-task", project_dir="my/dir", ) - result = FullOutputSubprocessResult(exit_code=1, output="test", full_output=["n" * n for n in range(1, 1000)]) + full_output = ["n" * n for n in range(1, 1000)] + result = FullOutputSubprocessResult(exit_code=1, output="test", full_output=full_output) - caplog.set_level(logging.ERROR) # Test when exit_code is non-zero - with pytest.raises(AirflowException) as err_context: - operator.handle_exception_subprocess(result) + with patch.object(operator.log, "error") as mock_log_error: + with pytest.raises(AirflowException) as err_context: + operator.handle_exception_subprocess(result) + assert len(str(err_context.value)) < 100 # Ensure the error message is not too long - assert len(caplog.text) > 1000 # Ensure the log message is not truncated + mock_log_error.assert_called_with("\n".join(full_output)) @pytest.fixture diff --git a/tests/test_log.py b/tests/test_log.py index c4f8b4bc1..9f75634ad 100644 --- a/tests/test_log.py +++ b/tests/test_log.py @@ -1,39 +1,41 @@ -import logging - import pytest +import cosmos.log from cosmos import get_provider_info -from cosmos.log import get_logger +from cosmos.log import CosmosRichLogger, get_logger -def test_get_logger(): - custom_string = "%(purple)s(astronomer-cosmos)%(reset)s" - standard_logger = logging.getLogger() - assert custom_string not in standard_logger.handlers[0].formatter._fmt +def test_get_logger(monkeypatch): + monkeypatch.setattr(cosmos.log, "rich_logging", False) + standard_logger = get_logger("test-get-logger-example1") + assert not isinstance(standard_logger, CosmosRichLogger) - custom_logger = get_logger("cosmos-log") - assert custom_logger.propagate is True - assert custom_logger.handlers[0].formatter.__class__.__name__ == "CustomTTYColoredFormatter" - assert custom_string in custom_logger.handlers[0].formatter._fmt + monkeypatch.setattr(cosmos.log, "rich_logging", True) + custom_logger = get_logger("test-get-logger-example2") + assert isinstance(custom_logger, CosmosRichLogger) with pytest.raises(TypeError): # Ensure that the get_logger signature is not changed in the future # and name is still a required parameter - custom_logger = get_logger() # noqa - - # Explicitly ensure that even if we pass None or empty string - # we will not get root logger in any case - custom_logger = get_logger("") - assert custom_logger.name != "" - - custom_logger = get_logger(None) # noqa - assert custom_logger.name != "" - - -def test_propagate_logs_conf(monkeypatch): - monkeypatch.setattr("cosmos.log.propagate_logs", False) - custom_logger = get_logger("cosmos-log") - assert custom_logger.propagate is False + bad_logger = get_logger() # noqa + + +def test_rich_logging(monkeypatch, capsys): + monkeypatch.setattr(cosmos.log, "rich_logging", False) + standard_logger = get_logger("test-rich-logging-example1") + standard_logger.info("Hello, world!") + out = capsys.readouterr().out + assert "Hello, world!" in out + assert "\x1b[35m(astronomer-cosmos)\x1b[0m " not in out + assert out.count("\n") == 1 + + monkeypatch.setattr(cosmos.log, "rich_logging", True) + custom_logger = get_logger("test-rich-logging-example2") + custom_logger.info("Hello, world!") + out = capsys.readouterr().out + assert "Hello, world!" in out + assert "\x1b[35m(astronomer-cosmos)\x1b[0m " in out + assert out.count("\n") == 1 def test_get_provider_info():