Commit

test: add stats task feature cases and DefaultVectorSearchParams (#36768)

issue: #36767

---------

Signed-off-by: ThreadDao <[email protected]>
ThreadDao authored Oct 14, 2024
1 parent 937ebec commit d566b0c
Showing 5 changed files with 306 additions and 30 deletions.
24 changes: 17 additions & 7 deletions tests/python_client/common/bulk_insert_data.py
@@ -389,17 +389,20 @@ def gen_vectors_in_numpy_file(dir, data_field, float_vector, rows, dim, vector_t
return file_name


def gen_string_in_numpy_file(dir, data_field, rows, start=0, force=False):
def gen_string_in_numpy_file(dir, data_field, rows, start=0, force=False, **kwargs):
file_name = f"{data_field}.npy"
file = f"{dir}/{file_name}"
shuffle_pk = kwargs.get("shuffle_pk", False)
if not os.path.exists(file) or force:
# non vector columns
data = []
if rows > 0:
data = [gen_unique_str(str(i)) for i in range(start, rows+start)]
arr = np.array(data)
# print(f"file_name: {file_name} data type: {arr.dtype}")
log.info(f"file_name: {file_name} data type: {arr.dtype} data shape: {arr.shape}")
if shuffle_pk:
np.random.shuffle(arr)
log.info(f"file_name: {file_name} data type: {arr.dtype} data shape: {arr.shape}, shuffle_pk: {shuffle_pk}")
np.save(file, arr)
return file_name

@@ -463,9 +466,10 @@ def gen_json_in_numpy_file(dir, data_field, rows, start=0, force=False):
return file_name


def gen_int_or_float_in_numpy_file(dir, data_field, rows, start=0, force=False, nullable=False):
def gen_int_or_float_in_numpy_file(dir, data_field, rows, start=0, force=False, nullable=False, **kwargs):
file_name = f"{data_field}.npy"
file = f"{dir}/{file_name}"
shuffle_pk = kwargs.get("shuffle_pk", False)
if not os.path.exists(file) or force:
# non vector columns
data = []
@@ -477,13 +481,15 @@ def gen_int_or_float_in_numpy_file(dir, data_field, rows, start=0, force=False,
data = [np.float64(random.random()) for _ in range(rows)]
elif data_field == DataField.pk_field:
data = [i for i in range(start, start + rows)]
if shuffle_pk:
random.shuffle(data)
elif data_field == DataField.int_field:
if not nullable:
data = [random.randint(-999999, 9999999) for _ in range(rows)]
else:
data = [None for _ in range(rows)]
arr = np.array(data)
log.info(f"file_name: {file_name} data type: {arr.dtype} data shape: {arr.shape}")
log.info(f"file_name: {file_name} data type: {arr.dtype} data shape: {arr.shape}, shuffle_pk: {shuffle_pk}")
np.save(file, arr)
return file_name

@@ -694,6 +700,7 @@ def gen_json_files(is_row_based, rows, dim, auto_id, str_pk,

def gen_dict_data_by_data_field(data_fields, rows, start=0, float_vector=True, dim=128, array_length=None, enable_dynamic_field=False, **kwargs):
schema = kwargs.get("schema", None)
shuffle = kwargs.get("shuffle", False)
schema = schema.to_dict() if schema is not None else None
data = []
nullable = False
@@ -785,7 +792,9 @@ def gen_dict_data_by_data_field(data_fields, rows, start=0, float_vector=True, d
d["name"] = fake.name()
d["address"] = fake.address()
data.append(d)

if shuffle:
random.shuffle(data)
log.info(f"shuffle={shuffle}")
return data


@@ -850,6 +859,7 @@ def gen_npy_files(float_vector, rows, dim, data_fields, file_size=None, file_num
files = []
start_uid = 0
nullable = False
shuffle_pk = kwargs.get("shuffle_pk", False)
if file_nums == 1:
# gen the numpy file without subfolders if only one set of files
for data_field in data_fields:
@@ -878,7 +888,7 @@
file_name = gen_vectors_in_numpy_file(dir=data_source_new, data_field=data_field, float_vector=float_vector,
vector_type=vector_type, rows=rows, dim=dim, force=force)
elif data_field == DataField.string_field: # string field for numpy not supported yet at 2022-10-17
file_name = gen_string_in_numpy_file(dir=data_source_new, data_field=data_field, rows=rows, force=force)
file_name = gen_string_in_numpy_file(dir=data_source_new, data_field=data_field, rows=rows, force=force, shuffle_pk=shuffle_pk)
elif data_field == DataField.text_field:
file_name = gen_text_in_numpy_file(dir=data_source_new, data_field=data_field, rows=rows, force=force, nullable=nullable)
elif data_field == DataField.bool_field:
@@ -887,7 +897,7 @@
file_name = gen_json_in_numpy_file(dir=data_source_new, data_field=data_field, rows=rows, force=force)
else:
file_name = gen_int_or_float_in_numpy_file(dir=data_source_new, data_field=data_field,
rows=rows, force=force, nullable=nullable)
rows=rows, force=force, nullable=nullable, shuffle_pk=shuffle_pk)
files.append(file_name)
if enable_dynamic_field and include_meta:
file_name = gen_dynamic_field_in_numpy_file(dir=data_source_new, rows=rows, force=force)
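A minimal sketch (not part of this diff) of how the new shuffle_pk flag is expected to flow through gen_npy_files into the primary-key generators above; the import path, row count, dim, and field list are illustrative assumptions, while the DataField constants are the ones referenced in this file:

# shuffle_pk is forwarded via **kwargs to gen_int_or_float_in_numpy_file
# (int64 pk) and gen_string_in_numpy_file (varchar pk), which shuffle the
# generated keys before the .npy file is saved.
from common.bulk_insert_data import DataField, gen_npy_files  # path assumed from the repo layout

files = gen_npy_files(
    float_vector=True,
    rows=3000,
    dim=128,
    data_fields=[DataField.pk_field, DataField.string_field],
    force=True,
    shuffle_pk=True,
)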
2 changes: 1 addition & 1 deletion tests/python_client/common/common_func.py
@@ -1846,7 +1846,7 @@ def gen_values(schema: CollectionSchema, nb, start_id=0, default_values: dict =
if default_value is not None:
data.append(default_value)
elif field.auto_id is False:
data.append(gen_data_by_collection_field(field, nb, start_id * nb))
data.append(gen_data_by_collection_field(field, nb, start_id))
return data


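A hedged reading of the one-line fix above, assuming the third positional argument of gen_data_by_collection_field is the starting offset for the generated values (its signature is not shown in this diff); the numbers are illustrative:

# Illustrative numbers only, to show how the offset passed from
# gen_values(schema, nb, start_id, ...) changes with this fix.
nb, start_id = 100, 3
old_offset = start_id * nb   # before the fix: generation started at 300
new_offset = start_id        # after the fix: generation starts at 3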
132 changes: 124 additions & 8 deletions tests/python_client/common/common_params.py
@@ -1,7 +1,5 @@
from dataclasses import dataclass
from typing import List, Dict

from pymilvus import DataType
from typing import List, Dict, Optional

""" Define param names"""

@@ -284,6 +282,18 @@ class IndexPrams(BasePrams):
params: dict = None
metric_type: str = None

@dataclass
class SearchInsidePrams(BasePrams):
# inside params
radius: Optional[float] = None
range_filter: Optional[float] = None
group_by_field: Optional[str] = None

@dataclass
class SearchPrams(BasePrams):
metric_type: str = MetricType.L2
params: dict = None


""" Define default params """

@@ -307,9 +317,15 @@ def IVF_SQ8(field: str, nlist: int = 1024, metric_type=MetricType.L2):
}

@staticmethod
def HNSW(field: str, m: int = 8, ef: int = 200, metric_type=MetricType.L2):
def HNSW(field: str, m: int = 8, efConstruction: int = 200, metric_type=MetricType.L2):
return {
field: IndexPrams(index_type=IndexName.HNSW, params={"M": m, "efConstruction": ef}, metric_type=metric_type)
field: IndexPrams(index_type=IndexName.HNSW, params={"M": m, "efConstruction": efConstruction}, metric_type=metric_type)
}

@staticmethod
def SCANN(field: str, nlist: int = 128, metric_type=MetricType.L2):
return {
field: IndexPrams(index_type=IndexName.SCANN, params={"nlist": nlist}, metric_type=metric_type)
}

@staticmethod
@@ -330,20 +346,19 @@ def BIN_IVF_FLAT(field: str, nlist: int = 1024, metric_type=MetricType.JACCARD):
}

@staticmethod
def SPARSE_WAND(field: str, drop_ratio_build: int = 0.2, metric_type=MetricType.IP):
def SPARSE_WAND(field: str, drop_ratio_build: float = 0.2, metric_type=MetricType.IP):
return {
field: IndexPrams(index_type=IndexName.SPARSE_WAND, params={"drop_ratio_build": drop_ratio_build},
metric_type=metric_type)
}

@staticmethod
def SPARSE_INVERTED_INDEX(field: str, drop_ratio_build: int = 0.2, metric_type=MetricType.IP):
def SPARSE_INVERTED_INDEX(field: str, drop_ratio_build: float = 0.2, metric_type=MetricType.IP):
return {
field: IndexPrams(index_type=IndexName.SPARSE_INVERTED_INDEX, params={"drop_ratio_build": drop_ratio_build},
metric_type=metric_type)
}


class DefaultScalarIndexParams:

@staticmethod
@@ -389,6 +404,107 @@ def index_offset_cache(enable: bool = True):
def index_mmap(enable: bool = True):
return {'mmap.enabled': enable}

class DefaultVectorSearchParams:

@staticmethod
def FLAT(metric_type=MetricType.L2, inside_params: SearchInsidePrams = None, **kwargs):
inside_params_dict = {}
if inside_params is not None:
inside_params_dict.update(inside_params.to_dict)

sp = SearchPrams(params=inside_params_dict, metric_type=metric_type).to_dict
sp.update(kwargs)
return sp

@staticmethod
def IVF_FLAT(metric_type=MetricType.L2, nprobe: int = 32, inside_params: SearchInsidePrams = None, **kwargs):
inside_params_dict = {"nprobe": nprobe}
if inside_params is not None:
inside_params_dict.update(inside_params.to_dict)

sp = SearchPrams(params=inside_params_dict, metric_type=metric_type).to_dict
sp.update(kwargs)
return sp

@staticmethod
def IVF_SQ8(metric_type=MetricType.L2, nprobe: int = 32, inside_params: SearchInsidePrams = None, **kwargs):
inside_params_dict = {"nprobe": nprobe}
if inside_params is not None:
inside_params_dict.update(inside_params.to_dict)

sp = SearchPrams(params=inside_params_dict, metric_type=metric_type).to_dict
sp.update(kwargs)
return sp

@staticmethod
def HNSW(metric_type=MetricType.L2, ef: int = 200, inside_params: SearchInsidePrams = None, **kwargs):
inside_params_dict = {"ef": ef}
if inside_params is not None:
inside_params_dict.update(inside_params.to_dict)

sp = SearchPrams(params=inside_params_dict, metric_type=metric_type).to_dict
sp.update(kwargs)
return sp

@staticmethod
def SCANN(metric_type=MetricType.L2, nprobe: int = 32, reorder_k: int = 200, inside_params: SearchInsidePrams = None, **kwargs):
inside_params_dict = {"nprobe": nprobe, "reorder_k": reorder_k}
if inside_params is not None:
inside_params_dict.update(inside_params.to_dict)

sp = SearchPrams(params=inside_params_dict, metric_type=metric_type).to_dict
sp.update(kwargs)
return sp

@staticmethod
def DISKANN(metric_type=MetricType.L2, search_list: int = 30, inside_params: SearchInsidePrams = None, **kwargs):
inside_params_dict = {"search_list": search_list}
if inside_params is not None:
inside_params_dict.update(inside_params.to_dict)

sp = SearchPrams(params=inside_params_dict, metric_type=metric_type).to_dict
sp.update(kwargs)
return sp

@staticmethod
def BIN_FLAT(metric_type=MetricType.JACCARD, inside_params: SearchInsidePrams = None, **kwargs):
inside_params_dict = {}
if inside_params is not None:
inside_params_dict.update(inside_params.to_dict)

sp = SearchPrams(params=inside_params_dict, metric_type=metric_type).to_dict
sp.update(kwargs)
return sp

@staticmethod
def BIN_IVF_FLAT(metric_type=MetricType.JACCARD, nprobe: int = 32, inside_params: SearchInsidePrams = None, **kwargs):
inside_params_dict = {"nprobe": nprobe}
if inside_params is not None:
inside_params_dict.update(inside_params.to_dict)

sp = SearchPrams(params=inside_params_dict, metric_type=metric_type).to_dict
sp.update(kwargs)
return sp

@staticmethod
def SPARSE_WAND(metric_type=MetricType.IP, drop_ratio_search: float = 0.2, inside_params: SearchInsidePrams = None, **kwargs):
inside_params_dict = {"drop_ratio_search": drop_ratio_search}
if inside_params is not None:
inside_params_dict.update(inside_params.to_dict)

sp = SearchPrams(params=inside_params_dict, metric_type=metric_type).to_dict
sp.update(kwargs)
return sp

@staticmethod
def SPARSE_INVERTED_INDEX(metric_type=MetricType.IP, drop_ratio_search: float = 0.2, inside_params: SearchInsidePrams = None, **kwargs):
inside_params_dict = {"drop_ratio_search": drop_ratio_search}
if inside_params is not None:
inside_params_dict.update(inside_params.to_dict)

sp = SearchPrams(params=inside_params_dict, metric_type=metric_type).to_dict
sp.update(kwargs)
return sp

@dataclass
class ExprCheckParams:
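A minimal usage sketch (not part of the commit) of the new DefaultVectorSearchParams helpers and SearchInsidePrams; the import path matches the test file below, and the parameter values are illustrative:

from common.common_params import DefaultVectorSearchParams, SearchInsidePrams

# Plain HNSW search params: roughly {"metric_type": <L2>, "params": {"ef": 64}}.
hnsw_sp = DefaultVectorSearchParams.HNSW(ef=64)

# Range-search variant: the inside_params fields are merged into "params",
# since each helper updates its params dict with inside_params.to_dict.
hnsw_range_sp = DefaultVectorSearchParams.HNSW(
    ef=64,
    inside_params=SearchInsidePrams(radius=10.0, range_filter=1.0),
)

# Any extra keyword arguments are merged into the top level of the returned dict.
sparse_sp = DefaultVectorSearchParams.SPARSE_WAND(drop_ratio_search=0.1)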
75 changes: 75 additions & 0 deletions tests/python_client/testcases/test_bulk_insert.py
@@ -9,6 +9,7 @@
from base.client_base import TestcaseBase
from common import common_func as cf
from common import common_type as ct
from common.common_params import DefaultVectorIndexParams, DefaultVectorSearchParams
from common.milvus_sys import MilvusSys
from common.common_type import CaseLabel, CheckTasks
from utils.util_log import test_log as log
@@ -2160,3 +2161,77 @@ def test_partition_key_on_multi_numpy_files(
empty_partition_num += 1
num_entities += p.num_entities
assert num_entities == entities * file_nums

@pytest.mark.parametrize("pk_field", [df.pk_field, df.string_field])
@pytest.mark.tags(CaseLabel.L3)
def test_bulk_import_random_pk_stats_task(self, pk_field):
# connect -> prepare json data
self._connect()
collection_name = cf.gen_unique_str("stats_task")
nb = 3000
fields = []
files = ""

# prepare data: int64_pk -> json data; varchar_pk -> numpy data
if pk_field == df.pk_field:
fields = [
cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=False),
cf.gen_float_vec_field(name=df.float_vec_field, dim=ct.default_dim),
]
data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
files = prepare_bulk_insert_new_json_files(
minio_endpoint=self.minio_endpoint, bucket_name=self.bucket_name,
is_row_based=True, rows=nb, dim=ct.default_dim, auto_id=False, data_fields=data_fields, force=True,
shuffle=True
)
elif pk_field == df.string_field:
fields = [
cf.gen_string_field(name=df.string_field, is_primary=True, auto_id=False),
cf.gen_float_vec_field(name=df.float_vec_field, dim=ct.default_dim),
]
data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
files = prepare_bulk_insert_numpy_files(
minio_endpoint=self.minio_endpoint, bucket_name=self.bucket_name,
rows=nb, dim=ct.default_dim, data_fields=data_fields, enable_dynamic_field=False, force=True,
shuffle_pk=True
)
else:
log.error(f"pk_field name {pk_field} not supported now, [{df.pk_field}, {df.string_field}] expected~")

# create collection -> create vector index
schema = cf.gen_collection_schema(fields=fields)
self.collection_wrap.init_collection(collection_name, schema=schema)
self.build_multi_index(index_params=DefaultVectorIndexParams.IVF_SQ8(df.float_vec_field))

# bulk_insert data
t0 = time.time()
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=collection_name, files=files
)
logging.info(f"bulk insert task ids:{task_id}")
completed, _ = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=[task_id], timeout=300
)
tt = time.time() - t0
log.info(f"bulk insert state:{completed} with latency {tt}")
assert completed

# load -> get_segment_info -> verify stats task
self.collection_wrap.load()
res_segment_info, _ = self.utility_wrap.get_query_segment_info(collection_name)
assert len(res_segment_info) > 0 # maybe mix compaction to 1 segment
cnt = 0
for r in res_segment_info:
log.info(f"segmentID {r.segmentID}: state: {r.state}; num_rows: {r.num_rows}; is_sorted: {r.is_sorted} ")
cnt += r.num_rows
assert r.is_sorted is True
assert cnt == nb

# verify search
self.collection_wrap.search(
data=cf.gen_vectors(ct.default_nq, ct.default_dim, vector_data_type=DataType.FLOAT_VECTOR.name),
anns_field=df.float_vec_field, param=DefaultVectorSearchParams.IVF_SQ8(),
limit=ct.default_limit,
check_task=CheckTasks.check_search_results,
check_items={"nq": ct.default_nq,
"limit": ct.default_limit})
