Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Async experiments #137

Merged
merged 28 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
555f044
feat: Implement `AsyncPipeline` (#65)
shadeMe Sep 6, 2024
297ca42
feat: Add support for async in `OpenAIChatGenerator` (#83)
shadeMe Sep 10, 2024
88ef5c9
feat: Implement `AsyncPipeline` (#65)
shadeMe Sep 6, 2024
decd665
Merge branch 'async-experiments' of github.com:deepset-ai/haystack-ex…
shadeMe Sep 10, 2024
4a2cc5e
refactor: Rename `async_run` to `run_async` (#93)
shadeMe Sep 18, 2024
90d5966
feat: Add support for async in `DocumentStore` (#96)
shadeMe Sep 25, 2024
a181f19
tests: Skip OpenSearch tests if the test backend is inactive (#103)
shadeMe Sep 26, 2024
19b880f
feat: Implement `AsyncPipeline` (#65)
shadeMe Sep 6, 2024
575efba
feat: Add support for async in `OpenAIChatGenerator` (#83)
shadeMe Sep 10, 2024
505f9a3
refactor: Rename `async_run` to `run_async` (#93)
shadeMe Sep 18, 2024
df16628
feat: Add support for async in `DocumentStore` (#96)
shadeMe Sep 25, 2024
8163ffe
tests: Skip OpenSearch tests if the test backend is inactive (#103)
shadeMe Sep 26, 2024
1a1adfa
Merge branch 'async-experiments' of github.com:deepset-ai/haystack-ex…
shadeMe Sep 26, 2024
d054f75
feat: Implement async version of `InMemoryDocumentStore` (#107)
shadeMe Sep 27, 2024
e2e4882
fix: Async pipeline tests to accommodate upstream changes (#108)
shadeMe Oct 1, 2024
5b39455
Merge branch 'main' into async-experiments
shadeMe Oct 2, 2024
ff6df67
Merge branch 'async-experiments' of github.com:deepset-ai/haystack-ex…
shadeMe Oct 2, 2024
be59b66
feat: Implement async support in `DocumentWriter` (#111)
shadeMe Oct 8, 2024
779cba8
feat: Add async support to `OpenSearchBM25Retreiver` (#116)
shadeMe Oct 9, 2024
cb872b5
feat: Add async support to `OpenSearchEmbeddingRetreiver` (#117)
shadeMe Oct 21, 2024
a714453
Merge branch 'main' into async-experiments
shadeMe Oct 31, 2024
ab70e41
Merge branch 'async-experiments' of github.com:deepset-ai/haystack-ex…
shadeMe Oct 31, 2024
0050bb3
refactor: Port subgraph pipeline logic and tests to async pipelines (…
shadeMe Nov 19, 2024
c90af50
Merge branch 'main' into async-experiments
shadeMe Nov 20, 2024
65dd328
fix: Tests and lints
shadeMe Nov 20, 2024
8b11072
doc: Update doc configs
shadeMe Nov 20, 2024
96c8167
fix: Lints
shadeMe Nov 20, 2024
de094f9
Merge branch 'main' into async-experiments
shadeMe Nov 20, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions docs/pydoc/config/document_stores_api.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
loaders:
- type: haystack_pydoc_tools.loaders.CustomPythonLoader
search_path: [../../../]
modules:
[
"haystack_experimental.document_stores.in_memory.document_store",
"haystack_experimental.document_stores.opensearch.document_store",
"haystack_experimental.document_stores.types.protocol",
]
ignore_when_discovered: ["__init__"]
processors:
- type: filter
expression:
documented_only: true
do_not_filter_modules: false
skip_empty_modules: true
- type: smart
- type: crossref
renderer:
type: haystack_pydoc_tools.renderers.ReadmeCoreRenderer
excerpt: Stores your texts and meta data and provides them to the Retriever at query time.
category_slug: experiments-api
title: Document Stores
slug: experimental-document-stores-api
order: 160
markdown:
descriptive_class_title: false
classdef_code_block: false
descriptive_module_title: true
add_method_class_prefix: true
add_member_class_prefix: false
filename: document_stores_api.md
6 changes: 4 additions & 2 deletions docs/pydoc/config/retrievers_api.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ loaders:
modules:
[
"haystack_experimental.components.retrievers.auto_merging_retriever",
"haystack_experimental.components.retrievers.chat_message_retriever"
"haystack_experimental.components.retrievers.chat_message_retriever",
"haystack_experimental.components.retrievers.opensearch.bm25_retriever",
"haystack_experimental.components.retrievers.opensearch.embedding_retriever",
]
ignore_when_discovered: ["__init__"]
processors:
Expand All @@ -28,4 +30,4 @@ renderer:
descriptive_module_title: true
add_method_class_prefix: true
add_member_class_prefix: false
filename: experimental_retrievers_api.md
filename: experimental_retrievers_api.md
6 changes: 5 additions & 1 deletion docs/pydoc/config/writers_api.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
loaders:
- type: haystack_pydoc_tools.loaders.CustomPythonLoader
search_path: [../../../]
modules: ["haystack_experimental.components.writers.chat_message_writer"]
modules:
[
"haystack_experimental.components.writers.chat_message_writer",
"haystack_experimental.components.writers.document_writer",
]
ignore_when_discovered: ["__init__"]
processors:
- type: filter
Expand Down
441 changes: 339 additions & 102 deletions haystack_experimental/components/generators/chat/openai.py

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# SPDX-FileCopyrightText: 2022-present deepset GmbH <[email protected]>
#
# SPDX-License-Identifier: Apache-2.0

from .bm25_retriever import OpenSearchBM25Retriever
from .embedding_retriever import OpenSearchEmbeddingRetriever

__all__ = ["OpenSearchBM25Retriever", "OpenSearchEmbeddingRetriever"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,287 @@
# SPDX-FileCopyrightText: 2022-present deepset GmbH <[email protected]>
#
# SPDX-License-Identifier: Apache-2.0

from typing import Any, Dict, List, Optional, Union

from haystack import component, default_from_dict, default_to_dict, logging
from haystack.dataclasses import Document
from haystack.document_stores.types import FilterPolicy
from haystack.document_stores.types.filter_policy import apply_filter_policy

from haystack_experimental.document_stores.opensearch import OpenSearchDocumentStore

logger = logging.getLogger(__name__)


@component
class OpenSearchBM25Retriever:
"""
OpenSearch BM25 retriever with async support.
"""

def __init__(
self,
*,
document_store: OpenSearchDocumentStore,
filters: Optional[Dict[str, Any]] = None,
fuzziness: str = "AUTO",
top_k: int = 10,
scale_score: bool = False,
all_terms_must_match: bool = False,
filter_policy: Union[str, FilterPolicy] = FilterPolicy.REPLACE,
custom_query: Optional[Dict[str, Any]] = None,
raise_on_failure: bool = True,
):
"""
Creates the OpenSearchBM25Retriever component.

:param document_store: An instance of OpenSearchDocumentStore to use with the Retriever.
:param filters: Filters to narrow down the search for documents in the Document Store.
:param fuzziness: Fuzziness parameter for full-text queries to apply approximate string matching.
For more information, see [OpenSearch fuzzy query](https://opensearch.org/docs/latest/query-dsl/term/fuzzy/).
:param top_k: Maximum number of documents to return.
:param scale_score: If `True`, scales the score of retrieved documents to a range between 0 and 1.
This is useful when comparing documents across different indexes.
:param all_terms_must_match: If `True`, all terms in the query string must be present in the
retrieved documents. This is useful when searching for short text where even one term
can make a difference.
:param filter_policy: Policy to determine how filters are applied. Possible options:
- `replace`: Runtime filters replace initialization filters. Use this policy to change the filtering scope
for specific queries.
- `merge`: Runtime filters are merged with initialization filters.
:param custom_query: The query containing a mandatory `$query` and an optional `$filters` placeholder.

**An example custom_query:**

```python
{
"query": {
"bool": {
"should": [{"multi_match": {
"query": "$query", // mandatory query placeholder
"type": "most_fields",
"fields": ["content", "title"]}}],
"filter": "$filters" // optional filter placeholder
}
}
}
```

An example `run()` method for this `custom_query`:

```python
retriever.run(
query="Why did the revenue increase?",
filters={
"operator": "AND",
"conditions": [
{"field": "meta.years", "operator": "==", "value": "2019"},
{"field": "meta.quarters", "operator": "in", "value": ["Q1", "Q2"]},
],
},
)
```
:param raise_on_failure:
Whether to raise an exception if the API call fails. Otherwise log a warning and return an empty list.

:raises ValueError: If `document_store` is not an instance of OpenSearchDocumentStore.

"""
if not isinstance(document_store, OpenSearchDocumentStore):
msg = "document_store must be an instance of OpenSearchDocumentStore"
raise ValueError(msg)

self._document_store = document_store
self._filters = filters or {}
self._fuzziness = fuzziness
self._top_k = top_k
self._scale_score = scale_score
self._all_terms_must_match = all_terms_must_match
self._filter_policy = (
filter_policy if isinstance(filter_policy, FilterPolicy) else FilterPolicy.from_str(filter_policy)
)
self._custom_query = custom_query
self._raise_on_failure = raise_on_failure

def to_dict(self) -> Dict[str, Any]:
"""
Serializes the component to a dictionary.

:returns:
Dictionary with serialized data.
"""
return default_to_dict(
self,
filters=self._filters,
fuzziness=self._fuzziness,
top_k=self._top_k,
scale_score=self._scale_score,
document_store=self._document_store.to_dict(),
filter_policy=self._filter_policy.value,
custom_query=self._custom_query,
raise_on_failure=self._raise_on_failure,
)

@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "OpenSearchBM25Retriever":
"""
Deserializes the component from a dictionary.

:param data:
Dictionary to deserialize from.

:returns:
Deserialized component.
"""
data["init_parameters"]["document_store"] = OpenSearchDocumentStore.from_dict(
data["init_parameters"]["document_store"]
)
data["init_parameters"]["filter_policy"] = FilterPolicy.from_str(data["init_parameters"]["filter_policy"])
return default_from_dict(cls, data)

def _prepare_bm25_args(
self,
*,
query: str,
filters: Optional[Dict[str, Any]],
all_terms_must_match: Optional[bool],
top_k: Optional[int],
fuzziness: Optional[str],
scale_score: Optional[bool],
custom_query: Optional[Dict[str, Any]],
) -> Dict[str, Any]:
filters = apply_filter_policy(self._filter_policy, self._filters, filters)

if filters is None:
filters = self._filters
if all_terms_must_match is None:
all_terms_must_match = self._all_terms_must_match
if top_k is None:
top_k = self._top_k
if fuzziness is None:
fuzziness = self._fuzziness
if scale_score is None:
scale_score = self._scale_score
if custom_query is None:
custom_query = self._custom_query

return {
"query": query,
"filters": filters,
"fuzziness": fuzziness,
"top_k": top_k,
"scale_score": scale_score,
"all_terms_must_match": all_terms_must_match,
"custom_query": custom_query,
}

@component.output_types(documents=List[Document])
def run( # pylint: disable=too-many-positional-arguments
self,
query: str,
filters: Optional[Dict[str, Any]] = None,
all_terms_must_match: Optional[bool] = None,
top_k: Optional[int] = None,
fuzziness: Optional[str] = None,
scale_score: Optional[bool] = None,
custom_query: Optional[Dict[str, Any]] = None,
):
"""
Retrieve documents using BM25 retrieval.

:param query: The query string.
:param filters: Filters applied to the retrieved documents. The way runtime filters are applied depends on
the `filter_policy` specified at Retriever's initialization.
:param all_terms_must_match: If `True`, all terms in the query string must be present in the
retrieved documents.
:param top_k: Maximum number of documents to return.
:param fuzziness: Fuzziness parameter for full-text queries to apply approximate string matching.
For more information, see [OpenSearch fuzzy query](https://opensearch.org/docs/latest/query-dsl/term/fuzzy/).
:param scale_score: If `True`, scales the score of retrieved documents to a range between 0 and 1.
This is useful when comparing documents across different indexes.
:param custom_query: A custom OpenSearch query. It must include a `$query` and may optionally
include a `$filters` placeholder.

:returns:
A dictionary containing the retrieved documents with the following structure:
- documents: List of retrieved Documents.

"""
docs: List[Document] = []
bm25_args = self._prepare_bm25_args(
query=query,
filters=filters,
all_terms_must_match=all_terms_must_match,
top_k=top_k,
fuzziness=fuzziness,
scale_score=scale_score,
custom_query=custom_query,
)
try:
docs = self._document_store._bm25_retrieval(**bm25_args)
except Exception as e:
if self._raise_on_failure:
raise e
logger.warning(
"An error during BM25 retrieval occurred and will be ignored by returning empty results: {error}",
error=str(e),
exc_info=True,
)

return {"documents": docs}

@component.output_types(documents=List[Document])
async def run_async( # pylint: disable=too-many-positional-arguments
self,
query: str,
filters: Optional[Dict[str, Any]] = None,
all_terms_must_match: Optional[bool] = None,
top_k: Optional[int] = None,
fuzziness: Optional[str] = None,
scale_score: Optional[bool] = None,
custom_query: Optional[Dict[str, Any]] = None,
):
"""
Retrieve documents using BM25 retrieval.

:param query: The query string.
:param filters: Filters applied to the retrieved documents. The way runtime filters are applied depends on
the `filter_policy` specified at Retriever's initialization.
:param all_terms_must_match: If `True`, all terms in the query string must be present in the
retrieved documents.
:param top_k: Maximum number of documents to return.
:param fuzziness: Fuzziness parameter for full-text queries to apply approximate string matching.
For more information, see [OpenSearch fuzzy query](https://opensearch.org/docs/latest/query-dsl/term/fuzzy/).
:param scale_score: If `True`, scales the score of retrieved documents to a range between 0 and 1.
This is useful when comparing documents across different indexes.
:param custom_query: A custom OpenSearch query. It must include a `$query` and may optionally
include a `$filters` placeholder.

:returns:
A dictionary containing the retrieved documents with the following structure:
- documents: List of retrieved Documents.

"""
docs: List[Document] = []
bm25_args = self._prepare_bm25_args(
query=query,
filters=filters,
all_terms_must_match=all_terms_must_match,
top_k=top_k,
fuzziness=fuzziness,
scale_score=scale_score,
custom_query=custom_query,
)
try:
docs = await self._document_store._bm25_retrieval_async(**bm25_args)
except Exception as e:
if self._raise_on_failure:
raise e
logger.warning(
"An error during BM25 retrieval occurred and will be ignored by returning empty results: {error}",
error=str(e),
exc_info=True,
)

return {"documents": docs}
Loading
Loading