From 80c7450f77f58c9e02087cc578ad8abe07648736 Mon Sep 17 00:00:00 2001 From: Jeffrey Kinard Date: Mon, 14 Oct 2024 18:57:01 -0400 Subject: [PATCH] Propogate field_descriptions to RowTypeConstraint Signed-off-by: Jeffrey Kinard --- sdks/python/apache_beam/typehints/row_type.py | 20 +++++++--- sdks/python/apache_beam/typehints/schemas.py | 1 + .../apache_beam/typehints/schemas_test.py | 38 +++++++++++++++++++ 3 files changed, 54 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/typehints/row_type.py b/sdks/python/apache_beam/typehints/row_type.py index fd7885ad59c4..880a897bbbe8 100644 --- a/sdks/python/apache_beam/typehints/row_type.py +++ b/sdks/python/apache_beam/typehints/row_type.py @@ -49,7 +49,8 @@ def __init__( fields: Sequence[Tuple[str, type]], user_type, schema_options: Optional[Sequence[Tuple[str, Any]]] = None, - field_options: Optional[Dict[str, Sequence[Tuple[str, Any]]]] = None): + field_options: Optional[Dict[str, Sequence[Tuple[str, Any]]]] = None, + field_descriptions: Optional[Dict[str, str]] = None): """For internal use only, no backwards comatibility guaratees. See https://beam.apache.org/documentation/programming-guide/#schemas-for-pl-types for guidance on creating PCollections with inferred schemas. @@ -96,6 +97,7 @@ def __init__( self._schema_options = schema_options or [] self._field_options = field_options or {} + self._field_descriptions = field_descriptions or {} @staticmethod def from_user_type( @@ -107,12 +109,15 @@ def from_user_type( fields = [(name, user_type.__annotations__[name]) for name in user_type._fields] + field_descriptions = getattr(user_type, '_field_descriptions', None) + if _user_type_is_generated(user_type): return RowTypeConstraint.from_fields( fields, schema_id=getattr(user_type, _BEAM_SCHEMA_ID), schema_options=schema_options, - field_options=field_options) + field_options=field_options, + field_descriptions=field_descriptions) # TODO(https://github.com/apache/beam/issues/22125): Add user API for # specifying schema/field options @@ -120,7 +125,8 @@ def from_user_type( fields=fields, user_type=user_type, schema_options=schema_options, - field_options=field_options) + field_options=field_options, + field_descriptions=field_descriptions) return None @@ -131,13 +137,15 @@ def from_fields( schema_options: Optional[Sequence[Tuple[str, Any]]] = None, field_options: Optional[Dict[str, Sequence[Tuple[str, Any]]]] = None, schema_registry: Optional[SchemaTypeRegistry] = None, + field_descriptions: Optional[Dict[str, str]] = None, ) -> RowTypeConstraint: return GeneratedClassRowTypeConstraint( fields, schema_id=schema_id, schema_options=schema_options, field_options=field_options, - schema_registry=schema_registry) + schema_registry=schema_registry, + field_descriptions=field_descriptions) def __call__(self, *args, **kwargs): # We make RowTypeConstraint callable (defers to constructing the user type) @@ -206,6 +214,7 @@ def __init__( schema_options: Optional[Sequence[Tuple[str, Any]]] = None, field_options: Optional[Dict[str, Sequence[Tuple[str, Any]]]] = None, schema_registry: Optional[SchemaTypeRegistry] = None, + field_descriptions: Optional[Dict[str, str]] = None, ): from apache_beam.typehints.schemas import named_fields_to_schema from apache_beam.typehints.schemas import named_tuple_from_schema @@ -224,7 +233,8 @@ def __init__( fields, user_type, schema_options=schema_options, - field_options=field_options) + field_options=field_options, + field_descriptions=field_descriptions) def __reduce__(self): return ( diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index ef82ca91044c..fea9b3534b0c 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -274,6 +274,7 @@ def typing_to_runner_api(self, type_: type) -> schema_pb2.FieldType: self.option_to_runner_api(option_tuple) for option_tuple in type_.field_options(field_name) ], + description=type_._field_descriptions.get(field_name, None), ) for (field_name, field_type) in type_._fields ], id=schema_id, diff --git a/sdks/python/apache_beam/typehints/schemas_test.py b/sdks/python/apache_beam/typehints/schemas_test.py index 5d38b16d9783..15144c6c2c17 100644 --- a/sdks/python/apache_beam/typehints/schemas_test.py +++ b/sdks/python/apache_beam/typehints/schemas_test.py @@ -489,6 +489,44 @@ def test_row_type_constraint_to_schema_with_field_options(self): ] self.assertEqual(list(field.options), expected) + def test_row_type_constraint_to_schema_with_field_descriptions(self): + row_type_with_options = row_type.RowTypeConstraint.from_fields( + [ + ('foo', np.int8), + ('bar', float), + ('baz', bytes), + ], + field_descriptions={ + 'foo': 'foo description', + 'bar': 'bar description', + 'baz': 'baz description', + }) + result_type = typing_to_runner_api(row_type_with_options) + + self.assertIsInstance(result_type, schema_pb2.FieldType) + self.assertEqual(result_type.WhichOneof("type_info"), "row_type") + + fields = result_type.row_type.schema.fields + + expected = [ + schema_pb2.Field( + name='foo', + description='foo description', + type=schema_pb2.FieldType(atomic_type=schema_pb2.BYTE), + ), + schema_pb2.Field( + name='bar', + description='bar description', + type=schema_pb2.FieldType(atomic_type=schema_pb2.DOUBLE), + ), + schema_pb2.Field( + name='baz', + description='baz description', + type=schema_pb2.FieldType(atomic_type=schema_pb2.BYTES), + ), + ] + self.assertEqual(list(fields), expected) + def assert_namedtuple_equivalent(self, actual, expected): # Two types are only considered equal if they are literally the same # object (i.e. `actual == expected` is the same as `actual is expected` in