Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Qdrant - add hybrid retriever #675

Merged
merged 7 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
#
# SPDX-License-Identifier: Apache-2.0

from .retriever import QdrantEmbeddingRetriever, QdrantSparseEmbeddingRetriever
from .retriever import QdrantEmbeddingRetriever, QdrantHybridRetriever, QdrantSparseEmbeddingRetriever

__all__ = ("QdrantEmbeddingRetriever", "QdrantSparseEmbeddingRetriever")
__all__ = ("QdrantEmbeddingRetriever", "QdrantSparseEmbeddingRetriever", "QdrantHybridRetriever")
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ class QdrantEmbeddingRetriever:
":memory:",
recreate_index=True,
return_embedding=True,
wait_result_from_api=True,
)

document_store.write_documents([Document(content="test", embedding=[0.5]*768)])

retriever = QdrantEmbeddingRetriever(document_store=document_store)

# using a fake vector to keep the example simple
Expand Down Expand Up @@ -112,7 +114,7 @@ def run(
The retrieved documents.

"""
docs = self._document_store.query_by_embedding(
docs = self._document_store._query_by_embedding(
query_embedding=query_embedding,
filters=filters or self._filters,
top_k=top_k or self._top_k,
Expand All @@ -136,10 +138,14 @@ class QdrantSparseEmbeddingRetriever:

document_store = QdrantDocumentStore(
":memory:",
use_sparse_embeddings=True,
recreate_index=True,
return_embedding=True,
wait_result_from_api=True,
)

doc = Document(content="test", sparse_embedding=SparseEmbedding(indices=[0, 3, 5], values=[0.1, 0.5, 0.12]))
document_store.write_documents([doc])

retriever = QdrantSparseEmbeddingRetriever(document_store=document_store)
sparse_embedding = SparseEmbedding(indices=[0, 1, 2, 3], values=[0.1, 0.8, 0.05, 0.33])
retriever.run(query_sparse_embedding=sparse_embedding)
Expand Down Expand Up @@ -196,7 +202,7 @@ def to_dict(self) -> Dict[str, Any]:
return d

@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "QdrantEmbeddingRetriever":
def from_dict(cls, data: Dict[str, Any]) -> "QdrantSparseEmbeddingRetriever":
"""
Deserializes the component from a dictionary.

Expand Down Expand Up @@ -230,7 +236,7 @@ def run(
The retrieved documents.

"""
docs = self._document_store.query_by_sparse(
docs = self._document_store._query_by_sparse(
query_sparse_embedding=query_sparse_embedding,
filters=filters or self._filters,
top_k=top_k or self._top_k,
Expand All @@ -239,3 +245,124 @@ def run(
)

return {"documents": docs}


@component
class QdrantHybridRetriever:
    """
    A component for retrieving documents from a QdrantDocumentStore using both dense and sparse vectors
    and fusing the results using Reciprocal Rank Fusion.

    Usage example:
    ```python
    from haystack import Document
    from haystack.dataclasses.sparse_embedding import SparseEmbedding
    from haystack_integrations.components.retrievers.qdrant import QdrantHybridRetriever
    from haystack_integrations.document_stores.qdrant import QdrantDocumentStore

    document_store = QdrantDocumentStore(
        ":memory:",
        use_sparse_embeddings=True,
        recreate_index=True,
        return_embedding=True,
        wait_result_from_api=True,
    )

    doc = Document(content="test",
                   embedding=[0.5]*768,
                   sparse_embedding=SparseEmbedding(indices=[0, 3, 5], values=[0.1, 0.5, 0.12]))

    document_store.write_documents([doc])

    retriever = QdrantHybridRetriever(document_store=document_store)
    embedding = [0.1]*768
    sparse_embedding = SparseEmbedding(indices=[0, 1, 2, 3], values=[0.1, 0.8, 0.05, 0.33])
    retriever.run(query_embedding=embedding, query_sparse_embedding=sparse_embedding)
    ```
    """

    def __init__(
        self,
        document_store: QdrantDocumentStore,
        filters: Optional[Dict[str, Any]] = None,
        top_k: int = 10,
        return_embedding: bool = False,
    ):
        """
        Create a QdrantHybridRetriever component.

        :param document_store: An instance of QdrantDocumentStore.
        :param filters: A dictionary with filters to narrow down the search space.
        :param top_k: The maximum number of documents to retrieve.
        :param return_embedding: Whether to return the embeddings of the retrieved Documents.

        :raises ValueError: If 'document_store' is not an instance of QdrantDocumentStore.
        """

        if not isinstance(document_store, QdrantDocumentStore):
            msg = "document_store must be an instance of QdrantDocumentStore"
            raise ValueError(msg)

        self._document_store = document_store
        self._filters = filters
        self._top_k = top_k
        self._return_embedding = return_embedding

    def to_dict(self) -> Dict[str, Any]:
        """
        Serializes the component to a dictionary.

        :returns:
            Dictionary with serialized data.
        """
        return default_to_dict(
            self,
            document_store=self._document_store.to_dict(),
            filters=self._filters,
            top_k=self._top_k,
            return_embedding=self._return_embedding,
        )

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "QdrantHybridRetriever":
        """
        Deserializes the component from a dictionary.

        :param data:
            Dictionary to deserialize from. Its `init_parameters["document_store"]` entry is
            replaced in place with the deserialized QdrantDocumentStore.
        :returns:
            Deserialized component.
        """
        document_store = QdrantDocumentStore.from_dict(data["init_parameters"]["document_store"])
        data["init_parameters"]["document_store"] = document_store
        return default_from_dict(cls, data)

    @component.output_types(documents=List[Document])
    def run(
        self,
        query_embedding: List[float],
        query_sparse_embedding: SparseEmbedding,
        filters: Optional[Dict[str, Any]] = None,
        top_k: Optional[int] = None,
        return_embedding: Optional[bool] = None,
    ):
        """
        Run the Hybrid Retriever on the given input data.

        :param query_embedding: Dense embedding of the query.
        :param query_sparse_embedding: Sparse embedding of the query.
        :param filters: A dictionary with filters to narrow down the search space.
            Falls back to the filters given at initialization when empty or not provided.
        :param top_k: The maximum number of documents to return.
            Falls back to the value given at initialization when not provided.
        :param return_embedding: Whether to return the embedding of the retrieved Documents.
            Falls back to the value given at initialization only when `None`, so an explicit
            `False` overrides a retriever that was created with `return_embedding=True`.
        :returns:
            The retrieved documents.

        """
        # `value or default` would silently discard an explicit `False`, so test for None instead.
        if return_embedding is None:
            return_embedding = self._return_embedding

        docs = self._document_store._query_hybrid(
            query_embedding=query_embedding,
            query_sparse_embedding=query_sparse_embedding,
            filters=filters or self._filters,
            top_k=top_k or self._top_k,
            return_embedding=return_embedding,
        )

        return {"documents": docs}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from qdrant_client import grpc
from qdrant_client.http import models as rest
from qdrant_client.http.exceptions import UnexpectedResponse
from qdrant_client.hybrid.fusion import reciprocal_rank_fusion
from tqdm import tqdm

from .converters import (
Expand Down Expand Up @@ -307,7 +308,7 @@ def get_documents_by_id(
)
return documents

def query_by_sparse(
def _query_by_sparse(
self,
query_sparse_embedding: SparseEmbedding,
filters: Optional[Dict[str, Any]] = None,
Expand Down Expand Up @@ -349,7 +350,7 @@ def query_by_sparse(
document.score = score
return results

def query_by_embedding(
def _query_by_embedding(
self,
query_embedding: List[float],
filters: Optional[Dict[str, Any]] = None,
Expand Down Expand Up @@ -383,6 +384,86 @@ def query_by_embedding(
document.score = score
return results

def _query_hybrid(
    self,
    query_embedding: List[float],
    query_sparse_embedding: SparseEmbedding,
    filters: Optional[Dict[str, Any]] = None,
    top_k: int = 10,
    return_embedding: bool = False,
) -> List[Document]:
    """
    Runs a dense and a sparse search in one batch and merges both rankings with Reciprocal Rank Fusion.

    This method is not part of the public interface of `QdrantDocumentStore` and shouldn't be used directly.
    Use the `QdrantHybridRetriever` instead.

    :param query_embedding: Dense embedding of the query.
    :param query_sparse_embedding: Sparse embedding of the query.
    :param filters: Filters applied to the retrieved Documents.
    :param top_k: Maximum number of Documents to return. It is used both as the limit of each
        individual search and as the limit of the fused result.
    :param return_embedding: Whether to return the embeddings of the retrieved documents.

    :returns: List of Document that are most similar to `query_embedding` and `query_sparse_embedding`.

    :raises QdrantStoreError:
        If the Document Store was initialized with `use_sparse_embeddings=False`,
        if the batch search fails, or if the rank fusion fails.
    """

    # Adapted from the hybrid query code of the Python Qdrant client:
    # https://github.com/qdrant/qdrant-client/blob/8e3ea58f781e4110d11c0a6985b5e6bb66b85d33/qdrant_client/qdrant_fastembed.py#L519
    if not self.use_sparse_embeddings:
        sparse_disabled_msg = (
            "You are trying to query using sparse embeddings, but the Document Store "
            "was initialized with `use_sparse_embeddings=False`. "
        )
        raise QdrantStoreError(sparse_disabled_msg)

    # Options that are identical for the dense and the sparse request.
    shared_options = {
        "filter": convert_filters_to_qdrant(filters),
        "limit": top_k,
        "with_payload": True,
        "with_vector": return_embedding,
    }

    sparse_vector = rest.SparseVector(
        indices=query_sparse_embedding.indices,
        values=query_sparse_embedding.values,
    )
    sparse_search = rest.SearchRequest(
        vector=rest.NamedSparseVector(name=SPARSE_VECTORS_NAME, vector=sparse_vector),
        **shared_options,
    )
    dense_search = rest.SearchRequest(
        vector=rest.NamedVector(name=DENSE_VECTORS_NAME, vector=query_embedding),
        **shared_options,
    )

    # The responses come back in request order: dense first, sparse second.
    try:
        dense_hits, sparse_hits = self.client.search_batch(
            collection_name=self.index,
            requests=[dense_search, sparse_search],
        )
    except Exception as search_error:
        search_failed_msg = "Error during hybrid search"
        raise QdrantStoreError(search_failed_msg) from search_error

    try:
        fused_points = reciprocal_rank_fusion(responses=[dense_hits, sparse_hits], limit=top_k)
    except Exception as fusion_error:
        fusion_failed_msg = "Error while applying Reciprocal Rank Fusion"
        raise QdrantStoreError(fusion_failed_msg) from fusion_error

    return [
        convert_qdrant_point_to_haystack_document(fused_point, use_sparse_embeddings=True)
        for fused_point in fused_points
    ]

def _get_distance(self, similarity: str) -> rest.Distance:
try:
return self.SIMILARITY[similarity]
Expand Down
18 changes: 18 additions & 0 deletions integrations/qdrant/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import numpy as np
import pytest
from haystack.dataclasses import SparseEmbedding


@pytest.fixture(scope="session")
def generate_sparse_embedding():
    """
    Session-scoped fixture that hands out a factory for random SparseEmbedding objects.

    Every call to the returned function builds a new embedding whose indices are
    `0..n-1` for a randomly drawn length `n`, with one random value per index.
    """

    def _generate_random_sparse_embedding():
        # The length is drawn first, then one random value is drawn per index.
        embedding_size = np.random.randint(3, 15)
        random_values = []
        for _ in range(embedding_size):
            random_values.append(np.random.random_sample())
        return SparseEmbedding(indices=[*range(embedding_size)], values=random_values)

    return _generate_random_sparse_embedding
masci marked this conversation as resolved.
Show resolved Hide resolved
Loading