From c8847afdc1710d5ef71455548feb3b0cda8fbd8d Mon Sep 17 00:00:00 2001 From: hsm207 Date: Thu, 22 Feb 2024 07:35:34 +0000 Subject: [PATCH 01/52] upgrade to latest weaviate server --- integrations/weaviate/docker-compose.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/weaviate/docker-compose.yml b/integrations/weaviate/docker-compose.yml index c61b0ed57..f28328751 100644 --- a/integrations/weaviate/docker-compose.yml +++ b/integrations/weaviate/docker-compose.yml @@ -8,7 +8,7 @@ services: - '8080' - --scheme - http - image: semitechnologies/weaviate:1.23.2 + image: semitechnologies/weaviate:1.23.10 ports: - 8080:8080 - 50051:50051 From 885a77766d2481ad5b7455aa6f47ae5027cfc2a2 Mon Sep 17 00:00:00 2001 From: hsm207 Date: Thu, 22 Feb 2024 08:17:49 +0000 Subject: [PATCH 02/52] upgrade to latest weaviate client --- integrations/weaviate/pyproject.toml | 2 +- integrations/weaviate/tests/test_document_store.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/integrations/weaviate/pyproject.toml b/integrations/weaviate/pyproject.toml index 00aa500e6..5e2d31466 100644 --- a/integrations/weaviate/pyproject.toml +++ b/integrations/weaviate/pyproject.toml @@ -24,7 +24,7 @@ classifiers = [ ] dependencies = [ "haystack-ai", - "weaviate-client==3.*", + "weaviate-client", "haystack-pydoc-tools", "python-dateutil", ] diff --git a/integrations/weaviate/tests/test_document_store.py b/integrations/weaviate/tests/test_document_store.py index a2b32d578..8602f46de 100644 --- a/integrations/weaviate/tests/test_document_store.py +++ b/integrations/weaviate/tests/test_document_store.py @@ -245,9 +245,11 @@ def test_to_dict(self, _mock_weaviate, monkeypatch): }, "additional_config": { "grpc_port_experimental": 12345, + 'grpc_secure_experimental': False, "connection_config": { "session_pool_connections": 20, - "session_pool_maxsize": 20, + "session_pool_maxsize": 100, + 'session_pool_max_retries': 3, }, }, }, From 104a981ca7c1f671a7561da315b9357f7418443b Mon Sep 17 00:00:00 2001 From: hsm207 Date: Thu, 22 Feb 2024 15:08:59 +0000 Subject: [PATCH 03/52] reformat code --- integrations/weaviate/tests/test_document_store.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/integrations/weaviate/tests/test_document_store.py b/integrations/weaviate/tests/test_document_store.py index 8602f46de..d6aed9a83 100644 --- a/integrations/weaviate/tests/test_document_store.py +++ b/integrations/weaviate/tests/test_document_store.py @@ -245,11 +245,11 @@ def test_to_dict(self, _mock_weaviate, monkeypatch): }, "additional_config": { "grpc_port_experimental": 12345, - 'grpc_secure_experimental': False, + "grpc_secure_experimental": False, "connection_config": { "session_pool_connections": 20, "session_pool_maxsize": 100, - 'session_pool_max_retries': 3, + "session_pool_max_retries": 3, }, }, }, From fe7b1a35be5545402871aa977e9d9c37c3a4dff1 Mon Sep 17 00:00:00 2001 From: hsm207 Date: Thu, 22 Feb 2024 15:09:40 +0000 Subject: [PATCH 04/52] create client using v4 api --- .../weaviate/document_store.py | 54 ++++++------------- 1 file changed, 16 insertions(+), 38 deletions(-) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py index 071fe336b..1519d24d3 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py @@ -11,7 +11,7 @@ from haystack.document_stores.types.policy import DuplicatePolicy import weaviate -from weaviate.config import Config, ConnectionConfig +from weaviate.config import AdditionalConfig, Config, ConnectionConfig from weaviate.embedded import EmbeddedOptions from weaviate.util import generate_uuid5 @@ -54,13 +54,11 @@ def __init__( url: Optional[str] = None, collection_settings: Optional[Dict[str, Any]] = None, auth_client_secret: Optional[AuthCredentials] = None, - timeout_config: TimeoutType = (10, 60), - proxies: Optional[Union[Dict, str]] = None, - trust_env: bool = False, additional_headers: Optional[Dict] = None, - startup_period: Optional[int] = 5, embedded_options: Optional[EmbeddedOptions] = None, - additional_config: Optional[Config] = None, + additional_config: Optional[AdditionalConfig] = None, + grpc_port: int = 50051, + grpc_secure: bool = False, ): """ Create a new instance of WeaviateDocumentStore and connects to the Weaviate instance. @@ -88,43 +86,31 @@ def __init__( - `AuthClientPassword` to use username and password for oidc Resource Owner Password flow - `AuthClientCredentials` to use a client secret for oidc client credential flow - `AuthApiKey` to use an API key - :param timeout_config: Timeout configuration for all requests to the Weaviate server, defaults to (10, 60). - It can be a real number or, a tuple of two real numbers: (connect timeout, read timeout). - If only one real number is passed then both connect and read timeout will be set to - that value, by default (2, 20). - :param proxies: Proxy configuration, defaults to None. - Can be passed as a dict using the - ``requests` format`_, - or a string. If a string is passed it will be used for both HTTP and HTTPS requests. - :param trust_env: Whether to read proxies from the ENV variables, defaults to False. - Proxies will be read from the following ENV variables: - * `HTTP_PROXY` - * `http_proxy` - * `HTTPS_PROXY` - * `https_proxy` - If `proxies` is not None, `trust_env` is ignored. :param additional_headers: Additional headers to include in the requests, defaults to None. Can be used to set OpenAI/HuggingFace keys. OpenAI/HuggingFace key looks like this: ``` {"X-OpenAI-Api-Key": ""}, {"X-HuggingFace-Api-Key": ""} ``` - :param startup_period: How many seconds the client will wait for Weaviate to start before - raising a RequestsConnectionError, defaults to 5. :param embedded_options: If set create an embedded Weaviate cluster inside the client, defaults to None. For a full list of options see `weaviate.embedded.EmbeddedOptions`. :param additional_config: Additional and advanced configuration options for weaviate, defaults to None. + :param grpc_port: The port to use for the gRPC connection, defaults to 50051. + :param grpc_secure: Whether to use a secure channel for the underlying gRPC API. """ - self._client = weaviate.Client( - url=url, + # proxies, timeout_config, trust_env are part of additional_config now + # startup_period has been removed + connection_params = weaviate.connect.base.ConnectionParams.from_url( + url=url, grpc_port=grpc_port, grpc_secure=grpc_secure + ) + self._client = weaviate.WeaviateClient( + connection_params=connection_params, auth_client_secret=auth_client_secret.resolve_value() if auth_client_secret else None, - timeout_config=timeout_config, - proxies=proxies, - trust_env=trust_env, + additional_config=additional_config, additional_headers=additional_headers, - startup_period=startup_period, embedded_options=embedded_options, - additional_config=additional_config, + skip_init_checks=False, ) + self._client.connect() # Test connection, it will raise an exception if it fails. self._client.schema.get() @@ -147,11 +133,7 @@ def __init__( self._url = url self._collection_settings = collection_settings self._auth_client_secret = auth_client_secret - self._timeout_config = timeout_config - self._proxies = proxies - self._trust_env = trust_env self._additional_headers = additional_headers - self._startup_period = startup_period self._embedded_options = embedded_options self._additional_config = additional_config @@ -164,11 +146,7 @@ def to_dict(self) -> Dict[str, Any]: url=self._url, collection_settings=self._collection_settings, auth_client_secret=self._auth_client_secret.to_dict() if self._auth_client_secret else None, - timeout_config=self._timeout_config, - proxies=self._proxies, - trust_env=self._trust_env, additional_headers=self._additional_headers, - startup_period=self._startup_period, embedded_options=embedded_options, additional_config=additional_config, ) From 115d965898c324ca300275cb58b5538b02d95f0a Mon Sep 17 00:00:00 2001 From: hsm207 Date: Thu, 22 Feb 2024 15:42:04 +0000 Subject: [PATCH 05/52] use v4 api to create collection --- .../document_stores/weaviate/document_store.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py index 1519d24d3..6d40dbbfc 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py @@ -113,7 +113,7 @@ def __init__( self._client.connect() # Test connection, it will raise an exception if it fails. - self._client.schema.get() + self._client.collections._get_all(simple=True) if collection_settings is None: collection_settings = { @@ -127,8 +127,8 @@ def __init__( # Set the properties if they're not set collection_settings["properties"] = collection_settings.get("properties", DOCUMENT_COLLECTION_PROPERTIES) - if not self._client.schema.exists(collection_settings["class"]): - self._client.schema.create_class(collection_settings) + if not self._client.collections.exists(collection_settings["class"]): + self._client.collections.create_from_dict(collection_settings) self._url = url self._collection_settings = collection_settings From e88b1ac677ddb648740a44651ffd15b337a17ffb Mon Sep 17 00:00:00 2001 From: hsm207 Date: Thu, 22 Feb 2024 15:42:13 +0000 Subject: [PATCH 06/52] store collection obj for convenience --- .../document_stores/weaviate/document_store.py | 1 + 1 file changed, 1 insertion(+) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py index 6d40dbbfc..2dde76ce7 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py @@ -136,6 +136,7 @@ def __init__( self._additional_headers = additional_headers self._embedded_options = embedded_options self._additional_config = additional_config + self._collection = self._client.collections.get(collection_settings["class"]) def to_dict(self) -> Dict[str, Any]: embedded_options = asdict(self._embedded_options) if self._embedded_options else None From e45c7b52115a7871e279936c46fdd8a831a205da Mon Sep 17 00:00:00 2001 From: hsm207 Date: Tue, 27 Feb 2024 11:54:46 +0000 Subject: [PATCH 07/52] upgrade filters to use v4 api --- .../document_stores/weaviate/_filters.py | 71 +++++++++++++------ 1 file changed, 48 insertions(+), 23 deletions(-) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/_filters.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/_filters.py index a192c6947..f22a9c4cc 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/_filters.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/_filters.py @@ -4,6 +4,9 @@ from haystack.errors import FilterError from pandas import DataFrame +import weaviate +from weaviate.collections.classes.filters import _FilterAnd, _FilterOr + def convert_filters(filters: Dict[str, Any]) -> Dict[str, Any]: """ @@ -14,7 +17,8 @@ def convert_filters(filters: Dict[str, Any]) -> Dict[str, Any]: raise FilterError(msg) if "field" in filters: - return {"operator": "And", "operands": [_parse_comparison_condition(filters)]} + # return {"operator": "And", "operands": [_parse_comparison_condition(filters)]} + return _FilterAnd(_parse_comparison_condition(filters)) return _parse_logical_condition(filters) @@ -51,6 +55,12 @@ def _invert_condition(filters: Dict[str, Any]) -> Dict[str, Any]: return inverted_condition +LOGICAL_OPERATORS = { + "AND": _FilterAnd, + "OR": _FilterOr, +} + + def _parse_logical_condition(condition: Dict[str, Any]) -> Dict[str, Any]: if "operator" not in condition: msg = f"'operator' key missing in {condition}" @@ -67,7 +77,8 @@ def _parse_logical_condition(condition: Dict[str, Any]) -> Dict[str, Any]: operands.append(_parse_logical_condition(c)) else: operands.append(_parse_comparison_condition(c)) - return {"operator": operator.lower().capitalize(), "operands": operands} + # return {"operator": operator.lower().capitalize(), "operands": operands} + return LOGICAL_OPERATORS[operator](*operands) elif operator == "NOT": inverted_conditions = _invert_condition(condition) return _parse_logical_condition(inverted_conditions) @@ -110,19 +121,23 @@ def _handle_date(value: Any) -> str: def _equal(field: str, value: Any) -> Dict[str, Any]: if value is None: return {"path": field, "operator": "IsNull", "valueBoolean": True} - return {"path": field, "operator": "Equal", _infer_value_type(value): _handle_date(value)} + # return {"path": field, "operator": "Equal", _infer_value_type(value): _handle_date(value)} + return weaviate.classes.query.Filter.by_property(field).equal(_handle_date(value)) def _not_equal(field: str, value: Any) -> Dict[str, Any]: if value is None: return {"path": field, "operator": "IsNull", "valueBoolean": False} - return { - "operator": "Or", - "operands": [ - {"path": field, "operator": "NotEqual", _infer_value_type(value): _handle_date(value)}, - {"path": field, "operator": "IsNull", "valueBoolean": True}, - ], - } + # return { + # "operator": "Or", + # "operands": [ + # {"path": field, "operator": "NotEqual", _infer_value_type(value): _handle_date(value)}, + # {"path": field, "operator": "IsNull", "valueBoolean": True}, + # ], + # } + return weaviate.classes.query.Filter.by_property(field).not_equal( + _handle_date(value) + ) | weaviate.classes.query.Filter.by_property(field).is_none(True) def _greater_than(field: str, value: Any) -> Dict[str, Any]: @@ -144,7 +159,8 @@ def _greater_than(field: str, value: Any) -> Dict[str, Any]: if type(value) in [list, DataFrame]: msg = f"Filter value can't be of type {type(value)} using operators '>', '>=', '<', '<='" raise FilterError(msg) - return {"path": field, "operator": "GreaterThan", _infer_value_type(value): _handle_date(value)} + # return {"path": field, "operator": "GreaterThan", _infer_value_type(value): _handle_date(value)} + return weaviate.classes.query.Filter.by_property(field).greater_than(_handle_date(value)) def _greater_than_equal(field: str, value: Any) -> Dict[str, Any]: @@ -166,7 +182,8 @@ def _greater_than_equal(field: str, value: Any) -> Dict[str, Any]: if type(value) in [list, DataFrame]: msg = f"Filter value can't be of type {type(value)} using operators '>', '>=', '<', '<='" raise FilterError(msg) - return {"path": field, "operator": "GreaterThanEqual", _infer_value_type(value): _handle_date(value)} + # return {"path": field, "operator": "GreaterThanEqual", _infer_value_type(value): _handle_date(value)} + return weaviate.classes.query.Filter.by_property(field).greater_or_equal(_handle_date(value)) def _less_than(field: str, value: Any) -> Dict[str, Any]: @@ -188,7 +205,8 @@ def _less_than(field: str, value: Any) -> Dict[str, Any]: if type(value) in [list, DataFrame]: msg = f"Filter value can't be of type {type(value)} using operators '>', '>=', '<', '<='" raise FilterError(msg) - return {"path": field, "operator": "LessThan", _infer_value_type(value): _handle_date(value)} + # return {"path": field, "operator": "LessThan", _infer_value_type(value): _handle_date(value)} + return weaviate.classes.query.Filter.by_property(field).less_than(_handle_date(value)) def _less_than_equal(field: str, value: Any) -> Dict[str, Any]: @@ -210,7 +228,8 @@ def _less_than_equal(field: str, value: Any) -> Dict[str, Any]: if type(value) in [list, DataFrame]: msg = f"Filter value can't be of type {type(value)} using operators '>', '>=', '<', '<='" raise FilterError(msg) - return {"path": field, "operator": "LessThanEqual", _infer_value_type(value): _handle_date(value)} + # return {"path": field, "operator": "LessThanEqual", _infer_value_type(value): _handle_date(value)} + return weaviate.classes.query.Filter.by_property(field).less_or_equal(_handle_date(value)) def _in(field: str, value: Any) -> Dict[str, Any]: @@ -218,14 +237,17 @@ def _in(field: str, value: Any) -> Dict[str, Any]: msg = f"{field}'s value must be a list when using 'in' or 'not in' comparators" raise FilterError(msg) - return {"operator": "And", "operands": [_equal(field, v) for v in value]} + # return {"operator": "And", "operands": [_equal(field, v) for v in value]} + return weaviate.classes.query.Filter.by_property(field).contains_any(value) def _not_in(field: str, value: Any) -> Dict[str, Any]: if not isinstance(value, list): msg = f"{field}'s value must be a list when using 'in' or 'not in' comparators" raise FilterError(msg) - return {"operator": "And", "operands": [_not_equal(field, v) for v in value]} + # return {"operator": "And", "operands": [_not_equal(field, v) for v in value]} + operands = [weaviate.classes.query.Filter.by_property(field).not_equal(v) for v in value] + return _FilterAnd(*operands) COMPARISON_OPERATORS = { @@ -270,10 +292,13 @@ def _match_no_document(field: str) -> Dict[str, Any]: Returns a filters that will match no Document, this is used to keep the behavior consistent between different Document Stores. """ - return { - "operator": "And", - "operands": [ - {"path": field, "operator": "IsNull", "valueBoolean": False}, - {"path": field, "operator": "IsNull", "valueBoolean": True}, - ], - } + # return { + # "operator": "And", + # "operands": [ + # {"path": field, "operator": "IsNull", "valueBoolean": False}, + # {"path": field, "operator": "IsNull", "valueBoolean": True}, + # ], + # } + + operands = [weaviate.classes.query.Filter.by_property(field).is_none(val) for val in [False, True]] + return _FilterAnd(*operands) From 379150145270a0ef859251efb7b36fd3120dc01d Mon Sep 17 00:00:00 2001 From: hsm207 Date: Tue, 27 Feb 2024 11:55:43 +0000 Subject: [PATCH 08/52] upgrade batch write to use v4 api --- .../weaviate/document_store.py | 70 ++++++++++++------- 1 file changed, 44 insertions(+), 26 deletions(-) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py index 2dde76ce7..0cba36c9a 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py @@ -291,33 +291,51 @@ def _batch_write(self, documents: List[Document]) -> int: Documents with the same id will be overwritten. Raises in case of errors. """ - statuses = [] - for doc in documents: - if not isinstance(doc, Document): - msg = f"Expected a Document, got '{type(doc)}' instead." - raise ValueError(msg) - if self._client.batch.num_objects() == self._client.batch.recommended_num_objects: - # Batch is full, let's create the objects - statuses.extend(self._client.batch.create_objects()) - self._client.batch.add_data_object( - uuid=generate_uuid5(doc.id), - data_object=self._to_data_object(doc), - class_name=self._collection_settings["class"], - vector=doc.embedding, + # statuses = [] + # for doc in documents: + # if not isinstance(doc, Document): + # msg = f"Expected a Document, got '{type(doc)}' instead." + # raise ValueError(msg) + # if self._client.batch.num_objects() == self._client.batch.recommended_num_objects: + # # Batch is full, let's create the objects + # statuses.extend(self._client.batch.create_objects()) + # self._client.batch.add_data_object( + # uuid=generate_uuid5(doc.id), + # data_object=self._to_data_object(doc), + # class_name=self._collection_settings["class"], + # vector=doc.embedding, + # ) + # # Write remaining documents + # statuses.extend(self._client.batch.create_objects()) + + # errors = [] + # # Gather errors and number of written documents + # for status in statuses: + # result_status = status.get("result", {}).get("status") + # if result_status == "FAILED": + # errors.extend([e["message"] for e in status["result"]["errors"]["error"]]) + + # if errors: + # msg = "\n".join(errors) + # msg = f"Failed to write documents in Weaviate. Errors:\n{msg}" + # raise DocumentStoreError(msg) + + with self._client.batch.dynamic() as batch: + for doc in documents: + batch.add_object( + properties=self._to_data_object(doc), + collection=self._collection.name, + uuid=generate_uuid5(doc.id), + vector=doc.embedding, + ) + failed_objects = self._client.batch.failed_objects + if failed_objects: + msg = "\n".join( + [ + f"Failed to write object with id '{obj.object_._original_id}'. Error: '{obj.message}'" + for obj in failed_objects + ] ) - # Write remaining documents - statuses.extend(self._client.batch.create_objects()) - - errors = [] - # Gather errors and number of written documents - for status in statuses: - result_status = status.get("result", {}).get("status") - if result_status == "FAILED": - errors.extend([e["message"] for e in status["result"]["errors"]["error"]]) - - if errors: - msg = "\n".join(errors) - msg = f"Failed to write documents in Weaviate. Errors:\n{msg}" raise DocumentStoreError(msg) # If the document already exists we get no status message back from Weaviate. From 9d70bdec4a76b122b85c19d57178cd3246225f01 Mon Sep 17 00:00:00 2001 From: hsm207 Date: Tue, 27 Feb 2024 11:56:35 +0000 Subject: [PATCH 09/52] use v4 api cursor to retrieve all docs --- .../weaviate/document_store.py | 50 ++++++++++--------- 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py index 0cba36c9a..1771f35ec 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py @@ -217,31 +217,33 @@ def _to_document(self, data: Dict[str, Any]) -> Document: return Document.from_dict(data) - def _query_paginated(self, properties: List[str], cursor=None): - collection_name = self._collection_settings["class"] - query = ( - self._client.query.get( - collection_name, - properties, - ) - .with_additional(["id vector"]) - .with_limit(100) - ) - - if cursor: - # Fetch the next set of results - result = query.with_after(cursor).do() - else: - # Fetch the first set of results - result = query.do() - - if "errors" in result: - errors = [e["message"] for e in result.get("errors", {})] - msg = "\n".join(errors) - msg = f"Failed to query documents in Weaviate. Errors:\n{msg}" - raise DocumentStoreError(msg) + def _query_paginated(self, properties: List[str]): + # collection_name = self._collection_settings["class"] + # query = ( + # self._client.query.get( + # collection_name, + # properties, + # ) + # .with_additional(["id vector"]) + # .with_limit(100) + # ) + + # if cursor: + # # Fetch the next set of results + # result = query.with_after(cursor).do() + # else: + # # Fetch the first set of results + # result = query.do() + + # if "errors" in result: + # errors = [e["message"] for e in result.get("errors", {})] + # msg = "\n".join(errors) + # msg = f"Failed to query documents in Weaviate. Errors:\n{msg}" + # raise DocumentStoreError(msg) - return result["data"]["Get"][collection_name] + # return result["data"]["Get"][collection_name] + result = self._collection.iterator(include_vector=True, return_properties=properties) + return list(result) def _query_with_filters(self, properties: List[str], filters: Dict[str, Any]) -> List[Dict[str, Any]]: collection_name = self._collection_settings["class"] From 7a121b8531c997ed33382b0f91e62587f9a341f3 Mon Sep 17 00:00:00 2001 From: hsm207 Date: Tue, 27 Feb 2024 11:57:17 +0000 Subject: [PATCH 10/52] upgrade query with filters to use v4 api --- .../weaviate/document_store.py | 43 +++++++++++-------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py index 1771f35ec..43082d886 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py @@ -245,26 +245,35 @@ def _query_paginated(self, properties: List[str]): result = self._collection.iterator(include_vector=True, return_properties=properties) return list(result) - def _query_with_filters(self, properties: List[str], filters: Dict[str, Any]) -> List[Dict[str, Any]]: - collection_name = self._collection_settings["class"] - query = ( - self._client.query.get( - collection_name, - properties, - ) - .with_additional(["id vector"]) - .with_where(convert_filters(filters)) - ) + def _query_with_filters(self, filters: weaviate.collections.classes.filters.Filter) -> List[Dict[str, Any]]: + # collection_name = self._collection_settings["class"] - result = query.do() + try: + result = self._collection.query.fetch_objects(filters=convert_filters(filters), include_vector=True) + except weaviate.exceptions.WeaviateQueryError as e: + msg = f"Failed to query documents in Weaviate. Error: {e.message}" + raise DocumentStoreError(msg) from None - if "errors" in result: - errors = [e["message"] for e in result.get("errors", {})] - msg = "\n".join(errors) - msg = f"Failed to query documents in Weaviate. Errors:\n{msg}" - raise DocumentStoreError(msg) + # query = ( + # self._client.query.get( + # collection_name, + # properties, + # ) + # .with_additional(["id vector"]) + # .with_where(convert_filters(filters)) + # ) + + # result = query.do() + + # if "errors" in result: + # errors = [e["message"] for e in result.get("errors", {})] + # msg = "\n".join(errors) + # msg = f"Failed to query documents in Weaviate. Errors:\n{msg}" + # raise DocumentStoreError(msg) + + # return result["data"]["Get"][collection_name] - return result["data"]["Get"][collection_name] + return result.objects def filter_documents(self, filters: Optional[Dict[str, Any]] = None) -> List[Document]: properties = self._client.schema.get(self._collection_settings["class"]).get("properties", []) From 3e57ba29f039aed1603c3542560f1292945b6682 Mon Sep 17 00:00:00 2001 From: hsm207 Date: Tue, 27 Feb 2024 11:58:10 +0000 Subject: [PATCH 11/52] upgrade filter documents to use v4 API --- .../weaviate/document_store.py | 40 +++++++++++++------ 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py index 43082d886..d5653a844 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py @@ -11,6 +11,7 @@ from haystack.document_stores.types.policy import DuplicatePolicy import weaviate +from weaviate.collections.classes.internal import Object from weaviate.config import AdditionalConfig, Config, ConnectionConfig from weaviate.embedded import EmbeddedOptions from weaviate.util import generate_uuid5 @@ -195,6 +196,16 @@ def _to_data_object(self, document: Document) -> Dict[str, Any]: return data + def _convert_weaviate_v4_object_to_v3_object(self, data: Object) -> Dict[str, Any]: + v4_object = data.__dict__ + v3_object = v4_object.pop("properties") + v3_object["_additional"] = {"vector": v4_object.pop("vector").get("default")} + + if "blob_data" not in v3_object: + v3_object["blob_data"] = None + v3_object["blob_mime_type"] = None + return v3_object + def _to_document(self, data: Dict[str, Any]) -> Document: """ Convert a data object read from Weaviate into a Document. @@ -275,25 +286,28 @@ def _query_with_filters(self, filters: weaviate.collections.classes.filters.Filt return result.objects - def filter_documents(self, filters: Optional[Dict[str, Any]] = None) -> List[Document]: - properties = self._client.schema.get(self._collection_settings["class"]).get("properties", []) - properties = [prop["name"] for prop in properties] + def filter_documents(self, filters: Optional[weaviate.collections.classes.filters.Filter] = None) -> List[Document]: + # properties = self._client.schema.get(self._collection_settings["class"]).get("properties", []) + properties = self._collection.config.get().properties + properties = [prop.name for prop in properties] if filters: - result = self._query_with_filters(properties, filters) + result = self._query_with_filters(filters) return [self._to_document(doc) for doc in result] - result = [] + # result = [] - cursor = None - while batch := self._query_paginated(properties, cursor): - # Take the cursor before we convert the batch to Documents as we manipulate - # the batch dictionary and might lose that information. - cursor = batch[-1]["_additional"]["id"] + # cursor = None + # while batch := self._query_paginated(properties, cursor): + # # Take the cursor before we convert the batch to Documents as we manipulate + # # the batch dictionary and might lose that information. + # cursor = batch[-1]["_additional"]["id"] - for doc in batch: - result.append(self._to_document(doc)) - # Move the cursor to the last returned uuid + # for doc in batch: + # result.append(self._to_document(doc)) + # # Move the cursor to the last returned uuid + result = self._query_paginated(properties) + result = [self._to_document(self._convert_weaviate_v4_object_to_v3_object(doc)) for doc in result] return result def _batch_write(self, documents: List[Document]) -> int: From c9544bd53faf11ca97fcaf9a1c81200c8b499ffc Mon Sep 17 00:00:00 2001 From: hsm207 Date: Tue, 27 Feb 2024 11:58:38 +0000 Subject: [PATCH 12/52] update weaviate fixture to align with v4 API --- integrations/weaviate/tests/test_document_store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/weaviate/tests/test_document_store.py b/integrations/weaviate/tests/test_document_store.py index d6aed9a83..49014cf89 100644 --- a/integrations/weaviate/tests/test_document_store.py +++ b/integrations/weaviate/tests/test_document_store.py @@ -53,7 +53,7 @@ def document_store(self, request) -> WeaviateDocumentStore: collection_settings=collection_settings, ) yield store - store._client.schema.delete_class(collection_settings["class"]) + store._client.collections.delete(collection_settings["class"]) @pytest.fixture def filterable_docs(self) -> List[Document]: From 6c8c43ae450a111b04bb00b49f9fe98013e822b7 Mon Sep 17 00:00:00 2001 From: hsm207 Date: Tue, 27 Feb 2024 12:47:06 +0000 Subject: [PATCH 13/52] update v4 to v3 conversion logic --- .../document_stores/weaviate/document_store.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py index d5653a844..677f44e83 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py @@ -197,6 +197,9 @@ def _to_data_object(self, document: Document) -> Dict[str, Any]: return data def _convert_weaviate_v4_object_to_v3_object(self, data: Object) -> Dict[str, Any]: + properties = self._collection.config.get().properties + properties_with_date_type = [p.name for p in properties if p.data_type.name == "DATE"] + v4_object = data.__dict__ v3_object = v4_object.pop("properties") v3_object["_additional"] = {"vector": v4_object.pop("vector").get("default")} @@ -204,6 +207,9 @@ def _convert_weaviate_v4_object_to_v3_object(self, data: Object) -> Dict[str, An if "blob_data" not in v3_object: v3_object["blob_data"] = None v3_object["blob_mime_type"] = None + + for date_prop in properties_with_date_type: + v3_object[date_prop] = v3_object[date_prop].strftime("%Y-%m-%dT%H:%M:%SZ") return v3_object def _to_document(self, data: Dict[str, Any]) -> Document: From 25d72ad24562eec4dcc198193284047fcd5070da Mon Sep 17 00:00:00 2001 From: hsm207 Date: Tue, 27 Feb 2024 12:47:24 +0000 Subject: [PATCH 14/52] fix typo --- .../document_stores/weaviate/document_store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py index 677f44e83..d8d732fcc 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py @@ -299,7 +299,7 @@ def filter_documents(self, filters: Optional[weaviate.collections.classes.filter if filters: result = self._query_with_filters(filters) - return [self._to_document(doc) for doc in result] + return [self._to_document(self._convert_weaviate_v4_object_to_v3_object(doc)) for doc in result] # result = [] From 887e35f85f12531eb4ea865476dc1cae023943ee Mon Sep 17 00:00:00 2001 From: hsm207 Date: Tue, 27 Feb 2024 13:01:28 +0000 Subject: [PATCH 15/52] fix date v4 to v3 conversion logic --- .../document_stores/weaviate/document_store.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py index d8d732fcc..c6de5ca73 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py @@ -198,7 +198,9 @@ def _to_data_object(self, document: Document) -> Dict[str, Any]: def _convert_weaviate_v4_object_to_v3_object(self, data: Object) -> Dict[str, Any]: properties = self._collection.config.get().properties - properties_with_date_type = [p.name for p in properties if p.data_type.name == "DATE"] + properties_with_date_type = [ + p.name for p in properties if p.data_type.name == "DATE" and p.name in data.properties + ] v4_object = data.__dict__ v3_object = v4_object.pop("properties") From 0e481d0963ae60a37664140f04fe2384b58b0b07 Mon Sep 17 00:00:00 2001 From: hsm207 Date: Tue, 27 Feb 2024 15:44:48 +0000 Subject: [PATCH 16/52] hardcode limit in query filter --- .../document_stores/weaviate/document_store.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py index c6de5ca73..087a61388 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py @@ -268,7 +268,14 @@ def _query_with_filters(self, filters: weaviate.collections.classes.filters.Filt # collection_name = self._collection_settings["class"] try: - result = self._collection.query.fetch_objects(filters=convert_filters(filters), include_vector=True) + # this is the default value for max number of objects to retrieve in Weaviate + # see QUERY_MAXIMUM_RESULTS + # see https://weaviate.io/developers/weaviate/config-refs/env-vars#overview + # and https://weaviate.io/developers/weaviate/api/graphql/additional-operators#pagination-with-offset + limit = 10_000 + result = self._collection.query.fetch_objects( + filters=convert_filters(filters), include_vector=True, limit=limit + ) except weaviate.exceptions.WeaviateQueryError as e: msg = f"Failed to query documents in Weaviate. Error: {e.message}" raise DocumentStoreError(msg) from None From b4c5b49b67197caf0463955fb36795191b1dc3ea Mon Sep 17 00:00:00 2001 From: hsm207 Date: Tue, 27 Feb 2024 21:08:07 +0000 Subject: [PATCH 17/52] fix typo --- .../document_stores/weaviate/_filters.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/_filters.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/_filters.py index f22a9c4cc..8b6e17d3f 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/_filters.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/_filters.py @@ -120,14 +120,16 @@ def _handle_date(value: Any) -> str: def _equal(field: str, value: Any) -> Dict[str, Any]: if value is None: - return {"path": field, "operator": "IsNull", "valueBoolean": True} + # return {"path": field, "operator": "IsNull", "valueBoolean": True} + return weaviate.classes.query.Filter.by_property(field).is_none(True) # return {"path": field, "operator": "Equal", _infer_value_type(value): _handle_date(value)} return weaviate.classes.query.Filter.by_property(field).equal(_handle_date(value)) def _not_equal(field: str, value: Any) -> Dict[str, Any]: if value is None: - return {"path": field, "operator": "IsNull", "valueBoolean": False} + # return {"path": field, "operator": "IsNull", "valueBoolean": False} + return weaviate.classes.query.Filter.by_property(field).is_none(False) # return { # "operator": "Or", # "operands": [ From d79c4177119f2bcb995fc099e15d61029e4e2f3b Mon Sep 17 00:00:00 2001 From: hsm207 Date: Tue, 27 Feb 2024 21:33:58 +0000 Subject: [PATCH 18/52] upgrade weaviate server --- integrations/weaviate/docker-compose.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/weaviate/docker-compose.yml b/integrations/weaviate/docker-compose.yml index f28328751..faff1ca4a 100644 --- a/integrations/weaviate/docker-compose.yml +++ b/integrations/weaviate/docker-compose.yml @@ -8,7 +8,7 @@ services: - '8080' - --scheme - http - image: semitechnologies/weaviate:1.23.10 + image: semitechnologies/weaviate:1.24.0 ports: - 8080:8080 - 50051:50051 From c77765890e5f08409c7f22c9b058d83b49e4430d Mon Sep 17 00:00:00 2001 From: hsm207 Date: Tue, 27 Feb 2024 21:36:32 +0000 Subject: [PATCH 19/52] update v4 to v3 object date conversion the property name will still appear in the object's propertities even though it is not set. So, we need to check if it is not None too --- .../document_stores/weaviate/document_store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py index 087a61388..f3cecde30 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py @@ -199,7 +199,7 @@ def _to_data_object(self, document: Document) -> Dict[str, Any]: def _convert_weaviate_v4_object_to_v3_object(self, data: Object) -> Dict[str, Any]: properties = self._collection.config.get().properties properties_with_date_type = [ - p.name for p in properties if p.data_type.name == "DATE" and p.name in data.properties + p.name for p in properties if p.data_type.name == "DATE" and data.properties.get(p.name) ] v4_object = data.__dict__ From 5e7b4316fa677390d8a709fe9098b2718927441c Mon Sep 17 00:00:00 2001 From: hsm207 Date: Tue, 27 Feb 2024 22:20:28 +0000 Subject: [PATCH 20/52] fix invert logic bug --- .../haystack_integrations/document_stores/weaviate/_filters.py | 2 +- integrations/weaviate/tests/test_filters.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/_filters.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/_filters.py index 8b6e17d3f..04a174b1b 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/_filters.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/_filters.py @@ -33,7 +33,7 @@ def convert_filters(filters: Dict[str, Any]) -> Dict[str, Any]: "not in": "in", "AND": "OR", "OR": "AND", - "NOT": "AND", + "NOT": "OR", } diff --git a/integrations/weaviate/tests/test_filters.py b/integrations/weaviate/tests/test_filters.py index cf38d84be..c32d69e2f 100644 --- a/integrations/weaviate/tests/test_filters.py +++ b/integrations/weaviate/tests/test_filters.py @@ -19,7 +19,7 @@ def test_invert_conditions(): inverted = _invert_condition(filters) assert inverted == { - "operator": "AND", + "operator": "OR", "conditions": [ {"field": "meta.number", "operator": "!=", "value": 100}, {"field": "meta.name", "operator": "!=", "value": "name_0"}, From 67ec755952cc8c42d769196899ad46de54d906d1 Mon Sep 17 00:00:00 2001 From: hsm207 Date: Tue, 27 Feb 2024 22:38:29 +0000 Subject: [PATCH 21/52] upgrade delete function to v4 API --- .../document_stores/weaviate/document_store.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py index f3cecde30..d56172931 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py @@ -434,14 +434,16 @@ def write_documents(self, documents: List[Document], policy: DuplicatePolicy = D return self._write(documents, policy) def delete_documents(self, document_ids: List[str]) -> None: - self._client.batch.delete_objects( - class_name=self._collection_settings["class"], - where={ - "path": ["id"], - "operator": "ContainsAny", - "valueTextArray": [generate_uuid5(doc_id) for doc_id in document_ids], - }, - ) + # self._client.batch.delete_objects( + # class_name=self._collection_settings["class"], + # where={ + # "path": ["id"], + # "operator": "ContainsAny", + # "valueTextArray": [generate_uuid5(doc_id) for doc_id in document_ids], + # }, + # ) + weaviate_ids = [generate_uuid5(doc_id) for doc_id in document_ids] + self._collection.data.delete_many(where=weaviate.classes.query.Filter.by_id().contains_any(weaviate_ids)) def _bm25_retrieval( self, query: str, filters: Optional[Dict[str, Any]] = None, top_k: Optional[int] = None From 34042298caaf4ba8e78910cb245ea2329808437b Mon Sep 17 00:00:00 2001 From: hsm207 Date: Tue, 27 Feb 2024 22:48:10 +0000 Subject: [PATCH 22/52] update bm25 search to v4 API --- .../weaviate/document_store.py | 37 ++++++++++++------- .../weaviate/tests/test_document_store.py | 4 +- 2 files changed, 26 insertions(+), 15 deletions(-) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py index d56172931..0967ec6ec 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py @@ -448,25 +448,34 @@ def delete_documents(self, document_ids: List[str]) -> None: def _bm25_retrieval( self, query: str, filters: Optional[Dict[str, Any]] = None, top_k: Optional[int] = None ) -> List[Document]: - collection_name = self._collection_settings["class"] - properties = self._client.schema.get(self._collection_settings["class"]).get("properties", []) - properties = [prop["name"] for prop in properties] + # collection_name = self._collection_settings["class"] + # properties = self._client.schema.get(self._collection_settings["class"]).get("properties", []) + # properties = [prop["name"] for prop in properties] - query_builder = ( - self._client.query.get(collection_name, properties=properties) - .with_bm25(query=query, properties=["content"]) - .with_additional(["vector"]) - ) + # query_builder = ( + # self._client.query.get(collection_name, properties=properties) + # .with_bm25(query=query, properties=["content"]) + # .with_additional(["vector"]) + # ) - if filters: - query_builder = query_builder.with_where(convert_filters(filters)) + # if filters: + # query_builder = query_builder.with_where(convert_filters(filters)) - if top_k: - query_builder = query_builder.with_limit(top_k) + # if top_k: + # query_builder = query_builder.with_limit(top_k) - result = query_builder.do() + # result = query_builder.do() - return [self._to_document(doc) for doc in result["data"]["Get"][collection_name]] + # return [self._to_document(doc) for doc in result["data"]["Get"][collection_name]] + result = self._collection.query.bm25( + query=query, + filters=filters, + limit=top_k, + include_vector=True, + query_properties=["content"], + ) + + return [self._to_document(self._convert_weaviate_v4_object_to_v3_object(doc)) for doc in result.objects] def _embedding_retrieval( self, diff --git a/integrations/weaviate/tests/test_document_store.py b/integrations/weaviate/tests/test_document_store.py index 49014cf89..4e855d5af 100644 --- a/integrations/weaviate/tests/test_document_store.py +++ b/integrations/weaviate/tests/test_document_store.py @@ -4,6 +4,7 @@ from unittest.mock import MagicMock, patch import pytest +import weaviate from dateutil import parser from haystack.dataclasses.byte_stream import ByteStream from haystack.dataclasses.document import Document @@ -531,7 +532,8 @@ def test_bm25_retrieval_with_filters(self, document_store): Document(content="PHP is a object oriented programming language"), ] ) - filters = {"field": "content", "operator": "==", "value": "Haskell"} + # filters = {"field": "content", "operator": "==", "value": "Haskell"} + filters = weaviate.classes.query.Filter.by_property("content").equal("Haskell") result = document_store._bm25_retrieval("functional Haskell", filters=filters) assert len(result) == 1 assert "Haskell is a functional programming language" == result[0].content From d9a5862b671ba113030294f9dabe85b8bc321c6f Mon Sep 17 00:00:00 2001 From: hsm207 Date: Tue, 27 Feb 2024 22:54:53 +0000 Subject: [PATCH 23/52] update count docs to v4 API --- .../document_stores/weaviate/document_store.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py index 0967ec6ec..314dfef0a 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py @@ -172,9 +172,11 @@ def from_dict(cls, data: Dict[str, Any]) -> "WeaviateDocumentStore": ) def count_documents(self) -> int: - collection_name = self._collection_settings["class"] - res = self._client.query.aggregate(collection_name).with_meta_count().do() - return res.get("data", {}).get("Aggregate", {}).get(collection_name, [{}])[0].get("meta", {}).get("count", 0) + # collection_name = self._collection_settings["class"] + # res = self._client.query.aggregate(collection_name).with_meta_count().do() + # return res.get("data", {}).get("Aggregate", {}).get(collection_name, [{}])[0].get("meta", {}).get("count", 0) + total = self._collection.aggregate.over_all(total_count=True).total_count + return total if total else 0 def _to_data_object(self, document: Document) -> Dict[str, Any]: """ From d2a5280412d25605e99bd9333a66f761939293f6 Mon Sep 17 00:00:00 2001 From: hsm207 Date: Tue, 27 Feb 2024 23:13:49 +0000 Subject: [PATCH 24/52] update _write to use v4 API --- .../weaviate/document_store.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py index 314dfef0a..dd46655bd 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py @@ -398,22 +398,25 @@ def _write(self, documents: List[Document], policy: DuplicatePolicy) -> int: msg = f"Expected a Document, got '{type(doc)}' instead." raise ValueError(msg) - if policy == DuplicatePolicy.SKIP and self._client.data_object.exists( - uuid=generate_uuid5(doc.id), - class_name=self._collection_settings["class"], - ): + if policy == DuplicatePolicy.SKIP and self._collection.data.exists(uuid=generate_uuid5(doc.id)): # This Document already exists, we skip it continue try: - self._client.data_object.create( + # self._client.data_object.create( + # uuid=generate_uuid5(doc.id), + # data_object=self._to_data_object(doc), + # class_name=self._collection_settings["class"], + # vector=doc.embedding, + # ) + self._collection.data.insert( uuid=generate_uuid5(doc.id), - data_object=self._to_data_object(doc), - class_name=self._collection_settings["class"], + properties=self._to_data_object(doc), vector=doc.embedding, ) + written += 1 - except weaviate.exceptions.ObjectAlreadyExistsException: + except weaviate.exceptions.UnexpectedStatusCodeError: if policy == DuplicatePolicy.FAIL: duplicate_errors_ids.append(doc.id) if duplicate_errors_ids: From 995287d68df89880a98851f3c32bb2a91507d9f8 Mon Sep 17 00:00:00 2001 From: hsm207 Date: Tue, 27 Feb 2024 23:27:09 +0000 Subject: [PATCH 25/52] support optional filters in bm25 --- .../document_stores/weaviate/document_store.py | 2 +- integrations/weaviate/tests/test_document_store.py | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py index dd46655bd..185acf840 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py @@ -474,7 +474,7 @@ def _bm25_retrieval( # return [self._to_document(doc) for doc in result["data"]["Get"][collection_name]] result = self._collection.query.bm25( query=query, - filters=filters, + filters=convert_filters(filters) if filters else None, limit=top_k, include_vector=True, query_properties=["content"], diff --git a/integrations/weaviate/tests/test_document_store.py b/integrations/weaviate/tests/test_document_store.py index 4e855d5af..49014cf89 100644 --- a/integrations/weaviate/tests/test_document_store.py +++ b/integrations/weaviate/tests/test_document_store.py @@ -4,7 +4,6 @@ from unittest.mock import MagicMock, patch import pytest -import weaviate from dateutil import parser from haystack.dataclasses.byte_stream import ByteStream from haystack.dataclasses.document import Document @@ -532,8 +531,7 @@ def test_bm25_retrieval_with_filters(self, document_store): Document(content="PHP is a object oriented programming language"), ] ) - # filters = {"field": "content", "operator": "==", "value": "Haskell"} - filters = weaviate.classes.query.Filter.by_property("content").equal("Haskell") + filters = {"field": "content", "operator": "==", "value": "Haskell"} result = document_store._bm25_retrieval("functional Haskell", filters=filters) assert len(result) == 1 assert "Haskell is a functional programming language" == result[0].content From 756b9ef8553d83acf1975bb80608f697247a6634 Mon Sep 17 00:00:00 2001 From: hsm207 Date: Tue, 27 Feb 2024 23:27:29 +0000 Subject: [PATCH 26/52] update embedding retrieval to use v4 API --- .../weaviate/document_store.py | 59 +++++++++++-------- 1 file changed, 35 insertions(+), 24 deletions(-) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py index 185acf840..925f506d6 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py @@ -494,30 +494,41 @@ def _embedding_retrieval( msg = "Can't use 'distance' and 'certainty' parameters together" raise ValueError(msg) - collection_name = self._collection_settings["class"] - properties = self._client.schema.get(self._collection_settings["class"]).get("properties", []) - properties = [prop["name"] for prop in properties] - - near_vector: Dict[str, Union[float, List[float]]] = { - "vector": query_embedding, - } - if distance is not None: - near_vector["distance"] = distance - - if certainty is not None: - near_vector["certainty"] = certainty - - query_builder = ( - self._client.query.get(collection_name, properties=properties) - .with_near_vector(near_vector) - .with_additional(["vector"]) - ) + # collection_name = self._collection_settings["class"] + # properties = self._client.schema.get(self._collection_settings["class"]).get("properties", []) + # properties = [prop["name"] for prop in properties] - if filters: - query_builder = query_builder.with_where(convert_filters(filters)) + # near_vector: Dict[str, Union[float, List[float]]] = { + # "vector": query_embedding, + # } + # if distance is not None: + # near_vector["distance"] = distance + + # if certainty is not None: + # near_vector["certainty"] = certainty - if top_k: - query_builder = query_builder.with_limit(top_k) + # query_builder = ( + # self._client.query.get(collection_name, properties=properties) + # .with_near_vector(near_vector) + # .with_additional(["vector"]) + # ) + + # if filters: + # query_builder = query_builder.with_where(convert_filters(filters)) - result = query_builder.do() - return [self._to_document(doc) for doc in result["data"]["Get"][collection_name]] + # if top_k: + # query_builder = query_builder.with_limit(top_k) + + # result = query_builder.do() + # return [self._to_document(doc) for doc in result["data"]["Get"][collection_name]] + + result = self._collection.query.near_vector( + near_vector=query_embedding, + distance=distance, + certainty=certainty, + include_vector=True, + filters=convert_filters(filters) if filters else None, + limit=top_k, + ) + + return [self._to_document(self._convert_weaviate_v4_object_to_v3_object(doc)) for doc in result.objects] From 928c88b0f1c08c4dd0f5eb581590d59180e775c3 Mon Sep 17 00:00:00 2001 From: hsm207 Date: Wed, 28 Feb 2024 06:03:51 +0000 Subject: [PATCH 27/52] update from_dict for v4 API --- .../weaviate/document_store.py | 14 +++++------ .../weaviate/tests/test_document_store.py | 24 +++++++++---------- 2 files changed, 18 insertions(+), 20 deletions(-) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py index 925f506d6..b37de15ae 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py @@ -12,7 +12,7 @@ import weaviate from weaviate.collections.classes.internal import Object -from weaviate.config import AdditionalConfig, Config, ConnectionConfig +from weaviate.config import AdditionalConfig # , Config, ConnectionConfig from weaviate.embedded import EmbeddedOptions from weaviate.util import generate_uuid5 @@ -155,17 +155,17 @@ def to_dict(self) -> Dict[str, Any]: @classmethod def from_dict(cls, data: Dict[str, Any]) -> "WeaviateDocumentStore": - if (timeout_config := data["init_parameters"].get("timeout_config")) is not None: - data["init_parameters"]["timeout_config"] = ( - tuple(timeout_config) if isinstance(timeout_config, list) else timeout_config - ) + # if (timeout_config := data["init_parameters"].get("timeout_config")) is not None: + # data["init_parameters"]["timeout_config"] = ( + # tuple(timeout_config) if isinstance(timeout_config, list) else timeout_config + # ) if (auth_client_secret := data["init_parameters"].get("auth_client_secret")) is not None: data["init_parameters"]["auth_client_secret"] = AuthCredentials.from_dict(auth_client_secret) if (embedded_options := data["init_parameters"].get("embedded_options")) is not None: data["init_parameters"]["embedded_options"] = EmbeddedOptions(**embedded_options) if (additional_config := data["init_parameters"].get("additional_config")) is not None: - additional_config["connection_config"] = ConnectionConfig(**additional_config["connection_config"]) - data["init_parameters"]["additional_config"] = Config(**additional_config) + # additional_config["connection_config"] = ConnectionConfig(**additional_config["connection_config"]) + data["init_parameters"]["additional_config"] = AdditionalConfig(**additional_config) return default_from_dict( cls, data, diff --git a/integrations/weaviate/tests/test_document_store.py b/integrations/weaviate/tests/test_document_store.py index 49014cf89..b33a693c2 100644 --- a/integrations/weaviate/tests/test_document_store.py +++ b/integrations/weaviate/tests/test_document_store.py @@ -270,11 +270,7 @@ def test_from_dict(self, _mock_weaviate, monkeypatch): "api_key": {"env_vars": ["WEAVIATE_API_KEY"], "strict": True, "type": "env_var"} }, }, - "timeout_config": [10, 60], - "proxies": {"http": "http://proxy:1234"}, - "trust_env": False, "additional_headers": {"X-HuggingFace-Api-Key": "MY_HUGGINGFACE_KEY"}, - "startup_period": 5, "embedded_options": { "persistence_data_path": DEFAULT_PERSISTENCE_DATA_PATH, "binary_path": DEFAULT_BINARY_PATH, @@ -285,11 +281,13 @@ def test_from_dict(self, _mock_weaviate, monkeypatch): "grpc_port": DEFAULT_GRPC_PORT, }, "additional_config": { - "grpc_port_experimental": 12345, - "connection_config": { + "connection": { "session_pool_connections": 20, "session_pool_maxsize": 20, }, + "proxies": {"http": "http://proxy:1234"}, + "timeout": [10, 60], + "trust_env": False, }, }, } @@ -309,11 +307,11 @@ def test_from_dict(self, _mock_weaviate, monkeypatch): ], } assert document_store._auth_client_secret == AuthApiKey() - assert document_store._timeout_config == (10, 60) - assert document_store._proxies == {"http": "http://proxy:1234"} - assert not document_store._trust_env + assert document_store._additional_config.timeout == (10, 60) + assert document_store._additional_config.proxies == {"http": "http://proxy:1234"} + assert not document_store._additional_config.trust_env assert document_store._additional_headers == {"X-HuggingFace-Api-Key": "MY_HUGGINGFACE_KEY"} - assert document_store._startup_period == 5 + # assert document_store._startup_period == 5 assert document_store._embedded_options.persistence_data_path == DEFAULT_PERSISTENCE_DATA_PATH assert document_store._embedded_options.binary_path == DEFAULT_BINARY_PATH assert document_store._embedded_options.version == "1.23.0" @@ -321,9 +319,9 @@ def test_from_dict(self, _mock_weaviate, monkeypatch): assert document_store._embedded_options.hostname == "127.0.0.1" assert document_store._embedded_options.additional_env_vars is None assert document_store._embedded_options.grpc_port == DEFAULT_GRPC_PORT - assert document_store._additional_config.grpc_port_experimental == 12345 - assert document_store._additional_config.connection_config.session_pool_connections == 20 - assert document_store._additional_config.connection_config.session_pool_maxsize == 20 + # assert document_store._additional_config.grpc_port_experimental == 12345 + assert document_store._additional_config.connection.session_pool_connections == 20 + assert document_store._additional_config.connection.session_pool_maxsize == 20 def test_to_data_object(self, document_store, test_files_path): doc = Document(content="test doc") From 16b42b1ad4ce035ba7e1cfa265eeaec1d91e5ad8 Mon Sep 17 00:00:00 2001 From: hsm207 Date: Wed, 28 Feb 2024 11:06:01 +0000 Subject: [PATCH 28/52] fix write invalid input test --- .../document_stores/weaviate/document_store.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py index b37de15ae..3053e1aee 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py @@ -363,7 +363,12 @@ def _batch_write(self, documents: List[Document]) -> int: # raise DocumentStoreError(msg) with self._client.batch.dynamic() as batch: + for doc in documents: + if not isinstance(doc, Document): + msg = f"Expected a Document, got '{type(doc)}' instead." + raise ValueError(msg) + batch.add_object( properties=self._to_data_object(doc), collection=self._collection.name, From 04983fee56dfbb7a30110fe992440b48da195a73 Mon Sep 17 00:00:00 2001 From: hsm207 Date: Wed, 28 Feb 2024 11:13:10 +0000 Subject: [PATCH 29/52] update other test_from_dict for V4 --- integrations/weaviate/tests/test_bm25_retriever.py | 4 ---- integrations/weaviate/tests/test_embedding_retriever.py | 4 ---- 2 files changed, 8 deletions(-) diff --git a/integrations/weaviate/tests/test_bm25_retriever.py b/integrations/weaviate/tests/test_bm25_retriever.py index 83f90735b..59e99879c 100644 --- a/integrations/weaviate/tests/test_bm25_retriever.py +++ b/integrations/weaviate/tests/test_bm25_retriever.py @@ -76,11 +76,7 @@ def test_from_dict(_mock_weaviate): ], }, "auth_client_secret": None, - "timeout_config": (10, 60), - "proxies": None, - "trust_env": False, "additional_headers": None, - "startup_period": 5, "embedded_options": None, "additional_config": None, }, diff --git a/integrations/weaviate/tests/test_embedding_retriever.py b/integrations/weaviate/tests/test_embedding_retriever.py index 7f07d8a24..b3e103c09 100644 --- a/integrations/weaviate/tests/test_embedding_retriever.py +++ b/integrations/weaviate/tests/test_embedding_retriever.py @@ -89,11 +89,7 @@ def test_from_dict(_mock_weaviate): ], }, "auth_client_secret": None, - "timeout_config": (10, 60), - "proxies": None, - "trust_env": False, "additional_headers": None, - "startup_period": 5, "embedded_options": None, "additional_config": None, }, From f3f49f3e5a2a104c4d26fde7047ea3b04ef3d5e9 Mon Sep 17 00:00:00 2001 From: hsm207 Date: Wed, 28 Feb 2024 11:39:42 +0000 Subject: [PATCH 30/52] update test_to_dict for v4 --- .../weaviate/document_store.py | 3 ++- .../weaviate/tests/test_bm25_retriever.py | 4 ---- .../weaviate/tests/test_document_store.py | 21 ++++++++++--------- .../tests/test_embedding_retriever.py | 4 ---- 4 files changed, 13 insertions(+), 19 deletions(-) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py index 3053e1aee..522bf60ec 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py @@ -2,6 +2,7 @@ # # SPDX-License-Identifier: Apache-2.0 import base64 +import json from dataclasses import asdict from typing import Any, Dict, List, Optional, Tuple, Union @@ -141,7 +142,7 @@ def __init__( def to_dict(self) -> Dict[str, Any]: embedded_options = asdict(self._embedded_options) if self._embedded_options else None - additional_config = asdict(self._additional_config) if self._additional_config else None + additional_config = json.loads(self._additional_config.model_dump_json()) if self._additional_config else None return default_to_dict( self, diff --git a/integrations/weaviate/tests/test_bm25_retriever.py b/integrations/weaviate/tests/test_bm25_retriever.py index 59e99879c..23b7c8f92 100644 --- a/integrations/weaviate/tests/test_bm25_retriever.py +++ b/integrations/weaviate/tests/test_bm25_retriever.py @@ -38,11 +38,7 @@ def test_to_dict(_mock_weaviate): ], }, "auth_client_secret": None, - "timeout_config": (10, 60), - "proxies": None, - "trust_env": False, "additional_headers": None, - "startup_period": 5, "embedded_options": None, "additional_config": None, }, diff --git a/integrations/weaviate/tests/test_document_store.py b/integrations/weaviate/tests/test_document_store.py index b33a693c2..c489fbe5c 100644 --- a/integrations/weaviate/tests/test_document_store.py +++ b/integrations/weaviate/tests/test_document_store.py @@ -25,7 +25,7 @@ from numpy import float32 as np_float32 from pandas import DataFrame from weaviate.auth import AuthApiKey as WeaviateAuthApiKey -from weaviate.config import Config +from weaviate.config import AdditionalConfig, ConnectionConfig from weaviate.embedded import ( DEFAULT_BINARY_PATH, DEFAULT_GRPC_PORT, @@ -197,7 +197,6 @@ def test_to_dict(self, _mock_weaviate, monkeypatch): document_store = WeaviateDocumentStore( url="http://localhost:8080", auth_client_secret=AuthApiKey(), - proxies={"http": "http://proxy:1234"}, additional_headers={"X-HuggingFace-Api-Key": "MY_HUGGINGFACE_KEY"}, embedded_options=EmbeddedOptions( persistence_data_path=DEFAULT_PERSISTENCE_DATA_PATH, @@ -205,7 +204,12 @@ def test_to_dict(self, _mock_weaviate, monkeypatch): version="1.23.0", hostname="127.0.0.1", ), - additional_config=Config(grpc_port_experimental=12345), + additional_config=AdditionalConfig( + connection=ConnectionConfig(), + timeout=(30, 90), + trust_env=False, + proxies={"http": "http://proxy:1234"}, + ), ) assert document_store.to_dict() == { "type": "haystack_integrations.document_stores.weaviate.document_store.WeaviateDocumentStore", @@ -229,11 +233,7 @@ def test_to_dict(self, _mock_weaviate, monkeypatch): "api_key": {"env_vars": ["WEAVIATE_API_KEY"], "strict": True, "type": "env_var"} }, }, - "timeout_config": (10, 60), - "proxies": {"http": "http://proxy:1234"}, - "trust_env": False, "additional_headers": {"X-HuggingFace-Api-Key": "MY_HUGGINGFACE_KEY"}, - "startup_period": 5, "embedded_options": { "persistence_data_path": DEFAULT_PERSISTENCE_DATA_PATH, "binary_path": DEFAULT_BINARY_PATH, @@ -244,13 +244,14 @@ def test_to_dict(self, _mock_weaviate, monkeypatch): "grpc_port": DEFAULT_GRPC_PORT, }, "additional_config": { - "grpc_port_experimental": 12345, - "grpc_secure_experimental": False, - "connection_config": { + "connection": { "session_pool_connections": 20, "session_pool_maxsize": 100, "session_pool_max_retries": 3, }, + "proxies": {"http": "http://proxy:1234"}, + "timeout": [30, 90], + "trust_env": False, }, }, } diff --git a/integrations/weaviate/tests/test_embedding_retriever.py b/integrations/weaviate/tests/test_embedding_retriever.py index b3e103c09..a406c40db 100644 --- a/integrations/weaviate/tests/test_embedding_retriever.py +++ b/integrations/weaviate/tests/test_embedding_retriever.py @@ -49,11 +49,7 @@ def test_to_dict(_mock_weaviate): ], }, "auth_client_secret": None, - "timeout_config": (10, 60), - "proxies": None, - "trust_env": False, "additional_headers": None, - "startup_period": 5, "embedded_options": None, "additional_config": None, }, From 1655f0fdb0f5eebd3afb06a046f24ca0d1761b22 Mon Sep 17 00:00:00 2001 From: hsm207 Date: Wed, 28 Feb 2024 11:48:15 +0000 Subject: [PATCH 31/52] update test_init for v4 API --- .../weaviate/document_store.py | 9 ++-- .../weaviate/tests/test_document_store.py | 53 +++++++++++-------- 2 files changed, 36 insertions(+), 26 deletions(-) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py index 522bf60ec..b99fe3bf4 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py @@ -101,11 +101,12 @@ def __init__( """ # proxies, timeout_config, trust_env are part of additional_config now # startup_period has been removed - connection_params = weaviate.connect.base.ConnectionParams.from_url( - url=url, grpc_port=grpc_port, grpc_secure=grpc_secure - ) self._client = weaviate.WeaviateClient( - connection_params=connection_params, + connection_params=( + weaviate.connect.base.ConnectionParams.from_url(url=url, grpc_port=grpc_port, grpc_secure=grpc_secure) + if url + else None + ), auth_client_secret=auth_client_secret.resolve_value() if auth_client_secret else None, additional_config=additional_config, additional_headers=additional_headers, diff --git a/integrations/weaviate/tests/test_document_store.py b/integrations/weaviate/tests/test_document_store.py index c489fbe5c..c2c29ad72 100644 --- a/integrations/weaviate/tests/test_document_store.py +++ b/integrations/weaviate/tests/test_document_store.py @@ -24,7 +24,8 @@ from numpy import array_equal as np_array_equal from numpy import float32 as np_float32 from pandas import DataFrame -from weaviate.auth import AuthApiKey as WeaviateAuthApiKey + +# from weaviate.auth import AuthApiKey as WeaviateAuthApiKey from weaviate.config import AdditionalConfig, ConnectionConfig from weaviate.embedded import ( DEFAULT_BINARY_PATH, @@ -153,35 +154,43 @@ def test_init(self, mock_weaviate_client_class, monkeypatch): monkeypatch.setenv("WEAVIATE_API_KEY", "my_api_key") WeaviateDocumentStore( url="http://localhost:8080", + # url=None, collection_settings={"class": "My_collection"}, - auth_client_secret=AuthApiKey(), - proxies={"http": "http://proxy:1234"}, + # auth_client_secret=AuthApiKey(), + # proxies={"http": "http://proxy:1234"}, additional_headers={"X-HuggingFace-Api-Key": "MY_HUGGINGFACE_KEY"}, - embedded_options=EmbeddedOptions( - persistence_data_path=DEFAULT_PERSISTENCE_DATA_PATH, - binary_path=DEFAULT_BINARY_PATH, - version="1.23.0", - hostname="127.0.0.1", - ), - additional_config=Config(grpc_port_experimental=12345), + # embedded_options=EmbeddedOptions( + # persistence_data_path=DEFAULT_PERSISTENCE_DATA_PATH, + # binary_path=DEFAULT_BINARY_PATH, + # version="1.23.7", + # hostname="127.0.0.1", + # ), + # additional_config=AdditionalConfig( + # proxies={"http": "http://proxy:1234"}, + # ), ) # Verify client is created with correct parameters + mock_weaviate_client_class.assert_called_once_with( url="http://localhost:8080", - auth_client_secret=WeaviateAuthApiKey("my_api_key"), - timeout_config=(10, 60), - proxies={"http": "http://proxy:1234"}, - trust_env=False, + collection_settings={"class": "My_collection"}, + # url=None, + # auth_client_secret=WeaviateAuthApiKey("my_api_key"), + # timeout_config=(10, 60), + # proxies={"http": "http://proxy:1234"}, + # trust_env=False, additional_headers={"X-HuggingFace-Api-Key": "MY_HUGGINGFACE_KEY"}, - startup_period=5, - embedded_options=EmbeddedOptions( - persistence_data_path=DEFAULT_PERSISTENCE_DATA_PATH, - binary_path=DEFAULT_BINARY_PATH, - version="1.23.0", - hostname="127.0.0.1", - ), - additional_config=Config(grpc_port_experimental=12345), + # startup_period=5, + # embedded_options=EmbeddedOptions( + # persistence_data_path=DEFAULT_PERSISTENCE_DATA_PATH, + # binary_path=DEFAULT_BINARY_PATH, + # version="1.23.7", + # hostname="127.0.0.1", + # ), + # additional_config=AdditionalConfig( + # proxies={"http": "http://proxy:1234"}, + # ), ) # Verify collection is created From 66c1fd062ac0e1d60b5fd556ea2c7e46635434f1 Mon Sep 17 00:00:00 2001 From: hsm207 Date: Wed, 28 Feb 2024 12:13:54 +0000 Subject: [PATCH 32/52] try to pas test_init --- .../weaviate/tests/test_document_store.py | 44 +++++++++---------- 1 file changed, 21 insertions(+), 23 deletions(-) diff --git a/integrations/weaviate/tests/test_document_store.py b/integrations/weaviate/tests/test_document_store.py index c2c29ad72..00373a406 100644 --- a/integrations/weaviate/tests/test_document_store.py +++ b/integrations/weaviate/tests/test_document_store.py @@ -15,6 +15,7 @@ FilterDocumentsTest, WriteDocumentsTest, ) +import weaviate from haystack_integrations.document_stores.weaviate.auth import AuthApiKey from haystack_integrations.document_stores.weaviate.document_store import ( DOCUMENT_COLLECTION_PROPERTIES, @@ -146,25 +147,23 @@ def assert_documents_are_equal(self, received: List[Document], expected: List[Do for key in meta_keys: assert received_meta.get(key) == expected_meta.get(key) - @patch("haystack_integrations.document_stores.weaviate.document_store.weaviate.Client") + @patch("haystack_integrations.document_stores.weaviate.document_store.weaviate.WeaviateClient") def test_init(self, mock_weaviate_client_class, monkeypatch): mock_client = MagicMock() mock_client.schema.exists.return_value = False mock_weaviate_client_class.return_value = mock_client monkeypatch.setenv("WEAVIATE_API_KEY", "my_api_key") WeaviateDocumentStore( - url="http://localhost:8080", - # url=None, - collection_settings={"class": "My_collection"}, + # collection_settings={"class": "My_collection"}, # auth_client_secret=AuthApiKey(), # proxies={"http": "http://proxy:1234"}, additional_headers={"X-HuggingFace-Api-Key": "MY_HUGGINGFACE_KEY"}, - # embedded_options=EmbeddedOptions( - # persistence_data_path=DEFAULT_PERSISTENCE_DATA_PATH, - # binary_path=DEFAULT_BINARY_PATH, - # version="1.23.7", - # hostname="127.0.0.1", - # ), + embedded_options=EmbeddedOptions( + persistence_data_path=DEFAULT_PERSISTENCE_DATA_PATH, + binary_path=DEFAULT_BINARY_PATH, + version="1.23.7", + hostname="127.0.0.1", + ), # additional_config=AdditionalConfig( # proxies={"http": "http://proxy:1234"}, # ), @@ -173,24 +172,23 @@ def test_init(self, mock_weaviate_client_class, monkeypatch): # Verify client is created with correct parameters mock_weaviate_client_class.assert_called_once_with( - url="http://localhost:8080", - collection_settings={"class": "My_collection"}, - # url=None, + # connection_params = connection_params, + # collection_settings={"class": "My_collection"}, # auth_client_secret=WeaviateAuthApiKey("my_api_key"), # timeout_config=(10, 60), # proxies={"http": "http://proxy:1234"}, # trust_env=False, + connection_params=None, + auth_client_secret=None, + additional_config=None, additional_headers={"X-HuggingFace-Api-Key": "MY_HUGGINGFACE_KEY"}, - # startup_period=5, - # embedded_options=EmbeddedOptions( - # persistence_data_path=DEFAULT_PERSISTENCE_DATA_PATH, - # binary_path=DEFAULT_BINARY_PATH, - # version="1.23.7", - # hostname="127.0.0.1", - # ), - # additional_config=AdditionalConfig( - # proxies={"http": "http://proxy:1234"}, - # ), + embedded_options=EmbeddedOptions( + persistence_data_path=DEFAULT_PERSISTENCE_DATA_PATH, + binary_path=DEFAULT_BINARY_PATH, + version="1.23.7", + hostname="127.0.0.1", + ), + skip_init_checks=False, ) # Verify collection is created From e2ea14c52dd95e9d85793d3207790b286e9eaf03 Mon Sep 17 00:00:00 2001 From: hsm207 Date: Wed, 28 Feb 2024 14:33:25 +0000 Subject: [PATCH 33/52] pass test_init --- .../weaviate/tests/test_document_store.py | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/integrations/weaviate/tests/test_document_store.py b/integrations/weaviate/tests/test_document_store.py index 00373a406..2fef2bcbb 100644 --- a/integrations/weaviate/tests/test_document_store.py +++ b/integrations/weaviate/tests/test_document_store.py @@ -15,7 +15,6 @@ FilterDocumentsTest, WriteDocumentsTest, ) -import weaviate from haystack_integrations.document_stores.weaviate.auth import AuthApiKey from haystack_integrations.document_stores.weaviate.document_store import ( DOCUMENT_COLLECTION_PROPERTIES, @@ -150,11 +149,11 @@ def assert_documents_are_equal(self, received: List[Document], expected: List[Do @patch("haystack_integrations.document_stores.weaviate.document_store.weaviate.WeaviateClient") def test_init(self, mock_weaviate_client_class, monkeypatch): mock_client = MagicMock() - mock_client.schema.exists.return_value = False + mock_client.collections.exists.return_value = False mock_weaviate_client_class.return_value = mock_client monkeypatch.setenv("WEAVIATE_API_KEY", "my_api_key") WeaviateDocumentStore( - # collection_settings={"class": "My_collection"}, + collection_settings={"class": "My_collection"}, # auth_client_secret=AuthApiKey(), # proxies={"http": "http://proxy:1234"}, additional_headers={"X-HuggingFace-Api-Key": "MY_HUGGINGFACE_KEY"}, @@ -178,9 +177,9 @@ def test_init(self, mock_weaviate_client_class, monkeypatch): # timeout_config=(10, 60), # proxies={"http": "http://proxy:1234"}, # trust_env=False, - connection_params=None, - auth_client_secret=None, - additional_config=None, + connection_params=None, + auth_client_secret=None, + additional_config=None, additional_headers={"X-HuggingFace-Api-Key": "MY_HUGGINGFACE_KEY"}, embedded_options=EmbeddedOptions( persistence_data_path=DEFAULT_PERSISTENCE_DATA_PATH, @@ -192,11 +191,15 @@ def test_init(self, mock_weaviate_client_class, monkeypatch): ) # Verify collection is created - mock_client.schema.get.assert_called_once() - mock_client.schema.exists.assert_called_once_with("My_collection") - mock_client.schema.create_class.assert_called_once_with( + # mock_client.schema.get.assert_called_once() + # mock_client.schema.exists.assert_called_once_with("My_collection") + mock_client.collections.exists.assert_called_once_with("My_collection") + mock_client.collections.create_from_dict.assert_called_once_with( {"class": "My_collection", "properties": DOCUMENT_COLLECTION_PROPERTIES} ) + # mock_client.schema.create_class.assert_called_once_with( + # {"class": "My_collection", "properties": DOCUMENT_COLLECTION_PROPERTIES} + # ) @patch("haystack_integrations.document_stores.weaviate.document_store.weaviate") def test_to_dict(self, _mock_weaviate, monkeypatch): From 8ee53fdf8879b0e4f221c4088c101d2313f67bb7 Mon Sep 17 00:00:00 2001 From: hsm207 Date: Wed, 28 Feb 2024 20:51:12 +0000 Subject: [PATCH 34/52] add exception handling in _query_paginated --- .../document_stores/weaviate/document_store.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py index b99fe3bf4..538e38ce6 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py @@ -265,7 +265,11 @@ def _query_paginated(self, properties: List[str]): # raise DocumentStoreError(msg) # return result["data"]["Get"][collection_name] - result = self._collection.iterator(include_vector=True, return_properties=properties) + try: + result = self._collection.iterator(include_vector=True, return_properties=properties) + except weaviate.exceptions.WeaviateQueryError as e: + msg = f"Failed to query documents in Weaviate. Error: {e.message}" + raise DocumentStoreError(msg) from None return list(result) def _query_with_filters(self, filters: weaviate.collections.classes.filters.Filter) -> List[Dict[str, Any]]: From 1be0ec9c3b1f33fad815ce9261a8d3d276c6483b Mon Sep 17 00:00:00 2001 From: hsm207 Date: Wed, 28 Feb 2024 20:56:48 +0000 Subject: [PATCH 35/52] remove commented out code --- .../document_stores/weaviate/_filters.py | 26 +-- .../weaviate/document_store.py | 157 +----------------- 2 files changed, 5 insertions(+), 178 deletions(-) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/_filters.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/_filters.py index 04a174b1b..5bfbd7ac9 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/_filters.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/_filters.py @@ -17,7 +17,6 @@ def convert_filters(filters: Dict[str, Any]) -> Dict[str, Any]: raise FilterError(msg) if "field" in filters: - # return {"operator": "And", "operands": [_parse_comparison_condition(filters)]} return _FilterAnd(_parse_comparison_condition(filters)) return _parse_logical_condition(filters) @@ -77,7 +76,6 @@ def _parse_logical_condition(condition: Dict[str, Any]) -> Dict[str, Any]: operands.append(_parse_logical_condition(c)) else: operands.append(_parse_comparison_condition(c)) - # return {"operator": operator.lower().capitalize(), "operands": operands} return LOGICAL_OPERATORS[operator](*operands) elif operator == "NOT": inverted_conditions = _invert_condition(condition) @@ -120,23 +118,14 @@ def _handle_date(value: Any) -> str: def _equal(field: str, value: Any) -> Dict[str, Any]: if value is None: - # return {"path": field, "operator": "IsNull", "valueBoolean": True} return weaviate.classes.query.Filter.by_property(field).is_none(True) - # return {"path": field, "operator": "Equal", _infer_value_type(value): _handle_date(value)} return weaviate.classes.query.Filter.by_property(field).equal(_handle_date(value)) def _not_equal(field: str, value: Any) -> Dict[str, Any]: if value is None: - # return {"path": field, "operator": "IsNull", "valueBoolean": False} return weaviate.classes.query.Filter.by_property(field).is_none(False) - # return { - # "operator": "Or", - # "operands": [ - # {"path": field, "operator": "NotEqual", _infer_value_type(value): _handle_date(value)}, - # {"path": field, "operator": "IsNull", "valueBoolean": True}, - # ], - # } + return weaviate.classes.query.Filter.by_property(field).not_equal( _handle_date(value) ) | weaviate.classes.query.Filter.by_property(field).is_none(True) @@ -161,7 +150,6 @@ def _greater_than(field: str, value: Any) -> Dict[str, Any]: if type(value) in [list, DataFrame]: msg = f"Filter value can't be of type {type(value)} using operators '>', '>=', '<', '<='" raise FilterError(msg) - # return {"path": field, "operator": "GreaterThan", _infer_value_type(value): _handle_date(value)} return weaviate.classes.query.Filter.by_property(field).greater_than(_handle_date(value)) @@ -184,7 +172,6 @@ def _greater_than_equal(field: str, value: Any) -> Dict[str, Any]: if type(value) in [list, DataFrame]: msg = f"Filter value can't be of type {type(value)} using operators '>', '>=', '<', '<='" raise FilterError(msg) - # return {"path": field, "operator": "GreaterThanEqual", _infer_value_type(value): _handle_date(value)} return weaviate.classes.query.Filter.by_property(field).greater_or_equal(_handle_date(value)) @@ -207,7 +194,6 @@ def _less_than(field: str, value: Any) -> Dict[str, Any]: if type(value) in [list, DataFrame]: msg = f"Filter value can't be of type {type(value)} using operators '>', '>=', '<', '<='" raise FilterError(msg) - # return {"path": field, "operator": "LessThan", _infer_value_type(value): _handle_date(value)} return weaviate.classes.query.Filter.by_property(field).less_than(_handle_date(value)) @@ -230,7 +216,6 @@ def _less_than_equal(field: str, value: Any) -> Dict[str, Any]: if type(value) in [list, DataFrame]: msg = f"Filter value can't be of type {type(value)} using operators '>', '>=', '<', '<='" raise FilterError(msg) - # return {"path": field, "operator": "LessThanEqual", _infer_value_type(value): _handle_date(value)} return weaviate.classes.query.Filter.by_property(field).less_or_equal(_handle_date(value)) @@ -239,7 +224,6 @@ def _in(field: str, value: Any) -> Dict[str, Any]: msg = f"{field}'s value must be a list when using 'in' or 'not in' comparators" raise FilterError(msg) - # return {"operator": "And", "operands": [_equal(field, v) for v in value]} return weaviate.classes.query.Filter.by_property(field).contains_any(value) @@ -247,7 +231,6 @@ def _not_in(field: str, value: Any) -> Dict[str, Any]: if not isinstance(value, list): msg = f"{field}'s value must be a list when using 'in' or 'not in' comparators" raise FilterError(msg) - # return {"operator": "And", "operands": [_not_equal(field, v) for v in value]} operands = [weaviate.classes.query.Filter.by_property(field).not_equal(v) for v in value] return _FilterAnd(*operands) @@ -294,13 +277,6 @@ def _match_no_document(field: str) -> Dict[str, Any]: Returns a filters that will match no Document, this is used to keep the behavior consistent between different Document Stores. """ - # return { - # "operator": "And", - # "operands": [ - # {"path": field, "operator": "IsNull", "valueBoolean": False}, - # {"path": field, "operator": "IsNull", "valueBoolean": True}, - # ], - # } operands = [weaviate.classes.query.Filter.by_property(field).is_none(val) for val in [False, True]] return _FilterAnd(*operands) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py index 538e38ce6..6ff171b8d 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py @@ -13,7 +13,7 @@ import weaviate from weaviate.collections.classes.internal import Object -from weaviate.config import AdditionalConfig # , Config, ConnectionConfig +from weaviate.config import AdditionalConfig from weaviate.embedded import EmbeddedOptions from weaviate.util import generate_uuid5 @@ -157,16 +157,12 @@ def to_dict(self) -> Dict[str, Any]: @classmethod def from_dict(cls, data: Dict[str, Any]) -> "WeaviateDocumentStore": - # if (timeout_config := data["init_parameters"].get("timeout_config")) is not None: - # data["init_parameters"]["timeout_config"] = ( - # tuple(timeout_config) if isinstance(timeout_config, list) else timeout_config - # ) + if (auth_client_secret := data["init_parameters"].get("auth_client_secret")) is not None: data["init_parameters"]["auth_client_secret"] = AuthCredentials.from_dict(auth_client_secret) if (embedded_options := data["init_parameters"].get("embedded_options")) is not None: data["init_parameters"]["embedded_options"] = EmbeddedOptions(**embedded_options) if (additional_config := data["init_parameters"].get("additional_config")) is not None: - # additional_config["connection_config"] = ConnectionConfig(**additional_config["connection_config"]) data["init_parameters"]["additional_config"] = AdditionalConfig(**additional_config) return default_from_dict( cls, @@ -174,9 +170,6 @@ def from_dict(cls, data: Dict[str, Any]) -> "WeaviateDocumentStore": ) def count_documents(self) -> int: - # collection_name = self._collection_settings["class"] - # res = self._client.query.aggregate(collection_name).with_meta_count().do() - # return res.get("data", {}).get("Aggregate", {}).get(collection_name, [{}])[0].get("meta", {}).get("count", 0) total = self._collection.aggregate.over_all(total_count=True).total_count return total if total else 0 @@ -241,30 +234,7 @@ def _to_document(self, data: Dict[str, Any]) -> Document: return Document.from_dict(data) def _query_paginated(self, properties: List[str]): - # collection_name = self._collection_settings["class"] - # query = ( - # self._client.query.get( - # collection_name, - # properties, - # ) - # .with_additional(["id vector"]) - # .with_limit(100) - # ) - - # if cursor: - # # Fetch the next set of results - # result = query.with_after(cursor).do() - # else: - # # Fetch the first set of results - # result = query.do() - - # if "errors" in result: - # errors = [e["message"] for e in result.get("errors", {})] - # msg = "\n".join(errors) - # msg = f"Failed to query documents in Weaviate. Errors:\n{msg}" - # raise DocumentStoreError(msg) - - # return result["data"]["Get"][collection_name] + try: result = self._collection.iterator(include_vector=True, return_properties=properties) except weaviate.exceptions.WeaviateQueryError as e: @@ -273,7 +243,6 @@ def _query_paginated(self, properties: List[str]): return list(result) def _query_with_filters(self, filters: weaviate.collections.classes.filters.Filter) -> List[Dict[str, Any]]: - # collection_name = self._collection_settings["class"] try: # this is the default value for max number of objects to retrieve in Weaviate @@ -288,29 +257,9 @@ def _query_with_filters(self, filters: weaviate.collections.classes.filters.Filt msg = f"Failed to query documents in Weaviate. Error: {e.message}" raise DocumentStoreError(msg) from None - # query = ( - # self._client.query.get( - # collection_name, - # properties, - # ) - # .with_additional(["id vector"]) - # .with_where(convert_filters(filters)) - # ) - - # result = query.do() - - # if "errors" in result: - # errors = [e["message"] for e in result.get("errors", {})] - # msg = "\n".join(errors) - # msg = f"Failed to query documents in Weaviate. Errors:\n{msg}" - # raise DocumentStoreError(msg) - - # return result["data"]["Get"][collection_name] - return result.objects def filter_documents(self, filters: Optional[weaviate.collections.classes.filters.Filter] = None) -> List[Document]: - # properties = self._client.schema.get(self._collection_settings["class"]).get("properties", []) properties = self._collection.config.get().properties properties = [prop.name for prop in properties] @@ -318,17 +267,6 @@ def filter_documents(self, filters: Optional[weaviate.collections.classes.filter result = self._query_with_filters(filters) return [self._to_document(self._convert_weaviate_v4_object_to_v3_object(doc)) for doc in result] - # result = [] - - # cursor = None - # while batch := self._query_paginated(properties, cursor): - # # Take the cursor before we convert the batch to Documents as we manipulate - # # the batch dictionary and might lose that information. - # cursor = batch[-1]["_additional"]["id"] - - # for doc in batch: - # result.append(self._to_document(doc)) - # # Move the cursor to the last returned uuid result = self._query_paginated(properties) result = [self._to_document(self._convert_weaviate_v4_object_to_v3_object(doc)) for doc in result] return result @@ -339,34 +277,6 @@ def _batch_write(self, documents: List[Document]) -> int: Documents with the same id will be overwritten. Raises in case of errors. """ - # statuses = [] - # for doc in documents: - # if not isinstance(doc, Document): - # msg = f"Expected a Document, got '{type(doc)}' instead." - # raise ValueError(msg) - # if self._client.batch.num_objects() == self._client.batch.recommended_num_objects: - # # Batch is full, let's create the objects - # statuses.extend(self._client.batch.create_objects()) - # self._client.batch.add_data_object( - # uuid=generate_uuid5(doc.id), - # data_object=self._to_data_object(doc), - # class_name=self._collection_settings["class"], - # vector=doc.embedding, - # ) - # # Write remaining documents - # statuses.extend(self._client.batch.create_objects()) - - # errors = [] - # # Gather errors and number of written documents - # for status in statuses: - # result_status = status.get("result", {}).get("status") - # if result_status == "FAILED": - # errors.extend([e["message"] for e in status["result"]["errors"]["error"]]) - - # if errors: - # msg = "\n".join(errors) - # msg = f"Failed to write documents in Weaviate. Errors:\n{msg}" - # raise DocumentStoreError(msg) with self._client.batch.dynamic() as batch: @@ -414,12 +324,6 @@ def _write(self, documents: List[Document], policy: DuplicatePolicy) -> int: continue try: - # self._client.data_object.create( - # uuid=generate_uuid5(doc.id), - # data_object=self._to_data_object(doc), - # class_name=self._collection_settings["class"], - # vector=doc.embedding, - # ) self._collection.data.insert( uuid=generate_uuid5(doc.id), properties=self._to_data_object(doc), @@ -450,39 +354,14 @@ def write_documents(self, documents: List[Document], policy: DuplicatePolicy = D return self._write(documents, policy) def delete_documents(self, document_ids: List[str]) -> None: - # self._client.batch.delete_objects( - # class_name=self._collection_settings["class"], - # where={ - # "path": ["id"], - # "operator": "ContainsAny", - # "valueTextArray": [generate_uuid5(doc_id) for doc_id in document_ids], - # }, - # ) + weaviate_ids = [generate_uuid5(doc_id) for doc_id in document_ids] self._collection.data.delete_many(where=weaviate.classes.query.Filter.by_id().contains_any(weaviate_ids)) def _bm25_retrieval( self, query: str, filters: Optional[Dict[str, Any]] = None, top_k: Optional[int] = None ) -> List[Document]: - # collection_name = self._collection_settings["class"] - # properties = self._client.schema.get(self._collection_settings["class"]).get("properties", []) - # properties = [prop["name"] for prop in properties] - - # query_builder = ( - # self._client.query.get(collection_name, properties=properties) - # .with_bm25(query=query, properties=["content"]) - # .with_additional(["vector"]) - # ) - # if filters: - # query_builder = query_builder.with_where(convert_filters(filters)) - - # if top_k: - # query_builder = query_builder.with_limit(top_k) - - # result = query_builder.do() - - # return [self._to_document(doc) for doc in result["data"]["Get"][collection_name]] result = self._collection.query.bm25( query=query, filters=convert_filters(filters) if filters else None, @@ -505,34 +384,6 @@ def _embedding_retrieval( msg = "Can't use 'distance' and 'certainty' parameters together" raise ValueError(msg) - # collection_name = self._collection_settings["class"] - # properties = self._client.schema.get(self._collection_settings["class"]).get("properties", []) - # properties = [prop["name"] for prop in properties] - - # near_vector: Dict[str, Union[float, List[float]]] = { - # "vector": query_embedding, - # } - # if distance is not None: - # near_vector["distance"] = distance - - # if certainty is not None: - # near_vector["certainty"] = certainty - - # query_builder = ( - # self._client.query.get(collection_name, properties=properties) - # .with_near_vector(near_vector) - # .with_additional(["vector"]) - # ) - - # if filters: - # query_builder = query_builder.with_where(convert_filters(filters)) - - # if top_k: - # query_builder = query_builder.with_limit(top_k) - - # result = query_builder.do() - # return [self._to_document(doc) for doc in result["data"]["Get"][collection_name]] - result = self._collection.query.near_vector( near_vector=query_embedding, distance=distance, From 5b9d2b2cb9cd76158672aafd8d3185c3aed5d7cc Mon Sep 17 00:00:00 2001 From: hsm207 Date: Wed, 28 Feb 2024 20:58:54 +0000 Subject: [PATCH 36/52] remove dead code --- .../document_stores/weaviate/_filters.py | 22 ------------------- 1 file changed, 22 deletions(-) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/_filters.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/_filters.py index 5bfbd7ac9..f0264098a 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/_filters.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/_filters.py @@ -85,28 +85,6 @@ def _parse_logical_condition(condition: Dict[str, Any]) -> Dict[str, Any]: raise FilterError(msg) -def _infer_value_type(value: Any) -> str: - if value is None: - return "valueNull" - - if isinstance(value, bool): - return "valueBoolean" - if isinstance(value, int): - return "valueInt" - if isinstance(value, float): - return "valueNumber" - - if isinstance(value, str): - try: - parser.isoparse(value) - return "valueDate" - except ValueError: - return "valueText" - - msg = f"Unknown value type {type(value)}" - raise FilterError(msg) - - def _handle_date(value: Any) -> str: if isinstance(value, str): try: From 8d724237e3f50cab0f03d57f2b42b5af22135e60 Mon Sep 17 00:00:00 2001 From: hsm207 Date: Thu, 29 Feb 2024 16:43:40 +0000 Subject: [PATCH 37/52] remove commented out code --- integrations/weaviate/tests/test_document_store.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/integrations/weaviate/tests/test_document_store.py b/integrations/weaviate/tests/test_document_store.py index 2fef2bcbb..f492ab806 100644 --- a/integrations/weaviate/tests/test_document_store.py +++ b/integrations/weaviate/tests/test_document_store.py @@ -191,15 +191,10 @@ def test_init(self, mock_weaviate_client_class, monkeypatch): ) # Verify collection is created - # mock_client.schema.get.assert_called_once() - # mock_client.schema.exists.assert_called_once_with("My_collection") mock_client.collections.exists.assert_called_once_with("My_collection") mock_client.collections.create_from_dict.assert_called_once_with( {"class": "My_collection", "properties": DOCUMENT_COLLECTION_PROPERTIES} ) - # mock_client.schema.create_class.assert_called_once_with( - # {"class": "My_collection", "properties": DOCUMENT_COLLECTION_PROPERTIES} - # ) @patch("haystack_integrations.document_stores.weaviate.document_store.weaviate") def test_to_dict(self, _mock_weaviate, monkeypatch): @@ -322,7 +317,6 @@ def test_from_dict(self, _mock_weaviate, monkeypatch): assert document_store._additional_config.proxies == {"http": "http://proxy:1234"} assert not document_store._additional_config.trust_env assert document_store._additional_headers == {"X-HuggingFace-Api-Key": "MY_HUGGINGFACE_KEY"} - # assert document_store._startup_period == 5 assert document_store._embedded_options.persistence_data_path == DEFAULT_PERSISTENCE_DATA_PATH assert document_store._embedded_options.binary_path == DEFAULT_BINARY_PATH assert document_store._embedded_options.version == "1.23.0" @@ -330,7 +324,6 @@ def test_from_dict(self, _mock_weaviate, monkeypatch): assert document_store._embedded_options.hostname == "127.0.0.1" assert document_store._embedded_options.additional_env_vars is None assert document_store._embedded_options.grpc_port == DEFAULT_GRPC_PORT - # assert document_store._additional_config.grpc_port_experimental == 12345 assert document_store._additional_config.connection.session_pool_connections == 20 assert document_store._additional_config.connection.session_pool_maxsize == 20 From f35ef05530c4a451061757034597067fa401ef55 Mon Sep 17 00:00:00 2001 From: hsm207 Date: Thu, 29 Feb 2024 16:45:57 +0000 Subject: [PATCH 38/52] return weaviate traceback too when query error occurs --- .../document_stores/weaviate/document_store.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py index 6ff171b8d..1d8e7846b 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py @@ -239,7 +239,7 @@ def _query_paginated(self, properties: List[str]): result = self._collection.iterator(include_vector=True, return_properties=properties) except weaviate.exceptions.WeaviateQueryError as e: msg = f"Failed to query documents in Weaviate. Error: {e.message}" - raise DocumentStoreError(msg) from None + raise DocumentStoreError(msg) from e return list(result) def _query_with_filters(self, filters: weaviate.collections.classes.filters.Filter) -> List[Dict[str, Any]]: @@ -255,7 +255,7 @@ def _query_with_filters(self, filters: weaviate.collections.classes.filters.Filt ) except weaviate.exceptions.WeaviateQueryError as e: msg = f"Failed to query documents in Weaviate. Error: {e.message}" - raise DocumentStoreError(msg) from None + raise DocumentStoreError(msg) from e return result.objects From bdbe86a55466bb1f0ee2cd69f55aed4cb577f0a0 Mon Sep 17 00:00:00 2001 From: hsm207 Date: Fri, 1 Mar 2024 06:31:25 +0000 Subject: [PATCH 39/52] make _query_paginated return an iterator --- .../document_stores/weaviate/document_store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py index 1d8e7846b..6e40cbb0d 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py @@ -240,7 +240,7 @@ def _query_paginated(self, properties: List[str]): except weaviate.exceptions.WeaviateQueryError as e: msg = f"Failed to query documents in Weaviate. Error: {e.message}" raise DocumentStoreError(msg) from e - return list(result) + return result def _query_with_filters(self, filters: weaviate.collections.classes.filters.Filter) -> List[Dict[str, Any]]: From 6be11afd0acab4186d3692ed7234b64048e9a92b Mon Sep 17 00:00:00 2001 From: hsm207 Date: Mon, 4 Mar 2024 15:25:42 +0000 Subject: [PATCH 40/52] refactor _to_document --- .../weaviate/document_store.py | 25 +++++++++---------- .../weaviate/tests/test_document_store.py | 23 +++++++++-------- 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py index 6e40cbb0d..85da8c486 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py @@ -12,6 +12,7 @@ from haystack.document_stores.types.policy import DuplicatePolicy import weaviate +from weaviate.collections.classes.data import DataObject from weaviate.collections.classes.internal import Object from weaviate.config import AdditionalConfig from weaviate.embedded import EmbeddedOptions @@ -211,27 +212,25 @@ def _convert_weaviate_v4_object_to_v3_object(self, data: Object) -> Dict[str, An v3_object[date_prop] = v3_object[date_prop].strftime("%Y-%m-%dT%H:%M:%SZ") return v3_object - def _to_document(self, data: Dict[str, Any]) -> Document: + def _to_document(self, data: DataObject) -> Document: """ Convert a data object read from Weaviate into a Document. """ - data["id"] = data.pop("_original_id") - data["embedding"] = data["_additional"].pop("vector") if data["_additional"].get("vector") else None + document_data = data.properties + document_data["id"] = document_data.pop("_original_id") + document_data["embedding"] = data.vector if data.vector else None - if (blob_data := data.get("blob_data")) is not None: - data["blob"] = { + if (blob_data := document_data.get("blob_data")) is not None: + document_data["blob"] = { "data": base64.b64decode(blob_data), - "mime_type": data.get("blob_mime_type"), + "mime_type": document_data.get("blob_mime_type"), } - # We always delete these fields as they're not part of the Document dataclass - data.pop("blob_data") - data.pop("blob_mime_type") - # We don't need these fields anymore, this usually only contains the uuid - # used by Weaviate to identify the object and the embedding vector that we already extracted. - del data["_additional"] + # We always delete these fields as they're not part of the Document dataclass + document_data.pop("blob_data", None) + document_data.pop("blob_mime_type", None) - return Document.from_dict(data) + return Document.from_dict(document_data) def _query_paginated(self, properties: List[str]): diff --git a/integrations/weaviate/tests/test_document_store.py b/integrations/weaviate/tests/test_document_store.py index f492ab806..e497c24e7 100644 --- a/integrations/weaviate/tests/test_document_store.py +++ b/integrations/weaviate/tests/test_document_store.py @@ -24,6 +24,7 @@ from numpy import array_equal as np_array_equal from numpy import float32 as np_float32 from pandas import DataFrame +from weaviate.collections.classes.data import DataObject # from weaviate.auth import AuthApiKey as WeaviateAuthApiKey from weaviate.config import AdditionalConfig, ConnectionConfig @@ -357,18 +358,18 @@ def test_to_data_object(self, document_store, test_files_path): def test_to_document(self, document_store, test_files_path): image = ByteStream.from_file_path(test_files_path / "robot1.jpg", mime_type="image/jpeg") - data = { - "_additional": { - "vector": [1, 2, 3], + data = DataObject( + properties={ + "_original_id": "123", + "content": "some content", + "blob_data": base64.b64encode(image.data).decode(), + "blob_mime_type": "image/jpeg", + "dataframe": None, + "score": None, + "key": "value", }, - "_original_id": "123", - "content": "some content", - "blob_data": base64.b64encode(image.data).decode(), - "blob_mime_type": "image/jpeg", - "dataframe": None, - "score": None, - "meta": {"key": "value"}, - } + vector=[1, 2, 3], + ) doc = document_store._to_document(data) assert doc.id == "123" From 0fa483e4539f5acca39a4aee49f86eb719a68ec0 Mon Sep 17 00:00:00 2001 From: hsm207 Date: Mon, 4 Mar 2024 15:42:01 +0000 Subject: [PATCH 41/52] remove v4 to v3 object conv fn --- .../weaviate/document_store.py | 26 +++---------------- 1 file changed, 4 insertions(+), 22 deletions(-) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py index 85da8c486..61cfb5952 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py @@ -194,24 +194,6 @@ def _to_data_object(self, document: Document) -> Dict[str, Any]: return data - def _convert_weaviate_v4_object_to_v3_object(self, data: Object) -> Dict[str, Any]: - properties = self._collection.config.get().properties - properties_with_date_type = [ - p.name for p in properties if p.data_type.name == "DATE" and data.properties.get(p.name) - ] - - v4_object = data.__dict__ - v3_object = v4_object.pop("properties") - v3_object["_additional"] = {"vector": v4_object.pop("vector").get("default")} - - if "blob_data" not in v3_object: - v3_object["blob_data"] = None - v3_object["blob_mime_type"] = None - - for date_prop in properties_with_date_type: - v3_object[date_prop] = v3_object[date_prop].strftime("%Y-%m-%dT%H:%M:%SZ") - return v3_object - def _to_document(self, data: DataObject) -> Document: """ Convert a data object read from Weaviate into a Document. @@ -264,10 +246,10 @@ def filter_documents(self, filters: Optional[weaviate.collections.classes.filter if filters: result = self._query_with_filters(filters) - return [self._to_document(self._convert_weaviate_v4_object_to_v3_object(doc)) for doc in result] + return [self._to_document(doc) for doc in result] result = self._query_paginated(properties) - result = [self._to_document(self._convert_weaviate_v4_object_to_v3_object(doc)) for doc in result] + result = [self._to_document(doc) for doc in result] return result def _batch_write(self, documents: List[Document]) -> int: @@ -369,7 +351,7 @@ def _bm25_retrieval( query_properties=["content"], ) - return [self._to_document(self._convert_weaviate_v4_object_to_v3_object(doc)) for doc in result.objects] + return [self._to_document(doc) for doc in result.objects] def _embedding_retrieval( self, @@ -392,4 +374,4 @@ def _embedding_retrieval( limit=top_k, ) - return [self._to_document(self._convert_weaviate_v4_object_to_v3_object(doc)) for doc in result.objects] + return [self._to_document(doc) for doc in result.objects] From 1202aa5fabd4828e06da407a4cd5dd0b3b155711 Mon Sep 17 00:00:00 2001 From: hsm207 Date: Mon, 4 Mar 2024 16:02:12 +0000 Subject: [PATCH 42/52] update to_dict serialization --- .../document_stores/weaviate/document_store.py | 4 +++- integrations/weaviate/tests/test_document_store.py | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py index 61cfb5952..5beaeef8e 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py @@ -144,7 +144,9 @@ def __init__( def to_dict(self) -> Dict[str, Any]: embedded_options = asdict(self._embedded_options) if self._embedded_options else None - additional_config = json.loads(self._additional_config.model_dump_json()) if self._additional_config else None + additional_config = ( + json.loads(self._additional_config.model_dump_json(by_alias=True)) if self._additional_config else None + ) return default_to_dict( self, diff --git a/integrations/weaviate/tests/test_document_store.py b/integrations/weaviate/tests/test_document_store.py index e497c24e7..394e0e14d 100644 --- a/integrations/weaviate/tests/test_document_store.py +++ b/integrations/weaviate/tests/test_document_store.py @@ -255,7 +255,7 @@ def test_to_dict(self, _mock_weaviate, monkeypatch): "session_pool_maxsize": 100, "session_pool_max_retries": 3, }, - "proxies": {"http": "http://proxy:1234"}, + "proxies": {"http": "http://proxy:1234", "https": None, "grpc": None}, "timeout": [30, 90], "trust_env": False, }, From 9647151bd0a43f536632da5e1c282665037f6098 Mon Sep 17 00:00:00 2001 From: hsm207 Date: Mon, 4 Mar 2024 16:09:24 +0000 Subject: [PATCH 43/52] update test case --- integrations/weaviate/tests/test_document_store.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/integrations/weaviate/tests/test_document_store.py b/integrations/weaviate/tests/test_document_store.py index 394e0e14d..6e78d1da6 100644 --- a/integrations/weaviate/tests/test_document_store.py +++ b/integrations/weaviate/tests/test_document_store.py @@ -27,7 +27,7 @@ from weaviate.collections.classes.data import DataObject # from weaviate.auth import AuthApiKey as WeaviateAuthApiKey -from weaviate.config import AdditionalConfig, ConnectionConfig +from weaviate.config import AdditionalConfig, ConnectionConfig, Proxies, Timeout from weaviate.embedded import ( DEFAULT_BINARY_PATH, DEFAULT_GRPC_PORT, @@ -314,8 +314,8 @@ def test_from_dict(self, _mock_weaviate, monkeypatch): ], } assert document_store._auth_client_secret == AuthApiKey() - assert document_store._additional_config.timeout == (10, 60) - assert document_store._additional_config.proxies == {"http": "http://proxy:1234"} + assert document_store._additional_config.timeout == Timeout(query=10, insert=60) + assert document_store._additional_config.proxies == Proxies(http="http://proxy:1234", https=None, grpc=None) assert not document_store._additional_config.trust_env assert document_store._additional_headers == {"X-HuggingFace-Api-Key": "MY_HUGGINGFACE_KEY"} assert document_store._embedded_options.persistence_data_path == DEFAULT_PERSISTENCE_DATA_PATH From 207f42745026f973ada5420d752feefe35b64258 Mon Sep 17 00:00:00 2001 From: hsm207 Date: Tue, 5 Mar 2024 16:27:41 +0000 Subject: [PATCH 44/52] update weaviate server --- integrations/weaviate/docker-compose.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/weaviate/docker-compose.yml b/integrations/weaviate/docker-compose.yml index faff1ca4a..f7f033eee 100644 --- a/integrations/weaviate/docker-compose.yml +++ b/integrations/weaviate/docker-compose.yml @@ -8,7 +8,7 @@ services: - '8080' - --scheme - http - image: semitechnologies/weaviate:1.24.0 + image: semitechnologies/weaviate:1.24.1 ports: - 8080:8080 - 50051:50051 From 9a30bdd5bf370289a4502f6d738f0cecfe9bba55 Mon Sep 17 00:00:00 2001 From: hsm207 Date: Tue, 5 Mar 2024 16:34:33 +0000 Subject: [PATCH 45/52] updates due to latest client changes --- .../document_stores/weaviate/_filters.py | 14 +++++++------- .../document_stores/weaviate/document_store.py | 8 ++++++-- integrations/weaviate/tests/test_document_store.py | 2 +- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/_filters.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/_filters.py index f0264098a..f397231ec 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/_filters.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/_filters.py @@ -5,7 +5,7 @@ from pandas import DataFrame import weaviate -from weaviate.collections.classes.filters import _FilterAnd, _FilterOr +from weaviate.collections.classes.filters import Filter def convert_filters(filters: Dict[str, Any]) -> Dict[str, Any]: @@ -17,7 +17,7 @@ def convert_filters(filters: Dict[str, Any]) -> Dict[str, Any]: raise FilterError(msg) if "field" in filters: - return _FilterAnd(_parse_comparison_condition(filters)) + return Filter.all_of([_parse_comparison_condition(filters)]) return _parse_logical_condition(filters) @@ -55,8 +55,8 @@ def _invert_condition(filters: Dict[str, Any]) -> Dict[str, Any]: LOGICAL_OPERATORS = { - "AND": _FilterAnd, - "OR": _FilterOr, + "AND": Filter.all_of, + "OR": Filter.any_of, } @@ -76,7 +76,7 @@ def _parse_logical_condition(condition: Dict[str, Any]) -> Dict[str, Any]: operands.append(_parse_logical_condition(c)) else: operands.append(_parse_comparison_condition(c)) - return LOGICAL_OPERATORS[operator](*operands) + return LOGICAL_OPERATORS[operator](operands) elif operator == "NOT": inverted_conditions = _invert_condition(condition) return _parse_logical_condition(inverted_conditions) @@ -210,7 +210,7 @@ def _not_in(field: str, value: Any) -> Dict[str, Any]: msg = f"{field}'s value must be a list when using 'in' or 'not in' comparators" raise FilterError(msg) operands = [weaviate.classes.query.Filter.by_property(field).not_equal(v) for v in value] - return _FilterAnd(*operands) + return Filter.all_of(operands) COMPARISON_OPERATORS = { @@ -257,4 +257,4 @@ def _match_no_document(field: str) -> Dict[str, Any]: """ operands = [weaviate.classes.query.Filter.by_property(field).is_none(val) for val in [False, True]] - return _FilterAnd(*operands) + return Filter.all_of(operands) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py index 5beaeef8e..d001d3ecf 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py @@ -2,6 +2,7 @@ # # SPDX-License-Identifier: Apache-2.0 import base64 +import datetime import json from dataclasses import asdict from typing import Any, Dict, List, Optional, Tuple, Union @@ -13,7 +14,6 @@ import weaviate from weaviate.collections.classes.data import DataObject -from weaviate.collections.classes.internal import Object from weaviate.config import AdditionalConfig from weaviate.embedded import EmbeddedOptions from weaviate.util import generate_uuid5 @@ -202,7 +202,7 @@ def _to_document(self, data: DataObject) -> Document: """ document_data = data.properties document_data["id"] = document_data.pop("_original_id") - document_data["embedding"] = data.vector if data.vector else None + document_data["embedding"] = data.vector["default"] if data.vector else None if (blob_data := document_data.get("blob_data")) is not None: document_data["blob"] = { @@ -214,6 +214,10 @@ def _to_document(self, data: DataObject) -> Document: document_data.pop("blob_data", None) document_data.pop("blob_mime_type", None) + for key, value in document_data.items(): + if isinstance(value, datetime.datetime): + document_data[key] = value.strftime("%Y-%m-%dT%H:%M:%SZ") + return Document.from_dict(document_data) def _query_paginated(self, properties: List[str]): diff --git a/integrations/weaviate/tests/test_document_store.py b/integrations/weaviate/tests/test_document_store.py index 6e78d1da6..a35d4d817 100644 --- a/integrations/weaviate/tests/test_document_store.py +++ b/integrations/weaviate/tests/test_document_store.py @@ -368,7 +368,7 @@ def test_to_document(self, document_store, test_files_path): "score": None, "key": "value", }, - vector=[1, 2, 3], + vector={"default": [1, 2, 3]}, ) doc = document_store._to_document(data) From f620aaf5f5c0c099555a156617d727a3ea7ccbd0 Mon Sep 17 00:00:00 2001 From: hsm207 Date: Tue, 5 Mar 2024 17:21:29 +0000 Subject: [PATCH 46/52] update test case due to latest client changes --- .../weaviate/tests/test_document_store.py | 21 +++++++------------ 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/integrations/weaviate/tests/test_document_store.py b/integrations/weaviate/tests/test_document_store.py index a35d4d817..9abf0db27 100644 --- a/integrations/weaviate/tests/test_document_store.py +++ b/integrations/weaviate/tests/test_document_store.py @@ -155,8 +155,7 @@ def test_init(self, mock_weaviate_client_class, monkeypatch): monkeypatch.setenv("WEAVIATE_API_KEY", "my_api_key") WeaviateDocumentStore( collection_settings={"class": "My_collection"}, - # auth_client_secret=AuthApiKey(), - # proxies={"http": "http://proxy:1234"}, + auth_client_secret=AuthApiKey(), additional_headers={"X-HuggingFace-Api-Key": "MY_HUGGINGFACE_KEY"}, embedded_options=EmbeddedOptions( persistence_data_path=DEFAULT_PERSISTENCE_DATA_PATH, @@ -164,23 +163,16 @@ def test_init(self, mock_weaviate_client_class, monkeypatch): version="1.23.7", hostname="127.0.0.1", ), - # additional_config=AdditionalConfig( - # proxies={"http": "http://proxy:1234"}, - # ), + additional_config=AdditionalConfig( + proxies={"http": "http://proxy:1234"}, trust_env=False, timeout=(10, 60) + ), ) # Verify client is created with correct parameters mock_weaviate_client_class.assert_called_once_with( - # connection_params = connection_params, - # collection_settings={"class": "My_collection"}, - # auth_client_secret=WeaviateAuthApiKey("my_api_key"), - # timeout_config=(10, 60), - # proxies={"http": "http://proxy:1234"}, - # trust_env=False, + auth_client_secret=AuthApiKey().resolve_value(), connection_params=None, - auth_client_secret=None, - additional_config=None, additional_headers={"X-HuggingFace-Api-Key": "MY_HUGGINGFACE_KEY"}, embedded_options=EmbeddedOptions( persistence_data_path=DEFAULT_PERSISTENCE_DATA_PATH, @@ -189,6 +181,9 @@ def test_init(self, mock_weaviate_client_class, monkeypatch): hostname="127.0.0.1", ), skip_init_checks=False, + additional_config=AdditionalConfig( + proxies={"http": "http://proxy:1234"}, trust_env=False, timeout=(10, 60) + ), ) # Verify collection is created From 7ff8dde0fcd8afa1320dd759155509123777d926 Mon Sep 17 00:00:00 2001 From: Silvano Cerza Date: Tue, 12 Mar 2024 15:15:44 +0100 Subject: [PATCH 47/52] Fix filter converters return types --- .../document_stores/weaviate/_filters.py | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/_filters.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/_filters.py index f397231ec..a2201f0a5 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/_filters.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/_filters.py @@ -5,10 +5,10 @@ from pandas import DataFrame import weaviate -from weaviate.collections.classes.filters import Filter +from weaviate.collections.classes.filters import Filter, FilterReturn -def convert_filters(filters: Dict[str, Any]) -> Dict[str, Any]: +def convert_filters(filters: Dict[str, Any]) -> FilterReturn: """ Convert filters from Haystack format to Weaviate format. """ @@ -60,7 +60,7 @@ def _invert_condition(filters: Dict[str, Any]) -> Dict[str, Any]: } -def _parse_logical_condition(condition: Dict[str, Any]) -> Dict[str, Any]: +def _parse_logical_condition(condition: Dict[str, Any]) -> FilterReturn: if "operator" not in condition: msg = f"'operator' key missing in {condition}" raise FilterError(msg) @@ -94,13 +94,13 @@ def _handle_date(value: Any) -> str: return value -def _equal(field: str, value: Any) -> Dict[str, Any]: +def _equal(field: str, value: Any) -> FilterReturn: if value is None: return weaviate.classes.query.Filter.by_property(field).is_none(True) return weaviate.classes.query.Filter.by_property(field).equal(_handle_date(value)) -def _not_equal(field: str, value: Any) -> Dict[str, Any]: +def _not_equal(field: str, value: Any) -> FilterReturn: if value is None: return weaviate.classes.query.Filter.by_property(field).is_none(False) @@ -109,7 +109,7 @@ def _not_equal(field: str, value: Any) -> Dict[str, Any]: ) | weaviate.classes.query.Filter.by_property(field).is_none(True) -def _greater_than(field: str, value: Any) -> Dict[str, Any]: +def _greater_than(field: str, value: Any) -> FilterReturn: if value is None: # When the value is None and '>' is used we create a filter that would return a Document # if it has a field set and not set at the same time. @@ -131,7 +131,7 @@ def _greater_than(field: str, value: Any) -> Dict[str, Any]: return weaviate.classes.query.Filter.by_property(field).greater_than(_handle_date(value)) -def _greater_than_equal(field: str, value: Any) -> Dict[str, Any]: +def _greater_than_equal(field: str, value: Any) -> FilterReturn: if value is None: # When the value is None and '>=' is used we create a filter that would return a Document # if it has a field set and not set at the same time. @@ -153,7 +153,7 @@ def _greater_than_equal(field: str, value: Any) -> Dict[str, Any]: return weaviate.classes.query.Filter.by_property(field).greater_or_equal(_handle_date(value)) -def _less_than(field: str, value: Any) -> Dict[str, Any]: +def _less_than(field: str, value: Any) -> FilterReturn: if value is None: # When the value is None and '<' is used we create a filter that would return a Document # if it has a field set and not set at the same time. @@ -175,7 +175,7 @@ def _less_than(field: str, value: Any) -> Dict[str, Any]: return weaviate.classes.query.Filter.by_property(field).less_than(_handle_date(value)) -def _less_than_equal(field: str, value: Any) -> Dict[str, Any]: +def _less_than_equal(field: str, value: Any) -> FilterReturn: if value is None: # When the value is None and '<=' is used we create a filter that would return a Document # if it has a field set and not set at the same time. @@ -197,7 +197,7 @@ def _less_than_equal(field: str, value: Any) -> Dict[str, Any]: return weaviate.classes.query.Filter.by_property(field).less_or_equal(_handle_date(value)) -def _in(field: str, value: Any) -> Dict[str, Any]: +def _in(field: str, value: Any) -> FilterReturn: if not isinstance(value, list): msg = f"{field}'s value must be a list when using 'in' or 'not in' comparators" raise FilterError(msg) @@ -205,7 +205,7 @@ def _in(field: str, value: Any) -> Dict[str, Any]: return weaviate.classes.query.Filter.by_property(field).contains_any(value) -def _not_in(field: str, value: Any) -> Dict[str, Any]: +def _not_in(field: str, value: Any) -> FilterReturn: if not isinstance(value, list): msg = f"{field}'s value must be a list when using 'in' or 'not in' comparators" raise FilterError(msg) @@ -225,7 +225,7 @@ def _not_in(field: str, value: Any) -> Dict[str, Any]: } -def _parse_comparison_condition(condition: Dict[str, Any]) -> Dict[str, Any]: +def _parse_comparison_condition(condition: Dict[str, Any]) -> FilterReturn: field: str = condition["field"] if field.startswith("meta."): @@ -250,7 +250,7 @@ def _parse_comparison_condition(condition: Dict[str, Any]) -> Dict[str, Any]: return COMPARISON_OPERATORS[operator](field, value) -def _match_no_document(field: str) -> Dict[str, Any]: +def _match_no_document(field: str) -> FilterReturn: """ Returns a filters that will match no Document, this is used to keep the behavior consistent between different Document Stores. From 9727a1d9406f32fb1e7da322681fe4344091c04f Mon Sep 17 00:00:00 2001 From: Silvano Cerza Date: Tue, 12 Mar 2024 15:32:43 +0100 Subject: [PATCH 48/52] Rework query methods --- .../weaviate/document_store.py | 74 ++++++++++++------- 1 file changed, 47 insertions(+), 27 deletions(-) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py index d001d3ecf..bafb44239 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py @@ -45,6 +45,16 @@ {"name": "score", "dataType": ["number"]}, ] +# This is the default limit used when querying documents with WeaviateDocumentStore. +# +# We picked this as QUERY_MAXIMUM_RESULTS defaults to 10000, trying to get that many +# documents at once will fail, even if the query is paginated. +# This value will ensure we get the most documents possible without hitting that limit, it would +# still fail if the user lowers the QUERY_MAXIMUM_RESULTS environment variable for their Weaviate instance. +# +# See WeaviateDocumentStore._query_with_filters() for more information. +DEFAULT_QUERY_LIMIT = 9999 + class WeaviateDocumentStore: """ @@ -220,8 +230,8 @@ def _to_document(self, data: DataObject) -> Document: return Document.from_dict(document_data) - def _query_paginated(self, properties: List[str]): - + def _query(self) -> List[Dict[str, Any]]: + properties = [p.name for p in self._collection.config.get().properties] try: result = self._collection.iterator(include_vector=True, return_properties=properties) except weaviate.exceptions.WeaviateQueryError as e: @@ -229,34 +239,44 @@ def _query_paginated(self, properties: List[str]): raise DocumentStoreError(msg) from e return result - def _query_with_filters(self, filters: weaviate.collections.classes.filters.Filter) -> List[Dict[str, Any]]: - - try: - # this is the default value for max number of objects to retrieve in Weaviate - # see QUERY_MAXIMUM_RESULTS - # see https://weaviate.io/developers/weaviate/config-refs/env-vars#overview - # and https://weaviate.io/developers/weaviate/api/graphql/additional-operators#pagination-with-offset - limit = 10_000 - result = self._collection.query.fetch_objects( - filters=convert_filters(filters), include_vector=True, limit=limit - ) - except weaviate.exceptions.WeaviateQueryError as e: - msg = f"Failed to query documents in Weaviate. Error: {e.message}" - raise DocumentStoreError(msg) from e - - return result.objects - - def filter_documents(self, filters: Optional[weaviate.collections.classes.filters.Filter] = None) -> List[Document]: - properties = self._collection.config.get().properties - properties = [prop.name for prop in properties] + def _query_with_filters(self, filters: Dict[str, Any]) -> List[Dict[str, Any]]: + properties = [p.name for p in self._collection.config.get().properties] + # When querying with filters we need to paginate using limit and offset as using + # a cursor with after is not possible. See the official docs: + # https://weaviate.io/developers/weaviate/api/graphql/additional-operators#cursor-with-after + # + # Nonetheless there's also another issue, paginating with limit and offset is not efficient + # and it's still restricted by the QUERY_MAXIMUM_RESULTS environment variable. + # If the sum of limit and offest is greater than QUERY_MAXIMUM_RESULTS an error is raised. + # See the official docs for more: + # https://weaviate.io/developers/weaviate/api/graphql/additional-operators#performance-considerations + offset = 0 + partial_result = None + result = [] + # Keep querying until we get all documents matching the filters + while partial_result is None or len(partial_result.objects) == DEFAULT_QUERY_LIMIT: + try: + partial_result = self._collection.query.fetch_objects( + filters=convert_filters(filters), + include_vector=True, + limit=DEFAULT_QUERY_LIMIT, + offset=offset, + return_properties=properties, + ) + except weaviate.exceptions.WeaviateQueryError as e: + msg = f"Failed to query documents in Weaviate. Error: {e.message}" + raise DocumentStoreError(msg) from e + result.extend(partial_result.objects) + offset += DEFAULT_QUERY_LIMIT + return result + def filter_documents(self, filters: Optional[Dict[str, Any]] = None) -> List[Document]: + result = [] if filters: result = self._query_with_filters(filters) - return [self._to_document(doc) for doc in result] - - result = self._query_paginated(properties) - result = [self._to_document(doc) for doc in result] - return result + else: + result = self._query() + return [self._to_document(doc) for doc in result] def _batch_write(self, documents: List[Document]) -> int: """ From 8cfcf3d5133235f75b9b6a19a35542bcb53e3a59 Mon Sep 17 00:00:00 2001 From: Silvano Cerza Date: Tue, 12 Mar 2024 15:33:35 +0100 Subject: [PATCH 49/52] Fix batch writing errors --- .../document_stores/weaviate/document_store.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py index bafb44239..3e804a3b6 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py @@ -286,7 +286,6 @@ def _batch_write(self, documents: List[Document]) -> int: """ with self._client.batch.dynamic() as batch: - for doc in documents: if not isinstance(doc, Document): msg = f"Expected a Document, got '{type(doc)}' instead." @@ -298,12 +297,20 @@ def _batch_write(self, documents: List[Document]) -> int: uuid=generate_uuid5(doc.id), vector=doc.embedding, ) - failed_objects = self._client.batch.failed_objects - if failed_objects: + if failed_objects := self._client.batch.failed_objects: + # We fallback to use the UUID if the _original_id is not present, this is just to be + mapped_objects = {} + for obj in failed_objects: + properties = obj.object_.properties or {} + # We get the object uuid just in case the _original_id is not present. + # That's extremely unlikely to happen but let's stay on the safe side. + id_ = properties.get("_original_id", obj.object_.uuid) + mapped_objects[id_] = obj.message + msg = "\n".join( [ - f"Failed to write object with id '{obj.object_._original_id}'. Error: '{obj.message}'" - for obj in failed_objects + f"Failed to write object with id '{id_}'. Error: '{message}'" + for id_, message in mapped_objects.items() ] ) raise DocumentStoreError(msg) From a18e91422deeb609ca4bdc855ced71067a7d2d6c Mon Sep 17 00:00:00 2001 From: Silvano Cerza Date: Tue, 12 Mar 2024 15:34:16 +0100 Subject: [PATCH 50/52] Handle different vector types in _to_document --- .../document_stores/weaviate/document_store.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py index 3e804a3b6..34fefa0a5 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py @@ -170,7 +170,6 @@ def to_dict(self) -> Dict[str, Any]: @classmethod def from_dict(cls, data: Dict[str, Any]) -> "WeaviateDocumentStore": - if (auth_client_secret := data["init_parameters"].get("auth_client_secret")) is not None: data["init_parameters"]["auth_client_secret"] = AuthCredentials.from_dict(auth_client_secret) if (embedded_options := data["init_parameters"].get("embedded_options")) is not None: @@ -188,7 +187,7 @@ def count_documents(self) -> int: def _to_data_object(self, document: Document) -> Dict[str, Any]: """ - Convert a Document to a Weviate data object ready to be saved. + Convert a Document to a Weaviate data object ready to be saved. """ data = document.to_dict() # Weaviate forces a UUID as an id. @@ -206,13 +205,18 @@ def _to_data_object(self, document: Document) -> Dict[str, Any]: return data - def _to_document(self, data: DataObject) -> Document: + def _to_document(self, data: DataObject[Dict[str, Any], None]) -> Document: """ Convert a data object read from Weaviate into a Document. """ document_data = data.properties document_data["id"] = document_data.pop("_original_id") - document_data["embedding"] = data.vector["default"] if data.vector else None + if isinstance(data.vector, List): + document_data["embedding"] = data.vector + elif isinstance(data.vector, Dict): + document_data["embedding"] = data.vector.get("default") + else: + document_data["embedding"] = None if (blob_data := document_data.get("blob_data")) is not None: document_data["blob"] = { @@ -368,14 +372,12 @@ def write_documents(self, documents: List[Document], policy: DuplicatePolicy = D return self._write(documents, policy) def delete_documents(self, document_ids: List[str]) -> None: - weaviate_ids = [generate_uuid5(doc_id) for doc_id in document_ids] self._collection.data.delete_many(where=weaviate.classes.query.Filter.by_id().contains_any(weaviate_ids)) def _bm25_retrieval( self, query: str, filters: Optional[Dict[str, Any]] = None, top_k: Optional[int] = None ) -> List[Document]: - result = self._collection.query.bm25( query=query, filters=convert_filters(filters) if filters else None, From 5a7cc2189de2aee9013dc7dfa814670329705585 Mon Sep 17 00:00:00 2001 From: Silvano Cerza Date: Tue, 12 Mar 2024 15:34:32 +0100 Subject: [PATCH 51/52] Add pagination tests --- .../weaviate/tests/test_document_store.py | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/integrations/weaviate/tests/test_document_store.py b/integrations/weaviate/tests/test_document_store.py index 9abf0db27..2b6ad088e 100644 --- a/integrations/weaviate/tests/test_document_store.py +++ b/integrations/weaviate/tests/test_document_store.py @@ -7,6 +7,7 @@ from dateutil import parser from haystack.dataclasses.byte_stream import ByteStream from haystack.dataclasses.document import Document +from haystack.document_stores.errors import DocumentStoreError from haystack.testing.document_store import ( TEST_EMBEDDING_1, TEST_EMBEDDING_2, @@ -626,3 +627,22 @@ def test_embedding_retrieval_with_certainty(self, document_store): def test_embedding_retrieval_with_distance_and_certainty(self, document_store): with pytest.raises(ValueError): document_store._embedding_retrieval(query_embedding=[], distance=0.1, certainty=0.1) + + def test_filter_documents_below_default_limit(self, document_store): + docs = [] + for index in range(9999): + docs.append(Document(content="This is some content", meta={"index": index})) + document_store.write_documents(docs) + result = document_store.filter_documents( + {"field": "content", "operator": "==", "value": "This is some content"} + ) + + assert len(result) == 9999 + + def test_filter_documents_over_default_limit(self, document_store): + docs = [] + for index in range(10000): + docs.append(Document(content="This is some content", meta={"index": index})) + document_store.write_documents(docs) + with pytest.raises(DocumentStoreError): + document_store.filter_documents({"field": "content", "operator": "==", "value": "This is some content"}) From 1771e1d1f5f06074f7498a3a316fb13b20d1e1b2 Mon Sep 17 00:00:00 2001 From: Silvano Cerza Date: Wed, 13 Mar 2024 10:37:53 +0100 Subject: [PATCH 52/52] Fix pagination test --- integrations/weaviate/tests/test_document_store.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/integrations/weaviate/tests/test_document_store.py b/integrations/weaviate/tests/test_document_store.py index 2b6ad088e..4c1659a86 100644 --- a/integrations/weaviate/tests/test_document_store.py +++ b/integrations/weaviate/tests/test_document_store.py @@ -630,14 +630,14 @@ def test_embedding_retrieval_with_distance_and_certainty(self, document_store): def test_filter_documents_below_default_limit(self, document_store): docs = [] - for index in range(9999): + for index in range(9998): docs.append(Document(content="This is some content", meta={"index": index})) document_store.write_documents(docs) result = document_store.filter_documents( {"field": "content", "operator": "==", "value": "This is some content"} ) - assert len(result) == 9999 + assert len(result) == 9998 def test_filter_documents_over_default_limit(self, document_store): docs = []