Skip to content

Commit

Permalink
Resolve Any environment types before optimization. (apache#30568) (ap…
Browse files Browse the repository at this point in the history
…ache#30576)

This allows us to correctly inspect environment capabilities during optimization, etc.

Co-authored-by: Robert Bradshaw <[email protected]>
  • Loading branch information
Abacn and robertwb authored Mar 8, 2024
1 parent b67fc15 commit 93140cb
Showing 1 changed file with 15 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
from apache_beam.options import pipeline_options
from apache_beam.options.value_provider import RuntimeValueProvider
from apache_beam.portability import common_urns
from apache_beam.portability import python_urns
from apache_beam.portability.api import beam_fn_api_pb2
from apache_beam.portability.api import beam_provision_api_pb2
from apache_beam.portability.api import beam_runner_api_pb2
Expand Down Expand Up @@ -220,6 +221,7 @@ def run_via_runner_api(self, pipeline_proto, options):
]
if direct_options.direct_embed_docker_python:
pipeline_proto = self.embed_default_docker_image(pipeline_proto)
pipeline_proto = self.resolve_any_environments(pipeline_proto)
stage_context, stages = self.create_stages(pipeline_proto)
return self.run_stages(stage_context, stages)

Expand All @@ -235,6 +237,9 @@ def embed_default_docker_image(self, pipeline_proto):
environments.DockerEnvironment.default_docker_image()).to_runner_api(
None) # type: ignore[arg-type]

# We'd rather deal with the complexity of any environments here rather
# than resolve them first so we can get optimal substitution in case
# docker is not high in the preferred environment type list.
def is_this_python_docker_env(env):
return any(
e == docker_env for e in environments.expand_anyof_environments(env))
Expand Down Expand Up @@ -264,6 +269,16 @@ def is_this_python_docker_env(env):
transform.environment_id = embedded_env_id
return pipeline_proto

def resolve_any_environments(self, pipeline_proto):
for env_id, env in pipeline_proto.components.environments.items():
pipeline_proto.components.environments[env_id].CopyFrom(
environments.resolve_anyof_environment(
env,
python_urns.EMBEDDED_PYTHON,
common_urns.environments.EXTERNAL.urn,
common_urns.environments.DOCKER.urn))
return pipeline_proto

@contextlib.contextmanager
def maybe_profile(self):
# type: () -> Iterator[None]
Expand Down

0 comments on commit 93140cb

Please sign in to comment.