diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py index a7311ad6d063..3145fb511068 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py @@ -777,26 +777,10 @@ def process( GlobalWindows.windowed_value((destination, job_reference))) def finish_bundle(self): - dataset_locations = {} - for windowed_value in self.pending_jobs: - table_ref = bigquery_tools.parse_table_reference(windowed_value.value[0]) - project_dataset = (table_ref.projectId, table_ref.datasetId) - job_ref = windowed_value.value[1] - # In some cases (e.g. when the load job op returns a 409 ALREADY_EXISTS), - # the returned job reference may not include a location. In such cases, - # we need to override with the dataset's location. - job_location = job_ref.location - if not job_location and project_dataset not in dataset_locations: - job_location = self.bq_wrapper.get_table_location( - table_ref.projectId, table_ref.datasetId, table_ref.tableId) - dataset_locations[project_dataset] = job_location - self.bq_wrapper.wait_for_bq_job( - job_ref, - sleep_duration_sec=_SLEEP_DURATION_BETWEEN_POLLS, - location=job_location) + job_ref, sleep_duration_sec=_SLEEP_DURATION_BETWEEN_POLLS) return self.pending_jobs diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py index e4c0e34d9c1f..10453d9c8baf 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py @@ -427,7 +427,6 @@ def test_records_traverse_transform_with_mocks(self): job_reference = bigquery_api.JobReference() job_reference.projectId = 'project1' job_reference.jobId = 'job_name1' - job_reference.location = 'US' result_job = bigquery_api.Job() result_job.jobReference = job_reference @@ -483,7 +482,6 @@ def test_load_job_id_used(self): job_reference = 
bigquery_api.JobReference() job_reference.projectId = 'loadJobProject' job_reference.jobId = 'job_name1' - job_reference.location = 'US' result_job = bigquery_api.Job() result_job.jobReference = job_reference @@ -521,7 +519,6 @@ def test_load_job_id_use_for_copy_job(self): job_reference = bigquery_api.JobReference() job_reference.projectId = 'loadJobProject' job_reference.jobId = 'job_name1' - job_reference.location = 'US' result_job = mock.Mock() result_job.jobReference = job_reference @@ -577,12 +574,10 @@ def test_wait_for_load_job_completion(self, sleep_mock): job_1.jobReference = bigquery_api.JobReference() job_1.jobReference.projectId = 'project1' job_1.jobReference.jobId = 'jobId1' - job_1.jobReference.location = 'US' job_2 = bigquery_api.Job() job_2.jobReference = bigquery_api.JobReference() job_2.jobReference.projectId = 'project1' job_2.jobReference.jobId = 'jobId2' - job_2.jobReference.location = 'US' job_1_waiting = mock.Mock() job_1_waiting.status.state = 'RUNNING' @@ -622,12 +617,10 @@ def test_one_load_job_failed_after_waiting(self, sleep_mock): job_1.jobReference = bigquery_api.JobReference() job_1.jobReference.projectId = 'project1' job_1.jobReference.jobId = 'jobId1' - job_1.jobReference.location = 'US' job_2 = bigquery_api.Job() job_2.jobReference = bigquery_api.JobReference() job_2.jobReference.projectId = 'project1' job_2.jobReference.jobId = 'jobId2' - job_2.jobReference.location = 'US' job_1_waiting = mock.Mock() job_1_waiting.status.state = 'RUNNING' @@ -664,7 +657,6 @@ def test_multiple_partition_files(self): job_reference = bigquery_api.JobReference() job_reference.projectId = 'project1' job_reference.jobId = 'job_name1' - job_reference.location = 'US' result_job = mock.Mock() result_job.jobReference = job_reference @@ -750,7 +742,6 @@ def test_multiple_partition_files_write_dispositions( job_reference = bigquery_api.JobReference() job_reference.projectId = 'project1' job_reference.jobId = 'job_name1' - job_reference.location = 'US' 
result_job = mock.Mock() result_job.jobReference = job_reference @@ -793,7 +784,6 @@ def test_triggering_frequency(self, is_streaming, with_auto_sharding): job_reference = bigquery_api.JobReference() job_reference.projectId = 'project1' job_reference.jobId = 'job_name1' - job_reference.location = 'US' result_job = bigquery_api.Job() result_job.jobReference = job_reference diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py index c7128e7899ec..b31f6449fe90 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py @@ -32,6 +32,7 @@ import io import json import logging +import re import sys import time import uuid @@ -558,6 +559,19 @@ def _insert_load_job( )) return self._start_job(request, stream=source_stream).jobReference + @staticmethod + def _parse_location_from_exc(content, job_id): + """Parse job location from Exception content.""" + if isinstance(content, bytes): + content = content.decode('ascii', 'replace') + # search for "Already Exists: Job <project-id>:<location>.<job id>" + m = re.search(r"Already Exists: Job \S+\:(\S+)\." 
+ job_id, content) + if not m: + _LOGGER.warning( + "Not able to parse BigQuery load job location for %s", job_id) + return None + return m.group(1) + def _start_job( self, request, # type: bigquery.BigqueryJobsInsertRequest @@ -585,11 +599,17 @@ def _start_job( return response except HttpError as exn: if exn.status_code == 409: + jobId = request.job.jobReference.jobId _LOGGER.info( "BigQuery job %s already exists, will not retry inserting it: %s", request.job.jobReference, exn) - return request.job + job_location = self._parse_location_from_exc(exn.content, jobId) + response = request.job + if not response.jobReference.location and job_location: + # Request not constructed with location + response.jobReference.location = job_location + return response else: _LOGGER.info( "Failed to insert job %s: %s", request.job.jobReference, exn) @@ -631,8 +651,7 @@ def _start_query_job( return self._start_job(request) - def wait_for_bq_job( - self, job_reference, sleep_duration_sec=5, max_retries=0, location=None): + def wait_for_bq_job(self, job_reference, sleep_duration_sec=5, max_retries=0): """Poll job until it is DONE. Args: @@ -640,7 +659,6 @@ def wait_for_bq_job( sleep_duration_sec: Specifies the delay in seconds between retries. max_retries: The total number of times to retry. If equals to 0, the function waits forever. - location: Fall back on this location if job_reference doesn't have one. Raises: `RuntimeError`: If the job is FAILED or the number of retries has been @@ -650,9 +668,7 @@ def wait_for_bq_job( while True: retry += 1 job = self.get_job( - job_reference.projectId, - job_reference.jobId, - job_reference.location or location) + job_reference.projectId, job_reference.jobId, job_reference.location) _LOGGER.info('Job %s status: %s', job.id, job.status.state) if job.status.state == 'DONE' and job.status.errorResult: raise RuntimeError(