Skip to content

Commit

Permalink
Handle rc container in _update_container_image_for_dataflow (#32049)
Browse files Browse the repository at this point in the history
  • Loading branch information
Abacn authored Aug 2, 2024
1 parent 202fa56 commit 0b4b8ea
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,12 @@ def _update_container_image_for_dataflow(beam_container_image_url):
# By default Dataflow pipelines use containers hosted in Dataflow GCR
# instead of Docker Hub.
image_suffix = beam_container_image_url.rsplit('/', 1)[1]

# trim "RCX" as release candidate tag exists on Docker Hub but not GCR
check_rc = image_suffix.lower().split('rc')
if len(check_rc) == 2:
image_suffix = image_suffix[:-2 - len(check_rc[1])]

return names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + '/' + image_suffix

@staticmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,43 @@ def test_dataflow_container_image_override_prime(self):

self._verify_dataflow_container_image_override(pipeline_options)

def _verify_dataflow_container_image_override_rc(self, pipeline_options):
pipeline = Pipeline(options=pipeline_options)
pipeline | Create([1, 2, 3]) | ParDo(DoFn()) # pylint:disable=expression-not-assigned

dummy_env = DockerEnvironment(
container_image='apache/beam_dummy_name:2.00.0RC10')
proto_pipeline, _ = pipeline.to_runner_api(
return_context=True, default_environment=dummy_env)

# Accessing non-public method for testing.
apiclient.DataflowApplicationClient._apply_sdk_environment_overrides(
proto_pipeline, {}, pipeline_options)

from apache_beam.utils import proto_utils
found_override = False
trimed_rc = True
for env in proto_pipeline.components.environments.values():
docker_payload = proto_utils.parse_Bytes(
env.payload, beam_runner_api_pb2.DockerPayload)
if docker_payload.container_image.startswith(
names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY):
found_override = True
if docker_payload.container_image.split(':')[-1] != '2.00.0':
trimed_rc = False

self.assertTrue(found_override)
self.assertTrue(trimed_rc)

def test_dataflow_container_image_override_rc(self):
pipeline_options = PipelineOptions([
'--experiments=use_runner_v2',
'--temp_location',
'gs://any-location/temp'
])

self._verify_dataflow_container_image_override_rc(pipeline_options)

def _verify_non_apache_container_not_overridden(self, pipeline_options):
pipeline = Pipeline(options=pipeline_options)
pipeline | Create([1, 2, 3]) | ParDo(DoFn()) # pylint:disable=expression-not-assigned
Expand Down

0 comments on commit 0b4b8ea

Please sign in to comment.