Merge pull request #28850 [BEAM-28684] [YAML] Add Parquet reading and writing.
robertwb authored Oct 6, 2023
2 parents cb80266 + d7db27e commit a6947eb
Showing 5 changed files with 105 additions and 28 deletions.
99 changes: 75 additions & 24 deletions sdks/python/apache_beam/io/parquetio.py
@@ -31,27 +31,34 @@
# pytype: skip-file

from functools import partial
from typing import Iterator

from packaging import version

from apache_beam.io import filebasedsink
from apache_beam.io import filebasedsource
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.filesystems import FileSystems
from apache_beam.io.iobase import RangeTracker
from apache_beam.io.iobase import Read
from apache_beam.io.iobase import Write
from apache_beam.portability.api import schema_pb2
from apache_beam.transforms import DoFn
from apache_beam.transforms import ParDo
from apache_beam.transforms import PTransform
from apache_beam.transforms import window
from apache_beam.typehints import schemas

try:
import pyarrow as pa
import pyarrow.parquet as pq
# pylint: disable=ungrouped-imports
from apache_beam.typehints import arrow_type_compatibility
except ImportError:
pa = None
pq = None
ARROW_MAJOR_VERSION = None
arrow_type_compatibility = None
else:
base_pa_version = version.parse(pa.__version__).base_version
ARROW_MAJOR_VERSION, _, _ = map(int, base_pa_version.split('.'))
@@ -146,6 +153,24 @@ def _flush_buffer(self):
self._record_batches_byte_size = self._record_batches_byte_size + size


class _ArrowTableToBeamRows(DoFn):
def __init__(self, beam_type):
self._beam_type = beam_type

@DoFn.yields_batches
def process(self, element) -> Iterator[pa.Table]:
yield element

def infer_output_type(self, input_type):
return self._beam_type


class _BeamRowsToArrowTable(DoFn):
@DoFn.yields_elements
def process_batch(self, element: pa.Table) -> Iterator[pa.Table]:
yield element


class ReadFromParquetBatched(PTransform):
"""A :class:`~apache_beam.transforms.ptransform.PTransform` for reading
Parquet files as a `PCollection` of `pyarrow.Table`. This `PTransform` is
@@ -191,7 +216,7 @@ def __init__(
"""

super().__init__()
self._source = _create_parquet_source(
self._source = _ParquetSource(
file_pattern,
min_bundle_size,
validate=validate,
@@ -210,7 +235,12 @@ class ReadFromParquet(PTransform):
Parquet files as a `PCollection` of dictionaries. This `PTransform` is
currently experimental. No backward-compatibility guarantees."""
def __init__(
self, file_pattern=None, min_bundle_size=0, validate=True, columns=None):
self,
file_pattern=None,
min_bundle_size=0,
validate=True,
columns=None,
as_rows=False):
"""Initializes :class:`ReadFromParquet`.
Uses source ``_ParquetSource`` to read a set of Parquet files defined by
@@ -255,17 +285,38 @@ def __init__(
columns (List[str]): list of columns that will be read from files.
A column name may be a prefix of a nested field, e.g. 'a' will select
'a.b', 'a.c', and 'a.d.e'
as_rows (bool): whether to output a schema'd PCollection of Beam rows
rather than Python dictionaries.
"""
super().__init__()
self._source = _create_parquet_source(
self._source = _ParquetSource(
file_pattern,
min_bundle_size,
validate=validate,
columns=columns,
)
if as_rows:
if columns is None:
filter_schema = lambda schema: schema
else:
top_level_columns = set(c.split('.')[0] for c in columns)
filter_schema = lambda schema: schema_pb2.Schema(
fields=[f for f in schema.fields if f.name in top_level_columns])
path = FileSystems.match([file_pattern], [1])[0].metadata_list[0].path
with FileSystems.open(path) as fin:
self._schema = filter_schema(
arrow_type_compatibility.beam_schema_from_arrow_schema(
pq.read_schema(fin)))
else:
self._schema = None

def expand(self, pvalue):
return pvalue | Read(self._source) | ParDo(_ArrowTableToRowDictionaries())
arrow_batches = pvalue | Read(self._source)
if self._schema is None:
return arrow_batches | ParDo(_ArrowTableToRowDictionaries())
else:
return arrow_batches | ParDo(
_ArrowTableToBeamRows(schemas.named_tuple_from_schema(self._schema)))

def display_data(self):
return {'source_dd': self._source}
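
A hedged usage sketch of the new `as_rows` path (the file pattern and column names below are invented): with `as_rows=True`, the transform matches the first file at construction time, converts its Parquet schema to a Beam schema restricted to the top-level names of any requested `columns`, and emits schema'd rows instead of dictionaries.

import apache_beam as beam
from apache_beam.io import ReadFromParquet

with beam.Pipeline() as p:
  _ = (
      p
      | ReadFromParquet(
          '/tmp/events-*.parquet',  # hypothetical file pattern
          columns=['user', 'ts'],   # only these top-level fields are read
          as_rows=True)             # schema'd Beam rows rather than dicts
      | beam.Map(lambda row: row.user)  # attribute access on schema'd rows
      | beam.Map(print))
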
@@ -305,9 +356,7 @@ def __init__(
"""
super().__init__()
source_from_file = partial(
_create_parquet_source,
min_bundle_size=min_bundle_size,
columns=columns)
_ParquetSource, min_bundle_size=min_bundle_size, columns=columns)
self._read_all_files = filebasedsource.ReadAllFiles(
True,
CompressionTypes.UNCOMPRESSED,
@@ -333,17 +382,6 @@ def expand(self, pvalue):
_ArrowTableToRowDictionaries(), with_filename=self._with_filename)


def _create_parquet_source(
file_pattern=None, min_bundle_size=0, validate=False, columns=None):
return \
_ParquetSource(
file_pattern=file_pattern,
min_bundle_size=min_bundle_size,
validate=validate,
columns=columns,
)


class _ParquetUtils(object):
@staticmethod
def find_first_row_group_index(pf, start_offset):
@@ -370,7 +408,8 @@ def get_number_of_row_groups(pf):
class _ParquetSource(filebasedsource.FileBasedSource):
"""A source for reading Parquet files.
"""
def __init__(self, file_pattern, min_bundle_size, validate, columns):
def __init__(
self, file_pattern, min_bundle_size=0, validate=False, columns=None):
super().__init__(
file_pattern=file_pattern,
min_bundle_size=min_bundle_size,
@@ -421,6 +460,9 @@ def split_points_unclaimed(stop_position):
yield table


_create_parquet_source = _ParquetSource


class WriteToParquet(PTransform):
"""A ``PTransform`` for writing parquet files.
@@ -430,7 +472,7 @@ class WriteToParquet(PTransform):
def __init__(
self,
file_path_prefix,
schema,
schema=None,
row_group_buffer_size=64 * 1024 * 1024,
record_batch_size=1000,
codec='none',
@@ -534,10 +576,19 @@ def __init__(
)

def expand(self, pcoll):
return pcoll | ParDo(
_RowDictionariesToArrowTable(
self._schema, self._row_group_buffer_size,
self._record_batch_size)) | Write(self._sink)
if self._schema is None:
try:
beam_schema = schemas.schema_from_element_type(pcoll.element_type)
except TypeError as exn:
raise ValueError(
"A schema is required to write non-schema'd data.") from exn
self._sink._schema = (
arrow_type_compatibility.arrow_schema_from_beam_schema(beam_schema))
convert_fn = _BeamRowsToArrowTable()
else:
convert_fn = _RowDictionariesToArrowTable(
self._schema, self._row_group_buffer_size, self._record_batch_size)
return pcoll | ParDo(convert_fn) | Write(self._sink)

def display_data(self):
return {
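
When `schema` is omitted, `expand` now derives the Arrow schema from the element type of a schema'd `PCollection` and hands whole Arrow tables to the sink; unschema'd input without a `schema` raises a `ValueError`. A hedged round-trip sketch, mirroring the new test_schema_read_write test added below (the path prefix is invented):

import apache_beam as beam
from apache_beam.io import ReadFromParquet, WriteToParquet

with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create([beam.Row(a=1, b='x'), beam.Row(a=2, b='y')])
      | WriteToParquet('/tmp/rows'))  # Arrow schema inferred from the Row type

with beam.Pipeline() as p:
  _ = (
      p
      | ReadFromParquet('/tmp/rows*', as_rows=True)  # read back as Beam rows
      | beam.Map(print))
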
16 changes: 16 additions & 0 deletions sdks/python/apache_beam/io/parquetio_test.py
@@ -30,6 +30,7 @@
from parameterized import param
from parameterized import parameterized

import apache_beam as beam
from apache_beam import Create
from apache_beam import Map
from apache_beam.io import filebasedsource
@@ -400,6 +401,21 @@ def test_sink_transform_compliant_nested_type(self):
assert_that(
readback, equal_to([json.dumps(r) for r in self.RECORDS_NESTED]))

def test_schema_read_write(self):
with TemporaryDirectory() as tmp_dirname:
path = os.path.join(tmp_dirname, 'tmp_filename')
rows = [beam.Row(a=1, b='x'), beam.Row(a=2, b='y')]
stable_repr = lambda row: json.dumps(row._asdict())
with TestPipeline() as p:
_ = p | Create(rows) | WriteToParquet(path) | beam.Map(print)
with TestPipeline() as p:
# json used for stable sortability
readback = (
p
| ReadFromParquet(path + '*', as_rows=True)
| Map(stable_repr))
assert_that(readback, equal_to([stable_repr(r) for r in rows]))

def test_batched_read(self):
with TemporaryDirectory() as tmp_dirname:
path = os.path.join(tmp_dirname + "tmp_filename")
6 changes: 3 additions & 3 deletions sdks/python/apache_beam/typehints/arrow_type_compatibility.py
@@ -311,9 +311,9 @@ def from_typehints(element_type,
element_type = RowTypeConstraint.from_user_type(element_type)
if element_type is None:
raise TypeError(
"Element type must be compatible with Beam Schemas ("
"https://beam.apache.org/documentation/programming-guide/#schemas) "
"for batch type pa.Table.")
f"Element type {element_type} must be compatible with Beam Schemas "
"(https://beam.apache.org/documentation/programming-guide/#schemas)"
" for batch type pa.Table.")

return PyarrowBatchConverter(element_type)

2 changes: 1 addition & 1 deletion sdks/python/apache_beam/typehints/arrow_type_compatibility_test.py
@@ -206,7 +206,7 @@ class ArrowBatchConverterErrorsTest(unittest.TestCase):
(
pa.Table,
Any,
r'Element type must be compatible with Beam Schemas',
r'Element type .* must be compatible with Beam Schemas',
),
])
def test_construction_errors(
10 changes: 10 additions & 0 deletions sdks/python/apache_beam/yaml/standard_io.yaml
@@ -67,6 +67,8 @@
'WriteToCsv': 'WriteToCsv'
'ReadFromJson': 'ReadFromJson'
'WriteToJson': 'WriteToJson'
'ReadFromParquet': 'ReadFromParquet'
'WriteToParquet': 'WriteToParquet'
'ReadFromAvro': 'ReadFromAvro'
'WriteToAvro': 'WriteToAvro'
config:
@@ -79,11 +81,17 @@
path: 'path'
'WriteToJson':
path: 'path'
'ReadFromParquet':
path: 'file_pattern'
'WriteToParquet':
path: 'file_path_prefix'
'ReadFromAvro':
path: 'file_pattern'
'WriteToAvro':
path: 'file_path_prefix'
defaults:
'ReadFromParquet':
as_rows: True
'ReadFromAvro':
as_rows: True
underlying_provider:
@@ -93,5 +101,7 @@
'WriteToCsv': 'apache_beam.io.WriteToCsv'
'ReadFromJson': 'apache_beam.io.ReadFromJson'
'WriteToJson': 'apache_beam.io.WriteToJson'
'ReadFromParquet': 'apache_beam.io.ReadFromParquet'
'WriteToParquet': 'apache_beam.io.WriteToParquet'
'ReadFromAvro': 'apache_beam.io.ReadFromAvro'
'WriteToAvro': 'apache_beam.io.WriteToAvro'

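
A hedged sketch (with invented paths) of the Python calls this YAML wiring resolves to: the YAML `path` key maps to `file_pattern` for reads and `file_path_prefix` for writes, and YAML reads default to `as_rows: True` so downstream transforms receive schema'd rows.

import apache_beam as beam
from apache_beam.io import ReadFromParquet, WriteToParquet

with beam.Pipeline() as p:
  _ = (
      p
      | ReadFromParquet(
          file_pattern='gs://example-bucket/in-*.parquet', as_rows=True)
      | WriteToParquet(file_path_prefix='gs://example-bucket/out/part'))
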