Skip to content

Commit

Permalink
Merge pull request #32776 Propogate field_descriptions to RowTypeCons…
Browse files Browse the repository at this point in the history
…traint
  • Loading branch information
robertwb authored Oct 29, 2024
2 parents a0cbe74 + 80c7450 commit 8b1ca21
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 5 deletions.
20 changes: 15 additions & 5 deletions sdks/python/apache_beam/typehints/row_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ def __init__(
fields: Sequence[Tuple[str, type]],
user_type,
schema_options: Optional[Sequence[Tuple[str, Any]]] = None,
field_options: Optional[Dict[str, Sequence[Tuple[str, Any]]]] = None):
field_options: Optional[Dict[str, Sequence[Tuple[str, Any]]]] = None,
field_descriptions: Optional[Dict[str, str]] = None):
"""For internal use only, no backwards comatibility guaratees. See
https://beam.apache.org/documentation/programming-guide/#schemas-for-pl-types
for guidance on creating PCollections with inferred schemas.
Expand Down Expand Up @@ -96,6 +97,7 @@ def __init__(

self._schema_options = schema_options or []
self._field_options = field_options or {}
self._field_descriptions = field_descriptions or {}

@staticmethod
def from_user_type(
Expand All @@ -107,20 +109,24 @@ def from_user_type(
fields = [(name, user_type.__annotations__[name])
for name in user_type._fields]

field_descriptions = getattr(user_type, '_field_descriptions', None)

if _user_type_is_generated(user_type):
return RowTypeConstraint.from_fields(
fields,
schema_id=getattr(user_type, _BEAM_SCHEMA_ID),
schema_options=schema_options,
field_options=field_options)
field_options=field_options,
field_descriptions=field_descriptions)

# TODO(https://github.com/apache/beam/issues/22125): Add user API for
# specifying schema/field options
return RowTypeConstraint(
fields=fields,
user_type=user_type,
schema_options=schema_options,
field_options=field_options)
field_options=field_options,
field_descriptions=field_descriptions)

return None

Expand All @@ -131,13 +137,15 @@ def from_fields(
schema_options: Optional[Sequence[Tuple[str, Any]]] = None,
field_options: Optional[Dict[str, Sequence[Tuple[str, Any]]]] = None,
schema_registry: Optional[SchemaTypeRegistry] = None,
field_descriptions: Optional[Dict[str, str]] = None,
) -> RowTypeConstraint:
return GeneratedClassRowTypeConstraint(
fields,
schema_id=schema_id,
schema_options=schema_options,
field_options=field_options,
schema_registry=schema_registry)
schema_registry=schema_registry,
field_descriptions=field_descriptions)

def __call__(self, *args, **kwargs):
# We make RowTypeConstraint callable (defers to constructing the user type)
Expand Down Expand Up @@ -206,6 +214,7 @@ def __init__(
schema_options: Optional[Sequence[Tuple[str, Any]]] = None,
field_options: Optional[Dict[str, Sequence[Tuple[str, Any]]]] = None,
schema_registry: Optional[SchemaTypeRegistry] = None,
field_descriptions: Optional[Dict[str, str]] = None,
):
from apache_beam.typehints.schemas import named_fields_to_schema
from apache_beam.typehints.schemas import named_tuple_from_schema
Expand All @@ -224,7 +233,8 @@ def __init__(
fields,
user_type,
schema_options=schema_options,
field_options=field_options)
field_options=field_options,
field_descriptions=field_descriptions)

def __reduce__(self):
return (
Expand Down
1 change: 1 addition & 0 deletions sdks/python/apache_beam/typehints/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ def typing_to_runner_api(self, type_: type) -> schema_pb2.FieldType:
self.option_to_runner_api(option_tuple)
for option_tuple in type_.field_options(field_name)
],
description=type_._field_descriptions.get(field_name, None),
) for (field_name, field_type) in type_._fields
],
id=schema_id,
Expand Down
38 changes: 38 additions & 0 deletions sdks/python/apache_beam/typehints/schemas_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,44 @@ def test_row_type_constraint_to_schema_with_field_options(self):
]
self.assertEqual(list(field.options), expected)

def test_row_type_constraint_to_schema_with_field_descriptions(self):
row_type_with_options = row_type.RowTypeConstraint.from_fields(
[
('foo', np.int8),
('bar', float),
('baz', bytes),
],
field_descriptions={
'foo': 'foo description',
'bar': 'bar description',
'baz': 'baz description',
})
result_type = typing_to_runner_api(row_type_with_options)

self.assertIsInstance(result_type, schema_pb2.FieldType)
self.assertEqual(result_type.WhichOneof("type_info"), "row_type")

fields = result_type.row_type.schema.fields

expected = [
schema_pb2.Field(
name='foo',
description='foo description',
type=schema_pb2.FieldType(atomic_type=schema_pb2.BYTE),
),
schema_pb2.Field(
name='bar',
description='bar description',
type=schema_pb2.FieldType(atomic_type=schema_pb2.DOUBLE),
),
schema_pb2.Field(
name='baz',
description='baz description',
type=schema_pb2.FieldType(atomic_type=schema_pb2.BYTES),
),
]
self.assertEqual(list(fields), expected)

def assert_namedtuple_equivalent(self, actual, expected):
# Two types are only considered equal if they are literally the same
# object (i.e. `actual == expected` is the same as `actual is expected` in
Expand Down

0 comments on commit 8b1ca21

Please sign in to comment.