From ee017c38a7a4c48af57fdb6ddf4d235d726bfb11 Mon Sep 17 00:00:00 2001
From: Lorenzo Spataro
Date: Thu, 27 Jun 2024 09:07:17 +0200
Subject: [PATCH] introducing BF vector store

---
Review notes (this section is ignored by `git am`):
- bruteforce.py: fixed `self._vector_store.content_field` (no such attribute),
  the misplaced bracket in `self.doc_id_field.isin(ids)` and the filter
  comparison against `output_df[col_value]` in get_documents.
- bruteforce.py: the metadata filter is now applied before the top-k cut, and
  search results no longer expose the cached record dicts as metadata.
- Context-line whitespace was reconstructed from a whitespace-collapsed copy of
  this patch, and the blob hashes on the `index` lines predate these changes;
  regenerate with `git format-patch` before applying.
- NOTE(review): sync_data() assigns `_df`, `_vectors`, `_vectors_transpose` and
  `_df_records`, which are not declared on the class. Please confirm that the
  pydantic base model accepts these assignments.

 .../bq_storage_vectorstores/_base.py          |  26 +++
 .../bq_storage_vectorstores/bigquery.py       |  33 +++-
 .../bq_storage_vectorstores/bruteforce.py     | 184 ++++++++++++++++++
 .../bq_storage_vectorstores/featurestore.py   |   9 +-
 4 files changed, 239 insertions(+), 13 deletions(-)
 create mode 100644 libs/community/langchain_google_community/bq_storage_vectorstores/bruteforce.py

diff --git a/libs/community/langchain_google_community/bq_storage_vectorstores/_base.py b/libs/community/langchain_google_community/bq_storage_vectorstores/_base.py
index b69d432f..bd7ff9e2 100644
--- a/libs/community/langchain_google_community/bq_storage_vectorstores/_base.py
+++ b/libs/community/langchain_google_community/bq_storage_vectorstores/_base.py
@@ -573,3 +573,29 @@ def max_marginal_relevance_search_by_vector(
             np.array(embedding), doc_embeddings, lambda_mult=lambda_mult, k=k
         )
         return [doc_tuples[i][0] for i in mmr_doc_indexes]  # type: ignore[index]
+
+    def _convert_vector_store(self, vector_store_class: Any, **kwargs: Any) -> Any:
+        """
+        Converts the current object's parameters into another Vector Store instance.
+
+        This method combines the base parameters of the current object with
+        `kwargs` and passes them to the `vector_store_class` constructor.
+
+        Args:
+            vector_store_class: The vector store class to instantiate, expected
+                to be a subclass of `BaseBigQueryVectorStore`.
+            **kwargs: Additional keyword arguments to be passed to the
+                `vector_store_class` constructor. These override any matching
+                parameters in the base object.
+
+        Returns:
+            BaseBigQueryVectorStore: A child of `BaseBigQueryVectorStore` object ready
+                for vector search operations.
+
+        Raises:
+            ValueError: If any of the combined parameters are invalid
+        """
+        base_params = self.dict(include=BaseBigQueryVectorStore.__fields__.keys())
+        base_params["embedding"] = self.embedding
+        all_params = {**base_params, **kwargs}
+        return vector_store_class(**all_params)
diff --git a/libs/community/langchain_google_community/bq_storage_vectorstores/bigquery.py b/libs/community/langchain_google_community/bq_storage_vectorstores/bigquery.py
index 00045e15..d0c44943 100644
--- a/libs/community/langchain_google_community/bq_storage_vectorstores/bigquery.py
+++ b/libs/community/langchain_google_community/bq_storage_vectorstores/bigquery.py
@@ -76,7 +76,7 @@ def get_documents(
                                 "int_property": 123
                             }
         Returns:
-            List of ids from adding the texts into the vectorstore.
+            List of matching documents.
         """
         from google.cloud import bigquery  # type: ignore[attr-defined]
 
@@ -503,11 +503,32 @@ def to_vertex_fs_vector_store(self, **kwargs: Any) -> Any:
             VertexFSVectorStore,
         )
 
-        base_params = self.dict(include=BaseBigQueryVectorStore.__fields__.keys())
-        base_params["embedding"] = self.embedding
-        all_params = {**base_params, **kwargs}
-        fs_obj = VertexFSVectorStore(**all_params)
-        return fs_obj
+        return self._convert_vector_store(VertexFSVectorStore, **kwargs)
+
+    def to_bf_vector_store(self, **kwargs: Any) -> Any:
+        """
+        Converts the current object's parameters into a `BruteForceBQVectorStore`.
+
+        This method combines the base parameters of the current object to create a
+        `BruteForceBQVectorStore` object.
+
+        Args:
+            **kwargs: Additional keyword arguments to be passed to the
+                `BruteForceBQVectorStore` constructor. These override any matching
+                parameters in the base object.
+
+        Returns:
+            BruteForceBQVectorStore: An initialized `BruteForceBQVectorStore` object.
+                Call `sync_data()` on it before running searches.
+
+        Raises:
+            ValueError: If any of the combined parameters are invalid for initializing
+                a `BruteForceBQVectorStore`.
+        """
+        from langchain_google_community.bq_storage_vectorstores.bruteforce import (
+            BruteForceBQVectorStore,
+        )
+        return self._convert_vector_store(BruteForceBQVectorStore, **kwargs)
 
     def job_stats(self, job_id: str) -> Dict:
         """Return the statistics for a single job execution.
diff --git a/libs/community/langchain_google_community/bq_storage_vectorstores/bruteforce.py b/libs/community/langchain_google_community/bq_storage_vectorstores/bruteforce.py
new file mode 100644
index 00000000..4b3cbd29
--- /dev/null
+++ b/libs/community/langchain_google_community/bq_storage_vectorstores/bruteforce.py
@@ -0,0 +1,184 @@
+"""Brute-force (in-memory) vector search over a BigQuery table."""
+
+from typing import Any, Dict, List, Optional, Union
+
+import numpy as np
+from langchain_core.documents import Document
+
+from langchain_google_community.bq_storage_vectorstores._base import (
+    BaseBigQueryVectorStore,
+)
+from langchain_google_community.bq_storage_vectorstores.utils import (
+    doc_match_filter,
+)
+
+
+class BruteForceBQVectorStore(BaseBigQueryVectorStore):
+    """Vector store that searches an in-memory copy of a BigQuery table.
+
+    `sync_data()` downloads the whole table and caches the embeddings as a
+    NumPy matrix; searches rank rows by inner product with the query
+    embeddings. Call `sync_data()` before searching or fetching documents.
+    """
+
+    def sync_data(self) -> None:
+        """Load the BigQuery table and cache the structures used by search."""
+        self._df = self._query_table_to_df()
+        self._vectors = np.array(self._df[self.embedding_field].tolist())
+        self._vectors_transpose = self._vectors.T
+        self._df_records = self._df.drop(columns=[self.embedding_field]).to_dict(
+            "records"
+        )
+
+    def _similarity_search_by_vectors_with_scores_and_embeddings(
+        self,
+        embeddings: List[List[float]],
+        filter: Optional[Dict[str, Any]] = None,
+        k: int = 5,
+        batch_size: Union[int, None] = None,
+        **kwargs: Any,
+    ) -> List[List[List[Any]]]:
+        """Performs a similarity search using vector embeddings
+
+        This function takes a set of query embeddings and searches for similar
+        documents. It returns the top-k matching documents, along with their
+        similarity scores and their corresponding embeddings.
+
+        Args:
+            embeddings: A list of lists, where each inner list represents a
+                query embedding.
+            filter: (Optional) A dictionary specifying filter criteria for document
+                on metadata properties, e.g.
+                            {
+                                "str_property": "foo",
+                                "int_property": 123
+                            }
+            k: The number of top results to return for each query.
+            batch_size: The size of batches to process embeddings. Currently
+                unused: all queries are scored in a single matrix product.
+
+        Returns:
+            A list of lists of lists. Each inner list represents the results for a
+            single query, and contains elements of the form
+            [Document, score, embedding], where:
+            - Document: The matching document object.
+            - score: The similarity score between the query and document.
+            - embedding: The document's embedding.
+        """
+        num_queries = len(embeddings)
+        if num_queries == 0:
+            return []
+        # Filter before ranking, so that up to k matching documents are returned
+        # instead of whatever survives filtering an already-truncated top-k.
+        if filter is None:
+            candidates = np.arange(len(self._df_records))
+        else:
+            candidates = np.array(
+                [
+                    index
+                    for index, record in enumerate(self._df_records)
+                    if doc_match_filter(document=record, filter=filter)
+                ],
+                dtype=int,
+            )
+        if candidates.size == 0:
+            return [[] for _ in range(num_queries)]
+        scores = np.asarray(embeddings) @ self._vectors_transpose[:, candidates]
+        # Negate the scores so that argsort puts the best matches first.
+        top_positions = np.argsort(-scores, axis=1)[:, :k]
+        documents = []
+        for query_scores, positions in zip(scores, top_positions):
+            query_docs = []
+            for position in positions:
+                row = int(candidates[position])
+                record = self._df_records[row]
+                # Copy the metadata so callers cannot mutate the cached records.
+                metadata = {
+                    key: value
+                    for key, value in record.items()
+                    if key != self.content_field
+                }
+                query_docs.append(
+                    [
+                        Document(
+                            page_content=record[self.content_field],
+                            metadata=metadata,
+                        ),
+                        float(query_scores[position]),
+                        self._vectors[row].tolist(),
+                    ]
+                )
+            documents.append(query_docs)
+        return documents
+
+    def _query_table_to_df(self) -> Any:
+        """Read the whole table backing this store into a DataFrame."""
+        from google.cloud import bigquery  # type: ignore[attr-defined]
+
+        table = self._full_table_id
+        # Quote the table id so ids with special characters (e.g. "-") parse.
+        query = f"SELECT * FROM `{table}`"
+        self._logger.info(f"Reading data from {table}. It might take a few minutes...")
+        job_config = bigquery.QueryJobConfig(
+            use_query_cache=True,
+            priority=bigquery.QueryPriority.INTERACTIVE,
+        )
+        query_job = self._bq_client.query(query, job_config=job_config)
+        return query_job.to_dataframe()
+
+    def get_documents(
+        self,
+        ids: Optional[List[str]] = None,
+        filter: Optional[Dict[str, Any]] = None,
+        **kwargs: Any,
+    ) -> List[Document]:
+        """Search documents by their ids or metadata values.
+
+        Args:
+            ids: List of ids of documents to retrieve from the vectorstore.
+                If None, documents are not restricted by id.
+            filter: Filter on metadata properties, e.g.
+                            {
+                                "str_property": "foo",
+                                "int_property": 123
+                            }
+        Returns:
+            List of matching documents.
+        """
+        output_df = self._df
+        if ids is not None:
+            output_df = output_df[output_df[self.doc_id_field].isin(ids)]
+        if filter is not None:
+            for col_name, col_value in filter.items():
+                output_df = output_df[output_df[col_name] == col_value]
+        output_docs = []
+        for record in output_df.to_dict("records"):
+            content = record.pop(self.content_field)
+            record.pop(self.embedding_field, None)
+            output_docs.append(Document(page_content=content, metadata=record))
+        return output_docs
+
+    def to_bq_vector_store(self, **kwargs: Any) -> Any:
+        """
+        Converts the current object's parameters into a `BigQueryVectorStore` instance.
+
+        This method combines the base parameters of the current object to create a
+        `BigQueryVectorStore` object.
+
+        Args:
+            **kwargs: Additional keyword arguments to be passed to the
+                `BigQueryVectorStore` constructor. These override any matching
+                parameters in the base object.
+
+        Returns:
+            BigQueryVectorStore: An initialized `BigQueryVectorStore` object ready
+                for vector search operations.
+
+        Raises:
+            ValueError: If any of the combined parameters are invalid for initializing
+                a `BigQueryVectorStore`.
+        """
+        from langchain_google_community.bq_storage_vectorstores.bigquery import (
+            BigQueryVectorStore,
+        )
+        return self._convert_vector_store(BigQueryVectorStore, **kwargs)
diff --git a/libs/community/langchain_google_community/bq_storage_vectorstores/featurestore.py b/libs/community/langchain_google_community/bq_storage_vectorstores/featurestore.py
index 63538532..c45d37fd 100644
--- a/libs/community/langchain_google_community/bq_storage_vectorstores/featurestore.py
+++ b/libs/community/langchain_google_community/bq_storage_vectorstores/featurestore.py
@@ -277,7 +277,7 @@ def get_documents(
                                 "int_property": 123
                             }
         Returns:
-            List of ids from adding the texts into the vectorstore.
+            List of matching documents.
         """
         from google.cloud import aiplatform
 
@@ -514,12 +514,7 @@ def to_bq_vector_store(self, **kwargs: Any) -> Any:
         from langchain_google_community.bq_storage_vectorstores.bigquery import (
             BigQueryVectorStore,
         )
-
-        base_params = self.dict(include=BaseBigQueryVectorStore.__fields__.keys())
-        base_params["embedding"] = self.embedding
-        all_params = {**base_params, **kwargs}
-        bq_obj = BigQueryVectorStore(**all_params)
-        return bq_obj
+        return self._convert_vector_store(BigQueryVectorStore, **kwargs)
 
     def _create_online_store(