Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Improvements to NvidiaRanker and adding user input timeout #1193

Merged
merged 16 commits into from
Nov 21, 2024
Merged
2 changes: 1 addition & 1 deletion integrations/nvidia/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ hatch run test
To only run unit tests:

```
hatch run test -m"not integration"
hatch run test -m "not integration"
```

To run the linters `ruff` and `mypy`:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,19 @@
#
# SPDX-License-Identifier: Apache-2.0

import os
import warnings
from typing import Any, Dict, List, Optional, Tuple, Union

from haystack import Document, component, default_from_dict, default_to_dict
from haystack import Document, component, default_from_dict, default_to_dict, logging
from haystack.utils import Secret, deserialize_secrets_inplace
from tqdm import tqdm

from haystack_integrations.components.embedders.nvidia.truncate import EmbeddingTruncateMode
from haystack_integrations.utils.nvidia import NimBackend, is_hosted, url_validation

logger = logging.getLogger(__name__)

_DEFAULT_API_URL = "https://ai.api.nvidia.com/v1/retrieval/nvidia"


Expand Down Expand Up @@ -47,6 +50,7 @@ def __init__(
meta_fields_to_embed: Optional[List[str]] = None,
embedding_separator: str = "\n",
truncate: Optional[Union[EmbeddingTruncateMode, str]] = None,
timeout: Optional[float] = None,
):
"""
Create a NvidiaTextEmbedder component.
Expand Down Expand Up @@ -76,6 +80,9 @@ def __init__(
:param truncate:
Specifies how inputs longer than the maximum token length should be truncated.
sjrl marked this conversation as resolved.
Show resolved Hide resolved
If None the behavior is model-dependent, see the official documentation for more information.
:param timeout:
Timeout for request calls, if not set it is inferred from the `NVIDIA_TIMEOUT` environment variable
or set to 60 by default.
"""

self.api_key = api_key
Expand All @@ -98,6 +105,10 @@ def __init__(
if is_hosted(api_url) and not self.model: # manually set default model
self.model = "nvidia/nv-embedqa-e5-v5"

if timeout is None:
timeout = float(os.environ.get("NVIDIA_TIMEOUT", 60.0))
self.timeout = timeout

def default_model(self):
"""Set default model in local NIM mode."""
valid_models = [
Expand Down Expand Up @@ -128,10 +139,11 @@ def warm_up(self):
if self.truncate is not None:
model_kwargs["truncate"] = str(self.truncate)
self.backend = NimBackend(
self.model,
model=self.model,
api_url=self.api_url,
api_key=self.api_key,
model_kwargs=model_kwargs,
timeout=self.timeout,
)

self._initialized = True
Expand Down Expand Up @@ -238,8 +250,7 @@ def run(self, documents: List[Document]):

for doc in documents:
if not doc.content:
msg = f"Document '{doc.id}' has no content to embed."
raise ValueError(msg)
logger.warning(f"Document '{doc.id}' has no content to embed.")

texts_to_embed = self._prepare_texts_to_embed(documents)
embeddings, metadata = self._embed_batch(texts_to_embed, self.batch_size)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@
#
# SPDX-License-Identifier: Apache-2.0

import os
import warnings
from typing import Any, Dict, List, Optional, Union

from haystack import component, default_from_dict, default_to_dict
from haystack import component, default_from_dict, default_to_dict, logging
from haystack.utils import Secret, deserialize_secrets_inplace

from haystack_integrations.components.embedders.nvidia.truncate import EmbeddingTruncateMode
from haystack_integrations.utils.nvidia import NimBackend, is_hosted, url_validation

logger = logging.getLogger(__name__)

_DEFAULT_API_URL = "https://ai.api.nvidia.com/v1/retrieval/nvidia"


Expand Down Expand Up @@ -44,6 +47,7 @@ def __init__(
prefix: str = "",
suffix: str = "",
truncate: Optional[Union[EmbeddingTruncateMode, str]] = None,
timeout: Optional[float] = None,
sjrl marked this conversation as resolved.
Show resolved Hide resolved
):
"""
Create a NvidiaTextEmbedder component.
Expand All @@ -64,6 +68,9 @@ def __init__(
:param truncate:
Specifies how inputs longer than the maximum token length should be truncated.
If None the behavior is model-dependent, see the official documentation for more information.
:param timeout:
Timeout for request calls, if not set it is inferred from the `NVIDIA_TIMEOUT` environment variable
or set to 60 by default.
"""

self.api_key = api_key
Expand All @@ -82,13 +89,23 @@ def __init__(
if is_hosted(api_url) and not self.model: # manually set default model
self.model = "nvidia/nv-embedqa-e5-v5"

if timeout is None:
timeout = float(os.environ.get("NVIDIA_TIMEOUT", 60.0))
self.timeout = timeout

def default_model(self):
"""Set default model in local NIM mode."""
valid_models = [
model.id for model in self.backend.models() if not model.base_model or model.base_model == model.id
]
name = next(iter(valid_models), None)
if name:
logger.warning(
"Default model is set as: {model_name}. \n"
"Set model using model parameter. \n"
"To get available models use available_models property.",
model_name=name,
)
warnings.warn(
f"Default model is set as: {name}. \n"
"Set model using model parameter. \n"
Expand All @@ -112,10 +129,11 @@ def warm_up(self):
if self.truncate is not None:
model_kwargs["truncate"] = str(self.truncate)
self.backend = NimBackend(
self.model,
model=self.model,
api_url=self.api_url,
api_key=self.api_key,
model_kwargs=model_kwargs,
timeout=self.timeout,
)

self._initialized = True
Expand Down Expand Up @@ -150,7 +168,9 @@ def from_dict(cls, data: Dict[str, Any]) -> "NvidiaTextEmbedder":
:returns:
The deserialized component.
"""
deserialize_secrets_inplace(data["init_parameters"], keys=["api_key"])
init_parameters = data.get("init_parameters", {})
if init_parameters:
deserialize_secrets_inplace(data["init_parameters"], keys=["api_key"])
return default_from_dict(cls, data)

@component.output_types(embedding=List[float], meta=Dict[str, Any])
Expand All @@ -162,7 +182,7 @@ def run(self, text: str):
The text to embed.
:returns:
A dictionary with the following keys and values:
- `embedding` - Embeddng of the text.
- `embedding` - Embedding of the text.
- `meta` - Metadata on usage statistics, etc.
:raises RuntimeError:
If the component was not initialized.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#
# SPDX-License-Identifier: Apache-2.0

import os
import warnings
from typing import Any, Dict, List, Optional

Expand Down Expand Up @@ -49,6 +50,7 @@ def __init__(
api_url: str = _DEFAULT_API_URL,
api_key: Optional[Secret] = Secret.from_env_var("NVIDIA_API_KEY"),
model_arguments: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
sjrl marked this conversation as resolved.
Show resolved Hide resolved
):
"""
Create a NvidiaGenerator component.
Expand All @@ -70,6 +72,9 @@ def __init__(
specific to a model.
Search your model in the [NVIDIA NIM](https://ai.nvidia.com)
to find the arguments it accepts.
:param timeout:
Timeout for request calls, if not set it is inferred from the `NVIDIA_TIMEOUT` environment variable
or set to 60 by default.
"""
self._model = model
self._api_url = url_validation(api_url, _DEFAULT_API_URL, ["v1/chat/completions"])
Expand All @@ -79,6 +84,9 @@ def __init__(
self._backend: Optional[Any] = None

self.is_hosted = is_hosted(api_url)
if timeout is None:
timeout = float(os.environ.get("NVIDIA_TIMEOUT", 60.0))
self.timeout = timeout

def default_model(self):
"""Set default model in local NIM mode."""
Expand Down Expand Up @@ -110,10 +118,11 @@ def warm_up(self):
msg = "API key is required for hosted NVIDIA NIMs."
raise ValueError(msg)
self._backend = NimBackend(
self._model,
model=self._model,
api_url=self._api_url,
api_key=self._api_key,
model_kwargs=self._model_arguments,
timeout=self.timeout,
)

if not self.is_hosted and not self._model:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#
# SPDX-License-Identifier: Apache-2.0

import os
import warnings
from typing import Any, Dict, List, Optional, Union

Expand Down Expand Up @@ -58,6 +59,11 @@ def __init__(
api_url: Optional[str] = None,
api_key: Optional[Secret] = Secret.from_env_var("NVIDIA_API_KEY"),
top_k: int = 5,
query_prefix: str = "",
document_prefix: str = "",
meta_fields_to_embed: Optional[List[str]] = None,
embedding_separator: str = "\n",
timeout: Optional[float] = None,
):
"""
Create a NvidiaRanker component.
Expand All @@ -72,6 +78,19 @@ def __init__(
Custom API URL for the NVIDIA NIM.
:param top_k:
Number of documents to return.
:param query_prefix:
A string to add at the beginning of the query text before ranking.
Use it to prepend the text with an instruction, as required by reranking models like `bge`.
:param document_prefix:
A string to add at the beginning of each document before ranking. You can use it to prepend the document
with an instruction, as required by embedding models like `bge`.
:param meta_fields_to_embed:
List of metadata fields to embed with the document.
:param embedding_separator:
Separator to concatenate metadata fields to the document.
:param timeout:
Timeout for request calls, if not set it is inferred from the `NVIDIA_TIMEOUT` environment variable
or set to 60 by default.
"""
if model is not None and not isinstance(model, str):
msg = "Ranker expects the `model` parameter to be a string."
Expand All @@ -86,27 +105,35 @@ def __init__(
raise TypeError(msg)

# todo: detect default in non-hosted case (when api_url is provided)
self._model = model or _DEFAULT_MODEL
self._truncate = truncate
self._api_key = api_key
self.model = model or _DEFAULT_MODEL
self.truncate = truncate
self.api_key = api_key
# if no api_url is provided, we're using a hosted model and can
# - assume the default url will work, because there's only one model
# - assume we won't call backend.models()
if api_url is not None:
self._api_url = url_validation(api_url, None, ["v1/ranking"])
self._endpoint = None # we let backend.rank() handle the endpoint
self.api_url = url_validation(api_url, None, ["v1/ranking"])
self.endpoint = None # we let backend.rank() handle the endpoint
else:
if self._model not in _MODEL_ENDPOINT_MAP:
if self.model not in _MODEL_ENDPOINT_MAP:
msg = f"Model '{model}' is unknown. Please provide an api_url to access it."
raise ValueError(msg)
self._api_url = None # we handle the endpoint
self._endpoint = _MODEL_ENDPOINT_MAP[self._model]
self.api_url = None # we handle the endpoint
self.endpoint = _MODEL_ENDPOINT_MAP[self.model]
if api_key is None:
self._api_key = Secret.from_env_var("NVIDIA_API_KEY")
self._top_k = top_k
self.top_k = top_k
self._initialized = False
self._backend: Optional[Any] = None

self.query_prefix = query_prefix
self.document_prefix = document_prefix
self.meta_fields_to_embed = meta_fields_to_embed or []
self.embedding_separator = embedding_separator
if timeout is None:
timeout = float(os.environ.get("NVIDIA_TIMEOUT", 60.0))
self.timeout = timeout

def to_dict(self) -> Dict[str, Any]:
"""
Serialize the ranker to a dictionary.
Expand All @@ -115,11 +142,15 @@ def to_dict(self) -> Dict[str, Any]:
"""
return default_to_dict(
self,
model=self._model,
top_k=self._top_k,
truncate=self._truncate,
api_url=self._api_url,
api_key=self._api_key.to_dict() if self._api_key else None,
model=self.model,
top_k=self.top_k,
truncate=self.truncate,
api_url=self.api_url,
api_key=self.api_key.to_dict() if self.api_key else None,
query_prefix=self.query_prefix,
document_prefix=self.document_prefix,
meta_fields_to_embed=self.meta_fields_to_embed,
embedding_separator=self.embedding_separator,
)

@classmethod
Expand All @@ -143,18 +174,31 @@ def warm_up(self):
"""
if not self._initialized:
model_kwargs = {}
if self._truncate is not None:
model_kwargs.update(truncate=str(self._truncate))
if self.truncate is not None:
model_kwargs.update(truncate=str(self.truncate))
self._backend = NimBackend(
self._model,
api_url=self._api_url,
api_key=self._api_key,
model=self.model,
api_url=self.api_url,
api_key=self.api_key,
model_kwargs=model_kwargs,
timeout=self.timeout,
)
if not self._model:
self._model = _DEFAULT_MODEL
if not self.model:
self.model = _DEFAULT_MODEL
self._initialized = True

def _prepare_documents_to_embed(self, documents: List[Document]) -> List[str]:
document_texts = []
for doc in documents:
meta_values_to_embed = [
str(doc.meta[key])
for key in self.meta_fields_to_embed
if key in doc.meta and doc.meta[key] # noqa: RUF019
]
text_to_embed = self.embedding_separator.join([*meta_values_to_embed, doc.content or ""])
document_texts.append(self.document_prefix + text_to_embed)
return document_texts

@component.output_types(documents=List[Document])
def run(
self,
Expand Down Expand Up @@ -193,18 +237,22 @@ def run(
if len(documents) == 0:
return {"documents": []}

top_k = top_k if top_k is not None else self._top_k
top_k = top_k if top_k is not None else self.top_k
if top_k < 1:
logger.warning("top_k should be at least 1, returning nothing")
warnings.warn("top_k should be at least 1, returning nothing", stacklevel=2)
return {"documents": []}

assert self._backend is not None

query_text = self.query_prefix + query
document_texts = self._prepare_documents_to_embed(documents=documents)

# rank result is list[{index: int, logit: float}] sorted by logit
sorted_indexes_and_scores = self._backend.rank(
query,
documents,
endpoint=self._endpoint,
query_text=query_text,
document_texts=document_texts,
endpoint=self.endpoint,
)
sorted_documents = []
for item in sorted_indexes_and_scores[:top_k]:
Expand Down
Loading