Skip to content

Commit

Permalink
Revert "Override BQ load job location when necessary (#31986)"
Browse files Browse the repository at this point in the history
This reverts commit ea98212.
  • Loading branch information
Abacn committed Oct 29, 2024
1 parent f2e1f94 commit a0880a4
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 33 deletions.
18 changes: 1 addition & 17 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
Original file line number Diff line number Diff line change
Expand Up @@ -777,26 +777,10 @@ def process(
GlobalWindows.windowed_value((destination, job_reference)))

def finish_bundle(self):
dataset_locations = {}

for windowed_value in self.pending_jobs:
table_ref = bigquery_tools.parse_table_reference(windowed_value.value[0])
project_dataset = (table_ref.projectId, table_ref.datasetId)

job_ref = windowed_value.value[1]
# In some cases (e.g. when the load job op returns a 409 ALREADY_EXISTS),
# the returned job reference may not include a location. In such cases,
# we need to override with the dataset's location.
job_location = job_ref.location
if not job_location and project_dataset not in dataset_locations:
job_location = self.bq_wrapper.get_table_location(
table_ref.projectId, table_ref.datasetId, table_ref.tableId)
dataset_locations[project_dataset] = job_location

self.bq_wrapper.wait_for_bq_job(
job_ref,
sleep_duration_sec=_SLEEP_DURATION_BETWEEN_POLLS,
location=job_location)
job_ref, sleep_duration_sec=_SLEEP_DURATION_BETWEEN_POLLS)
return self.pending_jobs


Expand Down
10 changes: 0 additions & 10 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,6 @@ def test_records_traverse_transform_with_mocks(self):
job_reference = bigquery_api.JobReference()
job_reference.projectId = 'project1'
job_reference.jobId = 'job_name1'
job_reference.location = 'US'
result_job = bigquery_api.Job()
result_job.jobReference = job_reference

Expand Down Expand Up @@ -483,7 +482,6 @@ def test_load_job_id_used(self):
job_reference = bigquery_api.JobReference()
job_reference.projectId = 'loadJobProject'
job_reference.jobId = 'job_name1'
job_reference.location = 'US'

result_job = bigquery_api.Job()
result_job.jobReference = job_reference
Expand Down Expand Up @@ -521,7 +519,6 @@ def test_load_job_id_use_for_copy_job(self):
job_reference = bigquery_api.JobReference()
job_reference.projectId = 'loadJobProject'
job_reference.jobId = 'job_name1'
job_reference.location = 'US'
result_job = mock.Mock()
result_job.jobReference = job_reference

Expand Down Expand Up @@ -577,12 +574,10 @@ def test_wait_for_load_job_completion(self, sleep_mock):
job_1.jobReference = bigquery_api.JobReference()
job_1.jobReference.projectId = 'project1'
job_1.jobReference.jobId = 'jobId1'
job_1.jobReference.location = 'US'
job_2 = bigquery_api.Job()
job_2.jobReference = bigquery_api.JobReference()
job_2.jobReference.projectId = 'project1'
job_2.jobReference.jobId = 'jobId2'
job_2.jobReference.location = 'US'

job_1_waiting = mock.Mock()
job_1_waiting.status.state = 'RUNNING'
Expand Down Expand Up @@ -622,12 +617,10 @@ def test_one_load_job_failed_after_waiting(self, sleep_mock):
job_1.jobReference = bigquery_api.JobReference()
job_1.jobReference.projectId = 'project1'
job_1.jobReference.jobId = 'jobId1'
job_1.jobReference.location = 'US'
job_2 = bigquery_api.Job()
job_2.jobReference = bigquery_api.JobReference()
job_2.jobReference.projectId = 'project1'
job_2.jobReference.jobId = 'jobId2'
job_2.jobReference.location = 'US'

job_1_waiting = mock.Mock()
job_1_waiting.status.state = 'RUNNING'
Expand Down Expand Up @@ -664,7 +657,6 @@ def test_multiple_partition_files(self):
job_reference = bigquery_api.JobReference()
job_reference.projectId = 'project1'
job_reference.jobId = 'job_name1'
job_reference.location = 'US'
result_job = mock.Mock()
result_job.jobReference = job_reference

Expand Down Expand Up @@ -750,7 +742,6 @@ def test_multiple_partition_files_write_dispositions(
job_reference = bigquery_api.JobReference()
job_reference.projectId = 'project1'
job_reference.jobId = 'job_name1'
job_reference.location = 'US'
result_job = mock.Mock()
result_job.jobReference = job_reference

Expand Down Expand Up @@ -793,7 +784,6 @@ def test_triggering_frequency(self, is_streaming, with_auto_sharding):
job_reference = bigquery_api.JobReference()
job_reference.projectId = 'project1'
job_reference.jobId = 'job_name1'
job_reference.location = 'US'
result_job = bigquery_api.Job()
result_job.jobReference = job_reference

Expand Down
8 changes: 2 additions & 6 deletions sdks/python/apache_beam/io/gcp/bigquery_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -631,16 +631,14 @@ def _start_query_job(

return self._start_job(request)

def wait_for_bq_job(
self, job_reference, sleep_duration_sec=5, max_retries=0, location=None):
def wait_for_bq_job(self, job_reference, sleep_duration_sec=5, max_retries=0):
"""Poll job until it is DONE.
Args:
job_reference: bigquery.JobReference instance.
sleep_duration_sec: Specifies the delay in seconds between retries.
max_retries: The total number of times to retry. If equals to 0,
the function waits forever.
location: Fall back on this location if job_reference doesn't have one.
Raises:
`RuntimeError`: If the job is FAILED or the number of retries has been
Expand All @@ -650,9 +648,7 @@ def wait_for_bq_job(
while True:
retry += 1
job = self.get_job(
job_reference.projectId,
job_reference.jobId,
job_reference.location or location)
job_reference.projectId, job_reference.jobId, job_reference.location)
_LOGGER.info('Job %s status: %s', job.id, job.status.state)
if job.status.state == 'DONE' and job.status.errorResult:
raise RuntimeError(
Expand Down

0 comments on commit a0880a4

Please sign in to comment.