Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Beam 12994 - Python SDK BigQuery - Promote schemaUpdateOptions to named arguments #21867

Closed
wants to merge 7 commits into from
80 changes: 77 additions & 3 deletions sdks/python/apache_beam/io/gcp/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,8 @@ class BigQueryDisposition(object):
WRITE_TRUNCATE = 'WRITE_TRUNCATE'
WRITE_APPEND = 'WRITE_APPEND'
WRITE_EMPTY = 'WRITE_EMPTY'
ALLOW_FIELD_ADDITION = 'ALLOW_FIELD_ADDITION'
ALLOW_FIELD_RELAXATION = 'ALLOW_FIELD_RELAXATION'

@staticmethod
def validate_create(disposition):
Expand All @@ -494,6 +496,28 @@ def validate_write(disposition):
'Invalid write disposition %s. Expecting %s' % (disposition, values))
return disposition

@staticmethod
def validate_update(disposition):
  """Validate a single schema update option.

  Args:
    disposition: A schema update option string to check.

  Returns:
    The same string, when it is a supported schema update option.

  Raises:
    ValueError: If ``disposition`` is not one of the supported options.
  """
  allowed = (
      BigQueryDisposition.ALLOW_FIELD_ADDITION,
      BigQueryDisposition.ALLOW_FIELD_RELAXATION)
  if disposition in allowed:
    return disposition
  raise ValueError(
      'Invalid schema update option %s. Expecting %s' % (disposition, allowed))

@staticmethod
def parse_update(disposition):
  """Normalize schema update options into a validated list.

  Accepts ``None``, a single option string, or any iterable of option
  strings. The original implementation recognized only ``tuple`` and
  ``list`` and silently returned ``[]`` for any other container (e.g. a
  ``set`` or generator), discarding the caller's options without error;
  this version validates the contents of any iterable.

  Args:
    disposition: ``None``, a single schema update option string, or an
      iterable of option strings.

  Returns:
    A list of validated schema update option strings. Empty list when
    ``disposition`` is ``None`` or empty.

  Raises:
    ValueError: If any supplied option is not a supported schema update
      option (via ``validate_update``).
    TypeError: If ``disposition`` is truthy but neither a string nor an
      iterable.
  """
  if not disposition:
    return []
  if isinstance(disposition, str):
    return [BigQueryDisposition.validate_update(disposition)]
  return [BigQueryDisposition.validate_update(opt) for opt in disposition]



class BigQueryQueryPriority(object):
"""Class holding standard strings used for query priority."""
Expand Down Expand Up @@ -1345,6 +1369,7 @@ def __init__(
schema=None,
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=BigQueryDisposition.WRITE_EMPTY,
schema_update_options=None,
validate=False,
coder=None,
kms_key=None):
Expand Down Expand Up @@ -1387,6 +1412,18 @@ def __init__(
* :attr:`BigQueryDisposition.WRITE_EMPTY`: fail the write if table not
empty.

schema_update_options (Iterable[BigQueryDisposition]): An iterable of
strings describing which schema mutations are allowed as side effects.
Schema update options are supported when `write_disposition` is set
to `WRITE_APPEND`, or when `write_disposition` is set to
`WRITE_TRUNCATE` for a destination that is a partition of an
existing table. Possible values are:

* :attr:`BigQueryDisposition.ALLOW_FIELD_ADDITION`: allow adding
a nullable field to the schema.
* :attr:`BigQueryDisposition.ALLOW_FIELD_RELAXATION`: allow relaxing
a required field in the original schema to nullable.

validate (bool): If :data:`True`, various checks will be done when sink
gets initialized (e.g., is table present given the disposition
arguments?). This should be :data:`True` for most scenarios in order to
Expand Down Expand Up @@ -1446,6 +1483,9 @@ def __init__(
create_disposition)
self.write_disposition = BigQueryDisposition.validate_write(
write_disposition)
self.schema_update_options = BigQueryDisposition.parse_update(
schema_update_options)

self.validate = validate
self.coder = coder or bigquery_tools.RowAsDictJsonCoder()
self.kms_key = kms_key
Expand Down Expand Up @@ -1513,6 +1553,7 @@ def __init__(
schema=None,
create_disposition=None,
write_disposition=None,
schema_update_options=None,
kms_key=None,
test_client=None,
max_buffered_rows=None,
Expand All @@ -1539,10 +1580,18 @@ def __init__(
- BigQueryDisposition.CREATE_NEVER: fail the write if does not exist.
write_disposition: A string describing what happens if the table has
already some data. Possible values are:
- BigQueryDisposition.WRITE_TRUNCATE: delete existing rows.
- BigQueryDisposition.WRITE_APPEND: add to existing rows.
- BigQueryDisposition.WRITE_EMPTY: fail the write if table not empty.
- BigQueryDisposition.WRITE_TRUNCATE: delete existing rows.
- BigQueryDisposition.WRITE_APPEND: add to existing rows.
- BigQueryDisposition.WRITE_EMPTY: fail the write if table not empty.
For streaming pipelines WriteTruncate can not be used.
schema_update_options: An iterable of strings describing which schema
mutations are allowed as side effects. Possible values are:
- BigQueryDisposition.ALLOW_FIELD_ADDITION: allow adding a nullable
field to the schema.
- BigQueryDisposition.ALLOW_FIELD_RELAXATION: allow relaxing a required
field in the original schema to nullable.
Schema update options are supported when `write_disposition` is set
to `WRITE_APPEND`.
kms_key: Optional Cloud KMS key name for use when creating new tables.
test_client: Override the default bigquery client used for testing.

Expand Down Expand Up @@ -1583,6 +1632,7 @@ def __init__(
raise ValueError(
'Write disposition %s is not supported for'
' streaming inserts to BigQuery' % write_disposition)
self.schema_update_options = schema_update_options
self._rows_buffer = []
self._reset_rows_buffer()

Expand Down Expand Up @@ -1618,6 +1668,7 @@ def display_data(self):
'retry_strategy': self._retry_strategy,
'create_disposition': str(self.create_disposition),
'write_disposition': str(self.write_disposition),
'schema_update_options': str(self.schema_update_options),
'additional_bq_parameters': str(self.additional_bq_parameters),
'ignore_insert_ids': str(self.ignore_insert_ids),
'ignore_unknown_columns': str(self.ignore_unknown_columns)
Expand Down Expand Up @@ -1845,6 +1896,7 @@ def __init__(
triggering_frequency,
create_disposition,
write_disposition,
schema_update_options,
kms_key,
retry_strategy,
additional_bq_parameters,
Expand All @@ -1861,6 +1913,7 @@ def __init__(
self.triggering_frequency = triggering_frequency
self.create_disposition = create_disposition
self.write_disposition = write_disposition
self.schema_update_options = schema_update_options
self.kms_key = kms_key
self.retry_strategy = retry_strategy
self.test_client = test_client
Expand Down Expand Up @@ -1888,6 +1941,7 @@ def expand(self, input):
batch_size=self.batch_size,
create_disposition=self.create_disposition,
write_disposition=self.write_disposition,
schema_update_options=self.schema_update_options,
kms_key=self.kms_key,
retry_strategy=self.retry_strategy,
test_client=self.test_client,
Expand Down Expand Up @@ -1973,6 +2027,7 @@ def __init__(
schema=None,
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=BigQueryDisposition.WRITE_APPEND,
schema_update_options=None,
kms_key=None,
batch_size=None,
max_file_size=None,
Expand Down Expand Up @@ -2042,6 +2097,19 @@ def __init__(
empty.

For streaming pipelines WriteTruncate can not be used.

schema_update_options (Iterable[BigQueryDisposition]): An iterable of
strings describing which schema mutations are allowed as side effects.
Schema update options are supported when `write_disposition` is set
to `WRITE_APPEND`, or when `write_disposition` is set to
`WRITE_TRUNCATE` for a destination that is a partition of an
existing table. Possible values are:

* :attr:`BigQueryDisposition.ALLOW_FIELD_ADDITION`: allow adding
a nullable field to the schema.
* :attr:`BigQueryDisposition.ALLOW_FIELD_RELAXATION`: allow relaxing
a required field in the original schema to nullable.

kms_key (str): Optional Cloud KMS key name for use when creating new
tables.
batch_size (int): Number of rows to be written to BQ per streaming API
Expand Down Expand Up @@ -2146,6 +2214,9 @@ def __init__(
create_disposition)
self.write_disposition = BigQueryDisposition.validate_write(
write_disposition)
self.schema_update_options = BigQueryDisposition.parse_update(
schema_update_options)

if schema == SCHEMA_AUTODETECT:
self.schema = schema
else:
Expand Down Expand Up @@ -2227,6 +2298,7 @@ def expand(self, pcoll):
triggering_frequency=self.triggering_frequency,
create_disposition=self.create_disposition,
write_disposition=self.write_disposition,
schema_update_options=self.schema_update_options,
kms_key=self.kms_key,
retry_strategy=self.insert_retry_strategy,
additional_bq_parameters=self.additional_bq_parameters,
Expand Down Expand Up @@ -2281,6 +2353,7 @@ def find_in_nested_dict(schema):
schema=self.schema,
create_disposition=self.create_disposition,
write_disposition=self.write_disposition,
schema_update_options=self.schema_update_options,
triggering_frequency=triggering_frequency,
with_auto_sharding=self.with_auto_sharding,
temp_file_format=self._temp_file_format,
Expand Down Expand Up @@ -2330,6 +2403,7 @@ def serialize(side_inputs):
'schema': self.schema,
'create_disposition': self.create_disposition,
'write_disposition': self.write_disposition,
'schema_update_options': self.schema_update_options,
'kms_key': self.kms_key,
'batch_size': self.batch_size,
'max_file_size': self.max_file_size,
Expand Down
9 changes: 9 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,7 @@ def __init__(
schema=None,
create_disposition=None,
write_disposition=None,
schema_update_options=None,
test_client=None,
temporary_tables=False,
additional_bq_parameters=None,
Expand All @@ -583,14 +584,17 @@ def __init__(
# and write dispositions, which mean that a new table will be created.
self.create_disposition = None
self.write_disposition = None
self.schema_update_options = None
else:
self.create_disposition = create_disposition
self.write_disposition = write_disposition
self.schema_update_options = schema_update_options

def display_data(self):
result = {
'create_disposition': str(self.create_disposition),
'write_disposition': str(self.write_disposition),
'schema_update_options': str(self.schema_update_options),
'additional_bq_params': str(self.additional_bq_parameters),
'schema': str(self.schema),
'launchesBigQueryJobs': DisplayDataItem(
Expand Down Expand Up @@ -672,6 +676,7 @@ def process(self, element, load_job_name_prefix, *schema_side_inputs):
schema=schema,
write_disposition=self.write_disposition,
create_disposition=create_disposition,
schema_update_options=self.schema_update_options,
additional_load_parameters=additional_parameters,
source_format=self.source_format,
job_labels=self.bq_io_metadata.add_additional_bq_job_labels(),
Expand Down Expand Up @@ -789,6 +794,7 @@ def __init__(
custom_gcs_temp_location=None,
create_disposition=None,
write_disposition=None,
schema_update_options=None,
triggering_frequency=None,
with_auto_sharding=False,
temp_file_format=None,
Expand All @@ -806,6 +812,7 @@ def __init__(
self.destination = destination
self.create_disposition = create_disposition
self.write_disposition = write_disposition
self.schema_update_options = schema_update_options
self.triggering_frequency = triggering_frequency
self.with_auto_sharding = with_auto_sharding
self.max_file_size = max_file_size or _DEFAULT_MAX_FILE_SIZE
Expand Down Expand Up @@ -1015,6 +1022,7 @@ def _load_data(
schema=self.schema,
write_disposition=self.write_disposition,
create_disposition=self.create_disposition,
schema_update_options=self.schema_update_options,
test_client=self.test_client,
temporary_tables=True,
additional_bq_parameters=self.additional_bq_parameters,
Expand Down Expand Up @@ -1095,6 +1103,7 @@ def _load_data(
schema=self.schema,
write_disposition=self.write_disposition,
create_disposition=self.create_disposition,
schema_update_options=self.schema_update_options,
test_client=self.test_client,
temporary_tables=False,
additional_bq_parameters=self.additional_bq_parameters,
Expand Down
59 changes: 59 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigquery_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from apache_beam.io.filebasedsink_test import _TestCaseWithTempDirCleanUp
from apache_beam.io.gcp import bigquery as beam_bq
from apache_beam.io.gcp import bigquery_tools
from apache_beam.io.gcp.bigquery import BigQueryDisposition
from apache_beam.io.gcp.bigquery import ReadFromBigQuery
from apache_beam.io.gcp.bigquery import TableRowJsonCoder
from apache_beam.io.gcp.bigquery import WriteToBigQuery
Expand Down Expand Up @@ -660,6 +661,40 @@ def test_nested_schema_as_json(self):
},
json.loads(sink.schema_as_json()))

def test_disposition_exceptions(self):
  """Test bad parameter value errors for disposition args."""
  # Each unsupported disposition value must be rejected at construction.
  bad_kwargs = (
      {'create_disposition': 'BAD'},
      {'write_disposition': 'BAD'},
      {'schema_update_options': 'BAD'},
  )
  for kwargs in bad_kwargs:
    with self.assertRaises(ValueError):
      beam.io.BigQuerySink('dataset.table', **kwargs)

def test_disposition_update_opts(self):
  """Test schema update options from args."""
  # Omitting the argument yields an empty option list.
  sink_default = beam.io.BigQuerySink('dataset.table')
  self.assertEqual(sink_default.schema_update_options, [],
                   'Schema Update Options passed as None')

  # A single string is wrapped in a one-element list.
  addition = BigQueryDisposition.ALLOW_FIELD_ADDITION
  sink_single = beam.io.BigQuerySink(
      'dataset.table', schema_update_options=addition)
  self.assertEqual(sink_single.schema_update_options, [addition],
                   'Schema Update Options passed as single string')

  # An iterable of options is validated element-wise.
  relaxation = BigQueryDisposition.ALLOW_FIELD_RELAXATION
  sink_multi = beam.io.BigQuerySink(
      'dataset.table', schema_update_options=(addition, relaxation))
  self.assertEqual(set(sink_multi.schema_update_options),
                   {addition, relaxation},
                   'Schema Update Options passed as iterable')


@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class TestWriteToBigQuery(unittest.TestCase):
Expand Down Expand Up @@ -1035,6 +1070,26 @@ def test_copy_load_job_exception(self, exception_type, error_message):
self.assertEqual(4, mock_insert_copy_job.call_count)
self.assertIn(error_message, exc.exception.args[0])

def test_disposition_update_opts(self):
  """Test schema update options from args."""
  # Omitting the argument yields an empty option list.
  write_default = beam.io.WriteToBigQuery('project:dataset.table')
  self.assertEqual(write_default.schema_update_options, [],
                   'Schema Update Options passed as None')

  # A single string is wrapped in a one-element list.
  addition = BigQueryDisposition.ALLOW_FIELD_ADDITION
  write_single = beam.io.WriteToBigQuery(
      'project:dataset.table', schema_update_options=addition)
  self.assertEqual(write_single.schema_update_options, [addition],
                   'Schema Update Options passed as single string')

  # An iterable of options is validated element-wise.
  relaxation = BigQueryDisposition.ALLOW_FIELD_RELAXATION
  write_multi = beam.io.WriteToBigQuery(
      'project:dataset.table', schema_update_options=(addition, relaxation))
  self.assertEqual(set(write_multi.schema_update_options),
                   {addition, relaxation},
                   'Schema Update Options passed as iterable')


@unittest.skipIf(
HttpError is None or exceptions is None,
Expand Down Expand Up @@ -1486,6 +1541,7 @@ def store_callback(table, **kwargs):
triggering_frequency=None,
create_disposition='CREATE_NEVER',
write_disposition=None,
schema_update_options=[],
kms_key=None,
retry_strategy=None,
additional_bq_parameters=[],
Expand Down Expand Up @@ -1582,6 +1638,7 @@ def store_callback(table, **kwargs):
triggering_frequency=None,
create_disposition='CREATE_NEVER',
write_disposition=None,
schema_update_options=[],
kms_key=None,
retry_strategy=retry_strategy,
additional_bq_parameters=[],
Expand Down Expand Up @@ -1657,6 +1714,7 @@ def store_callback(table, **kwargs):
triggering_frequency=None,
create_disposition='CREATE_NEVER',
write_disposition=None,
schema_update_options=None,
kms_key=None,
retry_strategy=retry_strategy,
additional_bq_parameters=[],
Expand Down Expand Up @@ -1724,6 +1782,7 @@ def store_callback(table, **kwargs):
triggering_frequency=None,
create_disposition='CREATE_NEVER',
write_disposition=None,
schema_update_options=None,
kms_key=None,
retry_strategy=None,
additional_bq_parameters=[],
Expand Down
Loading