From 5501cd34b12965e0380952bdbb65fd52f49b49f5 Mon Sep 17 00:00:00 2001 From: colin-rogers-dbt <111200756+colin-rogers-dbt@users.noreply.github.com> Date: Tue, 20 Feb 2024 14:54:42 -0800 Subject: [PATCH] Remove custom query job async timeout logic (#1109) * use dynamic schema in test_grant_access_to.py * use dynamic schema in test_grant_access_to.py * revert setup * use dbt-common main to test against * emove custom query job async timeout logic * remove unneeded unit test --------- Co-authored-by: Mike Alfare <13974384+mikealfare@users.noreply.github.com> --- .../unreleased/Fixes-20240219-103324.yaml | 6 +++++ dbt/adapters/bigquery/connections.py | 22 +---------------- tests/functional/test_job_timeout.py | 3 ++- .../unit/test_bigquery_connection_manager.py | 24 ------------------- 4 files changed, 9 insertions(+), 46 deletions(-) create mode 100644 .changes/unreleased/Fixes-20240219-103324.yaml diff --git a/.changes/unreleased/Fixes-20240219-103324.yaml b/.changes/unreleased/Fixes-20240219-103324.yaml new file mode 100644 index 000000000..16906db85 --- /dev/null +++ b/.changes/unreleased/Fixes-20240219-103324.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: Remove custom query job async timeout logic as it has been fixed in bigquery-python +time: 2024-02-19T10:33:24.3385-08:00 +custom: + Author: colin-rogers-dbt + Issue: "1081" diff --git a/dbt/adapters/bigquery/connections.py b/dbt/adapters/bigquery/connections.py index 0f2f70f74..1e4708f0b 100644 --- a/dbt/adapters/bigquery/connections.py +++ b/dbt/adapters/bigquery/connections.py @@ -1,5 +1,3 @@ -import asyncio -import functools import json import re from contextlib import contextmanager @@ -740,25 +738,7 @@ def _query_and_results( self._bq_job_link(query_job.location, query_job.project, query_job.job_id) ) - # only use async logic if user specifies a timeout - if job_execution_timeout: - loop = asyncio.new_event_loop() - future_iterator = asyncio.wait_for( - loop.run_in_executor(None, functools.partial(query_job.result, max_results=limit)), - timeout=job_execution_timeout, - ) - - try: - iterator = loop.run_until_complete(future_iterator) - except asyncio.TimeoutError: - query_job.cancel() - raise DbtRuntimeError( - f"Query exceeded configured timeout of {job_execution_timeout}s" - ) - finally: - loop.close() - else: - iterator = query_job.result(max_results=limit) + iterator = query_job.result(max_results=limit, timeout=job_execution_timeout) return query_job, iterator def _retry_and_handle(self, msg, conn, fn): diff --git a/tests/functional/test_job_timeout.py b/tests/functional/test_job_timeout.py index be559e816..57172e133 100644 --- a/tests/functional/test_job_timeout.py +++ b/tests/functional/test_job_timeout.py @@ -59,4 +59,5 @@ def profiles_config_update(self, dbt_profile_target): def test_job_timeout(self, project): result = run_dbt(["run"], expect_pass=False) # project setup will fail - assert f"Query exceeded configured timeout of {_SHORT_TIMEOUT}s" in result[0].message + expected_error = f"Operation did not complete within the designated timeout of {_SHORT_TIMEOUT} seconds." + assert expected_error in result[0].message diff --git a/tests/unit/test_bigquery_connection_manager.py b/tests/unit/test_bigquery_connection_manager.py index 6bb89ed36..564601b2f 100644 --- a/tests/unit/test_bigquery_connection_manager.py +++ b/tests/unit/test_bigquery_connection_manager.py @@ -1,18 +1,14 @@ -import time import json -import pytest import unittest from contextlib import contextmanager from requests.exceptions import ConnectionError from unittest.mock import patch, MagicMock, Mock, ANY import dbt.adapters -import dbt_common.dataclass_schema from dbt.adapters.bigquery import BigQueryCredentials from dbt.adapters.bigquery import BigQueryRelation from dbt.adapters.bigquery.connections import BigQueryConnectionManager -import dbt_common.exceptions from dbt.logger import GLOBAL_LOGGER as logger # noqa @@ -123,26 +119,6 @@ def test_query_and_results(self, mock_bq): query="sql", job_config=mock_bq.QueryJobConfig(), timeout=15 ) - @patch("dbt.adapters.bigquery.impl.google.cloud.bigquery") - def test_query_and_results_timeout(self, mock_bq): - self.mock_client.query = Mock( - return_value=Mock(result=lambda *args, **kwargs: time.sleep(4)) - ) - with pytest.raises(dbt_common.exceptions.DbtRuntimeError) as exc: - self.connections._query_and_results( - self.mock_client, - "sql", - {"job_param_1": "blah"}, - job_creation_timeout=15, - job_execution_timeout=1, - ) - - mock_bq.QueryJobConfig.assert_called_once() - self.mock_client.query.assert_called_once_with( - query="sql", job_config=mock_bq.QueryJobConfig(), timeout=15 - ) - assert "Query exceeded configured timeout of 1s" in str(exc.value) - def test_copy_bq_table_appends(self): self._copy_table(write_disposition=dbt.adapters.bigquery.impl.WRITE_APPEND) args, kwargs = self.mock_client.copy_table.call_args