Skip to content

Commit

Permalink
Fix merging with missing environments. (apache#30864)
Browse files: browse the repository at this point in the history
  • Loading branch information
Polber authored Apr 5, 2024
1 parent f1a47ef commit 1dfd39b
Showing 1 changed file with 12 additions and 5 deletions.
17 changes: 12 additions & 5 deletions sdks/python/apache_beam/runners/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -1981,27 +1981,34 @@ def env_key(env):
base_env_key(e)
for e in environments.expand_anyof_environments(env)))

cannonical_enviornments = collections.defaultdict(list)
canonical_environments = collections.defaultdict(list)
for env_id, env in pipeline_proto.components.environments.items():
cannonical_enviornments[env_key(env)].append(env_id)
canonical_environments[env_key(env)].append(env_id)

if len(cannonical_enviornments) == len(
pipeline_proto.components.environments):
if len(canonical_environments) == len(pipeline_proto.components.environments):
# All environments are already sufficiently distinct.
return pipeline_proto

environment_remappings = {
e: es[0]
for es in cannonical_enviornments.values() for e in es
for es in canonical_environments.values() for e in es
}

if not inplace:
pipeline_proto = copy.copy(pipeline_proto)

for t in pipeline_proto.components.transforms.values():
if t.environment_id not in pipeline_proto.components.environments:
# TODO(https://github.com/apache/beam/issues/30876): Remove this
# workaround.
continue
if t.environment_id:
t.environment_id = environment_remappings[t.environment_id]
for w in pipeline_proto.components.windowing_strategies.values():
if w.environment_id not in pipeline_proto.components.environments:
# TODO(https://github.com/apache/beam/issues/30876): Remove this
# workaround.
continue
if w.environment_id:
w.environment_id = environment_remappings[w.environment_id]
for e in set(pipeline_proto.components.environments.keys()) - set(
Expand Down

0 comments on commit 1dfd39b

Please sign in to comment.