diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 5e73e3450369..c01341f2d4d0 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -1884,7 +1884,8 @@ def _restore_table_ref(sharded_table_ref_elems_kv): # Flag to be passed to WriteToBigQuery to force schema autodetection SCHEMA_AUTODETECT = 'SCHEMA_AUTODETECT' -CdcWritesWithRows = Callable[[beam.Row], beam.Row] +# CDC configuration type definition +CdcWritesWithRows = Callable[[beam.pvalue.Row], beam.pvalue.Row] CdcWritesWithDicts = Callable[[Dict], Dict] UseCdcWrites = Union[bool, CdcWritesWithRows, CdcWritesWithRows] @@ -1935,7 +1936,7 @@ def __init__( load_job_project_id=None, max_insert_payload_size=MAX_INSERT_PAYLOAD_SIZE, num_streaming_keys=DEFAULT_SHARDS_PER_DESTINATION, - use_cdc_writes: bool = False, + use_cdc_writes: UseCdcWrites = False, primary_key: List[str] = None, expansion_service=None): """Initialize a WriteToBigQuery transform. @@ -2555,7 +2556,7 @@ def __init__( use_at_least_once=False, with_auto_sharding=False, num_storage_api_streams=0, - use_cdc_writes: bool = False, + use_cdc_writes: UseCdcWrites = False, primary_key: List[str] = None, expansion_service=None): self._table = table