docs: add samples to migrate pinecone to alloy db #292

Draft · wants to merge 10 commits into main
3 changes: 3 additions & 0 deletions .github/workflows/lint.yml
@@ -44,6 +44,9 @@ jobs:
- name: Install Sample requirements
run: pip install -r samples/requirements.txt

- name: Install Migration snippets requirements
Collaborator:

nit: I have been adding all sample reqs to https://github.com/googleapis/langchain-google-alloydb-pg-python/blob/main/samples/requirements.txt so this file doesn't need to be updated. I am also ok with this pattern of adding the new req file to the workflow

Contributor Author:

Tried to follow this pattern from the current version of the snippets.
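
For reference, a minimal samples/migrations/requirements.txt would only need the packages the snippets import. A sketch (the exact pins and the Pinecone package name depend on the client version, so treat these as assumptions, not the file from this PR):

langchain-google-alloydb-pg
langchain-google-vertexai
langchain-core
pinecone-client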

run: pip install -r samples/migrations/requirements.txt

- name: Install module (and test requirements)
run: pip install -e .[test]

129 changes: 129 additions & 0 deletions samples/migrations/alloydb_snippets.py
@@ -0,0 +1,129 @@
#!/usr/bin/env python

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any

from langchain_core.embeddings import Embeddings
from langchain_google_alloydb_pg import AlloyDBEngine, AlloyDBVectorStore


async def acreate_alloydb_client(
project_id: str,
region: str,
cluster: str,
instance: str,
db_name: str,
db_user: str,
db_pwd: str,
) -> AlloyDBEngine:
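    """Create an AlloyDBEngine connected to the given AlloyDB instance and database."""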
# [START langchain_alloydb_migration_get_client]
from langchain_google_alloydb_pg import AlloyDBEngine

engine = await AlloyDBEngine.afrom_instance(
project_id=project_id,
region=region,
cluster=cluster,
instance=instance,
database=db_name,
user=db_user,
password=db_pwd,
)
print("Langchain AlloyDB client initiated.")
# [END langchain_alloydb_migration_get_client]
return engine


def get_embeddings_service(
project_id: str, model_name: str = "textembedding-gecko@001"
) -> Embeddings:
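    """Return a Vertex AI embeddings service; the chosen model determines the vector size."""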
# [START langchain_alloydb_migration_embedding_service]
from langchain_google_vertexai import VertexAIEmbeddings

# choice of model determines the vector size
embedding_service = VertexAIEmbeddings(project=project_id, model_name=model_name)
# [END langchain_alloydb_migration_embedding_service]
print("Langchain Vertex AI Embeddings service initiated.")
return embedding_service


def get_fake_embeddings_service(vector_size: int = 768) -> Embeddings:
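    """Return a fake embeddings service that generates vectors of the given size (no API calls)."""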
# [START langchain_alloydb_migration_fake_embedding_service]
from langchain_core.embeddings import FakeEmbeddings

embedding_service = FakeEmbeddings(size=vector_size)
# [END langchain_alloydb_migration_fake_embedding_service]
print("Langchain Fake Embeddings service initiated.")
return embedding_service


async def ainit_vector_store(
engine: AlloyDBEngine,
table_name: str = "alloydb_table",
vector_size: int = 768,
**kwargs: Any,
) -> None:
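    """Initialize a vector store table with the given name and vector size."""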
# [START langchain_create_alloydb_migration_vector_store_table]
await engine.ainit_vectorstore_table(
table_name=table_name,
vector_size=vector_size,
**kwargs,
)
# [END langchain_create_alloydb_migration_vector_store_table]
print("Langchain AlloyDB vector store table initialized.")


async def aget_vector_store(
engine: AlloyDBEngine,
embeddings_service: Embeddings,
table_name: str = "alloydb_table",
**kwargs: Any,
) -> AlloyDBVectorStore:
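    """Instantiate a vector store for the given engine, embeddings service, and table."""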
# [START langchain_get_alloydb_migration_vector_store]
from langchain_google_alloydb_pg import AlloyDBVectorStore

vector_store = await AlloyDBVectorStore.create(
engine=engine,
embedding_service=embeddings_service,
table_name=table_name,
**kwargs,
)
# [END langchain_get_alloydb_migration_vector_store]
print("Langchain AlloyDB vector store instantiated.")
return vector_store


async def ainsert_data(
vector_store: AlloyDBVectorStore,
texts: list[str],
embeddings: list[list[float]],
metadatas: list[dict[str, Any]],
ids: list[str],
) -> list[str]:
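    """Insert pre-computed embeddings with their texts, metadata, and IDs; return the inserted IDs."""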
# [START langchain_alloydb_migration_vector_store_insert_data]
inserted_ids = await vector_store.aadd_embeddings(
texts=texts,
embeddings=embeddings,
metadatas=metadatas,
ids=ids,
)
# [END langchain_alloydb_migration_vector_store_insert_data]
print("AlloyDB client inserted the provided data.")
return inserted_ids
178 changes: 178 additions & 0 deletions samples/migrations/migrate_pinecone_vectorstore_to_alloydb.py
@@ -0,0 +1,178 @@
#!/usr/bin/env python

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""Migrate a PineconeVectorStore to a Langchain AlloyDBVectorStore.

Given a Pinecone index, the following code fetches its data in batches
and uploads it to an AlloyDBVectorStore.
"""

import asyncio
from typing import Any, Iterator

from pinecone import Index  # type: ignore

# TODO(dev): Replace the values below
PINECONE_API_KEY = "YOUR_API_KEY"
PINECONE_INDEX_NAME = "YOUR_INDEX_NAME"
PROJECT_ID = "YOUR_PROJECT_ID"
REGION = "YOUR_REGION"
CLUSTER = "YOUR_CLUSTER_ID"
INSTANCE = "YOUR_INSTANCE_ID"
DB_NAME = "YOUR_DATABASE_ID"
DB_USER = "YOUR_DATABASE_USERNAME"
DB_PWD = "YOUR_DATABASE_PASSWORD"

# TODO(developer): Optional, change the values below.
PINECONE_NAMESPACE = ""
PINECONE_VECTOR_SIZE = 768
PINECONE_BATCH_SIZE = 10
ALLOYDB_TABLE_NAME = "alloydb_table"
EMBEDDING_MODEL_NAME = "textembedding-gecko@001"


def get_ids_batch(
pinecone_index: Index,
pinecone_namespace: str = PINECONE_NAMESPACE,
pinecone_batch_size: int = PINECONE_BATCH_SIZE,
) -> Iterator[list[str]]:
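    """Yield batches of vector IDs from the Pinecone index, following pagination until exhausted."""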
    # [START pinecone_get_ids_batch]
    results = pinecone_index.list_paginated(
prefix="", namespace=pinecone_namespace, limit=pinecone_batch_size
)
ids = [v.id for v in results.vectors]
yield ids

while results.pagination is not None:
pagination_token = results.pagination.next
results = pinecone_index.list_paginated(
prefix="", pagination_token=pagination_token, limit=pinecone_batch_size
)

# Extract and yield the next batch of IDs
ids = [v.id for v in results.vectors]
yield ids
# [END pinecone_get_ids_batch]
print("Pinecone client fetched all ids from index.")


def get_data_batch(
pinecone_index: Index, id_iterator: Iterator[list[str]]
) -> Iterator[tuple[list[str], list[Any], list[str], list[Any]]]:
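    """Fetch vectors for each batch of IDs and yield (ids, embeddings, contents, metadatas)."""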
# [START pinecone_get_data_batch]
# Iterate through the IDs and download their contents
for ids in id_iterator:
# Fetch vectors for the current batch of IDs
all_data = pinecone_index.fetch(ids=ids)
ids = []
embeddings = []
contents = []
metadatas = []

# Process each vector in the current batch
for doc in all_data["vectors"].values():
ids.append(doc["id"])
embeddings.append(doc["values"])
contents.append(str(doc["metadata"]))
metadata = doc["metadata"]
metadatas.append(metadata)

# Yield the current batch of results
yield ids, embeddings, contents, metadatas
# [END pinecone_get_data_batch]
print("Pinecone client fetched all data from index.")


async def main(
pinecone_api_key: str = PINECONE_API_KEY,
pinecone_index_name: str = PINECONE_INDEX_NAME,
pinecone_namespace: str = PINECONE_NAMESPACE,
pinecone_vector_size: int = PINECONE_VECTOR_SIZE,
pinecone_batch_size: int = PINECONE_BATCH_SIZE,
project_id: str = PROJECT_ID,
region: str = REGION,
cluster: str = CLUSTER,
instance: str = INSTANCE,
alloydb_table: str = ALLOYDB_TABLE_NAME,
db_name: str = DB_NAME,
db_user: str = DB_USER,
db_pwd: str = DB_PWD,
) -> None:
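    """Run the migration: read vectors from Pinecone in batches and insert them into AlloyDB."""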
# [START pinecone_get_client]
    from pinecone import Pinecone  # type: ignore

    # Note: a ServerlessSpec is only needed when creating an index,
    # not when initializing the client.
    pinecone_client = Pinecone(api_key=pinecone_api_key)
# [END pinecone_get_client]
print("Pinecone client initiated.")

# [START pinecone_get_index]
pinecone_index = pinecone_client.Index(pinecone_index_name)
# [END pinecone_get_index]
print("Pinecone index reference initiated.")

from alloydb_snippets import acreate_alloydb_client

alloydb_engine = await acreate_alloydb_client(
project_id=project_id,
region=region,
cluster=cluster,
instance=instance,
db_name=db_name,
db_user=db_user,
db_pwd=db_pwd,
)

# [START pinecone_alloydb_migration_get_alloydb_vectorstore]
from alloydb_snippets import aget_vector_store, get_embeddings_service

await alloydb_engine.ainit_vectorstore_table(
table_name=alloydb_table,
vector_size=pinecone_vector_size,
overwrite_existing=True,
)

embeddings_service = get_embeddings_service(
project_id, model_name=EMBEDDING_MODEL_NAME
)
vs = await aget_vector_store(
engine=alloydb_engine,
embeddings_service=embeddings_service,
table_name=alloydb_table,
)
# [END pinecone_alloydb_migration_get_alloydb_vectorstore]
print("Pinecone migration AlloyDBVectorStore table created.")

id_iterator = get_ids_batch(pinecone_index, pinecone_namespace, pinecone_batch_size)
for ids, embeddings, contents, metadatas in get_data_batch(
pinecone_index=pinecone_index,
id_iterator=id_iterator,
):
        # [START pinecone_alloydb_migration_insert_data_batch]
        inserted_ids = await vs.aadd_embeddings(
            texts=contents,
            embeddings=embeddings,
            metadatas=metadatas,
            ids=ids,
        )
        # [END pinecone_alloydb_migration_insert_data_batch]
    print("Migration completed: inserted all batches of data into AlloyDB.")


if __name__ == "__main__":
asyncio.run(main())