core: add new clean up strategy "scoped_full" to indexing #28505

Merged (8 commits, Dec 13, 2024)
11 changes: 6 additions & 5 deletions docs/docs/how_to/indexing.ipynb
@@ -39,19 +39,20 @@
"| None | ✅ | ✅ | ❌ | ❌ | - |\n",
"| Incremental | ✅ | ✅ | ❌ | ✅ | Continuously |\n",
"| Full | ✅ | ❌ | ✅ | ✅ | At end of indexing |\n",
"| Scoped_Full | ✅ | ✅ | ❌ | ✅ | At end of indexing |\n",
"\n",
"\n",
"`None` does not do any automatic clean up, allowing the user to manually do clean up of old content. \n",
"\n",
"`incremental` and `full` offer the following automated clean up:\n",
"`incremental`, `full` and `scoped_full` offer the following automated clean up:\n",
"\n",
"* If the content of the source document or derived documents has **changed**, both `incremental` or `full` modes will clean up (delete) previous versions of the content.\n",
"* If the source document has been **deleted** (meaning it is not included in the documents currently being indexed), the `full` cleanup mode will delete it from the vector store correctly, but the `incremental` mode will not.\n",
"* If the content of the source document or derived documents has **changed**, all 3 modes will clean up (delete) previous versions of the content.\n",
"* If the source document has been **deleted** (meaning it is not included in the documents currently being indexed), the `full` cleanup mode will delete it from the vector store correctly, but the `incremental` and `scoped_full` mode will not.\n",
"\n",
"When content is mutated (e.g., the source PDF file was revised) there will be a period of time during indexing when both the new and old versions may be returned to the user. This happens after the new content was written, but before the old version was deleted.\n",
"\n",
"* `incremental` indexing minimizes this period of time as it is able to do clean up continuously, as it writes.\n",
"* `full` mode does the clean up after all batches have been written.\n",
"* `full` and `scoped_full` mode does the clean up after all batches have been written.\n",
"\n",
"## Requirements\n",
"\n",
@@ -64,7 +65,7 @@
" \n",
"## Caution\n",
"\n",
"The record manager relies on a time-based mechanism to determine what content can be cleaned up (when using `full` or `incremental` cleanup modes).\n",
"The record manager relies on a time-based mechanism to determine what content can be cleaned up (when using `full` or `incremental` or `scoped_full` cleanup modes).\n",
"\n",
"If two tasks run back-to-back, and the first task finishes before the clock time changes, then the second task may not be able to clean up content.\n",
"\n",
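To make the new mode concrete, here is a minimal sketch of calling `index()` with `cleanup="scoped_full"`. It is illustrative only: it assumes `langchain-core` with this change (0.3.25+), a SQLite-backed `SQLRecordManager`, an `InMemoryVectorStore`, and `OpenAIEmbeddings`; the source names and documents are made up.

```python
from langchain.indexes import SQLRecordManager, index
from langchain_core.documents import Document
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_openai import OpenAIEmbeddings

# The record manager tracks write times and source ids; the cleanup modes use
# this bookkeeping to decide which records are stale.
record_manager = SQLRecordManager(
    "demo_index", db_url="sqlite:///record_manager_cache.sql"
)
record_manager.create_schema()

vectorstore = InMemoryVectorStore(OpenAIEmbeddings())

docs = [
    Document(page_content="kitty chunk 1", metadata={"source": "kitty.txt"}),
    Document(page_content="kitty chunk 2", metadata={"source": "kitty.txt"}),
    Document(page_content="doggy chunk 1", metadata={"source": "doggy.txt"}),
]

# scoped_full behaves like full, except that end-of-run cleanup is restricted
# to documents whose source ids were seen during this run.
result = index(
    docs,
    record_manager,
    vectorstore,
    cleanup="scoped_full",
    source_id_key="source",
)
print(result)  # e.g. {'num_added': 3, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
```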
106 changes: 78 additions & 28 deletions libs/core/langchain_core/indexing/api.py
@@ -197,7 +197,7 @@ def index(
vector_store: Union[VectorStore, DocumentIndex],
*,
batch_size: int = 100,
cleanup: Literal["incremental", "full", None] = None,
cleanup: Literal["incremental", "full", "scoped_full", None] = None,
source_id_key: Union[str, Callable[[Document], str], None] = None,
cleanup_batch_size: int = 1_000,
force_update: bool = False,
@@ -215,7 +215,7 @@ def index(
are not able to specify the uid of the document.

IMPORTANT:
* if auto_cleanup is set to True, the loader should be returning
* In full mode, the loader should be returning
the entire dataset, and not just a subset of the dataset.
Otherwise, the auto_cleanup will remove documents that it is not
supposed to.
@@ -227,6 +227,11 @@
chunks, and we index them using a batch size of 5, we'll have 3 batches
all with the same source id. In general, to avoid doing too much
redundant work select as big a batch size as possible.
* The `scoped_full` mode is suitable if determining an appropriate batch size
is challenging or if your data loader cannot return the entire dataset at
once. This mode keeps track of source IDs in memory, which should be fine
for most use cases. If your dataset is large (10M+ docs), you will likely
need to parallelize the indexing process regardless.

Args:
docs_source: Data loader or iterable of documents to index.
@@ -235,16 +240,19 @@
vector_store: VectorStore or DocumentIndex to index the documents into.
batch_size: Batch size to use when indexing. Default is 100.
cleanup: How to handle clean up of documents. Default is None.
- Incremental: Cleans up all documents that haven't been updated AND
- incremental: Cleans up all documents that haven't been updated AND
that are associated with source ids that were seen
during indexing.
Clean up is done continuously during indexing helping
to minimize the probability of users seeing duplicated
content.
- Full: Delete all documents that have not been returned by the loader
- full: Delete all documents that have not been returned by the loader
during this run of indexing.
Clean up runs after all documents have been indexed.
This means that users may see duplicated content during indexing.
- scoped_full: Similar to Full, but only deletes all documents
that haven't been updated AND that are associated with
source ids that were seen during indexing.
- None: Do not delete any documents.
source_id_key: Optional key that helps identify the original source
of the document. Default is None.
@@ -270,16 +278,22 @@
ValueError: If vectorstore does not have
"delete" and "add_documents" required methods.
ValueError: If source_id_key is not None, but is not a string or callable.

.. version_modified:: 0.3.25

* Added `scoped_full` cleanup mode.
"""
if cleanup not in {"incremental", "full", None}:
if cleanup not in {"incremental", "full", "scoped_full", None}:
msg = (
f"cleanup should be one of 'incremental', 'full' or None. "
f"cleanup should be one of 'incremental', 'full', 'scoped_full' or None. "
f"Got {cleanup}."
)
raise ValueError(msg)

if cleanup == "incremental" and source_id_key is None:
msg = "Source id key is required when cleanup mode is incremental."
if (cleanup == "incremental" or cleanup == "scoped_full") and source_id_key is None:
msg = (
"Source id key is required when cleanup mode is incremental or scoped_full."
)
raise ValueError(msg)

destination = vector_store # Renaming internally for clarity
@@ -326,6 +340,7 @@ def index(
num_skipped = 0
num_updated = 0
num_deleted = 0
scoped_full_cleanup_source_ids: set[str] = set()

for doc_batch in _batch(batch_size, doc_iterator):
hashed_docs = list(
@@ -338,17 +353,20 @@
source_id_assigner(doc) for doc in hashed_docs
]

if cleanup == "incremental":
# If the cleanup mode is incremental, source ids are required.
if cleanup == "incremental" or cleanup == "scoped_full":
# source ids are required.
for source_id, hashed_doc in zip(source_ids, hashed_docs):
if source_id is None:
msg = (
"Source ids are required when cleanup mode is incremental. "
f"Source ids are required when cleanup mode is "
f"incremental or scoped_full. "
f"Document that starts with "
f"content: {hashed_doc.page_content[:100]} was not assigned "
f"as source id."
)
raise ValueError(msg)
if cleanup == "scoped_full":
scoped_full_cleanup_source_ids.add(source_id)
# source ids cannot be None after for loop above.
source_ids = cast(Sequence[str], source_ids) # type: ignore[assignment]

@@ -425,9 +443,12 @@
record_manager.delete_keys(uids_to_delete)
num_deleted += len(uids_to_delete)

if cleanup == "full":
if cleanup == "full" or cleanup == "scoped_full":
delete_group_ids: Optional[Sequence[str]] = None
if cleanup == "scoped_full":
delete_group_ids = list(scoped_full_cleanup_source_ids)
while uids_to_delete := record_manager.list_keys(
before=index_start_dt, limit=cleanup_batch_size
group_ids=delete_group_ids, before=index_start_dt, limit=cleanup_batch_size
):
# First delete from record store.
destination.delete(uids_to_delete)
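The loop above is where the new mode diverges from `full`: `scoped_full` passes the source ids collected during the run as `group_ids`, so only records belonging to those sources are candidates for deletion. A hedged end-to-end sketch of that behavior (self-contained, using a deterministic fake embedding from `langchain_core` so it runs offline; contents and source names are illustrative):

```python
from langchain.indexes import SQLRecordManager, index
from langchain_core.documents import Document
from langchain_core.embeddings import DeterministicFakeEmbedding
from langchain_core.vectorstores import InMemoryVectorStore

record_manager = SQLRecordManager("scoped_demo", db_url="sqlite:///:memory:")
record_manager.create_schema()
vectorstore = InMemoryVectorStore(DeterministicFakeEmbedding(size=16))

kitty_v1 = Document(page_content="kitty v1", metadata={"source": "kitty.txt"})
doggy_v1 = Document(page_content="doggy v1", metadata={"source": "doggy.txt"})

# First run indexes both sources.
index([kitty_v1, doggy_v1], record_manager, vectorstore,
      cleanup="scoped_full", source_id_key="source")

# Second run only sees an updated kitty.txt; doggy.txt is absent from the batch.
kitty_v2 = Document(page_content="kitty v2", metadata={"source": "kitty.txt"})
result = index([kitty_v2], record_manager, vectorstore,
               cleanup="scoped_full", source_id_key="source")

# Expected: the stale kitty.txt record is deleted, but doggy.txt survives
# because its source id was not seen in this run. With cleanup="full",
# doggy.txt would have been deleted as well.
print(result)  # e.g. {'num_added': 1, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 1}
```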
@@ -456,7 +477,7 @@ async def aindex(
vector_store: Union[VectorStore, DocumentIndex],
*,
batch_size: int = 100,
cleanup: Literal["incremental", "full", None] = None,
cleanup: Literal["incremental", "full", "scoped_full", None] = None,
source_id_key: Union[str, Callable[[Document], str], None] = None,
cleanup_batch_size: int = 1_000,
force_update: bool = False,
@@ -474,10 +495,23 @@
are not able to specify the uid of the document.

IMPORTANT:
if auto_cleanup is set to True, the loader should be returning
the entire dataset, and not just a subset of the dataset.
Otherwise, the auto_cleanup will remove documents that it is not
supposed to.
* In full mode, the loader should be returning
the entire dataset, and not just a subset of the dataset.
Otherwise, the auto_cleanup will remove documents that it is not
supposed to.
* In incremental mode, if documents associated with a particular
source id appear across different batches, the indexing API
will do some redundant work. This will still result in the
correct end state of the index, but will unfortunately not be
100% efficient. For example, if a given document is split into 15
chunks, and we index them using a batch size of 5, we'll have 3 batches
all with the same source id. In general, to avoid doing too much
redundant work select as big a batch size as possible.
* The `scoped_full` mode is suitable if determining an appropriate batch size
is challenging or if your data loader cannot return the entire dataset at
once. This mode keeps track of source IDs in memory, which should be fine
for most use cases. If your dataset is large (10M+ docs), you will likely
need to parallelize the indexing process regardless.

Args:
docs_source: Data loader or iterable of documents to index.
@@ -486,15 +520,18 @@
vector_store: VectorStore or DocumentIndex to index the documents into.
batch_size: Batch size to use when indexing. Default is 100.
cleanup: How to handle clean up of documents. Default is None.
- Incremental: Cleans up all documents that haven't been updated AND
- incremental: Cleans up all documents that haven't been updated AND
that are associated with source ids that were seen
during indexing.
Clean up is done continuously during indexing helping
to minimize the probability of users seeing duplicated
content.
- Full: Delete all documents that haven to been returned by the loader.
- full: Delete all documents that have not been returned by the loader.
Clean up runs after all documents have been indexed.
This means that users may see duplicated content during indexing.
- scoped_full: Similar to Full, but only deletes all documents
that haven't been updated AND that are associated with
source ids that were seen during indexing.
- None: Do not delete any documents.
source_id_key: Optional key that helps identify the original source
of the document. Default is None.
@@ -520,17 +557,23 @@
ValueError: If vectorstore does not have
"adelete" and "aadd_documents" required methods.
ValueError: If source_id_key is not None, but is not a string or callable.

.. version_modified:: 0.3.25

* Added `scoped_full` cleanup mode.
"""

if cleanup not in {"incremental", "full", None}:
if cleanup not in {"incremental", "full", "scoped_full", None}:
msg = (
f"cleanup should be one of 'incremental', 'full' or None. "
f"cleanup should be one of 'incremental', 'full', 'scoped_full' or None. "
f"Got {cleanup}."
)
raise ValueError(msg)

if cleanup == "incremental" and source_id_key is None:
msg = "Source id key is required when cleanup mode is incremental."
if (cleanup == "incremental" or cleanup == "scoped_full") and source_id_key is None:
msg = (
"Source id key is required when cleanup mode is incremental or scoped_full."
)
raise ValueError(msg)

destination = vector_store # Renaming internally for clarity
@@ -586,6 +629,7 @@ async def aindex(
num_skipped = 0
num_updated = 0
num_deleted = 0
scoped_full_cleanup_source_ids: set[str] = set()

async for doc_batch in _abatch(batch_size, async_doc_iterator):
hashed_docs = list(
@@ -598,17 +642,20 @@
source_id_assigner(doc) for doc in hashed_docs
]

if cleanup == "incremental":
if cleanup == "incremental" or cleanup == "scoped_full":
# source ids are required.
for source_id, hashed_doc in zip(source_ids, hashed_docs):
if source_id is None:
msg = (
"Source ids are required when cleanup mode is incremental. "
f"Source ids are required when cleanup mode is "
f"incremental or scoped_full. "
f"Document that starts with "
f"content: {hashed_doc.page_content[:100]} was not assigned "
f"as source id."
)
raise ValueError(msg)
if cleanup == "scoped_full":
scoped_full_cleanup_source_ids.add(source_id)
# source ids cannot be None after for loop above.
source_ids = cast(Sequence[str], source_ids)

@@ -685,9 +732,12 @@
await record_manager.adelete_keys(uids_to_delete)
num_deleted += len(uids_to_delete)

if cleanup == "full":
if cleanup == "full" or cleanup == "scoped_full":
delete_group_ids: Optional[Sequence[str]] = None
if cleanup == "scoped_full":
delete_group_ids = list(scoped_full_cleanup_source_ids)
while uids_to_delete := await record_manager.alist_keys(
before=index_start_dt, limit=cleanup_batch_size
group_ids=delete_group_ids, before=index_start_dt, limit=cleanup_batch_size
):
# First delete from record store.
await destination.adelete(uids_to_delete)
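The async path gains the same mode; a minimal sketch of driving it through `aindex` (assumptions: `aiosqlite` is installed, and `SQLRecordManager` is created with `async_mode=True` so the `a*` methods that `aindex` calls are available; names are illustrative):

```python
import asyncio

from langchain.indexes import SQLRecordManager
from langchain_core.documents import Document
from langchain_core.embeddings import DeterministicFakeEmbedding
from langchain_core.indexing import aindex
from langchain_core.vectorstores import InMemoryVectorStore


async def main() -> None:
    # async_mode plus an aiosqlite URL gives the record manager an async API.
    record_manager = SQLRecordManager(
        "scoped_demo_async",
        db_url="sqlite+aiosqlite:///record_manager_async.sql",
        async_mode=True,
    )
    await record_manager.acreate_schema()

    vectorstore = InMemoryVectorStore(DeterministicFakeEmbedding(size=16))
    docs = [Document(page_content="kitty chunk", metadata={"source": "kitty.txt"})]

    result = await aindex(
        docs,
        record_manager,
        vectorstore,
        cleanup="scoped_full",
        source_id_key="source",
    )
    print(result)


asyncio.run(main())
```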