Skip to content

Commit

Permalink
feat(FastEmbed): Support for SPLADE Sparse Embedder (#579)
Browse files Browse the repository at this point in the history
* fix(opensearch): bulk error without create key

* feat(FastEmbed): Scaffold for SPLADE Sparse Embedding Support

* Revert "fix(opensearch): bulk error without create key"

This reverts commit afc8e79.

* feat(FastEmbed): __all__ fix

* feat(FastEmbed): fix one test

* feat(FastEmbed): fix one test

* feat(FastEmbed): fix a second test

* feat(FastEmbed): removed old TODO (fixed)

* feat(FastEmbed): fixing all test + doc

* fix output typing

* Fix output component

* feat(FastEmbed): renaming SPLADE to Sparse because it makes more sense

* feat(FastEmbed): hatch run all lint

* feat(FastEmbed): modify PR for haystack 2.1.0 with proper sparse vectors

* try testing with Haystack main branch

* update model name

* Update integrations/fastembed/src/haystack_integrations/components/embedders/fastembed/fastembed_sparse_document_embedder.py

Co-authored-by: Stefano Fiorucci <[email protected]>

* Update integrations/fastembed/src/haystack_integrations/components/embedders/fastembed/fastembed_sparse_document_embedder.py

Co-authored-by: Stefano Fiorucci <[email protected]>

* Update integrations/fastembed/src/haystack_integrations/components/embedders/fastembed/fastembed_sparse_document_embedder.py

Co-authored-by: Stefano Fiorucci <[email protected]>

* Update integrations/fastembed/src/haystack_integrations/components/embedders/fastembed/fastembed_sparse_document_embedder.py

Co-authored-by: Stefano Fiorucci <[email protected]>

* Update integrations/fastembed/src/haystack_integrations/components/embedders/fastembed/fastembed_sparse_text_embedder.py

Co-authored-by: Stefano Fiorucci <[email protected]>

* feat(FastEmbed): remove prefix/suffix

* feat(FastEmbed): fix linting

* feat(FastEmbed): suggestion for progress bar

* feat(FastEmbed): return Haystack's SparseEmbedding instead of Dict

* feat(FastEmbed): fix lint

* feat(Fastembed): run output type from dict to haystack sparseembedding class

* feat(FastEmbed): reduce default sparse batch size

* Update integrations/fastembed/src/haystack_integrations/components/embedders/fastembed/fastembed_sparse_text_embedder.py

Co-authored-by: Stefano Fiorucci <[email protected]>

* feat(FastEmbed): fix test

* updates after 2.0.1 release

* small fixes; naive example

---------

Co-authored-by: anakin87 <[email protected]>
  • Loading branch information
lambda-science and anakin87 authored Apr 10, 2024
1 parent 52b833a commit 363c7b5
Show file tree
Hide file tree
Showing 12 changed files with 910 additions and 7 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/fastembed.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ jobs:

- name: Run tests
id: tests
run: hatch run cov
run: hatch run cov

- name: Nightly - run unit tests with Haystack main branch
if: github.event_name == 'schedule'
Expand All @@ -60,4 +60,4 @@ jobs:
core-integrations failure:
${{ (steps.tests.conclusion == 'nightly-haystack-main') && 'nightly-haystack-main' || 'tests' }}
- ${{ github.workflow }}
api-key: ${{ secrets.CORE_DATADOG_API_KEY }}
api-key: ${{ secrets.CORE_DATADOG_API_KEY }}
25 changes: 25 additions & 0 deletions integrations/fastembed/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,31 @@ doc = Document(content="fastembed is supported by and maintained by Qdrant.", me
result = embedder.run(documents=[doc])
```

You can use `FastembedSparseTextEmbedder` and `FastembedSparseDocumentEmbedder` by importing them as follows:

```python
from haystack_integrations.components.embedders.fastembed import FastembedSparseTextEmbedder

text = "fastembed is supported by and maintained by Qdrant."
text_embedder = FastembedSparseTextEmbedder(
model="prithvida/Splade_PP_en_v1"
)
text_embedder.warm_up()
embedding = text_embedder.run(text)["embedding"]
```

```python
from haystack_integrations.components.embedders.fastembed import FastembedSparseDocumentEmbedder
from haystack.dataclasses import Document

embedder = FastembedSparseDocumentEmbedder(
model="prithvida/Splade_PP_en_v1",
)
embedder.warm_up()
doc = Document(content="fastembed is supported by and maintained by Qdrant.", meta={"long_answer": "no",})
result = embedder.run(documents=[doc])
```

## License

`fastembed-haystack` is distributed under the terms of the [Apache-2.0](https://spdx.org/licenses/Apache-2.0.html) license.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

result = query_pipeline.run({"text_embedder": {"text": query}})

print(result["retriever"]["documents"][0]) # noqa: T201
print(result["retriever"]["documents"][0])

# Document(id=...,
# content: 'fastembed is supported by and maintained by Qdrant.',
Expand Down
32 changes: 32 additions & 0 deletions integrations/fastembed/examples/sparse_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Currently, this example shows how to use the FastembedSparseDocumentEmbedder component to embed a list of documents.

# TODO: Once we have a proper SparseEmbeddingRetriever, we should replace this naive example with a more realistic one,
# involving indexing and retrieval of documents.

from haystack import Document
from haystack_integrations.components.embedders.fastembed import FastembedSparseDocumentEmbedder

# Sample texts taken from the PubMed QA dataset, paired with their PubMed ids.
_SAMPLES = [
    (
        "Oxidative stress generated within inflammatory joints can produce autoimmune phenomena and joint destruction. Radical species with oxidative activity, including reactive nitrogen species, represent mediators of inflammation and cartilage damage.",
        "25,445,628",
    ),
    (
        "Plasma levels of pancreatic polypeptide (PP) rise upon food intake. Although other pancreatic islet hormones, such as insulin and glucagon, have been extensively investigated, PP secretion and actions are still poorly understood.",
        "25,445,712",
    ),
]

document_list = [
    Document(content=text, meta={"pubid": pubid, "long_answer": "yes"})
    for text, pubid in _SAMPLES
]

# warm_up() downloads/loads the sparse embedding model before the first run.
document_embedder = FastembedSparseDocumentEmbedder()
document_embedder.warm_up()
documents_with_embeddings = document_embedder.run(document_list)["documents"]

for doc in documents_with_embeddings:
    print(f"Document Text: {doc.content}")
    print(f"Document Sparse Embedding: {doc.sparse_embedding.to_dict()}")
2 changes: 2 additions & 0 deletions integrations/fastembed/pydoc/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ loaders:
[
"haystack_integrations.components.embedders.fastembed.fastembed_document_embedder",
"haystack_integrations.components.embedders.fastembed.fastembed_text_embedder",
"haystack_integrations.components.embedders.fastembed.fastembed_sparse_document_embedder",
"haystack_integrations.components.embedders.fastembed.fastembed_sparse_text_embedder"
]
ignore_when_discovered: ["__init__"]
processors:
Expand Down
6 changes: 3 additions & 3 deletions integrations/fastembed/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ classifiers = [
"Programming Language :: Python :: Implementation :: PyPy",
]
dependencies = [
"haystack-ai",
"fastembed>=0.2",
"haystack-ai>=2.0.1",
"fastembed>=0.2.5",
]

[project.urls]
Expand Down Expand Up @@ -155,7 +155,7 @@ ban-relative-imports = "parents"
# Tests can use magic values, assertions, and relative imports
"tests/**/*" = ["PLR2004", "S101", "TID252"]
# examples can contain "print" commands
"examples/**/*" = ["T201"]
"examples/**/*" = ["T201", "E501"]

[tool.coverage.run]
source = ["haystack_integrations"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@
#
# SPDX-License-Identifier: Apache-2.0
from .fastembed_document_embedder import FastembedDocumentEmbedder
from .fastembed_sparse_document_embedder import FastembedSparseDocumentEmbedder
from .fastembed_sparse_text_embedder import FastembedSparseTextEmbedder
from .fastembed_text_embedder import FastembedTextEmbedder

__all__ = ["FastembedDocumentEmbedder", "FastembedTextEmbedder"]
__all__ = [
"FastembedDocumentEmbedder",
"FastembedTextEmbedder",
"FastembedSparseDocumentEmbedder",
"FastembedSparseTextEmbedder",
]
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from typing import ClassVar, Dict, List, Optional

from haystack.dataclasses.sparse_embedding import SparseEmbedding
from tqdm import tqdm

from fastembed import TextEmbedding
from fastembed.sparse.sparse_text_embedding import SparseTextEmbedding


class _FastembedEmbeddingBackendFactory:
Expand Down Expand Up @@ -50,3 +52,58 @@ def embed(self, data: List[str], progress_bar=True, **kwargs) -> List[List[float
):
embeddings.append(np_array.tolist())
return embeddings


class _FastembedSparseEmbeddingBackendFactory:
    """
    Factory class to create instances of fastembed sparse embedding backends.

    Backends are cached so that the same (model, cache_dir, threads) combination
    is only instantiated once per process.
    """

    _instances: ClassVar[Dict[str, "_FastembedSparseEmbeddingBackend"]] = {}

    @staticmethod
    def get_embedding_backend(
        model_name: str,
        cache_dir: Optional[str] = None,
        threads: Optional[int] = None,
    ):
        # The cache key concatenates all constructor parameters.
        key = f"{model_name}{cache_dir}{threads}"

        cache = _FastembedSparseEmbeddingBackendFactory._instances
        if key not in cache:
            cache[key] = _FastembedSparseEmbeddingBackend(
                model_name=model_name, cache_dir=cache_dir, threads=threads
            )
        return cache[key]


class _FastembedSparseEmbeddingBackend:
    """
    Class to manage fastembed sparse embeddings.
    """

    def __init__(
        self,
        model_name: str,
        cache_dir: Optional[str] = None,
        threads: Optional[int] = None,
    ):
        # SparseTextEmbedding loads the model at construction time.
        self.model = SparseTextEmbedding(model_name=model_name, cache_dir=cache_dir, threads=threads)

    def embed(self, data: List[str], progress_bar: bool = True, **kwargs) -> List[SparseEmbedding]:
        """
        Compute sparse embeddings for a list of texts.

        :param data: Texts to embed, one string per input.
        :param progress_bar: If `True`, display a tqdm progress bar while embedding.
        :param kwargs: Extra keyword arguments forwarded to `SparseTextEmbedding.embed`
            (e.g. `batch_size`, `parallel`).
        :returns: One Haystack `SparseEmbedding` per input text, in input order.
        """
        # The fastembed embed method returns an Iterable of fastembed sparse embeddings,
        # each exposing an `indices` array of ints and a `values` array of floats;
        # we convert each one to the Haystack SparseEmbedding type.

        sparse_embeddings = []
        sparse_embeddings_iterable = self.model.embed(data, **kwargs)
        for sparse_embedding in tqdm(
            sparse_embeddings_iterable, disable=not progress_bar, desc="Calculating sparse embeddings", total=len(data)
        ):
            sparse_embeddings.append(
                SparseEmbedding(indices=sparse_embedding.indices.tolist(), values=sparse_embedding.values.tolist())
            )

        return sparse_embeddings
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
from typing import Any, Dict, List, Optional

from haystack import Document, component, default_to_dict

from .embedding_backend.fastembed_backend import _FastembedSparseEmbeddingBackendFactory


@component
class FastembedSparseDocumentEmbedder:
"""
FastembedSparseDocumentEmbedder computes Document embeddings using Fastembed sparse models.
Usage example:
```python
# To use this component, install the "fastembed-haystack" package.
# pip install fastembed-haystack
from haystack_integrations.components.embedders.fastembed import FastembedSparseDocumentEmbedder
from haystack.dataclasses import Document
doc_embedder = FastembedSparseDocumentEmbedder(
model="prithvida/Splade_PP_en_v1",
batch_size=32,
)
doc_embedder.warm_up()
# Text taken from PubMed QA Dataset (https://huggingface.co/datasets/pubmed_qa)
document_list = [
Document(
content="Oxidative stress generated within inflammatory joints can produce autoimmune phenomena and joint destruction. Radical species with oxidative activity, including reactive nitrogen species, represent mediators of inflammation and cartilage damage.",
meta={
"pubid": "25,445,628",
"long_answer": "yes",
},
),
Document(
content="Plasma levels of pancreatic polypeptide (PP) rise upon food intake. Although other pancreatic islet hormones, such as insulin and glucagon, have been extensively investigated, PP secretion and actions are still poorly understood.",
meta={
"pubid": "25,445,712",
"long_answer": "yes",
},
),
]
result = doc_embedder.run(document_list)
print(f"Document Text: {result['documents'][0].content}")
print(f"Document Embedding: {result['documents'][0].sparse_embedding}")
print(f"Embedding Dimension: {len(result['documents'][0].sparse_embedding)}")
```
""" # noqa: E501

def __init__(
    self,
    model: str = "prithvida/Splade_PP_en_v1",
    cache_dir: Optional[str] = None,
    threads: Optional[int] = None,
    batch_size: int = 32,
    progress_bar: bool = True,
    parallel: Optional[int] = None,
    meta_fields_to_embed: Optional[List[str]] = None,
    embedding_separator: str = "\n",
):
    """
    Create a FastembedSparseDocumentEmbedder component.

    :param model: Local path or name of the model in Hugging Face's model hub,
        such as `prithvida/Splade_PP_en_v1`.
    :param cache_dir: The path to the cache directory.
        Can be set using the `FASTEMBED_CACHE_PATH` env variable.
        Defaults to `fastembed_cache` in the system's temp directory.
    :param threads: The number of threads single onnxruntime session can use.
    :param batch_size: Number of strings to encode at once.
    :param progress_bar: If `True`, displays progress bar during embedding.
    :param parallel:
        If > 1, data-parallel encoding will be used, recommended for offline encoding of large datasets.
        If 0, use all available cores.
        If None, don't use data-parallel processing, use default onnxruntime threading instead.
    :param meta_fields_to_embed: List of meta fields that should be embedded along with the Document content.
    :param embedding_separator: Separator used to concatenate the meta fields to the Document content.
    """

    self.model_name = model
    self.cache_dir = cache_dir
    self.threads = threads
    self.batch_size = batch_size
    self.progress_bar = progress_bar
    self.parallel = parallel
    self.meta_fields_to_embed = meta_fields_to_embed or []
    self.embedding_separator = embedding_separator

def to_dict(self) -> Dict[str, Any]:
    """
    Serializes the component to a dictionary.
    :returns:
        Dictionary with serialized data.
    """
    # Collect the init parameters first, then delegate to Haystack's helper.
    init_parameters = {
        "model": self.model_name,
        "cache_dir": self.cache_dir,
        "threads": self.threads,
        "batch_size": self.batch_size,
        "progress_bar": self.progress_bar,
        "parallel": self.parallel,
        "meta_fields_to_embed": self.meta_fields_to_embed,
        "embedding_separator": self.embedding_separator,
    }
    return default_to_dict(self, **init_parameters)

def warm_up(self):
    """
    Initializes the component.
    """
    # Idempotent: a backend that was already created is reused as-is.
    if hasattr(self, "embedding_backend"):
        return
    self.embedding_backend = _FastembedSparseEmbeddingBackendFactory.get_embedding_backend(
        model_name=self.model_name, cache_dir=self.cache_dir, threads=self.threads
    )

def _prepare_texts_to_embed(self, documents: List[Document]) -> List[str]:
    """Build one string per Document: selected meta fields joined with the content."""
    texts_to_embed = []
    for doc in documents:
        # Only embed meta fields that are present and non-None.
        parts = [
            str(doc.meta[field])
            for field in self.meta_fields_to_embed
            if field in doc.meta and doc.meta[field] is not None
        ]
        parts.append(doc.content or "")
        texts_to_embed.append(self.embedding_separator.join(parts))
    return texts_to_embed

@component.output_types(documents=List[Document])
def run(self, documents: List[Document]):
    """
    Embeds a list of Documents.
    :param documents: List of Documents to embed.
    :returns: A dictionary with the following keys:
        - `documents`: List of Documents with each Document's `sparse_embedding`
        field set to the computed embeddings.
    """
    # An empty list is valid input; a non-list or a list of non-Documents is not.
    is_document_list = isinstance(documents, list) and (not documents or isinstance(documents[0], Document))
    if not is_document_list:
        msg = (
            "FastembedSparseDocumentEmbedder expects a list of Documents as input. "
            "In case you want to embed a list of strings, please use the FastembedTextEmbedder."
        )
        raise TypeError(msg)
    if not hasattr(self, "embedding_backend"):
        msg = "The embedding model has not been loaded. Please call warm_up() before running."
        raise RuntimeError(msg)

    texts_to_embed = self._prepare_texts_to_embed(documents=documents)
    embeddings = self.embedding_backend.embed(
        texts_to_embed,
        batch_size=self.batch_size,
        show_progress_bar=self.progress_bar,
        parallel=self.parallel,
    )

    # Attach each computed sparse embedding to its Document in place.
    for document, sparse_embedding in zip(documents, embeddings):
        document.sparse_embedding = sparse_embedding
    return {"documents": documents}
Loading

0 comments on commit 363c7b5

Please sign in to comment.