Skip to content

Commit

Permalink
introducing BF vector store
Browse files Browse the repository at this point in the history
  • Loading branch information
lspataroG committed Jun 27, 2024
1 parent b632cb9 commit ee017c3
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -573,3 +573,28 @@ def max_marginal_relevance_search_by_vector(
np.array(embedding), doc_embeddings, lambda_mult=lambda_mult, k=k
)
return [doc_tuples[i][0] for i in mmr_doc_indexes] # type: ignore[index]

def _convert_vector_store(self, vector_store_class, **kwargs: Any) -> Any:
"""
Converts the current object's parameters into another Vector Store instance.
This method combines the base parameters of the current object to create a
`BigQueryVectorStore` or a `VertexFSVectorStore` object.
Args:
**kwargs: Additional keyword arguments to be passed to the `
class constructor. These override any matching
parameters in the base object.
Returns:
BaseBigQueryVectorStore: A child of `BaseBigQueryVectorStore` object ready
for vector search operations.
Raises:
ValueError: If any of the combined parameters are invalid
"""
base_params = self.dict(include=BaseBigQueryVectorStore.__fields__.keys())
base_params["embedding"] = self.embedding
all_params = {**base_params, **kwargs}
bq_obj = vector_store_class(**all_params)
return bq_obj
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def get_documents(
"int_property": 123
}
Returns:
List of ids from adding the texts into the vectorstore.
List of output documents.
"""
from google.cloud import bigquery # type: ignore[attr-defined]

Expand Down Expand Up @@ -503,11 +503,32 @@ def to_vertex_fs_vector_store(self, **kwargs: Any) -> Any:
VertexFSVectorStore,
)

base_params = self.dict(include=BaseBigQueryVectorStore.__fields__.keys())
base_params["embedding"] = self.embedding
all_params = {**base_params, **kwargs}
fs_obj = VertexFSVectorStore(**all_params)
return fs_obj
return self._convert_vector_store(VertexFSVectorStore, **kwargs)

def to_bf_vector_store(self, **kwargs: Any) -> Any:
"""
Converts the current object's parameters into a `BigQueryVectorStore` instance.
This method combines the base parameters of the current object to create a
`BigQueryVectorStore` object.
Args:
**kwargs: Additional keyword arguments to be passed to the `
BigQueryVectorStore` constructor. These override any matching
parameters in the base object.
Returns:
BigQueryVectorStore: An initialized `BigQueryVectorStore` object ready
for vector search operations.
Raises:
ValueError: If any of the combined parameters are invalid for initializing
a `BigQueryVectorStore`.
"""
from langchain_google_community.bq_storage_vectorstores.bruteforce import (
BruteForceBQVectorStore,
)
return self._convert_vector_store(BruteForceBQVectorStore, **kwargs)

def job_stats(self, job_id: str) -> Dict:
"""Return the statistics for a single job execution.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
import numpy as np
from typing import Any, Dict, List, Optional, Union
from langchain_core.documents import Document
from langchain_google_community.bq_storage_vectorstores._base import (
BaseBigQueryVectorStore,
)
from langchain_google_community.bq_storage_vectorstores.utils import (
doc_match_filter,
)


class BruteForceBQVectorStore(BaseBigQueryVectorStore):

Check failure on line 12 in libs/community/langchain_google_community/bq_storage_vectorstores/bruteforce.py

View workflow job for this annotation

GitHub Actions / cd libs/community / - / make lint #3.8

Ruff (I001)

langchain_google_community/bq_storage_vectorstores/bruteforce.py:1:1: I001 Import block is un-sorted or un-formatted

Check failure on line 12 in libs/community/langchain_google_community/bq_storage_vectorstores/bruteforce.py

View workflow job for this annotation

GitHub Actions / cd libs/community / - / make lint #3.11

Ruff (I001)

langchain_google_community/bq_storage_vectorstores/bruteforce.py:1:1: I001 Import block is un-sorted or un-formatted

def sync_data(self):
self._df = self._query_table_to_df()
self._vectors = np.array(
self._df[self.embedding_field].tolist()
)
self._vectors_transpose = self._vectors.T
self._df_records = self._df.drop(
columns=[self.embedding_field]
).to_dict("records")

def _similarity_search_by_vectors_with_scores_and_embeddings(
self,
embeddings: List[List[float]],
filter: Optional[Dict[str, Any]] = None,
k: int = 5,
batch_size: Union[int, None] = None,
**kwargs: Any,
) -> List[List[List[Any]]]:
"""Performs a similarity search using vector embeddings
This function takes a set of query embeddings and searches for similar documents
It returns the top-k matching documents, along with their similarity scores
and their corresponding embeddings.
Args:
embeddings: A list of lists, where each inner list represents a
query embedding.
filter: (Optional) A dictionary specifying filter criteria for document
on metadata properties, e.g.
{
"str_property": "foo",
"int_property": 123
}
k: The number of top results to return for each query.
batch_size: The size of batches to process embeddings.
Returns:
A list of lists of lists. Each inner list represents the results for a
single query, and contains elements of the form
[Document, score, embedding], where:
- Document: The matching document object.
- score: The similarity score between the query and document.
- embedding: The document's embedding.
"""
num_queries = len(embeddings)
scores = embeddings @ self._vectors_transpose
sorted_indices = np.argsort(-scores)[:, :k]
results = [np.array(self._df_records)[x] for x in sorted_indices]
top_scores = scores[np.arange(num_queries)[:, np.newaxis], sorted_indices]
top_embeddings = self._vectors[sorted_indices]
documents = []
for query_results, query_scores, embeddings_results in zip(
results, top_scores, top_embeddings
):
query_docs = []
for doc, doc_score, embedding in zip(
query_results, query_scores, embeddings_results
):
if filter is not None and not doc_match_filter(
document=doc, filter=filter
):
continue
query_docs.append(
[
Document(
page_content=doc[self._vector_store.content_field],
metadata=doc
),
doc_score,
embedding
]
)
documents.append(query_docs)
return documents

def _query_table_to_df(self):
from google.cloud import bigquery
table = self._full_table_id
query = f"SELECT * FROM {table}"
# Create a query job to read the data
self._logger.info(f"Reading data from {table}. It might take a few minutes...")
job_config = bigquery.QueryJobConfig(
use_query_cache=True,
priority=bigquery.QueryPriority.INTERACTIVE
)
query_job = self._bq_client.query(query, job_config=job_config)
df = query_job.to_dataframe()
return df

def get_documents(
self,
ids: Optional[List[str]],
filter: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> List[Document]:
"""Search documents by their ids or metadata values.
Args:
ids: List of ids of documents to retrieve from the vectorstore.
filter: Filter on metadata properties, e.g.
{
"str_property": "foo",
"int_property": 123
}
Returns:
List output documents.
"""
output_df = self._df[self._df[self.doc_id_field.isin(ids)]]
if filter is not None:
for col_name, col_value in filter.items():
output_df = output_df[output_df[col_name] == output_df[col_value]]
records = output_df.to_dict("records")
output_docs = []
for record in records:
content = record[self.content_field]
del record[self.embedding_field]
del record[self.content_field]
output_docs.append(
Document(page_content=content, metadata=record)
)
return output_docs


def to_bq_vector_store(self, **kwargs: Any) -> Any:
"""
Converts the current object's parameters into a `BigQueryVectorStore` instance.
This method combines the base parameters of the current object to create a
`BigQueryVectorStore` object.
Args:
**kwargs: Additional keyword arguments to be passed to the `
BigQueryVectorStore` constructor. These override any matching
parameters in the base object.
Returns:
BigQueryVectorStore: An initialized `BigQueryVectorStore` object ready
for vector search operations.
Raises:
ValueError: If any of the combined parameters are invalid for initializing
a `BigQueryVectorStore`.
"""
from langchain_google_community.bq_storage_vectorstores.bigquery import (
BigQueryVectorStore,
)
return self._convert_vector_store(BigQueryVectorStore, **kwargs)
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ def get_documents(
"int_property": 123
}
Returns:
List of ids from adding the texts into the vectorstore.
List of output documents.
"""
from google.cloud import aiplatform

Expand Down Expand Up @@ -514,12 +514,7 @@ def to_bq_vector_store(self, **kwargs: Any) -> Any:
from langchain_google_community.bq_storage_vectorstores.bigquery import (
BigQueryVectorStore,
)

base_params = self.dict(include=BaseBigQueryVectorStore.__fields__.keys())
base_params["embedding"] = self.embedding
all_params = {**base_params, **kwargs}
bq_obj = BigQueryVectorStore(**all_params)
return bq_obj
return self._convert_vector_store(BigQueryVectorStore, **kwargs)


def _create_online_store(
Expand Down

0 comments on commit ee017c3

Please sign in to comment.