Merge pull request #29080: Validate incoming JSON data from pubsub.
robertwb authored Oct 20, 2023
2 parents abce1ad + b97c7de commit 879935e
Showing 8 changed files with 105 additions and 12 deletions.
40 changes: 38 additions & 2 deletions sdks/python/apache_beam/yaml/json_utils.py
@@ -24,6 +24,9 @@
from typing import Any
from typing import Callable
from typing import Dict
from typing import Optional

import jsonschema

import apache_beam as beam
from apache_beam.portability.api import schema_pb2
@@ -131,15 +134,48 @@ def json_to_row(beam_type: schema_pb2.FieldType) -> Callable[[Any], Any]:
  raise ValueError(f"Unrecognized type_info: {type_info!r}")


def json_parser(beam_schema: schema_pb2.Schema) -> Callable[[bytes], beam.Row]:
def json_parser(
    beam_schema: schema_pb2.Schema,
    json_schema: Optional[Dict[str,
                               Any]] = None) -> Callable[[bytes], beam.Row]:
  """Returns a callable converting Json strings to Beam rows of the given type.
  The input to the returned callable is expected to conform to the Json schema
  corresponding to this Beam type.
  """
  if json_schema is None:
    validate_fn = None
  else:
    cls = jsonschema.validators.validator_for(json_schema)
    cls.check_schema(json_schema)
    validate_fn = _PicklableFromConstructor(
        lambda: jsonschema.validators.validator_for(json_schema)
        (json_schema).validate)

  to_row = json_to_row(
      schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=beam_schema)))
  return lambda s: to_row(json.loads(s))

  def parse(s: bytes):
    o = json.loads(s)
    if validate_fn is not None:
      validate_fn(o)
    return to_row(o)

  return parse


class _PicklableFromConstructor:
  def __init__(self, constructor):
    self._constructor = constructor
    self._value = None

  def __call__(self, o):
    if self._value is None:
      self._value = self._constructor()
    return self._value(o)

  def __getstate__(self):
    return {'_constructor': self._constructor, '_value': None}

def row_to_json(beam_type: schema_pb2.FieldType) -> Callable[[Any], Any]:
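For readers unfamiliar with the jsonschema API, here is a minimal standalone sketch of the validation pattern json_parser now relies on (the example schema and payloads are illustrative, not taken from this PR):

import json

import jsonschema

# Illustrative JSON schema, matching the shape used in the YAML tests below.
schema = {'type': 'object', 'properties': {'some_int': {'type': 'integer'}}}

# Same pattern as json_parser: resolve the validator class for the schema's
# draft, sanity-check the schema itself, then validate each parsed payload.
cls = jsonschema.validators.validator_for(schema)
cls.check_schema(schema)
validate = cls(schema).validate

validate(json.loads('{"some_int": 123}'))  # conforms; returns None
try:
  validate(json.loads('{"some_int": "NOT"}'))
except jsonschema.ValidationError as e:  # raised for non-conforming data
  print(e.message)

The _PicklableFromConstructor wrapper above defers building the validator until the first call and drops any cached instance in __getstate__, so the parse function returned by json_parser stays picklable when shipped to workers.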
10 changes: 8 additions & 2 deletions sdks/python/apache_beam/yaml/yaml_io.py
@@ -148,7 +148,7 @@ def _create_parser(
        lambda payload: beam.Row(payload=payload))
  elif format == 'json':
    beam_schema = json_utils.json_schema_to_beam_schema(schema)
    return beam_schema, json_utils.json_parser(beam_schema)
    return beam_schema, json_utils.json_parser(beam_schema, schema)
  elif format == 'avro':
    beam_schema = avroio.avro_schema_to_beam_schema(schema)
    covert_to_row = avroio.avro_dict_to_beam_row(schema, beam_schema)
@@ -215,6 +215,8 @@ def read_from_pubsub(
      - raw: Produces records with a single `payload` field whose contents
        are the raw bytes of the pubsub message.
      - avro: Parses records with a given avro schema.
      - json: Parses records with a given json schema.
    schema: Schema specification for the given format.
    attributes: List of attribute keys whose values will be flattened into the
Expand Down Expand Up @@ -309,8 +311,12 @@ def write_to_pubsub(
formats are
- raw: Expects a message with a single field (excluding
attribute-related fields )whose contents are used as the raw bytes
attribute-related fields) whose contents are used as the raw bytes
of the pubsub message.
- avro: Encodes records with a given avro schema, which may be inferred
from the input PCollection schema.
- json: Formats records with a given json schema, which may be inferred
from the input PCollection schema.
schema: Schema specification for the given format.
attributes: List of attribute keys whose values will be pulled out as
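To complement the read test below, here is a hedged sketch of the write direction with the json format via the YAML API; the topic and field names are placeholders, and the WriteToPubSub config keys are assumed to mirror the ReadFromPubSub config exercised in the tests:

import apache_beam as beam
from apache_beam.yaml.yaml_transform import YamlTransform

# Sketch only: 'my_topic' and 'some_int' are illustrative, and the schema
# block is assumed to be accepted by WriteToPubSub the same way the test
# below passes it to ReadFromPubSub.
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
    pickle_library='cloudpickle')) as p:
  _ = (
      p
      | beam.Create([beam.Row(some_int=123)])
      | YamlTransform(
          '''
          type: WriteToPubSub
          config:
            topic: my_topic
            format: json
            schema:
              type: object
              properties:
                some_int: {type: integer}
          '''))

As the updated write_to_pubsub docstring notes, the json schema may also be inferred from the input PCollection schema instead of being spelled out.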
32 changes: 32 additions & 0 deletions sdks/python/apache_beam/yaml/yaml_io_test.py
@@ -307,6 +307,38 @@ def test_read_json_without_error_handling(self):
                some_int: {type: integer}
            ''')

  def test_read_json_with_bad_schema(self):
    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
        pickle_library='cloudpickle')) as p:
      with mock.patch('apache_beam.io.ReadFromPubSub',
                      FakeReadFromPubSub(
                          topic='my_topic',
                          messages=[PubsubMessage('{"some_int": 123}',
                                                  attributes={}),
                                    PubsubMessage('{"some_int": "NOT"}',
                                                  attributes={})])):
        result = p | YamlTransform(
            '''
            type: ReadFromPubSub
            config:
              topic: my_topic
              format: json
              schema:
                type: object
                properties:
                  some_int: {type: integer}
              error_handling:
                output: errors
            ''')
        assert_that(
            result['good'],
            equal_to([beam.Row(some_int=123)]),
            label='CheckGood')
        assert_that(
            result['errors'] | beam.Map(lambda error: error.element),
            equal_to(['{"some_int": "NOT"}']),
            label='CheckErrors')

  def test_simple_write(self):
    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
        pickle_library='cloudpickle')) as p:
8 changes: 6 additions & 2 deletions sdks/python/container/py310/base_image_requirements.txt
@@ -66,22 +66,24 @@ google-cloud-resource-manager==1.10.4
google-cloud-spanner==3.40.1
google-cloud-storage==2.12.0
google-cloud-videointelligence==2.11.4
google-cloud-vision==3.4.4
google-cloud-vision==3.4.5
google-crc32c==1.5.0
google-resumable-media==2.6.0
googleapis-common-protos==1.61.0
greenlet==3.0.0
grpc-google-iam-v1==0.12.6
grpcio==1.59.0
grpcio-status==1.59.0
guppy3==3.1.3
guppy3==3.1.4
hdfs==2.7.3
httplib2==0.22.0
hypothesis==6.88.1
idna==3.4
iniconfig==2.0.0
joblib==1.3.2
Js2Py==0.74
jsonschema==4.19.1
jsonschema-specifications==2023.7.1
mmh3==4.0.1
mock==5.1.0
nltk==3.8.1
@@ -116,9 +118,11 @@ python-dateutil==2.8.2
python-snappy==0.6.1
pytz==2023.3.post1
PyYAML==6.0.1
referencing==0.30.2
regex==2023.10.3
requests==2.31.0
requests-mock==1.11.0
rpds-py==0.10.6
rsa==4.9
scikit-learn==1.3.1
scipy==1.11.3
8 changes: 6 additions & 2 deletions sdks/python/container/py311/base_image_requirements.txt
@@ -63,22 +63,24 @@ google-cloud-resource-manager==1.10.4
google-cloud-spanner==3.40.1
google-cloud-storage==2.12.0
google-cloud-videointelligence==2.11.4
google-cloud-vision==3.4.4
google-cloud-vision==3.4.5
google-crc32c==1.5.0
google-resumable-media==2.6.0
googleapis-common-protos==1.61.0
greenlet==3.0.0
grpc-google-iam-v1==0.12.6
grpcio==1.59.0
grpcio-status==1.59.0
guppy3==3.1.3
guppy3==3.1.4
hdfs==2.7.3
httplib2==0.22.0
hypothesis==6.88.1
idna==3.4
iniconfig==2.0.0
joblib==1.3.2
Js2Py==0.74
jsonschema==4.19.1
jsonschema-specifications==2023.7.1
mmh3==4.0.1
mock==5.1.0
nltk==3.8.1
@@ -112,9 +114,11 @@ pytest-xdist==3.3.1
python-dateutil==2.8.2
pytz==2023.3.post1
PyYAML==6.0.1
referencing==0.30.2
regex==2023.10.3
requests==2.31.0
requests-mock==1.11.0
rpds-py==0.10.6
rsa==4.9
scikit-learn==1.3.1
scipy==1.11.3
10 changes: 8 additions & 2 deletions sdks/python/container/py38/base_image_requirements.txt
@@ -67,23 +67,26 @@ google-cloud-resource-manager==1.10.4
google-cloud-spanner==3.40.1
google-cloud-storage==2.12.0
google-cloud-videointelligence==2.11.4
google-cloud-vision==3.4.4
google-cloud-vision==3.4.5
google-crc32c==1.5.0
google-resumable-media==2.6.0
googleapis-common-protos==1.61.0
greenlet==3.0.0
grpc-google-iam-v1==0.12.6
grpcio==1.59.0
grpcio-status==1.59.0
guppy3==3.1.3
guppy3==3.1.4
hdfs==2.7.3
httplib2==0.22.0
hypothesis==6.88.1
idna==3.4
importlib-metadata==6.8.0
importlib-resources==6.1.0
iniconfig==2.0.0
joblib==1.3.2
Js2Py==0.74
jsonschema==4.19.1
jsonschema-specifications==2023.7.1
mmh3==4.0.1
mock==5.1.0
nltk==3.8.1
@@ -96,6 +99,7 @@ overrides==6.5.0
packaging==23.2
pandas==1.5.3
parameterized==0.9.0
pkgutil_resolve_name==1.3.10
pluggy==1.3.0
proto-plus==1.22.3
protobuf==4.24.4
@@ -118,9 +122,11 @@ python-dateutil==2.8.2
python-snappy==0.6.1
pytz==2023.3.post1
PyYAML==6.0.1
referencing==0.30.2
regex==2023.10.3
requests==2.31.0
requests-mock==1.11.0
rpds-py==0.10.6
rsa==4.9
scikit-learn==1.3.1
scipy==1.10.1
8 changes: 6 additions & 2 deletions sdks/python/container/py39/base_image_requirements.txt
@@ -66,15 +66,15 @@ google-cloud-resource-manager==1.10.4
google-cloud-spanner==3.40.1
google-cloud-storage==2.12.0
google-cloud-videointelligence==2.11.4
google-cloud-vision==3.4.4
google-cloud-vision==3.4.5
google-crc32c==1.5.0
google-resumable-media==2.6.0
googleapis-common-protos==1.61.0
greenlet==3.0.0
grpc-google-iam-v1==0.12.6
grpcio==1.59.0
grpcio-status==1.59.0
guppy3==3.1.3
guppy3==3.1.4
hdfs==2.7.3
httplib2==0.22.0
hypothesis==6.88.1
@@ -83,6 +83,8 @@ importlib-metadata==6.8.0
iniconfig==2.0.0
joblib==1.3.2
Js2Py==0.74
jsonschema==4.19.1
jsonschema-specifications==2023.7.1
mmh3==4.0.1
mock==5.1.0
nltk==3.8.1
@@ -117,9 +119,11 @@ python-dateutil==2.8.2
python-snappy==0.6.1
pytz==2023.3.post1
PyYAML==6.0.1
referencing==0.30.2
regex==2023.10.3
requests==2.31.0
requests-mock==1.11.0
rpds-py==0.10.6
rsa==4.9
scikit-learn==1.3.1
scipy==1.11.3
1 change: 1 addition & 0 deletions sdks/python/setup.py
@@ -279,6 +279,7 @@ def get_portability_package_data():
        'hdfs>=2.1.0,<3.0.0',
        'httplib2>=0.8,<0.23.0',
        'js2py>=0.74,<1',
        'jsonschema>=4.0.0,<5.0.0',
        # numpy can have breaking changes in minor versions.
        # Use a strict upper bound.
        'numpy>=1.14.3,<1.25.0',  # Update pyproject.toml as well.