From 365794c63fe7bdce00eb78c2c5d71f753267647c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ferran=20Fern=C3=A1ndez=20Garrido?= Date: Mon, 22 Apr 2024 14:29:37 +0200 Subject: [PATCH 1/4] [YAML] - Normalize YAML PubSub format --- sdks/python/apache_beam/yaml/yaml_io.py | 39 ++++++++++++++++--------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_io.py b/sdks/python/apache_beam/yaml/yaml_io.py index 1cd7231e35dd..f9d149fb5247 100644 --- a/sdks/python/apache_beam/yaml/yaml_io.py +++ b/sdks/python/apache_beam/yaml/yaml_io.py @@ -25,6 +25,7 @@ import io import os +import logging from typing import Any from typing import Callable from typing import Iterable @@ -164,16 +165,21 @@ def raise_exception(failed_row_with_error): def _create_parser( format, schema: Any) -> Tuple[schema_pb2.Schema, Callable[[bytes], beam.Row]]: - if format == 'raw': + + if format.islower(): + format = format.upper() + logging.warning('Lowercase formats will be deprecated in version 2.60') + + if format == 'RAW': if schema: - raise ValueError('raw format does not take a schema') + raise ValueError('RAW format does not take a schema') return ( schema_pb2.Schema(fields=[schemas.schema_field('payload', bytes)]), lambda payload: beam.Row(payload=payload)) - elif format == 'json': + elif format == 'JSON': beam_schema = json_utils.json_schema_to_beam_schema(schema) return beam_schema, json_utils.json_parser(beam_schema, schema) - elif format == 'avro': + elif format == 'AVRO': beam_schema = avroio.avro_schema_to_beam_schema(schema) covert_to_row = avroio.avro_dict_to_beam_row(schema, beam_schema) # pylint: disable=line-too-long @@ -188,16 +194,21 @@ def _create_parser( def _create_formatter( format, schema: Any, beam_schema: schema_pb2.Schema) -> Callable[[beam.Row], bytes]: - if format == 'raw': + + if format.islower(): + format = format.upper() + logging.warning('Lowercase formats will be deprecated in version 2.60') + + if format == 'RAW': if schema: - raise ValueError('raw format does not take a schema') + raise ValueError('RAW format does not take a schema') field_names = [field.name for field in beam_schema.fields] if len(field_names) != 1: raise ValueError(f'Expecting exactly one field, found {field_names}') return lambda row: getattr(row, field_names[0]) - elif format == 'json': + elif format == 'JSON': return json_utils.json_formater(beam_schema) - elif format == 'avro': + elif format == 'AVRO': avro_schema = schema or avroio.beam_schema_to_avro_schema(beam_schema) from_row = avroio.beam_row_to_avro_dict(avro_schema, beam_schema) @@ -238,10 +249,10 @@ def read_from_pubsub( format: The expected format of the message payload. Currently suported formats are - - raw: Produces records with a single `payload` field whose contents + - RAW: Produces records with a single `payload` field whose contents are the raw bytes of the pubsub message. - - avro: Parses records with a given avro schema. - - json: Parses records with a given json schema. + - AVRO: Parses records with a given Avro schema. + - JSON: Parses records with a given JSON schema. schema: Schema specification for the given format. attributes: List of attribute keys whose values will be flattened into the @@ -335,12 +346,12 @@ def write_to_pubsub( format: How to format the message payload. Currently suported formats are - - raw: Expects a message with a single field (excluding + - RAW: Expects a message with a single field (excluding attribute-related fields) whose contents are used as the raw bytes of the pubsub message. - - avro: Encodes records with a given avro schema, which may be inferred + - AVRO: Encodes records with a given Avro schema, which may be inferred from the input PCollection schema. - - json: Formats records with a given json schema, which may be inferred + - JSON: Formats records with a given JSON schema, which may be inferred from the input PCollection schema. schema: Schema specification for the given format. From 2a256bf2002e76401006c9c430d61ebc48b3c111 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ferran=20Fern=C3=A1ndez=20Garrido?= Date: Mon, 22 Apr 2024 15:03:51 +0200 Subject: [PATCH 2/4] [YAML] - Fix format --- sdks/python/apache_beam/yaml/yaml_io.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_io.py b/sdks/python/apache_beam/yaml/yaml_io.py index f9d149fb5247..57e300d05484 100644 --- a/sdks/python/apache_beam/yaml/yaml_io.py +++ b/sdks/python/apache_beam/yaml/yaml_io.py @@ -196,8 +196,8 @@ def _create_formatter( beam_schema: schema_pb2.Schema) -> Callable[[beam.Row], bytes]: if format.islower(): - format = format.upper() - logging.warning('Lowercase formats will be deprecated in version 2.60') + format = format.upper() + logging.warning('Lowercase formats will be deprecated in version 2.60') if format == 'RAW': if schema: From e980ac38991ca540daf7e463e8793410737d393d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ferran=20Fern=C3=A1ndez=20Garrido?= Date: Mon, 22 Apr 2024 16:18:37 +0200 Subject: [PATCH 3/4] [YAML] - Fix import --- sdks/python/apache_beam/yaml/yaml_io.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/yaml/yaml_io.py b/sdks/python/apache_beam/yaml/yaml_io.py index 57e300d05484..227a36b33ca3 100644 --- a/sdks/python/apache_beam/yaml/yaml_io.py +++ b/sdks/python/apache_beam/yaml/yaml_io.py @@ -24,8 +24,8 @@ """ import io -import os import logging +import os from typing import Any from typing import Callable from typing import Iterable From c672a1c36f7b53a54cb991a3c0e4cead2190eb81 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ferran=20Fern=C3=A1ndez=20Garrido?= Date: Tue, 23 Apr 2024 09:57:44 +0200 Subject: [PATCH 4/4] [YAML] - Update tests and web --- sdks/python/apache_beam/yaml/yaml_io_test.py | 30 +++++++++---------- .../content/en/documentation/sdks/yaml.md | 14 ++++----- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_io_test.py b/sdks/python/apache_beam/yaml/yaml_io_test.py index 54fbac0fbeb2..393e31de0e6d 100644 --- a/sdks/python/apache_beam/yaml/yaml_io_test.py +++ b/sdks/python/apache_beam/yaml/yaml_io_test.py @@ -95,7 +95,7 @@ def test_simple_read(self): type: ReadFromPubSub config: topic: my_topic - format: raw + format: RAW ''') assert_that( result, @@ -115,7 +115,7 @@ def test_read_with_attribute(self): type: ReadFromPubSub config: topic: my_topic - format: raw + format: RAW attributes: [attr] ''') assert_that( @@ -139,7 +139,7 @@ def test_read_with_attribute_map(self): type: ReadFromPubSub config: topic: my_topic - format: raw + format: RAW attributes_map: attrMap ''') assert_that( @@ -163,7 +163,7 @@ def test_read_with_id_attribute(self): type: ReadFromPubSub config: topic: my_topic - format: raw + format: RAW id_attribute: some_attr ''') assert_that( @@ -203,7 +203,7 @@ def test_read_avro(self): type: ReadFromPubSub config: topic: my_topic - format: avro + format: AVRO schema: %s ''' % json.dumps(self._avro_schema)) assert_that( @@ -227,7 +227,7 @@ def test_read_json(self): type: ReadFromPubSub config: topic: my_topic - format: json + format: JSON schema: type: object properties: @@ -267,7 +267,7 @@ def test_read_json_with_error_handling(self): type: ReadFromPubSub config: topic: my_topic - format: json + format: JSON schema: type: object properties: @@ -300,7 +300,7 @@ def test_read_json_without_error_handling(self): type: ReadFromPubSub config: topic: my_topic - format: json + format: JSON schema: type: object properties: @@ -322,7 +322,7 @@ def test_read_json_with_bad_schema(self): type: ReadFromPubSub config: topic: my_topic - format: json + format: JSON schema: type: object properties: @@ -353,7 +353,7 @@ def test_simple_write(self): type: WriteToPubSub config: topic: my_topic - format: raw + format: RAW ''')) def test_write_with_attribute(self): @@ -374,7 +374,7 @@ def test_write_with_attribute(self): type: WriteToPubSub config: topic: my_topic - format: raw + format: RAW attributes: [attr] ''')) @@ -396,7 +396,7 @@ def test_write_with_attribute_map(self): type: WriteToPubSub config: topic: my_topic - format: raw + format: RAW attributes_map: attrMap ''')) @@ -415,7 +415,7 @@ def test_write_with_id_attribute(self): type: WriteToPubSub config: topic: my_topic - format: raw + format: RAW id_attribute: some_attr ''')) @@ -438,7 +438,7 @@ def test_write_avro(self): type: WriteToPubSub config: topic: my_topic - format: avro + format: AVRO ''')) def test_write_json(self): @@ -463,7 +463,7 @@ def test_write_json(self): type: WriteToPubSub config: topic: my_topic - format: json + format: JSON attributes: [label] attributes_map: other ''')) diff --git a/website/www/site/content/en/documentation/sdks/yaml.md b/website/www/site/content/en/documentation/sdks/yaml.md index 5c34e9939823..530ccf1177cd 100644 --- a/website/www/site/content/en/documentation/sdks/yaml.md +++ b/website/www/site/content/en/documentation/sdks/yaml.md @@ -424,7 +424,7 @@ pipeline: - type: ReadFromPubSub config: topic: myPubSubTopic - format: json + format: JSON schema: type: object properties: @@ -441,7 +441,7 @@ pipeline: - type: WriteToPubSub config: topic: anotherPubSubTopic - format: json + format: JSON options: streaming: true ``` @@ -469,7 +469,7 @@ pipeline: - type: WriteToPubSub config: topic: anotherPubSubTopic - format: json + format: JSON options: streaming: true ``` @@ -496,7 +496,7 @@ pipeline: - type: WriteToPubSub config: topic: anotherPubSubTopic - format: json + format: JSON options: streaming: true ``` @@ -556,7 +556,7 @@ pipeline: - type: WriteToPubSub config: topic: anotherPubSubTopic - format: json + format: JSON options: streaming: true ``` @@ -581,7 +581,7 @@ pipeline: - type: WriteToPubSub config: topic: anotherPubSubTopic - format: json + format: JSON windowing: type: fixed size: 60 @@ -700,7 +700,7 @@ pipeline: - type: WriteToPubSub config: topic: anotherPubSubTopic - format: json + format: JSON options: streaming: true ```