Fix project in vector search test and pydocs.
claudevdm committed Dec 19, 2024
1 parent 3044d1d commit 1ba9ee9
Showing 2 changed files with 113 additions and 47 deletions.
158 changes: 112 additions & 46 deletions sdks/python/apache_beam/ml/rag/enrichment/bigquery_vector_search.py
@@ -33,35 +33,84 @@
@dataclass
class BigQueryVectorSearchParameters:
"""Parameters for configuring BigQuery vector similarity search.
This class encapsulates the configuration needed to perform vector similarity
search using BigQuery's VECTOR_SEARCH function. It handles formatting the
query with proper embedding vectors and metadata restrictions.
Args:
table_name: Fully qualified BigQuery table name containing vectors
embedding_column: Column name containing the embedding vectors
columns: List of columns to retrieve from matched vectors
neighbor_count: Number of similar vectors to return (top-k)
metadata_restriction_template: Template string for filtering vectors by
metadata. Use Python string format syntax, e.g.
"metadata.type = '{doc_type}'"
distance_type: Optional distance metric to use. Supported values:
COSINE_DISTANCE (default), EUCLIDEAN_DISTANCE, or DOT_PRODUCT
options: Optional dictionary of additional VECTOR_SEARCH options
Example:
```python
params = BigQueryVectorSearchParameters(
table_name='project.dataset.embeddings',
embedding_column='embedding',
columns=['content', 'url', 'date'],
neighbor_count=5,
metadata_restriction_template="type = '{doc_type}'",
distance_type='COSINE_DISTANCE'
)
```
"""
This class encapsulates the configuration needed to perform vector
similarity search using BigQuery's VECTOR_SEARCH function. It handles
formatting the query with proper embedding vectors and metadata
restrictions.
Example with flattened metadata column:
Table schema::
embedding: ARRAY<FLOAT64> # Vector embedding
content: STRING # Document content
language: STRING # Direct metadata column
Code::
>>> params = BigQueryVectorSearchParameters(
... table_name='project.dataset.embeddings',
... embedding_column='embedding',
... columns=['content', 'language'],
... neighbor_count=5,
... # For column 'language', value comes from
... # chunk.metadata['language']
... metadata_restriction_template="language = '{language}'"
... )
>>> # When processing a chunk with metadata={'language': 'en'},
>>> # generates: WHERE language = 'en'
Example with nested repeated metadata:
Table schema::
embedding: ARRAY<FLOAT64> # Vector embedding
content: STRING # Document content
metadata: ARRAY<STRUCT< # Nested repeated metadata
key: STRING,
value: STRING
>>
Code::
>>> params = BigQueryVectorSearchParameters(
... table_name='project.dataset.embeddings',
... embedding_column='embedding',
... columns=['content', 'metadata'],
... neighbor_count=5,
... # check_metadata(field_name, key_to_search, value_from_chunk)
... metadata_restriction_template=(
... "check_metadata(metadata, 'language', '{language}')"
... )
... )
>>> # When processing a chunk with metadata={'language': 'en'},
>>> # generates: WHERE check_metadata(metadata, 'language', 'en')
>>> # Searches for {key: 'language', value: 'en'} in metadata array
Args:
table_name: Fully qualified BigQuery table name containing vectors.
embedding_column: Column name containing the embedding vectors.
columns: List of columns to retrieve from matched vectors.
neighbor_count: Number of similar vectors to return (top-k).
metadata_restriction_template: Template string for filtering vectors.
Two formats supported:
1. For flattened metadata columns:
``column_name = '{metadata_key}'`` where column_name is the
BigQuery column and metadata_key is used to get the value from
chunk.metadata[metadata_key].
2. For nested repeated metadata (ARRAY<STRUCT<key,value>>):
``check_metadata(field_name, 'key_to_match', '{metadata_key}')``
where field_name is the ARRAY<STRUCT> column in BigQuery,
key_to_match is the literal key to search for in the array, and
metadata_key is used to get value from
chunk.metadata[metadata_key].
distance_type: Optional distance metric to use. Supported values:
COSINE (default), EUCLIDEAN, or DOT_PRODUCT.
options: Optional dictionary of additional VECTOR_SEARCH options.
"""
table_name: str
embedding_column: str
columns: List[str]
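
Note on the template semantics documented above: the docstring says each `{metadata_key}` placeholder is filled from `chunk.metadata[metadata_key]`. The code that performs the substitution is not part of this diff, so the following is only a minimal sketch that assumes plain `str.format` behaviour. The helper name `render_restriction` is hypothetical and is not part of the Beam API.

```python
from typing import Dict


def render_restriction(template: str, metadata: Dict[str, str]) -> str:
    """Hypothetical helper: fill a restriction template from chunk metadata.

    Assumes str.format-style substitution, as the docstring's examples suggest.
    """
    return template.format(**metadata)


chunk_metadata = {"language": "en"}

# Format 1: flattened metadata column.
flat = render_restriction("language = '{language}'", chunk_metadata)

# Format 2: nested repeated metadata (ARRAY<STRUCT<key, value>>).
nested = render_restriction(
    "check_metadata(metadata, 'language', '{language}')", chunk_metadata
)

print(flat)    # language = 'en'
print(nested)  # check_metadata(metadata, 'language', 'en')
```

Both results match the `WHERE` clauses quoted in the docstring examples. Note that this sketch does no escaping, so a metadata value containing a single quote would produce an invalid restriction.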
@@ -144,31 +193,48 @@ class BigQueryVectorSearchEnrichmentHandler(
using the VECTOR_SEARCH function. It supports batching requests for efficiency
and preserves the original Chunk metadata while adding the search results.
Example:
>>> from apache_beam.ml.rag.types import Chunk, Content, Embedding
>>>
>>> # Configure vector search
>>> params = BigQueryVectorSearchParameters(
... table_name='project.dataset.embeddings',
... embedding_column='embedding',
... columns=['content', 'metadata'],
... neighbor_count=2,
... metadata_restriction_template="language = '{language}'"
... )
>>>
>>> # Create handler
>>> handler = BigQueryVectorSearchEnrichmentHandler(
... project='my-project',
... vector_search_parameters=params,
... min_batch_size=100,
... max_batch_size=1000
... )
>>>
>>> # Use in pipeline
>>> with beam.Pipeline() as p:
... enriched = (
... p
... | beam.Create([
... Chunk(
... id='query1',
... embedding=Embedding(dense_embedding=[0.1, 0.2, 0.3]),
... content=Content(text='test query'),
... metadata={'language': 'en'}
... )
... ])
... | Enrichment(handler)
... )
Args:
project: GCP project ID containing the BigQuery dataset
vector_search_parameters: Configuration for the vector search query
min_batch_size: Minimum number of chunks to batch before processing
max_batch_size: Maximum number of chunks to process in one batch
**kwargs: Additional arguments passed to bigquery.Client
Example:
```python
params = BigQueryVectorSearchParameters(...)
handler = BigQueryVectorSearchEnrichmentHandler(
project='my-project',
vector_search_parameters=params,
min_batch_size=100,
max_batch_size=1000
)
with beam.Pipeline() as p:
enriched = (
p
| beam.Create([chunk1, chunk2])
| beam.ParDo(handler)
)
```
The handler will:
1. Batch incoming chunks according to batch size parameters
2. Format and execute vector search query for each batch
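
Note on the batching step listed above: the handler's actual batching is done by Beam and is not shown in this diff. As a rough illustration of what an upper bound such as `max_batch_size` means, here is a minimal pure-Python sketch. The function `batches` is hypothetical, and it ignores `min_batch_size`, which Beam uses when it sizes batches dynamically.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def batches(items: Sequence[T], max_batch_size: int) -> Iterator[List[T]]:
    """Hypothetical helper: yield consecutive groups of at most max_batch_size."""
    if max_batch_size < 1:
        raise ValueError("max_batch_size must be at least 1")
    for start in range(0, len(items), max_batch_size):
        yield list(items[start:start + max_batch_size])


# Five chunk IDs grouped with an upper bound of two per batch.
chunk_ids = [f"query{i}" for i in range(1, 6)]
grouped = list(batches(chunk_ids, max_batch_size=2))
print(grouped)  # [['query1', 'query2'], ['query3', 'query4'], ['query5']]
```

Each group would then correspond to one vector search query, which is why larger batch sizes reduce the number of BigQuery round trips.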
@@ -43,7 +43,7 @@

class BigQueryVectorSearchIT(unittest.TestCase):
bigquery_dataset_id = 'python_vector_search_test_'
- project = "dataflow-test"
+ project = "apache-beam-testing"

@classmethod
def setUpClass(cls):