diff --git a/sdks/python/apache_beam/ml/rag/ingestion/bigquery_it_test.py b/sdks/python/apache_beam/ml/rag/ingestion/bigquery_it_test.py
index d1ad95acd36..ec69f511642 100644
--- a/sdks/python/apache_beam/ml/rag/ingestion/bigquery_it_test.py
+++ b/sdks/python/apache_beam/ml/rag/ingestion/bigquery_it_test.py
@@ -175,17 +175,31 @@ def test_streaming_default_schema(self):
             id="2",
             embedding=Embedding(dense_embedding=[0.3, 0.4]),
             content=Content(text="bar"),
-            metadata={"c": "d"})
+            metadata={"c": "d"}),
+        Chunk(
+            id="3",
+            embedding=Embedding(dense_embedding=[0.5, 0.6]),
+            content=Content(text="foo"),
+            metadata={"e": "f"}),
+        Chunk(
+            id="4",
+            embedding=Embedding(dense_embedding=[0.7, 0.8]),
+            content=Content(text="bar"),
+            metadata={"g": "h"})
     ]
 
     pipeline_verifiers = [
         BigqueryFullResultMatcher(
             project=self.project,
             query="SELECT id, content, embedding, metadata FROM %s" % table_id,
-            data=[("0", "foo", [0.1, 0.2], [{
+            data=[("1", "foo", [0.1, 0.2], [{
                 "key": "a", "value": "b"
             }]), ("2", "bar", [0.3, 0.4], [{
                 "key": "c", "value": "d"
+            }]), ("3", "foo", [0.5, 0.6], [{
+                "key": "e", "value": "f"
+            }]), ("4", "bar", [0.7, 0.8], [{
+                "key": "g", "value": "h"
             }])])
     ]
     args = self.test_pipeline.get_full_options_as_args(
@@ -196,7 +210,8 @@ def test_streaming_default_schema(self):
     with beam.Pipeline(argv=args) as p:
       _ = (
           p
-          | PeriodicImpulse(0, 4, 1)
+          # Stop bound follows the fixture so the indices track len(chunks).
+          | PeriodicImpulse(0, len(chunks), 1)
           | beam.Map(lambda t: chunks[t])
           | config.create_write_transform())
 