From 8fbbf8025e04f628417faab102f2cc26f85fe67b Mon Sep 17 00:00:00 2001 From: Jeffrey Kinard Date: Thu, 21 Nov 2024 13:32:30 -0500 Subject: [PATCH 1/5] [yaml] add FileIO docs Signed-off-by: Jeffrey Kinard --- .../apache_beam/yaml/generate_yaml_docs.py | 34 ++-- sdks/python/apache_beam/yaml/standard_io.yaml | 72 ++----- sdks/python/apache_beam/yaml/yaml_io.py | 179 +++++++++++++++++- 3 files changed, 212 insertions(+), 73 deletions(-) diff --git a/sdks/python/apache_beam/yaml/generate_yaml_docs.py b/sdks/python/apache_beam/yaml/generate_yaml_docs.py index 27e17029f387..65453c9f2a36 100644 --- a/sdks/python/apache_beam/yaml/generate_yaml_docs.py +++ b/sdks/python/apache_beam/yaml/generate_yaml_docs.py @@ -86,10 +86,16 @@ def _fake_value(name, beam_type): raise ValueError(f"Unrecognized type_info: {type_info!r}") +EXCLUDE_ARGS = ['args', 'kwargs'] + + def _fake_row(schema): if schema is None: return '...' - return {f.name: _fake_value(f.name, f.type) for f in schema.fields} + return { + f.name: _fake_value(f.name, f.type) + for f in schema.fields if f.name not in EXCLUDE_ARGS + } def pretty_example(provider, t, base_t=None): @@ -161,13 +167,14 @@ def normalize_error_handling(f): def lines(): for f in schema.fields: - f = normalize_error_handling(f) - yield ''.join([ - f'**{f.name}** `{pretty_type(f.type)}`', - maybe_optional(f.type), - indent(': ' + f.description if f.description else '', 2), - maybe_row_parameters(f.type), - ]) + if f.name not in EXCLUDE_ARGS: + f = normalize_error_handling(f) + yield ''.join([ + f'**{f.name}** `{pretty_type(f.type)}`', + maybe_optional(f.type), + indent(': ' + f.description if f.description else '', 2), + maybe_row_parameters(f.type), + ]) return '\n\n'.join('*' + indent(line, 2) for line in lines()).strip() @@ -193,14 +200,19 @@ def io_grouping_key(transform_name): # Exclude providers SKIP = {} +def normalize_beam_version(desc): + return desc.replace( + 'BEAM_VERSION', 'current' if '.dev' in beam_version else beam_version) + def 
transform_docs(transform_base, transforms, providers, extra_docs=''): return '\n'.join([ f'## {transform_base}', '', - longest( - lambda t: longest(lambda p: p.description(t), providers[t]), - transforms).replace('::\n', '\n\n :::yaml\n'), + normalize_beam_version( + longest( + lambda t: longest(lambda p: p.description(t), providers[t]), + transforms).replace('::\n', '\n\n :::yaml\n')), '', extra_docs, '', diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml index 305e6877ad90..276ae84b593b 100644 --- a/sdks/python/apache_beam/yaml/standard_io.yaml +++ b/sdks/python/apache_beam/yaml/standard_io.yaml @@ -22,6 +22,7 @@ # should be kept in sync. # TODO(yaml): See if this can be enforced programmatically. +# BigQueryIO Java - type: renaming transforms: 'ReadFromBigQuery': 'ReadFromBigQuery' @@ -48,6 +49,7 @@ config: gradle_target: 'sdks:java:extensions:sql:expansion-service:shadowJar' +# KafkaIO Java - type: renaming transforms: 'ReadFromKafka': 'ReadFromKafka' @@ -83,6 +85,7 @@ config: gradle_target: 'sdks:java:io:expansion-service:shadowJar' +# PubSubLite Java - type: renaming transforms: 'ReadFromPubSubLite': 'ReadFromPubSubLite' @@ -120,69 +123,30 @@ config: gradle_target: 'sdks:java:io:google-cloud-platform:expansion-service:shadowJar' +# BigQueryIO Python - type: python transforms: 'ReadFromBigQuery': 'apache_beam.yaml.yaml_io.read_from_bigquery' # Disable until https://github.com/apache/beam/issues/28162 is resolved. 
# 'WriteToBigQuery': 'apache_beam.yaml.yaml_io.write_to_bigquery' + +# FileIO Python +- type: python + transforms: 'ReadFromText': 'apache_beam.yaml.yaml_io.read_from_text' 'WriteToText': 'apache_beam.yaml.yaml_io.write_to_text' 'ReadFromPubSub': 'apache_beam.yaml.yaml_io.read_from_pubsub' 'WriteToPubSub': 'apache_beam.yaml.yaml_io.write_to_pubsub' + 'ReadFromCsv': 'apache_beam.yaml.yaml_io.read_from_csv' + 'WriteToCsv': 'apache_beam.yaml.yaml_io.write_to_csv' + 'ReadFromJson': 'apache_beam.yaml.yaml_io.read_from_json' + 'WriteToJson': 'apache_beam.yaml.yaml_io.write_to_json' + 'ReadFromParquet': 'apache_beam.yaml.yaml_io.read_from_parquet' + 'WriteToParquet': 'apache_beam.yaml.yaml_io.write_to_parquet' + 'ReadFromAvro': 'apache_beam.yaml.yaml_io.read_from_avro' + 'WriteToAvro': 'apache_beam.yaml.yaml_io.write_to_avro' -# Declared as a renaming transform to avoid exposing all -# (implementation-specific) pandas arguments and aligning with possible Java -# implementation. -# Invoking these directly as a PyTransform is still an option for anyone wanting -# to use these power-features in a language-dependent manner. 
-- type: renaming - transforms: - 'ReadFromCsv': 'ReadFromCsv' - 'WriteToCsv': 'WriteToCsv' - 'ReadFromJson': 'ReadFromJson' - 'WriteToJson': 'WriteToJson' - 'ReadFromParquet': 'ReadFromParquet' - 'WriteToParquet': 'WriteToParquet' - 'ReadFromAvro': 'ReadFromAvro' - 'WriteToAvro': 'WriteToAvro' - config: - mappings: - 'ReadFromCsv': - path: 'path' - delimiter: 'sep' - comment: 'comment' - 'WriteToCsv': - path: 'path' - delimiter: 'sep' - 'ReadFromJson': - path: 'path' - 'WriteToJson': - path: 'path' - 'ReadFromParquet': - path: 'file_pattern' - 'WriteToParquet': - path: 'file_path_prefix' - 'ReadFromAvro': - path: 'file_pattern' - 'WriteToAvro': - path: 'file_path_prefix' - defaults: - 'ReadFromParquet': - as_rows: True - 'ReadFromAvro': - as_rows: True - underlying_provider: - type: python - transforms: - 'ReadFromCsv': 'apache_beam.io.ReadFromCsv' - 'WriteToCsv': 'apache_beam.io.WriteToCsv' - 'ReadFromJson': 'apache_beam.io.ReadFromJson' - 'WriteToJson': 'apache_beam.io.WriteToJson' - 'ReadFromParquet': 'apache_beam.io.ReadFromParquet' - 'WriteToParquet': 'apache_beam.io.WriteToParquet' - 'ReadFromAvro': 'apache_beam.io.ReadFromAvro' - 'WriteToAvro': 'apache_beam.io.WriteToAvro' - +# FileIO Java - type: beamJar transforms: 'WriteToCsv': 'beam:schematransform:org.apache.beam:csv_write:v1' @@ -190,6 +154,7 @@ config: gradle_target: 'sdks:java:extensions:schemaio-expansion-service:shadowJar' +# JdbcIO Java - type: renaming transforms: 'ReadFromJdbc': 'ReadFromJdbc' @@ -259,6 +224,7 @@ config: gradle_target: 'sdks:java:extensions:schemaio-expansion-service:shadowJar' +# SpannerIO Java - type: renaming transforms: 'ReadFromSpanner': 'ReadFromSpanner' diff --git a/sdks/python/apache_beam/yaml/yaml_io.py b/sdks/python/apache_beam/yaml/yaml_io.py index a6525aef9877..c9edfae2066b 100644 --- a/sdks/python/apache_beam/yaml/yaml_io.py +++ b/sdks/python/apache_beam/yaml/yaml_io.py @@ -38,8 +38,16 @@ import apache_beam as beam import apache_beam.io as beam_io +from 
apache_beam.io import ReadFromAvro from apache_beam.io import ReadFromBigQuery +from apache_beam.io import ReadFromCsv +from apache_beam.io import ReadFromJson +from apache_beam.io import ReadFromParquet +from apache_beam.io import WriteToAvro from apache_beam.io import WriteToBigQuery +from apache_beam.io import WriteToCsv +from apache_beam.io import WriteToJson +from apache_beam.io import WriteToParquet from apache_beam.io import avroio from apache_beam.io.gcp.bigquery import BigQueryDisposition from apache_beam.portability.api import schema_pb2 @@ -49,30 +57,55 @@ from apache_beam.yaml import yaml_provider -def read_from_text(path: str): +def read_from_text( + path: str, + delimiter: str = None, +): # TODO(yaml): Consider passing the filename and offset, possibly even # by default. - """Reads lines from a text files. + """Reads lines from text files into Beam rows. - The resulting PCollection consists of rows with a single string filed named - "line." + The resulting collection consists of Beam rows, each with a single string + field named "line." + + For example, each Beam row will look like: :: + + line: "a single line of text from source" + + Parses a text file as newline-delimited elements, by default assuming + ``UTF-8`` encoding. Supports newline delimiters ``\n`` and ``\r\n`` + or specified delimiter. Args: - path (str): The file path to read from. The path can contain glob + delimiter (str): Delimiter to split records. + path (str): The file path to read from as a local file path or a + GCS ``gs://`` path. The path can contain glob characters such as ``*`` and ``?``. """ - return beam_io.ReadFromText(path) | beam.Map(lambda s: beam.Row(line=s)) + return beam_io.ReadFromText( + path=path, delimiter=delimiter) | beam.Map(lambda s: beam.Row(line=s)) @beam.ptransform_fn def write_to_text(pcoll, path: str): - """Writes a PCollection to a (set of) text files(s). + """Writes Beam rows to a (set of) text file(s). 
+ + The input must be a beam Row whose schema has exactly one field. + + For example: :: + + key: "text to write" - The input must be a PCollection whose schema has exactly one field. + where `key` can be any name. The idea is that the previous transform should be + outputting a collection of Beam rows that have only one field which is the + text to be written. + + See MapToFields for guidance on how to drop and map rows to this format. Args: - path (str): The file path to write to. The files written will + path (str): The file path to write to as a local file path or a + GCS ``gs://`` path. The files written will begin with this prefix, followed by a shard identifier. """ try: @@ -92,6 +125,134 @@ def write_to_text(pcoll, path: str): lambda x: str(getattr(x, sole_field_name))) | beam.io.WriteToText(path) +def read_from_csv( + path: str, comment: str = None, delimiter: str = None, **kwargs): + """Reads comma-separated values (csv) files into Beam rows. + + For more information about possible arguments, see + + + Args: + delimiter (str): Character or regex pattern to treat as the delimiter, + default ',' (comma). + comment (str): Character indicating that the remainder of line should + not be parsed. If found at the beginning of a line, the line will be + ignored altogether. This parameter must be a single character. + path (str): The file path to read from as a local file path or a + GCS ``gs://`` path. The path can contain glob + characters such as ``*`` and ``?``. + """ + return ReadFromCsv(path=path, comment=comment, sep=delimiter, **kwargs) + + +def write_to_csv(path: str, delimiter: str = None, **kwargs): + """Writes Beam rows to a (set of) comma-separated values (csv) files. + + For more information about possible arguments, see + + + Args: + delimiter (str): Character or regex pattern to treat as the delimiter. + path (str): The file path to write to as a local file path or a + GCS ``gs://`` path. 
The files written will + begin with this prefix, followed by a shard identifier. + """ + return WriteToCsv(path=path, sep=delimiter, **kwargs) + + +def read_from_json(path: str, **kwargs): + """Reads json values from files into Beam rows. + + For more information about possible arguments, see + + + Args: + path (str): The file path to read from as a local file path or a + GCS ``gs://`` path. The path can contain glob + characters such as ``*`` and ``?``. + """ + return ReadFromJson(path=path, **kwargs) + + +def write_to_json(path: str, **kwargs): + """Writes Beam rows as json values to files. + + For more information about possible arguments, see + + + Args: + path (str): The file path to write to as a local file path or a + GCS ``gs://`` path. The files written will + begin with this prefix, followed by a shard identifier. + """ + return WriteToJson(path=path, **kwargs) + + +def read_from_parquet(path: str, **kwargs): + """Read a set of Parquet files defined by a given file pattern. + + For more information about possible arguments, see + + + Args: + path (str): The file path to read from as a local file path or a + GCS ``gs://`` path. + """ + return ReadFromParquet(file_pattern=path, as_rows=True, **kwargs) + + +def write_to_parquet(path: str, **kwargs): + """Writes parquet files from a collection of Beam rows. + + For more information about possible arguments, see + + + Args: + path (str): The file path to write to as a local file path or a + GCS ``gs://`` path. The files written will begin + with this prefix, followed by a shard identifier, and + end in a common extension. + """ + return WriteToParquet(file_path_prefix=path, **kwargs) + + +def read_from_avro(path: str, **kwargs): + """Reads records from avro files into Beam rows. + + Records that are of simple types will be + mapped to beam Rows with a single `record` field containing the record's + value. 
Records that are of Avro type ``RECORD`` will be mapped to Beam rows + that comply with the schema contained in the Avro file that contains those + records. + + For more information about possible arguments, see + + + Args: + path (str): The file path to read from as a local file path or a + GCS ``gs://`` path. + """ + return ReadFromAvro(file_pattern=path, as_rows=True, **kwargs) + + +def write_to_avro(path: str, **kwargs): + """Writes avro records into files from a collection of Beam rows. + + The avro schema will be automatically generated and used to write the + output records. + + For more information about possible arguments, see + + + Args: + path (str): The file path to write to as a local file path or a + GCS ``gs://`` path. The files written will begin + with this prefix, followed by a shard identifier, and + end in a common extension. + """ + return WriteToAvro(file_path_prefix=path, **kwargs) + + def read_from_bigquery( *, table: Optional[str] = None, From 7649b0141176fffc51b518f162f613746e726bf2 Mon Sep 17 00:00:00 2001 From: Jeffrey Kinard Date: Wed, 27 Nov 2024 15:00:07 -0500 Subject: [PATCH 2/5] fix errors Signed-off-by: Jeffrey Kinard --- sdks/python/apache_beam/yaml/yaml_io.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_io.py b/sdks/python/apache_beam/yaml/yaml_io.py index c9edfae2066b..d58bd1ef71f8 100644 --- a/sdks/python/apache_beam/yaml/yaml_io.py +++ b/sdks/python/apache_beam/yaml/yaml_io.py @@ -59,7 +59,7 @@ def read_from_text( path: str, - delimiter: str = None, + delimiter: Optional[str] = None, ): # TODO(yaml): Consider passing the filename and offset, possibly even # by default. @@ -84,7 +84,8 @@ def read_from_text( characters such as ``*`` and ``?``. 
""" return beam_io.ReadFromText( - path=path, delimiter=delimiter) | beam.Map(lambda s: beam.Row(line=s)) + file_pattern=path, + delimiter=delimiter) | beam.Map(lambda s: beam.Row(line=s)) @beam.ptransform_fn @@ -126,7 +127,10 @@ def write_to_text(pcoll, path: str): def read_from_csv( - path: str, comment: str = None, delimiter: str = None, **kwargs): + path: str, + comment: Optional[str] = None, + delimiter: Optional[str] = None, + **kwargs): """Reads comma-separated values (csv) files into Beam rows. For more information about possible arguments, see @@ -145,7 +149,7 @@ def read_from_csv( return ReadFromCsv(path=path, comment=comment, sep=delimiter, **kwargs) -def write_to_csv(path: str, delimiter: str = None, **kwargs): +def write_to_csv(path: str, delimiter: Optional[str] = None, **kwargs): """Writes Beam rows to a (set of) comma-separated values (csv) files. For more information about possible arguments, see From 103e1e86325dfaed5310904fc0e2447f444a02d9 Mon Sep 17 00:00:00 2001 From: Jeffrey Kinard Date: Wed, 27 Nov 2024 18:46:52 -0500 Subject: [PATCH 3/5] fix test failures Signed-off-by: Jeffrey Kinard --- sdks/python/apache_beam/yaml/integration_tests.py | 2 +- sdks/python/apache_beam/yaml/yaml_io.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/yaml/integration_tests.py b/sdks/python/apache_beam/yaml/integration_tests.py index 72b3918195da..fd9f1cc7756f 100644 --- a/sdks/python/apache_beam/yaml/integration_tests.py +++ b/sdks/python/apache_beam/yaml/integration_tests.py @@ -191,7 +191,7 @@ def parse_test_files(filepattern): logging.getLogger().setLevel(logging.INFO) -parse_test_files(os.path.join(os.path.dirname(__file__), 'tests', '*.yaml')) +parse_test_files(os.path.join(os.path.dirname(__file__), 'tests', 'csv.yaml')) if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) diff --git a/sdks/python/apache_beam/yaml/yaml_io.py b/sdks/python/apache_beam/yaml/yaml_io.py index 
d58bd1ef71f8..8a23d01588bf 100644 --- a/sdks/python/apache_beam/yaml/yaml_io.py +++ b/sdks/python/apache_beam/yaml/yaml_io.py @@ -149,7 +149,7 @@ def read_from_csv( return ReadFromCsv(path=path, comment=comment, sep=delimiter, **kwargs) -def write_to_csv(path: str, delimiter: Optional[str] = None, **kwargs): +def write_to_csv(path: str, delimiter: Optional[str] = ",", **kwargs): """Writes Beam rows to a (set of) comma-separated values (csv) files. For more information about possible arguments, see From c90624ed688c639a545442164b197e16f8b50148 Mon Sep 17 00:00:00 2001 From: Jeffrey Kinard Date: Sun, 1 Dec 2024 21:13:45 -0500 Subject: [PATCH 4/5] fix defaults Signed-off-by: Jeffrey Kinard --- .../apache_beam/yaml/integration_tests.py | 2 +- sdks/python/apache_beam/yaml/yaml_io.py | 26 +++++++++---------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/sdks/python/apache_beam/yaml/integration_tests.py b/sdks/python/apache_beam/yaml/integration_tests.py index fd9f1cc7756f..72b3918195da 100644 --- a/sdks/python/apache_beam/yaml/integration_tests.py +++ b/sdks/python/apache_beam/yaml/integration_tests.py @@ -191,7 +191,7 @@ def parse_test_files(filepattern): logging.getLogger().setLevel(logging.INFO) -parse_test_files(os.path.join(os.path.dirname(__file__), 'tests', 'csv.yaml')) +parse_test_files(os.path.join(os.path.dirname(__file__), 'tests', '*.yaml')) if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) diff --git a/sdks/python/apache_beam/yaml/yaml_io.py b/sdks/python/apache_beam/yaml/yaml_io.py index 8a23d01588bf..d95c25214942 100644 --- a/sdks/python/apache_beam/yaml/yaml_io.py +++ b/sdks/python/apache_beam/yaml/yaml_io.py @@ -66,21 +66,21 @@ def read_from_text( """Reads lines from text files into Beam rows. - The resulting collection consists of Beam rows, each with a single string - field named "line." + The resulting collection consists of Beam rows, each with a single string + field named "line." 
- For example, each Beam row will look like: :: + For example, each Beam row will look like: :: - line: "a single line of text from source" + line: "a single line of text from source" - Parses a text file as newline-delimited elements, by default assuming - ``UTF-8`` encoding. Supports newline delimiters ``\n`` and ``\r\n`` - or specified delimiter. + Parses a text file as newline-delimited elements, by default assuming + ``UTF-8`` encoding. Supports newline delimiters ``\\n`` and ``\\r\\n`` + or specified delimiter. Args: delimiter (str): Delimiter to split records. path (str): The file path to read from as a local file path or a - GCS ``gs://`` path. The path can contain glob + GCS ``gs://`` path. The path can contain glob characters such as ``*`` and ``?``. """ return beam_io.ReadFromText( @@ -146,7 +146,7 @@ def read_from_csv( GCS ``gs://`` path. The path can contain glob characters such as ``*`` and ``?``. """ - return ReadFromCsv(path=path, comment=comment, sep=delimiter, **kwargs) + return ReadFromCsv(path=path, comment=comment, delimiter=delimiter, **kwargs) def write_to_csv(path: str, delimiter: Optional[str] = ",", **kwargs): @@ -192,7 +192,7 @@ def write_to_json(path: str, **kwargs): return WriteToJson(path=path, **kwargs) -def read_from_parquet(path: str, **kwargs): +def read_from_parquet(path: str, as_rows: bool = True, **kwargs): """Read a set of Parquet files defined by a given file pattern. For more information about possible arguments, see @@ -202,7 +202,7 @@ def read_from_parquet(path: str, **kwargs): path (str): The file path to read from as a local file path or a GCS ``gs://`` path. 
""" - return ReadFromParquet(file_pattern=path, as_rows=True, **kwargs) + return ReadFromParquet(file_pattern=path, as_rows=as_rows, **kwargs) def write_to_parquet(path: str, **kwargs): @@ -220,7 +220,7 @@ def write_to_parquet(path: str, **kwargs): return WriteToParquet(file_path_prefix=path, **kwargs) -def read_from_avro(path: str, **kwargs): +def read_from_avro(path: str, as_rows: bool = True, **kwargs): """Reads records from avro files into Beam rows. Records that are of simple types will be @@ -236,7 +236,7 @@ def read_from_avro(path: str, **kwargs): path (str): The file path to read from as a local file path or a GCS ``gs://`` path. """ - return ReadFromAvro(file_pattern=path, as_rows=True, **kwargs) + return ReadFromAvro(file_pattern=path, as_rows=as_rows, **kwargs) def write_to_avro(path: str, **kwargs): From 9303b5a9c49feb3dce92ec895861d01a42344d49 Mon Sep 17 00:00:00 2001 From: Jeffrey Kinard Date: Fri, 20 Dec 2024 15:43:27 -0500 Subject: [PATCH 5/5] address some initial comments Signed-off-by: Jeffrey Kinard --- sdks/python/apache_beam/yaml/generate_yaml_docs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/yaml/generate_yaml_docs.py b/sdks/python/apache_beam/yaml/generate_yaml_docs.py index 65453c9f2a36..952014b2779e 100644 --- a/sdks/python/apache_beam/yaml/generate_yaml_docs.py +++ b/sdks/python/apache_beam/yaml/generate_yaml_docs.py @@ -200,9 +200,9 @@ def io_grouping_key(transform_name): # Exclude providers SKIP = {} + def normalize_beam_version(desc): - return desc.replace( - 'BEAM_VERSION', 'current' if '.dev' in beam_version else beam_version) + return desc.replace('BEAM_VERSION', beam_version) def transform_docs(transform_base, transforms, providers, extra_docs=''):