diff --git a/CHANGELOG.md b/CHANGELOG.md index b9d087bde..530ef5a65 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ N/A - Capping `google-api-core` to version `1.31.3` due to `protobuf` dependency conflict ([#53](https://github.com/dbt-labs/dbt-bigquery/pull/53)) - Bump `google-cloud-core` and `google-api-core` upper bounds to `<3`, thereby removing `<1.31.3` limit on the latter. Remove explicit dependency on `six` ([#57](https://github.com/dbt-labs/dbt-bigquery/pull/57)) - Remove official support for python 3.6, which is reaching end of life on December 23, 2021 ([dbt-core#4134](https://github.com/dbt-labs/dbt-core/issues/4134), [#59](https://github.com/dbt-labs/dbt-bigquery/pull/59)) +- Add support for structured logging ([#55](https://github.com/dbt-labs/dbt-bigquery/pull/55)) ### Contributors - [@imartynetz](https://github.com/imartynetz) ([#48](https://github.com/dbt-labs/dbt-bigquery/pull/48)) diff --git a/dbt/adapters/bigquery/connections.py b/dbt/adapters/bigquery/connections.py index 4a69b35b2..984da6888 100644 --- a/dbt/adapters/bigquery/connections.py +++ b/dbt/adapters/bigquery/connections.py @@ -27,11 +27,14 @@ FailedToConnectException, RuntimeException, DatabaseException, DbtProfileError ) from dbt.adapters.base import BaseConnectionManager, Credentials -from dbt.logger import GLOBAL_LOGGER as logger +from dbt.events import AdapterLogger +from dbt.events.functions import fire_event +from dbt.events.types import SQLQuery from dbt.version import __version__ as dbt_version from dbt.dataclass_schema import StrEnum +logger = AdapterLogger("BigQuery") BQ_QUERY_JOB_SPLIT = '-----Query Job SQL Follows-----' @@ -344,7 +347,7 @@ def raw_execute(self, sql, fetch=False, *, use_legacy_sql=False): conn = self.get_thread_connection() client = conn.handle - logger.debug('On {}: {}', conn.name, sql) + fire_event(SQLQuery(conn_name=conn.name, sql=sql)) if self.profile.query_comment and self.profile.query_comment.job_label: query_comment = 
self.query_header.comment.query_comment @@ -534,7 +537,7 @@ def _retry_and_handle(self, msg, conn, fn): """retry a function call within the context of exception_handler.""" def reopen_conn_on_error(error): if isinstance(error, REOPENABLE_ERRORS): - logger.warning('Reopening connection after {!r}', error) + logger.warning('Reopening connection after {!r}'.format(error)) self.close(conn) self.open(conn) return @@ -577,8 +580,9 @@ def count_error(self, error): self.error_count += 1 if _is_retryable(error) and self.error_count <= self.retries: logger.debug( - 'Retry attempt {} of {} after error: {}', - self.error_count, self.retries, repr(error)) + 'Retry attempt {} of {} after error: {}'.format( + self.error_count, self.retries, repr(error) + )) return True else: return False diff --git a/dbt/adapters/bigquery/gcloud.py b/dbt/adapters/bigquery/gcloud.py index 77ed74fdc..28e7e1a74 100644 --- a/dbt/adapters/bigquery/gcloud.py +++ b/dbt/adapters/bigquery/gcloud.py @@ -1,4 +1,4 @@ -from dbt.logger import GLOBAL_LOGGER as logger +from dbt.events import AdapterLogger import dbt.exceptions from dbt.clients.system import run_cmd @@ -9,6 +9,8 @@ https://cloud.google.com/sdk/ """ +logger = AdapterLogger("BigQuery") + def gcloud_installed(): try: diff --git a/dbt/adapters/bigquery/impl.py b/dbt/adapters/bigquery/impl.py index f828c37f8..8e3384dd1 100644 --- a/dbt/adapters/bigquery/impl.py +++ b/dbt/adapters/bigquery/impl.py @@ -14,7 +14,7 @@ from dbt.adapters.bigquery import BigQueryColumn from dbt.adapters.bigquery import BigQueryConnectionManager from dbt.contracts.graph.manifest import Manifest -from dbt.logger import GLOBAL_LOGGER as logger, print_timestamped_line +from dbt.events import AdapterLogger from dbt.utils import filter_null_values import google.auth @@ -29,6 +29,8 @@ import agate import json +logger = AdapterLogger("BigQuery") + # Write dispositions for bigquery. 
WRITE_APPEND = google.cloud.bigquery.job.WriteDisposition.WRITE_APPEND WRITE_TRUNCATE = google.cloud.bigquery.job.WriteDisposition.WRITE_TRUNCATE @@ -481,9 +483,8 @@ def _bq_table_to_relation(self, bq_table): @classmethod def warning_on_hooks(hook_type): msg = "{} is not supported in bigquery and will be ignored" - print_timestamped_line( - msg.format(hook_type), ui.COLOR_FG_YELLOW - ) + warn_msg = dbt.ui.color(msg.format(hook_type), ui.COLOR_FG_YELLOW) + logger.info(warn_msg) @available def add_query(self, sql, auto_begin=True, bindings=None, diff --git a/tests/integration/base.py b/tests/integration/base.py index 6e31d2d71..243ac92d8 100644 --- a/tests/integration/base.py +++ b/tests/integration/base.py @@ -22,10 +22,15 @@ from dbt.clients.jinja import template_cache from dbt.config import RuntimeConfig from dbt.context import providers -from dbt.logger import GLOBAL_LOGGER as logger, log_manager +from dbt.logger import log_manager +from dbt.events.functions import ( + capture_stdout_logs, fire_event, setup_event_logger, stop_capture_stdout_logs +) +from dbt.events import AdapterLogger from dbt.contracts.graph.manifest import Manifest +logger = AdapterLogger("BigQuery") INITIAL_ROOT = os.getcwd() @@ -231,6 +236,7 @@ def setUp(self): os.chdir(self.initial_dir) # before we go anywhere, collect the initial path info self._logs_dir = os.path.join(self.initial_dir, 'logs', self.prefix) + setup_event_logger(self._logs_dir) _really_makedirs(self._logs_dir) self.test_original_source_path = _pytest_get_test_root() self.test_root_dir = self._generate_test_root_dir() @@ -403,16 +409,12 @@ def run_dbt(self, args=None, expect_pass=True, profiles_dir=True): def run_dbt_and_capture(self, *args, **kwargs): try: - initial_stdout = log_manager.stdout - initial_stderr = log_manager.stderr - stringbuf = io.StringIO() - log_manager.set_output_stream(stringbuf) - + stringbuf = capture_stdout_logs() res = self.run_dbt(*args, **kwargs) stdout = stringbuf.getvalue() finally: - 
log_manager.set_output_stream(initial_stdout, initial_stderr) + stop_capture_stdout_logs() return res, stdout diff --git a/tests/integration/query_comments_test/test_query_comments.py b/tests/integration/query_comments_test/test_query_comments.py index 11a816a21..bc813a6ae 100644 --- a/tests/integration/query_comments_test/test_query_comments.py +++ b/tests/integration/query_comments_test/test_query_comments.py @@ -2,6 +2,7 @@ import io import json import os +import re import dbt.exceptions from dbt.version import __version__ as dbt_version @@ -52,28 +53,29 @@ def tearDown(self): super().tearDown() def run_get_json(self, expect_pass=True): - self.run_dbt( + res, raw_logs = self.run_dbt_and_capture( ['--debug', '--log-format=json', 'run'], expect_pass=expect_pass ) - logs = [] - for line in self.stringbuf.getvalue().split('\n'): + parsed_logs = [] + for line in raw_logs.split('\n'): try: log = json.loads(line) except ValueError: continue - if log['extra'].get('run_state') != 'running': - continue - logs.append(log) - self.assertGreater(len(logs), 0) - return logs + parsed_logs.append(log) + + # empty lists evaluate as False + self.assertTrue(parsed_logs) + return parsed_logs def query_comment(self, model_name, log): + log_msg = re.sub(r"(?:[01]\d|2[0123]):(?:[012345]\d):(?:[012345]\d \| )", "", log['msg']) prefix = 'On {}: '.format(model_name) - if log['message'].startswith(prefix): - msg = log['message'][len(prefix):] + if log_msg.startswith(prefix): + msg = log_msg[len(prefix):] if msg in {'COMMIT', 'BEGIN', 'ROLLBACK'}: return None return msg @@ -88,7 +90,7 @@ def run_assert_comments(self): if msg is not None and self.matches_comment(msg): seen = True - self.assertTrue(seen, 'Never saw a matching log message! Logs:\n{}'.format('\n'.join(l['msg'] for l in logs))) @use_profile('bigquery') def test_bigquery_comments(self):