diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml
index 7221db90..1d9e2b4b 100644
--- a/.github/workflows/lint.yml
+++ b/.github/workflows/lint.yml
@@ -44,6 +44,9 @@ jobs:
       - name: Install Sample requirements
         run: pip install -r samples/requirements.txt
 
+      - name: Install Migration snippets requirements
+        run: pip install -r samples/migrations/requirements.txt
+
       - name: Install module (and test requirements)
         run: pip install -e .[test]
 
diff --git a/samples/migrations/alloydb_snippets.py b/samples/migrations/alloydb_snippets.py
new file mode 100644
index 00000000..22dc3f19
--- /dev/null
+++ b/samples/migrations/alloydb_snippets.py
@@ -0,0 +1,129 @@
+#!/usr/bin/env python
+
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import Any
+
+from langchain_core.embeddings import Embeddings
+from langchain_google_alloydb_pg import AlloyDBEngine, AlloyDBVectorStore
+
+
+async def acreate_alloydb_client(
+    project_id: str,
+    region: str,
+    cluster: str,
+    instance: str,
+    db_name: str,
+    db_user: str,
+    db_pwd: str,
+) -> AlloyDBEngine:
+    # [START langchain_alloydb_migration_get_client]
+    from langchain_google_alloydb_pg import AlloyDBEngine
+
+    engine = await AlloyDBEngine.afrom_instance(
+        project_id=project_id,
+        region=region,
+        cluster=cluster,
+        instance=instance,
+        database=db_name,
+        user=db_user,
+        password=db_pwd,
+    )
+    print("Langchain AlloyDB client initiated.")
+    # [END langchain_alloydb_migration_get_client]
+    return engine
+
+
+def get_embeddings_service(
+    project_id: str, model_name: str = "textembedding-gecko@001"
+) -> Embeddings:
+    # [START langchain_alloydb_migration_embedding_service]
+    from langchain_google_vertexai import VertexAIEmbeddings
+
+    # The choice of model determines the vector size.
+    embedding_service = VertexAIEmbeddings(project=project_id, model_name=model_name)
+    # [END langchain_alloydb_migration_embedding_service]
+    print("Langchain Vertex AI Embeddings service initiated.")
+    return embedding_service
+
+
+def get_fake_embeddings_service(vector_size: int = 768) -> Embeddings:
+    # [START langchain_alloydb_migration_fake_embedding_service]
+    from langchain_core.embeddings import FakeEmbeddings
+
+    embedding_service = FakeEmbeddings(size=vector_size)
+    # [END langchain_alloydb_migration_fake_embedding_service]
+    print("Langchain Fake Embeddings service initiated.")
+    return embedding_service
+
+
+async def ainit_vector_store(
+    engine: AlloyDBEngine,
+    table_name: str = "alloydb_table",
+    vector_size: int = 768,
+    **kwargs: Any,
+) -> None:
+    # [START langchain_create_alloydb_migration_vector_store_table]
+    await engine.ainit_vectorstore_table(
+        table_name=table_name,
+        vector_size=vector_size,
+        **kwargs,
+    )
+    # [END langchain_create_alloydb_migration_vector_store_table]
+    print("Langchain AlloyDB vector store table initialized.")
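+
+
+# Editor-added usage sketch (not part of the published snippets): shows how the
+# helpers above compose. Every connection value below is a hypothetical
+# placeholder, and fake embeddings stand in for Vertex AI for a local dry run.
+async def _demo_create_and_get_store() -> AlloyDBVectorStore:
+    engine = await acreate_alloydb_client(
+        project_id="my-project",  # placeholder
+        region="us-central1",  # placeholder
+        cluster="my-cluster",  # placeholder
+        instance="my-instance",  # placeholder
+        db_name="my-database",  # placeholder
+        db_user="my-user",  # placeholder
+        db_pwd="my-password",  # placeholder
+    )
+    await ainit_vector_store(engine, table_name="alloydb_table", vector_size=768)
+    embeddings = get_fake_embeddings_service(vector_size=768)
+    # aget_vector_store is defined below; the reference resolves at call time.
+    return await aget_vector_store(
+        engine=engine, embeddings_service=embeddings, table_name="alloydb_table"
+    )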
"alloydb_table", + **kwargs: Any, +) -> AlloyDBVectorStore: + # [START langchain_get_alloydb_migration_vector_store] + from langchain_google_alloydb_pg import AlloyDBVectorStore + + vector_store = await AlloyDBVectorStore.create( + engine=engine, + embedding_service=embeddings_service, + table_name=table_name, + **kwargs, + ) + # [END langchain_get_alloydb_migration_vector_store] + print("Langchain AlloyDB vector store instantiated.") + return vector_store + + +async def ainsert_data( + vector_store: AlloyDBVectorStore, + texts: list[str], + embeddings: list[list[float]], + metadatas: list[dict[str, Any]], + ids: list[str], +) -> list[str]: + # [START langchain_alloydb_migration_vector_store_insert_data] + inserted_ids = await vector_store.aadd_embeddings( + texts=texts, + embeddings=embeddings, + metadatas=metadatas, + ids=ids, + ) + # [END langchain_alloydb_migration_vector_store_insert_data] + print("AlloyDB client inserted the provided data.") + return inserted_ids diff --git a/samples/migrations/migrate_pinecone_vectorstore_to_alloydb.py b/samples/migrations/migrate_pinecone_vectorstore_to_alloydb.py new file mode 100644 index 00000000..92c33c45 --- /dev/null +++ b/samples/migrations/migrate_pinecone_vectorstore_to_alloydb.py @@ -0,0 +1,178 @@ +#!/usr/bin/env python + +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import asyncio +from typing import Any, Iterator + +"""Migrate PineconeVectorStore to Langchain AlloyDBVectorStore. + +Given a pinecone index, the following code fetches the data from pinecone +in batches and uploads to an AlloyDBVectorStore. +""" + +# TODO(dev): Replace the values below +PINECONE_API_KEY = "YOUR_API_KEY" +PINECONE_INDEX_NAME = "YOUR_INDEX_NAME" +PROJECT_ID = "YOUR_PROJECT_ID" +REGION = "YOUR_REGION" +CLUSTER = "YOUR_CLUSTER_ID" +INSTANCE = "YOUR_INSTANCE_ID" +DB_NAME = "YOUR_DATABASE_ID" +DB_USER = "YOUR_DATABASE_USERNAME" +DB_PWD = "YOUR_DATABASE_PASSWORD" + +# TODO(developer): Optional, change the values below. 
diff --git a/samples/migrations/migrate_pinecone_vectorstore_to_alloydb.py b/samples/migrations/migrate_pinecone_vectorstore_to_alloydb.py
new file mode 100644
index 00000000..92c33c45
--- /dev/null
+++ b/samples/migrations/migrate_pinecone_vectorstore_to_alloydb.py
@@ -0,0 +1,178 @@
+#!/usr/bin/env python
+
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Migrate a PineconeVectorStore to a Langchain AlloyDBVectorStore.
+
+Given a Pinecone index, this script fetches the data from Pinecone in
+batches and uploads it to an AlloyDBVectorStore.
+"""
+
+import asyncio
+from typing import Any, Iterator
+
+from pinecone import Index  # type: ignore
+
+# TODO(developer): Replace the values below.
+PINECONE_API_KEY = "YOUR_API_KEY"
+PINECONE_INDEX_NAME = "YOUR_INDEX_NAME"
+PROJECT_ID = "YOUR_PROJECT_ID"
+REGION = "YOUR_REGION"
+CLUSTER = "YOUR_CLUSTER_ID"
+INSTANCE = "YOUR_INSTANCE_ID"
+DB_NAME = "YOUR_DATABASE_ID"
+DB_USER = "YOUR_DATABASE_USERNAME"
+DB_PWD = "YOUR_DATABASE_PASSWORD"
+
+# TODO(developer): Optional, change the values below.
+PINECONE_NAMESPACE = ""
+PINECONE_VECTOR_SIZE = 768
+PINECONE_BATCH_SIZE = 10
+ALLOYDB_TABLE_NAME = "alloydb_table"
+EMBEDDING_MODEL_NAME = "textembedding-gecko@001"
+
+
+def get_ids_batch(
+    pinecone_index: Index,
+    pinecone_namespace: str = PINECONE_NAMESPACE,
+    pinecone_batch_size: int = PINECONE_BATCH_SIZE,
+) -> Iterator[list[str]]:
+    # [START pinecone_get_ids_batch]
+    results = pinecone_index.list_paginated(
+        prefix="", namespace=pinecone_namespace, limit=pinecone_batch_size
+    )
+    ids = [v.id for v in results.vectors]
+    yield ids
+
+    while results.pagination is not None:
+        pagination_token = results.pagination.next
+        results = pinecone_index.list_paginated(
+            prefix="",
+            pagination_token=pagination_token,
+            # Keep paging within the same namespace as the first request.
+            namespace=pinecone_namespace,
+            limit=pinecone_batch_size,
+        )
+
+        # Extract and yield the next batch of IDs
+        ids = [v.id for v in results.vectors]
+        yield ids
+    # [END pinecone_get_ids_batch]
+    print("Pinecone client fetched all ids from index.")
+
+
+def get_data_batch(
+    pinecone_index: Index, id_iterator: Iterator[list[str]]
+) -> Iterator[tuple[list[str], list[Any], list[str], list[Any]]]:
+    # [START pinecone_get_data_batch]
+    # Iterate through the IDs and download their contents
+    for ids in id_iterator:
+        # Fetch vectors for the current batch of IDs
+        all_data = pinecone_index.fetch(ids=ids)
+        doc_ids = []
+        embeddings = []
+        contents = []
+        metadatas = []
+
+        # Process each vector in the current batch
+        for doc in all_data["vectors"].values():
+            doc_ids.append(doc["id"])
+            embeddings.append(doc["values"])
+            # Use the stringified metadata as the document content.
+            contents.append(str(doc["metadata"]))
+            metadatas.append(doc["metadata"])
+
+        # Yield the current batch of results
+        yield doc_ids, embeddings, contents, metadatas
+    # [END pinecone_get_data_batch]
+    print("Pinecone client fetched all data from index.")
+
+
+async def main(
+    pinecone_api_key: str = PINECONE_API_KEY,
+    pinecone_index_name: str = PINECONE_INDEX_NAME,
+    pinecone_namespace: str = PINECONE_NAMESPACE,
+    pinecone_vector_size: int = PINECONE_VECTOR_SIZE,
+    pinecone_batch_size: int = PINECONE_BATCH_SIZE,
+    project_id: str = PROJECT_ID,
+    region: str = REGION,
+    cluster: str = CLUSTER,
+    instance: str = INSTANCE,
+    alloydb_table: str = ALLOYDB_TABLE_NAME,
+    db_name: str = DB_NAME,
+    db_user: str = DB_USER,
+    db_pwd: str = DB_PWD,
+) -> None:
+    # [START pinecone_get_client]
+    from pinecone import Pinecone, ServerlessSpec  # type: ignore
+
+    pinecone_client = Pinecone(
+        api_key=pinecone_api_key,
+        spec=ServerlessSpec(cloud="aws", region="us-east-1"),
+    )
+    # [END pinecone_get_client]
+    print("Pinecone client initiated.")
+
+    # [START pinecone_get_index]
+    pinecone_index = pinecone_client.Index(pinecone_index_name)
+    # [END pinecone_get_index]
+    print("Pinecone index reference initiated.")
+
+    from alloydb_snippets import acreate_alloydb_client
+
+    alloydb_engine = await acreate_alloydb_client(
+        project_id=project_id,
+        region=region,
+        cluster=cluster,
+        instance=instance,
+        db_name=db_name,
+        db_user=db_user,
+        db_pwd=db_pwd,
+    )
+
+    # [START pinecone_alloydb_migration_get_alloydb_vectorstore]
+    from alloydb_snippets import aget_vector_store, get_embeddings_service
+
+    await alloydb_engine.ainit_vectorstore_table(
+        table_name=alloydb_table,
+        vector_size=pinecone_vector_size,
+        overwrite_existing=True,
+    )
+
+    embeddings_service = get_embeddings_service(
+        project_id, model_name=EMBEDDING_MODEL_NAME
+    )
+    vs = await aget_vector_store(
+        engine=alloydb_engine,
+        embeddings_service=embeddings_service,
+        table_name=alloydb_table,
+    )
+    # [END pinecone_alloydb_migration_get_alloydb_vectorstore]
+    print("Pinecone migration AlloyDBVectorStore table created.")
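+
+    # The copy below streams the index instead of loading it wholesale:
+    # get_ids_batch yields one page of vector IDs at a time, get_data_batch
+    # fetches only those vectors, and each batch is written to AlloyDB before
+    # the next page is requested, so peak memory stays bounded by
+    # pinecone_batch_size rather than by the index size.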
+
+    id_iterator = get_ids_batch(pinecone_index, pinecone_namespace, pinecone_batch_size)
+    for ids, embeddings, contents, metadatas in get_data_batch(
+        pinecone_index=pinecone_index,
+        id_iterator=id_iterator,
+    ):
+        # [START pinecone_alloydb_migration_insert_data_batch]
+        inserted_ids = await vs.aadd_embeddings(
+            texts=contents,
+            embeddings=embeddings,
+            metadatas=metadatas,
+            ids=ids,
+        )
+        # [END pinecone_alloydb_migration_insert_data_batch]
+    print("Migration completed, inserted all the batches of data to AlloyDB.")
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/samples/migrations/pinecone_migration_test.py b/samples/migrations/pinecone_migration_test.py
new file mode 100644
index 00000000..2dbe6529
--- /dev/null
+++ b/samples/migrations/pinecone_migration_test.py
@@ -0,0 +1,167 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import uuid
+from typing import Sequence
+
+import pytest
+import pytest_asyncio
+from migrate_pinecone_vectorstore_to_alloydb import main
+from sqlalchemy import text
+from sqlalchemy.engine.row import RowMapping
+
+from langchain_google_alloydb_pg import AlloyDBEngine
+
+DEFAULT_TABLE = "test_pinecone_migration" + str(uuid.uuid4())
+
+
+def get_env_var(key: str, desc: str) -> str:
+    v = os.environ.get(key)
+    if v is None:
+        raise ValueError(f"Must set env var {key} to: {desc}")
+    return v
+
+
+async def aexecute(
+    engine: AlloyDBEngine,
+    query: str,
+) -> None:
+    async def run(engine, query):
+        async with engine._pool.connect() as conn:
+            await conn.execute(text(query))
+            await conn.commit()
+
+    await engine._run_as_async(run(engine, query))
+
+
+async def afetch(engine: AlloyDBEngine, query: str) -> Sequence[RowMapping]:
+    async def run(engine, query):
+        async with engine._pool.connect() as conn:
+            result = await conn.execute(text(query))
+            result_map = result.mappings()
+            result_fetch = result_map.fetchall()
+        return result_fetch
+
+    return await engine._run_as_async(run(engine, query))
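+
+
+# Note: aexecute and afetch above reach into private AlloyDBEngine attributes
+# (_pool, _run_as_async) purely for test setup and cleanup; application code
+# should stick to the public AlloyDBEngine/AlloyDBVectorStore APIs.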
+ return get_env_var("DB_PASSWORD", "database password for AlloyDB") + + @pytest.fixture(scope="module") + def pinecone_api_key(self) -> str: + return get_env_var("PINECONE_API_KEY", "API KEY for pinecone instance") + + @pytest.fixture(scope="module") + def pinecone_index_name(self) -> str: + return get_env_var("PINECONE_INDEX_NAME", "index name for pinecone instance") + + @pytest_asyncio.fixture(scope="class") + async def engine( + self, + db_project, + db_region, + db_cluster, + db_instance, + db_name, + db_user, + db_password, + ): + engine = await AlloyDBEngine.afrom_instance( + project_id=db_project, + cluster=db_cluster, + instance=db_instance, + region=db_region, + database=db_name, + user=db_user, + password=db_password, + ) + + yield engine + await aexecute(engine, f'DROP TABLE IF EXISTS "{DEFAULT_TABLE}"') + await engine.close() + + async def test_pinecone( + self, + engine, + capsys, + pinecone_api_key, + pinecone_index_name, + db_project, + db_region, + db_cluster, + db_instance, + db_name, + db_user, + db_password, + ): + await main( + pinecone_api_key=pinecone_api_key, + pinecone_index_name=pinecone_index_name, + pinecone_namespace="", + pinecone_vector_size=768, + pinecone_batch_size=50, + project_id=db_project, + region=db_region, + cluster=db_cluster, + instance=db_instance, + alloydb_table=DEFAULT_TABLE, + db_name=db_name, + db_user=db_user, + db_pwd=db_password, + ) + + out, err = capsys.readouterr() + + # Assert on the script's output + assert "Error" not in err # Check for errors + assert "Pinecone client initiated" in out + assert "Pinecone index reference initiated" in out + assert "Langchain AlloyDB client initiated" in out + assert "Langchain Vertex AI Embeddings service initiated" in out + assert "Pinecone migration AlloyDBVectorStore table created" in out + assert "Langchain AlloyDB vector store instantiated" in out + assert "Pinecone client fetched all ids from index" in out + assert "Pinecone client fetched all data from index" in out + assert "Migration completed, inserted all the batches of data to AlloyDB" in out + results = await afetch(engine, f'SELECT * FROM "{DEFAULT_TABLE}"') + assert len(results) == 100 diff --git a/samples/migrations/requirements-test.txt b/samples/migrations/requirements-test.txt new file mode 100644 index 00000000..8d6d9eec --- /dev/null +++ b/samples/migrations/requirements-test.txt @@ -0,0 +1,2 @@ +pytest==8.3.3 +pytest-asyncio==0.24.0 diff --git a/samples/migrations/requirements.txt b/samples/migrations/requirements.txt new file mode 100644 index 00000000..1a3f34ad --- /dev/null +++ b/samples/migrations/requirements.txt @@ -0,0 +1,10 @@ +langchain-google-alloydb-pg==0.8.0 +langchain-core==0.3.26 +pinecone-client==5.0.1 +weaviate-client==4.10.2 +langchain-chroma==0.1.4 +qdrant-client==1.12.1 +pymilvus==2.5.0 +protobuf==5.29.1 +grpcio-tools==1.67.1 +langchain-google-vertexai==2.0.9