From 93140cb113b673fd9ee1a1282a6ec64fe817c5c2 Mon Sep 17 00:00:00 2001
From: Yi Hu
Date: Fri, 8 Mar 2024 15:07:20 -0500
Subject: [PATCH] Resolve Any environment types before optimization. (#30568) (#30576)

This allows us to correctly inspect environment capabilities during optimization, etc.

Co-authored-by: Robert Bradshaw
---
 .../portability/fn_api_runner/fn_runner.py | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
index a736288dc62d..b3dd124216be 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
@@ -56,6 +56,7 @@
 from apache_beam.options import pipeline_options
 from apache_beam.options.value_provider import RuntimeValueProvider
 from apache_beam.portability import common_urns
+from apache_beam.portability import python_urns
 from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.portability.api import beam_provision_api_pb2
 from apache_beam.portability.api import beam_runner_api_pb2
@@ -220,6 +221,7 @@ def run_via_runner_api(self, pipeline_proto, options):
       ]
     if direct_options.direct_embed_docker_python:
       pipeline_proto = self.embed_default_docker_image(pipeline_proto)
+    pipeline_proto = self.resolve_any_environments(pipeline_proto)
     stage_context, stages = self.create_stages(pipeline_proto)
     return self.run_stages(stage_context, stages)
 
@@ -235,6 +237,9 @@ def embed_default_docker_image(self, pipeline_proto):
         environments.DockerEnvironment.default_docker_image()).to_runner_api(
             None)  # type: ignore[arg-type]
 
+    # We'd rather deal with the complexity of any environments here rather
+    # than resolve them first so we can get optimal substitution in case
+    # docker is not high in the preferred environment type list.
     def is_this_python_docker_env(env):
       return any(
           e == docker_env for e in environments.expand_anyof_environments(env))
@@ -264,6 +269,16 @@ def is_this_python_docker_env(env):
         transform.environment_id = embedded_env_id
     return pipeline_proto
 
+  def resolve_any_environments(self, pipeline_proto):
+    for env_id, env in pipeline_proto.components.environments.items():
+      pipeline_proto.components.environments[env_id].CopyFrom(
+          environments.resolve_anyof_environment(
+              env,
+              python_urns.EMBEDDED_PYTHON,
+              common_urns.environments.EXTERNAL.urn,
+              common_urns.environments.DOCKER.urn))
+    return pipeline_proto
+
   @contextlib.contextmanager
   def maybe_profile(self):
     # type: () -> Iterator[None]