From 78bde63b4f4d6158584faa2362760e5260336a15 Mon Sep 17 00:00:00 2001 From: Jeff Kinard Date: Wed, 4 Dec 2024 12:08:24 -0500 Subject: [PATCH] [yaml] Fix YAML join with upstream ReadFromPubSub (#33273) --- sdks/python/apache_beam/yaml/yaml_join.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_join.py b/sdks/python/apache_beam/yaml/yaml_join.py index 5124ef56b49c..b22e452b27f9 100644 --- a/sdks/python/apache_beam/yaml/yaml_join.py +++ b/sdks/python/apache_beam/yaml/yaml_join.py @@ -62,9 +62,11 @@ def _validate_equalities(equalities, pcolls): error_prefix = f'Invalid value "{equalities}" for "equalities".' valid_cols = { - name: set(dict(pcoll.element_type._fields).keys()) - for name, - pcoll in pcolls.items() + name: set( + dict(fields).keys() if fields and all( + isinstance(field, tuple) for field in fields) else fields) + for (name, pcoll) in pcolls.items() + for fields in [getattr(pcoll.element_type, '_fields', [])] } if isinstance(equalities, str):