Skip to content

Commit

Permalink
Add case for exisiting bytes object
Browse files Browse the repository at this point in the history
Signed-off-by: Jeffrey Kinard <[email protected]>
  • Loading branch information
Polber committed Aug 27, 2024
1 parent a0c9245 commit 3d90cff
Showing 1 changed file with 13 additions and 1 deletion.
14 changes: 13 additions & 1 deletion sdks/python/apache_beam/yaml/yaml_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,19 @@ def _create_formatter(
field_names = [field.name for field in beam_schema.fields]
if len(field_names) != 1:
raise ValueError(f'Expecting exactly one field, found {field_names}')
return lambda row: getattr(row, field_names[0]).encode('utf-8')

def convert_to_bytes(row):
output = getattr(row, field_names[0])
if isinstance(output, bytes):
return output
try:
return str(output).encode('utf-8')
except Exception as e:
raise ValueError(
f"Cannot encode payload for WriteToPubSub. "
f"Must be valid string or bytes object. {e}")

return convert_to_bytes
elif format == 'JSON':
return json_utils.json_formater(beam_schema)
elif format == 'AVRO':
Expand Down

0 comments on commit 3d90cff

Please sign in to comment.