Skip to content

Commit

Permalink
Remove custom query job async timeout logic (#1109)
Browse files Browse the repository at this point in the history
* use dynamic schema in test_grant_access_to.py

* use dynamic schema in test_grant_access_to.py

* revert setup

* use dbt-common main to test against

* emove custom query job async timeout logic

* remove unneeded unit test

---------

Co-authored-by: Mike Alfare <[email protected]>
  • Loading branch information
colin-rogers-dbt and mikealfare authored Feb 20, 2024
1 parent 6b48ec8 commit 5501cd3
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 46 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20240219-103324.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Remove custom query job async timeout logic as it has been fixed in bigquery-python
time: 2024-02-19T10:33:24.3385-08:00
custom:
Author: colin-rogers-dbt
Issue: "1081"
22 changes: 1 addition & 21 deletions dbt/adapters/bigquery/connections.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import asyncio
import functools
import json
import re
from contextlib import contextmanager
Expand Down Expand Up @@ -740,25 +738,7 @@ def _query_and_results(
self._bq_job_link(query_job.location, query_job.project, query_job.job_id)
)

# only use async logic if user specifies a timeout
if job_execution_timeout:
loop = asyncio.new_event_loop()
future_iterator = asyncio.wait_for(
loop.run_in_executor(None, functools.partial(query_job.result, max_results=limit)),
timeout=job_execution_timeout,
)

try:
iterator = loop.run_until_complete(future_iterator)
except asyncio.TimeoutError:
query_job.cancel()
raise DbtRuntimeError(
f"Query exceeded configured timeout of {job_execution_timeout}s"
)
finally:
loop.close()
else:
iterator = query_job.result(max_results=limit)
iterator = query_job.result(max_results=limit, timeout=job_execution_timeout)
return query_job, iterator

def _retry_and_handle(self, msg, conn, fn):
Expand Down
3 changes: 2 additions & 1 deletion tests/functional/test_job_timeout.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,5 @@ def profiles_config_update(self, dbt_profile_target):

def test_job_timeout(self, project):
result = run_dbt(["run"], expect_pass=False) # project setup will fail
assert f"Query exceeded configured timeout of {_SHORT_TIMEOUT}s" in result[0].message
expected_error = f"Operation did not complete within the designated timeout of {_SHORT_TIMEOUT} seconds."
assert expected_error in result[0].message
24 changes: 0 additions & 24 deletions tests/unit/test_bigquery_connection_manager.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
import time
import json
import pytest
import unittest
from contextlib import contextmanager
from requests.exceptions import ConnectionError
from unittest.mock import patch, MagicMock, Mock, ANY

import dbt.adapters
import dbt_common.dataclass_schema

from dbt.adapters.bigquery import BigQueryCredentials
from dbt.adapters.bigquery import BigQueryRelation
from dbt.adapters.bigquery.connections import BigQueryConnectionManager
import dbt_common.exceptions
from dbt.logger import GLOBAL_LOGGER as logger # noqa


Expand Down Expand Up @@ -123,26 +119,6 @@ def test_query_and_results(self, mock_bq):
query="sql", job_config=mock_bq.QueryJobConfig(), timeout=15
)

@patch("dbt.adapters.bigquery.impl.google.cloud.bigquery")
def test_query_and_results_timeout(self, mock_bq):
self.mock_client.query = Mock(
return_value=Mock(result=lambda *args, **kwargs: time.sleep(4))
)
with pytest.raises(dbt_common.exceptions.DbtRuntimeError) as exc:
self.connections._query_and_results(
self.mock_client,
"sql",
{"job_param_1": "blah"},
job_creation_timeout=15,
job_execution_timeout=1,
)

mock_bq.QueryJobConfig.assert_called_once()
self.mock_client.query.assert_called_once_with(
query="sql", job_config=mock_bq.QueryJobConfig(), timeout=15
)
assert "Query exceeded configured timeout of 1s" in str(exc.value)

def test_copy_bq_table_appends(self):
self._copy_table(write_disposition=dbt.adapters.bigquery.impl.WRITE_APPEND)
args, kwargs = self.mock_client.copy_table.call_args
Expand Down

0 comments on commit 5501cd3

Please sign in to comment.