Skip to content

Commit

Permalink
adding xlang IT with BQ
Browse files Browse the repository at this point in the history
  • Loading branch information
prodriguezdefino committed Sep 22, 2024
1 parent e804618 commit 7b7255b
Showing 1 changed file with 72 additions and 0 deletions.
72 changes: 72 additions & 0 deletions sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,27 @@ def run_storage_write_test(
use_at_least_once=use_at_least_once))
hamcrest_assert(p, bq_matcher)

def create_table_for_cdc(self, table_name):
table_schema = bigquery.TableSchema()
table_field = bigquery.TableFieldSchema()
table_field.name = 'name'
table_field.type = 'STRING'
table_field.mode = 'REQUIRED'
table_schema.fields.append(table_field)
table_field = bigquery.TableFieldSchema()
table_field.name = 'value'
table_field.type = 'INT64'
table_schema.fields.append(table_field)
table = bigquery.Table(
tableReference=bigquery.TableReference(
projectId=self.project,
datasetId=self.dataset_id,
tableId=table_name),
schema=table_schema)
request = bigquery.BigqueryTablesInsertRequest(
projectId=self.project, datasetId=self.dataset_id, table=table)
self.bigquery_client.client.tables.Insert(request)

def test_all_types(self):
table_name = "all_types"
schema = self.ALL_TYPES_SCHEMA
Expand Down Expand Up @@ -245,6 +266,57 @@ def test_write_with_beam_rows(self):
| StorageWriteToBigQuery(table=table_id))
hamcrest_assert(p, bq_matcher)

def test_write_with_beam_rows_cdc(self):
table = 'write_with_beam_rows_cdc'
table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table)

create_table_for_cdc(table)

expected_data_on_bq = [
# (name, value)
{
"name": "cdc_test",
"value": 5,
}
]

rows_with_cdc = [
beam.Row(
cdc_info=beam.Row(
mutation_type="UPSERT",
change_sequence_number="AAA/2"
),
record=beam.Row(
name="cdc_test",
value=5
)),
beam.Row(
cdc_info=beam.Row(
mutation_type="UPSERT",
change_sequence_number="AAA/1"
),
record=beam.Row(
name="cdc_test",
value=3
))
]

bq_matcher = BigqueryFullResultMatcher(
project=self.project,
query="SELECT * FROM {}.{}".format(self.dataset_id, table),
data=self.parse_expected_data(expected_data_on_bq))

with beam.Pipeline(argv=self.args) as p:
_ = (
p
| beam.Create(rows_with_cdc)
| StorageWriteToBigQuery(
table=table_id,
create_disposition="CREATE_NEVER",
use_cdc_writes_with_primary_key=(primaryKeyColumns)
))
hamcrest_assert(p, bq_matcher)

def test_write_to_dynamic_destinations(self):
base_table_spec = '{}.dynamic_dest_'.format(self.dataset_id)
spec_with_project = '{}:{}'.format(self.project, base_table_spec)
Expand Down

0 comments on commit 7b7255b

Please sign in to comment.