diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index 630ed7910c8d..721bee4fceb5 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -1981,27 +1981,34 @@ def env_key(env): base_env_key(e) for e in environments.expand_anyof_environments(env))) - cannonical_enviornments = collections.defaultdict(list) + canonical_environments = collections.defaultdict(list) for env_id, env in pipeline_proto.components.environments.items(): - cannonical_enviornments[env_key(env)].append(env_id) + canonical_environments[env_key(env)].append(env_id) - if len(cannonical_enviornments) == len( - pipeline_proto.components.environments): + if len(canonical_environments) == len(pipeline_proto.components.environments): # All environments are already sufficiently distinct. return pipeline_proto environment_remappings = { e: es[0] - for es in cannonical_enviornments.values() for e in es + for es in canonical_environments.values() for e in es } if not inplace: pipeline_proto = copy.copy(pipeline_proto) for t in pipeline_proto.components.transforms.values(): + if t.environment_id not in pipeline_proto.components.environments: + # TODO(https://github.com/apache/beam/issues/30876): Remove this + # workaround. + continue if t.environment_id: t.environment_id = environment_remappings[t.environment_id] for w in pipeline_proto.components.windowing_strategies.values(): + if w.environment_id not in pipeline_proto.components.environments: + # TODO(https://github.com/apache/beam/issues/30876): Remove this + # workaround. + continue if w.environment_id: w.environment_id = environment_remappings[w.environment_id] for e in set(pipeline_proto.components.environments.keys()) - set(