From 71155b59dab39fad8101f78f94b17f562aede172 Mon Sep 17 00:00:00 2001 From: Pablo Rodriguez Defino Date: Thu, 17 Oct 2024 10:21:09 -0700 Subject: [PATCH 01/13] type defs --- sdks/python/apache_beam/io/gcp/bigquery.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 11e0d098b2f3..5e73e3450369 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -366,6 +366,7 @@ def chain_after(result): import uuid import warnings from dataclasses import dataclass +from typing import Callable from typing import Dict from typing import List from typing import Optional @@ -1883,6 +1884,10 @@ def _restore_table_ref(sharded_table_ref_elems_kv): # Flag to be passed to WriteToBigQuery to force schema autodetection SCHEMA_AUTODETECT = 'SCHEMA_AUTODETECT' +CdcWritesWithRows = Callable[[beam.Row], beam.Row] +CdcWritesWithDicts = Callable[[Dict], Dict] +UseCdcWrites = Union[bool, CdcWritesWithRows, CdcWritesWithRows] + class WriteToBigQuery(PTransform): """Write data to BigQuery. From c638241400c93b2ee7ca21083891304dcf342d27 Mon Sep 17 00:00:00 2001 From: Pablo Rodriguez Defino Date: Thu, 17 Oct 2024 17:56:38 -0700 Subject: [PATCH 02/13] corrected Row type reference --- sdks/python/apache_beam/io/gcp/bigquery.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 5e73e3450369..c01341f2d4d0 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -1884,7 +1884,8 @@ def _restore_table_ref(sharded_table_ref_elems_kv): # Flag to be passed to WriteToBigQuery to force schema autodetection SCHEMA_AUTODETECT = 'SCHEMA_AUTODETECT' -CdcWritesWithRows = Callable[[beam.Row], beam.Row] +# CDC configuration type definition +CdcWritesWithRows = Callable[[beam.pvalue.Row], beam.pvalue.Row] CdcWritesWithDicts = Callable[[Dict], Dict] UseCdcWrites = Union[bool, CdcWritesWithRows, CdcWritesWithRows] @@ -1935,7 +1936,7 @@ def __init__( load_job_project_id=None, max_insert_payload_size=MAX_INSERT_PAYLOAD_SIZE, num_streaming_keys=DEFAULT_SHARDS_PER_DESTINATION, - use_cdc_writes: bool = False, + use_cdc_writes: UseCdcWrites = False, primary_key: List[str] = None, expansion_service=None): """Initialize a WriteToBigQuery transform. 
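
The UseCdcWrites alias introduced above is what widens use_cdc_writes from a plain boolean to "boolean or per-element callable" (the duplicated CdcWritesWithRows in the Union is a typo that patch 12 in this series corrects to CdcWritesWithDicts). A minimal sketch of the three forms the parameter is meant to accept, following the annotated-callable convention the later patches rely on; the UPSERT mutation type and "AAA/1" sequence number are illustrative values taken from the tests further down:

    from typing import Dict

    import apache_beam as beam

    # 1) Boolean, as before: each element already carries row_mutation_info.
    use_cdc_writes = True

    # 2) Callable for Beam Row elements: derives CDC metadata from each row.
    def row_mutation_info(row: beam.Row) -> beam.Row:
      return beam.Row(mutation_type="UPSERT", change_sequence_number="AAA/1")

    # 3) Callable for dict elements: same two fields, returned as a dict.
    def dict_mutation_info(record: Dict) -> Dict:
      return {"mutation_type": "UPSERT", "change_sequence_number": "AAA/1"}
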
@@ -2555,7 +2556,7 @@ def __init__( use_at_least_once=False, with_auto_sharding=False, num_storage_api_streams=0, - use_cdc_writes: bool = False, + use_cdc_writes: UseCdcWrites = False, primary_key: List[str] = None, expansion_service=None): self._table = table From e12d2b959695a60fcfab4b6811b96de632d5b0cc Mon Sep 17 00:00:00 2001 From: Pablo Rodriguez Defino Date: Fri, 18 Oct 2024 16:18:54 -0700 Subject: [PATCH 03/13] implemented transforms to handle cdc mutation info functions for static and dyn destinations --- sdks/python/apache_beam/io/gcp/bigquery.py | 183 +++++++++++++++++---- 1 file changed, 154 insertions(+), 29 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index c01341f2d4d0..dc744453d49c 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -2544,6 +2544,26 @@ class StorageWriteToBigQuery(PTransform): CDC_SQN = "change_sequence_number" # magic string to tell Java that these rows are going to dynamic destinations DYNAMIC_DESTINATIONS = "DYNAMIC_DESTINATIONS" + # CDC record input for type hints + CDC_INFO_TYPE_HINT = ( + CDC_INFO, + RowTypeConstraint.from_fields([(CDC_MUTATION_TYPE, str), (CDC_SQN, str)])) + # BQ table schema for CDC related information + CDC_INFO_SCHEMA = { + "name": "row_mutation_info", + "type": "STRUCT", + "fields": [ + # setting both fields are required + { + "name": "mutation_type", "type": "STRING", "mode": "REQUIRED" + }, + { + "name": "change_sequence_number", + "type": "STRING", + "mode": "REQUIRED" + } + ] + } def __init__( self, @@ -2598,7 +2618,9 @@ def expand(self, input): # if writing to one destination, just convert to Beam rows and send over if not callable(table): if is_rows: - input_beam_rows = input + input_beam_rows = ( + input | "Prepare Beam Row" >> self.PrepareBeamRows( + input.element_type, False).with_output_types()) else: input_beam_rows = ( input @@ -2620,16 +2642,8 @@ def expand(self, input): if is_rows: input_beam_rows = ( input_rows - | "Wrap in Beam Row" >> beam.Map( - lambda row: beam.Row( - **{ - StorageWriteToBigQuery.DESTINATION: row[0], - StorageWriteToBigQuery.RECORD: row[1] - })).with_output_types( - RowTypeConstraint.from_fields([ - (StorageWriteToBigQuery.DESTINATION, str), - (StorageWriteToBigQuery.RECORD, input.element_type) - ]))) + | "Prepare Beam Row" >> self.PrepareBeamRows( + input.element_type, True).with_output_types()) # otherwise, convert to Beam Rows else: input_beam_rows = ( @@ -2639,6 +2653,9 @@ def expand(self, input): # communicate to Java that this write should use dynamic destinations table = StorageWriteToBigQuery.DYNAMIC_DESTINATIONS + cdc_writes = self._use_cdc_writes if not callable( + self._use_cdc_writes) else True + output = ( input_beam_rows | SchemaAwareExternalTransform( @@ -2652,7 +2669,7 @@ def expand(self, input): auto_sharding=self._with_auto_sharding, num_streams=self._num_storage_api_streams, use_at_least_once_semantics=self._use_at_least_once, - use_cdc_writes=self._use_cdc_writes, + use_cdc_writes=cdc_writes, primary_key=self._primary_key, error_handling={ 'output': StorageWriteToBigQuery.FAILED_ROWS_WITH_ERRORS @@ -2676,44 +2693,152 @@ def expand(self, input): failed_rows=failed_rows, failed_rows_with_errors=failed_rows_with_errors) + class PrepareBeamRows(PTransform): + def __init__( + self, + element_type, + dynamic_destinations, + mutation_info_fn: CdcWritesWithRows = None): + self.element_type = element_type + self.dynamic_destinations = dynamic_destinations + 
self.mutation_info_fn = mutation_info_fn + + def expand(self, input_rows): + if self.dynamic_destinations: + return ( + input_rows + | "Wrap and maybe include CDC info" >> beam.Map( + lambda data_and_dest: self. + maybe_add_mutation_info_dynamic_destination(data_and_dest))) + else: + if callable(self.mutation_info_fn): + return ( + input_rows + | "Wrap and maybe include CDC info" >> beam.Map( + lambda row: beam.Row( + **{ + StorageWriteToBigQuery.RECORD: row, + StorageWriteToBigQuery.CDC_INFO: self. + mutation_info_fn(row) + }))) + else: + return input_rows + + def with_output_types(self): + if self.dynamic_destinations: + fields = [(StorageWriteToBigQuery.DESTINATION, str), + (StorageWriteToBigQuery.RECORD, self.element_type)] + if callable(self.mutation_info_fn): + fields.append(CDC_INFO_TYPE_HINT) + type_hint = RowTypeConstraint.from_fields(fields) + else: + if callable(self.mutation_info_fn): + type_hint = RowTypeConstraint.from_fields([ + (StorageWriteToBigQuery.RECORD, self.element_type), + StorageWriteToBigQuery.CDC_INFO_TYPE_HINT + ]) + else: + type_hint = self.element_type + + return super().with_output_types(type_hint) + + def maybe_add_mutation_info_dynamic_destination(self, row_and_dest): + if callable(self.mutation_info_fn): + return beam.Row( + **{ + StorageWriteToBigQuery.DESTINATION: row_and_dest[0], + StorageWriteToBigQuery.RECORD: row_and_dest[1], + StorageWriteToBigQuery.CDC_INFO: self.mutation_info_fn( + row_and_dest[1]) + }) + else: + return beam.Row( + **{ + StorageWriteToBigQuery.DESTINATION: row_and_dest[0], + StorageWriteToBigQuery.RECORD: row_and_dest[1] + }) + class ConvertToBeamRows(PTransform): - def __init__(self, schema, dynamic_destinations): + def __init__( + self, + schema, + dynamic_destinations, + mutation_info_fn: CdcWritesWithDicts = None): self.schema = schema self.dynamic_destinations = dynamic_destinations + self.mutation_info_fn = mutation_info_fn def expand(self, input_dicts): if self.dynamic_destinations: return ( input_dicts | "Convert dict to Beam Row" >> beam.Map( - lambda row: beam.Row( - **{ - StorageWriteToBigQuery.DESTINATION: row[0], - StorageWriteToBigQuery.RECORD: bigquery_tools. - beam_row_from_dict(row[1], self.schema) - }))) + lambda data_with_dest: self. 
+ maybe_add_mutation_info_dynamic_destination(data_with_dest))) else: return ( input_dicts - | "Convert dict to Beam Row" >> beam.Map( - lambda row: bigquery_tools.beam_row_from_dict(row, self.schema)) - ) + | "Convert dict to Beam Row" >> + beam.Map(lambda dict_data: self.maybe_add_mutation_info(dict_data))) def with_output_types(self): row_type_hints = bigquery_tools.get_beam_typehints_from_tableschema( self.schema) if self.dynamic_destinations: - type_hint = RowTypeConstraint.from_fields([ - (StorageWriteToBigQuery.DESTINATION, str), - ( - StorageWriteToBigQuery.RECORD, - RowTypeConstraint.from_fields(row_type_hints)) - ]) + fields = [(StorageWriteToBigQuery.DESTINATION, str), + ( + StorageWriteToBigQuery.RECORD, + RowTypeConstraint.from_fields(row_type_hints))] + if callable(self.mutation_info_fn): + fields.append(CDC_INFO_TYPE_HINT) + type_hint = RowTypeConstraint.from_fields(fields) else: - type_hint = RowTypeConstraint.from_fields(row_type_hints) + if callable(self.mutation_info_fn): + type_hint = RowTypeConstraint.from_fields([ + ( + StorageWriteToBigQuery.RECORD, + RowTypeConstraint.from_fields(row_type_hints)), + StorageWriteToBigQuery.CDC_INFO_TYPE_HINT + ]) + else: + type_hint = RowTypeConstraint.from_fields(row_type_hints) return super().with_output_types(type_hint) + def maybe_add_mutation_info_dynamic_destination(self, data_and_dest): + if callable(self.mutation_info_fn): + return beam.Row( + **{ + StorageWriteToBigQuery.DESTINATION: data_and_dest[0], + StorageWriteToBigQuery.RECORD: bigquery_tools. + beam_row_from_dict(data_and_dest[1], self.schema), + StorageWriteToBigQuery.CDC_INFO: bigquery_tools. + beam_row_from_dict( + self.mutation_info_fn(data_and_dest[1]), + StorageWriteToBigQuery.CDC_INFO_SCHEMA) + }) + else: + return beam.Row( + **{ + StorageWriteToBigQuery.DESTINATION: data_and_dest[0], + StorageWriteToBigQuery.RECORD: bigquery_tools. + beam_row_from_dict(data_and_dest[1], self.schema) + }) + + def maybe_add_mutation_info(self, dict_data): + if callable(self.mutation_info_fn): + return beam.Row( + **{ + StorageWriteToBigQuery.RECORD: bigquery_tools. + beam_row_from_dict(dict_data, self.schema), + StorageWriteToBigQuery.CDC_INFO: bigquery_tools. 
+ beam_row_from_dict( + self.mutation_info_fn(dict_data), + StorageWriteToBigQuery.CDC_INFO_SCHEMA) + }) + else: + return bigquery_tools.beam_row_from_dict(dict_data, self.schema) + class ReadFromBigQuery(PTransform): # pylint: disable=line-too-long,W1401 From 9d6ef8bc84868d2916fb1e494f154f22eecd85a5 Mon Sep 17 00:00:00 2001 From: Pablo Rodriguez Defino Date: Fri, 18 Oct 2024 17:17:16 -0700 Subject: [PATCH 04/13] included callable specific type extraction on transforms --- sdks/python/apache_beam/io/gcp/bigquery.py | 30 +++++++++++++++++++--- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index dc744453d49c..45461eccc7f7 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -2593,6 +2593,24 @@ def __init__( self._expansion_service = expansion_service or BeamJarExpansionService( 'sdks:java:io:google-cloud-platform:expansion-service:build') + def extract_cdc_info_fn_rows(self): + cdc_writes = self._use_cdc_writes + if isinstance(cdc_writes, bool): + return None + elif isinstance(cdc_writes, Callable) and callable(cdc_writes): + if cdc_writes.__annotations__.get('return') == beam.pvalue.Row: + return cdc_writes + return None + + def extract_cdc_info_fn_dicts(self): + cdc_writes = self._use_cdc_writes + if isinstance(cdc_writes, bool): + return None + elif isinstance(cdc_writes, Callable) and callable(cdc_writes): + if cdc_writes.__annotations__.get('return') == Dicts: + return cdc_writes + return None + def expand(self, input): if self._schema is None: try: @@ -2620,12 +2638,14 @@ def expand(self, input): if is_rows: input_beam_rows = ( input | "Prepare Beam Row" >> self.PrepareBeamRows( - input.element_type, False).with_output_types()) + input.element_type, False, + self.extract_cdc_info_fn_rows()).with_output_types()) else: input_beam_rows = ( input | "Convert dict to Beam Row" >> self.ConvertToBeamRows( - schema, False).with_output_types()) + schema, False, + self.extract_cdc_info_fn_dicts()).with_output_types()) # For dynamic destinations, we first figure out where each row is going. # Then we send (destination, record) rows over to Java SchemaTransform. 
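
The extract_cdc_info_fn_rows and extract_cdc_info_fn_dicts helpers added above dispatch on the callable's return annotation, so a mutation-info function is only recognized when it is annotated (beam.Row is the same class as beam.pvalue.Row, so either spelling of the annotation matches). A sketch of what each extractor accepts, reusing the field names from the CDC_INFO_SCHEMA defined earlier:

    from typing import Dict

    import apache_beam as beam

    # Recognized by extract_cdc_info_fn_rows: return annotated as beam.Row.
    def cdc_for_rows(row: beam.Row) -> beam.Row:
      return beam.Row(mutation_type="UPSERT", change_sequence_number="AAA/2")

    # Recognized by extract_cdc_info_fn_dicts: return annotated as Dict.
    def cdc_for_dicts(record: Dict) -> Dict:
      return {"mutation_type": "UPSERT", "change_sequence_number": "AAA/2"}

    # Unannotated: __annotations__.get('return') is None, so at this point in
    # the series the extractor falls through and returns None; patch 08 below
    # turns the mismatch into a TypeError instead.
    def cdc_unannotated(row):
      return beam.Row(mutation_type="UPSERT", change_sequence_number="AAA/2")
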
@@ -2643,13 +2663,15 @@ def expand(self, input): input_beam_rows = ( input_rows | "Prepare Beam Row" >> self.PrepareBeamRows( - input.element_type, True).with_output_types()) + input.element_type, True, + self.extract_cdc_info_fn_rows()).with_output_types()) # otherwise, convert to Beam Rows else: input_beam_rows = ( input_rows | "Convert dict to Beam Row" >> self.ConvertToBeamRows( - schema, True).with_output_types()) + schema, True, + self.extract_cdc_info_fn_dicts()).with_output_types()) # communicate to Java that this write should use dynamic destinations table = StorageWriteToBigQuery.DYNAMIC_DESTINATIONS From fdac2f77ec6f42c46b1dd0cc945d21c5b7819251 Mon Sep 17 00:00:00 2001 From: Pablo Rodriguez Defino Date: Fri, 18 Oct 2024 17:21:56 -0700 Subject: [PATCH 05/13] fix lint error --- sdks/python/apache_beam/io/gcp/bigquery.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 45461eccc7f7..428e2ae4885a 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -2751,7 +2751,7 @@ def with_output_types(self): fields = [(StorageWriteToBigQuery.DESTINATION, str), (StorageWriteToBigQuery.RECORD, self.element_type)] if callable(self.mutation_info_fn): - fields.append(CDC_INFO_TYPE_HINT) + fields.append(StorageWriteToBigQuery.CDC_INFO_TYPE_HINT) type_hint = RowTypeConstraint.from_fields(fields) else: if callable(self.mutation_info_fn): @@ -2812,7 +2812,7 @@ def with_output_types(self): StorageWriteToBigQuery.RECORD, RowTypeConstraint.from_fields(row_type_hints))] if callable(self.mutation_info_fn): - fields.append(CDC_INFO_TYPE_HINT) + fields.append(StorageWriteToBigQuery.CDC_INFO_TYPE_HINT) type_hint = RowTypeConstraint.from_fields(fields) else: if callable(self.mutation_info_fn): From 84f97b7c33122deecde4c2025a669c93e863a4de Mon Sep 17 00:00:00 2001 From: Pablo Rodriguez Defino Date: Fri, 18 Oct 2024 19:18:40 -0700 Subject: [PATCH 06/13] fixes another lint error --- sdks/python/apache_beam/io/gcp/bigquery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 428e2ae4885a..61699a67fd98 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -2607,7 +2607,7 @@ def extract_cdc_info_fn_dicts(self): if isinstance(cdc_writes, bool): return None elif isinstance(cdc_writes, Callable) and callable(cdc_writes): - if cdc_writes.__annotations__.get('return') == Dicts: + if cdc_writes.__annotations__.get('return') == Dict: return cdc_writes return None From d907c15083996c7c469e34bd364eb3e8c5f079db Mon Sep 17 00:00:00 2001 From: Pablo Rodriguez Defino Date: Fri, 18 Oct 2024 21:19:54 -0700 Subject: [PATCH 07/13] refactor --- sdks/python/apache_beam/io/gcp/bigquery.py | 149 ++++++++++----------- 1 file changed, 73 insertions(+), 76 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 61699a67fd98..819fd6cc8b92 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -2715,6 +2715,58 @@ def expand(self, input): failed_rows=failed_rows, failed_rows_with_errors=failed_rows_with_errors) + @staticmethod + def extract_type_hint( + is_row_data: bool, + dynamic_destination: bool, + element_type, + mutation_info_fn: Callable): + if dynamic_destination: + fields = 
[(StorageWriteToBigQuery.DESTINATION, str), + ( + StorageWriteToBigQuery.RECORD, + element_type if is_row_data else + RowTypeConstraint.from_fields(element_type))] + if callable(mutation_info_fn): + fields.append(StorageWriteToBigQuery.CDC_INFO_TYPE_HINT) + type_hint = RowTypeConstraint.from_fields(fields) + else: + if callable(mutation_info_fn): + type_hint = RowTypeConstraint.from_fields([ + ( + StorageWriteToBigQuery.RECORD, + element_type if is_row_data else + RowTypeConstraint.from_fields(element_type)), + StorageWriteToBigQuery.CDC_INFO_TYPE_HINT + ]) + else: + type_hint = RowTypeConstraint.from_user_type( + element_type) if is_row_data else RowTypeConstraint.from_fields( + element_type) + return type_hint + + @staticmethod + def maybe_add_mutation_info_dynamic_destination( + destination, data, mutation_info_fn, schema=None): + if callable(mutation_info_fn): + cdc_info = mutation_info_fn(data) + return beam.Row( + **{ + StorageWriteToBigQuery.DESTINATION: destination, + StorageWriteToBigQuery.RECORD: data if not schema else + bigquery_tools.beam_row_from_dict(data_and_dest[1], schema), + StorageWriteToBigQuery.CDC_INFO: cdc_info + if not schema else bigquery_tools.beam_row_from_dict( + cdc_info, StorageWriteToBigQuery.CDC_INFO_SCHEMA) + }) + else: + return beam.Row( + **{ + StorageWriteToBigQuery.DESTINATION: destination, + StorageWriteToBigQuery.RECORD: data if not schema else + bigquery_tools.beam_row_from_dict(data, schema) + }) + class PrepareBeamRows(PTransform): def __init__( self, @@ -2730,8 +2782,9 @@ def expand(self, input_rows): return ( input_rows | "Wrap and maybe include CDC info" >> beam.Map( - lambda data_and_dest: self. - maybe_add_mutation_info_dynamic_destination(data_and_dest))) + lambda data_and_dest: StorageWriteToBigQuery. + maybe_add_mutation_info_dynamic_destination( + data_and_dest[0], data_and_dest[1], self.mutation_info_fn))) else: if callable(self.mutation_info_fn): return ( @@ -2747,38 +2800,12 @@ def expand(self, input_rows): return input_rows def with_output_types(self): - if self.dynamic_destinations: - fields = [(StorageWriteToBigQuery.DESTINATION, str), - (StorageWriteToBigQuery.RECORD, self.element_type)] - if callable(self.mutation_info_fn): - fields.append(StorageWriteToBigQuery.CDC_INFO_TYPE_HINT) - type_hint = RowTypeConstraint.from_fields(fields) - else: - if callable(self.mutation_info_fn): - type_hint = RowTypeConstraint.from_fields([ - (StorageWriteToBigQuery.RECORD, self.element_type), - StorageWriteToBigQuery.CDC_INFO_TYPE_HINT - ]) - else: - type_hint = self.element_type - - return super().with_output_types(type_hint) - - def maybe_add_mutation_info_dynamic_destination(self, row_and_dest): - if callable(self.mutation_info_fn): - return beam.Row( - **{ - StorageWriteToBigQuery.DESTINATION: row_and_dest[0], - StorageWriteToBigQuery.RECORD: row_and_dest[1], - StorageWriteToBigQuery.CDC_INFO: self.mutation_info_fn( - row_and_dest[1]) - }) - else: - return beam.Row( - **{ - StorageWriteToBigQuery.DESTINATION: row_and_dest[0], - StorageWriteToBigQuery.RECORD: row_and_dest[1] - }) + return super().with_output_types( + StorageWriteToBigQuery.extract_type_hint( + True, + self.dynamic_destinations, + self.element_type, + self.mutation_info_fn)) class ConvertToBeamRows(PTransform): def __init__( @@ -2795,8 +2822,12 @@ def expand(self, input_dicts): return ( input_dicts | "Convert dict to Beam Row" >> beam.Map( - lambda data_with_dest: self. 
- maybe_add_mutation_info_dynamic_destination(data_with_dest))) + lambda data_with_dest: StorageWriteToBigQuery. + maybe_add_mutation_info_dynamic_destination( + data_with_dest[0], + data_with_dest[1], + self.mutation_info_fn, + self.schema))) else: return ( input_dicts @@ -2806,46 +2837,12 @@ def expand(self, input_dicts): def with_output_types(self): row_type_hints = bigquery_tools.get_beam_typehints_from_tableschema( self.schema) - if self.dynamic_destinations: - fields = [(StorageWriteToBigQuery.DESTINATION, str), - ( - StorageWriteToBigQuery.RECORD, - RowTypeConstraint.from_fields(row_type_hints))] - if callable(self.mutation_info_fn): - fields.append(StorageWriteToBigQuery.CDC_INFO_TYPE_HINT) - type_hint = RowTypeConstraint.from_fields(fields) - else: - if callable(self.mutation_info_fn): - type_hint = RowTypeConstraint.from_fields([ - ( - StorageWriteToBigQuery.RECORD, - RowTypeConstraint.from_fields(row_type_hints)), - StorageWriteToBigQuery.CDC_INFO_TYPE_HINT - ]) - else: - type_hint = RowTypeConstraint.from_fields(row_type_hints) - - return super().with_output_types(type_hint) - - def maybe_add_mutation_info_dynamic_destination(self, data_and_dest): - if callable(self.mutation_info_fn): - return beam.Row( - **{ - StorageWriteToBigQuery.DESTINATION: data_and_dest[0], - StorageWriteToBigQuery.RECORD: bigquery_tools. - beam_row_from_dict(data_and_dest[1], self.schema), - StorageWriteToBigQuery.CDC_INFO: bigquery_tools. - beam_row_from_dict( - self.mutation_info_fn(data_and_dest[1]), - StorageWriteToBigQuery.CDC_INFO_SCHEMA) - }) - else: - return beam.Row( - **{ - StorageWriteToBigQuery.DESTINATION: data_and_dest[0], - StorageWriteToBigQuery.RECORD: bigquery_tools. - beam_row_from_dict(data_and_dest[1], self.schema) - }) + return super().with_output_types( + StorageWriteToBigQuery.extract_type_hint( + False, + self.dynamic_destinations, + row_type_hints, + self.mutation_info_fn)) def maybe_add_mutation_info(self, dict_data): if callable(self.mutation_info_fn): From 412ba96e9d50798c7fd36976835d281b2ef8ce62 Mon Sep 17 00:00:00 2001 From: Pablo Rodriguez Defino Date: Fri, 18 Oct 2024 22:25:35 -0700 Subject: [PATCH 08/13] added tests for cdc mutation info function for beam rows and dicts --- .../io/external/xlang_bigqueryio_it_test.py | 132 +++++++++++------- sdks/python/apache_beam/io/gcp/bigquery.py | 10 +- 2 files changed, 93 insertions(+), 49 deletions(-) diff --git a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py index 7f3a16e02aa3..692e0544aee2 100644 --- a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py @@ -26,6 +26,7 @@ import unittest from decimal import Decimal +from typing import Dict import pytest from hamcrest.core import assert_that as hamcrest_assert from hamcrest.core.core.allof import all_of @@ -245,18 +246,56 @@ def test_write_with_beam_rows(self): | StorageWriteToBigQuery(table=table_id)) hamcrest_assert(p, bq_matcher) - def test_write_with_beam_rows_cdc(self): - table = 'write_with_beam_rows_cdc' + EXPECTED_CDC_DATA = [ + # (name, value) + { + "name": "cdc_test", + "value": 5, + } + ] + + def run_and_validate_cdc_writes( + self, table_name, table_id, data, schema, use_cdc_writes): + bq_matcher = BigqueryFullResultMatcher( + project=self.project, + query="SELECT * FROM {}.{}".format(self.dataset_id, table_name), + data=self.parse_expected_data(self.EXPECTED_CDC_DATA)) + + with 
beam.Pipeline(argv=self.args) as p: + _ = ( + p + | beam.Create(data) + | beam.io.WriteToBigQuery( + table=table_id, + method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, + use_at_least_once=True, + use_cdc_writes=use_cdc_writes, + schema=schema, + primary_key=["name"])) + hamcrest_assert(p, bq_matcher) + + def test_write_with_beam_rows_cdc_info_fn(self): + table = 'write_with_beam_rows_cdc_info_fn' table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table) - expected_data_on_bq = [ - # (name, value) - { - "name": "cdc_test", - "value": 5, - } + rows = [ + beam.Row(name="cdc_test", value=5), beam.Row(name="cdc_test", value=3) ] + def cdc_info(row: beam.Row) -> beam.Row: + if row.value == 3: + csn = 1 + else: + csn = 2 + return beam.Row( + mutation_type="UPSERT", change_sequence_number="AAA/" + str(csn)) + + self.run_and_validate_cdc_writes(table, table_id, rows, None, cdc_info) + + def test_write_with_beam_rows_cdc(self): + table = 'write_with_beam_rows_cdc' + table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table) + rows_with_cdc = [ beam.Row( row_mutation_info=beam.Row( @@ -268,35 +307,12 @@ def test_write_with_beam_rows_cdc(self): record=beam.Row(name="cdc_test", value=3)) ] - bq_matcher = BigqueryFullResultMatcher( - project=self.project, - query="SELECT * FROM {}.{}".format(self.dataset_id, table), - data=self.parse_expected_data(expected_data_on_bq)) - - with beam.Pipeline(argv=self.args) as p: - _ = ( - p - | beam.Create(rows_with_cdc) - | beam.io.WriteToBigQuery( - table=table_id, - method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, - use_at_least_once=True, - use_cdc_writes=True, - primary_key=["name"])) - hamcrest_assert(p, bq_matcher) + self.run_and_validate_cdc_writes(table, table_id, rows_with_cdc, None, True) def test_write_with_dicts_cdc(self): table = 'write_with_dicts_cdc' table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table) - expected_data_on_bq = [ - # (name, value) - { - "name": "cdc_test", - "value": 5, - } - ] - data_with_cdc = [ # record: (name, value) { @@ -349,23 +365,43 @@ def test_write_with_dicts_cdc(self): ] } - bq_matcher = BigqueryFullResultMatcher( - project=self.project, - query="SELECT * FROM {}.{}".format(self.dataset_id, table), - data=self.parse_expected_data(expected_data_on_bq)) + self.run_and_validate_cdc_writes( + table, table_id, data_with_cdc, schema, True) - with beam.Pipeline(argv=self.args) as p: - _ = ( - p - | beam.Create(data_with_cdc) - | beam.io.WriteToBigQuery( - table=table_id, - method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API, - use_at_least_once=True, - use_cdc_writes=True, - schema=schema, - primary_key=["name"])) - hamcrest_assert(p, bq_matcher) + def test_write_with_dicts_cdc_info_fn(self): + table = 'write_with_dicts_cdc_info_fn' + table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table) + + data_with_cdc = [ + # record: (name, value) + { + 'name': 'cdc_test', 'value': 5 + }, + { + 'name': 'cdc_test', 'value': 3 + } + ] + + schema = { + "fields": [{ + "name": "name", "type": "STRING" + }, { + "name": "value", "type": "INTEGER" + }] + } + + def cdc_info(data: Dict) -> Dict: + if data["value"] == 3: + csn = 1 + else: + csn = 2 + return { + 'mutation_type': 'UPSERT', + 'change_sequence_number': 'AAA/' + str(csn) + } + + self.run_and_validate_cdc_writes( + table, table_id, data_with_cdc, schema, cdc_info) def test_write_to_dynamic_destinations(self): base_table_spec = '{}.dynamic_dest_'.format(self.dataset_id) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py 
b/sdks/python/apache_beam/io/gcp/bigquery.py
index 819fd6cc8b92..cea5afb7cdbb 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -2600,6 +2600,10 @@ def extract_cdc_info_fn_rows(self):
     elif isinstance(cdc_writes, Callable) and callable(cdc_writes):
       if cdc_writes.__annotations__.get('return') == beam.pvalue.Row:
         return cdc_writes
+      else:
+        raise TypeError(
+            "For Beam Row values, while using CDC writes," +
+            " we expect a Callable[[beam.Row], beam.Row].")
     return None
 
   def extract_cdc_info_fn_dicts(self):
@@ -2609,6 +2613,10 @@ def extract_cdc_info_fn_dicts(self):
     elif isinstance(cdc_writes, Callable) and callable(cdc_writes):
       if cdc_writes.__annotations__.get('return') == Dict:
         return cdc_writes
+      else:
+        raise TypeError(
+            "For dictionary values, while using CDC writes," +
+            " we expect a Callable[[Dict], Dict].")
     return None
 
   def expand(self, input):
@@ -2754,7 +2762,7 @@ def maybe_add_mutation_info_dynamic_destination(
         **{
             StorageWriteToBigQuery.DESTINATION: destination,
             StorageWriteToBigQuery.RECORD: data if not schema else
-            bigquery_tools.beam_row_from_dict(data_and_dest[1], schema),
+            bigquery_tools.beam_row_from_dict(data, schema),
             StorageWriteToBigQuery.CDC_INFO: cdc_info
             if not schema else bigquery_tools.beam_row_from_dict(
                 cdc_info, StorageWriteToBigQuery.CDC_INFO_SCHEMA)

From 50ac61f090bde8980cb6642f9208f7adbfad4395 Mon Sep 17 00:00:00 2001
From: pablo rodriguez defino
Date: Fri, 18 Oct 2024 23:36:09 -0700
Subject: [PATCH 09/13] Update xlang_bigqueryio_it_test.py

---
 sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
index 692e0544aee2..2ce661d9802d 100644
--- a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
+++ b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
@@ -25,8 +25,8 @@
 import time
 import unittest
 from decimal import Decimal
-
 from typing import Dict
+
 import pytest
 from hamcrest.core import assert_that as hamcrest_assert
 from hamcrest.core.core.allof import all_of

From 3e4971681d4b3128520f09c0e7474336f48d0579 Mon Sep 17 00:00:00 2001
From: Pablo Rodriguez Defino
Date: Mon, 21 Oct 2024 16:22:12 -0700
Subject: [PATCH 10/13] improve cdc mutation info callable introspection and
 add dynamic dest tests

---
 .../io/external/xlang_bigqueryio_it_test.py | 109 ++++++++++++----
 sdks/python/apache_beam/io/gcp/bigquery.py | 39 +++++--
 2 files changed, 113 insertions(+), 35 deletions(-)

diff --git a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
index 2ce661d9802d..b0f400a73ea7 100644
--- a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
+++ b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
@@ -247,13 +247,18 @@ def test_write_with_beam_rows(self):
     hamcrest_assert(p, bq_matcher)
 
   EXPECTED_CDC_DATA = [
-      # (name, value)
+      # (name, value, route)
       {
-          "name": "cdc_test",
-          "value": 5,
+          "name": "cdc_test", "value": 5, "route": 3
       }
   ]
 
+  CDC_ROW_DATA = [
+      beam.Row(name="cdc_test", value=5, route=3),
+      beam.Row(name="cdc_test", value=3, route=3),
+      beam.Row(name="cdc_test", value=2, route=1)
+  ]
+
   def run_and_validate_cdc_writes(
       self, table_name, table_id, data, schema, use_cdc_writes):
     bq_matcher = BigqueryFullResultMatcher(
         project=self.project,
         query="SELECT * FROM {}.{}".format(self.dataset_id, table_name),
         data=self.parse_expected_data(self.EXPECTED_CDC_DATA))
@@ -278,19 +283,14 @@ def
test_write_with_beam_rows_cdc_info_fn(self): table = 'write_with_beam_rows_cdc_info_fn' table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table) - rows = [ - beam.Row(name="cdc_test", value=5), beam.Row(name="cdc_test", value=3) - ] + rows = self.CDC_ROW_DATA - def cdc_info(row: beam.Row) -> beam.Row: - if row.value == 3: - csn = 1 - else: - csn = 2 + def cdc_info_rows(row: beam.Row) -> beam.Row: return beam.Row( - mutation_type="UPSERT", change_sequence_number="AAA/" + str(csn)) + mutation_type="UPSERT", + change_sequence_number="AAA/" + str(row.value + row.route)) - self.run_and_validate_cdc_writes(table, table_id, rows, None, cdc_info) + self.run_and_validate_cdc_writes(table, table_id, rows, None, cdc_info_rows) def test_write_with_beam_rows_cdc(self): table = 'write_with_beam_rows_cdc' @@ -300,11 +300,11 @@ def test_write_with_beam_rows_cdc(self): beam.Row( row_mutation_info=beam.Row( mutation_type="UPSERT", change_sequence_number="AAA/2"), - record=beam.Row(name="cdc_test", value=5)), + record=beam.Row(name="cdc_test", value=5, route=3)), beam.Row( row_mutation_info=beam.Row( mutation_type="UPSERT", change_sequence_number="AAA/1"), - record=beam.Row(name="cdc_test", value=3)) + record=beam.Row(name="cdc_test", value=3, route=1)) ] self.run_and_validate_cdc_writes(table, table_id, rows_with_cdc, None, True) @@ -320,7 +320,7 @@ def test_write_with_dicts_cdc(self): 'mutation_type': 'UPSERT', 'change_sequence_number': 'AAA/2' }, 'record': { - 'name': 'cdc_test', 'value': 5 + 'name': 'cdc_test', 'value': 5, 'route': 3 } }, { @@ -328,7 +328,7 @@ def test_write_with_dicts_cdc(self): 'mutation_type': 'UPSERT', 'change_sequence_number': 'AAA/1' }, 'record': { - 'name': 'cdc_test', 'value': 3 + 'name': 'cdc_test', 'value': 3, 'route': 1 } } ] @@ -360,6 +360,8 @@ def test_write_with_dicts_cdc(self): "name": "name", "type": "STRING" }, { "name": "value", "type": "INTEGER" + }, { + "name": "route", "type": "INTEGER" }] } ] @@ -372,13 +374,13 @@ def test_write_with_dicts_cdc_info_fn(self): table = 'write_with_dicts_cdc_info_fn' table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table) - data_with_cdc = [ + data = [ # record: (name, value) { - 'name': 'cdc_test', 'value': 5 + 'name': 'cdc_test', 'value': 5, 'route': 3 }, { - 'name': 'cdc_test', 'value': 3 + 'name': 'cdc_test', 'value': 3, 'route': 1 } ] @@ -387,10 +389,12 @@ def test_write_with_dicts_cdc_info_fn(self): "name": "name", "type": "STRING" }, { "name": "value", "type": "INTEGER" + }, { + "name": "route", "type": "INTEGER" }] } - def cdc_info(data: Dict) -> Dict: + def cdc_info_fn(data: Dict) -> Dict: if data["value"] == 3: csn = 1 else: @@ -400,8 +404,7 @@ def cdc_info(data: Dict) -> Dict: 'change_sequence_number': 'AAA/' + str(csn) } - self.run_and_validate_cdc_writes( - table, table_id, data_with_cdc, schema, cdc_info) + self.run_and_validate_cdc_writes(table, table_id, data, schema, cdc_info_fn) def test_write_to_dynamic_destinations(self): base_table_spec = '{}.dynamic_dest_'.format(self.dataset_id) @@ -427,6 +430,66 @@ def test_write_to_dynamic_destinations(self): use_at_least_once=False)) hamcrest_assert(p, all_of(*bq_matchers)) + def test_write_to_dynamic_destinations_rows_cdc_fn(self): + table_name_prefix = 'dynamic_dest_cdc_row_' + base_table_spec = '{}.{}'.format(self.dataset_id, table_name_prefix) + spec_with_project = '{}:{}'.format(self.project, base_table_spec) + table = table_name_prefix + "3" + + def cdc_info_rows(row: beam.Row) -> beam.Row: + return beam.Row( + mutation_type="UPSERT", + 
change_sequence_number="AAA/" + str(row.value + row.route)) + + self.run_and_validate_cdc_writes( + table, + lambda record: spec_with_project + str(record.route), + self.CDC_ROW_DATA, + None, + cdc_info_rows) + + def test_write_to_dynamic_destinations_dicts_cdc_fn(self): + table_name_prefix = 'dynamic_dest_cdc_dict_' + base_table_spec = '{}.{}'.format(self.dataset_id, table_name_prefix) + spec_with_project = '{}:{}'.format(self.project, base_table_spec) + table = table_name_prefix + "3" + + data = [ + # record: (name, value, route) + { + 'name': 'cdc_test', 'value': 5, 'route': 3 + }, + { + 'name': 'cdc_test', 'value': 3, 'route': 3 + }, + { + 'name': 'cdc_test', 'value': 2, 'route': 1 + } + ] + + schema = { + "fields": [{ + "name": "name", "type": "STRING" + }, { + "name": "value", "type": "INTEGER" + }, { + "name": "route", "type": "INTEGER" + }] + } + + def cdc_info_fn(data: Dict) -> Dict: + return { + 'mutation_type': 'UPSERT', + 'change_sequence_number': 'AAA/' + str(data["value"]) + } + + self.run_and_validate_cdc_writes( + table, + lambda data: spec_with_project + str(data["route"]), + data, + schema, + cdc_info_fn) + def test_write_to_dynamic_destinations_with_beam_rows(self): base_table_spec = '{}.dynamic_dest_'.format(self.dataset_id) spec_with_project = '{}:{}'.format(self.project, base_table_spec) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index cea5afb7cdbb..ec9faa6f9029 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -357,6 +357,7 @@ def chain_after(result): import collections import io +import inspect import itertools import json import logging @@ -2593,17 +2594,36 @@ def __init__( self._expansion_service = expansion_service or BeamJarExpansionService( 'sdks:java:io:google-cloud-platform:expansion-service:build') + @staticmethod + def check_and_return_callable(func: Callable, callable_expected_type): + signature = inspect.signature(func) + params = signature.parameters.values() + if len(params) < 1: + raise TypeError( + "Callable for CDC mutation information " + + "should have an input parameter. Received: " + str(signature)) + param = next(iter(params)) + if not param.annotation is callable_expected_type: + raise TypeError( + "For Beam Row values, while using CDC writes," + + " we expect a Callable[[" + str(callable_expected_type) + "], " + + str(callable_expected_type) + "]. " + + "Received a Callable with argument: " + str(param.annotation)) + if not signature.return_annotation is callable_expected_type: + raise TypeError( + "For Beam Row values, while using CDC writes," + + " we expect a Callable[[" + str(callable_expected_type) + "], " + + str(callable_expected_type) + "]. 
" + + "Received a Callable with return: " + + str(signature.return_annotation)) + return func + def extract_cdc_info_fn_rows(self): cdc_writes = self._use_cdc_writes if isinstance(cdc_writes, bool): return None elif isinstance(cdc_writes, Callable) and callable(cdc_writes): - if cdc_writes.__annotations__.get('return') == beam.pvalue.Row: - return cdc_writes - else: - raise TypeError( - "For Beam Row values, while using CDC writes," + - " we expect a Callable[[beam.Row], beam.Row].") + return self.check_and_return_callable(cdc_writes, beam.pvalue.Row) return None def extract_cdc_info_fn_dicts(self): @@ -2611,12 +2631,7 @@ def extract_cdc_info_fn_dicts(self): if isinstance(cdc_writes, bool): return None elif isinstance(cdc_writes, Callable) and callable(cdc_writes): - if cdc_writes.__annotations__.get('return') == Dict: - return cdc_writes - else: - raise TypeError( - "For dictionary values, while using CDC writes," + - " we expect a Callable[[Dict], Dict].") + return self.check_and_return_callable(cdc_writes, Dict) return None def expand(self, input): From 611c855263ff04c38a22299b41d1d290c6161e62 Mon Sep 17 00:00:00 2001 From: Pablo Rodriguez Defino Date: Mon, 21 Oct 2024 16:22:51 -0700 Subject: [PATCH 11/13] forcing xlang test checks --- .../beam_PostCommit_Python_Xlang_Gcp_Dataflow.json | 2 +- .../trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json index 27c1f3ae26cd..84cf24574f22 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json @@ -1,5 +1,5 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 1 + "modification": 2 } diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json index b26833333238..c537844dc84a 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 2 + "modification": 3 } From 16f1ec92e16ebb19795490c6aadb8a98cf299130 Mon Sep 17 00:00:00 2001 From: Pablo Rodriguez Defino Date: Mon, 21 Oct 2024 16:37:20 -0700 Subject: [PATCH 12/13] fix type signature --- sdks/python/apache_beam/io/gcp/bigquery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index ec9faa6f9029..f8250c561d6b 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -1888,7 +1888,7 @@ def _restore_table_ref(sharded_table_ref_elems_kv): # CDC configuration type definition CdcWritesWithRows = Callable[[beam.pvalue.Row], beam.pvalue.Row] CdcWritesWithDicts = Callable[[Dict], Dict] -UseCdcWrites = Union[bool, CdcWritesWithRows, CdcWritesWithRows] +UseCdcWrites = Union[bool, CdcWritesWithRows, CdcWritesWithDicts] class WriteToBigQuery(PTransform): From 58558f6ef3ce1167c24736893795f07b1b3d7788 Mon Sep 17 00:00:00 2001 From: Pablo Rodriguez Defino Date: Mon, 21 Oct 2024 17:27:09 -0700 Subject: [PATCH 13/13] fix lint --- sdks/python/apache_beam/io/gcp/bigquery.py | 2 +- 1 file changed, 1 
insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index f8250c561d6b..67761fe7a661 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -356,8 +356,8 @@ def chain_after(result): # pytype: skip-file import collections -import io import inspect +import io import itertools import json import logging
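
Taken together, the series lets a pipeline hand WriteToBigQuery a mutation-info callable instead of pre-wrapping every element with row_mutation_info. A minimal end-to-end sketch mirroring the run_and_validate_cdc_writes tests above; the table spec is a placeholder and the sequence-number scheme is illustrative (for a given primary key, BigQuery keeps the record with the highest change_sequence_number):

    from typing import Dict

    import apache_beam as beam

    # Derive the CDC metadata for each record from the record itself.
    def cdc_info_fn(record: Dict) -> Dict:
      return {
          'mutation_type': 'UPSERT',
          'change_sequence_number': 'AAA/' + str(record['value'])
      }

    schema = {
        'fields': [{'name': 'name', 'type': 'STRING'},
                   {'name': 'value', 'type': 'INTEGER'}]
    }

    with beam.Pipeline() as p:
      _ = (
          p
          | beam.Create([{'name': 'cdc_test', 'value': 5},
                         {'name': 'cdc_test', 'value': 3}])
          | beam.io.WriteToBigQuery(
              table='my-project:my_dataset.my_table',  # placeholder
              method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
              use_at_least_once=True,
              use_cdc_writes=cdc_info_fn,
              schema=schema,
              primary_key=['name']))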