From bc1fcfb05920e877c798eb93b01bf4e89de1851b Mon Sep 17 00:00:00 2001 From: liferoad Date: Tue, 19 Mar 2024 12:03:08 -0400 Subject: [PATCH] Support ValueProvider for _CustomBigQueryStorageSource (#30662) --- sdks/python/apache_beam/io/gcp/bigquery.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index eb5a4582f516..08698b273b1e 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -1098,7 +1098,16 @@ def estimate_size(self): bq = bigquery_tools.BigQueryWrapper.from_pipeline_options( self.pipeline_options) if self.table_reference is not None: - return self._get_table_size(bq, self.table_reference) + table_ref = self.table_reference + if (isinstance(self.table_reference, vp.ValueProvider) and + self.table_reference.is_accessible()): + table_ref = bigquery_tools.parse_table_reference( + self.table_reference.get(), project=self._get_project()) + elif isinstance(self.table_reference, vp.ValueProvider): + # Size estimation is best effort. We return None as we have + # no access to the table that we're querying. + return None + return self._get_table_size(bq, table_ref) elif self.query is not None and self.query.is_accessible(): query_job_name = bigquery_tools.generate_bq_job_name( self._job_name,