
core: add new clean up strategy "scoped_full" to indexing #28505

Merged · 8 commits · Dec 13, 2024
libs/core/langchain_core/indexing/api.py (33 changes: 24 additions & 9 deletions)
@@ -202,6 +202,7 @@ def index(
     cleanup_batch_size: int = 1_000,
     force_update: bool = False,
     upsert_kwargs: Optional[dict[str, Any]] = None,
+    scoped_full_cleanup: bool = False,
 ) -> IndexingResult:
     """Index data from the loader into the vector store.

@@ -215,10 +216,6 @@ def index(
        are not able to specify the uid of the document.

     IMPORTANT:
-       * if auto_cleanup is set to True, the loader should be returning
-         the entire dataset, and not just a subset of the dataset.
-         Otherwise, the auto_cleanup will remove documents that it is not
-         supposed to.
        * In incremental mode, if documents associated with a particular
          source id appear across different batches, the indexing API
          will do some redundant work. This will still result in the
@@ -259,6 +256,11 @@
             specify a custom vector_field:
             upsert_kwargs={"vector_field": "embedding"}
             .. versionadded:: 0.3.10
+        scoped_full_cleanup: This argument is valid only when `cleanup` is Full.
Collaborator review comment:

How about we turn this into another cleanup mode, so we don't introduce another parameter that only works conditionally?

This looks like cleanup == 'full_scoped' or something like that.

And we'd document:

  1. In the IMPORTANT section -- that full_scoped is a solution to the problem of batches being best-effort.
  2. This solution keeps track of the source ids in memory (probably fine for most use cases in terms of memory consumption) -- would require parallelizing for 10M+ docs anyway.

+            If True, Full cleanup deletes all documents that haven't
+            been updated AND that are associated with source ids that
+            were seen during indexing.
+            Default is False.

     Returns:
         Indexing result which contains information about how many documents
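For orientation, here is a minimal sketch of how the flag-based API shown in this diff would be called. The in-memory stand-ins (`InMemoryRecordManager`, `InMemoryVectorStore`, `DeterministicFakeEmbedding`) substitute for production components, and `scoped_full_cleanup` reflects this revision of the PR; per the review comment above and the PR title, the merged version apparently folds this into a "scoped_full" cleanup mode instead.

```python
from langchain_core.documents import Document
from langchain_core.embeddings import DeterministicFakeEmbedding
from langchain_core.indexing import InMemoryRecordManager, index
from langchain_core.vectorstores import InMemoryVectorStore

record_manager = InMemoryRecordManager(namespace="demo")
record_manager.create_schema()  # no-op for the in-memory manager
vector_store = InMemoryVectorStore(embedding=DeterministicFakeEmbedding(size=8))

docs = [
    Document(page_content="hello", metadata={"source": "a.txt"}),
    Document(page_content="world", metadata={"source": "b.txt"}),
]

# With the flag set, a later "full" cleanup only considers records whose
# source ids ("a.txt", "b.txt") were seen during this indexing run.
result = index(
    docs,
    record_manager,
    vector_store,
    cleanup="full",
    source_id_key="source",
    scoped_full_cleanup=True,
)
# result == {"num_added": 2, "num_updated": 0, "num_skipped": 0, "num_deleted": 0}
```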
@@ -278,8 +280,15 @@ def index(
         )
         raise ValueError(msg)

-    if cleanup == "incremental" and source_id_key is None:
-        msg = "Source id key is required when cleanup mode is incremental."
+    if scoped_full_cleanup and cleanup != "full":
+        msg = "scoped_full_cleanup is valid only when cleanup mode is 'full'."
+        raise ValueError(msg)
+
+    if (cleanup == "incremental" or scoped_full_cleanup) and source_id_key is None:
+        msg = (
+            "Source id key is required when cleanup mode is incremental "
+            "or scoped_full_cleanup is True."
+        )
         raise ValueError(msg)

     destination = vector_store  # Renaming internally for clarity
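Continuing the sketch above, the two new guards reject inconsistent arguments; a hypothetical illustration:

```python
# Hypothetical misuse, reusing the objects from the earlier sketch.
try:
    index(docs, record_manager, vector_store,
          cleanup="incremental", scoped_full_cleanup=True)
except ValueError as err:
    print(err)  # scoped_full_cleanup is valid only when cleanup mode is 'full'.

try:
    index(docs, record_manager, vector_store,
          cleanup="full", scoped_full_cleanup=True)  # source_id_key missing
except ValueError as err:
    print(err)  # Source id key is required when cleanup mode is incremental ...
```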
@@ -326,6 +335,7 @@ def index(
     num_skipped = 0
     num_updated = 0
     num_deleted = 0
+    scoped_full_cleanup_source_ids: set[str] = set()

     for doc_batch in _batch(batch_size, doc_iterator):
         hashed_docs = list(
@@ -338,8 +348,8 @@ def index(
             source_id_assigner(doc) for doc in hashed_docs
         ]

-        if cleanup == "incremental":
-            # If the cleanup mode is incremental, source ids are required.
+        if cleanup == "incremental" or scoped_full_cleanup:
+            # Source ids are required.
             for source_id, hashed_doc in zip(source_ids, hashed_docs):
                 if source_id is None:
                     msg = (
@@ -349,6 +359,8 @@
                         f"as source id."
                     )
                     raise ValueError(msg)
+                if cleanup == "full":
+                    scoped_full_cleanup_source_ids.add(source_id)
             # source ids cannot be None after for loop above.
             source_ids = cast(Sequence[str], source_ids)  # type: ignore[assignment]

@@ -427,8 +439,11 @@ def index(
             num_deleted += len(uids_to_delete)

     if cleanup == "full":
+        delete_group_ids: Optional[Sequence[str]] = None
+        if scoped_full_cleanup:
+            delete_group_ids = list(scoped_full_cleanup_source_ids)
         while uids_to_delete := record_manager.list_keys(
-            before=index_start_dt, limit=cleanup_batch_size
+            group_ids=delete_group_ids, before=index_start_dt, limit=cleanup_batch_size
         ):
             # First delete from record store.
             destination.delete(uids_to_delete)
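To see the scoped deletion end to end, a hedged follow-up to the earlier sketch: re-indexing changed content for only one of the two sources deletes that source's stale record while the untouched source survives, whereas an unscoped full cleanup would have deleted it as well.

```python
# Follow-up run, reusing record_manager / vector_store from the earlier
# sketch: only "a.txt" is re-indexed, and its content has changed.
result = index(
    [Document(page_content="hello, again", metadata={"source": "a.txt"})],
    record_manager,
    vector_store,
    cleanup="full",
    source_id_key="source",
    scoped_full_cleanup=True,
)
# Scoped: the stale "a.txt" record is deleted; "b.txt" is left alone because
# its source id was not seen during this run.
# result == {"num_added": 1, "num_updated": 0, "num_skipped": 0, "num_deleted": 1}
# With plain cleanup="full" (no scoping), "b.txt" would have been deleted too.
```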