Skip to content

Commit

Permalink
Fix streaming test.
Browse files Browse the repository at this point in the history
  • Loading branch information
claudevdm committed Dec 19, 2024
1 parent 3774dca commit b2ad7ac
Showing 1 changed file with 17 additions and 3 deletions.
20 changes: 17 additions & 3 deletions sdks/python/apache_beam/ml/rag/ingestion/bigquery_it_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,17 +175,31 @@ def test_streaming_default_schema(self):
id="2",
embedding=Embedding(dense_embedding=[0.3, 0.4]),
content=Content(text="bar"),
metadata={"c": "d"})
metadata={"c": "d"}),
Chunk(
id="3",
embedding=Embedding(dense_embedding=[0.5, 0.6]),
content=Content(text="foo"),
metadata={"e": "f"}),
Chunk(
id="4",
embedding=Embedding(dense_embedding=[0.7, 0.8]),
content=Content(text="bar"),
metadata={"g": "h"})
]

pipeline_verifiers = [
BigqueryFullResultMatcher(
project=self.project,
query="SELECT id, content, embedding, metadata FROM %s" % table_id,
data=[("0", "foo", [0.1, 0.2], [{
data=[("1", "foo", [0.1, 0.2], [{
"key": "a", "value": "b"
}]), ("2", "bar", [0.3, 0.4], [{
"key": "c", "value": "d"
}]), ("3", "bar", [0.5, 0.6], [{
"key": "e", "value": "f"
}]), ("4", "bar", [0.7, 0.8], [{
"key": "c", "value": "d"
}])])
]
args = self.test_pipeline.get_full_options_as_args(
Expand All @@ -196,7 +210,7 @@ def test_streaming_default_schema(self):
with beam.Pipeline(argv=args) as p:
_ = (
p
| PeriodicImpulse(0, 4, 1)
| PeriodicImpulse(0, 2, 1)
| beam.Map(lambda t: chunks[t])
| config.create_write_transform())

Expand Down

0 comments on commit b2ad7ac

Please sign in to comment.