Skip to content

Commit

Permalink
weaviate: migrate from weaviate python client v3 to v4 (#463)
Browse files Browse the repository at this point in the history
* upgrade to latest weaviate server

* upgrade to latest weaviate client

* reformat code

* create client using v4 api

* use v4 api to create collection

* store collection obj for convenience

* upgrade filters to use v4 api

* upgrade batch write to use v4 api

* use v4 api cursor to retrieve all docs

* upgrade query with filters to use v4 api

* upgrade filter documents to use v4 API

* update weaviate fixture to align with v4 API

* update v4 to v3 conversion logic

* fix typo

* fix date v4 to v3 conversion logic

* hardcode limit in query filter

* fix typo

* upgrade weaviate server

* update v4 to v3 object date conversion

the property name will still appear in the object's
properties even though it is not set. So, we need
to check if it is not None too

* fix invert logic bug

* upgrade delete function to v4 API

* update bm25 search to v4 API

* update count docs to v4 API

* update _write to use v4 API

* support optional filters in bm25

* update embedding retrieval to use v4 API

* update from_dict for v4 API

* fix write invalid input test

* update other test_from_dict for V4

* update test_to_dict for v4

* update test_init for v4 API

* try to pass test_init

* pass test_init

* add exception handling in _query_paginated

* remove commented out code

* remove dead code

* remove commented out code

* return weaviate traceback too when query error occurs

* make _query_paginated return an iterator

* refactor _to_document

* remove v4 to v3 object conv fn

* update to_dict serialization

* update test case

* update weaviate server

* updates due to latest client changes

* update test case due to latest client changes

* Fix filter converters return types

* Rework query methods

* Fix batch writing errors

* Handle different vector types in _to_document

* Add pagination tests

* Fix pagination test

---------

Co-authored-by: Silvano Cerza <[email protected]>
  • Loading branch information
hsm207 and silvanocerza authored Mar 13, 2024
1 parent f95e4d0 commit 38bc78e
Show file tree
Hide file tree
Showing 8 changed files with 273 additions and 346 deletions.
2 changes: 1 addition & 1 deletion integrations/weaviate/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ services:
- '8080'
- --scheme
- http
image: semitechnologies/weaviate:1.23.2
image: semitechnologies/weaviate:1.24.1
ports:
- 8080:8080
- 50051:50051
Expand Down
2 changes: 1 addition & 1 deletion integrations/weaviate/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ classifiers = [
]
dependencies = [
"haystack-ai",
"weaviate-client==3.*",
"weaviate-client",
"haystack-pydoc-tools",
"python-dateutil",
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
from haystack.errors import FilterError
from pandas import DataFrame

import weaviate
from weaviate.collections.classes.filters import Filter, FilterReturn

def convert_filters(filters: Dict[str, Any]) -> Dict[str, Any]:

def convert_filters(filters: Dict[str, Any]) -> FilterReturn:
"""
Convert filters from Haystack format to Weaviate format.
"""
Expand All @@ -14,7 +17,7 @@ def convert_filters(filters: Dict[str, Any]) -> Dict[str, Any]:
raise FilterError(msg)

if "field" in filters:
return {"operator": "And", "operands": [_parse_comparison_condition(filters)]}
return Filter.all_of([_parse_comparison_condition(filters)])
return _parse_logical_condition(filters)


Expand All @@ -29,7 +32,7 @@ def convert_filters(filters: Dict[str, Any]) -> Dict[str, Any]:
"not in": "in",
"AND": "OR",
"OR": "AND",
"NOT": "AND",
"NOT": "OR",
}


Expand All @@ -51,7 +54,13 @@ def _invert_condition(filters: Dict[str, Any]) -> Dict[str, Any]:
return inverted_condition


def _parse_logical_condition(condition: Dict[str, Any]) -> Dict[str, Any]:
LOGICAL_OPERATORS = {
"AND": Filter.all_of,
"OR": Filter.any_of,
}


def _parse_logical_condition(condition: Dict[str, Any]) -> FilterReturn:
if "operator" not in condition:
msg = f"'operator' key missing in {condition}"
raise FilterError(msg)
Expand All @@ -67,7 +76,7 @@ def _parse_logical_condition(condition: Dict[str, Any]) -> Dict[str, Any]:
operands.append(_parse_logical_condition(c))
else:
operands.append(_parse_comparison_condition(c))
return {"operator": operator.lower().capitalize(), "operands": operands}
return LOGICAL_OPERATORS[operator](operands)
elif operator == "NOT":
inverted_conditions = _invert_condition(condition)
return _parse_logical_condition(inverted_conditions)
Expand All @@ -76,28 +85,6 @@ def _parse_logical_condition(condition: Dict[str, Any]) -> Dict[str, Any]:
raise FilterError(msg)


def _infer_value_type(value: Any) -> str:
if value is None:
return "valueNull"

if isinstance(value, bool):
return "valueBoolean"
if isinstance(value, int):
return "valueInt"
if isinstance(value, float):
return "valueNumber"

if isinstance(value, str):
try:
parser.isoparse(value)
return "valueDate"
except ValueError:
return "valueText"

msg = f"Unknown value type {type(value)}"
raise FilterError(msg)


def _handle_date(value: Any) -> str:
if isinstance(value, str):
try:
Expand All @@ -107,25 +94,22 @@ def _handle_date(value: Any) -> str:
return value


def _equal(field: str, value: Any) -> Dict[str, Any]:
def _equal(field: str, value: Any) -> FilterReturn:
if value is None:
return {"path": field, "operator": "IsNull", "valueBoolean": True}
return {"path": field, "operator": "Equal", _infer_value_type(value): _handle_date(value)}
return weaviate.classes.query.Filter.by_property(field).is_none(True)
return weaviate.classes.query.Filter.by_property(field).equal(_handle_date(value))


def _not_equal(field: str, value: Any) -> Dict[str, Any]:
def _not_equal(field: str, value: Any) -> FilterReturn:
if value is None:
return {"path": field, "operator": "IsNull", "valueBoolean": False}
return {
"operator": "Or",
"operands": [
{"path": field, "operator": "NotEqual", _infer_value_type(value): _handle_date(value)},
{"path": field, "operator": "IsNull", "valueBoolean": True},
],
}
return weaviate.classes.query.Filter.by_property(field).is_none(False)

return weaviate.classes.query.Filter.by_property(field).not_equal(
_handle_date(value)
) | weaviate.classes.query.Filter.by_property(field).is_none(True)

def _greater_than(field: str, value: Any) -> Dict[str, Any]:

def _greater_than(field: str, value: Any) -> FilterReturn:
if value is None:
# When the value is None and '>' is used we create a filter that would return a Document
# if it has a field set and not set at the same time.
Expand All @@ -144,10 +128,10 @@ def _greater_than(field: str, value: Any) -> Dict[str, Any]:
if type(value) in [list, DataFrame]:
msg = f"Filter value can't be of type {type(value)} using operators '>', '>=', '<', '<='"
raise FilterError(msg)
return {"path": field, "operator": "GreaterThan", _infer_value_type(value): _handle_date(value)}
return weaviate.classes.query.Filter.by_property(field).greater_than(_handle_date(value))


def _greater_than_equal(field: str, value: Any) -> Dict[str, Any]:
def _greater_than_equal(field: str, value: Any) -> FilterReturn:
if value is None:
# When the value is None and '>=' is used we create a filter that would return a Document
# if it has a field set and not set at the same time.
Expand All @@ -166,10 +150,10 @@ def _greater_than_equal(field: str, value: Any) -> Dict[str, Any]:
if type(value) in [list, DataFrame]:
msg = f"Filter value can't be of type {type(value)} using operators '>', '>=', '<', '<='"
raise FilterError(msg)
return {"path": field, "operator": "GreaterThanEqual", _infer_value_type(value): _handle_date(value)}
return weaviate.classes.query.Filter.by_property(field).greater_or_equal(_handle_date(value))


def _less_than(field: str, value: Any) -> Dict[str, Any]:
def _less_than(field: str, value: Any) -> FilterReturn:
if value is None:
# When the value is None and '<' is used we create a filter that would return a Document
# if it has a field set and not set at the same time.
Expand All @@ -188,10 +172,10 @@ def _less_than(field: str, value: Any) -> Dict[str, Any]:
if type(value) in [list, DataFrame]:
msg = f"Filter value can't be of type {type(value)} using operators '>', '>=', '<', '<='"
raise FilterError(msg)
return {"path": field, "operator": "LessThan", _infer_value_type(value): _handle_date(value)}
return weaviate.classes.query.Filter.by_property(field).less_than(_handle_date(value))


def _less_than_equal(field: str, value: Any) -> Dict[str, Any]:
def _less_than_equal(field: str, value: Any) -> FilterReturn:
if value is None:
# When the value is None and '<=' is used we create a filter that would return a Document
# if it has a field set and not set at the same time.
Expand All @@ -210,22 +194,23 @@ def _less_than_equal(field: str, value: Any) -> Dict[str, Any]:
if type(value) in [list, DataFrame]:
msg = f"Filter value can't be of type {type(value)} using operators '>', '>=', '<', '<='"
raise FilterError(msg)
return {"path": field, "operator": "LessThanEqual", _infer_value_type(value): _handle_date(value)}
return weaviate.classes.query.Filter.by_property(field).less_or_equal(_handle_date(value))


def _in(field: str, value: Any) -> Dict[str, Any]:
def _in(field: str, value: Any) -> FilterReturn:
if not isinstance(value, list):
msg = f"{field}'s value must be a list when using 'in' or 'not in' comparators"
raise FilterError(msg)

return {"operator": "And", "operands": [_equal(field, v) for v in value]}
return weaviate.classes.query.Filter.by_property(field).contains_any(value)


def _not_in(field: str, value: Any) -> Dict[str, Any]:
def _not_in(field: str, value: Any) -> FilterReturn:
if not isinstance(value, list):
msg = f"{field}'s value must be a list when using 'in' or 'not in' comparators"
raise FilterError(msg)
return {"operator": "And", "operands": [_not_equal(field, v) for v in value]}
operands = [weaviate.classes.query.Filter.by_property(field).not_equal(v) for v in value]
return Filter.all_of(operands)


COMPARISON_OPERATORS = {
Expand All @@ -240,7 +225,7 @@ def _not_in(field: str, value: Any) -> Dict[str, Any]:
}


def _parse_comparison_condition(condition: Dict[str, Any]) -> Dict[str, Any]:
def _parse_comparison_condition(condition: Dict[str, Any]) -> FilterReturn:
field: str = condition["field"]

if field.startswith("meta."):
Expand All @@ -265,15 +250,11 @@ def _parse_comparison_condition(condition: Dict[str, Any]) -> Dict[str, Any]:
return COMPARISON_OPERATORS[operator](field, value)


def _match_no_document(field: str) -> Dict[str, Any]:
def _match_no_document(field: str) -> FilterReturn:
"""
Returns a filters that will match no Document, this is used to keep the behavior consistent
between different Document Stores.
"""
return {
"operator": "And",
"operands": [
{"path": field, "operator": "IsNull", "valueBoolean": False},
{"path": field, "operator": "IsNull", "valueBoolean": True},
],
}

operands = [weaviate.classes.query.Filter.by_property(field).is_none(val) for val in [False, True]]
return Filter.all_of(operands)
Loading

0 comments on commit 38bc78e

Please sign in to comment.