Skip to content

Commit

Permalink
Merge pull request #29687 [YAML] Better IO documentation.
Browse files Browse the repository at this point in the history
  • Loading branch information
robertwb authored Dec 20, 2023
2 parents fa11c0f + 47ccb8a commit 192f765
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 19 deletions.
16 changes: 13 additions & 3 deletions sdks/python/apache_beam/io/avroio.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,15 @@


class ReadFromAvro(PTransform):
"""A :class:`~apache_beam.transforms.ptransform.PTransform` for reading avro
files."""
"""A `PTransform` for reading records from avro files.
Each record of the resulting PCollection will contain
a single record read from a source. Records that are of simple types will be
mapped to beam Rows with a single `record` field containing the records
value. Records that are of Avro type ``RECORD`` will be mapped to Beam rows
that comply with the schema contained in the Avro file that contains those
records.
"""
def __init__(
self,
file_pattern=None,
Expand Down Expand Up @@ -355,7 +362,10 @@ def split_points_unclaimed(stop_position):


class WriteToAvro(beam.transforms.PTransform):
"""A ``PTransform`` for writing avro files."""
"""A ``PTransform`` for writing avro files.
If the input has a schema, a corresponding avro schema will be automatically
generated and used to write the output records."""
def __init__(
self,
file_path_prefix,
Expand Down
7 changes: 1 addition & 6 deletions sdks/python/apache_beam/io/parquetio.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,7 @@ def display_data(self):


class ReadFromParquet(PTransform):
"""A :class:`~apache_beam.transforms.ptransform.PTransform` for reading
Parquet files as a `PCollection` of dictionaries. This `PTransform` is
currently experimental. No backward-compatibility guarantees."""
"""A `PTransform` for reading Parquet files."""
def __init__(
self,
file_pattern=None,
Expand Down Expand Up @@ -465,9 +463,6 @@ def split_points_unclaimed(stop_position):

class WriteToParquet(PTransform):
"""A ``PTransform`` for writing parquet files.
This ``PTransform`` is currently experimental. No backward-compatibility
guarantees.
"""
def __init__(
self,
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/yaml/generate_yaml_docs.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def lines():
yield ''.join([
f'**{f.name}** `{pretty_type(f.type)}`',
maybe_optional(f.type),
': ' + f.description if f.description else '',
indent(': ' + f.description if f.description else '', 2),
maybe_row_parameters(f.type),
])

Expand Down
8 changes: 0 additions & 8 deletions sdks/python/apache_beam/yaml/standard_io.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -239,13 +239,5 @@
transforms:
'ReadFromJdbc': 'beam:schematransform:org.apache.beam:jdbc_read:v1'
'WriteToJdbc': 'beam:schematransform:org.apache.beam:jdbc_write:v1'
'ReadFromMySql': 'beam:schematransform:org.apache.beam:jdbc_read:v1'
'WriteToMySql': 'beam:schematransform:org.apache.beam:jdbc_write:v1'
'ReadFromPostgres': 'beam:schematransform:org.apache.beam:jdbc_read:v1'
'WriteToPostgres': 'beam:schematransform:org.apache.beam:jdbc_write:v1'
'ReadFromOracle': 'beam:schematransform:org.apache.beam:jdbc_read:v1'
'WriteToOracle': 'beam:schematransform:org.apache.beam:jdbc_write:v1'
'ReadFromSqlServer': 'beam:schematransform:org.apache.beam:jdbc_read:v1'
'WriteToSqlServer': 'beam:schematransform:org.apache.beam:jdbc_write:v1'
config:
gradle_target: 'sdks:java:extensions:schemaio-expansion-service:shadowJar'
24 changes: 24 additions & 0 deletions sdks/python/apache_beam/yaml/yaml_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,29 @@
def read_from_text(path: str):
# TODO(yaml): Consider passing the filename and offset, possibly even
# by default.

"""Reads lines from a text files.
The resulting PCollection consists of rows with a single string filed named
"line."
Args:
path (str): The file path to read from. The path can contain glob
characters such as ``*`` and ``?``.
"""
return beam_io.ReadFromText(path) | beam.Map(lambda s: beam.Row(line=s))


@beam.ptransform_fn
def write_to_text(pcoll, path: str):
"""Writes a PCollection to a (set of) text files(s).
The input must be a PCollection whose schema has exactly one field.
Args:
path (str): The file path to write to. The files written will
begin with this prefix, followed by a shard identifier.
"""
try:
field_names = [
name for name,
Expand All @@ -76,6 +94,11 @@ def write_to_text(pcoll, path: str):

def read_from_bigquery(
query=None, table=None, row_restriction=None, fields=None):
"""Reads data from BigQuery.
Exactly one of table or query must be set.
If query is set, neither row_restriction nor fields should be set.
"""
if query is None:
assert table is not None
else:
Expand All @@ -95,6 +118,7 @@ def write_to_bigquery(
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=BigQueryDisposition.WRITE_APPEND,
error_handling=None):
"""Writes data to a BigQuery table."""
class WriteToBigQueryHandlingErrors(beam.PTransform):
def default_label(self):
return 'WriteToBigQuery'
Expand Down
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/yaml/yaml_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,8 @@ def empty_if_none(s):

docs = self.get_docs(typ)
return (
empty_if_none(docs.short_description) + '\n\n' +
empty_if_none(docs.short_description) +
('\n\n' if docs.blank_after_short_description else '\n') +
empty_if_none(docs.long_description)).strip() or None

def get_docs(self, typ):
Expand Down

0 comments on commit 192f765

Please sign in to comment.