diff --git a/sdks/python/apache_beam/ml/rag/enrichment/bigquery_vector_search.py b/sdks/python/apache_beam/ml/rag/enrichment/bigquery_vector_search.py index cf2d0a57912..8a2162104ec 100644 --- a/sdks/python/apache_beam/ml/rag/enrichment/bigquery_vector_search.py +++ b/sdks/python/apache_beam/ml/rag/enrichment/bigquery_vector_search.py @@ -33,35 +33,84 @@ @dataclass class BigQueryVectorSearchParameters: """Parameters for configuring BigQuery vector similarity search. - - This class encapsulates the configuration needed to perform vector similarity - search using BigQuery's VECTOR_SEARCH function. It handles formatting the - query with proper embedding vectors and metadata restrictions. - Args: - table_name: Fully qualified BigQuery table name containing vectors - embedding_column: Column name containing the embedding vectors - columns: List of columns to retrieve from matched vectors - neighbor_count: Number of similar vectors to return (top-k) - metadata_restriction_template: Template string for filtering vectors by - metadata. Use Python string format syntax, e.g. - "metadata.type = '{doc_type}'" - distance_type: Optional distance metric to use. Supported values: - COSINE_DISTANCE (default), EUCLIDEAN_DISTANCE, or DOT_PRODUCT - options: Optional dictionary of additional VECTOR_SEARCH options - - Example: - ```python - params = BigQueryVectorSearchParameters( - table_name='project.dataset.embeddings', - embedding_column='embedding', - columns=['content', 'url', 'date'], - neighbor_count=5, - metadata_restriction_template="type = '{doc_type}'", - distance_type='COSINE_DISTANCE' - ) - ``` - """ + This class encapsulates the configuration needed to perform vector + similarity search using BigQuery's VECTOR_SEARCH function. It handles + formatting the query with proper embedding vectors and metadata + restrictions. + + Example with flattened metadata column: + + Table schema:: + + embedding: ARRAY # Vector embedding + content: STRING # Document content + language: STRING # Direct metadata column + + Code:: + + >>> params = BigQueryVectorSearchParameters( + ... table_name='project.dataset.embeddings', + ... embedding_column='embedding', + ... columns=['content', 'language'], + ... neighbor_count=5, + ... # For column 'language', value comes from + ... # chunk.metadata['language'] + ... metadata_restriction_template="language = '{language}'" + ... ) + >>> # When processing a chunk with metadata={'language': 'en'}, + >>> # generates: WHERE language = 'en' + + Example with nested repeated metadata: + + Table schema:: + + embedding: ARRAY # Vector embedding + content: STRING # Document content + metadata: ARRAY> + + Code:: + + >>> params = BigQueryVectorSearchParameters( + ... table_name='project.dataset.embeddings', + ... embedding_column='embedding', + ... columns=['content', 'metadata'], + ... neighbor_count=5, + ... # check_metadata(field_name, key_to_search, value_from_chunk) + ... metadata_restriction_template=( + ... "check_metadata(metadata, 'language', '{language}')" + ... ) + ... ) + >>> # When processing a chunk with metadata={'language': 'en'}, + >>> # generates: WHERE check_metadata(metadata, 'language', 'en') + >>> # Searches for {key: 'language', value: 'en'} in metadata array + + Args: + table_name: Fully qualified BigQuery table name containing vectors. + embedding_column: Column name containing the embedding vectors. + columns: List of columns to retrieve from matched vectors. + neighbor_count: Number of similar vectors to return (top-k). + metadata_restriction_template: Template string for filtering vectors. + Two formats supported: + + 1. For flattened metadata columns: + ``column_name = '{metadata_key}'`` where column_name is the + BigQuery column and metadata_key is used to get the value from + chunk.metadata[metadata_key]. + 2. For nested repeated metadata (ARRAY>): + ``check_metadata(field_name, 'key_to_match', '{metadata_key}')`` + where field_name is the ARRAY column in BigQuery, + key_to_match is the literal key to search for in the array, and + metadata_key is used to get value from + chunk.metadata[metadata_key]. + + distance_type: Optional distance metric to use. Supported values: + COSINE (default), EUCLIDEAN, or DOT_PRODUCT. + options: Optional dictionary of additional VECTOR_SEARCH options. + """ table_name: str embedding_column: str columns: List[str] @@ -144,6 +193,41 @@ class BigQueryVectorSearchEnrichmentHandler( using the VECTOR_SEARCH function. It supports batching requests for efficiency and preserves the original Chunk metadata while adding the search results. + Example: + >>> from apache_beam.ml.rag.types import Chunk, Content, Embedding + >>> + >>> # Configure vector search + >>> params = BigQueryVectorSearchParameters( + ... table_name='project.dataset.embeddings', + ... embedding_column='embedding', + ... columns=['content', 'metadata'], + ... neighbor_count=2, + ... metadata_restriction_template="language = '{language}'" + ... ) + >>> + >>> # Create handler + >>> handler = BigQueryVectorSearchEnrichmentHandler( + ... project='my-project', + ... vector_search_parameters=params, + ... min_batch_size=100, + ... max_batch_size=1000 + ... ) + >>> + >>> # Use in pipeline + >>> with beam.Pipeline() as p: + ... enriched = ( + ... p + ... | beam.Create([ + ... Chunk( + ... id='query1', + ... embedding=Embedding(dense_embedding=[0.1, 0.2, 0.3]), + ... content=Content(text='test query'), + ... metadata={'language': 'en'} + ... ) + ... ]) + ... | Enrichment(handler) + ... ) + Args: project: GCP project ID containing the BigQuery dataset vector_search_parameters: Configuration for the vector search query @@ -151,24 +235,6 @@ class BigQueryVectorSearchEnrichmentHandler( max_batch_size: Maximum number of chunks to process in one batch **kwargs: Additional arguments passed to bigquery.Client - Example: - ```python - params = BigQueryVectorSearchParameters(...) - handler = BigQueryVectorSearchEnrichmentHandler( - project='my-project', - vector_search_parameters=params, - min_batch_size=100, - max_batch_size=1000 - ) - - with beam.Pipeline() as p: - enriched = ( - p - | beam.Create([chunk1, chunk2]) - | beam.ParDo(handler) - ) - ``` - The handler will: 1. Batch incoming chunks according to batch size parameters 2. Format and execute vector search query for each batch diff --git a/sdks/python/apache_beam/ml/rag/enrichment/bigquery_vector_search_it_test.py b/sdks/python/apache_beam/ml/rag/enrichment/bigquery_vector_search_it_test.py index cabcfd53351..d0cb8c31dd5 100644 --- a/sdks/python/apache_beam/ml/rag/enrichment/bigquery_vector_search_it_test.py +++ b/sdks/python/apache_beam/ml/rag/enrichment/bigquery_vector_search_it_test.py @@ -43,7 +43,7 @@ class BigQueryVectorSearchIT(unittest.TestCase): bigquery_dataset_id = 'python_vector_search_test_' - project = "dataflow-test" + project = "apache-beam-testing" @classmethod def setUpClass(cls):