Merge pull request #32967 from Abacn/fixbqjob
Parsing location from HTTPError response instead of initiating a query
Abacn authored Nov 4, 2024
2 parents 76c5d56 + d169006 commit 6f902c6
Showing 3 changed files with 24 additions and 34 deletions.
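
In short, the fix recovers the load job's location from the body of the 409 ALREADY_EXISTS error instead of issuing an extra request for the dataset location. A minimal standalone sketch of the idea, mirroring the `_parse_location_from_exc` static method added in `bigquery_tools.py` below (the sample error body and function name here are illustrative):

import re

def parse_location_from_409(content, job_id):
  # Pull the region out of an error body of the form
  # "Already Exists: Job <project>:<location>.<job id>".
  if isinstance(content, bytes):
    content = content.decode('ascii', 'replace')
  match = re.search(r"Already Exists: Job \S+\:(\S+)\." + job_id, content)
  return match.group(1) if match else None

# Illustrative 409 error body (not taken from a real response):
print(parse_location_from_409(
    b"Already Exists: Job some-project:US.beam_load_job_abc",
    "beam_load_job_abc"))
# -> US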
18 changes: 1 addition & 17 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -777,26 +777,10 @@ def process(
GlobalWindows.windowed_value((destination, job_reference)))

def finish_bundle(self):
dataset_locations = {}

for windowed_value in self.pending_jobs:
table_ref = bigquery_tools.parse_table_reference(windowed_value.value[0])
project_dataset = (table_ref.projectId, table_ref.datasetId)

job_ref = windowed_value.value[1]
# In some cases (e.g. when the load job op returns a 409 ALREADY_EXISTS),
# the returned job reference may not include a location. In such cases,
# we need to override with the dataset's location.
job_location = job_ref.location
if not job_location and project_dataset not in dataset_locations:
job_location = self.bq_wrapper.get_table_location(
table_ref.projectId, table_ref.datasetId, table_ref.tableId)
dataset_locations[project_dataset] = job_location

self.bq_wrapper.wait_for_bq_job(
job_ref,
sleep_duration_sec=_SLEEP_DURATION_BETWEEN_POLLS,
location=job_location)
job_ref, sleep_duration_sec=_SLEEP_DURATION_BETWEEN_POLLS)
return self.pending_jobs


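Since the job reference returned by `_start_job` now carries a location even on the 409 path, the per-dataset `get_table_location` lookup above is gone and `finish_bundle` reduces to a plain polling loop, roughly as follows (reconstructed from the diff; surrounding class omitted):

def finish_bundle(self):
  for windowed_value in self.pending_jobs:
    job_ref = windowed_value.value[1]
    # The job reference already knows its location, so no extra
    # dataset-location query is needed before polling.
    self.bq_wrapper.wait_for_bq_job(
        job_ref, sleep_duration_sec=_SLEEP_DURATION_BETWEEN_POLLS)
  return self.pending_jobs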
10 changes: 0 additions & 10 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -427,7 +427,6 @@ def test_records_traverse_transform_with_mocks(self):
job_reference = bigquery_api.JobReference()
job_reference.projectId = 'project1'
job_reference.jobId = 'job_name1'
job_reference.location = 'US'
result_job = bigquery_api.Job()
result_job.jobReference = job_reference

@@ -483,7 +482,6 @@ def test_load_job_id_used(self):
job_reference = bigquery_api.JobReference()
job_reference.projectId = 'loadJobProject'
job_reference.jobId = 'job_name1'
job_reference.location = 'US'

result_job = bigquery_api.Job()
result_job.jobReference = job_reference
@@ -521,7 +519,6 @@ def test_load_job_id_use_for_copy_job(self):
job_reference = bigquery_api.JobReference()
job_reference.projectId = 'loadJobProject'
job_reference.jobId = 'job_name1'
job_reference.location = 'US'
result_job = mock.Mock()
result_job.jobReference = job_reference

@@ -577,12 +574,10 @@ def test_wait_for_load_job_completion(self, sleep_mock):
job_1.jobReference = bigquery_api.JobReference()
job_1.jobReference.projectId = 'project1'
job_1.jobReference.jobId = 'jobId1'
job_1.jobReference.location = 'US'
job_2 = bigquery_api.Job()
job_2.jobReference = bigquery_api.JobReference()
job_2.jobReference.projectId = 'project1'
job_2.jobReference.jobId = 'jobId2'
job_2.jobReference.location = 'US'

job_1_waiting = mock.Mock()
job_1_waiting.status.state = 'RUNNING'
@@ -622,12 +617,10 @@ def test_one_load_job_failed_after_waiting(self, sleep_mock):
job_1.jobReference = bigquery_api.JobReference()
job_1.jobReference.projectId = 'project1'
job_1.jobReference.jobId = 'jobId1'
job_1.jobReference.location = 'US'
job_2 = bigquery_api.Job()
job_2.jobReference = bigquery_api.JobReference()
job_2.jobReference.projectId = 'project1'
job_2.jobReference.jobId = 'jobId2'
job_2.jobReference.location = 'US'

job_1_waiting = mock.Mock()
job_1_waiting.status.state = 'RUNNING'
@@ -664,7 +657,6 @@ def test_multiple_partition_files(self):
job_reference = bigquery_api.JobReference()
job_reference.projectId = 'project1'
job_reference.jobId = 'job_name1'
job_reference.location = 'US'
result_job = mock.Mock()
result_job.jobReference = job_reference

@@ -750,7 +742,6 @@ def test_multiple_partition_files_write_dispositions(
job_reference = bigquery_api.JobReference()
job_reference.projectId = 'project1'
job_reference.jobId = 'job_name1'
job_reference.location = 'US'
result_job = mock.Mock()
result_job.jobReference = job_reference

@@ -793,7 +784,6 @@ def test_triggering_frequency(self, is_streaming, with_auto_sharding):
job_reference = bigquery_api.JobReference()
job_reference.projectId = 'project1'
job_reference.jobId = 'job_name1'
job_reference.location = 'US'
result_job = bigquery_api.Job()
result_job.jobReference = job_reference

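The test edits above only drop the hard-coded `location = 'US'` on the mocked job references; they do not, in the hunks shown here, add coverage for the new parser. A hypothetical unit test for it could look like this (not part of the diff; assumes `apache_beam[gcp]` is installed):

import unittest

from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper


class ParseLocationFromExcTest(unittest.TestCase):
  def test_parses_location_from_already_exists_message(self):
    content = b"Already Exists: Job project1:EU.job_name1"
    self.assertEqual(
        'EU', BigQueryWrapper._parse_location_from_exc(content, 'job_name1'))

  def test_returns_none_for_unrelated_message(self):
    self.assertIsNone(
        BigQueryWrapper._parse_location_from_exc(
            b'quota exceeded', 'job_name1'))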
30 changes: 23 additions & 7 deletions sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -32,6 +32,7 @@
import io
import json
import logging
import re
import sys
import time
import uuid
@@ -558,6 +559,19 @@ def _insert_load_job(
))
return self._start_job(request, stream=source_stream).jobReference

@staticmethod
def _parse_location_from_exc(content, job_id):
"""Parse job location from Exception content."""
if isinstance(content, bytes):
content = content.decode('ascii', 'replace')
# search for "Already Exists: Job <project-id>:<location>.<job id>"
m = re.search(r"Already Exists: Job \S+\:(\S+)\." + job_id, content)
if not m:
_LOGGER.warning(
"Not able to parse BigQuery load job location for %s", job_id)
return None
return m.group(1)

def _start_job(
self,
request, # type: bigquery.BigqueryJobsInsertRequest
@@ -585,11 +599,17 @@ def _start_job(
return response
except HttpError as exn:
if exn.status_code == 409:
jobId = request.job.jobReference.jobId
_LOGGER.info(
"BigQuery job %s already exists, will not retry inserting it: %s",
request.job.jobReference,
exn)
return request.job
job_location = self._parse_location_from_exc(exn.content, jobId)
response = request.job
if not response.jobReference.location and job_location:
# Request not constructed with location
response.jobReference.location = job_location
return response
else:
_LOGGER.info(
"Failed to insert job %s: %s", request.job.jobReference, exn)
@@ -631,16 +651,14 @@ def _start_query_job(

return self._start_job(request)

def wait_for_bq_job(
self, job_reference, sleep_duration_sec=5, max_retries=0, location=None):
def wait_for_bq_job(self, job_reference, sleep_duration_sec=5, max_retries=0):
"""Poll job until it is DONE.
Args:
job_reference: bigquery.JobReference instance.
sleep_duration_sec: Specifies the delay in seconds between retries.
max_retries: The total number of times to retry. If equals to 0,
the function waits forever.
location: Fall back on this location if job_reference doesn't have one.
Raises:
`RuntimeError`: If the job is FAILED or the number of retries has been
@@ -650,9 +668,7 @@ def wait_for_bq_job(
while True:
retry += 1
job = self.get_job(
job_reference.projectId,
job_reference.jobId,
job_reference.location or location)
job_reference.projectId, job_reference.jobId, job_reference.location)
_LOGGER.info('Job %s status: %s', job.id, job.status.state)
if job.status.state == 'DONE' and job.status.errorResult:
raise RuntimeError(
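Taken together, the 409 path now carries the parsed location end to end. A rough caller-side sketch under assumed names (`wrapper` is a `BigQueryWrapper`, `request` a `BigqueryJobsInsertRequest` for a job that was already inserted; not a verbatim call sequence from the codebase):

# _start_job catches the 409, parses the region out of the error body, and
# backfills it on the returned job reference (when the message matches the
# expected "Already Exists: Job <project>:<location>.<job id>" format).
job = wrapper._start_job(request)

# wait_for_bq_job then polls get_job with job.jobReference.location directly;
# the old `location=` fallback argument is gone.
wrapper.wait_for_bq_job(job.jobReference, sleep_duration_sec=5)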
