From 4083fa49cb6e9955b2f7eca70727c8571b5652ed Mon Sep 17 00:00:00 2001 From: Daniel Walt Date: Tue, 14 Jun 2022 18:18:51 +0000 Subject: [PATCH 1/5] [BEAM-12994] Add schema update options to BigQuerySink and WriteToBigQuery --- sdks/python/apache_beam/io/gcp/bigquery.py | 82 ++++++++++++++++++- .../apache_beam/io/gcp/bigquery_file_loads.py | 9 ++ .../apache_beam/io/gcp/bigquery_tools.py | 7 ++ 3 files changed, 95 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index d98090057346..0307f6068b05 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -473,6 +473,8 @@ class BigQueryDisposition(object): WRITE_TRUNCATE = 'WRITE_TRUNCATE' WRITE_APPEND = 'WRITE_APPEND' WRITE_EMPTY = 'WRITE_EMPTY' + ALLOW_FIELD_ADDITION = 'ALLOW_FIELD_ADDITION' + ALLOW_FIELD_RELAXATION = 'ALLOW_FIELD_RELAXATION' @staticmethod def validate_create(disposition): @@ -494,6 +496,30 @@ def validate_write(disposition): 'Invalid write disposition %s. Expecting %s' % (disposition, values)) return disposition + @staticmethod + def validate_update(disposition): + values = ( + BigQueryDisposition.ALLOW_FIELD_ADDITION, + BigQueryDisposition.ALLOW_FIELD_RELAXATION) + if disposition not in values: + raise ValueError( + 'Invalid schema update option %s. Expecting %s' % (disposition, values)) + return disposition + + @staticmethod + def parse_update(disposition): + result = [] + if disposition: + if isinstance(disposition, str): + result.append( + BigQueryDisposition.validate_update(disposition)) + elif isinstance(disposition, (tuple, list)): + for opt in disposition: + result.append( + BigQueryDisposition.validate_update(opt)) + return result + + class BigQueryQueryPriority(object): """Class holding standard strings used for query priority.""" @@ -1345,6 +1371,7 @@ def __init__( schema=None, create_disposition=BigQueryDisposition.CREATE_IF_NEEDED, write_disposition=BigQueryDisposition.WRITE_EMPTY, + schema_update_options=None, validate=False, coder=None, kms_key=None): @@ -1387,6 +1414,18 @@ def __init__( * :attr:`BigQueryDisposition.WRITE_EMPTY`: fail the write if table not empty. + schema_update_options (Iterable[BigQueryDisposition]): An iterable of + strings describing which schema mutations are allowed as side effects. + Schema update options are supported when `write_disposition` is set + to `WRITE_APPEND`, or when `write_disposition` is set to + `WRITE_TRUNCATE` for a destination that is a partition of an + existing table. Possible values are: + + * :attr:`BigQueryDisposition.ALLOW_FIELD_ADDITION`: allow adding + a nullable field to the schema. + * :attr:`BigQueryDisposition.ALLOW_FIELD_RELAXATION`: allow relaxing + a required field in the original schema to nullable. + validate (bool): If :data:`True`, various checks will be done when sink gets initialized (e.g., is table present given the disposition arguments?). This should be :data:`True` for most scenarios in order to @@ -1446,6 +1485,9 @@ def __init__( create_disposition) self.write_disposition = BigQueryDisposition.validate_write( write_disposition) + self.schema_update_options = BigQueryDisposition.parse_update( + schema_update_options) + self.validate = validate self.coder = coder or bigquery_tools.RowAsDictJsonCoder() self.kms_key = kms_key @@ -1513,6 +1555,7 @@ def __init__( schema=None, create_disposition=None, write_disposition=None, + schema_update_options=None, kms_key=None, test_client=None, max_buffered_rows=None, @@ -1539,10 +1582,18 @@ def __init__( - BigQueryDisposition.CREATE_NEVER: fail the write if does not exist. write_disposition: A string describing what happens if the table has already some data. Possible values are: - - BigQueryDisposition.WRITE_TRUNCATE: delete existing rows. - - BigQueryDisposition.WRITE_APPEND: add to existing rows. - - BigQueryDisposition.WRITE_EMPTY: fail the write if table not empty. + - BigQueryDisposition.WRITE_TRUNCATE: delete existing rows. + - BigQueryDisposition.WRITE_APPEND: add to existing rows. + - BigQueryDisposition.WRITE_EMPTY: fail the write if table not empty. For streaming pipelines WriteTruncate can not be used. + schema_update_options: An iterable of strings describing which schema + mutations are allowed as side effects. Possible values are: + - BigQueryDisposition.ALLOW_FIELD_ADDIION: allow adding a nullable + field to the schema. + - BigQueryDisposition.ALLOW_FIELD_ELAXATION: allow relaxing a required + field in the original schema to nullable. + Schema update options are supported when `write_disposition` is set + to `WRITE_APPEND`. kms_key: Optional Cloud KMS key name for use when creating new tables. test_client: Override the default bigquery client used for testing. @@ -1583,6 +1634,7 @@ def __init__( raise ValueError( 'Write disposition %s is not supported for' ' streaming inserts to BigQuery' % write_disposition) + self.schema_update_options = schema_update_options self._rows_buffer = [] self._reset_rows_buffer() @@ -1618,6 +1670,7 @@ def display_data(self): 'retry_strategy': self._retry_strategy, 'create_disposition': str(self.create_disposition), 'write_disposition': str(self.write_disposition), + 'schema_update_options': str(self.schema_update_options), 'additional_bq_parameters': str(self.additional_bq_parameters), 'ignore_insert_ids': str(self.ignore_insert_ids), 'ignore_unknown_columns': str(self.ignore_unknown_columns) @@ -1845,6 +1898,7 @@ def __init__( triggering_frequency, create_disposition, write_disposition, + schema_update_options, kms_key, retry_strategy, additional_bq_parameters, @@ -1861,6 +1915,7 @@ def __init__( self.triggering_frequency = triggering_frequency self.create_disposition = create_disposition self.write_disposition = write_disposition + self.schema_update_options = schema_update_options self.kms_key = kms_key self.retry_strategy = retry_strategy self.test_client = test_client @@ -1888,6 +1943,7 @@ def expand(self, input): batch_size=self.batch_size, create_disposition=self.create_disposition, write_disposition=self.write_disposition, + schema_update_options=self.schema_update_options, kms_key=self.kms_key, retry_strategy=self.retry_strategy, test_client=self.test_client, @@ -1973,6 +2029,7 @@ def __init__( schema=None, create_disposition=BigQueryDisposition.CREATE_IF_NEEDED, write_disposition=BigQueryDisposition.WRITE_APPEND, + schema_update_options=None, kms_key=None, batch_size=None, max_file_size=None, @@ -2042,6 +2099,19 @@ def __init__( empty. For streaming pipelines WriteTruncate can not be used. + + schema_update_options (Iterable[BigQueryDisposition]): An iterable of + strings describing which schema mutations are allowed as side effects. + Schema update options are supported when `write_disposition` is set + to `WRITE_APPEND`, or when `write_disposition` is set to + `WRITE_TRUNCATE` for a destination that is a partition of an + existing table. Possible values are: + + * :attr:`BigQueryDisposition.ALLOW_FIELD_ADDITION`: allow adding + a nullable field to the schema. + * :attr:`BigQueryDisposition.ALLOW_FIELD_RELAXATION`: allow relaxing + a required field in the original schema to nullable. + kms_key (str): Optional Cloud KMS key name for use when creating new tables. batch_size (int): Number of rows to be written to BQ per streaming API @@ -2146,6 +2216,9 @@ def __init__( create_disposition) self.write_disposition = BigQueryDisposition.validate_write( write_disposition) + self.schema_update_options = BigQueryDisposition.parse_update( + schema_update_options) + if schema == SCHEMA_AUTODETECT: self.schema = schema else: @@ -2227,6 +2300,7 @@ def expand(self, pcoll): triggering_frequency=self.triggering_frequency, create_disposition=self.create_disposition, write_disposition=self.write_disposition, + schema_update_options=self.schema_update_options, kms_key=self.kms_key, retry_strategy=self.insert_retry_strategy, additional_bq_parameters=self.additional_bq_parameters, @@ -2281,6 +2355,7 @@ def find_in_nested_dict(schema): schema=self.schema, create_disposition=self.create_disposition, write_disposition=self.write_disposition, + schema_update_options=self.schema_update_options, triggering_frequency=triggering_frequency, with_auto_sharding=self.with_auto_sharding, temp_file_format=self._temp_file_format, @@ -2330,6 +2405,7 @@ def serialize(side_inputs): 'schema': self.schema, 'create_disposition': self.create_disposition, 'write_disposition': self.write_disposition, + 'schema_update_options': self.schema_update_options, 'kms_key': self.kms_key, 'batch_size': self.batch_size, 'max_file_size': self.max_file_size, diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py index ddc8fe61db04..e0580c94692e 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py @@ -564,6 +564,7 @@ def __init__( schema=None, create_disposition=None, write_disposition=None, + schema_update_options=None, test_client=None, temporary_tables=False, additional_bq_parameters=None, @@ -583,14 +584,17 @@ def __init__( # and write dispositions, which mean that a new table will be created. self.create_disposition = None self.write_disposition = None + self.schema_update_options = None else: self.create_disposition = create_disposition self.write_disposition = write_disposition + self.schema_update_options = schema_update_options def display_data(self): result = { 'create_disposition': str(self.create_disposition), 'write_disposition': str(self.write_disposition), + 'schema_update_options': str(self.schema_update_options), 'additional_bq_params': str(self.additional_bq_parameters), 'schema': str(self.schema), 'launchesBigQueryJobs': DisplayDataItem( @@ -672,6 +676,7 @@ def process(self, element, load_job_name_prefix, *schema_side_inputs): schema=schema, write_disposition=self.write_disposition, create_disposition=create_disposition, + schema_update_options=self.schema_update_options, additional_load_parameters=additional_parameters, source_format=self.source_format, job_labels=self.bq_io_metadata.add_additional_bq_job_labels(), @@ -789,6 +794,7 @@ def __init__( custom_gcs_temp_location=None, create_disposition=None, write_disposition=None, + schema_update_options=None, triggering_frequency=None, with_auto_sharding=False, temp_file_format=None, @@ -806,6 +812,7 @@ def __init__( self.destination = destination self.create_disposition = create_disposition self.write_disposition = write_disposition + self.schema_update_options = schema_update_options self.triggering_frequency = triggering_frequency self.with_auto_sharding = with_auto_sharding self.max_file_size = max_file_size or _DEFAULT_MAX_FILE_SIZE @@ -1015,6 +1022,7 @@ def _load_data( schema=self.schema, write_disposition=self.write_disposition, create_disposition=self.create_disposition, + schema_update_options=self.schema_update_options, test_client=self.test_client, temporary_tables=True, additional_bq_parameters=self.additional_bq_parameters, @@ -1095,6 +1103,7 @@ def _load_data( schema=self.schema, write_disposition=self.write_disposition, create_disposition=self.create_disposition, + schema_update_options=self.schema_update_options, test_client=self.test_client, temporary_tables=False, additional_bq_parameters=self.additional_bq_parameters, diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py index bb3b60273404..77a125e26c0a 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py @@ -497,6 +497,7 @@ def _insert_load_job( schema=None, write_disposition=None, create_disposition=None, + schema_update_options=None, additional_load_parameters=None, source_format=None, job_labels=None): @@ -514,6 +515,9 @@ def _insert_load_job( if source_uris is None: source_uris = [] + if schema_update_options is None: + schema_update_options = [] + additional_load_parameters = additional_load_parameters or {} job_schema = None if schema == 'SCHEMA_AUTODETECT' else schema reference = bigquery.JobReference(jobId=job_id, projectId=project_id) @@ -527,6 +531,7 @@ def _insert_load_job( schema=job_schema, writeDisposition=write_disposition, createDisposition=create_disposition, + schemaUpdateOptions=schema_update_options, sourceFormat=source_format, useAvroLogicalTypes=True, autodetect=schema == 'SCHEMA_AUTODETECT', @@ -992,6 +997,7 @@ def perform_load_job( schema=None, write_disposition=None, create_disposition=None, + schema_update_options=None, additional_load_parameters=None, source_format=None, job_labels=None, @@ -1014,6 +1020,7 @@ def perform_load_job( schema=schema, create_disposition=create_disposition, write_disposition=write_disposition, + schema_update_options=schema_update_options, additional_load_parameters=additional_load_parameters, source_format=source_format, job_labels=job_labels) From 0e9603c37338f25d648852cee689f44b618db9c8 Mon Sep 17 00:00:00 2001 From: Daniel Walt Date: Tue, 14 Jun 2022 20:17:57 +0000 Subject: [PATCH 2/5] [BEAM-12994] Add unit tests for BigQueryDisposition, including the new schema update options --- .../apache_beam/io/gcp/bigquery_test.py | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py index 65a89d8f7eae..0ffafdb1acdf 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py @@ -44,6 +44,7 @@ from apache_beam.io.filebasedsink_test import _TestCaseWithTempDirCleanUp from apache_beam.io.gcp import bigquery as beam_bq from apache_beam.io.gcp import bigquery_tools +from apache_beam.io.gcp.bigquery import BigQueryDisposition from apache_beam.io.gcp.bigquery import ReadFromBigQuery from apache_beam.io.gcp.bigquery import TableRowJsonCoder from apache_beam.io.gcp.bigquery import WriteToBigQuery @@ -660,6 +661,40 @@ def test_nested_schema_as_json(self): }, json.loads(sink.schema_as_json())) + def test_disposition_exceptions(self): + """Test bad parameter value errors for disposition args.""" + with self.assertRaises(ValueError): + beam.io.BigQuerySink('dataset.table', + create_disposition="BAD") + + with self.assertRaises(ValueError): + beam.io.BigQuerySink('dataset.table', + write_disposition="BAD") + + with self.assertRaises(ValueError): + beam.io.BigQuerySink('dataset.table', + schema_update_options="BAD") + + def test_disposition_update_opts(self): + """Test schema update options from args.""" + obj1 = beam.io.BigQuerySink('dataset.table') + self.assertEquals(obj1.schema_update_options, [], + 'Schema Update Options passed as None') + + opt2 = BigQueryDisposition.ALLOW_FIELD_ADDITION + obj2 = beam.io.BigQuerySink('dataset.table', + schema_update_options=opt2) + self.assertEquals(obj2.schema_update_options, [opt2], + 'Schema Update Options passed as single string') + + opt3 = BigQueryDisposition.ALLOW_FIELD_RELAXATION + obj3 = beam.io.BigQuerySink('dataset.table', + schema_update_options=(opt2, opt3)) + + cmp1, cmp2 = set(obj3.schema_update_options), {opt2, opt3} + self.assertEqual(cmp1, cmp2, + 'Schema Update Options passed as iterable') + @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') class TestWriteToBigQuery(unittest.TestCase): @@ -1035,6 +1070,26 @@ def test_copy_load_job_exception(self, exception_type, error_message): self.assertEqual(4, mock_insert_copy_job.call_count) self.assertIn(error_message, exc.exception.args[0]) + def test_disposition_update_opts(self): + """Test schema update options from args.""" + obj1 = beam.io.WriteToBigQuery('project:dataset.table') + self.assertEquals(obj1.schema_update_options, [], + 'Schema Update Options passed as None') + + opt2 = BigQueryDisposition.ALLOW_FIELD_ADDITION + obj2 = beam.io.WriteToBigQuery('project:dataset.table', + schema_update_options=opt2) + self.assertEquals(obj2.schema_update_options, [opt2], + 'Schema Update Options passed as single string') + + opt3 = BigQueryDisposition.ALLOW_FIELD_RELAXATION + obj3 = beam.io.WriteToBigQuery('project:dataset.table', + schema_update_options=(opt2, opt3)) + + cmp1, cmp2 = set(obj3.schema_update_options), {opt2, opt3} + self.assertEqual(cmp1, cmp2, + 'Schema Update Options passed as iterable') + @unittest.skipIf( HttpError is None or exceptions is None, From 46ddf27a17fe29bc830b705eb52f57124dcf2196 Mon Sep 17 00:00:00 2001 From: Daniel Walt Date: Tue, 14 Jun 2022 20:20:37 +0000 Subject: [PATCH 3/5] [BEAM-12994] Update unit tests that include calls to _StreamToBigQuery to include added positional schema update options argument --- sdks/python/apache_beam/io/gcp/bigquery_test.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py index 0ffafdb1acdf..9d0191810c38 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py @@ -1541,6 +1541,7 @@ def store_callback(table, **kwargs): triggering_frequency=None, create_disposition='CREATE_NEVER', write_disposition=None, + schema_update_options=[], kms_key=None, retry_strategy=None, additional_bq_parameters=[], @@ -1637,6 +1638,7 @@ def store_callback(table, **kwargs): triggering_frequency=None, create_disposition='CREATE_NEVER', write_disposition=None, + schema_update_options=[], kms_key=None, retry_strategy=retry_strategy, additional_bq_parameters=[], @@ -1712,6 +1714,7 @@ def store_callback(table, **kwargs): triggering_frequency=None, create_disposition='CREATE_NEVER', write_disposition=None, + schema_update_options=None, kms_key=None, retry_strategy=retry_strategy, additional_bq_parameters=[], @@ -1779,6 +1782,7 @@ def store_callback(table, **kwargs): triggering_frequency=None, create_disposition='CREATE_NEVER', write_disposition=None, + schema_update_options=None, kms_key=None, retry_strategy=None, additional_bq_parameters=[], From 2742f4bc9b5d89d59c9549e01225ab7789a71431 Mon Sep 17 00:00:00 2001 From: Daniel Walt Date: Wed, 15 Jun 2022 17:07:01 +0000 Subject: [PATCH 4/5] [BEAM-12994] Lint tests (swap to assertEqual and convert to single quote strings) --- .../apache_beam/io/gcp/bigquery_test.py | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py index 9d0191810c38..6c739364e694 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py @@ -665,27 +665,27 @@ def test_disposition_exceptions(self): """Test bad parameter value errors for disposition args.""" with self.assertRaises(ValueError): beam.io.BigQuerySink('dataset.table', - create_disposition="BAD") + create_disposition='BAD') with self.assertRaises(ValueError): beam.io.BigQuerySink('dataset.table', - write_disposition="BAD") + write_disposition='BAD') with self.assertRaises(ValueError): beam.io.BigQuerySink('dataset.table', - schema_update_options="BAD") + schema_update_options='BAD') def test_disposition_update_opts(self): """Test schema update options from args.""" obj1 = beam.io.BigQuerySink('dataset.table') - self.assertEquals(obj1.schema_update_options, [], - 'Schema Update Options passed as None') + self.assertEqual(obj1.schema_update_options, [], + 'Schema Update Options passed as None') opt2 = BigQueryDisposition.ALLOW_FIELD_ADDITION obj2 = beam.io.BigQuerySink('dataset.table', schema_update_options=opt2) - self.assertEquals(obj2.schema_update_options, [opt2], - 'Schema Update Options passed as single string') + self.assertEqual(obj2.schema_update_options, [opt2], + 'Schema Update Options passed as single string') opt3 = BigQueryDisposition.ALLOW_FIELD_RELAXATION obj3 = beam.io.BigQuerySink('dataset.table', @@ -1073,14 +1073,14 @@ def test_copy_load_job_exception(self, exception_type, error_message): def test_disposition_update_opts(self): """Test schema update options from args.""" obj1 = beam.io.WriteToBigQuery('project:dataset.table') - self.assertEquals(obj1.schema_update_options, [], - 'Schema Update Options passed as None') + self.assertEqual(obj1.schema_update_options, [], + 'Schema Update Options passed as None') opt2 = BigQueryDisposition.ALLOW_FIELD_ADDITION obj2 = beam.io.WriteToBigQuery('project:dataset.table', schema_update_options=opt2) - self.assertEquals(obj2.schema_update_options, [opt2], - 'Schema Update Options passed as single string') + self.assertEqual(obj2.schema_update_options, [opt2], + 'Schema Update Options passed as single string') opt3 = BigQueryDisposition.ALLOW_FIELD_RELAXATION obj3 = beam.io.WriteToBigQuery('project:dataset.table', From 4bc9030acfa8cff845c7f7fe316c967cf84f5c25 Mon Sep 17 00:00:00 2001 From: Daniel Walt Date: Wed, 15 Jun 2022 17:09:54 +0000 Subject: [PATCH 5/5] [BEAM-12994] Improve readability of parse_update by collapsing lines --- sdks/python/apache_beam/io/gcp/bigquery.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 0307f6068b05..a4a8f39aa7bb 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -511,12 +511,10 @@ def parse_update(disposition): result = [] if disposition: if isinstance(disposition, str): - result.append( - BigQueryDisposition.validate_update(disposition)) + result.append(BigQueryDisposition.validate_update(disposition)) elif isinstance(disposition, (tuple, list)): for opt in disposition: - result.append( - BigQueryDisposition.validate_update(opt)) + result.append(BigQueryDisposition.validate_update(opt)) return result