Skip to content

Commit

Permalink
Add location/job_id/project_id to adapter response to enable easy job linking (#250)
Browse files Browse the repository at this point in the history
  • Loading branch information
Kayrnt authored Aug 8, 2022
1 parent 64e9da7 commit 764f5fe
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 16 deletions.
7 changes: 7 additions & 0 deletions .changes/unreleased/Under the Hood-20220806-142912.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Under the Hood
body: Add location/job_id/project_id to adapter response to enable easy job linking
time: 2022-08-06T14:29:12.271054+02:00
custom:
Author: Kayrnt
Issue: "92"
PR: "250"
56 changes: 40 additions & 16 deletions dbt/adapters/bigquery/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ class BigQueryConnectionMethod(StrEnum):
@dataclass
class BigQueryAdapterResponse(AdapterResponse):
    """Adapter response enriched with BigQuery job metadata.

    Extends the base ``AdapterResponse`` with enough information to
    reconstruct a console link to the executed job (all fields are
    copied off the finished ``query_job`` after execution).
    """

    # Total bytes scanned by the job, when the statement reports it.
    bytes_processed: Optional[int] = None
    # Region/multi-region the job ran in (from query_job.location).
    location: Optional[str] = None
    # GCP project that owns the job (from query_job.project).
    project_id: Optional[str] = None
    # Server-assigned BigQuery job identifier (from query_job.job_id).
    job_id: Optional[str] = None


@dataclass
Expand Down Expand Up @@ -188,6 +191,12 @@ class BigQueryConnectionManager(BaseConnectionManager):
@classmethod
def handle_error(cls, error, message):
    """Translate a BigQuery client error into a DatabaseException.

    Joins the individual error messages from the client error, logs a
    console link to the failing job when one is attached to the error,
    and always raises.
    """
    joined_msg = "\n".join(item["message"] for item in error.errors)
    # Only query-shaped errors carry a query_job; script/DDL errors may not.
    if hasattr(error, "query_job"):
        job = error.query_job
        logger.error(cls._bq_job_link(job.location, job.project, job.job_id))
    raise DatabaseException(joined_msg)

def clear_transaction(self):
Expand Down Expand Up @@ -446,55 +455,70 @@ def execute(
code = None
num_rows = None
bytes_processed = None
location = None
job_id = None
project_id = None
num_rows_formatted = None
processed_bytes = None

if query_job.statement_type == "CREATE_VIEW":
code = "CREATE VIEW"

elif query_job.statement_type == "CREATE_TABLE_AS_SELECT":
code = "CREATE TABLE"
conn = self.get_thread_connection()
client = conn.handle
query_table = client.get_table(query_job.destination)
code = "CREATE TABLE"
num_rows = query_table.num_rows
num_rows_formated = self.format_rows_number(num_rows)
bytes_processed = query_job.total_bytes_processed
processed_bytes = self.format_bytes(bytes_processed)
message = f"{code} ({num_rows_formated} rows, {processed_bytes} processed)"

elif query_job.statement_type == "SCRIPT":
code = "SCRIPT"
bytes_processed = query_job.total_bytes_processed
message = f"{code} ({self.format_bytes(bytes_processed)} processed)"

elif query_job.statement_type in ["INSERT", "DELETE", "MERGE", "UPDATE"]:
code = query_job.statement_type
num_rows = query_job.num_dml_affected_rows
num_rows_formated = self.format_rows_number(num_rows)
bytes_processed = query_job.total_bytes_processed
processed_bytes = self.format_bytes(bytes_processed)
message = f"{code} ({num_rows_formated} rows, {processed_bytes} processed)"

elif query_job.statement_type == "SELECT":
code = "SELECT"
conn = self.get_thread_connection()
client = conn.handle
# use anonymous table for num_rows
query_table = client.get_table(query_job.destination)
code = "SELECT"
num_rows = query_table.num_rows
num_rows_formated = self.format_rows_number(num_rows)
bytes_processed = query_job.total_bytes_processed
processed_bytes = self.format_bytes(bytes_processed)
message = f"{code} ({num_rows_formated} rows, {processed_bytes} processed)"

# set common attributes
bytes_processed = query_job.total_bytes_processed
processed_bytes = self.format_bytes(bytes_processed)
location = query_job.location
job_id = query_job.job_id
project_id = query_job.project
if num_rows is not None:
num_rows_formatted = self.format_rows_number(num_rows)
message = f"{code} ({num_rows_formatted} rows, {processed_bytes} processed)"
elif bytes_processed is not None:
message = f"{code} ({processed_bytes} processed)"
else:
message = f"{code}"

if location is not None and job_id is not None and project_id is not None:
logger.debug(self._bq_job_link(job_id, project_id, location))

response = BigQueryAdapterResponse( # type: ignore[call-arg]
_message=message,
rows_affected=num_rows,
code=code,
bytes_processed=bytes_processed,
location=location,
project_id=project_id,
job_id=job_id,
)

return response, table

@staticmethod
def _bq_job_link(location, project_id, job_id) -> str:
return f"https://console.cloud.google.com/bigquery?project={project_id}&j=bq:{location}:{job_id}&page=queryresults"

def get_partitions_metadata(self, table):
def standard_to_legacy(table):
return table.project + ":" + table.dataset + "." + table.identifier
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ def test__bigquery_adapter_functions(self):
results = self.run_dbt()
self.assertEqual(len(results), 3)

for result in results:
# all queries in adapter models are jobs that are expected to have a location/project_id/job_id
assert result.adapter_response["location"] is not None
assert result.adapter_response["project_id"] is not None
assert result.adapter_response["job_id"] is not None

test_results = self.run_dbt(['test'])

self.assertTrue(len(test_results) > 0)
Expand Down

0 comments on commit 764f5fe

Please sign in to comment.