
Commit

updated code
lspataroG committed Jun 27, 2024
1 parent e5c5c83 commit 306aaf7
Showing 2 changed files with 75 additions and 14 deletions.
4 changes: 4 additions & 0 deletions libs/community/langchain_google_community/__init__.py
@@ -3,6 +3,9 @@
from langchain_google_community.bq_storage_vectorstores.bigquery import (
BigQueryVectorStore,
)
from langchain_google_community.bq_storage_vectorstores.bruteforce import (
BruteForceBQVectorStore,
)
from langchain_google_community.bq_storage_vectorstores.featurestore import (
VertexFSVectorStore,
)
@@ -40,6 +43,7 @@
"BigQueryLoader",
"BigQueryVectorStore",
"BigQueryVectorSearch",
"BruteForceBQVectorStore",
"CloudVisionLoader",
"CloudVisionParser",
"DocAIParser",
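For orientation, a minimal sketch (an addition for this write-up, not part of the commit) of the import paths this export change enables:

# The class is now re-exported at the package top level.
from langchain_google_community import BruteForceBQVectorStore

# Equivalent direct import from the submodule added by this commit:
from langchain_google_community.bq_storage_vectorstores.bruteforce import (
    BruteForceBQVectorStore,
)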
71 changes: 71 additions & 14 deletions libs/community/langchain_google_community/bq_storage_vectorstores/bruteforce.py
@@ -1,6 +1,9 @@
import numpy as np
import pandas as pd
from pydantic import Field
from typing import Any, Dict, List, Optional, Union
from langchain_core.documents import Document
from langchain_core.pydantic_v1 import root_validator
from langchain_google_community.bq_storage_vectorstores._base import (
BaseBigQueryVectorStore,
)
@@ -10,16 +13,52 @@


class BruteForceBQVectorStore(BaseBigQueryVectorStore):
"""
A Bruteforce local vector store that source the data from a BigQuery Table
def sync_data(self):
self._df = self._query_table_to_df()
self._vectors = np.array(
self._df[self.embedding_field].tolist()
This class is particularly indicated for quick small local prototyping.
Attributes:
embedding: Embedding model for generating and comparing embeddings.
project_id: Google Cloud Project ID where BigQuery resources are located.
dataset_name: BigQuery dataset name.
table_name: BigQuery table name.
location: BigQuery region/location.
content_field: Name of the column storing document content (default: "content").
embedding_field: Name of the column storing text embeddings (default:
"embedding").
doc_id_field: Name of the column storing document IDs (default: "doc_id").
credentials: Optional Google Cloud credentials object.
"""

_df: pd.DataFrame
_vectors: np.array
_vectors_transpose: np.array
_df_records: List[Dict]

    @root_validator(pre=False, skip_on_failure=True)
    def initialize_bf_vector_index(cls, values: dict) -> dict:
        values["_df"] = cls._query_table_to_df(values)
        values["_vectors"] = np.array(
            values["_df"][values["embedding_field"]].tolist()
        )
        values["_vectors_transpose"] = values["_vectors"].T
        values["_df_records"] = values["_df"].drop(
            columns=[values["embedding_field"]]
        ).to_dict("records")
        return values
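The validator above precomputes the embedding matrix and its transpose, which suggests that scoring a query reduces to a single matrix product. A rough NumPy illustration of that pattern, under that assumption and with placeholder shapes:

import numpy as np

# Illustration only: brute-force scoring with a precomputed transpose.
vectors = np.random.rand(1000, 768)    # placeholder: 1000 stored embeddings of size 768
vectors_transpose = vectors.T          # computed once, as in the validator above
query = np.random.rand(768)            # placeholder query embedding

scores = query @ vectors_transpose     # one dot-product score per stored row
top_4 = np.argsort(scores)[::-1][:4]   # indices of the 4 highest-scoring rows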


    def sync_data(self):
        # No-op: the table is loaded once by the root validator above.
        pass
        # self._df = self._query_table_to_df()
        # self._vectors = np.array(
        #     self._df[self.embedding_field].tolist()
        # )
        # self._vectors_transpose = self._vectors.T
        # self._df_records = self._df.drop(
        #     columns=[self.embedding_field]
        # ).to_dict("records")

def _similarity_search_by_vectors_with_scores_and_embeddings(
self,
@@ -73,30 +112,48 @@ def _similarity_search_by_vectors_with_scores_and_embeddings(
                    document=doc, filter=filter
                ):
                    continue
                metadata = doc.copy()
                del metadata[self.content_field]

                query_docs.append(
                    [
                        Document(
                            page_content=doc[self.content_field],
                            metadata=metadata
                        ),
                        doc_score,
                        embedding
                    ]
                )
            documents.append(query_docs)
        return documents
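The list structure built above implies one inner list per query, each entry holding a [Document, score, embedding] triple; a small self-contained sketch of that shape (hypothetical values, not output from the store):

from langchain_core.documents import Document

# Hypothetical result mirroring the nested structure assembled above.
results = [
    [
        [Document(page_content="example content", metadata={"doc_id": "1"}), 0.87, [0.1, 0.2]],
    ]
]
for doc, score, embedding in results[0]:
    print(f"{score:.4f}", doc.page_content)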

    # def _query_table_to_df(self) -> pd.DataFrame:
    #     from google.cloud import bigquery
    #     table = self._full_table_id
    #     query = f"SELECT * FROM {table}"
    #     # Create a query job to read the data
    #     self._logger.info(f"Reading data from {table}. It might take a few minutes...")
    #     job_config = bigquery.QueryJobConfig(
    #         use_query_cache=True,
    #         priority=bigquery.QueryPriority.INTERACTIVE
    #     )
    #     query_job = self._bq_client.query(query, job_config=job_config)
    #     df = query_job.to_dataframe()
    #     return df

    @staticmethod
    def _query_table_to_df(values) -> pd.DataFrame:
        from google.cloud import bigquery
        table = values["_full_table_id"]
        query = f"SELECT * FROM {table}"
        # Create a query job to read the data
        values["_logger"].info(f"Reading data from {table}. It might take a few minutes...")
        job_config = bigquery.QueryJobConfig(
            use_query_cache=True,
            priority=bigquery.QueryPriority.INTERACTIVE
        )
        query_job = values["_bq_client"].query(query, job_config=job_config)
        df = query_job.to_dataframe()
        return df
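To close the loop, a hedged usage sketch: it assumes the constructor accepts the attributes listed in the class docstring and that the store exposes the standard LangChain similarity_search method; the embedding backend, project, dataset, table, and location values are placeholders.

from langchain_google_community import BruteForceBQVectorStore
from langchain_google_vertexai import VertexAIEmbeddings  # assumed embedding backend

store = BruteForceBQVectorStore(
    embedding=VertexAIEmbeddings(model_name="textembedding-gecko@003"),  # placeholder model
    project_id="my-project",      # placeholder
    dataset_name="my_dataset",    # placeholder
    table_name="my_table",        # placeholder
    location="us-central1",       # placeholder
)

# Standard vector-store query; k is the number of documents to return.
docs = store.similarity_search("quick local prototyping", k=4)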
