diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 61699a67fd98..819fd6cc8b92 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -2715,6 +2715,58 @@ def expand(self, input):
         failed_rows=failed_rows,
         failed_rows_with_errors=failed_rows_with_errors)
 
+  @staticmethod
+  def extract_type_hint(
+      is_row_data: bool,
+      dynamic_destination: bool,
+      element_type,
+      mutation_info_fn: Callable):
+    if dynamic_destination:
+      fields = [(StorageWriteToBigQuery.DESTINATION, str),
+                (
+                    StorageWriteToBigQuery.RECORD,
+                    element_type if is_row_data else
+                    RowTypeConstraint.from_fields(element_type))]
+      if callable(mutation_info_fn):
+        fields.append(StorageWriteToBigQuery.CDC_INFO_TYPE_HINT)
+      type_hint = RowTypeConstraint.from_fields(fields)
+    else:
+      if callable(mutation_info_fn):
+        type_hint = RowTypeConstraint.from_fields([
+            (
+                StorageWriteToBigQuery.RECORD,
+                element_type if is_row_data else
+                RowTypeConstraint.from_fields(element_type)),
+            StorageWriteToBigQuery.CDC_INFO_TYPE_HINT
+        ])
+      else:
+        type_hint = RowTypeConstraint.from_user_type(
+            element_type) if is_row_data else RowTypeConstraint.from_fields(
+                element_type)
+    return type_hint
+
+  @staticmethod
+  def maybe_add_mutation_info_dynamic_destination(
+      destination, data, mutation_info_fn, schema=None):
+    if callable(mutation_info_fn):
+      cdc_info = mutation_info_fn(data)
+      return beam.Row(
+          **{
+              StorageWriteToBigQuery.DESTINATION: destination,
+              StorageWriteToBigQuery.RECORD: data if not schema else
+              bigquery_tools.beam_row_from_dict(data, schema),
+              StorageWriteToBigQuery.CDC_INFO: cdc_info
+              if not schema else bigquery_tools.beam_row_from_dict(
+                  cdc_info, StorageWriteToBigQuery.CDC_INFO_SCHEMA)
+          })
+    else:
+      return beam.Row(
+          **{
+              StorageWriteToBigQuery.DESTINATION: destination,
+              StorageWriteToBigQuery.RECORD: data if not schema else
+              bigquery_tools.beam_row_from_dict(data, schema)
+          })
+
   class PrepareBeamRows(PTransform):
     def __init__(
         self,
@@ -2730,8 +2782,9 @@ def expand(self, input_rows):
         return (
             input_rows
             | "Wrap and maybe include CDC info" >> beam.Map(
-                lambda data_and_dest: self.
-                maybe_add_mutation_info_dynamic_destination(data_and_dest)))
+                lambda data_and_dest: StorageWriteToBigQuery.
+                maybe_add_mutation_info_dynamic_destination(
+                    data_and_dest[0], data_and_dest[1], self.mutation_info_fn)))
       else:
         if callable(self.mutation_info_fn):
           return (
@@ -2747,38 +2800,12 @@ def expand(self, input_rows):
         return input_rows
 
     def with_output_types(self):
-      if self.dynamic_destinations:
-        fields = [(StorageWriteToBigQuery.DESTINATION, str),
-                  (StorageWriteToBigQuery.RECORD, self.element_type)]
-        if callable(self.mutation_info_fn):
-          fields.append(StorageWriteToBigQuery.CDC_INFO_TYPE_HINT)
-        type_hint = RowTypeConstraint.from_fields(fields)
-      else:
-        if callable(self.mutation_info_fn):
-          type_hint = RowTypeConstraint.from_fields([
-              (StorageWriteToBigQuery.RECORD, self.element_type),
-              StorageWriteToBigQuery.CDC_INFO_TYPE_HINT
-          ])
-        else:
-          type_hint = self.element_type
-
-      return super().with_output_types(type_hint)
-
-    def maybe_add_mutation_info_dynamic_destination(self, row_and_dest):
-      if callable(self.mutation_info_fn):
-        return beam.Row(
-            **{
-                StorageWriteToBigQuery.DESTINATION: row_and_dest[0],
-                StorageWriteToBigQuery.RECORD: row_and_dest[1],
-                StorageWriteToBigQuery.CDC_INFO: self.mutation_info_fn(
-                    row_and_dest[1])
-            })
-      else:
-        return beam.Row(
-            **{
-                StorageWriteToBigQuery.DESTINATION: row_and_dest[0],
-                StorageWriteToBigQuery.RECORD: row_and_dest[1]
-            })
+      return super().with_output_types(
+          StorageWriteToBigQuery.extract_type_hint(
+              True,
+              self.dynamic_destinations,
+              self.element_type,
+              self.mutation_info_fn))
 
   class ConvertToBeamRows(PTransform):
     def __init__(
         self,
@@ -2795,8 +2822,12 @@ def expand(self, input_dicts):
         return (
             input_dicts
             | "Convert dict to Beam Row" >> beam.Map(
-                lambda data_with_dest: self.
-                maybe_add_mutation_info_dynamic_destination(data_with_dest)))
+                lambda data_with_dest: StorageWriteToBigQuery.
+                maybe_add_mutation_info_dynamic_destination(
+                    data_with_dest[0],
+                    data_with_dest[1],
+                    self.mutation_info_fn,
+                    self.schema)))
       else:
         return (
             input_dicts
@@ -2806,46 +2837,12 @@ def expand(self, input_dicts):
     def with_output_types(self):
       row_type_hints = bigquery_tools.get_beam_typehints_from_tableschema(
          self.schema)
-      if self.dynamic_destinations:
-        fields = [(StorageWriteToBigQuery.DESTINATION, str),
-                  (
-                      StorageWriteToBigQuery.RECORD,
-                      RowTypeConstraint.from_fields(row_type_hints))]
-        if callable(self.mutation_info_fn):
-          fields.append(StorageWriteToBigQuery.CDC_INFO_TYPE_HINT)
-        type_hint = RowTypeConstraint.from_fields(fields)
-      else:
-        if callable(self.mutation_info_fn):
-          type_hint = RowTypeConstraint.from_fields([
-              (
-                  StorageWriteToBigQuery.RECORD,
-                  RowTypeConstraint.from_fields(row_type_hints)),
-              StorageWriteToBigQuery.CDC_INFO_TYPE_HINT
-          ])
-        else:
-          type_hint = RowTypeConstraint.from_fields(row_type_hints)
-
-      return super().with_output_types(type_hint)
-
-    def maybe_add_mutation_info_dynamic_destination(self, data_and_dest):
-      if callable(self.mutation_info_fn):
-        return beam.Row(
-            **{
-                StorageWriteToBigQuery.DESTINATION: data_and_dest[0],
-                StorageWriteToBigQuery.RECORD: bigquery_tools.
-                beam_row_from_dict(data_and_dest[1], self.schema),
-                StorageWriteToBigQuery.CDC_INFO: bigquery_tools.
-                beam_row_from_dict(
-                    self.mutation_info_fn(data_and_dest[1]),
-                    StorageWriteToBigQuery.CDC_INFO_SCHEMA)
-            })
-      else:
-        return beam.Row(
-            **{
-                StorageWriteToBigQuery.DESTINATION: data_and_dest[0],
-                StorageWriteToBigQuery.RECORD: bigquery_tools.
-                beam_row_from_dict(data_and_dest[1], self.schema)
-            })
+      return super().with_output_types(
+          StorageWriteToBigQuery.extract_type_hint(
+              False,
+              self.dynamic_destinations,
+              row_type_hints,
+              self.mutation_info_fn))
 
     def maybe_add_mutation_info(self, dict_data):
       if callable(self.mutation_info_fn):