Skip to content

Commit

Permalink
community: Add document manager and mongo document manager (#17320)
Browse files Browse the repository at this point in the history
- **Description:** 
    - Add DocumentManager class, which is a nosql record manager. 
- In order to use index and aindex in
libs/langchain/langchain/indexes/_api.py, DocumentManager inherits
RecordManager.
    - Also I added the MongoDB implementation of Document Manager too.
  - **Dependencies:** pymongo, motor
  
<!-- Thank you for contributing to LangChain!

Please title your PR "<package>: <description>", where <package> is
whichever of langchain, community, core, experimental, etc. is being
modified.

Replace this entire comment with:
- **Description:** Add DocumentManager class, which is a no sql record
manager. To use index method and aindex method in indexes._api.py,
Document Manager inherits RecordManager.Add the MongoDB implementation
of Document Manager.
  - **Dependencies:** pymongo, motor

Please make sure your PR is passing linting and testing before
submitting. Run `make format`, `make lint` and `make test` from the root
of the package you've modified to check this locally.

See contribution guidelines for more information on how to write/run
tests, lint, etc: https://python.langchain.com/docs/contributing/

If you're adding a new integration, please include:
1. a test for the integration, preferably unit tests that do not rely on
network access,
2. an example notebook showing its use. It lives in
`docs/docs/integrations` directory.

If no one reviews your PR within a few days, please @-mention one of
@baskaryan, @eyurtsev, @hwchase17.
 -->

---------

Co-authored-by: Eugene Yurtsev <[email protected]>
  • Loading branch information
2jimoo and eyurtsev authored Feb 24, 2024
1 parent 3f6bf85 commit 7fc9034
Show file tree
Hide file tree
Showing 4 changed files with 512 additions and 0 deletions.
13 changes: 13 additions & 0 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# docker-compose to make it easier to spin up integration tests.
# Services should use NON standard ports to avoid collision with
# any existing services that might be used for development.
# ATTENTION: When adding a service below use a non-standard port
# increment by one from the preceding port.
# For credentials always use `langchain` and `langchain` for the
# username and password.
version: "3"
name: langchain-tests

Expand All @@ -19,3 +24,11 @@ services:
image: graphdb
ports:
- "6021:7200"
mongo:
image: mongo:latest
container_name: mongo_container
ports:
- "6022:27017"
environment:
MONGO_INITDB_ROOT_USERNAME: langchain
MONGO_INITDB_ROOT_PASSWORD: langchain
231 changes: 231 additions & 0 deletions libs/community/langchain_community/indexes/_document_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
from typing import Any, Dict, List, Optional, Sequence

from langchain_community.indexes.base import RecordManager

IMPORT_PYMONGO_ERROR = (
"Could not import MongoClient. Please install it with `pip install pymongo`."
)
IMPORT_MOTOR_ASYNCIO_ERROR = (
"Could not import AsyncIOMotorClient. Please install it with `pip install motor`."
)


def _import_pymongo() -> Any:
"""Import PyMongo if available, otherwise raise error."""
try:
from pymongo import MongoClient
except ImportError:
raise ImportError(IMPORT_PYMONGO_ERROR)
return MongoClient


def _get_pymongo_client(mongodb_url: str, **kwargs: Any) -> Any:
"""Get MongoClient for sync operations from the mongodb_url,
otherwise raise error."""
try:
pymongo = _import_pymongo()
client = pymongo(mongodb_url, **kwargs)
except ValueError as e:
raise ImportError(
f"MongoClient string provided is not in proper format. " f"Got error: {e} "
)
return client


def _import_motor_asyncio() -> Any:
"""Import Motor if available, otherwise raise error."""
try:
from motor.motor_asyncio import AsyncIOMotorClient
except ImportError:
raise ImportError(IMPORT_MOTOR_ASYNCIO_ERROR)
return AsyncIOMotorClient


def _get_motor_client(mongodb_url: str, **kwargs: Any) -> Any:
"""Get AsyncIOMotorClient for async operations from the mongodb_url,
otherwise raise error."""
try:
motor = _import_motor_asyncio()
client = motor(mongodb_url, **kwargs)
except ValueError as e:
raise ImportError(
f"AsyncIOMotorClient string provided is not in proper format. "
f"Got error: {e} "
)
return client


class MongoDocumentManager(RecordManager):
"""A MongoDB based implementation of the document manager."""

def __init__(
self,
namespace: str,
*,
mongodb_url: str,
db_name: str,
collection_name: str = "documentMetadata",
) -> None:
"""Initialize the MongoDocumentManager.
Args:
namespace: The namespace associated with this document manager.
db_name: The name of the database to use.
collection_name: The name of the collection to use.
Default is 'documentMetadata'.
"""
super().__init__(namespace=namespace)
self.sync_client = _get_pymongo_client(mongodb_url)
self.sync_db = self.sync_client[db_name]
self.sync_collection = self.sync_db[collection_name]
self.async_client = _get_motor_client(mongodb_url)
self.async_db = self.async_client[db_name]
self.async_collection = self.async_db[collection_name]

def create_schema(self) -> None:
"""Create the database schema for the document manager."""
pass

async def acreate_schema(self) -> None:
"""Create the database schema for the document manager."""
pass

def update(
self,
keys: Sequence[str],
*,
group_ids: Optional[Sequence[Optional[str]]] = None,
time_at_least: Optional[float] = None,
) -> None:
"""Upsert documents into the MongoDB collection."""
if group_ids is None:
group_ids = [None] * len(keys)

if len(keys) != len(group_ids):
raise ValueError("Number of keys does not match number of group_ids")

for key, group_id in zip(keys, group_ids):
self.sync_collection.find_one_and_update(
{"namespace": self.namespace, "key": key},
{"$set": {"group_id": group_id, "updated_at": self.get_time()}},
upsert=True,
)

async def aupdate(
self,
keys: Sequence[str],
*,
group_ids: Optional[Sequence[Optional[str]]] = None,
time_at_least: Optional[float] = None,
) -> None:
"""Asynchronously upsert documents into the MongoDB collection."""
if group_ids is None:
group_ids = [None] * len(keys)

if len(keys) != len(group_ids):
raise ValueError("Number of keys does not match number of group_ids")

update_time = await self.aget_time()
if time_at_least and update_time < time_at_least:
raise ValueError("Server time is behind the expected time_at_least")

for key, group_id in zip(keys, group_ids):
await self.async_collection.find_one_and_update(
{"namespace": self.namespace, "key": key},
{"$set": {"group_id": group_id, "updated_at": update_time}},
upsert=True,
)

def get_time(self) -> float:
"""Get the current server time as a timestamp."""
server_info = self.sync_db.command("hostInfo")
local_time = server_info["system"]["currentTime"]
timestamp = local_time.timestamp()
return timestamp

async def aget_time(self) -> float:
"""Asynchronously get the current server time as a timestamp."""
host_info = await self.async_collection.database.command("hostInfo")
local_time = host_info["system"]["currentTime"]
return local_time.timestamp()

def exists(self, keys: Sequence[str]) -> List[bool]:
"""Check if the given keys exist in the MongoDB collection."""
existing_keys = {
doc["key"]
for doc in self.sync_collection.find(
{"namespace": self.namespace, "key": {"$in": keys}}, {"key": 1}
)
}
return [key in existing_keys for key in keys]

async def aexists(self, keys: Sequence[str]) -> List[bool]:
"""Asynchronously check if the given keys exist in the MongoDB collection."""
cursor = self.async_collection.find(
{"namespace": self.namespace, "key": {"$in": keys}}, {"key": 1}
)
existing_keys = {doc["key"] async for doc in cursor}
return [key in existing_keys for key in keys]

def list_keys(
self,
*,
before: Optional[float] = None,
after: Optional[float] = None,
group_ids: Optional[Sequence[str]] = None,
limit: Optional[int] = None,
) -> List[str]:
"""List documents in the MongoDB collection based on the provided date range."""
query: Dict[str, Any] = {"namespace": self.namespace}
if before:
query["updated_at"] = {"$lt": before}
if after:
query["updated_at"] = {"$gt": after}
if group_ids:
query["group_id"] = {"$in": group_ids}

cursor = (
self.sync_collection.find(query, {"key": 1}).limit(limit)
if limit
else self.sync_collection.find(query, {"key": 1})
)
return [doc["key"] for doc in cursor]

async def alist_keys(
self,
*,
before: Optional[float] = None,
after: Optional[float] = None,
group_ids: Optional[Sequence[str]] = None,
limit: Optional[int] = None,
) -> List[str]:
"""
Asynchronously list documents in the MongoDB collection
based on the provided date range.
"""
query: Dict[str, Any] = {"namespace": self.namespace}
if before:
query["updated_at"] = {"$lt": before}
if after:
query["updated_at"] = {"$gt": after}
if group_ids:
query["group_id"] = {"$in": group_ids}

cursor = (
self.async_collection.find(query, {"key": 1}).limit(limit)
if limit
else self.async_collection.find(query, {"key": 1})
)
return [doc["key"] async for doc in cursor]

def delete_keys(self, keys: Sequence[str]) -> None:
"""Delete documents from the MongoDB collection."""
self.sync_collection.delete_many(
{"namespace": self.namespace, "key": {"$in": keys}}
)

async def adelete_keys(self, keys: Sequence[str]) -> None:
"""Asynchronously delete documents from the MongoDB collection."""
await self.async_collection.delete_many(
{"namespace": self.namespace, "key": {"$in": keys}}
)
Empty file.
Loading

0 comments on commit 7fc9034

Please sign in to comment.