diff --git a/libs/community/langchain_google_community/__init__.py b/libs/community/langchain_google_community/__init__.py
index 0a1d7015..8d8b6c65 100644
--- a/libs/community/langchain_google_community/__init__.py
+++ b/libs/community/langchain_google_community/__init__.py
@@ -3,6 +3,9 @@
 from langchain_google_community.bq_storage_vectorstores.bigquery import (
     BigQueryVectorStore,
 )
+from langchain_google_community.bq_storage_vectorstores.bruteforce import (
+    BruteForceBQVectorStore,
+)
 from langchain_google_community.bq_storage_vectorstores.featurestore import (
     VertexFSVectorStore,
 )
@@ -40,6 +43,7 @@
     "BigQueryLoader",
     "BigQueryVectorStore",
     "BigQueryVectorSearch",
+    "BruteForceBQVectorStore",
     "CloudVisionLoader",
     "CloudVisionParser",
     "DocAIParser",
diff --git a/libs/community/langchain_google_community/bq_storage_vectorstores/bruteforce.py b/libs/community/langchain_google_community/bq_storage_vectorstores/bruteforce.py
index b352c3ee..19e4c894 100644
--- a/libs/community/langchain_google_community/bq_storage_vectorstores/bruteforce.py
+++ b/libs/community/langchain_google_community/bq_storage_vectorstores/bruteforce.py
@@ -1,6 +1,9 @@
 import numpy as np
+import pandas as pd
+from pydantic import Field
 from typing import Any, Dict, List, Optional, Union
 from langchain_core.documents import Document
+from langchain_core.pydantic_v1 import root_validator
 from langchain_google_community.bq_storage_vectorstores._base import (
     BaseBigQueryVectorStore,
 )
@@ -10,16 +13,52 @@
 class BruteForceBQVectorStore(BaseBigQueryVectorStore):
+    """
+    A brute-force local vector store that sources its data from a BigQuery table.
-
-    def sync_data(self):
-        self._df = self._query_table_to_df()
-        self._vectors = np.array(
-            self._df[self.embedding_field].tolist()
+
+    This class is particularly well suited to quick, small-scale local prototyping.
+
+    Attributes:
+        embedding: Embedding model for generating and comparing embeddings.
+        project_id: Google Cloud Project ID where BigQuery resources are located.
+        dataset_name: BigQuery dataset name.
+        table_name: BigQuery table name.
+        location: BigQuery region/location.
+        content_field: Name of the column storing document content (default: "content").
+        embedding_field: Name of the column storing text embeddings (default:
+            "embedding").
+        doc_id_field: Name of the column storing document IDs (default: "doc_id").
+        credentials: Optional Google Cloud credentials object.
+    """
+
+    _df: pd.DataFrame
+    _vectors: np.ndarray
+    _vectors_transpose: np.ndarray
+    _df_records: List[Dict]
+
+    @root_validator(pre=False, skip_on_failure=True)
+    def initialize_bf_vector_index(cls, values: dict) -> dict:
+        values["_df"] = cls._query_table_to_df(values)
+        values["_vectors"] = np.array(
+            values["_df"][values["embedding_field"]].tolist()
         )
-        self._vectors_transpose = self._vectors.T
-        self._df_records = self._df.drop(
-            columns=[self.embedding_field]
+        values["_vectors_transpose"] = values["_vectors"].T
+        values["_df_records"] = values["_df"].drop(
+            columns=[values["embedding_field"]]
         ).to_dict("records")
+        return values
+
+    def sync_data(self):
+        pass
+        # self._df = self._query_table_to_df()
+        # self._vectors = np.array(
+        #     self._df[self.embedding_field].tolist()
+        # )
+        # self._vectors_transpose = self._vectors.T
+        # self._df_records = self._df.drop(
+        #     columns=[self.embedding_field]
+        # ).to_dict("records")
 
     def _similarity_search_by_vectors_with_scores_and_embeddings(
         self,
@@ -73,11 +112,14 @@ def _similarity_search_by_vectors_with_scores_and_embeddings(
                 document=doc, filter=filter
             ):
                 continue
+            metadata = doc.copy()
+            del metadata[self.content_field]
+
             query_docs.append(
                 [
                     Document(
-                        page_content=doc[self._vector_store.content_field],
-                        metadata=doc
+                        page_content=doc[self.content_field],
+                        metadata=metadata
                     ),
                     doc_score,
                     embedding
@@ -85,18 +127,33 @@
                 ]
             )
         documents.append(query_docs)
         return documents
-
-    def _query_table_to_df(self):
+
+    # def _query_table_to_df(self) -> pd.DataFrame:
+    #     from google.cloud import bigquery
+    #     table = self._full_table_id
+    #     query = f"SELECT * FROM {table}"
+    #     # Create a query job to read the data
+    #     self._logger.info(f"Reading data from {table}. It might take a few minutes...")
+    #     job_config = bigquery.QueryJobConfig(
+    #         use_query_cache=True,
+    #         priority=bigquery.QueryPriority.INTERACTIVE
+    #     )
+    #     query_job = self._bq_client.query(query, job_config=job_config)
+    #     df = query_job.to_dataframe()
+    #     return df
+
+    @staticmethod
+    def _query_table_to_df(values) -> pd.DataFrame:
         from google.cloud import bigquery
-        table = self._full_table_id
+        table = values["_full_table_id"]
         query = f"SELECT * FROM {table}"
         # Create a query job to read the data
-        self._logger.info(f"Reading data from {table}. It might take a few minutes...")
+        values["_logger"].info(f"Reading data from {table}. It might take a few minutes...")
         job_config = bigquery.QueryJobConfig(
             use_query_cache=True,
             priority=bigquery.QueryPriority.INTERACTIVE
         )
-        query_job = self._bq_client.query(query, job_config=job_config)
+        query_job = values["_bq_client"].query(query, job_config=job_config)
         df = query_job.to_dataframe()
         return df