Skip to content

Commit

Permalink
Parse load job location in HTTPError content
Browse files Browse the repository at this point in the history
  • Loading branch information
Abacn committed Oct 29, 2024
1 parent a0880a4 commit ec428e4
Showing 1 changed file with 21 additions and 1 deletion.
22 changes: 21 additions & 1 deletion sdks/python/apache_beam/io/gcp/bigquery_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io
import json
import logging
import re
import sys
import time
import uuid
Expand Down Expand Up @@ -558,6 +559,19 @@ def _insert_load_job(
))
return self._start_job(request, stream=source_stream).jobReference

@staticmethod
def _parse_location_from_exc(content, job_id):
"""Parse job location from Exception content."""
if isinstance(content, bytes):
content = content.decode('ascii', 'replace')
# search for "Already Exists: Job <project-id>:<location>.<job id>"
m = re.search(r"Already Exists: Job \S+\:(\S+)\." + job_id, content)
if not m:
_LOGGER.warning(
"Not able to parse BigQuery load job location for {}", job_id)
return None
return m.group(1)

def _start_job(
self,
request, # type: bigquery.BigqueryJobsInsertRequest
Expand Down Expand Up @@ -585,11 +599,17 @@ def _start_job(
return response
except HttpError as exn:
if exn.status_code == 409:
jobId = request.job.jobReference.jobId
_LOGGER.info(
"BigQuery job %s already exists, will not retry inserting it: %s",
request.job.jobReference,
exn)
return request.job
job_location = self._parse_location_from_exc(exn.content, jobId)
response = request.job
if not response.jobReference.location and job_location:
# Request not constructed with location
response.jobReference.location = job_location
return response
else:
_LOGGER.info(
"Failed to insert job %s: %s", request.job.jobReference, exn)
Expand Down

0 comments on commit ec428e4

Please sign in to comment.