diff --git a/sdks/python/apache_beam/yaml/generate_yaml_docs.py b/sdks/python/apache_beam/yaml/generate_yaml_docs.py
index 2123c7a9f202..ae831d83e3a0 100644
--- a/sdks/python/apache_beam/yaml/generate_yaml_docs.py
+++ b/sdks/python/apache_beam/yaml/generate_yaml_docs.py
@@ -86,10 +86,16 @@ def _fake_value(name, beam_type):
     raise ValueError(f"Unrecognized type_info: {type_info!r}")
 
 
+EXCLUDE_ARGS = ['args', 'kwargs']
+
+
 def _fake_row(schema):
   if schema is None:
     return '...'
-  return {f.name: _fake_value(f.name, f.type) for f in schema.fields}
+  return {
+      f.name: _fake_value(f.name, f.type)
+      for f in schema.fields if f.name not in EXCLUDE_ARGS
+  }
 
 
 def pretty_example(provider, t, base_t=None):
@@ -160,13 +166,14 @@ def normalize_error_handling(f):
 
   def lines():
     for f in schema.fields:
-      f = normalize_error_handling(f)
-      yield ''.join([
-          f'**{f.name}** `{pretty_type(f.type)}`',
-          maybe_optional(f.type),
-          indent(': ' + f.description if f.description else '', 2),
-          maybe_row_parameters(f.type),
-      ])
+      if f.name not in EXCLUDE_ARGS:
+        f = normalize_error_handling(f)
+        yield ''.join([
+            f'**{f.name}** `{pretty_type(f.type)}`',
+            maybe_optional(f.type),
+            indent(': ' + f.description if f.description else '', 2),
+            maybe_row_parameters(f.type),
+        ])
 
   return '\n\n'.join('*' + indent(line, 2) for line in lines()).strip()
 
@@ -189,6 +196,11 @@ def io_grouping_key(transform_name):
   return 0, transform_name
 
 
+def normalize_beam_version(desc):
+  return desc.replace(
+      'BEAM_VERSION', 'current' if '.dev' in beam_version else beam_version)
+
+
 SKIP = [
     'Combine',
     'Filter',
@@ -200,9 +212,10 @@ def transform_docs(transform_base, transforms, providers, extra_docs=''):
   return '\n'.join([
       f'## {transform_base}',
       '',
-      longest(
-          lambda t: longest(lambda p: p.description(t), providers[t]),
-          transforms).replace('::\n', '\n\n    :::yaml\n'),
+      normalize_beam_version(
+          longest(
+              lambda t: longest(lambda p: p.description(t), providers[t]),
+              transforms).replace('::\n', '\n\n    :::yaml\n')),
       '',
       extra_docs,
       '',
diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml
index 269c14e17baa..7f05e5ff1d88 100644
--- a/sdks/python/apache_beam/yaml/standard_io.yaml
+++ b/sdks/python/apache_beam/yaml/standard_io.yaml
@@ -22,6 +22,7 @@
 # should be kept in sync.
 # TODO(yaml): See if this can be enforced programmatically.
 
+# BigQueryIO Java
 - type: renaming
   transforms:
     'ReadFromBigQuery': 'ReadFromBigQuery'
@@ -48,6 +49,7 @@
     config:
       gradle_target: 'sdks:java:extensions:sql:expansion-service:shadowJar'
 
+# KafkaIO Java
 - type: renaming
   transforms:
     'ReadFromKafka': 'ReadFromKafka'
@@ -83,6 +85,7 @@
     config:
       gradle_target: 'sdks:java:io:expansion-service:shadowJar'
 
+# PubSubLite Java
 - type: renaming
   transforms:
     'ReadFromPubSubLite': 'ReadFromPubSubLite'
@@ -120,69 +123,30 @@
     config:
       gradle_target: 'sdks:java:io:google-cloud-platform:expansion-service:shadowJar'
 
+# BigQueryIO Python
 - type: python
   transforms:
     'ReadFromBigQuery': 'apache_beam.yaml.yaml_io.read_from_bigquery'
     # Disable until https://github.com/apache/beam/issues/28162 is resolved.
     # 'WriteToBigQuery': 'apache_beam.yaml.yaml_io.write_to_bigquery'
+
+# FileIO Python
+- type: python
+  transforms:
     'ReadFromText': 'apache_beam.yaml.yaml_io.read_from_text'
     'WriteToText': 'apache_beam.yaml.yaml_io.write_to_text'
     'ReadFromPubSub': 'apache_beam.yaml.yaml_io.read_from_pubsub'
     'WriteToPubSub': 'apache_beam.yaml.yaml_io.write_to_pubsub'
+    'ReadFromCsv': 'apache_beam.yaml.yaml_io.read_from_csv'
+    'WriteToCsv': 'apache_beam.yaml.yaml_io.write_to_csv'
+    'ReadFromJson': 'apache_beam.yaml.yaml_io.read_from_json'
+    'WriteToJson': 'apache_beam.yaml.yaml_io.write_to_json'
+    'ReadFromParquet': 'apache_beam.yaml.yaml_io.read_from_parquet'
+    'WriteToParquet': 'apache_beam.yaml.yaml_io.write_to_parquet'
+    'ReadFromAvro': 'apache_beam.yaml.yaml_io.read_from_avro'
+    'WriteToAvro': 'apache_beam.yaml.yaml_io.write_to_avro'
 
-# Declared as a renaming transform to avoid exposing all
-# (implementation-specific) pandas arguments and aligning with possible Java
-# implementation.
-# Invoking these directly as a PyTransform is still an option for anyone wanting
-# to use these power-features in a language-dependent manner.
-- type: renaming
-  transforms:
-    'ReadFromCsv': 'ReadFromCsv'
-    'WriteToCsv': 'WriteToCsv'
-    'ReadFromJson': 'ReadFromJson'
-    'WriteToJson': 'WriteToJson'
-    'ReadFromParquet': 'ReadFromParquet'
-    'WriteToParquet': 'WriteToParquet'
-    'ReadFromAvro': 'ReadFromAvro'
-    'WriteToAvro': 'WriteToAvro'
-  config:
-    mappings:
-      'ReadFromCsv':
-        path: 'path'
-        delimiter: 'sep'
-        comment: 'comment'
-      'WriteToCsv':
-        path: 'path'
-        delimiter: 'sep'
-      'ReadFromJson':
-        path: 'path'
-      'WriteToJson':
-        path: 'path'
-      'ReadFromParquet':
-        path: 'file_pattern'
-      'WriteToParquet':
-        path: 'file_path_prefix'
-      'ReadFromAvro':
-        path: 'file_pattern'
-      'WriteToAvro':
-        path: 'file_path_prefix'
-    defaults:
-      'ReadFromParquet':
-        as_rows: True
-      'ReadFromAvro':
-        as_rows: True
-    underlying_provider:
-      type: python
-      transforms:
-        'ReadFromCsv': 'apache_beam.io.ReadFromCsv'
-        'WriteToCsv': 'apache_beam.io.WriteToCsv'
-        'ReadFromJson': 'apache_beam.io.ReadFromJson'
-        'WriteToJson': 'apache_beam.io.WriteToJson'
-        'ReadFromParquet': 'apache_beam.io.ReadFromParquet'
-        'WriteToParquet': 'apache_beam.io.WriteToParquet'
-        'ReadFromAvro': 'apache_beam.io.ReadFromAvro'
-        'WriteToAvro': 'apache_beam.io.WriteToAvro'
-
+# FileIO Java
 - type: beamJar
   transforms:
     'WriteToCsv': 'beam:schematransform:org.apache.beam:csv_write:v1'
@@ -190,6 +154,7 @@
   config:
     gradle_target: 'sdks:java:extensions:schemaio-expansion-service:shadowJar'
 
+# JdbcIO Java
 - type: renaming
   transforms:
     'ReadFromJdbc': 'ReadFromJdbc'
@@ -258,6 +223,7 @@
     config:
      gradle_target: 'sdks:java:extensions:schemaio-expansion-service:shadowJar'
 
+# SpannerIO Java
 - type: renaming
   transforms:
     'ReadFromSpanner': 'ReadFromSpanner'
diff --git a/sdks/python/apache_beam/yaml/yaml_io.py b/sdks/python/apache_beam/yaml/yaml_io.py
index a6525aef9877..c9edfae2066b 100644
--- a/sdks/python/apache_beam/yaml/yaml_io.py
+++ b/sdks/python/apache_beam/yaml/yaml_io.py
@@ -38,8 +38,16 @@
 import apache_beam as beam
 import apache_beam.io as beam_io
+from apache_beam.io import ReadFromAvro
 from apache_beam.io import ReadFromBigQuery
+from apache_beam.io import ReadFromCsv
+from apache_beam.io import ReadFromJson
+from apache_beam.io import ReadFromParquet
+from apache_beam.io import WriteToAvro
 from apache_beam.io import WriteToBigQuery
+from apache_beam.io import WriteToCsv
+from apache_beam.io import WriteToJson
+from apache_beam.io import WriteToParquet
 from apache_beam.io import avroio
 from apache_beam.io.gcp.bigquery import BigQueryDisposition
 from apache_beam.portability.api import schema_pb2
@@ -49,30 +57,55 @@
 from apache_beam.yaml import yaml_provider
 
 
-def read_from_text(path: str):
+def read_from_text(
+    path: str,
+    delimiter: str = None,
+):
   # TODO(yaml): Consider passing the filename and offset, possibly even
   # by default.
-  """Reads lines from a text files.
+  """Reads lines from text files into Beam rows.
 
-  The resulting PCollection consists of rows with a single string filed named
-  "line."
+  The resulting collection consists of Beam rows, each with a single string
+  field named "line."
+
+  For example, each Beam row will look like: ::
+
+    line: "a single line of text from source"
+
+  Parses a text file as newline-delimited elements, by default assuming
+  ``UTF-8`` encoding. Supports newline delimiters ``\n`` and ``\r\n``
+  or specified delimiter.
 
   Args:
-    path (str): The file path to read from. The path can contain glob
+    delimiter (str): Delimiter to split records.
+    path (str): The file path to read from as a local file path or a
+      GCS ``gs://`` path. The path can contain glob
       characters such as ``*`` and ``?``.
   """
-  return beam_io.ReadFromText(path) | beam.Map(lambda s: beam.Row(line=s))
+  return beam_io.ReadFromText(
+      path=path, delimiter=delimiter) | beam.Map(lambda s: beam.Row(line=s))
 
 
 @beam.ptransform_fn
 def write_to_text(pcoll, path: str):
-  """Writes a PCollection to a (set of) text files(s).
+  """Writes Beam rows to a (set of) text files(s).
+
+  The input must be a beam Row whose schema has exactly one field.
+
+  For example: ::
+
+    key: "text to write"
 
-  The input must be a PCollection whose schema has exactly one field.
+  where `key` can be any name. The idea is that the previous transform should be
+  outputting a collection of Beam rows that have only one field which is the
+  text to be written.
+
+  See MapToFields for guidance on how to drop and map rows to this format.
 
   Args:
-    path (str): The file path to write to. The files written will
+    path (str): The file path to write to as a local file path or a
+      GCS ``gs://`` path. The files written will
       begin with this prefix, followed by a shard identifier.
   """
   try:
@@ -92,6 +125,134 @@ def write_to_text(pcoll, path: str):
           lambda x: str(getattr(x, sole_field_name)))
       | beam.io.WriteToText(path)
 
 
+def read_from_csv(
+    path: str, comment: str = None, delimiter: str = None, **kwargs):
+  """Reads comma-separated values (csv) files into Beam rows.
+
+  For more information about possible arguments, see
+
+
+  Args:
+    delimiter (str): Character or regex pattern to treat as the delimiter,
+      default ',' (comma).
+    comment (str): Character indicating that the remainder of line should
+      not be parsed. If found at the beginning of a line, the line will be
+      ignored altogether. This parameter must be a single character.
+    path (str): The file path to read from as a local file path or a
+      GCS ``gs://`` path. The path can contain glob
+      characters such as ``*`` and ``?``.
+  """
+  return ReadFromCsv(path=path, comment=comment, sep=delimiter, **kwargs)
+
+
+def write_to_csv(path: str, delimiter: str = None, **kwargs):
+  """Writes Beam rows to a (set of) comma-separated values (csv) files.
+
+  For more information about possible arguments, see
+
+
+  Args:
+    delimiter (str): Character or regex pattern to treat as the delimiter.
+    path (str): The file path to write to as a local file path or a
+      GCS ``gs://`` path. The files written will
+      begin with this prefix, followed by a shard identifier.
+  """
+  return WriteToCsv(path=path, sep=delimiter, **kwargs)
+
+
+def read_from_json(path: str, **kwargs):
+  """Reads json values from files into Beam rows.
+
+  For more information about possible arguments, see
+
+
+  Args:
+    path (str): The file path to read from as a local file path or a
+      GCS ``gs://`` path. The path can contain glob
+      characters such as ``*`` and ``?``.
+  """
+  return ReadFromJson(path=path, **kwargs)
+
+
+def write_to_json(path: str, **kwargs):
+  """Writes Beam rows as json values to files.
+
+  For more information about possible arguments, see
+
+
+  Args:
+    path (str): The file path to write to as a local file path or a
+      GCS ``gs://`` path. The files written will
+      begin with this prefix, followed by a shard identifier.
+  """
+  return WriteToJson(path=path, **kwargs)
+
+
+def read_from_parquet(path: str, **kwargs):
+  """Read a set of Parquet files defined by a given file pattern.
+
+  For more information about possible arguments, see
+
+
+  Args:
+    path (str): The file path to read from as a local file path or a
+      GCS ``gs://`` path.
+  """
+  return ReadFromParquet(file_pattern=path, as_rows=True, **kwargs)
+
+
+def write_to_parquet(path: str, **kwargs):
+  """Writes parquet files from a collection of Beam rows.
+
+  For more information about possible arguments, see
+
+
+  Args:
+    path (str): The file path to write to as a local file path or a
+      GCS ``gs://`` path. The files written will begin
+      with this prefix, followed by a shard identifier, and
+      end in a common extension.
+  """
+  return WriteToParquet(file_path_prefix=path, **kwargs)
+
+
+def read_from_avro(path: str, **kwargs):
+  """Reads records from avro files into Beam rows.
+
+  Records that are of simple types will be
+  mapped to beam Rows with a single `record` field containing the records
+  value. Records that are of Avro type ``RECORD`` will be mapped to Beam rows
+  that comply with the schema contained in the Avro file that contains those
+  records.
+
+  For more information about possible arguments, see
+
+
+  Args:
+    path (str): The file path to read from as a local file path or a
+      GCS ``gs://`` path.
+  """
+  return ReadFromAvro(file_pattern=path, as_rows=True, **kwargs)
+
+
+def write_to_avro(path: str, **kwargs):
+  """Writes avro records into files from a collection of Beam rows.
+
+  The avro schema will be automatically generated and used to write the
+  output records.
+
+  For more information about possible arguments, see
+
+
+  Args:
+    path (str): The file path to write to as a local file path or a
+      GCS ``gs://`` path. The files written will begin
+      with this prefix, followed by a shard identifier, and
+      end in a common extension.
+  """
+  return WriteToAvro(file_path_prefix=path, **kwargs)
+
+
 def read_from_bigquery(
     *,
     table: Optional[str] = None,