[yaml] add FileIO docs #33185

Open · wants to merge 5 commits into base: master
34 changes: 23 additions & 11 deletions sdks/python/apache_beam/yaml/generate_yaml_docs.py
@@ -86,10 +86,16 @@ def _fake_value(name, beam_type):
  raise ValueError(f"Unrecognized type_info: {type_info!r}")


EXCLUDE_ARGS = ['args', 'kwargs']
Contributor:

We can't always filter these out, e.g. https://beam.apache.org/releases/yamldoc/current/#pytransform

Contributor Author:

Ah.. right.. let me rethink
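For context on the link above: PyTransform's schema legitimately exposes args and kwargs as documented config fields, so filtering them out unconditionally would drop real parameters from the generated docs. A minimal sketch of such a usage (the constructor path and values are illustrative, not from this PR):

- type: PyTransform
  config:
    constructor: mypackage.transforms.MyTransform  # illustrative fully qualified name
    args: [1, 'foo']    # positional arguments passed to the constructor
    kwargs:
      window_size: 60   # keyword arguments passed to the constructor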



def _fake_row(schema):
  if schema is None:
    return '...'
  return {f.name: _fake_value(f.name, f.type) for f in schema.fields}
  return {
      f.name: _fake_value(f.name, f.type)
      for f in schema.fields if f.name not in EXCLUDE_ARGS
  }


def pretty_example(provider, t, base_t=None):
@@ -161,13 +167,14 @@ def normalize_error_handling(f):

  def lines():
    for f in schema.fields:
      f = normalize_error_handling(f)
      yield ''.join([
          f'**{f.name}** `{pretty_type(f.type)}`',
          maybe_optional(f.type),
          indent(': ' + f.description if f.description else '', 2),
          maybe_row_parameters(f.type),
      ])
      if f.name not in EXCLUDE_ARGS:
        f = normalize_error_handling(f)
        yield ''.join([
            f'**{f.name}** `{pretty_type(f.type)}`',
            maybe_optional(f.type),
            indent(': ' + f.description if f.description else '', 2),
            maybe_row_parameters(f.type),
        ])

  return '\n\n'.join('*' + indent(line, 2) for line in lines()).strip()

@@ -194,13 +201,18 @@ def io_grouping_key(transform_name):
SKIP = {}


def normalize_beam_version(desc):
  return desc.replace('BEAM_VERSION', beam_version)


def transform_docs(transform_base, transforms, providers, extra_docs=''):
  return '\n'.join([
      f'## {transform_base}',
      '',
      longest(
          lambda t: longest(lambda p: p.description(t), providers[t]),
          transforms).replace('::\n', '\n\n :::yaml\n'),
      normalize_beam_version(
          longest(
              lambda t: longest(lambda p: p.description(t), providers[t]),
              transforms).replace('::\n', '\n\n :::yaml\n')),
      '',
      extra_docs,
      '',
72 changes: 19 additions & 53 deletions sdks/python/apache_beam/yaml/standard_io.yaml
@@ -22,6 +22,7 @@
# should be kept in sync.
# TODO(yaml): See if this can be enforced programmatically.

# BigQueryIO Java
- type: renaming
  transforms:
    'ReadFromBigQuery': 'ReadFromBigQuery'
@@ -48,6 +49,7 @@
  config:
    gradle_target: 'sdks:java:extensions:sql:expansion-service:shadowJar'

# KafkaIO Java
- type: renaming
  transforms:
    'ReadFromKafka': 'ReadFromKafka'
@@ -83,6 +85,7 @@
  config:
    gradle_target: 'sdks:java:io:expansion-service:shadowJar'

# PubSubLite Java
- type: renaming
  transforms:
    'ReadFromPubSubLite': 'ReadFromPubSubLite'
@@ -120,76 +123,38 @@
  config:
    gradle_target: 'sdks:java:io:google-cloud-platform:expansion-service:shadowJar'

# BigQueryIO Python
- type: python
  transforms:
    'ReadFromBigQuery': 'apache_beam.yaml.yaml_io.read_from_bigquery'
    # Disable until https://github.com/apache/beam/issues/28162 is resolved.
    # 'WriteToBigQuery': 'apache_beam.yaml.yaml_io.write_to_bigquery'

# FileIO Python
- type: python
  transforms:
    'ReadFromText': 'apache_beam.yaml.yaml_io.read_from_text'
    'WriteToText': 'apache_beam.yaml.yaml_io.write_to_text'
    'ReadFromPubSub': 'apache_beam.yaml.yaml_io.read_from_pubsub'
    'WriteToPubSub': 'apache_beam.yaml.yaml_io.write_to_pubsub'
    'ReadFromCsv': 'apache_beam.yaml.yaml_io.read_from_csv'
    'WriteToCsv': 'apache_beam.yaml.yaml_io.write_to_csv'
    'ReadFromJson': 'apache_beam.yaml.yaml_io.read_from_json'
    'WriteToJson': 'apache_beam.yaml.yaml_io.write_to_json'
    'ReadFromParquet': 'apache_beam.yaml.yaml_io.read_from_parquet'
    'WriteToParquet': 'apache_beam.yaml.yaml_io.write_to_parquet'
    'ReadFromAvro': 'apache_beam.yaml.yaml_io.read_from_avro'
    'WriteToAvro': 'apache_beam.yaml.yaml_io.write_to_avro'
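As a usage sketch (paths are hypothetical, not from this PR), transforms exposed through this python provider are referenced by their YAML names in a pipeline spec:

pipeline:
  transforms:
    - type: ReadFromText
      name: ReadLogs
      config:
        path: gs://my-bucket/input*.txt   # hypothetical input glob
    - type: WriteToText
      name: WriteLogs
      input: ReadLogs
      config:
        path: gs://my-bucket/output/out   # hypothetical output prefix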

# Declared as a renaming transform to avoid exposing all
# (implementation-specific) pandas arguments and to align with a possible
# Java implementation. Invoking these directly as a PyTransform is still an
# option for anyone who wants to use these power features in a
# language-dependent manner. (A usage sketch follows this block.)
- type: renaming
  transforms:
    'ReadFromCsv': 'ReadFromCsv'
    'WriteToCsv': 'WriteToCsv'
    'ReadFromJson': 'ReadFromJson'
    'WriteToJson': 'WriteToJson'
    'ReadFromParquet': 'ReadFromParquet'
    'WriteToParquet': 'WriteToParquet'
    'ReadFromAvro': 'ReadFromAvro'
    'WriteToAvro': 'WriteToAvro'
  config:
    mappings:
      'ReadFromCsv':
        path: 'path'
        delimiter: 'sep'
        comment: 'comment'
      'WriteToCsv':
        path: 'path'
        delimiter: 'sep'
      'ReadFromJson':
        path: 'path'
      'WriteToJson':
        path: 'path'
      'ReadFromParquet':
        path: 'file_pattern'
      'WriteToParquet':
        path: 'file_path_prefix'
      'ReadFromAvro':
        path: 'file_pattern'
      'WriteToAvro':
        path: 'file_path_prefix'
    defaults:
      'ReadFromParquet':
        as_rows: True
      'ReadFromAvro':
        as_rows: True
    underlying_provider:
      type: python
      transforms:
        'ReadFromCsv': 'apache_beam.io.ReadFromCsv'
        'WriteToCsv': 'apache_beam.io.WriteToCsv'
        'ReadFromJson': 'apache_beam.io.ReadFromJson'
        'WriteToJson': 'apache_beam.io.WriteToJson'
        'ReadFromParquet': 'apache_beam.io.ReadFromParquet'
        'WriteToParquet': 'apache_beam.io.WriteToParquet'
        'ReadFromAvro': 'apache_beam.io.ReadFromAvro'
        'WriteToAvro': 'apache_beam.io.WriteToAvro'
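To make the mapping above concrete, a sketch of a pipeline-level invocation (paths are illustrative): the user-facing parameter delimiter is renamed to the pandas-style argument sep before the underlying apache_beam.io.ReadFromCsv is invoked:

- type: ReadFromCsv
  config:
    path: gs://my-bucket/data*.csv   # forwarded unchanged as 'path'
    delimiter: ';'                   # renamed to pandas' 'sep'
    comment: '#'                     # forwarded as 'comment'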

# FileIO Java
- type: beamJar
  transforms:
    'WriteToCsv': 'beam:schematransform:org.apache.beam:csv_write:v1'
    'WriteToJson': 'beam:schematransform:org.apache.beam:json_write:v1'
  config:
    gradle_target: 'sdks:java:extensions:schemaio-expansion-service:shadowJar'

# JdbcIO Java
- type: renaming
  transforms:
    'ReadFromJdbc': 'ReadFromJdbc'
@@ -259,6 +224,7 @@
  config:
    gradle_target: 'sdks:java:extensions:schemaio-expansion-service:shadowJar'

# SpannerIO Java
- type: renaming
  transforms:
    'ReadFromSpanner': 'ReadFromSpanner'