Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[YAML] - Normalize YAML PubSub format #31068

Merged
merged 4 commits into from
Apr 23, 2024
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 25 additions & 14 deletions sdks/python/apache_beam/yaml/yaml_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"""

import io
import logging
import os
from typing import Any
from typing import Callable
Expand Down Expand Up @@ -164,16 +165,21 @@ def raise_exception(failed_row_with_error):
def _create_parser(
format,
schema: Any) -> Tuple[schema_pb2.Schema, Callable[[bytes], beam.Row]]:
if format == 'raw':

if format.islower():
format = format.upper()
logging.warning('Lowercase formats will be deprecated in version 2.60')

if format == 'RAW':
if schema:
raise ValueError('raw format does not take a schema')
raise ValueError('RAW format does not take a schema')
return (
schema_pb2.Schema(fields=[schemas.schema_field('payload', bytes)]),
lambda payload: beam.Row(payload=payload))
elif format == 'json':
elif format == 'JSON':
beam_schema = json_utils.json_schema_to_beam_schema(schema)
return beam_schema, json_utils.json_parser(beam_schema, schema)
elif format == 'avro':
elif format == 'AVRO':
beam_schema = avroio.avro_schema_to_beam_schema(schema)
covert_to_row = avroio.avro_dict_to_beam_row(schema, beam_schema)
# pylint: disable=line-too-long
Expand All @@ -188,16 +194,21 @@ def _create_parser(
def _create_formatter(
format, schema: Any,
beam_schema: schema_pb2.Schema) -> Callable[[beam.Row], bytes]:
if format == 'raw':

if format.islower():
format = format.upper()
logging.warning('Lowercase formats will be deprecated in version 2.60')

if format == 'RAW':
if schema:
raise ValueError('raw format does not take a schema')
raise ValueError('RAW format does not take a schema')
field_names = [field.name for field in beam_schema.fields]
if len(field_names) != 1:
raise ValueError(f'Expecting exactly one field, found {field_names}')
return lambda row: getattr(row, field_names[0])
elif format == 'json':
elif format == 'JSON':
return json_utils.json_formater(beam_schema)
elif format == 'avro':
elif format == 'AVRO':
avro_schema = schema or avroio.beam_schema_to_avro_schema(beam_schema)
from_row = avroio.beam_row_to_avro_dict(avro_schema, beam_schema)

Expand Down Expand Up @@ -238,10 +249,10 @@ def read_from_pubsub(
format: The expected format of the message payload. Currently suported
formats are

- raw: Produces records with a single `payload` field whose contents
- RAW: Produces records with a single `payload` field whose contents
are the raw bytes of the pubsub message.
- avro: Parses records with a given avro schema.
- json: Parses records with a given json schema.
- AVRO: Parses records with a given Avro schema.
- JSON: Parses records with a given JSON schema.

schema: Schema specification for the given format.
attributes: List of attribute keys whose values will be flattened into the
Expand Down Expand Up @@ -335,12 +346,12 @@ def write_to_pubsub(
format: How to format the message payload. Currently suported
formats are

- raw: Expects a message with a single field (excluding
- RAW: Expects a message with a single field (excluding
attribute-related fields) whose contents are used as the raw bytes
of the pubsub message.
- avro: Encodes records with a given avro schema, which may be inferred
- AVRO: Encodes records with a given Avro schema, which may be inferred
from the input PCollection schema.
- json: Formats records with a given json schema, which may be inferred
- JSON: Formats records with a given JSON schema, which may be inferred
from the input PCollection schema.

schema: Schema specification for the given format.
Expand Down
Loading